java.util.concurrent.CompletableFuture#thenCombine() 源码实例 Demo

下面列出了 java.util.concurrent.CompletableFuture#thenCombine() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: Flink-CEPplus   文件: Dispatcher.java
/**
 * Assembles a {@link ClusterOverview} from the resource overview reported by the resource
 * manager and the statuses of the jobs known to this dispatcher.
 *
 * <p>Job statuses are requested from the job masters, passed through
 * {@code flattenOptionalCollection} and combined with the overview read from the archived
 * execution graph store.
 *
 * @param timeout timeout handed to the resource manager and job master requests
 * @return future completing with the combined cluster overview
 */
@Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
	final CompletableFuture<ResourceOverview> resourceOverviewFuture = runResourceManagerCommand(
		rmGateway -> rmGateway.requestResourceOverview(timeout));

	final List<CompletableFuture<Optional<JobStatus>>> jobStatusFutures = queryJobMastersForInformation(
		(JobMasterGateway jmGateway) -> jmGateway.requestJobStatus(timeout));

	final CompletableFuture<Collection<Optional<JobStatus>>> optionalStatusesFuture =
		FutureUtils.combineAll(jobStatusFutures);
	final CompletableFuture<Collection<JobStatus>> runningStatusesFuture =
		optionalStatusesFuture.thenApply(this::flattenOptionalCollection);

	// Read synchronously, before any of the asynchronous answers have to be available.
	final JobsOverview storedOverview = archivedExecutionGraphStore.getStoredJobsOverview();

	return runningStatusesFuture.thenCombine(
		resourceOverviewFuture,
		(runningStatuses, resources) -> new ClusterOverview(
			resources,
			JobsOverview.create(runningStatuses).combine(storedOverview)));
}
 
源代码2 项目: flink   文件: Dispatcher.java
/**
 * Assembles a {@link ClusterOverview} from the resource overview reported by the resource
 * manager and the statuses of the jobs known to this dispatcher.
 *
 * <p>Job statuses are requested from the job masters, passed through
 * {@code flattenOptionalCollection} and combined with the overview read from the archived
 * execution graph store.
 *
 * @param timeout timeout handed to the resource manager and job master requests
 * @return future completing with the combined cluster overview
 */
@Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
	final CompletableFuture<ResourceOverview> resourceOverviewFuture = runResourceManagerCommand(
		rmGateway -> rmGateway.requestResourceOverview(timeout));

	final List<CompletableFuture<Optional<JobStatus>>> jobStatusFutures = queryJobMastersForInformation(
		(JobMasterGateway jmGateway) -> jmGateway.requestJobStatus(timeout));

	final CompletableFuture<Collection<Optional<JobStatus>>> optionalStatusesFuture =
		FutureUtils.combineAll(jobStatusFutures);
	final CompletableFuture<Collection<JobStatus>> runningStatusesFuture =
		optionalStatusesFuture.thenApply(this::flattenOptionalCollection);

	// Read synchronously, before any of the asynchronous answers have to be available.
	final JobsOverview storedOverview = archivedExecutionGraphStore.getStoredJobsOverview();

	return runningStatusesFuture.thenCombine(
		resourceOverviewFuture,
		(runningStatuses, resources) -> new ClusterOverview(
			resources,
			JobsOverview.create(runningStatuses).combine(storedOverview)));
}
 
源代码3 项目: agrest   文件: MultiSelectBuilder.java
/**
 * Chains the fetcher onto the root result and merges the fetched value back into the root
 * response.
 *
 * @param rootResult future of the root response
 * @return future of the same root response object after the merger has been applied to it
 */
CompletableFuture<DataResponse<T>> buildChain(CompletableFuture<DataResponse<T>> rootResult) {

    // Run the fetcher on the executor once the root response is available.
    Function<DataResponse<T>, CompletableFuture<U>> asyncFetch =
            rootResponse -> CompletableFuture.supplyAsync(() -> fetcher.apply(rootResponse), executor);
    CompletableFuture<U> fetched = rootResult.thenCompose(asyncFetch);

    // The merger works by side effect, so the root response itself is handed back.
    BiFunction<DataResponse<T>, U, DataResponse<T>> mergeIntoRoot = (rootResponse, fetchedValue) -> {
        merger.accept(rootResponse, fetchedValue);
        return rootResponse;
    };

    return rootResult.thenCombine(fetched, mergeIntoRoot);
}
 
