下面列出了java.util.concurrent.CompletableFuture#thenAcceptBothAsync ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
void scheduleTask(List<Task> tasks, int threads) {
ExecutorService executor = Executors.newFixedThreadPool(threads);
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
for (Task task : tasks) {
future = future.thenAcceptBothAsync(scheduleTaskUtil(task, executor), (a, b) -> {}, executor);
}
future.thenRunAsync(() -> {System.out.println("All tasks done. Closing executor"); executor.shutdown();});
}
/**
* Verify that the ThreadContext's contextualConsumer
* method can be used to wrap a BiConsumer instance with the context that is captured from the
* current thread per the configuration of the ThreadContext builder, and that the context is
* applied when the BiConsumer accept method runs. This test case aligns with use case of
* supplying a contextual BiConsumer to a completion stage that is otherwise not context-aware.
*
* @throws InterruptedException indicates test failure
*/
@Test
public void contextualBiConsumerRunsWithContext() throws InterruptedException {
ThreadContext bufferContext = ThreadContext.builder()
.propagated(Buffer.CONTEXT_NAME)
.unchanged()
.cleared(ThreadContext.ALL_REMAINING)
.build();
try {
// Set non-default values
Buffer.get().append("contextualBiConsumer-test-buffer");
Label.set("contextualBiConsumer-test-label");
// To avoid the possibility that CompletableFuture.get might cause the action to run
// on the current thread, which would bypass the intent of testing context propagation,
// use a countdown latch to independently wait for completion.
CountDownLatch completed = new CountDownLatch(1);
// CompletableFuture from Java SE is intentionally used here to avoid the context
// propagation guarantees of ManagedExecutor.
// This ensures we are testing that ThreadContext is
// doing the work to propagate the context rather than getting it from a
// ManagedExecutor.
CompletableFuture<String> stage1a = CompletableFuture.supplyAsync(() -> "supplied-value-A");
CompletableFuture<String> stage1b = CompletableFuture.supplyAsync(() -> "supplied-value-B");
CompletableFuture<Void> stage2 = stage1a.thenAcceptBothAsync(stage1b,
bufferContext.contextualConsumer((a, b) -> {
Assert.assertEquals(a, "supplied-value-A",
"First value supplied to BiConsumer was lost or altered.");
Assert.assertEquals(b, "supplied-value-B",
"Second value supplied to BiConsumer was lost or altered.");
Assert.assertEquals(Buffer.get().toString(), "contextualBiConsumer-test-buffer",
"Context type was not propagated to contextual action.");
Assert.assertEquals(Label.get(), "",
"Context type that is configured to be cleared was not cleared.");
}),
unmanagedThreads);
stage2.whenComplete((unused, failure) -> completed.countDown());
Assert.assertTrue(completed.await(MAX_WAIT_NS, TimeUnit.NANOSECONDS),
"Completable future did not finish in a reasonable amount of time.");
// Force errors, if any, to be reported
stage2.join();
}
finally {
// Restore original values
Buffer.set(null);
Label.set(null);
}
}
public <T,U> CompletableFuture<Void> thenAcceptBoth
(CompletableFuture<T> f,
CompletionStage<? extends U> g,
BiConsumer<? super T,? super U> a) {
return f.thenAcceptBothAsync(g, a);
}
public <T,U> CompletableFuture<Void> thenAcceptBoth
(CompletableFuture<T> f,
CompletionStage<? extends U> g,
BiConsumer<? super T,? super U> a) {
return f.thenAcceptBothAsync(g, a, new ThreadExecutor());
}
public <T,U> CompletableFuture<Void> thenAcceptBoth
(CompletableFuture<T> f,
CompletionStage<? extends U> g,
BiConsumer<? super T,? super U> a) {
return f.thenAcceptBothAsync(g, a);
}
public <T,U> CompletableFuture<Void> thenAcceptBoth
(CompletableFuture<T> f,
CompletionStage<? extends U> g,
BiConsumer<? super T,? super U> a) {
return f.thenAcceptBothAsync(g, a, new ThreadExecutor());
}