下面列出了com.google.common.util.concurrent.ListeningExecutorService#shutdownNow ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test(groups = {"WIP", "Integration"})
public void testExecScriptBigConcurrentCommand() throws Exception {
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
List<ListenableFuture<?>> futures = new ArrayList<ListenableFuture<?>>();
try {
for (int i = 0; i < 10; i++) {
final ShellTool localtool = newTool();
connect(localtool);
futures.add(executor.submit(new Runnable() {
@Override
public void run() {
String bigstring = Strings.repeat("abcdefghij", 1000); // 10KB
String out = execScript(localtool, ImmutableList.of("export MYPROP="+bigstring, "echo val is $MYPROP"));
assertTrue(out.contains("val is "+bigstring), "outSize="+out.length()+"; out="+out);
}}));
}
Futures.allAsList(futures).get();
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testGetTaskShouldDie() throws InterruptedException, ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
@SuppressWarnings("deprecation")
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
ContainerContext containerContext = new ContainerContext(containerId.toString());
ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, 100);
ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
getTaskFuture.get();
assertEquals(1, umbilical.getTaskInvocations);
} finally {
executor.shutdownNow();
}
}
private void onFinish(SingularityExecutorTask task, Protos.TaskState taskState) {
processKiller.cancelDestroyFuture(task.getTaskId());
tasks.remove(task.getTaskId());
processRunningTasks.remove(task.getTaskId());
processBuildingTasks.remove(task.getTaskId());
task.cleanup(taskState);
ListeningExecutorService executorService = taskToShellCommandPool.remove(
task.getTaskId()
);
if (executorService != null) {
executorService.shutdownNow();
try {
executorService.awaitTermination(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("Awaiting shutdown of shell executor service", e);
}
}
logging.stopTaskLogger(task.getTaskId(), task.getLogbackLog());
checkIdleExecutorShutdown(task.getDriver());
}
@Test(timeout = 5000)
public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
umbilical.signalSendShouldDie();
umbilical.awaitRegisteredEvent();
// Not signaling an actual start to verify task interruption
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.CONTAINER_STOP_REQUESTED, null, true, null);
TestProcessor.awaitCompletion();
assertTrue(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
// TODO Is this statement correct ?
// No completion events since shouldDie was requested by the AM, which should have killed the
// task.
umbilical.verifyNoCompletionEvents();
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test(groups = {"WIP", "Integration"})
public void testExecScriptBigConcurrentSleepyCommand() throws Exception {
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
List<ListenableFuture<?>> futures = new ArrayList<ListenableFuture<?>>();
try {
long starttime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
final ShellTool localtool = newTool();
connect(localtool);
futures.add(executor.submit(new Runnable() {
@Override
public void run() {
String bigstring = Strings.repeat("abcdefghij", 1000); // 10KB
String out = execScript(localtool, ImmutableList.of("sleep 2", "export MYPROP="+bigstring, "echo val is $MYPROP"));
assertTrue(out.contains("val is "+bigstring), "out="+out);
}}));
}
Futures.allAsList(futures).get();
long runtime = System.currentTimeMillis() - starttime;
long OVERHEAD = 20*1000;
assertTrue(runtime < 2000+OVERHEAD, "runtime="+runtime);
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testTaskSelfKill() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_SELF_KILL_AND_COMPLETE);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_KILL_REQUEST, createProcessorIOException(), false,
null);
TestProcessor.awaitCompletion();
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskKilledEvent(
KILL_START_STRING,
IOException.class.getName() + ": " + IOException.class.getSimpleName());
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testTaskKilled() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
taskRunner.killTask();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.KILL_REQUESTED, null, false, null);
TestProcessor.awaitCompletion();
assertTrue(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
// Kill events are not sent over the umbilical at the moment.
umbilical.verifyNoCompletionEvents();
} finally {
executor.shutdownNow();
}
}
@Test
public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
boolean result = taskRunnerFuture.get();
assertTrue(result);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
umbilical.resetTrackedEvents();
taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
result = taskRunnerFuture.get();
assertTrue(result);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testFailedTask2() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
"NotExitedProcessor", TestProcessor.CONF_EMPTY, false, true);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR,
new TezReflectionException("TezReflectionException"), false, TaskFailureType.NON_FATAL);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(FAILURE_START_STRING,
":org.apache.tez.dag.api.TezReflectionException: "
+ "Unable to load class: NotExitedProcessor");
// Failure detected as a result of fall off from the run method. abort isn't required.
assertFalse(TestProcessor.wasAborted());
assertTrue(taskRunner.task.getCounters().countCounters() != 0);
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testHeartbeatException() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
umbilical.signalThrowException();
umbilical.awaitRegisteredEvent();
// Not signaling an actual start to verify task interruption
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.COMMUNICATION_FAILURE,
new IOException("IOException"),
TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING, false, TaskFailureType.NON_FATAL);
TestProcessor.awaitCompletion();
assertTrue(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
// No completion events since umbilical communication already failed.
umbilical.verifyNoCompletionEvents();
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test
public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
umbilical.signalSendShouldDie();
umbilical.awaitRegisteredEvent();
// Not signaling an actual start to verify task interruption
boolean result = taskRunnerFuture.get();
assertFalse(result);
TestProcessor.awaitCompletion();
assertTrue(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
// TODO Is this statement correct ?
// No completion events since shouldDie was requested by the AM, which should have killed the
// task.
umbilical.verifyNoCompletionEvents();
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testKilledAfterComplete() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2ForTest taskRunner =
createTaskRunnerForTest(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TestProcessor.awaitCompletion();
taskRunner.awaitCallableCompletion();
taskRunner.killTask();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
assertFalse(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testFailedTaskIOException() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_THROW_IO_EXCEPTION);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.NON_FATAL);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(
FAILURE_START_STRING,
IOException.class.getName() + ": " + IOException.class.getSimpleName());
// Failure detected as a result of fall off from the run method. abort isn't required.
assertFalse(TestProcessor.wasAborted());
assertTrue(taskRunner.task.getCounters().countCounters() != 0);
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testConcurrentRequests() throws InterruptedException {
int timeoutSecond = 5;
int concurThread = 10;
int exceptionCount = 0;
List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
List<ListenableFuture<Object>> pendingTasks = new ArrayList<ListenableFuture<Object>>();
final ExecutorService callbackExecutor = Executors.newFixedThreadPool(concurThread,
new ThreadFactoryBuilder().setDaemon(false).setNameFormat("CallbackExecutor").build());
ListeningExecutorService taskExecutorService =
MoreExecutors.listeningDecorator(callbackExecutor);
while(concurThread > 0){
ListenableFuture<Object> runningTaskFuture =
taskExecutorService.submit(new EnvironmentRequest());
pendingTasks.add(runningTaskFuture);
concurThread--;
}
//waiting for all threads submitted to thread pool
for (ListenableFuture<Object> future : pendingTasks) {
try {
future.get();
} catch (ExecutionException e) {
exceptionCount++;
}
}
//stop accepting new threads and shutdown threadpool
taskExecutorService.shutdown();
try {
if(!taskExecutorService.awaitTermination(timeoutSecond, TimeUnit.SECONDS)) {
taskExecutorService.shutdownNow();
}
} catch (InterruptedException ie) {
taskExecutorService.shutdownNow();
}
assertEquals(0, exceptionCount);
}
@Test(timeout = 5000)
public void testFailedTaskTezException() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_THROW_TEZ_EXCEPTION);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorTezException(), false, TaskFailureType.NON_FATAL);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(
FAILURE_START_STRING,
TezException.class.getName() + ": " + TezException.class.getSimpleName());
// Failure detected as a result of fall off from the run method. abort isn't required.
assertFalse(TestProcessor.wasAborted());
assertTrue(taskRunner.task.getCounters().countCounters() != 0);
} finally {
executor.shutdownNow();
}
}
/**
* Confirm concurrent {@link KeyStoreManagerImpl#importTrustCertificate(Certificate, String)} invocations
* occur safely.
*/
@Test
public void testConcurrentImportTrustCertificate() throws Exception {
X509Certificate certificate1 = generateCertificate(10,
"concurrency-1", "ou", "o", "l", "st", "country");
X509Certificate certificate2 = generateCertificate(10,
"concurrency-2", "ou", "o", "l", "st", "country");
KeyStoreManagerConfiguration configuration = createMockConfiguration();
KeystoreInstance trustStore = mock(KeystoreInstance.class);
CountDownLatch block = new CountDownLatch(1);
// any calls to trustStore#importTrustCertificate should block on the latch
doAnswer(blockingAnswer(block))
.when(trustStore)
.importTrustCertificate(
any(Certificate.class), any(String.class),
any(char[].class)
);
KeyStoreManagerImpl manager = new KeyStoreManagerImpl(crypto, configuration, null, trustStore);
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
List<ListenableFuture<String>> futures = new ArrayList<>();
try {
futures.add(service.submit(() -> {
manager.importTrustCertificate(certificate1, "concurrency-1");
return "concurrency-1";
}));
futures.add(service.submit(() -> {
manager.importTrustCertificate(certificate2, "concurrency-2");
return "concurrency-2";
}));
// no matter how long we wait, this list should be empty if we've guarded correctly
List<String> results = Futures.successfulAsList(futures).get(100, TimeUnit.MILLISECONDS);
assertEquals(0, results.size());
} catch (TimeoutException e) {
// expected; from Futures.successfulAsList().get()
} finally {
// release the latch so those threads are unblocked
block.countDown();
service.shutdownNow();
}
// a passing test will show that we only called KeyStoreInstance#importTrustCertificate once and only once
// if we see more than one invocation, we passed the concurrency guard, which is unsafe
// since KeystoreInstance is not thread-safe
verify(trustStore, times(1))
.importTrustCertificate(
any(Certificate.class), any(String.class),
any(char[].class));
}
@Test(timeout = 5000)
public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY, true);
LogicalIOProcessorRuntimeTask runtimeTask = taskRunner.task;
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(
new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
assertFalse(TestProcessor.wasAborted());
umbilical.resetTrackedEvents();
TezCounters tezCounters = runtimeTask.getCounters();
verifySysCounters(tezCounters, 5, 5);
taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY, false);
runtimeTask = taskRunner.task;
// Setup the executor
taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
assertFalse(TestProcessor.wasAborted());
tezCounters = runtimeTask.getCounters();
verifySysCounters(tezCounters, -1, -1);
} finally {
executor.shutdownNow();
}
}
@Override
protected void evalTopLevelInternal(
QueryExpression expr, OutputFormatterCallback<Target> callback)
throws QueryException, InterruptedException {
Throwable throwableToThrow = null;
try {
super.evalTopLevelInternal(expr, callback);
} catch (Throwable throwable) {
throwableToThrow = throwable;
} finally {
if (throwableToThrow != null) {
logger.atInfo().withCause(throwableToThrow).log(
"About to shutdown query threadpool because of throwable");
ListeningExecutorService obsoleteExecutor = executor;
// Signal that executor must be recreated on the next invocation.
executor = null;
// If evaluation failed abruptly (e.g. was interrupted), attempt to terminate all remaining
// tasks and then wait for them all to finish. We don't want to leave any dangling threads
// running tasks.
obsoleteExecutor.shutdownNow();
boolean interrupted = false;
boolean executorTerminated = false;
try {
while (!executorTerminated) {
try {
executorTerminated =
obsoleteExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
interrupted = true;
handleInterruptedShutdown();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
Throwables.propagateIfPossible(
throwableToThrow, QueryException.class, InterruptedException.class);
}
}
}
private SingularityS3SearchResult getS3Logs(
S3Configuration s3Configuration,
Map<SingularityS3Service, Set<String>> servicesToPrefixes,
final SingularityS3SearchRequest search,
final boolean paginated
)
throws InterruptedException, ExecutionException, TimeoutException {
int totalPrefixCount = 0;
for (Map.Entry<SingularityS3Service, Set<String>> entry : servicesToPrefixes.entrySet()) {
totalPrefixCount += entry.getValue().size();
}
if (totalPrefixCount == 0) {
return SingularityS3SearchResult.empty();
}
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
Math.min(totalPrefixCount, s3Configuration.getMaxS3Threads()),
new ThreadFactoryBuilder().setNameFormat("S3LogFetcher-%d").build()
)
);
try {
final ConcurrentHashMap<String, ContinuationToken> continuationTokens = new ConcurrentHashMap<>();
List<SingularityS3LogMetadata> logs = Lists.newArrayList(
getS3LogsWithExecutorService(
s3Configuration,
executorService,
servicesToPrefixes,
totalPrefixCount,
search,
continuationTokens,
paginated
)
);
Collections.sort(logs, LOG_COMPARATOR);
return new SingularityS3SearchResult(
continuationTokens,
isFinalPageForAllPrefixes(continuationTokens.values()),
logs
);
} finally {
executorService.shutdownNow();
}
}
@Test
public void testDelegateJobsLimited() throws Exception {
int maxJobs = 1;
ListeningExecutorService service =
MoreExecutors.listeningDecorator(MostExecutors.newMultiThreadExecutor("test", 4));
try {
JobLimitingBuildRuleStrategy delegate = new JobLimitingBuildRuleStrategy(maxJobs, service);
try (HybridLocalStrategy strategy =
new HybridLocalStrategy(
0,
0,
1,
delegate,
new NoOpWorkerRequirementsProvider(),
Optional.empty(),
NO_AUXILIARY_BUILD_TAG,
BuckEventBusForTests.newInstance())) {
List<ListenableFuture<Optional<BuildResult>>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
FakeBuildRule rule = new FakeBuildRule("//:target-" + i);
results.add(
Futures.submitAsync(
() ->
strategy
.build(rule, new SimpleBuildStrategyContext(rule, service))
.getBuildResult(),
service));
}
delegate.waiting.release(3);
assertTrue(delegate.finished.tryAcquire(3, 1, TimeUnit.SECONDS));
assertFalse(delegate.finished.tryAcquire(20, TimeUnit.MILLISECONDS));
delegate.waiting.release(7);
assertTrue(delegate.finished.tryAcquire(7, 1, TimeUnit.SECONDS));
Futures.allAsList(results).get(1, TimeUnit.SECONDS);
for (ListenableFuture<Optional<BuildResult>> r : results) {
assertTrue(r.isDone());
assertTrue(r.get().get().isSuccess());
}
}
} finally {
service.shutdownNow();
}
}