源代码4 项目: flink   文件: Dispatcher.java
/**
 * Assembles a {@link ClusterOverview} from the resource overview reported by the resource
 * manager and the statuses of the jobs known to this dispatcher.
 *
 * <p>Job statuses are requested from the job masters, passed through
 * {@code flattenOptionalCollection} and combined with the overview read from the archived
 * execution graph store.
 *
 * @param timeout timeout handed to the resource manager and job master requests
 * @return future completing with the combined cluster overview
 */
@Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
	final CompletableFuture<ResourceOverview> resourceOverviewFuture = runResourceManagerCommand(
		rmGateway -> rmGateway.requestResourceOverview(timeout));

	final List<CompletableFuture<Optional<JobStatus>>> jobStatusFutures = queryJobMastersForInformation(
		(JobMasterGateway jmGateway) -> jmGateway.requestJobStatus(timeout));

	final CompletableFuture<Collection<Optional<JobStatus>>> optionalStatusesFuture =
		FutureUtils.combineAll(jobStatusFutures);
	final CompletableFuture<Collection<JobStatus>> runningStatusesFuture =
		optionalStatusesFuture.thenApply(this::flattenOptionalCollection);

	// Read synchronously, before any of the asynchronous answers have to be available.
	final JobsOverview storedOverview = archivedExecutionGraphStore.getStoredJobsOverview();

	return runningStatusesFuture.thenCombine(
		resourceOverviewFuture,
		(runningStatuses, resources) -> new ClusterOverview(
			resources,
			JobsOverview.create(runningStatuses).combine(storedOverview)));
}
 
源代码5 项目: flink   文件: JobSubmitHandler.java
/**
 * Uploads the files belonging to a job graph once both the graph and the blob server port
 * are available.
 *
 * @param gateway dispatcher gateway supplying the blob server port and the host name
 * @param jobGraphFuture future of the job graph whose files are uploaded
 * @param jarFiles jar files handed to the upload
 * @param artifacts named artifact files handed to the upload
 * @param configuration configuration used to create the {@link BlobClient}
 * @return future completing with the job graph after the upload; if the upload throws a
 *         {@link FlinkException} the future fails with a {@link CompletionException} whose
 *         cause is a {@link RestHandlerException} (INTERNAL_SERVER_ERROR)
 */
private CompletableFuture<JobGraph> uploadJobGraphFiles(
		DispatcherGateway gateway,
		CompletableFuture<JobGraph> jobGraphFuture,
		Collection<Path> jarFiles,
		Collection<Tuple2<String, Path>> artifacts,
		Configuration configuration) {
	final CompletableFuture<Integer> portFuture = gateway.getBlobServerPort(timeout);

	return jobGraphFuture.thenCombine(portFuture, (graph, port) -> {
		final InetSocketAddress blobServerAddress = new InetSocketAddress(gateway.getHostname(), port);
		try {
			ClientUtils.uploadJobGraphFiles(
				graph,
				jarFiles,
				artifacts,
				() -> new BlobClient(blobServerAddress, configuration));
			return graph;
		} catch (FlinkException uploadFailure) {
			// Wrap in CompletionException so the failure propagates through the future chain.
			final RestHandlerException restFailure = new RestHandlerException(
				"Could not upload job files.",
				HttpResponseStatus.INTERNAL_SERVER_ERROR,
				uploadFailure);
			throw new CompletionException(restFailure);
		}
	});
}
 
源代码6 项目: hmftools   文件: SomaticPipeline.java
/**
 * Creates the variants of a region once both the normal and the tumor read context counters
 * are done.
 *
 * @param region region being processed; only used for the debug log line
 * @param candidates future of the candidates to create variants for
 * @param doneTumor future of the tumor read context counters
 * @param doneNormal future of the normal read context counters
 * @return future of one {@link SageVariant} per candidate, in candidate order
 */
@NotNull
private CompletableFuture<List<SageVariant>> combine(@NotNull final GenomeRegion region,
        final CompletableFuture<List<Candidate>> candidates, final CompletableFuture<ReadContextCounters> doneTumor,
        final CompletableFuture<ReadContextCounters> doneNormal) {
    return doneNormal.thenCombine(doneTumor, (normalCounters, tumorCounters) -> {
        LOGGER.debug("Gathering evidence in {}:{}", region.chromosome(), region.start());
        final SageVariantFactory factory = new SageVariantFactory(config.filter());

        // Pair the normal and tumor evidence of every candidate and turn it into a variant.
        // NOTE(review): join() blocks unless the candidates future has already completed
        // by the time this callback runs -- confirm against the caller.
        final List<SageVariant> variants = Lists.newArrayList();
        for (final Candidate candidate : candidates.join()) {
            variants.add(factory.create(
                    normalCounters.readContextCounters(candidate.variant()),
                    tumorCounters.readContextCounters(candidate.variant())));
        }

        return variants;
    });
}
 
