下面列出了org.mockito.internal.stubbing.answers.ThrowsException#org.apache.hadoop.ipc.RemoteException 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Translates a failure into an {@link IOException} and rethrows it.
 *
 * <p>An {@link UndeclaredThrowableException} is replaced by its cause, and a
 * {@link RemoteException} by the result of {@code unwrapRemoteException()}, before
 * the failure is rethrown. Anything that is not already an {@link IOException} is
 * wrapped in a new one.
 *
 * @param context interceptor context of the failed call (not consulted here)
 * @param t the failure to translate
 * @throws IOException always; this method never returns normally
 */
@Override
public void handleFailure(RetryingCallerInterceptorContext context, Throwable t) throws IOException {
  Throwable failure = t;
  if (failure instanceof UndeclaredThrowableException) {
    // Reflection proxies hide the real failure as the cause.
    failure = failure.getCause();
  }
  if (failure instanceof RemoteException) {
    failure = ((RemoteException) failure).unwrapRemoteException();
  }

  if (failure instanceof DoNotRetryIOException) {
    throw (DoNotRetryIOException) failure;
  } else if (failure instanceof IOException) {
    throw (IOException) failure;
  }
  throw new IOException(failure);
}
/**
 * Checks whether a failure is, or was caused by, a {@link NoSuchColumnFamilyException}.
 *
 * <p>A {@link RemoteException} is unwrapped first. The result is then considered a
 * match when its message mentions {@code "NoSuchColumnFamilyException"} or when any
 * throwable in its cause chain is an instance of {@link NoSuchColumnFamilyException}.
 *
 * @param io the failure to inspect; may be {@code null}
 * @return {@code true} if a {@code NoSuchColumnFamilyException} was found,
 *         {@code false} otherwise (including for {@code null})
 */
@VisibleForTesting
public static boolean isNoSuchColumnFamilyException(Throwable io) {
  if (io instanceof RemoteException) {
    io = ((RemoteException) io).unwrapRemoteException();
  }
  if (io == null) {
    return false;
  }
  // Throwable#getMessage() may return null; guard it so the cause chain is still
  // inspected instead of failing with a NullPointerException.
  String message = io.getMessage();
  if (message != null && message.contains("NoSuchColumnFamilyException")) {
    return true;
  }
  for (Throwable cause = io; cause != null; cause = cause.getCause()) {
    if (cause instanceof NoSuchColumnFamilyException) {
      return true;
    }
  }
  return false;
}
/**
 * Invalidates both edits directories and verifies that the next edit is rejected
 * with a {@link RemoteException} whose class name mentions {@code ExitException}.
 */
@Test
public void testAllEditsDirFailOnWrite() throws IOException {
  assertTrue(doAnEdit());

  // Invalidate both edits journals.
  for (int index = 0; index <= 1; index++) {
    invalidateEditsDirAtIndex(index, true, true);
  }

  // The NN has not terminated (no ExitException thrown)
  final String expectedMessage =
      "Could not sync enough journals to persistent storage due to " +
      "No journals available to flush. " +
      "Unsynced transactions: 1";
  try {
    doAnEdit();
    fail("The previous edit could not be synced to any persistent storage, "
        + " should have halted the NN");
  } catch (RemoteException remote) {
    final String remoteClass = remote.getClassName();
    assertTrue(remoteClass.contains("ExitException"));
    GenericTestUtils.assertExceptionContains(expectedMessage, remote);
  }
}
/**
 * Invalidates both edits directories and verifies that the next edit is rejected
 * with a {@link RemoteException} whose class name mentions {@code ExitException}.
 */
@Test
public void testAllEditsDirFailOnWrite() throws IOException {
  assertTrue(doAnEdit());

  // Invalidate both edits journals.
  for (int index = 0; index <= 1; index++) {
    invalidateEditsDirAtIndex(index, true, true);
  }

  // The NN has not terminated (no ExitException thrown)
  final String expectedMessage =
      "Could not sync enough journals to persistent storage due to " +
      "No journals available to flush. " +
      "Unsynced transactions: 1";
  try {
    doAnEdit();
    fail("The previous edit could not be synced to any persistent storage, "
        + " should have halted the NN");
  } catch (RemoteException remote) {
    final String remoteClass = remote.getClassName();
    assertTrue(remoteClass.contains("ExitException"));
    GenericTestUtils.assertExceptionContains(expectedMessage, remote);
  }
}
/**
 * Tries to open a file for append, making up to ten attempts.
 *
 * <p>When an attempt fails with a {@link RemoteException} whose class name is that
 * of {@link RecoveryInProgressException}, the method sleeps one second and tries
 * again; any other {@link RemoteException} is rethrown immediately.
 *
 * @param fs file system to append through
 * @param p path of the file to append to
 * @return the stream returned by {@code fs.append(p)}
 * @throws Exception if all attempts fail or a non-retriable failure occurs
 */
