java.util.concurrent.CompletableFuture#thenApplyAsync() source code examples

The following are example usages of java.util.concurrent.CompletableFuture#thenApplyAsync(), taken from open-source projects.
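Before the project excerpts, a minimal sketch of the two overloads may help. The class and method names here (ThenApplyAsyncSketch, withDefaultExecutor, withExplicitExecutor) and the thread name "sketch-worker" are made up for illustration. The one-argument overload runs the function on the default asynchronous executor (normally ForkJoinPool.commonPool()); the two-argument overload runs it on the executor you pass in.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThenApplyAsyncSketch {

    // One-argument overload: the function runs on the default async executor.
    static String withDefaultExecutor(String input) {
        return CompletableFuture.completedFuture(input)
                .thenApplyAsync(String::toUpperCase)
                .join();
    }

    // Two-argument overload: the function runs on the supplied executor,
    // even though the source future is already complete.
    static String withExplicitExecutor(String input) {
        // Hypothetical thread name, chosen only so the result is easy to read.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        try {
            return CompletableFuture.completedFuture(input)
                    .thenApplyAsync(s -> s + " on " + Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(withDefaultExecutor("hello"));
        System.out.println(withExplicitExecutor("hello"));
    }
}
```

Shutting the executor down in a finally block keeps the sketch from leaving a non-daemon thread alive after main returns.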

Example 1   Project: Flink-CEPplus   File: Dispatcher.java
private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
	final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);

	if (jobManagerRunnerFuture == null) {
		return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
	} else {
		final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
		return leaderGatewayFuture.thenApplyAsync(
			(JobMasterGateway jobMasterGateway) -> {
				// check whether the retrieved JobMasterGateway still belongs to a running JobMaster
				if (jobManagerRunnerFutures.containsKey(jobId)) {
					return jobMasterGateway;
				} else {
					throw new CompletionException(new FlinkJobNotFoundException(jobId));
				}
			},
			getMainThreadExecutor());
	}
}
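Example 1 fails the returned future by throwing a CompletionException from inside the function. The sketch below shows one way such a failure might surface to a caller. JobNotFoundException, lookup and describe are hypothetical stand-ins and are not part of Flink.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

class ExceptionalStageSketch {

    // Hypothetical checked exception, standing in for a domain error.
    static class JobNotFoundException extends Exception {
        JobNotFoundException(String jobId) {
            super("Job not found: " + jobId);
        }
    }

    // Returns a future that fails if the job is not in the running set.
    static CompletableFuture<String> lookup(String jobId, Set<String> runningJobs, Executor executor) {
        return CompletableFuture.completedFuture(jobId)
                .thenApplyAsync(id -> {
                    if (runningJobs.contains(id)) {
                        return "gateway-for-" + id;
                    }
                    // Checked exceptions cannot escape a Function, so wrap the cause.
                    throw new CompletionException(new JobNotFoundException(id));
                }, executor);
    }

    // join() rethrows the CompletionException; the original error is its cause.
    static String describe(CompletableFuture<String> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs tasks on the calling thread, for brevity
        Set<String> running = Collections.singleton("job-a");
        System.out.println(describe(lookup("job-a", running, direct)));
        System.out.println(describe(lookup("job-b", running, direct)));
    }
}
```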
 
Example 2   Project: coroutines   File: CoroutineStep.java
/***************************************
 * Runs this execution step asynchronously as a continuation of a previous
 * code execution in a {@link CompletableFuture} and proceeds to the next
 * step afterwards.
 *
 * <p>Subclasses that need to suspend the invocation of the next step until
 * some condition is met (e.g. sending or receiving data has finished) need
 * to override this method and create a {@link Suspension} by invoking
 * {@link Continuation#suspend(CoroutineStep, CoroutineStep)} on the next
 * step. If the condition that caused the suspension resolves the coroutine
 * execution can be resumed by calling {@link
 * Suspension#resume(Object)}.</p>
 *
 * <p>Subclasses that override this method also need to handle errors by
 * terminating any further execution (i.e. not resuming a suspension if such
 * exists) and forwarding the causing exception to {@link
 * Continuation#fail(Throwable)}.</p>
 *
 * @param fPreviousExecution The future of the previous code execution
 * @param rNextStep          The next step to execute or NULL for none
 * @param rContinuation      The continuation of the execution
 */
public void runAsync(CompletableFuture<I> fPreviousExecution,
					 CoroutineStep<O, ?>  rNextStep,
					 Continuation<?>	  rContinuation)
{
	CompletableFuture<O> fExecution =
		fPreviousExecution.thenApplyAsync(
			i -> execute(i, rContinuation),
			rContinuation);

	if (rNextStep != null)
	{
		// the next step is either a StepChain which contains its own
		// next step or the final step in a coroutine and therefore the
		// rNextStep argument can be NULL
		rNextStep.runAsync(fExecution, null, rContinuation);
	}
	else
	{
		// only add exception handler to the end of a chain, i.e. next == null
		fExecution.exceptionally(e -> fail(e, rContinuation));
	}
}
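Example 2 attaches its exception handler only at the end of a chain. The sketch below illustrates why that can be sufficient with plain CompletableFuture stages: a failure in an early thenApplyAsync stage skips the later functions and reaches the final exceptionally handler. The names ChainedStepsSketch, runChain and rootCause are hypothetical and not taken from the coroutines project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class ChainedStepsSketch {

    // Runs two asynchronous steps and handles errors once, at the end.
    static String runChain(int input, Executor executor) {
        return CompletableFuture.completedFuture(input)
                .thenApplyAsync(i -> 100 / i, executor)        // throws ArithmeticException for 0
                .thenApplyAsync(i -> "result=" + i, executor)  // skipped when the previous step failed
                .exceptionally(e -> "failed: " + rootCause(e).getClass().getSimpleName())
                .join();
    }

    // Dependent stages see the failure wrapped in a CompletionException,
    // so walk the cause chain to find the original error.
    static Throwable rootCause(Throwable e) {
        Throwable current = e;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs tasks on the calling thread, for brevity
        System.out.println(runChain(4, direct));
        System.out.println(runChain(0, direct));
    }
}
```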
 
Example 3   Project: quarkus   File: ContextEndpoint.java
@Transactional
@GET
@Path("/transaction")
public CompletionStage<String> transactionTest() throws SystemException {
    CompletableFuture<String> ret = all.completedFuture("OK");

    ContextEntity entity = new ContextEntity();
    entity.name = "Stef";
    entity.persist();
    Transaction t1 = Panache.getTransactionManager().getTransaction();
    Assertions.assertNotNull(t1);

    return ret.thenApplyAsync(text -> {
        Assertions.assertEquals(1, ContextEntity.count());
        Transaction t2;
        try {
            t2 = Panache.getTransactionManager().getTransaction();
        } catch (SystemException e) {
            throw new RuntimeException(e);
        }
        Assertions.assertEquals(t1, t2);
        return text;
    });
}
 
Example 4   Project: cyclops   File: SimpleReactStream.java
/**
 * React to the completion of any of the events in the previous stage. Will not work reliably with Streams
 * where a filter has been applied in earlier stages. (As filter completes the Stream for events that are filtered out, they
 * potentially short-circuit the completion of the stage).
 *
 * @param fn Function to apply when any of the previous events complete
 * @return Next stage in the stream
 */
default <R> SimpleReactStream<R> anyOf(final Function<? super U, ? extends R> fn) {
    final CompletableFuture[] array = lastActiveArray(getLastActive());
    final CompletableFuture cf = CompletableFuture.anyOf(array);
    final CompletableFuture onSuccess = cf.thenApplyAsync(fn, getTaskExecutor());

    return (SimpleReactStream<R>) withLastActive(new EagerStreamWrapper(
                                                                        onSuccess, getErrorHandler()));

}
 
Example 5   Project: future   File: JavaAsyncFutureBenchmark.java
@Benchmark
public String mapPromise() throws InterruptedException, ExecutionException {
  CompletableFuture<String> p = new CompletableFuture<String>();
  CompletableFuture<String> f = p.thenApplyAsync(mapF);
  p.complete(string);
  return f.get();
}
 
Example 6   Project: future   File: JavaAsyncFutureBenchmark.java
@Benchmark
public String mapPromiseN() throws InterruptedException, ExecutionException {
  CompletableFuture<String> p = new CompletableFuture<String>();
  CompletableFuture<String> f = p;
  for (int i = 0; i < N.n; i++)
    f = f.thenApplyAsync(mapF);
  p.complete(string);
  return f.get();
}
 
Example 7   Project: hellokoding-courses   File: SupplyAsyncTest.java
@Test
public void thenAcceptAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Future");

    completableFuture = completableFuture.thenApplyAsync((s) -> s.concat(" is awesome!"));
    CompletableFuture<Void> procedureFuture = completableFuture.thenAcceptAsync(System.out::println);

    assertThat(procedureFuture.get()).isNull();
}
 
Example 8   Project: future   File: JavaAsyncFutureBenchmark.java
@Benchmark
public String setValueN() throws InterruptedException, ExecutionException {
  CompletableFuture<String> p = new CompletableFuture<>();
  CompletableFuture<String> f = p;
  for (int i = 0; i < N.n; i++)
    f = f.thenApplyAsync(mapF);
  p.complete(string);
  return f.get();
}
 
Example 9   Project: openjdk-jdk9   File: Stream.java
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
    CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
    if (executor != null && !cf.isDone()) {
        cf = cf.thenApplyAsync(r -> r, executor);
    }
    }
    return cf;
}
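Example 9 passes an identity function to thenApplyAsync. One reading is that this moves completion of the returned future onto the given executor, so dependents registered before it completes run there rather than on whichever thread completed the original future. The sketch below illustrates that effect under this assumption. ExecutorHopSketch, threadOfDependent and the thread name "response-executor" are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorHopSketch {

    static String threadOfDependent() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "response-executor"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();

            // Identity hop: same value, but completed by the executor's thread.
            CompletableFuture<String> hopped = source.thenApplyAsync(r -> r, executor);

            // Non-async dependent, registered while 'hopped' is still incomplete,
            // so it runs on the thread that completes 'hopped'.
            CompletableFuture<String> dependent =
                    hopped.thenApply(r -> r + " handled on " + Thread.currentThread().getName());

            // Completed here by the calling thread; the hop still goes through the executor.
            source.complete("response");
            return dependent.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfDependent());
    }
}
```

If the dependent were registered after hopped had already completed, thenApply would run it on the registering thread instead, which may be why the original code skips the hop for futures that are already done.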
 
Example 11   Project: quarkus   File: CustomContextTest.java
@Test
public void testCustomContextPropagation() throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // set something to custom context
    CustomContext.set("foo");

    CompletableFuture<String> ret = tc.withContextCapture(CompletableFuture.completedFuture("void"));
    CompletableFuture<Void> cfs = ret.thenApplyAsync(text -> {
        Assertions.assertEquals("foo", CustomContext.get());
        return null;
    }, executor);
    cfs.get();
}
 
Example 13   Project: quarkus   File: ContextEndpoint.java
@GET
@Path("/servlet-tc")
public CompletionStage<String> servletThreadContextTest(@Context UriInfo uriInfo) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<String> ret = allTc.withContextCapture(CompletableFuture.completedFuture("OK"));
    return ret.thenApplyAsync(text -> {
        servletRequest.getContentType();
        return text;
    }, executor);
}
 
Example 14   Project: openjdk-jdk9   File: CompletableFutureTest.java
public <T,U> CompletableFuture<U> thenApply
    (CompletableFuture<T> f, Function<? super T,U> a) {
    return f.thenApplyAsync(a);
}
 
Example 15
/**
 * When an already-contextualized Function is specified as the action/task,
 * the action/task runs with its already-captured context rather than
 * capturing and applying context per the configuration of the managed executor.
 *
 * @throws ExecutionException indicates test failure
 * @throws InterruptedException indicates test failure
 * @throws TimeoutException indicates test failure
 */
@Test
public void contextOfContextualFunctionOverridesContextOfManagedExecutor() throws ExecutionException, InterruptedException, TimeoutException {
    ThreadContext labelContext = ThreadContext.builder()
            .propagated(Label.CONTEXT_NAME)
            .unchanged()
            .cleared(ThreadContext.ALL_REMAINING)
            .build();

    ManagedExecutor executor = ManagedExecutor.builder()
            .propagated(Buffer.CONTEXT_NAME)
            .cleared(ThreadContext.ALL_REMAINING)
            .build();
    try {
        Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-1"));
        Label.set("contextualFunctionOverride-label-1");

        Function<Integer, Integer> precontextualizedFunction1 = labelContext.contextualFunction(i -> {
            Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-1",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
            return i + 1;
        });

        Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-2"));
        Label.set("contextualFunctionOverride-label-2");

        Function<Integer, Integer> precontextualizedFunction2 = labelContext.contextualFunction(i -> {
            Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-2",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
            return i + 20;
        });

        Function<Throwable, Integer> precontextualizedErrorHandler = labelContext.contextualFunction(failure -> {
            Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-2",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
            return -1;
        });

        Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-3"));
        Label.set("contextualFunctionOverride-label-3");

        Function<Integer, Integer> normalFunction = i -> {
            Assert.assertEquals(Buffer.get().toString(), "contextualFunctionOverride-buffer-3",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Label.get(), "",
                    "Context type not cleared from thread.");
            return i + 300;
        };

        CompletableFuture<Integer> stage0 = executor.newIncompleteFuture();
        CompletableFuture<Integer> stage1 = stage0.thenApplyAsync(precontextualizedFunction1);

        Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-4"));
        Label.set("contextualFunctionOverride-label-4");

        Function<Integer, CompletableFuture<Integer>> precontextualizedFunction4 = labelContext.contextualFunction(i -> {
            Assert.assertEquals(Label.get(), "contextualFunctionOverride-label-4",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
            return stage1;
        });

        Buffer.set(new StringBuffer("contextualFunctionOverride-buffer-3"));
        Label.set("contextualFunctionOverride-label-3");

        CompletableFuture<Integer> stage2 = stage0.thenComposeAsync(precontextualizedFunction4);
        CompletableFuture<Integer> stage3 = stage2.applyToEither(stage1, precontextualizedFunction2);
        CompletableFuture<Integer> stage4 = stage3.thenApply(normalFunction);
        CompletableFuture<Integer> stage5 = stage4.thenApply(i -> i / (i - 321)) // intentional ArithmeticException for division by 0
                .exceptionally(precontextualizedErrorHandler);

        stage0.complete(0);

        Assert.assertEquals(stage2.join(), Integer.valueOf(1),
                "Unexpected result for completion stage.");

        Assert.assertEquals(stage5.join(), Integer.valueOf(-1),
                "Unexpected result for completion stage.");
    }
    finally {
        executor.shutdownNow();
        // Restore original values
        Buffer.set(null);
        Label.set(null);
    }
}
 
Example 16   Project: flutter-intellij   File: DiagnosticsNode.java
public CompletableFuture<ArrayList<DiagnosticsNode>> getProperties(InspectorService.ObjectGroup objectGroup) {
  final CompletableFuture<ArrayList<DiagnosticsNode>> properties = objectGroup.getProperties(getDartDiagnosticRef());
  return properties.thenApplyAsync(this::trackPropertiesMatchingParameters);
}
 
Example 17   Project: openjdk-jdk9   File: CompletableFutureTest.java
public <T,U> CompletableFuture<U> thenApply
    (CompletableFuture<T> f, Function<? super T,U> a) {
    return f.thenApplyAsync(a, new ThreadExecutor());
}
 
Example 18   Project: flink   File: FutureUtils.java
/**
 * This function takes a {@link CompletableFuture} and a function to apply to this future. If the input future
 * is already done, this function returns {@link CompletableFuture#thenApply(Function)}. Otherwise, the return
 * value is {@link CompletableFuture#thenApplyAsync(Function, Executor)} with the given executor.
 *
 * @param completableFuture the completable future for which we want to apply.
 * @param executor the executor to run the apply function if the future is not yet done.
 * @param applyFun the function to apply.
 * @param <IN> type of the input future.
 * @param <OUT> type of the output future.
 * @return a completable future that is applying the given function to the input future.
 */
public static <IN, OUT> CompletableFuture<OUT> thenApplyAsyncIfNotDone(
	CompletableFuture<IN> completableFuture,
	Executor executor,
	Function<? super IN, ? extends OUT> applyFun) {
	return completableFuture.isDone() ?
		completableFuture.thenApply(applyFun) :
		completableFuture.thenApplyAsync(applyFun, executor);
}
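The Javadoc above describes two paths: a synchronous apply when the input future is already done, and an asynchronous apply on the executor otherwise. The sketch below copies the helper so it is self-contained and reports which thread ran the function in each case. ApplyIfNotDoneSketch, threadFor and the thread name "apply-executor" are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ApplyIfNotDoneSketch {

    // Same shape as the helper above, copied so the sketch compiles on its own.
    static <IN, OUT> CompletableFuture<OUT> thenApplyAsyncIfNotDone(
            CompletableFuture<IN> future,
            Executor executor,
            Function<? super IN, ? extends OUT> fn) {
        return future.isDone()
                ? future.thenApply(fn)
                : future.thenApplyAsync(fn, executor);
    }

    // Returns the name of the thread that ran the applied function.
    static String threadFor(boolean alreadyDone) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "apply-executor"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            if (alreadyDone) {
                source.complete("value");
            }
            CompletableFuture<String> result =
                    thenApplyAsyncIfNotDone(source, executor, v -> Thread.currentThread().getName());
            source.complete("value"); // no effect if the future was already completed
            return result.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("already done -> " + threadFor(true));
        System.out.println("not yet done -> " + threadFor(false));
    }
}
```

In the already-done case the function runs on the caller's thread, which avoids a round trip through the executor when the result is immediately available.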
 
Example 20   Project: cyclops   File: EagerStreamWrapper.java
public AsyncList(final CompletableFuture<Stream<CompletableFuture>> cf, final Executor service) {
    // use elastic pool to execute async

    async = cf.thenApplyAsync(st -> st.collect(Collectors.toList()), service);
    this.service = service;

}