源代码7 项目: pravega   文件: BatchClientFactoryImpl.java
/**
 * Lists the segments of a stream between two optional stream cuts.
 *
 * @param stream stream whose segments are listed
 * @param startStreamCut optional start cut; empty or {@code StreamCut.UNBOUNDED} means the
 *                       head stream cut is fetched instead
 * @param endStreamCut optional end cut; empty or {@code StreamCut.UNBOUNDED} means the tail
 *                     stream cut is fetched instead
 * @return the result of {@code getStreamSegmentInfo} for the resolved cuts, obtained through
 *         {@code getAndHandleExceptions}
 */
private StreamSegmentsIterator listSegments(final Stream stream, final Optional<StreamCut> startStreamCut,
                                            final Optional<StreamCut> endStreamCut) {
    // An UNBOUNDED stream cut is treated the same as no stream cut at all.
    val boundedStart = startStreamCut.filter(sc -> !sc.equals(StreamCut.UNBOUNDED));
    val boundedEnd = endStreamCut.filter(sc -> !sc.equals(StreamCut.UNBOUNDED));

    // Reject stream cuts that were taken on a different stream.
    boundedStart.ifPresent(sc -> Preconditions.checkArgument(stream.equals(sc.asImpl().getStream())));
    boundedEnd.ifPresent(sc -> Preconditions.checkArgument(stream.equals(sc.asImpl().getStream())));

    // Use the supplied cuts where present; otherwise fetch the head (start) / tail (end) cut.
    final CompletableFuture<StreamCut> startFuture = boundedStart
            .map(sc -> CompletableFuture.completedFuture(sc))
            .orElseGet(() -> streamCutHelper.fetchHeadStreamCut(stream));
    final CompletableFuture<StreamCut> endFuture = boundedEnd
            .map(sc -> CompletableFuture.completedFuture(sc))
            .orElseGet(() -> streamCutHelper.fetchTailStreamCut(stream));

    // Resolve the segment information once both cuts are known.
    final CompletableFuture<StreamSegmentsIterator> segmentsFuture =
            startFuture.thenCombine(endFuture, (from, to) -> getStreamSegmentInfo(stream, from, to));
    return getAndHandleExceptions(segmentsFuture, RuntimeException::new);
}
 