private static FSDataOutputStream append(FileSystem fs, Path p) throws Exception {
  final String retriableClass = RecoveryInProgressException.class.getName();
  int attempt = 0;
  while (attempt < 10) {
    try {
      return fs.append(p);
    } catch (RemoteException remote) {
      if (!remote.getClassName().equals(retriableClass)) {
        throw remote;
      }
      AppendTestUtil.LOG.info("Will sleep and retry, i=" + attempt + ", p=" + p, remote);
      Thread.sleep(1000);
    }
    attempt++;
  }
  throw new IOException("Cannot append to " + p);
}
/**
 * Runs a {@code SleepJob} with the given configuration and asserts that it fails
 * with a {@link RemoteException} whose unwrapped message contains the expected text.
 *
 * @param jobConf configuration passed to {@code ToolRunner.run}
 * @param memForMapTasks value embedded in the expected message
 * @param memForReduceTasks value embedded in the expected message
 * @param expectedMsg trailing part of the expected message
 */
private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks,
    long memForReduceTasks, String expectedMsg)
    throws Exception,
    IOException {
  final String[] sleepJobArgs = { "-m", "0", "-r", "0", "-mt", "0", "-rt", "0" };

  String observedMsg = null;
  boolean remoteFailureSeen = false;
  try {
    ToolRunner.run(jobConf, new SleepJob(), sleepJobArgs);
  } catch (RemoteException remote) {
    remoteFailureSeen = true;
    observedMsg = remote.unwrapRemoteException().getMessage();
  }

  assertTrue(remoteFailureSeen);
  assertNotNull(observedMsg);

  final String overallExpectedMsg =
      "(" + memForMapTasks + " memForMapTasks " + memForReduceTasks
          + " memForReduceTasks): " + expectedMsg;
  final boolean containsExpected = observedMsg.contains(overallExpectedMsg);
  assertTrue("Observed message - " + observedMsg
      + " - doesn't contain expected message - " + overallExpectedMsg,
      containsExpected);
}
/**
 * Delegates to {@code namenode.removeAcl(src)} inside a {@code "removeAcl"} trace span.
 *
 * <p>A {@link RemoteException} from the call is unwrapped against the exception
 * types listed below and rethrown; the trace scope is always closed.
 *
 * @param src path passed through to the namenode
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 */
public void removeAcl(String src) throws IOException {
  checkOpen();
  final TraceScope traceScope = Trace.startSpan("removeAcl", traceSampler);
  try {
    namenode.removeAcl(src);
  } catch (RemoteException remote) {
    final IOException unwrapped = remote.unwrapRemoteException(
        AccessControlException.class,
        AclException.class,
        FileNotFoundException.class,
        NSQuotaExceededException.class,
        SafeModeException.class,
        SnapshotAccessControlException.class,
        UnresolvedPathException.class);
    throw unwrapped;
  } finally {
    traceScope.close();
  }
}
/**
 * Wraps an {@link AvatarProtocol} RPC proxy in a retry proxy whose {@code "create"}
 * method is governed by an exception-based retry policy.
 *
 * @param rpcAvatarnode the underlying RPC proxy
 * @return a retry proxy implementing {@link AvatarProtocol}
 * @throws IOException declared by the signature
 */
