下面列出了com.google.common.util.concurrent.ListeningExecutorService#awaitTermination ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void writesUnblockReads() throws ExecutionException, InterruptedException {
ListeningExecutorService service = listeningDecorator(newSingleThreadExecutor());
AtomicInteger counter = new AtomicInteger();
RingBufferInputStream buffer = new RingBufferInputStream(1);
ListenableFuture<Integer> readFuture =
service.submit(
() -> {
counter.getAndIncrement();
return buffer.read();
});
byte[] content = new byte[1];
content[0] = 42;
while (counter.get() != 1) {
MICROSECONDS.sleep(10);
}
assertThat(readFuture.isDone()).isFalse();
buffer.write(content);
assertThat(readFuture.get()).isEqualTo(content[0]);
service.shutdown();
service.awaitTermination(10, MICROSECONDS);
}
@Test
public void readUnblocksWrite() throws ExecutionException, IOException, InterruptedException {
ListeningExecutorService service = listeningDecorator(newSingleThreadExecutor());
AtomicInteger counter = new AtomicInteger();
RingBufferInputStream buffer = new RingBufferInputStream(1);
byte[] content = new byte[1];
content[0] = 42;
buffer.write(content); // buffer is now full
ListenableFuture<Void> writeFuture =
service.submit(
() -> {
counter.getAndIncrement();
buffer.write(content);
return null;
});
while (counter.get() != 1) {
MICROSECONDS.sleep(10);
}
assertThat(writeFuture.isDone()).isFalse();
buffer.read();
assertThat(writeFuture.get()).isEqualTo(null);
service.shutdown();
service.awaitTermination(10, MICROSECONDS);
}
/**
* Counts yield and q30 of fastqs in the fastqsPerSample multimap, using 1 thread per file.
* The yield and q30 of the Undetermined sample will count towards the total yield and q30 of the flowcell.
*
* @param fastqsPerSample multimap of sampleName and fastqs to process
* @param threadCount number of maximum threads
* @return FastqTracker with yield and q30 stats for the fastqs processed.
*/
@NotNull
static FastqTracker processFastqs(@NotNull final Multimap<String, File> fastqsPerSample, final int threadCount)
throws InterruptedException {
LOGGER.info("Using {} threads. Processing {} fastQ files.", threadCount, fastqsPerSample.size());
final FastqTrackerWrapper tracker = new FastqTrackerWrapper();
final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
for (final String sampleName : fastqsPerSample.keySet()) {
final Collection<File> fastqs = fastqsPerSample.get(sampleName);
for (final File fastq : fastqs) {
final String laneName = getLaneName(fastq);
final ListenableFuture<FastqData> futureResult = threadPool.submit(() -> processFile(fastq));
addCallback(futureResult, (data) -> tracker.addDataFromSampleFile(sampleName, laneName, data),
(error) -> LOGGER.error("Failed to process file: {}", fastq.getName(), error));
}
}
threadPool.shutdown();
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
return tracker.tracker();
}
@Override
public void transactionMarker() throws Exception {
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<Void> future1 = executor.submit(new Callable<Void>() {
@Override
public Void call() throws InterruptedException {
MILLISECONDS.sleep(100);
return null;
}
});
future1.addListener(new Runnable() {
@Override
public void run() {
new CreateTraceEntry().traceEntryMarker();
}
}, executor);
MILLISECONDS.sleep(200);
executor.shutdown();
executor.awaitTermination(10, SECONDS);
}
@Override
public void transactionMarker() throws Exception {
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<Void> future1 = executor.submit(new Callable<Void>() {
@Override
public Void call() {
return null;
}
});
MILLISECONDS.sleep(100);
future1.addListener(new Runnable() {
@Override
public void run() {
new CreateTraceEntry().traceEntryMarker();
}
}, executor);
MILLISECONDS.sleep(100);
executor.shutdown();
executor.awaitTermination(10, SECONDS);
}
@Override
public void transactionMarker() throws Exception {
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<Void> future1 = executor.submit(new Callable<Void>() {
@Override
public Void call() throws InterruptedException {
MILLISECONDS.sleep(100);
return null;
}
});
future1.addListener(new Runnable() {
@Override
public void run() {
new CreateTraceEntry().traceEntryMarker();
}
}, executor);
MILLISECONDS.sleep(200);
executor.shutdown();
executor.awaitTermination(10, SECONDS);
}
@Override
public void transactionMarker() throws Exception {
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<Void> future1 = executor.submit(new Callable<Void>() {
@Override
public Void call() {
return null;
}
});
MILLISECONDS.sleep(100);
future1.addListener(new Runnable() {
@Override
public void run() {
new CreateTraceEntry().traceEntryMarker();
}
}, executor);
MILLISECONDS.sleep(100);
executor.shutdown();
executor.awaitTermination(10, SECONDS);
}
private void onFinish(SingularityExecutorTask task, Protos.TaskState taskState) {
processKiller.cancelDestroyFuture(task.getTaskId());
tasks.remove(task.getTaskId());
processRunningTasks.remove(task.getTaskId());
processBuildingTasks.remove(task.getTaskId());
task.cleanup(taskState);
ListeningExecutorService executorService = taskToShellCommandPool.remove(
task.getTaskId()
);
if (executorService != null) {
executorService.shutdownNow();
try {
executorService.awaitTermination(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("Awaiting shutdown of shell executor service", e);
}
}
logging.stopTaskLogger(task.getTaskId(), task.getLogbackLog());
checkIdleExecutorShutdown(task.getDriver());
}
@Test
public void testConcurrentFetchTasks() throws Exception {
// Test for regression of AURORA-1625
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
assertStoreContents();
saveTasks(TASK_A, TASK_B, TASK_C, TASK_D);
List<ListenableFuture<Integer>> futures = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
futures.add(executor.submit(() -> Iterables.size(fetchTasks(Query.unscoped()))));
}
Future<List<Integer>> f = Futures.allAsList(futures);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
assertEquals(Iterables.getOnlyElement(ImmutableSet.copyOf(f.get())), (Integer) 4);
}
@Test(timeout = 5000)
public void testConcurrentRequests() throws InterruptedException {
int timeoutSecond = 5;
int concurThread = 10;
int exceptionCount = 0;
List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
List<ListenableFuture<Object>> pendingTasks = new ArrayList<ListenableFuture<Object>>();
final ExecutorService callbackExecutor = Executors.newFixedThreadPool(concurThread,
new ThreadFactoryBuilder().setDaemon(false).setNameFormat("CallbackExecutor").build());
ListeningExecutorService taskExecutorService =
MoreExecutors.listeningDecorator(callbackExecutor);
while(concurThread > 0){
ListenableFuture<Object> runningTaskFuture =
taskExecutorService.submit(new EnvironmentRequest());
pendingTasks.add(runningTaskFuture);
concurThread--;
}
//waiting for all threads submitted to thread pool
for (ListenableFuture<Object> future : pendingTasks) {
try {
future.get();
} catch (ExecutionException e) {
exceptionCount++;
}
}
//stop accepting new threads and shutdown threadpool
taskExecutorService.shutdown();
try {
if(!taskExecutorService.awaitTermination(timeoutSecond, TimeUnit.SECONDS)) {
taskExecutorService.shutdownNow();
}
} catch (InterruptedException ie) {
taskExecutorService.shutdownNow();
}
assertEquals(0, exceptionCount);
}
private static void run() throws IOException, InterruptedException, ExecutionException {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
// create dummy log/index files, and load the reader from them
final File logFile = new File("reloadabletest.spl");
create(Sparkey.getIndexFile(logFile));
final ReloadableSparkeyReader reader = ReloadableSparkeyReader.fromLogFile(logFile, executorService).toCompletableFuture().get();
// should be ignored (same file)
reader.load(logFile);
// should load from second file now
final File logFile2 = new File("reloadabletest2.spl");
create(Sparkey.getIndexFile(logFile2));
reader.load(logFile2);
reader.close();
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
Sparkey.getIndexFile(logFile).delete();
logFile.delete();
Sparkey.getIndexFile(logFile2).delete();
logFile2.delete();
System.out.println("Done!");
}
@Override
protected void evalTopLevelInternal(
QueryExpression expr, OutputFormatterCallback<Target> callback)
throws QueryException, InterruptedException {
Throwable throwableToThrow = null;
try {
super.evalTopLevelInternal(expr, callback);
} catch (Throwable throwable) {
throwableToThrow = throwable;
} finally {
if (throwableToThrow != null) {
logger.atInfo().withCause(throwableToThrow).log(
"About to shutdown query threadpool because of throwable");
ListeningExecutorService obsoleteExecutor = executor;
// Signal that executor must be recreated on the next invocation.
executor = null;
// If evaluation failed abruptly (e.g. was interrupted), attempt to terminate all remaining
// tasks and then wait for them all to finish. We don't want to leave any dangling threads
// running tasks.
obsoleteExecutor.shutdownNow();
boolean interrupted = false;
boolean executorTerminated = false;
try {
while (!executorTerminated) {
try {
executorTerminated =
obsoleteExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
interrupted = true;
handleInterruptedShutdown();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
Throwables.propagateIfPossible(
throwableToThrow, QueryException.class, InterruptedException.class);
}
}
}