下面列出了 java.util.concurrent.CompletableFuture#applyToEither() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Wraps the given stage so that the returned stage completes as soon as either
 * {@code originalAwait} or a freshly created internal future completes.
 *
 * <p>Bumps {@code blockerVersion}, stores both the internal future and the
 * original stage in this object's fields, and then calls
 * {@code cancelAwaitIfNecessary} with both.
 *
 * @param originalAwait the stage being awaited
 * @param <V> the result type of the awaited stage
 * @return a stage that mirrors whichever of the two completes first
 */
final <V> CompletionStage<V> registerAwaitTarget(CompletionStage<V> originalAwait) {
    blockerVersion.incrementAndGet();

    final CompletableFuture<V> terminator = new CompletableFuture<>();
    final CompletionStage<V> eitherStage =
            terminator.applyToEither(originalAwait, Function.<V>identity());

    // Keep both handles reachable so that cancelling the outer promise can reach them.
    this.terminateMethod = terminator;
    this.originalAwait = originalAwait;

    // The main future may have been cancelled while we were registering; check again.
    cancelAwaitIfNecessary(terminator, originalAwait);

    return eitherStage;
}
/**
* When an already-contextualized Function is specified as the action/task,
* the action/task runs with its already-captured context rather than
* capturing and applying context per the configuration of the managed executor.
*
* The test builds a chain of stages on a future created by a managed executor
* that propagates Buffer context, mixing in Functions that were pre-contextualized
* by a ThreadContext that propagates Label context. Each Function asserts which
* context it observes when it runs. Arithmetic along the chain:
* 0 -> (+1) 1 -> (+20) 21 -> (+300) 321 -> division by (321 - 321) fails -> handler yields -1.
*
* @throws ExecutionException indicates test failure
* @throws InterruptedException indicates test failure
* @throws TimeoutException indicates test failure
*/
@Test
public void contextOfContextualFunctionOverridesContextOfManagedExecutor() throws ExecutionException, InterruptedException, TimeoutException {
// ThreadContext that propagates only the Label context and clears all remaining types.
ThreadContext labelContext = ThreadContext.builder()
.propagated(Label.CONTEXT_NAME)
.unchanged()
.cleared(ThreadContext.ALL_REMAINING)
.build();
// Managed executor configured the other way around: propagates Buffer, clears all remaining types.
ManagedExecutor executor = ManagedExecutor.builder()
.propagated(Buffer.CONTEXT_NAME)
.cleared(ThreadContext.ALL_REMAINING)
.build();
try {
// Context values "-1" are current while precontextualizedFunction1 is contextualized.
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-1"));
Label.set("contextualFunctionOverride-label-1");
// Asserts that it observes label-1 and an empty Buffer when it runs; adds 1.
Function<Integer, Integer> precontextualizedFunction1 = labelContext.contextualFunction(i -> {
Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-1",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return i + 1;
});
// Context values "-2" are current while the next two functions are contextualized.
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-2"));
Label.set("contextualFunctionOverride-label-2");
// Asserts that it observes label-2 and an empty Buffer when it runs; adds 20.
Function<Integer, Integer> precontextualizedFunction2 = labelContext.contextualFunction(i -> {
Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-2",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return i + 20;
});
// Error handler, also contextualized under label-2; maps any failure to -1.
Function<Throwable, Integer> precontextualizedErrorHandler = labelContext.contextualFunction(failure -> {
Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-2",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return -1;
});
// Context values "-3" are current from here until the switch to "-4" below.
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-3"));
Label.set("contextualFunctionOverride-label-3");
// Plain, non-contextualized function: it asserts the executor's configuration applies,
// i.e. buffer-3 is visible and Label is cleared (empty); adds 300.
Function<Integer, Integer> normalFunction = i -> {
Assert.assertEquals(Buffer.get().toString(), "contextualFunctionOverride-buffer-3",
"Previously captured context type not found on thread.");
Assert.assertEquals(Label.get(), "",
"Context type not cleared from thread.");
return i + 300;
};
// stage0 stays incomplete until stage0.complete(0) further down.
CompletableFuture<Integer> stage0 = executor.newIncompleteFuture();
// stage1 = stage0's value + 1, computed by the function contextualized under label-1.
CompletableFuture<Integer> stage1 = stage0.thenApplyAsync(precontextualizedFunction1);
// Context values "-4" are current while precontextualizedFunction4 is contextualized.
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-4"));
Label.set("contextualFunctionOverride-label-4");
// Asserts that it observes label-4 and an empty Buffer; ignores its input and returns stage1.
Function<Integer, CompletableFuture<Integer>> precontextualizedFunction4 = labelContext.contextualFunction(i -> {
Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-4",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return stage1;
});
// Switch back to the "-3" values before creating the remaining stages; normalFunction
// asserts buffer-3. NOTE(review): presumably the managed executor captures context when
// a dependent stage is created - confirm against the ManagedExecutor specification.
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-3"));
Label.set("contextualFunctionOverride-label-3");
// stage2 composes to stage1, so it yields the same value as stage1 (1).
CompletableFuture<Integer> stage2 = stage0.thenComposeAsync(precontextualizedFunction4);
// Whichever of stage2/stage1 finishes first supplies 1, so stage3 = 1 + 20 = 21.
CompletableFuture<Integer> stage3 = stage2.applyToEither(stage1, precontextualizedFunction2);
// stage4 = 21 + 300 = 321.
CompletableFuture<Integer> stage4 = stage3.thenApply(normalFunction);
// 321 / (321 - 321) fails, and the contextualized error handler converts the failure to -1.
CompletableFuture<Integer> stage5 = stage4.thenApply(i -> i / (i - 321)) // intentional ArithmeticException for division by 0
.exceptionally(precontextualizedErrorHandler);
// Completing stage0 triggers the whole chain.
stage0.complete(0);
// join() also surfaces any assertion failure raised inside the functions above.
Assert.assertEquals(stage2.join(), Integer.valueOf(1),
"Unexpected result for completion stage.");
Assert.assertEquals(stage5.join(), Integer.valueOf(-1),
"Unexpected result for completion stage.");
}
finally {
executor.shutdownNow();
// Restore original values
Buffer.set(null);
Label.set(null);
}
}
/**
 * Applies {@code a} to the result of whichever of {@code f} or {@code g}
 * completes first, by delegating to
 * {@link CompletableFuture#applyToEither(CompletionStage, Function)} on {@code f}.
 *
 * @param f the future on which the dependent stage is registered
 * @param g the alternative stage
 * @param a the function applied to the first available result
 * @param <T> the input type of the function
 * @param <U> the result type of the function
 * @return the dependent future produced by {@code f.applyToEither(g, a)}
 */