private static AvatarProtocol createAvatarnode(AvatarProtocol rpcAvatarnode)
    throws IOException {
  // NOTE(review): this policy is constructed but never registered in any of the
  // maps below, so both maps only ever fall back to TRY_ONCE_THEN_FAIL — confirm
  // whether a remoteExceptionToPolicyMap entry was intended.
  RetryPolicy createPolicy = RetryPolicies
      .retryUpToMaximumCountWithFixedSleep(5, 5000, TimeUnit.MILLISECONDS);

  // Policy applied to the exception carried inside a RemoteException.
  Map<Class<? extends Exception>, RetryPolicy> policyByRemoteException =
      new HashMap<Class<? extends Exception>, RetryPolicy>();
  RetryPolicy remoteExceptionPolicy = RetryPolicies.retryByRemoteException(
      RetryPolicies.TRY_ONCE_THEN_FAIL, policyByRemoteException);

  // Policy applied to the exception thrown locally by the proxy.
  Map<Class<? extends Exception>, RetryPolicy> policyByException =
      new HashMap<Class<? extends Exception>, RetryPolicy>();
  policyByException.put(RemoteException.class, remoteExceptionPolicy);
  RetryPolicy methodPolicy = RetryPolicies.retryByException(
      RetryPolicies.TRY_ONCE_THEN_FAIL, policyByException);

  Map<String, RetryPolicy> policyByMethodName = new HashMap<String, RetryPolicy>();
  policyByMethodName.put("create", methodPolicy);

  Object retryProxy = RetryProxy.create(AvatarProtocol.class,
      rpcAvatarnode, policyByMethodName);
  return (AvatarProtocol) retryProxy;
}
/**
 * Used to ensure that at least one of the given HA NNs is currently in the
 * active state.
 *
 * <p>Each proxy is probed with {@code getFileInfo("/")}. A {@link RemoteException}
 * that unwraps to a {@link StandbyException} is skipped; any other
 * {@link RemoteException} is rethrown as-is.
 *
 * @param namenodes list of RPC proxies for each NN to check.
 * @return true if at least one NN is active, false if all are in the standby state.
 * @throws IOException in the event of error.
 */
public static boolean isAtLeastOneActive(List<ClientProtocol> namenodes)
    throws IOException {
  for (ClientProtocol candidate : namenodes) {
    try {
      candidate.getFileInfo("/");
      return true;
    } catch (RemoteException remote) {
      final IOException unwrapped = remote.unwrapRemoteException();
      if (!(unwrapped instanceof StandbyException)) {
        throw remote;
      }
      // A StandbyException is expected for a standby NN; try the next proxy.
    }
  }
  return false;
}
/**
 * Creates a symbolic link.
 *
 * <p>The permission sent to the namenode is {@code FsPermission.getDefault()} with
 * the client's configured umask applied. A {@link RemoteException} is unwrapped
 * against the exception types listed below; the trace scope is always closed.
 *
 * @param target value passed as the link target
 * @param link value passed as the link path
 * @param createParent flag passed through to the namenode
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 * @see ClientProtocol#createSymlink(String, String,FsPermission, boolean)
 */
public void createSymlink(String target, String link, boolean createParent)
    throws IOException {
  final TraceScope traceScope = getPathTraceScope("createSymlink", target);
  try {
    final FsPermission defaultPerm = FsPermission.getDefault();
    final FsPermission maskedPerm = defaultPerm.applyUMask(dfsClientConf.uMask);
    namenode.createSymlink(target, link, maskedPerm, createParent);
  } catch (RemoteException remote) {
    final IOException unwrapped = remote.unwrapRemoteException(
        AccessControlException.class,
        FileAlreadyExistsException.class,
        FileNotFoundException.class,
        ParentNotDirectoryException.class,
        NSQuotaExceededException.class,
        DSQuotaExceededException.class,
        UnresolvedPathException.class,
        SnapshotAccessControlException.class);
    throw unwrapped;
  } finally {
    traceScope.close();
  }
}
/**
 * Issues the append call to the namenode and builds the output stream from its result.
 *
 * <p>The flags are validated with {@code CreateFlag.validateForAppend} first. A
 * {@link RemoteException} is unwrapped against the exception types listed below.
 *
 * @param src path of the file to append to
 * @param buffersize buffer size handed to the new stream
 * @param flag create flags for the append
 * @param progress progress callback handed to the new stream
 * @param favoredNodes favored nodes handed to the new stream
 * @return the stream created by {@code DFSOutputStream.newStreamForAppend}
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 */
private DFSOutputStream callAppend(String src, int buffersize,
    EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)
    throws IOException {
  CreateFlag.validateForAppend(flag);
  try {
    final LastBlockWithStatus appendResult = namenode.append(src, clientName,
        new EnumSetWritable<>(flag, CreateFlag.class));
    return DFSOutputStream.newStreamForAppend(
        this,
        src,
        flag,
        buffersize,
        progress,
        appendResult.getLastBlock(),
        appendResult.getFileStatus(),
        dfsClientConf.createChecksum(),
        favoredNodes);
  } catch (RemoteException remote) {
    final IOException unwrapped = remote.unwrapRemoteException(
        AccessControlException.class,
        FileNotFoundException.class,
        SafeModeException.class,
        DSQuotaExceededException.class,
        UnsupportedOperationException.class,
        UnresolvedPathException.class,
        SnapshotAccessControlException.class);
    throw unwrapped;
  }
}
/**
 * Tries to open a file for append, making up to ten attempts.
 *
 * <p>When an attempt fails with a {@link RemoteException} whose class name is that
 * of {@link RecoveryInProgressException}, the method sleeps one second and tries
 * again; any other {@link RemoteException} is rethrown immediately.
 *
 * @param fs file system to append through
 * @param p path of the file to append to
 * @return the stream returned by {@code fs.append(p)}
 * @throws Exception if all attempts fail or a non-retriable failure occurs
 */
