下面列出了 com.google.common.base.Stopwatch#elapsedTime() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Repeatedly starts and finishes a transaction on {@code txContext} until the finish succeeds
 * or the given timeout has elapsed.
 *
 * <p>After each failed finish the transaction is aborted and the thread sleeps for one tenth of
 * the timeout, clamped to the range of 1 microsecond to 500 milliseconds.
 *
 * @param timeout maximum time to keep retrying
 * @param timeUnit unit of {@code timeout}
 * @throws TransactionFailureException if raised outside the guarded {@code finish()} call
 * @throws InterruptedException if interrupted while sleeping between attempts
 * @throws TimeoutException if no attempt succeeded before the timeout elapsed
 */
@Override
public void await(long timeout, TimeUnit timeUnit)
    throws TransactionFailureException, InterruptedException, TimeoutException {
  final Stopwatch timer = new Stopwatch().start();

  // Pause between attempts: a tenth of the timeout, kept within [1 microsecond, 500 milliseconds].
  final long tenthOfTimeoutMicros = timeUnit.toMicros(timeout) / 10;
  final long pauseMicros = Math.max(Math.min(tenthOfTimeoutMicros, 500 * 1000), 1);

  for (;;) {
    if (timer.elapsedTime(timeUnit) >= timeout) {
      throw new TimeoutException("Timeout waiting for fence");
    }

    txContext.start();
    try {
      txContext.finish();
      return;
    } catch (TransactionFailureException e) {
      LOG.error("Got exception waiting for fence. Sleeping for {} microseconds", pauseMicros, e);
      txContext.abort();
      TimeUnit.MICROSECONDS.sleep(pauseMicros);
    }
  }
}
/**
 * Starts a {@code SleepApplication}, waits for it to create its run file, terminates it through
 * the controller and then checks which marker files exist in the parent folder: the killed file
 * must be present, the completed and aborted files must not.
 */
@Test
public void testKilled() throws IOException, InterruptedException, TimeoutException, ExecutionException {
  // Folder the EventHandler writes its marker files into
  File parentFolder = TMP_FOLDER.newFolder();
  parentFolder.setWritable(true, false);
  String parentPath = parentFolder.getAbsolutePath();

  TwillController controller = getTwillRunner()
      .prepare(new SleepApplication(parentPath))
      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
      .start();

  // Poll once per second, for up to 120 seconds, until the runnable has created the run file
  File runFile = new File(parentFolder, RUN_FILE);
  Stopwatch timer = new Stopwatch().start();
  for (;;) {
    if (runFile.exists() || timer.elapsedTime(TimeUnit.SECONDS) >= 120) {
      break;
    }
    TimeUnit.SECONDS.sleep(1);
  }
  Assert.assertTrue(runFile.exists());

  // The runnable is up: terminate the application and wait for it to go away
  controller.terminate();
  controller.awaitTerminated(120, TimeUnit.SECONDS);

  // Only EventHandler#killed() is expected to have left a marker file behind
  File killedMarker = new File(parentPath, KILLED_FILE);
  Assert.assertTrue(killedMarker.exists());

  // EventHandler#completed() must not have been called
  File completedMarker = new File(parentPath, COMPLETED_FILE);
  Assert.assertFalse(completedMarker.exists());

  // EventHandler#aborted() must not have been called
  File abortedMarker = new File(parentPath, ABORTED_FILE);
  Assert.assertFalse(abortedMarker.exists());
}
/**
 * Polls the controller's resource report until at least one instance of the given runnable
 * reports the expected root logger level, or the timeout elapses.
 *
 * @param controller controller to read the resource report from
 * @param runnable name of the runnable whose instances are inspected
 * @param timeout maximum time to wait
 * @param timeoutUnit unit of {@code timeout}
 * @param expected expected root logger level; {@code null} matches an instance whose log level map
 *                 yields {@code null} for the root logger name
 * @return {@code true} if a matching instance was seen, {@code false} if the timeout elapsed first
 * @throws InterruptedException if interrupted while sleeping between polls
 */
