下面列出了 org.apache.hadoop.fs.ParentNotDirectoryException#org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Checks whether another instance is already running and, if not, marks this
 * instance as the running one.
 *
 * <p>The idea for making sure that there is no more than one instance running
 * in an HDFS is to create a file in the HDFS, write the hostname of the
 * machine on which the instance is running to the file, and not close the
 * file until the instance exits. A second instance cannot create the file
 * while the first one is running.
 *
 * <p>If the id file already exists, an append is attempted first; when that
 * succeeds the file is deleted and created again.
 *
 * @return {@code null} if there is a running instance (the name node reported
 *         {@code AlreadyBeingCreatedException}); otherwise the still-open
 *         output stream to the newly created file.
 * @throws IOException if the file system operations fail for any other
 *         reason; the newly created stream is closed before the failure is
 *         propagated.
 */
private OutputStream checkAndMarkRunning() throws IOException {
  try {
    if (fs.exists(idPath)) {
      // try appending to it so that it will fail fast if another balancer is
      // running.
      IOUtils.closeStream(fs.append(idPath));
      fs.delete(idPath, true);
    }

    final FSDataOutputStream fsout = fs.create(idPath, false);
    boolean marked = false;
    try {
      // mark balancer idPath to be deleted during filesystem closure
      fs.deleteOnExit(idPath);
      if (write2IdFile) {
        fsout.writeBytes(InetAddress.getLocalHost().getHostName());
        fsout.hflush();
      }
      marked = true;
      return fsout;
    } finally {
      if (!marked) {
        // Marking failed part-way (e.g. the local host name could not be
        // resolved): do not leave the id file open behind us.
        IOUtils.closeStream(fsout);
      }
    }
  } catch (RemoteException e) {
    if (AlreadyBeingCreatedException.class.getName().equals(e.getClassName())) {
      return null;
    }
    throw e;
  }
}
/**
 * Checks whether another instance is already running and, if not, marks this
 * instance as the running one.
 *
 * <p>The idea for making sure that there is no more than one instance running
 * in an HDFS is to create a file in the HDFS, write the hostname of the
 * machine on which the instance is running to the file, and not close the
 * file until the instance exits. A second instance cannot create the file
 * while the first one is running.
 *
 * <p>If the id file already exists, an append is attempted first; when that
 * succeeds the file is deleted and created again.
 *
 * @return {@code null} if there is a running instance (the name node reported
 *         {@code AlreadyBeingCreatedException}); otherwise the still-open
 *         output stream to the newly created file.
 * @throws IOException if the file system operations fail for any other
 *         reason; the newly created stream is closed before the failure is
 *         propagated.
 */
private OutputStream checkAndMarkRunning() throws IOException {
  try {
    if (fs.exists(idPath)) {
      // try appending to it so that it will fail fast if another balancer is
      // running.
      IOUtils.closeStream(fs.append(idPath));
      fs.delete(idPath, true);
    }

    final FSDataOutputStream fsout = fs.create(idPath, false);
    boolean marked = false;
    try {
      // mark balancer idPath to be deleted during filesystem closure
      fs.deleteOnExit(idPath);
      if (write2IdFile) {
        fsout.writeBytes(InetAddress.getLocalHost().getHostName());
        fsout.hflush();
      }
      marked = true;
      return fsout;
    } finally {
      if (!marked) {
        // Marking failed part-way (e.g. the local host name could not be
        // resolved): do not leave the id file open behind us.
        IOUtils.closeStream(fsout);
      }
    }
  } catch (RemoteException e) {
    if (AlreadyBeingCreatedException.class.getName().equals(e.getClassName())) {
      return null;
    }
    throw e;
  }
}
/**
 * Maps an HDFS-specific exception onto the equivalent Pravega segment exception.
 *
 * <p>A {@code RemoteException} is unwrapped first, so the mapping is applied to the
 * exception it carries. An exception that has no mapping is not returned; it is
 * passed to {@code Exceptions.sneakyThrow} and thrown from this method.
 *
 * @param segmentName Name of the stream segment on which the exception occurs.
 * @param e           The exception to be translated.
 * @return The exception to be thrown.
 */