private static FSDataOutputStream append(FileSystem fs, Path p) throws Exception {
  final String retriableClass = RecoveryInProgressException.class.getName();
  int attempt = 0;
  while (attempt < 10) {
    try {
      return fs.append(p);
    } catch (RemoteException remote) {
      if (!remote.getClassName().equals(retriableClass)) {
        throw remote;
      }
      AppendTestUtil.LOG.info("Will sleep and retry, i=" + attempt + ", p=" + p, remote);
      Thread.sleep(1000);
    }
    attempt++;
  }
  throw new IOException("Cannot append to " + p);
}
/**
 * {@inheritDoc}
 *
 * <p>Writes an XML document to the response: either the file checksum obtained via
 * {@code DFSClient.getFileChecksum}, or, when that fails with an
 * {@link IOException}, a {@link RemoteException} element describing the failure.
 */
public void doGet(HttpServletRequest request, HttpServletResponse response
    ) throws ServletException, IOException {
  final UnixUserGroupInformation callerUgi = getUGI(request);
  final PrintWriter responseWriter = response.getWriter();
  final String filename = getFilename(request, response);

  final XMLOutputter xmlOut = new XMLOutputter(responseWriter, "UTF-8");
  xmlOut.declaration();

  // Work on a copy of the datanode configuration carrying the caller's identity.
  final Configuration conf = new Configuration(DataNode.getDataNode().getConf());
  final int socketTimeout =
      conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);
  final SocketFactory socketFactory =
      NetUtils.getSocketFactory(conf, ClientProtocol.class);
  UnixUserGroupInformation.saveToConf(conf,
      UnixUserGroupInformation.UGI_PROPERTY_NAME, callerUgi);
  final ClientProtocol nnproxy = DFSClient.createNamenode(conf);

  try {
    final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
        filename, nnproxy, socketFactory, socketTimeout);
    MD5MD5CRC32FileChecksum.write(xmlOut, checksum);
  } catch (IOException failure) {
    // Report the failure inside the XML body rather than failing the request.
    final RemoteException reported =
        new RemoteException(failure.getClass().getName(), failure.getMessage());
    reported.writeXml(filename, xmlOut);
  }
  xmlOut.endDocument();
}
/**
 * Delegates to {@code namenode.setXAttr} with an attribute built by
 * {@code XAttrHelper.buildXAttr(name, value)}, inside a {@code "setXAttr"} trace scope.
 *
 * <p>A {@link RemoteException} is unwrapped against the exception types listed
 * below; the trace scope is always closed.
 *
 * @param src path passed through to the namenode
 * @param name attribute name
 * @param value attribute value
 * @param flag set flags passed through to the namenode
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 */
public void setXAttr(String src, String name, byte[] value,
    EnumSet<XAttrSetFlag> flag) throws IOException {
  checkOpen();
  final TraceScope traceScope = getPathTraceScope("setXAttr", src);
  try {
    namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag);
  } catch (RemoteException remote) {
    final IOException unwrapped = remote.unwrapRemoteException(
        AccessControlException.class,
        FileNotFoundException.class,
        NSQuotaExceededException.class,
        SafeModeException.class,
        SnapshotAccessControlException.class,
        UnresolvedPathException.class);
    throw unwrapped;
  } finally {
    traceScope.close();
  }
}
/**
 * Delegates to {@code namenode.removeXAttr} with an attribute built by
 * {@code XAttrHelper.buildXAttr(name)}, inside a {@code "removeXAttr"} trace scope.
 *
 * <p>A {@link RemoteException} is unwrapped against the exception types listed
 * below; the trace scope is always closed.
 *
 * @param src path passed through to the namenode
 * @param name attribute name
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 */
public void removeXAttr(String src, String name) throws IOException {
  checkOpen();
  final TraceScope traceScope = getPathTraceScope("removeXAttr", src);
  try {
    namenode.removeXAttr(src, XAttrHelper.buildXAttr(name));
  } catch (RemoteException remote) {
    final IOException unwrapped = remote.unwrapRemoteException(
        AccessControlException.class,
        FileNotFoundException.class,
        NSQuotaExceededException.class,
        SafeModeException.class,
        SnapshotAccessControlException.class,
        UnresolvedPathException.class);
    throw unwrapped;
  } finally {
    traceScope.close();
  }
}
/**
 * Checks whether another instance is running and, if not, marks this one as running.
 *
 * <p>The idea for ensuring that no more than one instance runs against an HDFS is
 * to create a file in the HDFS, write the hostname of the machine running the
 * instance into it, and keep the file open until the instance exits. A second
 * instance then cannot create the file while the first one is running.
 *
 * <p>If the id file already exists, the method first tries to append to it (so that
 * it fails fast when another balancer holds it), then deletes it. It then creates
 * the file without overwrite and registers it for deletion on file system close.
 * The original author describes the check-and-mark as atomic.
 *
 * @return null if there is a running instance (signalled by a remote
 *         {@link AlreadyBeingCreatedException}); otherwise, the output stream to
 *         the newly created file.
 * @throws IOException on any other failure
 */