private boolean waitForLogLevel(TwillController controller, String runnable, long timeout,
                                TimeUnit timeoutUnit, @Nullable LogEntry.Level expected) throws InterruptedException {
  Stopwatch stopwatch = new Stopwatch();
  stopwatch.start();
  do {
    ResourceReport report = controller.getResourceReport();
    if (report != null && report.getRunnableResources(runnable) != null) {
      for (TwillRunResources resources : report.getRunnableResources(runnable)) {
        LogEntry.Level level = resources.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
        if (expected == level) {
          return true;
        }
      }
    }
    // Sleep on every unsuccessful iteration, including when the report is not available yet.
    // A `continue` inside a do-while jumps to the loop condition, so skipping over this sleep
    // would turn the wait into a busy spin on getResourceReport().
    TimeUnit.MILLISECONDS.sleep(100);
  } while (stopwatch.elapsedTime(timeoutUnit) < timeout);
  return false;
}
/**
 * Waits until both the number of containers reported as used by YARN and the number of
 * containers used according to Twill equal {@code count}, polling once per second.
 *
 * @param controller controller of the application being observed
 * @param count expected number of containers
 * @param timeout maximum time to wait
 * @param timeoutUnit unit of {@code timeout}
 * @throws Exception a {@link TimeoutException} if the counts did not match in time, or whatever
 *                   the report lookups throw
 */
private void waitForContainers(TwillController controller, int count, long timeout, TimeUnit timeoutUnit)
    throws Exception {
  final Stopwatch timer = new Stopwatch().start();
  // Last observed counts; kept outside the loop so they can be reported on timeout
  int yarnCount = 0;
  int twillCount = 0;

  for (;;) {
    if (controller.getResourceReport() != null) {
      yarnCount = getApplicationResourceReport(controller.getResourceReport().getApplicationId())
          .getNumUsedContainers();
      twillCount = getTwillContainersUsed(controller);
      boolean bothMatch = yarnCount == count && twillCount == count;
      if (bothMatch) {
        return;
      }
    }

    TimeUnit.SECONDS.sleep(1);
    if (timer.elapsedTime(timeoutUnit) >= timeout) {
      break;
    }
  }

  throw new TimeoutException("Timeout reached while waiting for num containers to be " + count +
                               ". Yarn containers = " + yarnCount + ", Twill containers = " + twillCount);
}
/**
 * Waits until the resource report lists an instance of the given runnable whose container id
 * ends with {@code yarnInstanceId}, polling once per second.
 *
 * @param controller controller to read the resource report from
 * @param runnable name of the runnable to look for
 * @param yarnInstanceId suffix the container id must end with
 * @param timeout maximum time to wait
 * @param timeoutUnit unit of {@code timeout}
 * @throws InterruptedException if interrupted while sleeping between polls
 * @throws TimeoutException if no matching instance showed up in time
 */
private void waitForInstance(TwillController controller, String runnable, String yarnInstanceId,
                             long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException {
  final Stopwatch timer = new Stopwatch().start();

  for (;;) {
    ResourceReport current = controller.getResourceReport();
    boolean hasRunnable = current != null && current.getRunnableResources(runnable) != null;
    if (hasRunnable) {
      for (TwillRunResources instance : current.getRunnableResources(runnable)) {
        if (instance.getContainerId().endsWith(yarnInstanceId)) {
          return;
        }
      }
    }

    TimeUnit.SECONDS.sleep(1);
    if (timer.elapsedTime(timeoutUnit) >= timeout) {
      throw new TimeoutException("Timeout reached while waiting for runnable " +
                                   runnable + " instance " + yarnInstanceId);
    }
  }
}
/**
 * Keeps calling {@code kill()} and waiting on {@code shutdownLatch} (10 seconds per attempt)
 * until the latch is released or {@code maxWaitSecs} seconds have elapsed. Failures of
 * {@code kill()} are logged and do not stop the retry loop; giving up is logged as an error.
 *
 * @param maxWaitSecs number of seconds after which no further kill attempt is started
 */
private void killAndWait(int maxWaitSecs) {
  final Stopwatch timer = new Stopwatch().start();

  while (timer.elapsedTime(TimeUnit.SECONDS) < maxWaitSecs) {
    // Attempt to kill the application; a failure here is only logged
    try {
      kill();
    } catch (Exception e) {
      LOG.error("Exception while killing runnable {}, instance {}", runnable, instanceId, e);
    }

    // The completed() method counts the latch down once the runnable has stopped
    boolean stopped = Uninterruptibles.awaitUninterruptibly(shutdownLatch, 10, TimeUnit.SECONDS);
    if (stopped) {
      return;
    }
  }

  // Ran out of time and the runnable is still not reported as stopped
  long elapsedSecs = timer.elapsedTime(TimeUnit.SECONDS);
  LOG.error("Failed to kill runnable {}, instance {} after {} seconds", runnable, instanceId,
            elapsedSecs);
  // TODO: should we throw exception here?
}
/**
 * Submits the YARN application through {@code startUp} and polls its report once per second
 * until {@code hasRun(state)} is true or the start timeout elapses.
 *
 * <p>If the application is not in the {@code RUNNING} state after polling, the controller is
 * force shut down. The current application attempt id is recorded in either case.
 *
 * <p>Any exception is rethrown through {@code Throwables.propagate}. If the thread was
 * interrupted while sleeping between polls, its interrupt status is restored first so that
 * callers further up the stack can still observe the interruption.
 */
@Override
protected void doStartUp() {
  super.doStartUp();
  // Submit and poll the status of the yarn application
  try {
    processController = startUp.call();
    YarnApplicationReport report = processController.getReport();
    ApplicationId appId = report.getApplicationId();
    LOG.info("Application {} with id {} submitted", appName, appId);

    YarnApplicationState state = report.getYarnApplicationState();
    Stopwatch stopWatch = new Stopwatch().start();
    LOG.debug("Checking yarn application status for {} {}", appName, appId);
    while (!hasRun(state) && stopWatch.elapsedTime(startTimeoutUnit) < startTimeout) {
      report = processController.getReport();
      state = report.getYarnApplicationState();
      LOG.debug("Yarn application status for {} {}: {}", appName, appId, state);
      TimeUnit.SECONDS.sleep(1);
    }
    LOG.info("Yarn application {} {} is in state {}", appName, appId, state);

    if (state != YarnApplicationState.RUNNING) {
      LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", appName, appId);
      forceShutDown();
    }
    currentAttemptId = report.getCurrentApplicationAttemptId();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Catching InterruptedException clears the interrupt flag; set it again before wrapping
      // the exception so the interruption is not silently lost.
      Thread.currentThread().interrupt();
    }
    throw Throwables.propagate(e);
  }
}
/**
 * Looks through the {@code ConnectionMXBean}s registered on the platform MBean server for a
 * connection owning an ephemeral node that ends with {@code /instances/<run id>} of the given
 * controller, and terminates that session. Retries every 100 milliseconds until the timeout.
 *
 * @param controller controller whose run id identifies the session to terminate
 * @param timeout maximum time to keep looking
 * @param timeoutUnit unit of {@code timeout}
 * @return {@code true} if a matching session was found and terminated, {@code false} on timeout
 */