static <T> StreamSegmentException convertException(String segmentName, Throwable e) {
  final Throwable cause =
      (e instanceof RemoteException) ? ((RemoteException) e).unwrapRemoteException() : e;

  // Missing path or file: the segment does not exist.
  if (cause instanceof PathNotFoundException || cause instanceof FileNotFoundException) {
    return new StreamSegmentNotExistsException(segmentName, cause);
  }

  // File is already there, or is currently being created: the segment exists.
  if (cause instanceof FileAlreadyExistsException || cause instanceof AlreadyBeingCreatedException) {
    return new StreamSegmentExistsException(segmentName, cause);
  }

  // ACL violations are reported as a sealed segment.
  if (cause instanceof AclException) {
    return new StreamSegmentSealedException(segmentName, cause);
  }

  throw Exceptions.sneakyThrow(cause);
}
/**
 * Creates the hbck lock file under the tmp directory, writes this host's address and an
 * explanatory note into it, and hands back the stream without closing it.
 *
 * @return the open stream to the lock file, or {@code null} when the attempt fails with a
 *         {@code RemoteException} whose class name is {@code AlreadyBeingCreatedException}
 * @throws IOException on any other failure
 */
@Override
public FSDataOutputStream call() throws IOException {
  try {
    final FileSystem fileSystem = CommonFSUtils.getCurrentFileSystem(this.conf);
    final FsPermission lockFilePerms =
        CommonFSUtils.getFilePermissions(fileSystem, this.conf, HConstants.DATA_FILE_UMASK_KEY);

    final Path lockDir = getTmpDir(conf);
    this.hbckLockPath = new Path(lockDir, HBCK_LOCK_FILE);
    fileSystem.mkdirs(lockDir);

    final FSDataOutputStream lockStream =
        createFileWithRetries(fileSystem, this.hbckLockPath, lockFilePerms);
    lockStream.writeBytes(InetAddress.getLocalHost().toString());

    // Add a note into the file we write on why hbase2 is writing out an hbck1 lock file.
    final String note = " Written by an hbase-2.x Master to block an "
        + "attempt by an hbase-1.x HBCK tool making modification to state. "
        + "See 'HBCK must match HBase server version' in the hbase refguide.";
    lockStream.writeBytes(note);
    lockStream.flush();
    return lockStream;
  } catch (RemoteException e) {
    final boolean alreadyBeingCreated =
        AlreadyBeingCreatedException.class.getName().equals(e.getClassName());
    if (!alreadyBeingCreated) {
      throw e;
    }
    return null;
  }
}
/**
 * Wraps the given name node RPC client in a retry proxy that applies a dedicated retry
 * policy to the {@code create} method only.
 *
 * <p>For {@code create}, a {@code RemoteException} mapped to
 * {@code AlreadyBeingCreatedException} uses the policy built by
 * {@code retryUpToMaximumCountWithFixedSleep(5, sleep, MILLISECONDS)}, where the sleep is
 * read from {@code dfs.client.rpc.retry.sleep} (default {@code LEASE_SOFTLIMIT_PERIOD}).
 * Every other case falls back to {@code TRY_ONCE_THEN_FAIL}.
 *
 * @param rpcNamenode the underlying name node client to wrap
 * @param conf        configuration supplying the retry sleep time
 * @return a {@code ClientProtocol} proxy around {@code rpcNamenode}
 * @throws IOException declared by the signature
 */
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
    Configuration conf)
    throws IOException {
  final long retrySleepMillis =
      conf.getLong("dfs.client.rpc.retry.sleep", LEASE_SOFTLIMIT_PERIOD);
  final RetryPolicy beingCreatedPolicy =
      RetryPolicies.retryUpToMaximumCountWithFixedSleep(
          5, retrySleepMillis, TimeUnit.MILLISECONDS);

  // Policies selected by the exception type carried inside a RemoteException.
  final Map<Class<? extends Exception>, RetryPolicy> policiesByRemoteCause =
      new HashMap<Class<? extends Exception>, RetryPolicy>();
  policiesByRemoteCause.put(AlreadyBeingCreatedException.class, beingCreatedPolicy);
  final RetryPolicy remoteExceptionPolicy = RetryPolicies.retryByRemoteException(
      RetryPolicies.TRY_ONCE_THEN_FAIL, policiesByRemoteCause);

  // Policies selected by the exception type seen on the client side.
  final Map<Class<? extends Exception>, RetryPolicy> policiesByException =
      new HashMap<Class<? extends Exception>, RetryPolicy>();
  policiesByException.put(RemoteException.class, remoteExceptionPolicy);
  final RetryPolicy createPolicy = RetryPolicies.retryByException(
      RetryPolicies.TRY_ONCE_THEN_FAIL, policiesByException);

  // Only "create" gets the custom policy.
  final Map<String, RetryPolicy> policiesByMethod = new HashMap<String, RetryPolicy>();
  policiesByMethod.put("create", createPolicy);

  return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
      rpcNamenode, policiesByMethod);
}
/**
 * Takes ownership of the lock file if possible.
 *
 * @param fs file system holding the lock file; lease recovery is attempted only when it is
 *           a {@code DistributedFileSystem}
 * @param lockFile path of the lock file to take over
 * @param lastEntry last entry in the lock file. this param is an optimization.
 *                  we dont scan the lock file again to find its last entry here since
 *                  its already been done once by the logic used to check if the lock
 *                  file is stale. so this value comes from that earlier scan.
 * @param spoutId spout id
 * @throws IOException if unable to acquire
 * @return a new {@code FileLock} on success; null if lock File is not recoverable right now
 *         (lease recovery returned false, or the file is reported as already being created)
 */