private OutputStream checkAndMarkRunning() throws IOException {
  try {
    final boolean idFileExists = fs.exists(idPath);
    if (idFileExists) {
      // try appending to it so that it will fail fast if another balancer is
      // running.
      IOUtils.closeStream(fs.append(idPath));
      fs.delete(idPath, true);
    }

    final FSDataOutputStream idStream = fs.create(idPath, false);
    // mark balancer idPath to be deleted during filesystem closure
    fs.deleteOnExit(idPath);
    if (write2IdFile) {
      final String localHostName = InetAddress.getLocalHost().getHostName();
      idStream.writeBytes(localHostName);
      idStream.hflush();
    }
    return idStream;
  } catch (RemoteException remote) {
    final String alreadyBeingCreated = AlreadyBeingCreatedException.class.getName();
    if (!alreadyBeingCreated.equals(remote.getClassName())) {
      throw remote;
    }
    return null;
  }
}
/**
 * Get the data transfer protocol version supported in the cluster
 * assuming all the datanodes have the same version.
 *
 * <p>When the cached value is {@code -1} the version is fetched from the namenode.
 * If the namenode answers with a remote failure whose message starts with
 * {@code "java.io.IOException: java.lang.NoSuchMethodException"}, version 14 is
 * assumed; any other remote failure is rethrown.
 *
 * @return the data transfer protocol version supported in the cluster
 * @throws IOException if the namenode call fails for any other reason
 */
