下面列出了java.util.concurrent.CompletableFuture#thenAcceptBoth ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* When an already-contextualized Supplier or BiFunction is specified as the action/task,
* the action/task runs with its already-captured context rather than
* capturing and applying context per the configuration of the managed executor.
*
* @throws ExecutionException indicates test failure
* @throws InterruptedException indicates test failure
* @throws TimeoutException indicates test failure
*/
@Test
public void contextOfContextualSuppplierAndBiConsumerOverrideContextOfManagedExecutor()
throws ExecutionException, InterruptedException, TimeoutException {
ThreadContext bufferContext = ThreadContext.builder()
.propagated(Buffer.CONTEXT_NAME)
.unchanged()
.cleared(ThreadContext.ALL_REMAINING)
.build();
ManagedExecutor executor = ManagedExecutor.builder()
.propagated(Label.CONTEXT_NAME)
.cleared(ThreadContext.ALL_REMAINING)
.build();
try {
Supplier<String> getBuffer = () -> {
Assert.assertEquals(Label.get(), "",
"Context type not cleared from thread.");
return Buffer.get().toString();
};
Buffer.set(new StringBuffer("contextualSupplierOverride-buffer-1"));
Label.set("contextualSupplierOverride-label-1");
Supplier<String> precontextualizedSupplier1 = bufferContext.contextualSupplier(getBuffer);
Buffer.set(new StringBuffer("contextualSupplierOverride-buffer-2"));
Label.set("contextualSupplierOverride-label-2");
Supplier<String> precontextualizedSupplier2 = bufferContext.contextualSupplier(getBuffer);
Buffer.set(new StringBuffer("contextualBiConsumerOverride-buffer-3"));
Label.set("contextualBiConsumerOverride-label-3");
BiConsumer<String, String> precontextualizedConsumer3 = bufferContext.contextualConsumer((b1, b2) -> {
Assert.assertEquals(Buffer.get().toString(), "contextualBiConsumerOverride-buffer-3",
"Previously captured context type not found on thread.");
Assert.assertEquals(Label.get(), "",
"Context type not cleared from thread.");
Assert.assertEquals(b1, "contextualSupplierOverride-buffer-1",
"Previously captured context type not found on Supplier's thread.");
Assert.assertEquals(b2, "contextualSupplierOverride-buffer-2",
"Previously captured context type not found on Supplier's thread.");
});
Buffer.set(new StringBuffer("contextualBiConsumerOverride-buffer-4"));
Label.set("contextualBiConsumerOverride-label-4");
BiConsumer<Void, Throwable> precontextualizedConsumer4 = bufferContext.contextualConsumer((unused, failure) -> {
Assert.assertEquals(Buffer.get().toString(), "contextualBiConsumerOverride-buffer-4",
"Previously captured context type not found on thread.");
Assert.assertEquals(Label.get(), "",
"Context type not cleared from thread.");
});
Buffer.set(new StringBuffer("contextualSupplierAndBiConsumerOverride-buffer-5"));
Label.set("contextualSupplierAndBiConsumerOverride-label-5");
CompletableFuture<String> stage1 = executor.supplyAsync(precontextualizedSupplier1);
CompletableFuture<String> stage2 = executor.supplyAsync(precontextualizedSupplier2);
CompletableFuture<Void> stage3 = stage1.thenAcceptBoth(stage2, precontextualizedConsumer3);
CompletableFuture<Void> stage4 = stage3.whenCompleteAsync(precontextualizedConsumer4);
CompletableFuture<Void> stage5 = stage4.whenComplete((unused, failure) -> {
Assert.assertEquals(Label.get(), "contextualSupplierAndBiConsumerOverride-label-5",
"Context type captured by managed executor not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
});
stage5.join();
}
finally {
executor.shutdownNow();
// Restore original values
Buffer.set(null);
Label.set(null);
}
}
public <T,U> CompletableFuture<Void> thenAcceptBoth
(CompletableFuture<T> f,
CompletionStage<? extends U> g,
BiConsumer<? super T,? super U> a) {
return f.thenAcceptBoth(g, a);
}
public static void main(String[] args) throws Exception {
DefaultCacheManager cacheManager = new DefaultCacheManager();
cacheManager.defineConfiguration("local", new ConfigurationBuilder().build());
AdvancedCache<String, String> cache = cacheManager.<String, String>getCache("local").getAdvancedCache();
FunctionalMapImpl<String, String> functionalMap = FunctionalMapImpl.create(cache);
FunctionalMap.WriteOnlyMap<String, String> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);
FunctionalMap.ReadOnlyMap<String, String> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);
// Execute two parallel write-only operation to store key/value pairs
CompletableFuture<Void> writeFuture1 = writeOnlyMap.eval("key1", "value1",
(v, writeView) -> writeView.set(v));
CompletableFuture<Void> writeFuture2 = writeOnlyMap.eval("key2", "value2",
(v, writeView) -> writeView.set(v));
// When each write-only operation completes, execute a read-only operation to retrieve the value
CompletableFuture<String> readFuture1 =
writeFuture1.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::get));
CompletableFuture<String> readFuture2 =
writeFuture2.thenCompose(r -> readOnlyMap.eval("key2", EntryView.ReadEntryView::get));
// When the read-only operation completes, print it out
System.out.printf("Created entries: %n");
CompletableFuture<Void> end = readFuture1.thenAcceptBoth(readFuture2, (v1, v2) ->
System.out.printf("key1 = %s%nkey2 = %s%n", v1, v2));
// Wait for this read/write combination to finish
end.get();
// Create a read-write map
FunctionalMap.ReadWriteMap<String, String> readWriteMap = ReadWriteMapImpl.create(functionalMap);
// Use read-write multi-key based operation to write new values
// together with lifespan and return previous values
Map<String, String> data = new HashMap<>();
data.put("key1", "newValue1");
data.put("key2", "newValue2");
Traversable<String> previousValues = readWriteMap.evalMany(data, (v, readWriteView) -> {
String prev = readWriteView.find().orElse(null);
readWriteView.set(v, new MetaLifespan(Duration.ofHours(1).toMillis()));
return prev;
});
// Use read-only multi-key operation to read current values for multiple keys
Traversable<EntryView.ReadEntryView<String, String>> entryViews =
readOnlyMap.evalMany(data.keySet(), readOnlyView -> readOnlyView);
System.out.printf("Updated entries: %n");
entryViews.forEach(view -> System.out.printf("%s%n", view));
// Finally, print out the previous entry values
System.out.printf("Previous entry values: %n");
previousValues.forEach(prev -> System.out.printf("%s%n", prev));
}
public <T,U> CompletableFuture<Void> thenAcceptBoth
(CompletableFuture<T> f,
CompletionStage<? extends U> g,
BiConsumer<? super T,? super U> a) {
return f.thenAcceptBoth(g, a);
}