public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
    throws IOException {
  try {
    if (fs instanceof DistributedFileSystem) {
      if (!((DistributedFileSystem) fs).recoverLease(lockFile)) {
        LOG.warn("Unable to recover lease on lock file {} right now. Cannot transfer ownership. Will need to try later. Spout = {}", lockFile, spoutId);
        return null;
      }
    }
    return new FileLock(fs, lockFile, spoutId, lastEntry);
  } catch (IOException e) {
    if (e instanceof RemoteException
        && ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
      // Fixed: a space was missing between the path and "is currently open".
      LOG.warn("Lock file " + lockFile + " is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + spoutId, e);
      return null;
    } else { // unexpected error
      LOG.warn("Cannot transfer ownership now for lock file " + lockFile + ". Will need to try later. Spout =" + spoutId, e);
      throw e;
    }
  }
}
/**
 * Triggers a remote close for {@code hdfsFilePath} when the given exception indicates a
 * re-create conflict.
 *
 * <p>Nothing happens unless {@code e} is a {@code RemoteException} whose class name is
 * {@code AlreadyBeingCreatedException} and whose message contains
 * {@code RECREATE_IDENTIFIER}. Any exception raised while handling is logged and not
 * propagated.
 *
 * @param hdfsFilePath path of the file whose stream should be closed remotely
 * @param e            the exception to inspect
 */
public static void tryRemoteClose(String hdfsFilePath, Exception e) {
  try {
    if (!(e instanceof RemoteException)) {
      return;
    }

    final String remoteClassName = ((RemoteException) e).getClassName();
    final boolean recreateConflict =
        remoteClassName.equals(AlreadyBeingCreatedException.class.getName())
            && e.getMessage().contains(RECREATE_IDENTIFIER);
    if (!recreateConflict) {
      return;
    }

    logger.info("stream remote close begin for file : " + hdfsFilePath);
    // The target address is parsed out of the exception message.
    colseInternal(hdfsFilePath, parseIp(e.getMessage()));
    logger.info("stream remote close end for file : " + hdfsFilePath);
  } catch (Exception ex) {
    logger.error("stream remote close failed for file : " + hdfsFilePath, ex);
  }
}
/**
 * Builds a {@code CreateRequestProto} from the arguments and sends it through the RPC proxy.
 *
 * @return the converted file status from the response, or {@code null} when the response
 *         carries none ({@code hasFs()} is false)
 * @throws IOException the remote exception extracted from a {@code ServiceException}
 *         raised by the RPC call
 */
@Override
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions)
    throws AccessControlException, AlreadyBeingCreatedException,
    DSQuotaExceededException, FileAlreadyExistsException,
    FileNotFoundException, NSQuotaExceededException,
    ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
    IOException {
  final CreateRequestProto.Builder requestBuilder = CreateRequestProto.newBuilder();
  requestBuilder.setSrc(src);
  requestBuilder.setMasked(PBHelper.convert(masked));
  requestBuilder.setClientName(clientName);
  requestBuilder.setCreateFlag(PBHelper.convertCreateFlag(flag));
  requestBuilder.setCreateParent(createParent);
  requestBuilder.setReplication(replication);
  requestBuilder.setBlockSize(blockSize);
  requestBuilder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions));
  final CreateRequestProto request = requestBuilder.build();

  try {
    final CreateResponseProto response = rpcProxy.create(null, request);
    if (!response.hasFs()) {
      return null;
    }
    return PBHelper.convert(response.getFs());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
/**
 * Test two consecutive appends on a file with a full block: the second append, issued
 * through another file system instance, is expected to fail with a
 * {@code RemoteException} naming {@code AlreadyBeingCreatedException}.
 */
@Test
public void testAppendTwice() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final FileSystem fs1 = cluster.getFileSystem();
  final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
  try {
    final Path file = new Path("/testAppendTwice/foo");
    final int blockLength = 1 << 16;
    final byte[] payload = AppendTestUtil.initBuffer(blockLength);

    // create a new file with a full block.
    final FSDataOutputStream writer =
        fs2.create(file, true, 4096, (short) 1, blockLength);
    writer.write(payload, 0, blockLength);
    writer.close();

    // 1st append does not add any data so that the last block remains full
    // and the last block in INodeFileUnderConstruction is a BlockInfo
    // but not BlockInfoUnderConstruction.
    fs2.append(file);

    // 2nd append should get AlreadyBeingCreatedException
    fs1.append(file);
    Assert.fail();
  } catch (RemoteException re) {
    AppendTestUtil.LOG.info("Got an exception:", re);
    final String expectedClassName = AlreadyBeingCreatedException.class.getName();
    Assert.assertEquals(expectedClassName, re.getClassName());
  } finally {
    fs2.close();
    fs1.close();
    cluster.shutdown();
  }
}
/**
 * Test two consecutive appends on a file with a full block, where the first append is
 * opened with the {@code APPEND} and {@code NEW_BLOCK} flags: the second append is
 * expected to fail with a {@code RemoteException} naming
 * {@code AlreadyBeingCreatedException}.
 */
