org.apache.hadoop.fs.ParentNotDirectoryException#org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException源码实例Demo

下面列出了org.apache.hadoop.fs.ParentNotDirectoryException#org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hadoop   文件: NameNodeConnector.java
/**
 * The idea for making sure that there is no more than one instance
 * running in an HDFS is to create a file in the HDFS, writes the hostname
 * of the machine on which the instance is running to the file, but did not
 * close the file until it exits. 
 * 
 * This prevents the second instance from running because it can not
 * creates the file while the first one is running.
 * 
 * This method checks if there is any running instance. If no, mark yes.
 * Note that this is an atomic operation.
 * 
 * @return null if there is a running instance;
 *         otherwise, the output stream to the newly created file.
 */
private OutputStream checkAndMarkRunning() throws IOException {
  try {
    if (fs.exists(idPath)) {
      // try appending to it so that it will fail fast if another balancer is
      // running.
      IOUtils.closeStream(fs.append(idPath));
      fs.delete(idPath, true);
    }
    final FSDataOutputStream fsout = fs.create(idPath, false);
    // mark balancer idPath to be deleted during filesystem closure
    fs.deleteOnExit(idPath);
    if (write2IdFile) {
      fsout.writeBytes(InetAddress.getLocalHost().getHostName());
      fsout.hflush();
    }
    return fsout;
  } catch(RemoteException e) {
    if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
      return null;
    } else {
      throw e;
    }
  }
}
 
源代码2 项目: big-c   文件: NameNodeConnector.java
/**
 * The idea for making sure that there is no more than one instance
 * running in an HDFS is to create a file in the HDFS, writes the hostname
 * of the machine on which the instance is running to the file, but did not
 * close the file until it exits. 
 * 
 * This prevents the second instance from running because it can not
 * creates the file while the first one is running.
 * 
 * This method checks if there is any running instance. If no, mark yes.
 * Note that this is an atomic operation.
 * 
 * @return null if there is a running instance;
 *         otherwise, the output stream to the newly created file.
 */
private OutputStream checkAndMarkRunning() throws IOException {
  try {
    if (fs.exists(idPath)) {
      // try appending to it so that it will fail fast if another balancer is
      // running.
      IOUtils.closeStream(fs.append(idPath));
      fs.delete(idPath, true);
    }
    final FSDataOutputStream fsout = fs.create(idPath, false);
    // mark balancer idPath to be deleted during filesystem closure
    fs.deleteOnExit(idPath);
    if (write2IdFile) {
      fsout.writeBytes(InetAddress.getLocalHost().getHostName());
      fsout.hflush();
    }
    return fsout;
  } catch(RemoteException e) {
    if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
      return null;
    } else {
      throw e;
    }
  }
}
 
源代码3 项目: pravega   文件: HDFSExceptionHelpers.java
/**
 * Translates HDFS specific Exceptions to Pravega-equivalent Exceptions.
 *
 * @param segmentName Name of the stream segment on which the exception occurs.
 * @param e           The exception to be translated.
 * @return  The exception to be thrown.
 */
static <T> StreamSegmentException convertException(String segmentName, Throwable e) {
    if (e instanceof RemoteException) {
        e = ((RemoteException) e).unwrapRemoteException();
    }

    if (e instanceof PathNotFoundException || e instanceof FileNotFoundException) {
        return new StreamSegmentNotExistsException(segmentName, e);
    } else if (e instanceof FileAlreadyExistsException || e instanceof AlreadyBeingCreatedException) {
        return new StreamSegmentExistsException(segmentName, e);
    } else if (e instanceof AclException) {
        return new StreamSegmentSealedException(segmentName, e);
    } else {
        throw Exceptions.sneakyThrow(e);
    }
}
 
源代码4 项目: hbase   文件: HBaseFsck.java
@Override
public FSDataOutputStream call() throws IOException {
  try {
    FileSystem fs = CommonFSUtils.getCurrentFileSystem(this.conf);
    FsPermission defaultPerms =
      CommonFSUtils.getFilePermissions(fs, this.conf, HConstants.DATA_FILE_UMASK_KEY);
    Path tmpDir = getTmpDir(conf);
    this.hbckLockPath = new Path(tmpDir, HBCK_LOCK_FILE);
    fs.mkdirs(tmpDir);
    final FSDataOutputStream out = createFileWithRetries(fs, this.hbckLockPath, defaultPerms);
    out.writeBytes(InetAddress.getLocalHost().toString());
    // Add a note into the file we write on why hbase2 is writing out an hbck1 lock file.
    out.writeBytes(" Written by an hbase-2.x Master to block an " +
        "attempt by an hbase-1.x HBCK tool making modification to state. " +
        "See 'HBCK must match HBase server version' in the hbase refguide.");
    out.flush();
    return out;
  } catch(RemoteException e) {
    if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
      return null;
    } else {
      throw e;
    }
  }
}
 