public <T, U> CompletableFuture<U> applyToEither(
        CompletableFuture<T> f,
        CompletionStage<? extends T> g,
        Function<? super T, U> a) {
    final CompletableFuture<U> dependent = f.applyToEither(g, a);
    return dependent;
}
/**
 * Issues the request through {@code retryOperation} and races the outcome
 * against a future obtained from {@code timeoutAfter(readTimeout, MILLISECONDS)}.
 *
 * <p>NOTE(review): presumably {@code retryOperation} completes the supplied future
 * after at most {@code maxRetries} attempts and {@code timeoutAfter} completes
 * once the read timeout elapses - confirm against those helpers.
 *
 * @param request the request passed to {@code oneShot} for every attempt
 * @return a future that completes with whichever of the two futures finishes first
 */
private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) {
    final CompletableFuture<Response> outcome = new CompletableFuture<>();

    // Start the attempts first; the host is resolved each time the supplier is invoked.
    retryOperation(
            outcome,
            () -> oneShot(serviceNameResolver.resolveHost(), request),
            maxRetries);

    final CompletableFuture<Response> deadline = timeoutAfter(readTimeout, TimeUnit.MILLISECONDS);
    final CompletableFuture<Response> firstToFinish =
            outcome.applyToEither(deadline, Function.identity());
    return firstToFinish;
}
/**
 * Registers {@code a} on {@code f} so that it runs with the result of whichever
 * of {@code f} or {@code g} completes first; a direct delegation to
 * {@link CompletableFuture#applyToEither(CompletionStage, Function)}.
 *
 * @param f the future on which the dependent stage is registered
 * @param g the alternative stage
 * @param a the function applied to the first available result
 * @param <T> the input type of the function
 * @param <U> the result type of the function
 * @return the dependent future produced by {@code f.applyToEither(g, a)}
 */
public <T, U> CompletableFuture<U> applyToEither(
        CompletableFuture<T> f,
        CompletionStage<? extends T> g,
        Function<? super T, U> a) {
    final CompletableFuture<U> firstResultApplied = f.applyToEither(g, a);
    return firstResultApplied;
}