@Test
public void testAppend2Twice() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final DistributedFileSystem fs1 = cluster.getFileSystem();
  final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
  try {
    final Path file = new Path("/testAppendTwice/foo");
    final int blockLength = 1 << 16;
    final byte[] payload = AppendTestUtil.initBuffer(blockLength);

    // create a new file with a full block.
    final FSDataOutputStream writer =
        fs2.create(file, true, 4096, (short) 1, blockLength);
    writer.write(payload, 0, blockLength);
    writer.close();

    // 1st append does not add any data so that the last block remains full
    // and the last block in INodeFileUnderConstruction is a BlockInfo
    // but not BlockInfoUnderConstruction.
    final DistributedFileSystem dfs2 = (DistributedFileSystem) fs2;
    dfs2.append(file,
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);

    // 2nd append should get AlreadyBeingCreatedException
    fs1.append(file);
    Assert.fail();
  } catch (RemoteException re) {
    AppendTestUtil.LOG.info("Got an exception:", re);
    final String expectedClassName = AlreadyBeingCreatedException.class.getName();
    Assert.assertEquals(expectedClassName, re.getClassName());
  } finally {
    fs2.close();
    fs1.close();
    cluster.shutdown();
  }
}
/**
 * Builds a {@code CreateRequestProto} from the arguments and sends it through the RPC proxy.
 *
 * @return the converted file status from the response, or {@code null} when the response
 *         carries none ({@code hasFs()} is false)
 * @throws IOException the remote exception extracted from a {@code ServiceException}
 *         raised by the RPC call
 */