源代码5 项目: RDFS   文件: DFSClient.java
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
    Configuration conf)
  throws IOException {
  long sleepTime = conf.getLong("dfs.client.rpc.retry.sleep",
      LEASE_SOFTLIMIT_PERIOD);
  RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
      5, sleepTime, TimeUnit.MILLISECONDS);

  Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
    new HashMap<Class<? extends Exception>, RetryPolicy>();
  remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);

  Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
    new HashMap<Class<? extends Exception>, RetryPolicy>();
  exceptionToPolicyMap.put(RemoteException.class,
      RetryPolicies.retryByRemoteException(
          RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
  RetryPolicy methodPolicy = RetryPolicies.retryByException(
      RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
  Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();

  methodNameToPolicyMap.put("create", methodPolicy);

  return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
      rpcNamenode, methodNameToPolicyMap);
}
 
源代码6 项目: jstorm   文件: FileLock.java
/**
 * Takes ownership of the lock file if possible.
 * @param lockFile
 * @param lastEntry   last entry in the lock file. this param is an optimization.
 *                    we dont scan the lock file again to find its last entry here since
 *                    its already been done once by the logic used to check if the lock
 *                    file is stale. so this value comes from that earlier scan.
 * @param spoutId     spout id
 * @throws IOException if unable to acquire
 * @return null if lock File is not recoverable
 */
public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
        throws IOException {
  try {
    if(fs instanceof DistributedFileSystem ) {
      if( !((DistributedFileSystem) fs).recoverLease(lockFile) ) {
        LOG.warn("Unable to recover lease on lock file {} right now. Cannot transfer ownership. Will need to try later. Spout = {}", lockFile, spoutId);
        return null;
      }
    }
    return new FileLock(fs, lockFile, spoutId, lastEntry);
  } catch (IOException e) {
    if (e instanceof RemoteException &&
            ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
      LOG.warn("Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + spoutId, e);
      return null;
    } else { // unexpected error
      LOG.warn("Cannot transfer ownership now for lock file " + lockFile + ". Will need to try later. Spout =" + spoutId, e);
      throw e;
    }
  }
}
 
源代码7 项目: DataLink   文件: RemoteUtil.java
public static void tryRemoteClose(String hdfsFilePath, Exception e) {
    try {
        if (e instanceof RemoteException) {
            RemoteException re = (RemoteException) e;
            String className = re.getClassName();
            if (className.equals(AlreadyBeingCreatedException.class.getName()) && e.getMessage().contains(RECREATE_IDENTIFIER)) {
                logger.info("stream remote close begin for file : " + hdfsFilePath);
                colseInternal(hdfsFilePath, parseIp(e.getMessage()));
                logger.info("stream remote close end for file : " + hdfsFilePath);
            }
        }
    } catch (Exception ex) {
        logger.error("stream remote close failed for file : " + hdfsFilePath, ex);
    }
}
 
@Override
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize, 
    CryptoProtocolVersion[] supportedVersions)
    throws AccessControlException, AlreadyBeingCreatedException,
    DSQuotaExceededException, FileAlreadyExistsException,
    FileNotFoundException, NSQuotaExceededException,
    ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
    IOException {
  CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
      .setSrc(src)
      .setMasked(PBHelper.convert(masked))
      .setClientName(clientName)
      .setCreateFlag(PBHelper.convertCreateFlag(flag))
      .setCreateParent(createParent)
      .setReplication(replication)
      .setBlockSize(blockSize);
  builder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions));
  CreateRequestProto req = builder.build();
  try {
    CreateResponseProto res = rpcProxy.create(null, req);
    return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }

}
 