private boolean expireAppMasterZKSession(TwillController controller, long timeout, TimeUnit timeoutUnit) {
  final MBeanServer server = MBeanRegistry.getInstance().getPlatformMBeanServer();
  final QueryExp connectionBeanQuery = Query.isInstanceOf(new StringValueExp(ConnectionMXBean.class.getName()));
  final Stopwatch timer = new Stopwatch().start();

  for (;;) {
    // Scan every connection bean for the ephemeral node that belongs to the AM
    Set<ObjectName> beanNames = server.queryNames(ObjectName.WILDCARD, connectionBeanQuery);
    for (ObjectName beanName : beanNames) {
      ConnectionMXBean connection =
          MBeanServerInvocationHandler.newProxyInstance(server, beanName, ConnectionMXBean.class, false);
      for (String ephemeralNode : connection.getEphemeralNodes()) {
        if (!ephemeralNode.endsWith("/instances/" + controller.getRunId().getId())) {
          continue;
        }
        // This connection belongs to the AM: expire its session
        LOG.info("Kill AM session {}", connection.getSessionId());
        connection.terminateSession();
        return true;
      }
    }

    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
    if (timer.elapsedTime(timeoutUnit) >= timeout) {
      return false;
    }
  }
}
/**
 * Waits until exactly {@code expectedInstances} instances of the given runnable report both the
 * expected root logger level and a log level map equal to {@code expectedArgs}. Fails the test
 * through {@code Assert.fail} if that does not happen before the timeout.
 *
 * @param controller controller to read the resource report from
 * @param runnable name of the runnable whose instances are inspected
 * @param timeout maximum time to wait
 * @param timeoutUnit unit of {@code timeout}
 * @param expected expected level of the root logger
 * @param expectedArgs expected complete log level map of each instance
 * @param expectedInstances number of instances that must match
 * @throws InterruptedException if interrupted while sleeping between polls
 */