@Override
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions)
    throws AccessControlException, AlreadyBeingCreatedException,
    DSQuotaExceededException, FileAlreadyExistsException,
    FileNotFoundException, NSQuotaExceededException,
    ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
    IOException {
  final CreateRequestProto.Builder requestBuilder = CreateRequestProto.newBuilder();
  requestBuilder.setSrc(src);
  requestBuilder.setMasked(PBHelper.convert(masked));
  requestBuilder.setClientName(clientName);
  requestBuilder.setCreateFlag(PBHelper.convertCreateFlag(flag));
  requestBuilder.setCreateParent(createParent);
  requestBuilder.setReplication(replication);
  requestBuilder.setBlockSize(blockSize);
  requestBuilder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions));
  final CreateRequestProto request = requestBuilder.build();

  try {
    final CreateResponseProto response = rpcProxy.create(null, request);
    if (!response.hasFs()) {
      return null;
    }
    return PBHelper.convert(response.getFs());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
/**
 * Test two consecutive appends on a file with a full block: the second append, issued
 * through another file system instance, is expected to fail with a
 * {@code RemoteException} naming {@code AlreadyBeingCreatedException}.
 */
@Test
public void testAppendTwice() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final FileSystem fs1 = cluster.getFileSystem();
  final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
  try {
    final Path file = new Path("/testAppendTwice/foo");
    final int blockLength = 1 << 16;
    final byte[] payload = AppendTestUtil.initBuffer(blockLength);

    // create a new file with a full block.
    final FSDataOutputStream writer =
        fs2.create(file, true, 4096, (short) 1, blockLength);
    writer.write(payload, 0, blockLength);
    writer.close();

    // 1st append does not add any data so that the last block remains full
    // and the last block in INodeFileUnderConstruction is a BlockInfo
    // but not BlockInfoUnderConstruction.
    fs2.append(file);

    // 2nd append should get AlreadyBeingCreatedException
    fs1.append(file);
    Assert.fail();
  } catch (RemoteException re) {
    AppendTestUtil.LOG.info("Got an exception:", re);
    final String expectedClassName = AlreadyBeingCreatedException.class.getName();
    Assert.assertEquals(expectedClassName, re.getClassName());
  } finally {
    fs2.close();
    fs1.close();
    cluster.shutdown();
  }
}
/**
 * Test two consecutive appends on a file with a full block, where the first append is
 * opened with the {@code APPEND} and {@code NEW_BLOCK} flags: the second append is
 * expected to fail with a {@code RemoteException} naming
 * {@code AlreadyBeingCreatedException}.
 */
@Test
public void testAppend2Twice() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final DistributedFileSystem fs1 = cluster.getFileSystem();
  final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
  try {
    final Path file = new Path("/testAppendTwice/foo");
    final int blockLength = 1 << 16;
    final byte[] payload = AppendTestUtil.initBuffer(blockLength);

    // create a new file with a full block.
    final FSDataOutputStream writer =
        fs2.create(file, true, 4096, (short) 1, blockLength);
    writer.write(payload, 0, blockLength);
    writer.close();

    // 1st append does not add any data so that the last block remains full
    // and the last block in INodeFileUnderConstruction is a BlockInfo
    // but not BlockInfoUnderConstruction.
    final DistributedFileSystem dfs2 = (DistributedFileSystem) fs2;
    dfs2.append(file,
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);

    // 2nd append should get AlreadyBeingCreatedException
    fs1.append(file);
    Assert.fail();
  } catch (RemoteException re) {
    AppendTestUtil.LOG.info("Got an exception:", re);
    final String expectedClassName = AlreadyBeingCreatedException.class.getName();
    Assert.assertEquals(expectedClassName, re.getClassName());
  } finally {
    fs2.close();
    fs1.close();
    cluster.shutdown();
  }
}
/**
 * Verifies that {@code tryAcquireJobLock} returns false when {@code createNewFile} on the
 * (mocked) file system throws a {@code RemoteException} naming
 * {@code AlreadyBeingCreatedException}.
 */
@Test
public void testCatchingRemoteException() throws IOException {
  final String remoteClassName = AlreadyBeingCreatedException.class.getName();
  final RemoteException alreadyBeingCreated =
      new RemoteException(remoteClassName, "Failed to CREATE_FILE");

  final FileSystem fileSystemMock = Mockito.mock(FileSystem.class);
  Mockito.when(fileSystemMock.createNewFile(Mockito.any())).thenThrow(alreadyBeingCreated);

  // The factory ignores its argument and always supplies the mock.
  final HDFSSynchronization underTest =
      new HDFSSynchronization("someDir", (unusedConf) -> fileSystemMock);

  assertFalse(underTest.tryAcquireJobLock(configuration));
}
/**
 * Verifies that creating a file with overwrite disabled, while the same file is still open
 * for writing, fails with a {@code RemoteException} that unwraps to
 * {@code AlreadyBeingCreatedException}.
 */