源代码9 项目: hadoop   文件: TestFileAppend.java
/** Test two consecutive appends on a file with a full block. */
@Test
public void testAppendTwice() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final FileSystem fs1 = cluster.getFileSystem();
  final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
  try {

    final Path p = new Path("/testAppendTwice/foo");
    final int len = 1 << 16;
    final byte[] fileContents = AppendTestUtil.initBuffer(len);

    {
      // create a new file with a full block.
      FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len);
      out.write(fileContents, 0, len);
      out.close();
    }

    //1st append does not add any data so that the last block remains full
    //and the last block in INodeFileUnderConstruction is a BlockInfo
    //but not BlockInfoUnderConstruction. 
    fs2.append(p);
    
    //2nd append should get AlreadyBeingCreatedException
    fs1.append(p);
    Assert.fail();
  } catch(RemoteException re) {
    AppendTestUtil.LOG.info("Got an exception:", re);
    Assert.assertEquals(AlreadyBeingCreatedException.class.getName(),
        re.getClassName());
  } finally {
    fs2.close();
    fs1.close();
    cluster.shutdown();
  }
}
 
源代码10 项目: hadoop   文件: TestFileAppend.java
/** Test two consecutive appends on a file with a full block. */
@Test
public void testAppend2Twice() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final DistributedFileSystem fs1 = cluster.getFileSystem();
  final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
  try {
    final Path p = new Path("/testAppendTwice/foo");
    final int len = 1 << 16;
    final byte[] fileContents = AppendTestUtil.initBuffer(len);

    {
      // create a new file with a full block.
      FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len);
      out.write(fileContents, 0, len);
      out.close();
    }

    //1st append does not add any data so that the last block remains full
    //and the last block in INodeFileUnderConstruction is a BlockInfo
    //but not BlockInfoUnderConstruction.
    ((DistributedFileSystem) fs2).append(p,
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);

    // 2nd append should get AlreadyBeingCreatedException
    fs1.append(p);
    Assert.fail();
  } catch(RemoteException re) {
    AppendTestUtil.LOG.info("Got an exception:", re);
    Assert.assertEquals(AlreadyBeingCreatedException.class.getName(),
        re.getClassName());
  } finally {
    fs2.close();
    fs1.close();
    cluster.shutdown();
  }
}
 
源代码11 项目: big-c   文件: ClientNamenodeProtocolTranslatorPB.java
@Override
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize, 
    CryptoProtocolVersion[] supportedVersions)
    throws AccessControlException, AlreadyBeingCreatedException,
    DSQuotaExceededException, FileAlreadyExistsException,
    FileNotFoundException, NSQuotaExceededException,
    ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
    IOException {
  CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
      .setSrc(src)
      .setMasked(PBHelper.convert(masked))
      .setClientName(clientName)
      .setCreateFlag(PBHelper.convertCreateFlag(flag))
      .setCreateParent(createParent)
      .setReplication(replication)
      .setBlockSize(blockSize);
  builder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions));
  CreateRequestProto req = builder.build();
  try {
    CreateResponseProto res = rpcProxy.create(null, req);
    return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }

}
 
源代码12 项目: big-c   文件: TestFileAppend.java
/** Test two consecutive appends on a file with a full block. */
@Test
public void testAppendTwice() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final FileSystem fs1 = cluster.getFileSystem();
  final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
  try {

    final Path p = new Path("/testAppendTwice/foo");
    final int len = 1 << 16;
    final byte[] fileContents = AppendTestUtil.initBuffer(len);

    {
      // create a new file with a full block.
      FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len);
      out.write(fileContents, 0, len);
      out.close();
    }

    //1st append does not add any data so that the last block remains full
    //and the last block in INodeFileUnderConstruction is a BlockInfo
    //but not BlockInfoUnderConstruction. 
    fs2.append(p);
    
    //2nd append should get AlreadyBeingCreatedException
    fs1.append(p);
    Assert.fail();
  } catch(RemoteException re) {
    AppendTestUtil.LOG.info("Got an exception:", re);
    Assert.assertEquals(AlreadyBeingCreatedException.class.getName(),
        re.getClassName());
  } finally {
    fs2.close();
    fs1.close();
    cluster.shutdown();
  }
}
 