源代码8 项目: flink   文件: JobSubmitHandler.java
/**
 * Handles a job submission: validates the uploaded files and the request body, uploads the
 * job graph files via {@code uploadJobGraphFiles} and submits the job graph to the dispatcher.
 *
 * @param request request carrying the uploaded files and the {@link JobSubmitRequestBody}
 * @param gateway dispatcher gateway the job is submitted to
 * @return future completing with a response body containing {@code "/jobs/" + jobId}
 * @throws RestHandlerException with {@code BAD_REQUEST} if several uploaded files share a
 *         name, or if the job graph file name is missing from the request body
 */
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
	final Collection<File> uploadedFiles = request.getUploadedFiles();
	// Without a merge function Collectors.toMap throws an IllegalStateException on a repeated
	// file name, which made the size check below unreachable. Keeping the first entry lets
	// that check report duplicates as a BAD_REQUEST instead.
	final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
		File::getName,
		Path::fromLocalFile,
		(first, duplicate) -> first
	));

	// A size mismatch means at least two uploaded files collapsed onto the same name.
	if (uploadedFiles.size() != nameToFile.size()) {
		throw new RestHandlerException(
			String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
				uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
				nameToFile.size(),
				uploadedFiles.size()),
			HttpResponseStatus.BAD_REQUEST
		);
	}

	final JobSubmitRequestBody requestBody = request.getRequestBody();

	if (requestBody.jobGraphFileName == null) {
		throw new RestHandlerException(
			String.format("The %s field must not be omitted or be null.",
				JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
			HttpResponseStatus.BAD_REQUEST);
	}

	CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

	Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

	Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

	CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

	CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

	// The acknowledgement carries no job id, so take it from the loaded job graph.
	return jobSubmissionFuture.thenCombine(jobGraphFuture,
		(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
 
源代码9 项目: Flink-CEPplus   文件: JobSubmitHandler.java
/**
 * Handles a job submission: validates the uploaded files and the request body, uploads the
 * job graph files via {@code uploadJobGraphFiles} and submits the job graph to the dispatcher.
 *
 * @param request request carrying the uploaded files and the {@link JobSubmitRequestBody}
 * @param gateway dispatcher gateway the job is submitted to
 * @return future completing with a response body containing {@code "/jobs/" + jobId}
 * @throws RestHandlerException with {@code BAD_REQUEST} if several uploaded files share a
 *         name, or if the job graph file name is missing from the request body
 */
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
	final Collection<File> uploadedFiles = request.getUploadedFiles();
	// Without a merge function Collectors.toMap throws an IllegalStateException on a repeated
	// file name, which made the size check below unreachable. Keeping the first entry lets
	// that check report duplicates as a BAD_REQUEST instead.
	final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
		File::getName,
		Path::fromLocalFile,
		(first, duplicate) -> first
	));

	// A size mismatch means at least two uploaded files collapsed onto the same name.
	if (uploadedFiles.size() != nameToFile.size()) {
		throw new RestHandlerException(
			String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
				uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
				nameToFile.size(),
				uploadedFiles.size()),
			HttpResponseStatus.BAD_REQUEST
		);
	}

	final JobSubmitRequestBody requestBody = request.getRequestBody();

	if (requestBody.jobGraphFileName == null) {
		throw new RestHandlerException(
			String.format("The %s field must not be omitted or be null.",
				JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
			HttpResponseStatus.BAD_REQUEST);
	}

	CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

	Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

	Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

	CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

	CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

	// The acknowledgement carries no job id, so take it from the loaded job graph.
	return jobSubmissionFuture.thenCombine(jobGraphFuture,
		(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
 
源代码10 项目: flink   文件: JobSubmitHandler.java
/**
 * Handles a job submission: validates the uploaded files and the request body, uploads the
 * job graph files via {@code uploadJobGraphFiles} and submits the job graph to the dispatcher.
 *
 * @param request request carrying the uploaded files and the {@link JobSubmitRequestBody}
 * @param gateway dispatcher gateway the job is submitted to
 * @return future completing with a response body containing {@code "/jobs/" + jobId}
 * @throws RestHandlerException with {@code BAD_REQUEST} if several uploaded files share a
 *         name, or if the job graph file name is missing from the request body
 */
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
	final Collection<File> uploadedFiles = request.getUploadedFiles();
	// Without a merge function Collectors.toMap throws an IllegalStateException on a repeated
	// file name, which made the size check below unreachable. Keeping the first entry lets
	// that check report duplicates as a BAD_REQUEST instead.
	final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
		File::getName,
		Path::fromLocalFile,
		(first, duplicate) -> first
	));

	// A size mismatch means at least two uploaded files collapsed onto the same name.
	if (uploadedFiles.size() != nameToFile.size()) {
		throw new RestHandlerException(
			String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
				uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
				nameToFile.size(),
				uploadedFiles.size()),
			HttpResponseStatus.BAD_REQUEST
		);
	}

	final JobSubmitRequestBody requestBody = request.getRequestBody();

	if (requestBody.jobGraphFileName == null) {
		throw new RestHandlerException(
			String.format("The %s field must not be omitted or be null.",
				JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
			HttpResponseStatus.BAD_REQUEST);
	}

	CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

	Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

	Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

	CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

	CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

	// The acknowledgement carries no job id, so take it from the loaded job graph.
	return jobSubmissionFuture.thenCombine(jobGraphFuture,
		(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
 
源代码11 项目: fdb-record-layer   文件: OnlineIndexer.java
/**
 * Scans for the first and the last record of {@code recordsRange}, calls {@code buildRange}
 * for the open-ended span on either side of them, and reports the span that lies in between.
 *
 * @param store record store that is scanned
 * @param rangeSet range set that receives the whole range when no record is found
 * @param recordsScanned optional counter that is passed on to {@code buildRange}
 * @return future of the range from the first primary key (inclusive) to the last primary key
 *         (exclusive), or of {@code null} when no record was found or both keys are equal
 */
@Nonnull
private CompletableFuture<TupleRange> buildEndpoints(@Nonnull FDBRecordStore store, @Nonnull RangeSet rangeSet,
                                                     @Nullable AtomicLong recordsScanned) {
    final ExecuteProperties singleRow = ExecuteProperties.newBuilder()
            .setReturnedRowLimit(1)
            .setIsolationLevel(IsolationLevel.SERIALIZABLE)
            .build();

    // Forward scan limited to one row: yields the first record, if any.
    final ScanProperties forwardScan = new ScanProperties(singleRow);
    final RecordCursor<FDBStoredRecord<Message>> firstCursor = store.scanRecords(recordsRange, null, forwardScan);
    final CompletableFuture<Tuple> begin = firstCursor.onNext().thenCompose(first -> {
        if (!first.hasNext()) {
            // Empty range -- add the whole thing.
            return rangeSet.insertRange(store.ensureContextActive(), null, null).thenApply(ignored -> null);
        }
        final Tuple firstKey = first.get().getPrimaryKey();
        return buildRange(store, null, firstKey, recordsScanned).thenApply(ignored -> firstKey);
    });

    // Reverse scan limited to one row: yields the last record, if any.
    final ScanProperties reverseScan = new ScanProperties(singleRow, true);
    final RecordCursor<FDBStoredRecord<Message>> lastCursor = store.scanRecords(recordsRange, null, reverseScan);
    final CompletableFuture<Tuple> end = lastCursor.onNext().thenCompose(last -> {
        if (!last.hasNext()) {
            // The range is empty; the future above already inserts the whole range,
            // so there is nothing left to do here.
            return CompletableFuture.completedFuture(null);
        }
        final Tuple lastKey = last.get().getPrimaryKey();
        return buildRange(store, lastKey, null, recordsScanned).thenApply(ignored -> lastKey);
    });

    return begin.thenCombine(end, (firstKey, lastKey) ->
            firstKey == null || firstKey.equals(lastKey)
                    ? null
                    : new TupleRange(firstKey, lastKey, EndpointType.RANGE_INCLUSIVE, EndpointType.RANGE_EXCLUSIVE));
}
 
源代码12 项目: fabric-chaincode-java   文件: Command.java
/**
 * Run the command, and process the output to arrays for later parsing and checking.
 *
 * <p>Any {@link IOException}, {@link InterruptedException} or {@link ExecutionException} is
 * printed and reported through an exit code of {@code -1}; it is not rethrown. When the
 * calling thread is interrupted, its interrupt flag is set again before returning.
 *
 * @param quiet true if the output should NOT be printed directly to System.out/System.err
 * @return the result holding the exit code and, when the streams were read successfully,
 *         the captured stdout and stderr lines
 */
public Result run(boolean quiet) {

    ProcessBuilder processBuilder = new ProcessBuilder(cmd);
    processBuilder.environment().putAll(env);
    final Result result = new Result();

    System.out.println("Running:" + this.toString());
    try {
        Process process = processBuilder.start();

        CompletableFuture<ArrayList<String>> soutFut = readOutStream(process.getInputStream(), quiet ? null : System.out);
        CompletableFuture<ArrayList<String>> serrFut = readOutStream(process.getErrorStream(), quiet ? null : System.err);

        CompletableFuture<Result> resultFut = soutFut.thenCombine(serrFut, (stdout, stderr) -> {
            // Store both captured streams on the shared result object.
            result.stderr = stderr;
            result.stdout = stdout;
            return result;
        });

        result.exitcode = process.waitFor();
        // Block until both stream readers are done so stdout/stderr are populated.
        resultFut.get();

    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
        result.exitcode = -1;
    } catch (IOException | ExecutionException e) {
        e.printStackTrace();
        result.exitcode = -1;
    }

    return result;
}
 
源代码13 项目: jetcache   文件: MultiLevelCache.java
/**
 * Merges the outcome accumulated so far with the outcome of one more cache operation.
 *
 * @param future future of the outcome accumulated so far; may complete with {@code null}
 * @param result cache result whose future is combined in
 * @return future of the new outcome if nothing was accumulated yet, of a
 *         {@code PART_SUCCESS} outcome if the two result codes differ, and of the
 *         accumulated outcome otherwise
 */
private CompletableFuture<ResultData> combine(CompletableFuture<ResultData> future, CacheResult result) {
    return future.thenCombine(result.future(), (accumulated, latest) -> {
        if (accumulated == null) {
            return latest;
        }
        final boolean sameCode = accumulated.getResultCode() == latest.getResultCode();
        return sameCode
                ? accumulated
                : new ResultData(CacheResultCode.PART_SUCCESS, null, null);
    });
}
 
源代码14 项目: pravega   文件: StreamManagerImpl.java
/**
 * Builds the {@link StreamInfo} of a stream from its current tail and head stream cuts.
 *
 * @param stream The Stream.
 * @return A future representing {@link StreamInfo}; the stream is reported as sealed when
 *         the tail stream cut has no positions.
 */
private CompletableFuture<StreamInfo> getStreamInfo(final Stream stream) {
    // Request the current TAIL cut first, then the current HEAD cut.
    final CompletableFuture<StreamCut> tailFuture = streamCutHelper.fetchTailStreamCut(stream);
    final CompletableFuture<StreamCut> headFuture = streamCutHelper.fetchHeadStreamCut(stream);

    return tailFuture.thenCombine(headFuture, (tail, head) -> {
        // A tail cut without any positions is what marks the stream as sealed here.
        final boolean sealed = ((StreamCutImpl) tail).getPositions().isEmpty();
        return new StreamInfo(stream.getScope(), stream.getStreamName(), tail, head, sealed);
    });
}
 
源代码15 项目: agrest   文件: MultiSelectBuilder.java
/**
 * Starts the fetcher on the executor and merges its value into the root response once both
 * are available.
 *
 * @param rootResult future of the root response
 * @return future of the same root response object after the merger has been applied to it
 */
CompletableFuture<DataResponse<T>> buildChain(CompletableFuture<DataResponse<T>> rootResult) {
    // The fetcher is submitted right away; it does not wait for the root result.
    CompletableFuture<U> fetched = CompletableFuture.supplyAsync(fetcher, executor);

    // The merger works by side effect, so the root response itself is handed back.
    return rootResult.thenCombine(fetched, (rootResponse, fetchedValue) -> {
        merger.accept(rootResponse, fetchedValue);
        return rootResponse;
    });
}
 
源代码16 项目: flink   文件: DefaultExecutionSlotAllocator.java
/**
 * Gets the location preferences of the execution, as determined by the locations
 * of the predecessors from which it receives input data.
 * If there are more than {@link ExecutionVertex#MAX_DISTINCT_LOCATIONS_TO_CONSIDER} different locations of source data,
 * or if the sources have neither been started nor will be started together with the execution,
 * this method returns an empty collection to indicate no location preference.
 *
 * @param executionVertexId id of the execution vertex whose preferred locations are computed
 * @param inputsLocationsRetriever retriever used to look up the producers of the consumed
 *         result partitions and their task manager locations
 * @return The preferred locations based on input streams, or an empty iterable,
 *         if there is no input-based preference.
 */
@VisibleForTesting
static CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(
		ExecutionVertexID executionVertexId,
		InputsLocationsRetriever inputsLocationsRetriever) {
	// Start out with "no preference"; each group of producers below may replace it.
	CompletableFuture<Collection<TaskManagerLocation>> preferredLocations =
			CompletableFuture.completedFuture(Collections.emptyList());

	// Buffer reused for every group of producers; it is cleared at the end of each iteration.
	Collection<CompletableFuture<TaskManagerLocation>> locationsFutures = new ArrayList<>();

	Collection<Collection<ExecutionVertexID>> allProducers =
			inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexId);
	for (Collection<ExecutionVertexID> producers : allProducers) {

		for (ExecutionVertexID producer : producers) {
			Optional<CompletableFuture<TaskManagerLocation>> optionalLocationFuture =
					inputsLocationsRetriever.getTaskManagerLocation(producer);
			optionalLocationFuture.ifPresent(locationsFutures::add);
			// If the parallelism is large, waiting for all futures to come back may take a long time,
			// so give up on this group once it exceeds the limit and treat it as having no locations.
			if (locationsFutures.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
				locationsFutures.clear();
				break;
			}
		}

		// De-duplicate the locations of this group once all of its futures have completed.
		CompletableFuture<Collection<TaskManagerLocation>> uniqueLocationsFuture =
				FutureUtils.combineAll(locationsFutures).thenApply(HashSet::new);
		preferredLocations = preferredLocations.thenCombine(
				uniqueLocationsFuture,
				(locationsOnOneEdge, locationsOnAnotherEdge) -> {
					// Keep the current preference when this group has no locations, or when the
					// current preference is non-empty and strictly smaller than this group's set;
					// otherwise this group's locations become the new preference.
					if ((!locationsOnOneEdge.isEmpty() && locationsOnAnotherEdge.size() > locationsOnOneEdge.size())
							|| locationsOnAnotherEdge.isEmpty()) {
						return locationsOnOneEdge;
					} else {
						return locationsOnAnotherEdge;
					}
				});
		locationsFutures.clear();
	}
	return preferredLocations;
}
 
/**
 * Verify that the ThreadContext's contextualFunction
 * method can be used to wrap a BiFunction instance with the context that is captured from the
 * current thread per the configuration of the ThreadContext builder, and that the context is
 * applied when the BiFunction apply method runs. This test case aligns with use case of
 * supplying a contextual BiFunction to a completion stage that is otherwise not context-aware.
 *
 * @throws ExecutionException indicates test failure
 * @throws InterruptedException indicates test failure
 * @throws TimeoutException indicates test failure
 */
@Test
public void contextualBiFunctionRunsWithContext()
        throws ExecutionException, InterruptedException, TimeoutException {
    // Propagate only the Label context; every other context type is cleared.
    ThreadContext labelContext = ThreadContext.builder()
            .propagated(Label.CONTEXT_NAME)
            .unchanged()
            .cleared(ThreadContext.ALL_REMAINING)
            .build();

    try {
        // Set non-default values
        Buffer.get().append("contextualBiFunction-test-buffer");
        Label.set("contextualBiFunction-test-label");

        // CompletableFuture from Java SE is intentionally used here to avoid the context
        // propagation guarantees of ManagedExecutor.
        // This ensures we are testing that ThreadContext is
        // doing the work to propagate the context rather than getting it from a
        // ManagedExecutor.
        CompletableFuture<Integer> stage1a = new CompletableFuture<Integer>();
        CompletableFuture<Integer> stage1b = new CompletableFuture<Integer>();

        // The context is captured here, when contextualFunction wraps the lambda.
        CompletableFuture<Integer> stage2 = stage1a.thenCombine(stage1b,
                labelContext.contextualFunction((a, b) -> {
                    Assert.assertEquals(a, Integer.valueOf(10),
                            "First value supplied to BiFunction was lost or altered.");

                    Assert.assertEquals(b, Integer.valueOf(20),
                            "Second value supplied to BiFunction was lost or altered.");

                    Assert.assertEquals(Label.get(), "contextualBiFunction-test-label",
                            "Context type was not propagated to contextual action.");

                    Assert.assertEquals(Buffer.get().toString(), "",
                            "Context type that is configured to be cleared was not cleared.");

                    return a + b;
                }));

        // Complete both inputs from a task submitted to unmanagedThreads, so that any context
        // seen by the BiFunction has to come from the ThreadContext wrapper.
        Future<Integer> future = unmanagedThreads.submit(() -> {
            stage1a.complete(10);
            stage1b.complete(20);
            return stage2.get(MAX_WAIT_NS, TimeUnit.NANOSECONDS);
        });

        Assert.assertEquals(future.get(MAX_WAIT_NS, TimeUnit.NANOSECONDS), Integer.valueOf(30),
                "Result of BiFunction was lost or altered.");
    }
    finally {
        // Restore original values
        Buffer.set(null);
        Label.set(null);
    }
}
 
/**
 * When an already-contextualized Consumer or BiFunction is specified as the action/task,
 * the action/task runs with its already-captured context rather than
 * capturing and applying context per the configuration of the managed executor.
 *
 * @throws ExecutionException indicates test failure
 * @throws InterruptedException indicates test failure
 * @throws TimeoutException indicates test failure
 */
@Test
public void contextOfContextualConsumerAndBiFunctionOverrideContextOfManagedExecutor()
        throws ExecutionException, InterruptedException, TimeoutException {
    // Pre-contextualized actions: Label is propagated, everything else is cleared.
    ThreadContext labelContext = ThreadContext.builder()
            .propagated(Label.CONTEXT_NAME)
            .unchanged()
            .cleared(ThreadContext.ALL_REMAINING)
            .build();

    // The executor is configured the opposite way: Buffer is propagated, everything else is cleared.
    ManagedExecutor executor = ManagedExecutor.builder()
            .propagated(Buffer.CONTEXT_NAME)
            .cleared(ThreadContext.ALL_REMAINING)
            .build();
    try {
        // Each contextual action below is created right after distinct Buffer/Label values
        // are set, and asserts that it later sees exactly the label that was set at that time.
        Buffer.set(new StringBuffer("contextualBiFunctionOverride-buffer-1"));
        Label.set("contextualBiFunctionOverride-label-1");

        BiFunction<Integer, Throwable, Integer> precontextualizedFunction1 = labelContext.contextualFunction((result, failure) -> {
            Assert.assertEquals(Label.get(), "contextualBiFunctionOverride-label-1",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
            // Map a failure to 100; otherwise pass the result through.
            return failure == null ? result : 100;
        });

        Buffer.set(new StringBuffer("contextualBiFunctionOverride-buffer-2"));
        Label.set("contextualBiFunctionOverride-label-2");

        BiFunction<Integer, Integer, Integer> precontextualizedFunction2 = labelContext.contextualFunction((i, j) -> {
            Assert.assertEquals(Label.get(), "contextualBiFunctionOverride-label-2",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
            return i - j;
        });

        Buffer.set(new StringBuffer("contextualConsumerOverride-buffer-3"));
        Label.set("contextualConsumerOverride-label-3");

        Consumer<Integer> precontextualizedConsumer3 = labelContext.contextualConsumer(i -> {
            Assert.assertEquals(Label.get(), "contextualConsumerOverride-label-3",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
        });

        // NOTE(review): "Consuemr" in the buffer value below looks like a typo; no assertion
        // in this method compares against that value.
        Buffer.set(new StringBuffer("contextualConsuemrOverride-buffer-4"));
        Label.set("contextualConsumerOverride-label-4");

        Consumer<Integer> precontextualizedConsumer4 = labelContext.contextualConsumer(i -> {
            Assert.assertEquals(Label.get(), "contextualConsumerOverride-label-4",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
        });

        // Not pre-contextualized: this one is expected to see the executor's configuration
        // instead (Buffer propagated with the value set below, Label cleared).
        BiFunction<Void, Void, String> normalFunction5 = (unused1, unused2) -> {
            Assert.assertEquals(Buffer.get().toString(), "contextualConsumerAndBiFunctionOverride-buffer-5",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Label.get(), "",
                    "Context type not cleared from thread.");
            return "done";
        };

        Buffer.set(new StringBuffer("contextualConsumerAndBiFunctionOverride-buffer-5"));
        Label.set("contextualConsumerAndBiFunctionOverride-label-5");

        // Build the chain on stages created by the managed executor: a failed stage handled by
        // function 1, combined with 200 by function 2, consumed by consumers 3 and 4, and
        // finally joined by the plain function 5.
        CompletableFuture<Integer> stage0 = executor.failedFuture(new ArrayIndexOutOfBoundsException("Expected error."));
        CompletableFuture<Integer> stage1 = stage0.handleAsync(precontextualizedFunction1);
        CompletableFuture<Integer> stage2 = executor.completedFuture(200).thenCombineAsync(stage1, precontextualizedFunction2);
        CompletableFuture<Void> stage3 = stage2.thenAccept(precontextualizedConsumer3);
        CompletableFuture<Void> stage4 = stage2.acceptEitherAsync(stage1, precontextualizedConsumer4);
        CompletableFuture<String> stage5 = stage4.thenCombine(stage3, normalFunction5);

        // join() surfaces any assertion failure raised inside the stages above.
        Assert.assertEquals(stage5.join(), "done",
                "Unexpected result for completion stage.");
    }
    finally {
        executor.shutdownNow();
        // Restore original values
        Buffer.set(null);
        Label.set(null);
    }
}
 
源代码19 项目: openjdk-jdk9   文件: CompletableFutureTest.java
/**
 * Delegates to {@link CompletableFuture#thenCombine(CompletionStage, BiFunction)} on {@code f}.
 *
 * @param f future the combination is invoked on
 * @param g other stage whose result is combined with that of {@code f}
 * @param a function applied to both results
 * @return the future returned by {@code f.thenCombine(g, a)}
 */
public <T,U,V> CompletableFuture<V> thenCombine(
        CompletableFuture<T> f,
        CompletionStage<? extends U> g,
        BiFunction<? super T,? super U,? extends V> a) {
    final CompletableFuture<V> combined = f.thenCombine(g, a);
    return combined;
}
 
源代码20 项目: j2objc   文件: CompletableFutureTest.java
/**
 * Delegates to {@link CompletableFuture#thenCombine(CompletionStage, BiFunction)} on {@code f}.
 *
 * @param f future the combination is invoked on
 * @param g other stage whose result is combined with that of {@code f}
 * @param a function applied to both results
 * @return the future returned by {@code f.thenCombine(g, a)}
 */
public <T,U,V> CompletableFuture<V> thenCombine(
        CompletableFuture<T> f,
        CompletionStage<? extends U> g,
        BiFunction<? super T,? super U,? extends V> a) {
    final CompletableFuture<V> combined = f.thenCombine(g, a);
    return combined;
}