@Test
public void testNotOverwrite() throws IOException {
  final Path target = new Path("/" + name.getMethodName());
  // Keep the first writer open for the duration of the second create attempt.
  try (FSDataOutputStream openWriter = FS.create(target)) {
    RemoteException rejection = null;
    try {
      FS.create(target, false);
    } catch (RemoteException e) {
      // expected
      rejection = e;
    }
    if (rejection == null) {
      fail("Should fail as there is a file with the same name which is being written");
    }
    assertThat(rejection.unwrapRemoteException(), instanceOf(AlreadyBeingCreatedException.class));
  }
}
/**
 * Repeatedly tries, as a different user, to create {@code filepath} without overwrite
 * until the attempt is rejected with a message containing "file exists".
 *
 * <p>Up to 10 attempts are made, 5000 ms apart. A rejection whose message mentions
 * {@code AlreadyBeingCreatedException} is logged and retried; any other
 * {@code IOException} (including one without a message) is logged as unexpected and
 * retried as well. The method asserts that the "file exists" outcome was eventually seen.
 *
 * @param filepath path of an already existing file
 * @throws IOException if setting up the second file system fails, or (as
 *         {@code InterruptedIOException}) if the thread is interrupted while waiting
 *         between attempts
 */
private void recoverLeaseUsingCreate(Path filepath) throws IOException {
  final long retryIntervalMs = 5000;

  Configuration conf2 = new Configuration(conf);
  String username = UserGroupInformation.getCurrentUGI().getUserName() + "_1";
  UnixUserGroupInformation.saveToConf(conf2,
      UnixUserGroupInformation.UGI_PROPERTY_NAME,
      new UnixUserGroupInformation(username, new String[]{"supergroup"}));
  FileSystem dfs2 = FileSystem.get(conf2);

  boolean done = false;
  for (int i = 0; i < 10 && !done; i++) {
    AppendTestUtil.LOG.info("i=" + i);
    try {
      dfs2.create(filepath, false, bufferSize, (short) 1, BLOCK_SIZE);
      fail("Creation of an existing file should never succeed.");
    } catch (IOException ioe) {
      // getMessage() may be null; treat that as an unexpected exception
      // instead of failing with a NullPointerException.
      final String message = ioe.getMessage();
      if (message != null && message.contains("file exists")) {
        AppendTestUtil.LOG.info("done", ioe);
        done = true;
      } else if (message != null
          && message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
        AppendTestUtil.LOG.info("GOOD! got " + message);
      } else {
        AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
      }
    }

    if (!done) {
      AppendTestUtil.LOG.info("sleep " + retryIntervalMs + "ms");
      try {
        Thread.sleep(retryIntervalMs);
      } catch (InterruptedException e) {
        // Preserve the interrupt for callers and stop retrying.
        Thread.currentThread().interrupt();
        throw new java.io.InterruptedIOException(
            "Interrupted while waiting to retry create of " + filepath);
      }
    }
  }
  assertTrue(done);
}
/**
 * Repeatedly tries, as a different user, to create {@code filepath} without overwrite
 * until the attempt is rejected with a message containing "file exists".
 *
 * <p>Up to 10 attempts are made, 5000 ms apart. A rejection whose message mentions
 * {@code AlreadyBeingCreatedException} is logged and retried; any other
 * {@code IOException} (including one without a message) is logged as unexpected and
 * retried as well. The method asserts that the "file exists" outcome was eventually seen.
 *
 * @param filepath path of an already existing file
 * @throws IOException if setting up the second file system fails, or (as
 *         {@code InterruptedIOException}) if the thread is interrupted while waiting
 *         between attempts
 */