源代码13 项目: big-c   文件: TestFileAppend.java
/** Test two consecutive appends on a file with a full block. */
@Test
public void testAppend2Twice() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  final DistributedFileSystem fs1 = cluster.getFileSystem();
  final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
  try {
    final Path p = new Path("/testAppendTwice/foo");
    final int len = 1 << 16;
    final byte[] fileContents = AppendTestUtil.initBuffer(len);

    {
      // create a new file with a full block.
      FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len);
      out.write(fileContents, 0, len);
      out.close();
    }

    //1st append does not add any data so that the last block remains full
    //and the last block in INodeFileUnderConstruction is a BlockInfo
    //but not BlockInfoUnderConstruction.
    ((DistributedFileSystem) fs2).append(p,
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);

    // 2nd append should get AlreadyBeingCreatedException
    fs1.append(p);
    Assert.fail();
  } catch(RemoteException re) {
    AppendTestUtil.LOG.info("Got an exception:", re);
    Assert.assertEquals(AlreadyBeingCreatedException.class.getName(),
        re.getClassName());
  } finally {
    fs2.close();
    fs1.close();
    cluster.shutdown();
  }
}
 
源代码14 项目: beam   文件: HDFSSynchronizationTest.java
@Test
public void testCatchingRemoteException() throws IOException {
  FileSystem mockedFileSystem = Mockito.mock(FileSystem.class);
  RemoteException thrownException =
      new RemoteException(AlreadyBeingCreatedException.class.getName(), "Failed to CREATE_FILE");
  Mockito.when(mockedFileSystem.createNewFile(Mockito.any())).thenThrow(thrownException);

  HDFSSynchronization synchronization =
      new HDFSSynchronization("someDir", (conf) -> mockedFileSystem);

  assertFalse(synchronization.tryAcquireJobLock(configuration));
}
 
源代码15 项目: hbase   文件: TestOverwriteFileUnderConstruction.java
@Test
public void testNotOverwrite() throws IOException {
  Path file = new Path("/" + name.getMethodName());
  try (FSDataOutputStream out1 = FS.create(file)) {
    try {
      FS.create(file, false);
      fail("Should fail as there is a file with the same name which is being written");
    } catch (RemoteException e) {
      // expected
      assertThat(e.unwrapRemoteException(), instanceOf(AlreadyBeingCreatedException.class));
    }
  }
}
 
源代码16 项目: RDFS   文件: TestLeaseRecovery3.java
private void recoverLeaseUsingCreate(Path filepath) throws IOException {
  Configuration conf2 = new Configuration(conf);
  String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
  UnixUserGroupInformation.saveToConf(conf2,
      UnixUserGroupInformation.UGI_PROPERTY_NAME,
      new UnixUserGroupInformation(username, new String[]{"supergroup"}));
  FileSystem dfs2 = FileSystem.get(conf2);

  boolean done = false;
  for(int i = 0; i < 10 && !done; i++) {
    AppendTestUtil.LOG.info("i=" + i);
    try {
      dfs2.create(filepath, false, bufferSize, (short)1, BLOCK_SIZE);
      fail("Creation of an existing file should never succeed.");
    } catch (IOException ioe) {
      final String message = ioe.getMessage();
      if (message.contains("file exists")) {
        AppendTestUtil.LOG.info("done", ioe);
        done = true;
      }
      else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
        AppendTestUtil.LOG.info("GOOD! got " + message);
      }
      else {
        AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
      }
    }

    if (!done) {
      AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
      try {Thread.sleep(5000);} catch (InterruptedException e) {}
    }
  }
  assertTrue(done);

}
 
源代码17 项目: RDFS   文件: TestLeaseRecovery2.java
private void recoverLeaseUsingCreate(Path filepath) throws IOException {
  Configuration conf2 = new Configuration(conf);
  String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
  UnixUserGroupInformation.saveToConf(conf2,
      UnixUserGroupInformation.UGI_PROPERTY_NAME,
      new UnixUserGroupInformation(username, new String[]{"supergroup"}));
  FileSystem dfs2 = FileSystem.get(conf2);

  boolean done = false;
  for(int i = 0; i < 10 && !done; i++) {
    AppendTestUtil.LOG.info("i=" + i);
    try {
      dfs2.create(filepath, false, bufferSize, (short)1, BLOCK_SIZE);
      fail("Creation of an existing file should never succeed.");
    } catch (IOException ioe) {
      final String message = ioe.getMessage();
      if (message.contains("file exists")) {
        AppendTestUtil.LOG.info("done", ioe);
        done = true;
      }
      else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
        AppendTestUtil.LOG.info("GOOD! got " + message);
      }
      else {
        AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
      }
    }

    if (!done) {
      AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
      try {Thread.sleep(5000);} catch (InterruptedException e) {}
    }
  }
  assertTrue(done);

}
 