int getDataTransferProtocolVersion() throws IOException {
  // NOTE(review): the monitor is the dataTransferVersion field itself, which is
  // reassigned below. If it is a boxed Integer the lock object changes on
  // assignment — confirm the field type and consider a dedicated lock object.
  synchronized (dataTransferVersion) {
    if (dataTransferVersion == -1) {
      // Get the version number from NN
      try {
        int remoteDataTransferVersion = namenode.getDataTransferProtocolVersion();
        updateDataTransferProtocolVersionIfNeeded(remoteDataTransferVersion);
      } catch (RemoteException re) {
        IOException ioe = re.unwrapRemoteException(IOException.class);
        String noSuchMethodPrefix = IOException.class.getName() + ": " +
            NoSuchMethodException.class.getName();
        // getMessage() may be null; in that case propagate the real failure
        // instead of masking it with a NullPointerException.
        String message = ioe.getMessage();
        if (message != null && message.startsWith(noSuchMethodPrefix)) {
          dataTransferVersion = 14; // last version not supporting this RPC
        } else {
          throw ioe;
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Data Transfer Protocal Version is "+ dataTransferVersion);
      }
    }
    return dataTransferVersion;
  }
}
/**
 * Deleting a directory that has a snapshottable descendant with snapshots must fail.
 */
@Test (timeout=300000)
public void testDeleteDirectoryWithSnapshot2() throws Exception {
  // Two files directly under sub.
  final Path subFileA = new Path(sub, "file0");
  final Path subFileB = new Path(sub, "file1");
  DFSTestUtil.createFile(hdfs, subFileA, BLOCKSIZE, REPLICATION, seed);
  DFSTestUtil.createFile(hdfs, subFileB, BLOCKSIZE, REPLICATION, seed);

  // Two files under subsub.
  final Path subsubFileA = new Path(subsub, "file0");
  final Path subsubFileB = new Path(subsub, "file1");
  DFSTestUtil.createFile(hdfs, subsubFileA, BLOCKSIZE, REPLICATION, seed);
  DFSTestUtil.createFile(hdfs, subsubFileB, BLOCKSIZE, REPLICATION, seed);

  // Allow snapshots on subsub and take one.
  hdfs.allowSnapshot(subsub);
  hdfs.createSnapshot(subsub, "s1");

  // Deleting dir while its descendant subsub has snapshots should fail.
  exception.expect(RemoteException.class);
  final String expectedError = subsub.toString()
      + " is snapshottable and already has snapshots";
  exception.expectMessage(expectedError);
  hdfs.delete(dir, true);
}
/**
 * Delegates to {@code namenode.modifyAclEntries(src, aclSpec)} inside a
 * {@code "modifyAclEntries"} trace scope.
 *
 * <p>A {@link RemoteException} is unwrapped against the exception types listed
 * below; the trace scope is always closed.
 *
 * @param src path passed through to the namenode
 * @param aclSpec ACL entries passed through to the namenode
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 */
public void modifyAclEntries(String src, List<AclEntry> aclSpec)
    throws IOException {
  checkOpen();
  final TraceScope traceScope = getPathTraceScope("modifyAclEntries", src);
  try {
    namenode.modifyAclEntries(src, aclSpec);
  } catch (RemoteException remote) {
    final IOException unwrapped = remote.unwrapRemoteException(
        AccessControlException.class,
        AclException.class,
        FileNotFoundException.class,
        NSQuotaExceededException.class,
        SafeModeException.class,
        SnapshotAccessControlException.class,
        UnresolvedPathException.class);
    throw unwrapped;
  } finally {
    traceScope.close();
  }
}
/**
 * Reports this action's block to the namenode as a bad block.
 *
 * <p>Does nothing when {@code bpRegistration} is {@code null}. A
 * {@link RemoteException} from the namenode is only logged; any other
 * {@link IOException} is converted to a {@link BPServiceActorActionException}
 * whose message includes the underlying failure.
 *
 * @param bpNamenode namenode proxy used to send the report
 * @param bpRegistration registration used to build the reporting datanode info
 * @throws BPServiceActorActionException if the report fails with a non-remote
 *         I/O error
 */
@Override
public void reportTo(DatanodeProtocolClientSideTranslatorPB bpNamenode,
    DatanodeRegistration bpRegistration) throws BPServiceActorActionException {
  if (bpRegistration == null) {
    return;
  }
  DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
  String[] uuids = { storageUuid };
  StorageType[] types = { storageType };
  LocatedBlock[] locatedBlock = { new LocatedBlock(block,
      dnArr, uuids, types) };

  try {
    bpNamenode.reportBadBlocks(locatedBlock);
  } catch (RemoteException re) {
    DataNode.LOG.info("reportBadBlock encountered RemoteException for "
        + "block:  " + block , re);
  } catch (IOException e) {
    // Append the failure itself: the message used to end with a dangling ": "
    // and the underlying error was lost entirely.
    throw new BPServiceActorActionException("Failed to report bad block "
        + block + " to namenode: " + e);
  }
}
/**
 * Calls {@code namenode.getBlocks(datanode, size)} and asserts that it fails with a
 * {@link RemoteException} whose message mentions {@code IllegalArgumentException}.
 *
 * @param namenode protocol proxy to call
 * @param datanode datanode argument expected to be rejected (or used with a
 *        rejected {@code size})
 * @param size size argument passed to {@code getBlocks}
 * @throws IOException if the call fails with a non-remote I/O error
 */
private void getBlocksWithException(NamenodeProtocol namenode,
    DatanodeInfo datanode,
    long size) throws IOException {
  boolean getException = false;
  try {
    // Use the caller-supplied arguments; previously they were ignored in favour
    // of a hard-coded (new DatanodeInfo(), 2), so every call site exercised the
    // same scenario regardless of what it passed in.
    namenode.getBlocks(datanode, size);
  } catch (RemoteException e) {
    getException = true;
    assertTrue(e.getMessage().contains("IllegalArgumentException"));
  }
  assertTrue(getException);
}
/**
 * Tests two consecutive appends on a file with a full block: the second append,
 * issued through a different file system handle, must fail with a remote
 * {@link AlreadyBeingCreatedException}.
 */
@Test
public void testAppend2Twice() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final DistributedFileSystem firstFs = cluster.getFileSystem();
  final FileSystem secondFs = AppendTestUtil.createHdfsWithDifferentUsername(conf);
  try {
    final Path target = new Path("/testAppendTwice/foo");
    final int blockLen = 1 << 16;
    final byte[] payload = AppendTestUtil.initBuffer(blockLen);

    // Create a new file with exactly one full block.
    FSDataOutputStream created = secondFs.create(target, true, 4096, (short)1, blockLen);
    created.write(payload, 0, blockLen);
    created.close();

    // 1st append does not add any data so that the last block remains full
    // and the last block in INodeFileUnderConstruction is a BlockInfo
    // but not BlockInfoUnderConstruction.
    final DistributedFileSystem secondDfs = (DistributedFileSystem) secondFs;
    secondDfs.append(target,
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);

    // 2nd append should get AlreadyBeingCreatedException
    firstFs.append(target);
    Assert.fail();
  } catch (RemoteException remote) {
    AppendTestUtil.LOG.info("Got an exception:", remote);
    final String expectedClass = AlreadyBeingCreatedException.class.getName();
    Assert.assertEquals(expectedClass, remote.getClassName());
  } finally {
    secondFs.close();
    firstFs.close();
    cluster.shutdown();
  }
}
/**
 * Extracts a {@link RetriableException} carried inside a {@link RemoteException}.
 *
 * @param e the exception to inspect
 * @return the unwrapped {@link RetriableException}, or {@code null} when {@code e}
 *         is not a {@link RemoteException} or does not unwrap to one
 */
