下面列出了java.util.concurrent.CompletableFuture#thenApplyAsync ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
if (jobManagerRunnerFuture == null) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
} else {
final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
return leaderGatewayFuture.thenApplyAsync(
(JobMasterGateway jobMasterGateway) -> {
// check whether the retrieved JobMasterGateway belongs still to a running JobMaster
if (jobManagerRunnerFutures.containsKey(jobId)) {
return jobMasterGateway;
} else {
throw new CompletionException(new FlinkJobNotFoundException(jobId));
}
},
getMainThreadExecutor());
}
}
/***************************************
* Runs this execution step asynchronously as a continuation of a previous
* code execution in a {@link CompletableFuture} and proceeds to the next
* step afterwards.
*
* <p>Subclasses that need to suspend the invocation of the next step until
* some condition is met (e.g. sending or receiving data has finished) need
* to override this method and create a {@link Suspension} by invoking
* {@link Continuation#suspend(CoroutineStep, CoroutineStep)} on the next
* step. If the condition that caused the suspension resolves the coroutine
* execution can be resumed by calling {@link
* Suspension#resume(Object)}.</p>
*
* <p>Subclasses that override this method also need to handle errors by
* terminating any further execution (i.e. not resuming a suspension if such
* exists) and forwarding the causing exception to {@link
* Continuation#fail(Throwable)}.</p>
*
* @param fPreviousExecution The future of the previous code execution
* @param rNextStep The next step to execute or NULL for none
* @param rContinuation The continuation of the execution
*/
public void runAsync(CompletableFuture<I> fPreviousExecution,
CoroutineStep<O, ?> rNextStep,
Continuation<?> rContinuation)
{
CompletableFuture<O> fExecution =
fPreviousExecution.thenApplyAsync(
i -> execute(i, rContinuation),
rContinuation);
if (rNextStep != null)
{
// the next step is either a StepChain which contains it's own
// next step or the final step in a coroutine and therefore the
// rNextStep argument can be NULL
rNextStep.runAsync(fExecution, null, rContinuation);
}
else
{
// only add exception handler to the end of a chain, i.e. next == null
fExecution.exceptionally(e -> fail(e, rContinuation));
}
}
@Transactional
@GET
@Path("/transaction")
public CompletionStage<String> transactionTest() throws SystemException {
CompletableFuture<String> ret = all.completedFuture("OK");
ContextEntity entity = new ContextEntity();
entity.name = "Stef";
entity.persist();
Transaction t1 = Panache.getTransactionManager().getTransaction();
Assertions.assertNotNull(t1);
return ret.thenApplyAsync(text -> {
Assertions.assertEquals(1, ContextEntity.count());
Transaction t2;
try {
t2 = Panache.getTransactionManager().getTransaction();
} catch (SystemException e) {
throw new RuntimeException(e);
}
Assertions.assertEquals(t1, t2);
return text;
});
}
/**
* React to the completion of any of the events in the previous stage. Will not work reliably with Streams
* where filter has been applied in earlier stages. (As Filter completes the Stream for events that are filtered out, they
* potentially shortcircuit the completion of the stage).
*
* @param fn Function to applyHKT when any of the previous events complete
* @return Next stage in the stream
*/
default <R> SimpleReactStream<R> anyOf(final Function<? super U, ? extends R> fn) {
final CompletableFuture[] array = lastActiveArray(getLastActive());
final CompletableFuture cf = CompletableFuture.anyOf(array);
final CompletableFuture onSuccess = cf.thenApplyAsync(fn, getTaskExecutor());
return (SimpleReactStream<R>) withLastActive(new EagerStreamWrapper(
onSuccess, getErrorHandler()));
}
@Benchmark
public String mapPromise() throws InterruptedException, ExecutionException {
CompletableFuture<String> p = new CompletableFuture<String>();
CompletableFuture<String> f = p.thenApplyAsync(mapF);
p.complete(string);
return f.get();
}
@Benchmark
public String mapPromiseN() throws InterruptedException, ExecutionException {
CompletableFuture<String> p = new CompletableFuture<String>();
CompletableFuture<String> f = p;
for (int i = 0; i < N.n; i++)
f = f.thenApplyAsync(mapF);
p.complete(string);
return f.get();
}
@Test
public void thenAcceptAsync() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Future");
completableFuture = completableFuture.thenApplyAsync((s) -> s.concat(" is awesome!"));
CompletableFuture<Void> procedureFuture = completableFuture.thenAcceptAsync(System.out::println);
assertThat(procedureFuture.get()).isNull();
}
@Benchmark
public String setValueN() throws InterruptedException, ExecutionException {
CompletableFuture<String> p = new CompletableFuture<>();
CompletableFuture<String> f = p;
for (int i = 0; i < N.n; i++)
f = f.thenApplyAsync(mapF);
p.complete(string);
return f.get();
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
if(executor!=null && !cf.isDone()) {
cf = cf.thenApplyAsync( r -> r, executor);
}
return cf;
}
@Benchmark
public String mapPromise() throws InterruptedException, ExecutionException {
CompletableFuture<String> p = new CompletableFuture<String>();
CompletableFuture<String> f = p.thenApplyAsync(mapF);
p.complete(string);
return f.get();
}
@Test
public void testCustomContextPropagation() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
// set something to custom context
CustomContext.set("foo");
CompletableFuture<String> ret = tc.withContextCapture(CompletableFuture.completedFuture("void"));
CompletableFuture<Void> cfs = ret.thenApplyAsync(text -> {
Assertions.assertEquals("foo", CustomContext.get());
return null;
}, executor);
cfs.get();
}
@Benchmark
public String setValueN() throws InterruptedException, ExecutionException {
CompletableFuture<String> p = new CompletableFuture<>();
CompletableFuture<String> f = p;
for (int i = 0; i < N.n; i++)
f = f.thenApplyAsync(mapF);
p.complete(string);
return f.get();
}
@GET
@Path("/servlet-tc")
public CompletionStage<String> servletThreadContextTest(@Context UriInfo uriInfo) {
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> ret = allTc.withContextCapture(CompletableFuture.completedFuture("OK"));
return ret.thenApplyAsync(text -> {
servletRequest.getContentType();
return text;
}, executor);
}
public <T,U> CompletableFuture<U> thenApply
(CompletableFuture<T> f, Function<? super T,U> a) {
return f.thenApplyAsync(a);
}
/**
* When an already-contextualized Function is specified as the action/task,
* the action/task runs with its already-captured context rather than
* capturing and applying context per the configuration of the managed executor.
*
* @throws ExecutionException indicates test failure
* @throws InterruptedException indicates test failure
* @throws TimeoutException indicates test failure
*/
@Test
public void contextOfContextualFunctionOverridesContextOfManagedExecutor() throws ExecutionException, InterruptedException, TimeoutException {
ThreadContext labelContext = ThreadContext.builder()
.propagated(Label.CONTEXT_NAME)
.unchanged()
.cleared(ThreadContext.ALL_REMAINING)
.build();
ManagedExecutor executor = ManagedExecutor.builder()
.propagated(Buffer.CONTEXT_NAME)
.cleared(ThreadContext.ALL_REMAINING)
.build();
try {
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-1"));
Label.set("contextualFunctionOverride-label-1");
Function<Integer, Integer> precontextualizedFunction1 = labelContext.contextualFunction(i -> {
Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-1",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return i + 1;
});
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-2"));
Label.set("contextualFunctionOverride-label-2");
Function<Integer, Integer> precontextualizedFunction2 = labelContext.contextualFunction(i -> {
Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-2",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return i + 20;
});
Function<Throwable, Integer> precontextualizedErrorHandler = labelContext.contextualFunction(failure -> {
Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-2",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return -1;
});
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-3"));
Label.set("contextualFunctionOverride-label-3");
Function<Integer, Integer> normalFunction = i -> {
Assert.assertEquals(Buffer.get().toString(), "contextualFunctionOverride-buffer-3",
"Previously captured context type not found on thread.");
Assert.assertEquals(Label.get(), "",
"Context type not cleared from thread.");
return i + 300;
};
CompletableFuture<Integer> stage0 = executor.newIncompleteFuture();
CompletableFuture<Integer> stage1 = stage0.thenApplyAsync(precontextualizedFunction1);
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-4"));
Label.set("contextualFunctionOverride-label-4");
Function<Integer, CompletableFuture<Integer>> precontextualizedFunction4 = labelContext.contextualFunction(i -> {
Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-4",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return stage1;
});
Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-3"));
Label.set("contextualFunctionOverride-label-3");
CompletableFuture<Integer> stage2 = stage0.thenComposeAsync(precontextualizedFunction4);
CompletableFuture<Integer> stage3 = stage2.applyToEither(stage1, precontextualizedFunction2);
CompletableFuture<Integer> stage4 = stage3.thenApply(normalFunction);
CompletableFuture<Integer> stage5 = stage4.thenApply(i -> i / (i - 321)) // intentional ArithmeticException for division by 0
.exceptionally(precontextualizedErrorHandler);
stage0.complete(0);
Assert.assertEquals(stage2.join(), Integer.valueOf(1),
"Unexpected result for completion stage.");
Assert.assertEquals(stage5.join(), Integer.valueOf(-1),
"Unexpected result for completion stage.");
}
finally {
executor.shutdownNow();
// Restore original values
Buffer.set(null);
Label.set(null);
}
}
public CompletableFuture<ArrayList<DiagnosticsNode>> getProperties(InspectorService.ObjectGroup objectGroup) {
final CompletableFuture<ArrayList<DiagnosticsNode>> properties = objectGroup.getProperties(getDartDiagnosticRef());
return properties.thenApplyAsync(this::trackPropertiesMatchingParameters);
}
public <T,U> CompletableFuture<U> thenApply
(CompletableFuture<T> f, Function<? super T,U> a) {
return f.thenApplyAsync(a, new ThreadExecutor());
}
/**
* This function takes a {@link CompletableFuture} and a function to apply to this future. If the input future
* is already done, this function returns {@link CompletableFuture#thenApply(Function)}. Otherwise, the return
* value is {@link CompletableFuture#thenApplyAsync(Function, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to apply.
* @param executor the executor to run the apply function if the future is not yet done.
* @param applyFun the function to apply.
* @param <IN> type of the input future.
* @param <OUT> type of the output future.
* @return a completable future that is applying the given function to the input future.
*/
public static <IN, OUT> CompletableFuture<OUT> thenApplyAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
Function<? super IN, ? extends OUT> applyFun) {
return completableFuture.isDone() ?
completableFuture.thenApply(applyFun) :
completableFuture.thenApplyAsync(applyFun, executor);
}
/**
* This function takes a {@link CompletableFuture} and a function to apply to this future. If the input future
* is already done, this function returns {@link CompletableFuture#thenApply(Function)}. Otherwise, the return
* value is {@link CompletableFuture#thenApplyAsync(Function, Executor)} with the given executor.
*
* @param completableFuture the completable future for which we want to apply.
* @param executor the executor to run the apply function if the future is not yet done.
* @param applyFun the function to apply.
* @param <IN> type of the input future.
* @param <OUT> type of the output future.
* @return a completable future that is applying the given function to the input future.
*/
public static <IN, OUT> CompletableFuture<OUT> thenApplyAsyncIfNotDone(
CompletableFuture<IN> completableFuture,
Executor executor,
Function<? super IN, ? extends OUT> applyFun) {
return completableFuture.isDone() ?
completableFuture.thenApply(applyFun) :
completableFuture.thenApplyAsync(applyFun, executor);
}
public AsyncList(final CompletableFuture<Stream<CompletableFuture>> cf, final Executor service) {
// use elastic pool to execute asyn
async = cf.thenApplyAsync(st -> st.collect(Collectors.toList()), service);
this.service = service;
}