源代码18 项目: RDFS   文件: Balancer.java
private OutputStream checkAndMarkRunningBalancer() throws IOException {
  try {
    DataOutputStream out = fs.create(BALANCER_ID_PATH);
    out. writeBytes(InetAddress.getLocalHost().getHostName());
    out.flush();
    return out;
  } catch(RemoteException e) {
    if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
      return null;
    } else {
      throw e;
    }
  }
}
 
源代码19 项目: hudi   文件: HoodieLogFormatWriter.java
private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)
    throws IOException, InterruptedException {
  if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
    // This issue happens when all replicas for a file are down and/or being decommissioned.
    // The fs.append() API could append to the last block for a file. If the last block is full, a new block is
    // appended to. In a scenario when a lot of DN's are decommissioned, it can happen that DN's holding all
    // replicas for a block/file are decommissioned together. During this process, all these blocks will start to
    // get replicated to other active DataNodes but this process might take time (can be of the order of few
    // hours). During this time, if a fs.append() API is invoked for a file whose last block is eligible to be
    // appended to, then the NN will throw an exception saying that it couldn't find any active replica with the
    // last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325
    LOG.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
    // Rollover the current log file (since cannot get a stream handle) and create new one
    this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
    createNewFile();
  } else if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
    LOG.warn("Another task executor writing to the same log file(" + logFile + ". Rolling over");
    // Rollover the current log file (since cannot get a stream handle) and create new one
    this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
    createNewFile();
  } else if (e.getClassName().contentEquals(RecoveryInProgressException.class.getName())
      && (fs instanceof DistributedFileSystem)) {
    // this happens when either another task executor writing to this file died or
    // data node is going down. Note that we can only try to recover lease for a DistributedFileSystem.
    // ViewFileSystem unfortunately does not support this operation
    LOG.warn("Trying to recover log on path " + path);
    if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
      LOG.warn("Recovered lease on path " + path);
      // try again
      this.output = fs.append(path, bufferSize);
    } else {
      LOG.warn("Failed to recover lease on path " + path);
      throw new HoodieException(e);
    }
  } else {
    // When fs.append() has failed and an exception is thrown, by closing the output stream
    // we shall force hdfs to release the lease on the log file. When Spark retries this task (with
    // new attemptId, say taskId.1) it will be able to acquire lease on the log file (as output stream was
    // closed properly by taskId.0).
    //
    // If close() call were to fail throwing an exception, our best bet is to rollover to a new log file.
    try {
      close();
      // output stream has been successfully closed and lease on the log file has been released,
      // before throwing an exception for the append failure.
      throw new HoodieIOException("Failed to append to the output stream ", e);
    } catch (Exception ce) {
      LOG.warn("Failed to close the output stream for " + fs.getClass().getName() + " on path " + path
          + ". Rolling over to a new log file.");
      this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
      createNewFile();
    }
  }
}
 
源代码20 项目: RDFS   文件: DistributedAvatarFileSystem.java
@Override
public void create(final String src, final FsPermission masked,
    final String clientName, final boolean overwrite,
    final boolean createParent,
    final short replication, final long blockSize) throws IOException {
  (new MutableFSCaller<Boolean>() {
    @Override
    Boolean call(int retries) throws IOException {
      if (retries > 0) {
        // This I am not sure about, because of lease holder I can tell if
        // it
        // was me or not with a high level of certainty
        FileStatus stat = namenode.getFileInfo(src);
        if (stat != null) {
          /*
           * Since the file exists already we need to perform a number of
           * checks to see if it was created by us before the failover
           */

          if (stat.getBlockSize() == blockSize
              && stat.getReplication() == replication && stat.getLen() == 0
              && stat.getPermission().equals(masked)) {
            // The file has not been written to and it looks exactly like
            // the file we were trying to create. Last check:
            // call create again and then parse the exception.
            // Two cases:
            // it was the same client who created the old file
            // or it was created by someone else - fail
            try {
              namenode.create(src, masked, clientName, overwrite,
                  createParent, replication, blockSize);
            } catch (RemoteException re) {
              if (re.unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
                AlreadyBeingCreatedException aex = (AlreadyBeingCreatedException) re
                    .unwrapRemoteException();
                if (aex.getMessage().contains(
                    "current leaseholder is trying to recreate file")) {
                  namenode.delete(src, false);
                } else {
                  throw re;
                }
              } else {
                throw re;
              }
            }
          }
        }
      }
      namenode.create(src, masked, clientName, overwrite, createParent, replication,
          blockSize);
      return true;
    }
  }).callFS();
}