static RetriableException getWrappedRetriableException(Exception e) {
  if (e instanceof RemoteException) {
    final RemoteException remote = (RemoteException) e;
    final Exception unwrapped = remote.unwrapRemoteException(RetriableException.class);
    if (unwrapped instanceof RetriableException) {
      return (RetriableException) unwrapped;
    }
  }
  return null;
}
/**
 * Get a partial listing of the indicated directory.
 *
 * <p>Recommend to use HdfsFileStatus.EMPTY_NAME as startAfter
 * if the application wants to fetch a listing starting from
 * the first entry in the directory.
 *
 * <p>The version-based implementation is used when {@code namenodeProtocolProxy}
 * is {@code null}, the method-based one otherwise. A {@link RemoteException} is
 * unwrapped against {@link AccessControlException}.
 *
 * @param src path of the directory to list
 * @return an iterator over the located file statuses
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 * @see ClientProtocol#getLocatedPartialListing(String, byte[])
 */
public RemoteIterator<LocatedFileStatus> listPathWithLocation(
    final String src) throws IOException {
  checkOpen();
  try {
    if (namenodeProtocolProxy != null) {
      return methodBasedListPathWithLocation(src);
    } else {
      return versionBasedListPathWithLocation(src);
    }
  } catch (RemoteException remote) {
    final IOException unwrapped =
        remote.unwrapRemoteException(AccessControlException.class);
    throw unwrapped;
  }
}
/**
 * Calls {@code svc.transitionToActive(reqInfo)}, unwrapping a
 * {@link RemoteException} against {@link ServiceFailedException}.
 *
 * @param svc the HA service to transition
 * @param reqInfo request information passed through to the service
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 */
public static void transitionToActive(HAServiceProtocol svc,
    StateChangeRequestInfo reqInfo)
    throws IOException {
  try {
    svc.transitionToActive(reqInfo);
  } catch (RemoteException remote) {
    final IOException unwrapped =
        remote.unwrapRemoteException(ServiceFailedException.class);
    throw unwrapped;
  }
}
/**
 * Tests that a truncate rejected for exceeding quota does not mark the file as
 * under construction or create a lease, and leaves quota usage unchanged.
 */
