Code examples for class org.apache.hadoop.util.ThreadUtil

The examples below show how the org.apache.hadoop.util.ThreadUtil API is used in real projects; follow the project links to view the full source on GitHub.
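
Nearly every example below calls ThreadUtil.sleepAtLeastIgnoreInterrupts(millis), which keeps sleeping until at least millis milliseconds have elapsed, logging and swallowing any InterruptedException along the way. A minimal illustrative sketch before the examples (the SleepDemo class is not from any listed project and assumes hadoop-common on the classpath):

import org.apache.hadoop.util.ThreadUtil;

// Illustrative demo, not taken from any project below.
public class SleepDemo {
  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // Unlike a bare Thread.sleep(), an interrupt does not cut the
    // sleep short: the method resumes sleeping until the requested
    // duration has fully elapsed.
    ThreadUtil.sleepAtLeastIgnoreInterrupts(500);
    System.out.println("Slept at least "
        + (System.currentTimeMillis() - start) + " ms");
  }
}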

Example 1  Project: circus-train   File: RetriableCommand.java
/**
 * The execute() method invokes doExecute() until either: 1. doExecute() succeeds, or 2. the command may no longer be
 * retried (e.g. runs out of retry-attempts).
 *
 * @param arguments The list of arguments for the command.
 * @return The value returned by doExecute() on success.
 * @throws Exception on complete failure.
 */
public T execute(Object... arguments) throws Exception {
  Exception latestException;
  int counter = 0;
  while (true) {
    try {
      return doExecute(arguments);
    } catch (Exception exception) {
      LOG.error("Failure in Retriable command: {}", description, exception);
      latestException = exception;
    }
    counter++;
    RetryAction action = retryPolicy.shouldRetry(latestException, counter, 0, true);
    if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
      ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
    } else {
      break;
    }
  }

  throw new IOException("Couldn't run retriable-command: " + description, latestException);
}
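
A hedged usage sketch for the command above: callers subclass RetriableCommand and implement doExecute() with the fallible operation. The single-argument constructor taking a description is an assumption, inferred from the `description` field used in the log and exception messages:

// Hypothetical subclass (constructor signature assumed from the
// `description` field above); retries fs.getFileStatus() until the
// configured RetryPolicy gives up.
final FileSystem fs = FileSystem.get(new Configuration());
RetriableCommand<Long> sizeCommand = new RetriableCommand<Long>("get file size") {
  @Override
  protected Long doExecute(Object... arguments) throws Exception {
    return fs.getFileStatus((Path) arguments[0]).getLen();
  }
};
long size = sizeCommand.execute(new Path("/tmp/data"));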
 
Example 2  Project: hadoop   File: RetriableCommand.java
/**
 * The execute() method invokes doExecute() until either:
 *  1. doExecute() succeeds, or
 *  2. the command may no longer be retried (e.g. runs out of retry-attempts).
 * @param arguments The list of arguments for the command.
 * @return Generic "Object" from doExecute(), on success.
 * @throws Exception
 */
public Object execute(Object... arguments) throws Exception {
  Exception latestException;
  int counter = 0;
  while (true) {
    try {
      return doExecute(arguments);
    } catch(Exception exception) {
      LOG.error("Failure in Retriable command: " + description, exception);
      latestException = exception;
    }
    counter++;
    RetryAction action = retryPolicy.shouldRetry(latestException, counter, 0, true);
    if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
      ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
    } else {
      break;
    }
  }

  throw new IOException("Couldn't run retriable-command: " + description,
                        latestException);
}
 
Example 3  Project: big-c   File: RetriableCommand.java
/**
 * The execute() method invokes doExecute() until either:
 *  1. doExecute() succeeds, or
 *  2. the command may no longer be retried (e.g. runs out of retry-attempts).
 * @param arguments The list of arguments for the command.
 * @return Generic "Object" from doExecute(), on success.
 * @throws Exception
 */
public Object execute(Object... arguments) throws Exception {
  Exception latestException;
  int counter = 0;
  while (true) {
    try {
      return doExecute(arguments);
    } catch(Exception exception) {
      LOG.error("Failure in Retriable command: " + description, exception);
      latestException = exception;
    }
    counter++;
    RetryAction action = retryPolicy.shouldRetry(latestException, counter, 0, true);
    if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
      ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
    } else {
      break;
    }
  }

  throw new IOException("Couldn't run retriable-command: " + description,
                        latestException);
}
 
Example 4  Project: hadoop-ozone   File: VersionInfo.java
public VersionInfo(String component) {
  String versionInfoFile = component + "-version-info.properties";
  InputStream is = null;
  try {
    is = ThreadUtil.getResourceAsStream(
      getClass().getClassLoader(),
      versionInfoFile);
    info.load(is);
  } catch (IOException ex) {
    LoggerFactory.getLogger(getClass()).warn("Could not read '" +
        versionInfoFile + "', " + ex.toString(), ex);
  } finally {
    IOUtils.closeStream(is);
  }
}
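
The constructor above uses ThreadUtil.getResourceAsStream(ClassLoader, String), which resolves a resource on the given class loader and throws an IOException when it cannot be found (which is why the example catches IOException). A minimal sketch of the same pattern using try-with-resources; the properties file name here is hypothetical:

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.hadoop.util.ThreadUtil;

// Loads a hypothetical "<component>-version-info.properties" the same
// way VersionInfo does above; an IOException signals a missing resource.
static Properties loadVersionProps() throws IOException {
  Properties props = new Properties();
  try (InputStream in = ThreadUtil.getResourceAsStream(
      Thread.currentThread().getContextClassLoader(),
      "my-component-version-info.properties")) { // hypothetical name
    props.load(in);
  }
  return props;
}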
 
Example 5  Project: hadoop-ozone   File: RetriableTask.java
@Override
public V call() throws Exception {
  int attempts = 0;
  Exception cause;
  while (true) {
    try {
      return task.call();
    } catch (Exception e) {
      cause = e;
      RetryPolicy.RetryAction action = retryPolicy.shouldRetry(e, ++attempts,
           0, true);
      if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
        LOG.info("Execution of task {} failed, will be retried in {} ms",
            name, action.delayMillis);
        ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
      } else {
        break;
      }
    }
  }

  String msg = String.format(
      "Execution of task %s failed permanently after %d attempts",
      name, attempts);
  LOG.warn(msg, cause);
  throw new IOException(msg, cause);
}
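
A hedged sketch of wrapping a flaky Callable in the RetriableTask above. The constructor argument order (policy, name, task) is an assumption for illustration; check the hadoop-ozone source for the actual signature:

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

// Retry up to 3 times with a fixed 1-second sleep between attempts.
RetryPolicy policy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
    3, 1, TimeUnit.SECONDS);
Callable<String> flaky = () -> "remote value"; // stand-in for a flaky call
RetriableTask<String> retriable =
    new RetriableTask<>(policy, "fetch", flaky); // argument order assumed
String value = retriable.call();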
 
Example 6  Project: hadoop   File: TestModTime.java
/**
 * Regression test for HDFS-3864 - NN does not update internal file mtime for
 * OP_CLOSE when reading from the edit log.
 */
@Test
public void testModTimePersistsAfterRestart() throws IOException {
  final long sleepTime = 10; // 10 milliseconds
  MiniDFSCluster cluster = null;
  FileSystem fs = null;
  Configuration conf = new HdfsConfiguration();
  try {
    cluster = new MiniDFSCluster.Builder(conf).build();
    fs = cluster.getFileSystem();
    Path testPath = new Path("/test");
    
    // Open a file, and get its initial modification time.
    OutputStream out = fs.create(testPath);
    long initialModTime = fs.getFileStatus(testPath).getModificationTime();
    assertTrue(initialModTime > 0);
    
    // Wait and then close the file. Ensure that the mod time goes up.
    ThreadUtil.sleepAtLeastIgnoreInterrupts(sleepTime);
    out.close();
    long modTimeAfterClose = fs.getFileStatus(testPath).getModificationTime();
    assertTrue(modTimeAfterClose >= initialModTime + sleepTime);
    
    // Restart the NN, and make sure that the later mod time is still used.
    cluster.restartNameNode();
    long modTimeAfterRestart = fs.getFileStatus(testPath).getModificationTime();
    assertEquals(modTimeAfterClose, modTimeAfterRestart);
  } finally {
    if (fs != null) {
      fs.close();
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
 
Example 7  Project: hadoop   File: TestFailoverProxy.java
/**
 * Ensure that when all configured services are throwing StandbyException
 * that we fail over back and forth between them until one is no longer
 * throwing StandbyException.
 */
@Test
public void testFailoverBetweenMultipleStandbys()
    throws UnreliableException, StandbyException, IOException {
  
  final long millisToSleep = 10000;
  
  final UnreliableImplementation impl1 = new UnreliableImplementation("impl1",
      TypeOfExceptionToFailWith.STANDBY_EXCEPTION);
  FlipFlopProxyProvider<UnreliableInterface> proxyProvider
      = new FlipFlopProxyProvider<UnreliableInterface>(
      UnreliableInterface.class,
      impl1,
      new UnreliableImplementation("impl2",
          TypeOfExceptionToFailWith.STANDBY_EXCEPTION));
  
  final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
    .create(UnreliableInterface.class, proxyProvider,
        RetryPolicies.failoverOnNetworkException(
            RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000));
  
  new Thread() {
    @Override
    public void run() {
      ThreadUtil.sleepAtLeastIgnoreInterrupts(millisToSleep);
      impl1.setIdentifier("renamed-impl1");
    }
  }.start();
  
  String result = unreliable.failsIfIdentifierDoesntMatch("renamed-impl1");
  assertEquals("renamed-impl1", result);
}
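
The policy built in this test is RetryPolicies.failoverOnNetworkException(fallback, maxFailovers, delayMillis, maxDelayBase): each call is retried per the fallback policy, and on failover-worthy exceptions the proxy fails over up to maxFailovers times, sleeping a growing delay derived from delayMillis and bounded by maxDelayBase before each failover. That reading of the parameters is summarized from the Hadoop RetryPolicies source, so treat it as an assumption. Constructed on its own:

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

// Same policy as the test: try each call once, fail over up to 10
// times, backing off between roughly 1s and 10s before each failover.
RetryPolicy policy = RetryPolicies.failoverOnNetworkException(
    RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000);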
 
Example 8  Project: big-c   File: TestModTime.java
/**
 * Regression test for HDFS-3864 - NN does not update internal file mtime for
 * OP_CLOSE when reading from the edit log.
 */
@Test
public void testModTimePersistsAfterRestart() throws IOException {
  final long sleepTime = 10; // 10 milliseconds
  MiniDFSCluster cluster = null;
  FileSystem fs = null;
  Configuration conf = new HdfsConfiguration();
  try {
    cluster = new MiniDFSCluster.Builder(conf).build();
    fs = cluster.getFileSystem();
    Path testPath = new Path("/test");
    
    // Open a file, and get its initial modification time.
    OutputStream out = fs.create(testPath);
    long initialModTime = fs.getFileStatus(testPath).getModificationTime();
    assertTrue(initialModTime > 0);
    
    // Wait and then close the file. Ensure that the mod time goes up.
    ThreadUtil.sleepAtLeastIgnoreInterrupts(sleepTime);
    out.close();
    long modTimeAfterClose = fs.getFileStatus(testPath).getModificationTime();
    assertTrue(modTimeAfterClose >= initialModTime + sleepTime);
    
    // Restart the NN, and make sure that the later mod time is still used.
    cluster.restartNameNode();
    long modTimeAfterRestart = fs.getFileStatus(testPath).getModificationTime();
    assertEquals(modTimeAfterClose, modTimeAfterRestart);
  } finally {
    if (fs != null) {
      fs.close();
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
 
Example 9  Project: big-c   File: TestFailoverProxy.java
/**
 * Ensure that when all configured services are throwing StandbyException
 * that we fail over back and forth between them until one is no longer
 * throwing StandbyException.
 */
@Test
public void testFailoverBetweenMultipleStandbys()
    throws UnreliableException, StandbyException, IOException {
  
  final long millisToSleep = 10000;
  
  final UnreliableImplementation impl1 = new UnreliableImplementation("impl1",
      TypeOfExceptionToFailWith.STANDBY_EXCEPTION);
  FlipFlopProxyProvider<UnreliableInterface> proxyProvider
      = new FlipFlopProxyProvider<UnreliableInterface>(
      UnreliableInterface.class,
      impl1,
      new UnreliableImplementation("impl2",
          TypeOfExceptionToFailWith.STANDBY_EXCEPTION));
  
  final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
    .create(UnreliableInterface.class, proxyProvider,
        RetryPolicies.failoverOnNetworkException(
            RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000));
  
  new Thread() {
    @Override
    public void run() {
      ThreadUtil.sleepAtLeastIgnoreInterrupts(millisToSleep);
      impl1.setIdentifier("renamed-impl1");
    }
  }.start();
  
  String result = unreliable.failsIfIdentifierDoesntMatch("renamed-impl1");
  assertEquals("renamed-impl1", result);
}
 
Example 10  Project: hadoop   File: TestDFSClientExcludedNodes.java
@Test(timeout=60000)
public void testExcludedNodesForgiveness() throws IOException {
  // Forgive nodes in under 2.5s for this test case.
  conf.setLong(
      DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
      2500);
  // We'll be using a 512 bytes block size just for tests
  // so making sure the checksum bytes too match it.
  conf.setInt("io.bytes.per.checksum", 512);
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
  List<DataNodeProperties> props = cluster.dataNodes;
  FileSystem fs = cluster.getFileSystem();
  Path filePath = new Path("/testForgivingExcludedNodes");

  // 256 bytes data chunk for writes
  byte[] bytes = new byte[256];
  for (int index=0; index<bytes.length; index++) {
    bytes[index] = '0';
  }

  // File with a 512 bytes block size
  FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);

  // Write a block to all 3 DNs (2x256bytes).
  out.write(bytes);
  out.write(bytes);
  out.hflush();

  // Remove two DNs, to put them into the exclude list.
  DataNodeProperties two = cluster.stopDataNode(2);
  DataNodeProperties one = cluster.stopDataNode(1);

  // Write another block.
  // At this point, we have two nodes already in excluded list.
  out.write(bytes);
  out.write(bytes);
  out.hflush();

  // Bring back the older DNs, since they are gonna be forgiven only
  // afterwards of this previous block write.
  Assert.assertEquals(true, cluster.restartDataNode(one, true));
  Assert.assertEquals(true, cluster.restartDataNode(two, true));
  cluster.waitActive();

  // Sleep for 5s, to let the excluded nodes be expired
  // from the excludes list (i.e. forgiven after the configured wait period).
  // [Sleeping just in case the restart of the DNs completed < 5s cause
  // otherwise, we'll end up quickly excluding those again.]
  ThreadUtil.sleepAtLeastIgnoreInterrupts(5000);

  // Terminate the last good DN, to assert that there's no
  // single-DN-available scenario, caused by not forgiving the other
  // two by now.
  cluster.stopDataNode(0);

  try {
    // Attempt writing another block, which should still pass
    // cause the previous two should have been forgiven by now,
    // while the last good DN added to excludes this time.
    out.write(bytes);
    out.hflush();
    out.close();
  } catch (Exception e) {
    fail("Excluded DataNodes should be forgiven after a while and " +
         "not cause file writing exception of: '" + e.getMessage() + "'");
  }
}
 
Example 11  Project: hadoop   File: TestStandbyCheckpoints.java
/**
 * Make sure that clients will receive StandbyExceptions even when a
 * checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer
 * thread will have FSNS lock. Regression test for HDFS-4591.
 */
@Test(timeout=300000)
public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
  
  // Set it up so that we know when the SBN checkpoint starts and ends.
  FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
  DelayAnswer answerer = new DelayAnswer(LOG);
  Mockito.doAnswer(answerer).when(spyImage1)
      .saveNamespace(Mockito.any(FSNamesystem.class),
          Mockito.eq(NameNodeFile.IMAGE), Mockito.any(Canceler.class));

  // Perform some edits and wait for a checkpoint to start on the SBN.
  doEdits(0, 1000);
  nn0.getRpcServer().rollEditLog();
  answerer.waitForCall();
  assertTrue("SBN is not performing checkpoint but it should be.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  
  // Make sure that the lock has actually been taken by the checkpointing
  // thread.
  ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
  try {
    // Perform an RPC to the SBN and make sure it throws a StandbyException.
    nn1.getRpcServer().getFileInfo("/");
    fail("Should have thrown StandbyException, but instead succeeded.");
  } catch (StandbyException se) {
    GenericTestUtils.assertExceptionContains("is not supported", se);
  }

  // Make sure new incremental block reports are processed during
  // checkpointing on the SBN.
  assertEquals(0, cluster.getNamesystem(1).getPendingDataNodeMessageCount());
  doCreate();
  Thread.sleep(1000);
  assertTrue(cluster.getNamesystem(1).getPendingDataNodeMessageCount() > 0);
  
  // Make sure that the checkpoint is still going on, implying that the client
  // RPC to the SBN happened during the checkpoint.
  assertTrue("SBN should have still been checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  answerer.proceed();
  answerer.waitForResult();
  assertTrue("SBN should have finished checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
}
 
Example 12  Project: hadoop   File: TestStandbyCheckpoints.java
@Test(timeout=300000)
public void testReadsAllowedDuringCheckpoint() throws Exception {
  
  // Set it up so that we know when the SBN checkpoint starts and ends.
  FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
  DelayAnswer answerer = new DelayAnswer(LOG);
  Mockito.doAnswer(answerer).when(spyImage1)
      .saveNamespace(Mockito.any(FSNamesystem.class),
          Mockito.any(NameNodeFile.class),
          Mockito.any(Canceler.class));
  
  // Perform some edits and wait for a checkpoint to start on the SBN.
  doEdits(0, 1000);
  nn0.getRpcServer().rollEditLog();
  answerer.waitForCall();
  assertTrue("SBN is not performing checkpoint but it should be.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  
  // Make sure that the lock has actually been taken by the checkpointing
  // thread.
  ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
  
  // Perform an RPC that needs to take the write lock.
  Thread t = new Thread() {
    @Override
    public void run() {
      try {
        nn1.getRpcServer().restoreFailedStorage("false");
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  };
  t.start();
  
  // Make sure that our thread is waiting for the lock.
  ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
  
  assertFalse(nn1.getNamesystem().getFsLockForTests().hasQueuedThreads());
  assertFalse(nn1.getNamesystem().getFsLockForTests().isWriteLocked());
  assertTrue(nn1.getNamesystem().getCpLockForTests().hasQueuedThreads());
  
  // Get /jmx of the standby NN web UI, which will cause the FSNS read lock to
  // be taken.
  String pageContents = DFSTestUtil.urlGet(new URL("http://" +
      nn1.getHttpAddress().getHostName() + ":" +
      nn1.getHttpAddress().getPort() + "/jmx"));
  assertTrue(pageContents.contains("NumLiveDataNodes"));
  
  // Make sure that the checkpoint is still going on, implying that the client
  // RPC to the SBN happened during the checkpoint.
  assertTrue("SBN should have still been checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  answerer.proceed();
  answerer.waitForResult();
  assertTrue("SBN should have finished checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
  
  t.join();
}
 
Example 13  Project: hadoop   File: TestPendingCorruptDnMessages.java
@Test
public void testChangedStorageId() throws IOException, URISyntaxException,
    InterruptedException {
  HdfsConfiguration conf = new HdfsConfiguration();
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(1)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .build();
  
  try {
    cluster.transitionToActive(0);
    
    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
    OutputStream out = fs.create(filePath);
    out.write("foo bar baz".getBytes());
    out.close();
    
    HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
        cluster.getNameNode(1));
    
    // Change the gen stamp of the block on datanode to go back in time (gen
    // stamps start at 1000)
    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
    assertTrue(cluster.changeGenStampOfBlock(0, block, 900));
    
    // Stop the DN so the replica with the changed gen stamp will be reported
    // when this DN starts up.
    DataNodeProperties dnProps = cluster.stopDataNode(0);
    
    // Restart the namenode so that when the DN comes up it will see an initial
    // block report.
    cluster.restartNameNode(1, false);
    assertTrue(cluster.restartDataNode(dnProps, true));
    
    // Wait until the standby NN queues up the corrupt block in the pending DN
    // message queue.
    while (cluster.getNamesystem(1).getBlockManager()
        .getPendingDataNodeMessageCount() < 1) {
      ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
    }
    
    assertEquals(1, cluster.getNamesystem(1).getBlockManager()
        .getPendingDataNodeMessageCount());
    String oldStorageId = getRegisteredDatanodeUid(cluster, 1);
    
    // Reformat/restart the DN.
    assertTrue(wipeAndRestartDn(cluster, 0));
    
    // Give the DN time to start up and register, which will cause the
    // DatanodeManager to dissociate the old storage ID from the DN xfer addr.
    String newStorageId = "";
    do {
      ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
      newStorageId = getRegisteredDatanodeUid(cluster, 1);
      System.out.println("====> oldStorageId: " + oldStorageId +
          " newStorageId: " + newStorageId);
    } while (newStorageId.equals(oldStorageId));
    
    assertEquals(0, cluster.getNamesystem(1).getBlockManager()
        .getPendingDataNodeMessageCount());
    
    // Now try to fail over.
    cluster.transitionToStandby(0);
    cluster.transitionToActive(1);
  } finally {
    cluster.shutdown();
  }
}
 
Example 14  Project: big-c   File: TestDFSClientExcludedNodes.java
@Test(timeout=60000)
public void testExcludedNodesForgiveness() throws IOException {
  // Forgive nodes in under 2.5s for this test case.
  conf.setLong(
      DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
      2500);
  // We'll be using a 512 bytes block size just for tests
  // so making sure the checksum bytes too match it.
  conf.setInt("io.bytes.per.checksum", 512);
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
  List<DataNodeProperties> props = cluster.dataNodes;
  FileSystem fs = cluster.getFileSystem();
  Path filePath = new Path("/testForgivingExcludedNodes");

  // 256 bytes data chunk for writes
  byte[] bytes = new byte[256];
  for (int index=0; index<bytes.length; index++) {
    bytes[index] = '0';
  }

  // File with a 512 bytes block size
  FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);

  // Write a block to all 3 DNs (2x256bytes).
  out.write(bytes);
  out.write(bytes);
  out.hflush();

  // Remove two DNs, to put them into the exclude list.
  DataNodeProperties two = cluster.stopDataNode(2);
  DataNodeProperties one = cluster.stopDataNode(1);

  // Write another block.
  // At this point, we have two nodes already in excluded list.
  out.write(bytes);
  out.write(bytes);
  out.hflush();

  // Bring back the older DNs, since they are gonna be forgiven only
  // afterwards of this previous block write.
  Assert.assertEquals(true, cluster.restartDataNode(one, true));
  Assert.assertEquals(true, cluster.restartDataNode(two, true));
  cluster.waitActive();

  // Sleep for 5s, to let the excluded nodes be expired
  // from the excludes list (i.e. forgiven after the configured wait period).
  // [Sleeping just in case the restart of the DNs completed < 5s cause
  // otherwise, we'll end up quickly excluding those again.]
  ThreadUtil.sleepAtLeastIgnoreInterrupts(5000);

  // Terminate the last good DN, to assert that there's no
  // single-DN-available scenario, caused by not forgiving the other
  // two by now.
  cluster.stopDataNode(0);

  try {
    // Attempt writing another block, which should still pass
    // cause the previous two should have been forgiven by now,
    // while the last good DN added to excludes this time.
    out.write(bytes);
    out.hflush();
    out.close();
  } catch (Exception e) {
    fail("Excluded DataNodes should be forgiven after a while and " +
         "not cause file writing exception of: '" + e.getMessage() + "'");
  }
}
 
Example 15  Project: big-c   File: TestStandbyCheckpoints.java
/**
 * Make sure that clients will receive StandbyExceptions even when a
 * checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer
 * thread will have FSNS lock. Regression test for HDFS-4591.
 */
@Test(timeout=300000)
public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
  
  // Set it up so that we know when the SBN checkpoint starts and ends.
  FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
  DelayAnswer answerer = new DelayAnswer(LOG);
  Mockito.doAnswer(answerer).when(spyImage1)
      .saveNamespace(Mockito.any(FSNamesystem.class),
          Mockito.eq(NameNodeFile.IMAGE), Mockito.any(Canceler.class));

  // Perform some edits and wait for a checkpoint to start on the SBN.
  doEdits(0, 1000);
  nn0.getRpcServer().rollEditLog();
  answerer.waitForCall();
  assertTrue("SBN is not performing checkpoint but it should be.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  
  // Make sure that the lock has actually been taken by the checkpointing
  // thread.
  ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
  try {
    // Perform an RPC to the SBN and make sure it throws a StandbyException.
    nn1.getRpcServer().getFileInfo("/");
    fail("Should have thrown StandbyException, but instead succeeded.");
  } catch (StandbyException se) {
    GenericTestUtils.assertExceptionContains("is not supported", se);
  }

  // Make sure new incremental block reports are processed during
  // checkpointing on the SBN.
  assertEquals(0, cluster.getNamesystem(1).getPendingDataNodeMessageCount());
  doCreate();
  Thread.sleep(1000);
  assertTrue(cluster.getNamesystem(1).getPendingDataNodeMessageCount() > 0);
  
  // Make sure that the checkpoint is still going on, implying that the client
  // RPC to the SBN happened during the checkpoint.
  assertTrue("SBN should have still been checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  answerer.proceed();
  answerer.waitForResult();
  assertTrue("SBN should have finished checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
}
 
Example 16  Project: big-c   File: TestStandbyCheckpoints.java
@Test(timeout=300000)
public void testReadsAllowedDuringCheckpoint() throws Exception {
  
  // Set it up so that we know when the SBN checkpoint starts and ends.
  FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
  DelayAnswer answerer = new DelayAnswer(LOG);
  Mockito.doAnswer(answerer).when(spyImage1)
      .saveNamespace(Mockito.any(FSNamesystem.class),
          Mockito.any(NameNodeFile.class),
          Mockito.any(Canceler.class));
  
  // Perform some edits and wait for a checkpoint to start on the SBN.
  doEdits(0, 1000);
  nn0.getRpcServer().rollEditLog();
  answerer.waitForCall();
  assertTrue("SBN is not performing checkpoint but it should be.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  
  // Make sure that the lock has actually been taken by the checkpointing
  // thread.
  ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
  
  // Perform an RPC that needs to take the write lock.
  Thread t = new Thread() {
    @Override
    public void run() {
      try {
        nn1.getRpcServer().restoreFailedStorage("false");
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  };
  t.start();
  
  // Make sure that our thread is waiting for the lock.
  ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
  
  assertFalse(nn1.getNamesystem().getFsLockForTests().hasQueuedThreads());
  assertFalse(nn1.getNamesystem().getFsLockForTests().isWriteLocked());
  assertTrue(nn1.getNamesystem().getCpLockForTests().hasQueuedThreads());
  
  // Get /jmx of the standby NN web UI, which will cause the FSNS read lock to
  // be taken.
  String pageContents = DFSTestUtil.urlGet(new URL("http://" +
      nn1.getHttpAddress().getHostName() + ":" +
      nn1.getHttpAddress().getPort() + "/jmx"));
  assertTrue(pageContents.contains("NumLiveDataNodes"));
  
  // Make sure that the checkpoint is still going on, implying that the client
  // RPC to the SBN happened during the checkpoint.
  assertTrue("SBN should have still been checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
  answerer.proceed();
  answerer.waitForResult();
  assertTrue("SBN should have finished checkpointing.",
      answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
  
  t.join();
}
 
Example 17  Project: big-c   File: TestPendingCorruptDnMessages.java
@Test
public void testChangedStorageId() throws IOException, URISyntaxException,
    InterruptedException {
  HdfsConfiguration conf = new HdfsConfiguration();
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(1)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .build();
  
  try {
    cluster.transitionToActive(0);
    
    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
    OutputStream out = fs.create(filePath);
    out.write("foo bar baz".getBytes());
    out.close();
    
    HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
        cluster.getNameNode(1));
    
    // Change the gen stamp of the block on datanode to go back in time (gen
    // stamps start at 1000)
    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
    assertTrue(cluster.changeGenStampOfBlock(0, block, 900));
    
    // Stop the DN so the replica with the changed gen stamp will be reported
    // when this DN starts up.
    DataNodeProperties dnProps = cluster.stopDataNode(0);
    
    // Restart the namenode so that when the DN comes up it will see an initial
    // block report.
    cluster.restartNameNode(1, false);
    assertTrue(cluster.restartDataNode(dnProps, true));
    
    // Wait until the standby NN queues up the corrupt block in the pending DN
    // message queue.
    while (cluster.getNamesystem(1).getBlockManager()
        .getPendingDataNodeMessageCount() < 1) {
      ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
    }
    
    assertEquals(1, cluster.getNamesystem(1).getBlockManager()
        .getPendingDataNodeMessageCount());
    String oldStorageId = getRegisteredDatanodeUid(cluster, 1);
    
    // Reformat/restart the DN.
    assertTrue(wipeAndRestartDn(cluster, 0));
    
    // Give the DN time to start up and register, which will cause the
    // DatanodeManager to dissociate the old storage ID from the DN xfer addr.
    String newStorageId = "";
    do {
      ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
      newStorageId = getRegisteredDatanodeUid(cluster, 1);
      System.out.println("====> oldStorageId: " + oldStorageId +
          " newStorageId: " + newStorageId);
    } while (newStorageId.equals(oldStorageId));
    
    assertEquals(0, cluster.getNamesystem(1).getBlockManager()
        .getPendingDataNodeMessageCount());
    
    // Now try to fail over.
    cluster.transitionToStandby(0);
    cluster.transitionToActive(1);
  } finally {
    cluster.shutdown();
  }
}
 