private void recoverLeaseUsingCreate(Path filepath) throws IOException {
  final long retryIntervalMs = 5000;

  Configuration conf2 = new Configuration(conf);
  String username = UserGroupInformation.getCurrentUGI().getUserName() + "_1";
  UnixUserGroupInformation.saveToConf(conf2,
      UnixUserGroupInformation.UGI_PROPERTY_NAME,
      new UnixUserGroupInformation(username, new String[]{"supergroup"}));
  FileSystem dfs2 = FileSystem.get(conf2);

  boolean done = false;
  for (int i = 0; i < 10 && !done; i++) {
    AppendTestUtil.LOG.info("i=" + i);
    try {
      dfs2.create(filepath, false, bufferSize, (short) 1, BLOCK_SIZE);
      fail("Creation of an existing file should never succeed.");
    } catch (IOException ioe) {
      // getMessage() may be null; treat that as an unexpected exception
      // instead of failing with a NullPointerException.
      final String message = ioe.getMessage();
      if (message != null && message.contains("file exists")) {
        AppendTestUtil.LOG.info("done", ioe);
        done = true;
      } else if (message != null
          && message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
        AppendTestUtil.LOG.info("GOOD! got " + message);
      } else {
        AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
      }
    }

    if (!done) {
      AppendTestUtil.LOG.info("sleep " + retryIntervalMs + "ms");
      try {
        Thread.sleep(retryIntervalMs);
      } catch (InterruptedException e) {
        // Preserve the interrupt for callers and stop retrying.
        Thread.currentThread().interrupt();
        throw new java.io.InterruptedIOException(
            "Interrupted while waiting to retry create of " + filepath);
      }
    }
  }
  assertTrue(done);
}
/**
 * Creates the balancer id file, writes the local host name into it, and returns the stream
 * without closing it.
 *
 * @return the still-open stream to the id file, or {@code null} when the attempt fails with
 *         a {@code RemoteException} whose class name is {@code AlreadyBeingCreatedException}
 * @throws IOException on any other failure; the newly created stream is closed before the
 *         failure is propagated
 */
private OutputStream checkAndMarkRunningBalancer() throws IOException {
  try {
    final DataOutputStream out = fs.create(BALANCER_ID_PATH);
    boolean marked = false;
    try {
      out.writeBytes(InetAddress.getLocalHost().getHostName());
      out.flush();
      marked = true;
      return out;
    } finally {
      if (!marked) {
        // Writing the host name failed: do not leak the open stream.
        try {
          out.close();
        } catch (IOException ignored) {
          // Best effort; the original failure is the one the caller must see.
        }
      }
    }
  } catch (RemoteException e) {
    if (AlreadyBeingCreatedException.class.getName().equals(e.getClassName())) {
      return null;
    }
    throw e;
  }
}
/**
 * Reacts to a {@code RemoteException} raised while opening an append stream on the log file.
 *
 * <ul>
 *   <li>Message contains {@code APPEND_UNAVAILABLE_EXCEPTION_MESSAGE}: roll over to a new
 *       log file.</li>
 *   <li>Class name is {@code AlreadyBeingCreatedException}: roll over to a new log file.</li>
 *   <li>Class name is {@code RecoveryInProgressException} on a
 *       {@code DistributedFileSystem}: try to recover the lease and re-open the append
 *       stream; throw {@code HoodieException} when recovery fails.</li>
 *   <li>Anything else: close the output stream and throw {@code HoodieIOException} for the
 *       append failure; if the close itself fails, roll over to a new log file instead.</li>
 * </ul>
 *
 * @param path path of the log file the append was attempted on
 * @param e    the exception raised by the append attempt
 * @throws IOException          from rolling over, creating the new file, or re-opening
 * @throws InterruptedException declared by the signature
 */