@Test (timeout=60000)
public void testTruncateOverQuota() throws Exception {
  final Path quotaDir = new Path("/TestTruncateOverquota");
  final Path target = new Path(quotaDir, "file");
  final String targetPath = target.toString();

  // create partial block file
  dfs.mkdirs(quotaDir);
  DFSTestUtil.createFile(dfs, target, BLOCKSIZE/2, REPLICATION, seed);

  // lower quota to cause exception when appending to partial block
  dfs.setQuota(quotaDir, Long.MAX_VALUE - 1, 1);
  final INodeDirectory quotaDirNode =
      fsdir.getINode4Write(quotaDir.toString()).asDirectory();
  final long spaceBefore = quotaDirNode.getDirectoryWithQuotaFeature()
      .getSpaceConsumed().getStorageSpace();

  try {
    dfs.truncate(target, BLOCKSIZE / 2 - 1);
    Assert.fail("truncate didn't fail");
  } catch (RemoteException remote) {
    final String remoteClass = remote.getClassName();
    assertTrue(remoteClass.contains("DSQuotaExceededException"));
  }

  // check that the file exists, isn't UC, and has no dangling lease
  final INodeFile targetNode = fsdir.getINode(targetPath).asFile();
  Assert.assertNotNull(targetNode);
  Assert.assertFalse("should not be UC", targetNode.isUnderConstruction());
  Assert.assertNull("should not have a lease",
      cluster.getNamesystem().getLeaseManager().getLeaseByPath(targetPath));

  // make sure the quota usage is unchanged
  final long spaceAfter = quotaDirNode.getDirectoryWithQuotaFeature()
      .getSpaceConsumed().getStorageSpace();
  assertEquals(spaceBefore, spaceAfter);

  // make sure edits aren't corrupted
  dfs.recoverLease(target);
  cluster.restartNameNodes();
}
/**
 * Create one snapshot.
 *
 * <p>Delegates to {@code namenode.createSnapshot} inside a {@code "createSnapshot"}
 * trace span. A {@link RemoteException} is fully unwrapped and rethrown; the trace
 * scope is always closed.
 *
 * @param snapshotRoot The directory where the snapshot is to be taken
 * @param snapshotName Name of the snapshot
 * @return the snapshot path.
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 * @see ClientProtocol#createSnapshot(String, String)
 */
public String createSnapshot(String snapshotRoot, String snapshotName)
    throws IOException {
  checkOpen();
  final TraceScope traceScope = Trace.startSpan("createSnapshot", traceSampler);
  try {
    final String snapshotPath = namenode.createSnapshot(snapshotRoot, snapshotName);
    return snapshotPath;
  } catch (RemoteException remote) {
    final IOException unwrapped = remote.unwrapRemoteException();
    throw unwrapped;
  } finally {
    traceScope.close();
  }
}
/**
 * {@inheritDoc}
 *
 * <p>An element named after {@link MD5MD5CRC32FileChecksum} is parsed into
 * {@code filechecksum}. An element named {@code RemoteException} is rethrown
 * wrapped in a {@link SAXException}; any other element is rejected.
 */
public void startElement(String ns, String localname, String qname,
    Attributes attrs) throws SAXException {
  final String checksumElement = MD5MD5CRC32FileChecksum.class.getName();
  if (checksumElement.equals(qname)) {
    filechecksum = MD5MD5CRC32FileChecksum.valueOf(attrs);
    return;
  }

  final String remoteExceptionElement = RemoteException.class.getSimpleName();
  if (remoteExceptionElement.equals(qname)) {
    throw new SAXException(RemoteException.valueOf(attrs));
  }
  throw new SAXException("Unrecognized entry: " + qname);
}
/**
 * Move blocks from src to trg and delete src.
 * See {@link ClientProtocol#concat}.
 *
 * <p>Delegates to {@code namenode.concat} inside a {@code "concat"} trace span. A
 * {@link RemoteException} is unwrapped against the exception types listed below;
 * the trace scope is always closed.
 *
 * @param trg target path passed through to the namenode
 * @param srcs source paths passed through to the namenode
 * @throws IOException the unwrapped remote failure, or any other I/O failure
 */
public void concat(String trg, String [] srcs) throws IOException {
  checkOpen();
  final TraceScope traceScope = Trace.startSpan("concat", traceSampler);
  try {
    namenode.concat(trg, srcs);
  } catch (RemoteException remote) {
    final IOException unwrapped = remote.unwrapRemoteException(
        AccessControlException.class,
        UnresolvedPathException.class,
        SnapshotAccessControlException.class);
    throw unwrapped;
  } finally {
    traceScope.close();
  }
}
/**
 * Allows snapshots on {@code dir1} and attempts to create one named
 * {@code snapshot2}; a {@link RemoteException} from the attempt is tolerated.
 */
@Test(timeout = 60000)
public void TestSnapshotWithInvalidName1() throws Exception{
  final Path createdFile = new Path(dir1, file1Name);
  DFSTestUtil.createFile(hdfs, createdFile, BLOCKSIZE, REPLICATION, SEED);

  hdfs.allowSnapshot(dir1);
  try {
    hdfs.createSnapshot(dir1, snapshot2);
  } catch (RemoteException ignored) {
    // NOTE(review): the failure is swallowed and nothing is asserted, so this
    // test passes whether or not the snapshot name is rejected. Presumably a
    // rejection is the expected outcome — confirm and assert it explicitly.
  }
}