private void waitForLogLevel(TwillController controller, String runnable, long timeout,
                             TimeUnit timeoutUnit, LogEntry.Level expected,
                             Map<String, LogEntry.Level> expectedArgs,
                             int expectedInstances) throws InterruptedException {
  final Stopwatch timer = new Stopwatch().start();

  for (;;) {
    if (timer.elapsedTime(timeoutUnit) >= timeout) {
      break;
    }

    ResourceReport current = controller.getResourceReport();
    boolean reportReady = current != null && current.getRunnableResources(runnable) != null;
    if (!reportReady) {
      // Nothing to inspect yet; retry shortly
      TimeUnit.MILLISECONDS.sleep(100);
      continue;
    }

    int matching = 0;
    for (TwillRunResources instance : current.getRunnableResources(runnable)) {
      LogEntry.Level actual = instance.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
      Map<String, LogEntry.Level> actualArgs = instance.getLogLevels();
      if (!Objects.equals(expected, actual) || !Objects.equals(expectedArgs, actualArgs)) {
        LOG.info("Log levels not match for {}. {} != {} or {} != {}",
                 runnable, expected, actual, expectedArgs, actualArgs);
      } else {
        matching++;
      }
    }
    if (matching == expectedInstances) {
      return;
    }

    TimeUnit.SECONDS.sleep(1);
  }

  Assert.fail("Timeout waiting for expected log levels");
}
/**
 * Waits for a resource report that lists exactly {@code numOfResources} instances of the given
 * runnable. A helper is needed because {@link TwillController#getResourceReport()} could return
 * null if the application has not fully started.
 *
 * This method helps validate restart scenario.
 *
 * To avoid long sleep if instanceIdToContainerId is passed, then compare the container ids to ones before:
 * the report is only returned once no instance still has the container id recorded for its
 * instance id. Otherwise just return the valid resource report.
 *
 * @param controller controller to read the resource report from
 * @param runnable name of the runnable whose instances are inspected
 * @param timeout maximum time to wait
 * @param timeoutUnit unit of {@code timeout}
 * @param numOfResources number of instances the report must list for the runnable
 * @param instanceIdToContainerId container id per instance id from before the restart, or
 *                                {@code null} to skip the container id comparison
 * @return the first acceptable report, or {@code null} if none was seen before the timeout
 */
@Nullable
private ResourceReport waitForAfterRestartResourceReport(TwillController controller, String runnable, long timeout,
                                                         TimeUnit timeoutUnit, int numOfResources,
                                                         @Nullable Map<Integer, String> instanceIdToContainerId) {
  Stopwatch stopwatch = new Stopwatch();
  stopwatch.start();
  do {
    ResourceReport report = controller.getResourceReport();
    // Look the runnable's resources up once per poll; null means "report not usable yet"
    Collection<TwillRunResources> runResources = report == null ? null : report.getRunnableResources(runnable);

    if (runResources != null && runResources.size() == numOfResources) {
      if (instanceIdToContainerId == null) {
        LOG.info("Return resource report without comparing container ids.");
        return report;
      }
      boolean isSameContainer = false;
      for (TwillRunResources twillRunResources : runResources) {
        int instanceId = twillRunResources.getInstanceId();
        if (twillRunResources.getContainerId().equals(instanceIdToContainerId.get(instanceId))) {
          // found same container id lets wait again.
          LOG.warn("Found an instance id {} with same container id {} for restart all, let's wait for a while.",
                   instanceId, twillRunResources.getContainerId());
          isSameContainer = true;
          break;
        }
      }
      if (!isSameContainer) {
        LOG.info("Get set of different container ids for restart.");
        return report;
      }
    }

    // Every path that did not return waits the same 100 ms before the next poll
    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
  } while (stopwatch.elapsedTime(timeoutUnit) < timeout);
  LOG.error("Unable to get different container ids for restart.");
  return null;
}
/**
 * Waits until a task returns the expected value.
 *
 * <p>The task is always called at least once. Its result is compared with {@code expected} by
 * value ({@code equals}), with {@code null} only matching {@code null}. Elapsed time is measured
 * with {@link System#nanoTime()} and converted to {@code unit}.
 *
 * @param expected the expected value
 * @param callable the task to execute
 * @param timeout timeout of the wait
 * @param delay delay between calls to the task to poll for the latest value
 * @param unit unit for the timeout and delay
 * @param <T> type of the expected value
 * @throws Exception if the task throws an exception, or a
 *                   {@link java.util.concurrent.TimeoutException} if the expected value was not
 *                   returned before the timeout elapsed
 */
public <T> void waitFor(T expected, Callable<T> callable, long timeout, long delay, TimeUnit unit) throws Exception {
  final long startNanos = System.nanoTime();
  while (true) {
    T actual = callable.call();
    // Compare by value: a reference comparison would never match equal-but-distinct objects
    boolean matches = (expected == null) ? (actual == null) : expected.equals(actual);
    if (matches) {
      return;
    }

    long elapsed = unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
    if (elapsed >= timeout) {
      // Report the timeout instead of returning as if the wait had succeeded
      throw new java.util.concurrent.TimeoutException(
        "Timeout waiting for expected value " + expected + ", last value was " + actual);
    }
    unit.sleep(delay);
  }
}