private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)
    throws IOException, InterruptedException {
  if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
    // This issue happens when all replicas for a file are down and/or being decommissioned.
    // The fs.append() API could append to the last block for a file. If the last block is full, a new block is
    // appended to. In a scenario when a lot of DN's are decommissioned, it can happen that DN's holding all
    // replicas for a block/file are decommissioned together. During this process, all these blocks will start to
    // get replicated to other active DataNodes but this process might take time (can be of the order of few
    // hours). During this time, if a fs.append() API is invoked for a file whose last block is eligible to be
    // appended to, then the NN will throw an exception saying that it couldn't find any active replica with the
    // last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325
    LOG.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
    // Rollover the current log file (since cannot get a stream handle) and create new one
    this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
    createNewFile();
  } else if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
    // Fixed: the closing parenthesis after the log file was missing in the message.
    LOG.warn("Another task executor writing to the same log file(" + logFile + "). Rolling over");
    // Rollover the current log file (since cannot get a stream handle) and create new one
    this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
    createNewFile();
  } else if (e.getClassName().contentEquals(RecoveryInProgressException.class.getName())
      && (fs instanceof DistributedFileSystem)) {
    // this happens when either another task executor writing to this file died or
    // data node is going down. Note that we can only try to recover lease for a DistributedFileSystem.
    // ViewFileSystem unfortunately does not support this operation
    LOG.warn("Trying to recover log on path " + path);
    if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
      LOG.warn("Recovered lease on path " + path);
      // try again
      this.output = fs.append(path, bufferSize);
    } else {
      LOG.warn("Failed to recover lease on path " + path);
      throw new HoodieException(e);
    }
  } else {
    // When fs.append() has failed and an exception is thrown, by closing the output stream
    // we shall force hdfs to release the lease on the log file. When Spark retries this task (with
    // new attemptId, say taskId.1) it will be able to acquire lease on the log file (as output stream was
    // closed properly by taskId.0).
    //
    // If close() call were to fail throwing an exception, our best bet is to rollover to a new log file.
    try {
      close();
    } catch (Exception ce) {
      LOG.warn("Failed to close the output stream for " + fs.getClass().getName() + " on path " + path
          + ". Rolling over to a new log file.", ce);
      this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
      createNewFile();
      return;
    }
    // output stream has been successfully closed and lease on the log file has been released,
    // before throwing an exception for the append failure. The throw sits outside the try so
    // that it is not caught by the close-failure handler above.
    throw new HoodieIOException("Failed to append to the output stream ", e);
  }
}
/**
 * Creates {@code src} on the name node through a {@code MutableFSCaller}.
 *
 * <p>On the first attempt ({@code retries == 0}) the create is simply forwarded. On a retry,
 * if the file already exists, is empty, and matches the requested block size, replication
 * and permission, the create is issued once more to find out who holds it: when the name
 * node answers with {@code AlreadyBeingCreatedException} and a message containing
 * "current leaseholder is trying to recreate file", the existing file is deleted; any other
 * {@code RemoteException} is rethrown. In all non-throwing cases the create is then issued.
 *
 * @throws IOException if the name node calls fail
 */
@Override
public void create(final String src, final FsPermission masked,
    final String clientName, final boolean overwrite,
    final boolean createParent,
    final short replication, final long blockSize) throws IOException {
  (new MutableFSCaller<Boolean>() {
    @Override
    Boolean call(int retries) throws IOException {
      if (retries > 0) {
        // This I am not sure about, because of lease holder I can tell if it
        // was me or not with a high level of certainty
        final FileStatus existing = namenode.getFileInfo(src);

        // Since the file exists already we need to perform a number of
        // checks to see if it was created by us before the failover:
        // it has not been written to and looks exactly like the file we
        // were trying to create.
        final boolean looksLikeOurs = existing != null
            && existing.getBlockSize() == blockSize
            && existing.getReplication() == replication
            && existing.getLen() == 0
            && existing.getPermission().equals(masked);

        if (looksLikeOurs) {
          // Last check: call create again and then parse the exception.
          // Two cases: it was the same client who created the old file,
          // or it was created by someone else - fail.
          try {
            namenode.create(src, masked, clientName, overwrite,
                createParent, replication, blockSize);
          } catch (RemoteException re) {
            final IOException unwrapped = re.unwrapRemoteException();
            if (!(unwrapped instanceof AlreadyBeingCreatedException)) {
              throw re;
            }
            if (!unwrapped.getMessage().contains(
                "current leaseholder is trying to recreate file")) {
              throw re;
            }
            namenode.delete(src, false);
          }
        }
      }

      namenode.create(src, masked, clientName, overwrite, createParent, replication,
          blockSize);
      return true;
    }
  }).callFS();
}