下面列出了java.util.concurrent.CompletableFuture#thenCombine ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
CompletableFuture<ResourceOverview> taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout));
final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(
(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
CompletableFuture<Collection<Optional<JobStatus>>> allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation);
CompletableFuture<Collection<JobStatus>> allJobsFuture = allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
final JobsOverview completedJobsOverview = archivedExecutionGraphStore.getStoredJobsOverview();
return allJobsFuture.thenCombine(
taskManagerOverviewFuture,
(Collection<JobStatus> runningJobsStatus, ResourceOverview resourceOverview) -> {
final JobsOverview allJobsOverview = JobsOverview.create(runningJobsStatus).combine(completedJobsOverview);
return new ClusterOverview(resourceOverview, allJobsOverview);
});
}
@Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
CompletableFuture<ResourceOverview> taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout));
final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(
(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
CompletableFuture<Collection<Optional<JobStatus>>> allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation);
CompletableFuture<Collection<JobStatus>> allJobsFuture = allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
final JobsOverview completedJobsOverview = archivedExecutionGraphStore.getStoredJobsOverview();
return allJobsFuture.thenCombine(
taskManagerOverviewFuture,
(Collection<JobStatus> runningJobsStatus, ResourceOverview resourceOverview) -> {
final JobsOverview allJobsOverview = JobsOverview.create(runningJobsStatus).combine(completedJobsOverview);
return new ClusterOverview(resourceOverview, allJobsOverview);
});
}
CompletableFuture<DataResponse<T>> buildChain(CompletableFuture<DataResponse<T>> rootResult) {
Function<DataResponse<T>, CompletableFuture<U>> fetcherAsAsyncFunction = (r) -> {
return CompletableFuture.supplyAsync(() -> {
return fetcher.apply(r);
}, executor);
};
CompletableFuture<U> chainFuture = rootResult.thenCompose(fetcherAsAsyncFunction);
BiFunction<DataResponse<T>, U, DataResponse<T>> mergerAsFunction = (r, u) -> {
merger.accept(r, u);
return r;
};
return rootResult.thenCombine(chainFuture, mergerAsFunction);
}
@Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
CompletableFuture<ResourceOverview> taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout));
final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(
(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
CompletableFuture<Collection<Optional<JobStatus>>> allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation);
CompletableFuture<Collection<JobStatus>> allJobsFuture = allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
final JobsOverview completedJobsOverview = archivedExecutionGraphStore.getStoredJobsOverview();
return allJobsFuture.thenCombine(
taskManagerOverviewFuture,
(Collection<JobStatus> runningJobsStatus, ResourceOverview resourceOverview) -> {
final JobsOverview allJobsOverview = JobsOverview.create(runningJobsStatus).combine(completedJobsOverview);
return new ClusterOverview(resourceOverview, allJobsOverview);
});
}
private CompletableFuture<JobGraph> uploadJobGraphFiles(
DispatcherGateway gateway,
CompletableFuture<JobGraph> jobGraphFuture,
Collection<Path> jarFiles,
Collection<Tuple2<String, Path>> artifacts,
Configuration configuration) {
CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
return jobGraphFuture.thenCombine(blobServerPortFuture, (JobGraph jobGraph, Integer blobServerPort) -> {
final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
try {
ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, configuration));
} catch (FlinkException e) {
throw new CompletionException(new RestHandlerException(
"Could not upload job files.",
HttpResponseStatus.INTERNAL_SERVER_ERROR,
e));
}
return jobGraph;
});
}
@NotNull
private CompletableFuture<List<SageVariant>> combine(@NotNull final GenomeRegion region,
final CompletableFuture<List<Candidate>> candidates, final CompletableFuture<ReadContextCounters> doneTumor,
final CompletableFuture<ReadContextCounters> doneNormal) {
return doneNormal.thenCombine(doneTumor, (normalCandidates, tumorCandidates) -> {
LOGGER.debug("Gathering evidence in {}:{}", region.chromosome(), region.start());
final SageVariantFactory variantFactory = new SageVariantFactory(config.filter());
// Combine normal and tumor together and create variants
final List<SageVariant> result = Lists.newArrayList();
for (Candidate candidate : candidates.join()) {
final List<ReadContextCounter> normal = normalCandidates.readContextCounters(candidate.variant());
final List<ReadContextCounter> tumor = tumorCandidates.readContextCounters(candidate.variant());
SageVariant sageVariant = variantFactory.create(normal, tumor);
result.add(sageVariant);
}
return result;
});
}
private StreamSegmentsIterator listSegments(final Stream stream, final Optional<StreamCut> startStreamCut,
final Optional<StreamCut> endStreamCut) {
val startCut = startStreamCut.filter(sc -> !sc.equals(StreamCut.UNBOUNDED));
val endCut = endStreamCut.filter(sc -> !sc.equals(StreamCut.UNBOUNDED));
//Validate that the stream cuts are for the requested stream.
startCut.ifPresent(streamCut -> Preconditions.checkArgument(stream.equals(streamCut.asImpl().getStream())));
endCut.ifPresent(streamCut -> Preconditions.checkArgument(stream.equals(streamCut.asImpl().getStream())));
// if startStreamCut is not provided use the streamCut at the start of the stream.
// if toStreamCut is not provided obtain a streamCut at the tail of the stream.
CompletableFuture<StreamCut> startSCFuture = startCut.isPresent() ?
CompletableFuture.completedFuture(startCut.get()) : streamCutHelper.fetchHeadStreamCut(stream);
CompletableFuture<StreamCut> endSCFuture = endCut.isPresent() ?
CompletableFuture.completedFuture(endCut.get()) : streamCutHelper.fetchTailStreamCut(stream);
//fetch the StreamSegmentsInfo based on start and end streamCuts.
CompletableFuture<StreamSegmentsIterator> streamSegmentInfo = startSCFuture.thenCombine(endSCFuture,
(startSC, endSC) -> getStreamSegmentInfo(stream, startSC, endSC));
return getAndHandleExceptions(streamSegmentInfo, RuntimeException::new);
}
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
final Collection<File> uploadedFiles = request.getUploadedFiles();
final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
File::getName,
Path::fromLocalFile
));
if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
nameToFile.size(),
uploadedFiles.size()),
HttpResponseStatus.BAD_REQUEST
);
}
final JobSubmitRequestBody requestBody = request.getRequestBody();
if (requestBody.jobGraphFileName == null) {
throw new RestHandlerException(
String.format("The %s field must not be omitted or be null.",
JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
HttpResponseStatus.BAD_REQUEST);
}
CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
return jobSubmissionFuture.thenCombine(jobGraphFuture,
(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
final Collection<File> uploadedFiles = request.getUploadedFiles();
final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
File::getName,
Path::fromLocalFile
));
if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
nameToFile.size(),
uploadedFiles.size()),
HttpResponseStatus.BAD_REQUEST
);
}
final JobSubmitRequestBody requestBody = request.getRequestBody();
if (requestBody.jobGraphFileName == null) {
throw new RestHandlerException(
String.format("The %s field must not be omitted or be null.",
JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
HttpResponseStatus.BAD_REQUEST);
}
CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
return jobSubmissionFuture.thenCombine(jobGraphFuture,
(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
final Collection<File> uploadedFiles = request.getUploadedFiles();
final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
File::getName,
Path::fromLocalFile
));
if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
nameToFile.size(),
uploadedFiles.size()),
HttpResponseStatus.BAD_REQUEST
);
}
final JobSubmitRequestBody requestBody = request.getRequestBody();
if (requestBody.jobGraphFileName == null) {
throw new RestHandlerException(
String.format("The %s field must not be omitted or be null.",
JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
HttpResponseStatus.BAD_REQUEST);
}
CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
return jobSubmissionFuture.thenCombine(jobGraphFuture,
(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
@Nonnull
private CompletableFuture<TupleRange> buildEndpoints(@Nonnull FDBRecordStore store, @Nonnull RangeSet rangeSet,
@Nullable AtomicLong recordsScanned) {
final ExecuteProperties limit1 = ExecuteProperties.newBuilder()
.setReturnedRowLimit(1)
.setIsolationLevel(IsolationLevel.SERIALIZABLE)
.build();
final ScanProperties forward = new ScanProperties(limit1);
RecordCursor<FDBStoredRecord<Message>> beginCursor = store.scanRecords(recordsRange, null, forward);
CompletableFuture<Tuple> begin = beginCursor.onNext().thenCompose(result -> {
if (result.hasNext()) {
Tuple firstTuple = result.get().getPrimaryKey();
return buildRange(store, null, firstTuple, recordsScanned).thenApply(vignore -> firstTuple);
} else {
// Empty range -- add the whole thing.
return rangeSet.insertRange(store.ensureContextActive(), null, null).thenApply(bignore -> null);
}
});
final ScanProperties backward = new ScanProperties(limit1, true);
RecordCursor<FDBStoredRecord<Message>> endCursor = store.scanRecords(recordsRange, null, backward);
CompletableFuture<Tuple> end = endCursor.onNext().thenCompose(result -> {
if (result.hasNext()) {
Tuple lastTuple = result.get().getPrimaryKey();
return buildRange(store, lastTuple, null, recordsScanned).thenApply(vignore -> lastTuple);
} else {
// As the range is empty, the whole range needs to be added, but that is accomplished
// by the above future, so this has nothing to do.
return CompletableFuture.completedFuture(null);
}
});
return begin.thenCombine(end, (firstTuple, lastTuple) -> {
if (firstTuple == null || firstTuple.equals(lastTuple)) {
return null;
} else {
return new TupleRange(firstTuple, lastTuple, EndpointType.RANGE_INCLUSIVE, EndpointType.RANGE_EXCLUSIVE);
}
});
}
/**
* Run the command, and process the output to arrays for later parsing and checking
*
* @param quiet true if the output should NOT be printed directly to System.out/System.err
*/
public Result run(boolean quiet) {
ProcessBuilder processBuilder = new ProcessBuilder(cmd);
processBuilder.environment().putAll(env);
final Result result = new Result();
System.out.println("Running:" + this.toString());
try {
Process process = processBuilder.start();
CompletableFuture<ArrayList<String>> soutFut = readOutStream(process.getInputStream(),quiet?null:System.out);
CompletableFuture<ArrayList<String>> serrFut = readOutStream(process.getErrorStream(),quiet?null:System.err);
CompletableFuture<Result> resultFut = soutFut.thenCombine(serrFut, (stdout, stderr) -> {
// print to current stderr the stderr of process and return the stdout
result.stderr = stderr;
result.stdout = stdout;
return result;
});
result.exitcode = process.waitFor();
// get stdout once ready, blocking
resultFut.get();
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
result.exitcode = -1;
}
return result;
}
private CompletableFuture<ResultData> combine(CompletableFuture<ResultData> future, CacheResult result) {
return future.thenCombine(result.future(), (d1, d2) -> {
if (d1 == null) {
return d2;
}
if (d1.getResultCode() != d2.getResultCode()) {
return new ResultData(CacheResultCode.PART_SUCCESS, null, null);
}
return d1;
});
}
/**
* Fetch the {@link StreamInfo} for a given stream.
*
* @param stream The Stream.
* @return A future representing {@link StreamInfo}.
*/
private CompletableFuture<StreamInfo> getStreamInfo(final Stream stream) {
//Fetch the stream cut representing the current TAIL and current HEAD of the stream.
CompletableFuture<StreamCut> currentTailStreamCut = streamCutHelper.fetchTailStreamCut(stream);
CompletableFuture<StreamCut> currentHeadStreamCut = streamCutHelper.fetchHeadStreamCut(stream);
return currentTailStreamCut.thenCombine(currentHeadStreamCut,
(tailSC, headSC) -> {
boolean isSealed = ((StreamCutImpl) tailSC).getPositions().isEmpty();
return new StreamInfo(stream.getScope(),
stream.getStreamName(),
tailSC, headSC, isSealed);
});
}
CompletableFuture<DataResponse<T>> buildChain(CompletableFuture<DataResponse<T>> rootResult) {
CompletableFuture<U> chainFuture = CompletableFuture.supplyAsync(fetcher, executor);
BiFunction<DataResponse<T>, U, DataResponse<T>> mergerAsFunction = (r, u) -> {
merger.accept(r, u);
return r;
};
return rootResult.thenCombine(chainFuture, mergerAsFunction);
}
/**
* Gets the location preferences of the execution, as determined by the locations
* of the predecessors from which it receives input data.
* If there are more than {@link ExecutionVertex#MAX_DISTINCT_LOCATIONS_TO_CONSIDER} different locations of source data,
* or neither the sources have not been started nor will be started with the execution together,
* this method returns an empty collection to indicate no location preference.
*
* @return The preferred locations based in input streams, or an empty iterable,
* if there is no input-based preference.
*/
@VisibleForTesting
static CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(
ExecutionVertexID executionVertexId,
InputsLocationsRetriever inputsLocationsRetriever) {
CompletableFuture<Collection<TaskManagerLocation>> preferredLocations =
CompletableFuture.completedFuture(Collections.emptyList());
Collection<CompletableFuture<TaskManagerLocation>> locationsFutures = new ArrayList<>();
Collection<Collection<ExecutionVertexID>> allProducers =
inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexId);
for (Collection<ExecutionVertexID> producers : allProducers) {
for (ExecutionVertexID producer : producers) {
Optional<CompletableFuture<TaskManagerLocation>> optionalLocationFuture =
inputsLocationsRetriever.getTaskManagerLocation(producer);
optionalLocationFuture.ifPresent(locationsFutures::add);
// If the parallelism is large, wait for all futures coming back may cost a long time.
if (locationsFutures.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
locationsFutures.clear();
break;
}
}
CompletableFuture<Collection<TaskManagerLocation>> uniqueLocationsFuture =
FutureUtils.combineAll(locationsFutures).thenApply(HashSet::new);
preferredLocations = preferredLocations.thenCombine(
uniqueLocationsFuture,
(locationsOnOneEdge, locationsOnAnotherEdge) -> {
if ((!locationsOnOneEdge.isEmpty() && locationsOnAnotherEdge.size() > locationsOnOneEdge.size())
|| locationsOnAnotherEdge.isEmpty()) {
return locationsOnOneEdge;
} else {
return locationsOnAnotherEdge;
}
});
locationsFutures.clear();
}
return preferredLocations;
}
/**
* Verify that the ThreadContext's contextualFunction
* method can be used to wrap a BiFunction instance with the context that is captured from the
* current thread per the configuration of the ThreadContext builder, and that the context is
* applied when the BiFunction apply method runs. This test case aligns with use case of
* supplying a contextual BiFunction to a completion stage that is otherwise not context-aware.
*
* @throws ExecutionException indicates test failure
* @throws InterruptedException indicates test failure
* @throws TimeoutException indicates test failure
*/
@Test
public void contextualBiFunctionRunsWithContext()
throws ExecutionException, InterruptedException, TimeoutException {
ThreadContext labelContext = ThreadContext.builder()
.propagated(Label.CONTEXT_NAME)
.unchanged()
.cleared(ThreadContext.ALL_REMAINING)
.build();
try {
// Set non-default values
Buffer.get().append("contextualBiFunction-test-buffer");
Label.set("contextualBiFunction-test-label");
// CompletableFuture from Java SE is intentionally used here to avoid the context
// propagation guarantees of ManagedExecutor.
// This ensures we are testing that ThreadContext is
// doing the work to propagate the context rather than getting it from a
// ManagedExecutor.
CompletableFuture<Integer> stage1a = new CompletableFuture<Integer>();
CompletableFuture<Integer> stage1b = new CompletableFuture<Integer>();
CompletableFuture<Integer> stage2 = stage1a.thenCombine(stage1b,
labelContext.contextualFunction((a, b) -> {
Assert.assertEquals(a, Integer.valueOf(10),
"First value supplied to BiFunction was lost or altered.");
Assert.assertEquals(b, Integer.valueOf(20),
"Second value supplied to BiFunction was lost or altered.");
Assert.assertEquals(Label.get(), "contextualBiFunction-test-label",
"Context type was not propagated to contextual action.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type that is configured to be cleared was not cleared.");
return a + b;
}));
Future<Integer> future = unmanagedThreads.submit(() -> {
stage1a.complete(10);
stage1b.complete(20);
return stage2.get(MAX_WAIT_NS, TimeUnit.NANOSECONDS);
});
Assert.assertEquals(future.get(MAX_WAIT_NS, TimeUnit.NANOSECONDS), Integer.valueOf(30),
"Result of BiFunction was lost or altered.");
}
finally {
// Restore original values
Buffer.set(null);
Label.set(null);
}
}
/**
* When an already-contextualized Consumer or BiFunction is specified as the action/task,
* the action/task runs with its already-captured context rather than
* capturing and applying context per the configuration of the managed executor.
*
* @throws ExecutionException indicates test failure
* @throws InterruptedException indicates test failure
* @throws TimeoutException indicates test failure
*/
@Test
public void contextOfContextualConsumerAndBiFunctionOverrideContextOfManagedExecutor()
throws ExecutionException, InterruptedException, TimeoutException {
ThreadContext labelContext = ThreadContext.builder()
.propagated(Label.CONTEXT_NAME)
.unchanged()
.cleared(ThreadContext.ALL_REMAINING)
.build();
ManagedExecutor executor = ManagedExecutor.builder()
.propagated(Buffer.CONTEXT_NAME)
.cleared(ThreadContext.ALL_REMAINING)
.build();
try {
Buffer.set(new StringBuffer("contextualBiFunctionOverride-buffer-1"));
Label.set("contextualBiFunctionOverride-label-1");
BiFunction<Integer, Throwable, Integer> precontextualizedFunction1 = labelContext.contextualFunction((result, failure) -> {
Assert.assertEquals(Label.get(), "contextualBiFunctionOverride-label-1",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return failure == null ? result : 100;
});
Buffer.set(new StringBuffer("contextualBiFunctionOverride-buffer-2"));
Label.set("contextualBiFunctionOverride-label-2");
BiFunction<Integer, Integer, Integer> precontextualizedFunction2 = labelContext.contextualFunction((i, j) -> {
Assert.assertEquals(Label.get(), "contextualBiFunctionOverride-label-2",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return i - j;
});
Buffer.set(new StringBuffer("contextualConsumerOverride-buffer-3"));
Label.set("contextualConsumerOverride-label-3");
Consumer<Integer> precontextualizedConsumer3 = labelContext.contextualConsumer(i -> {
Assert.assertEquals(Label.get(), "contextualConsumerOverride-label-3",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
});
Buffer.set(new StringBuffer("contextualConsuemrOverride-buffer-4"));
Label.set("contextualConsumerOverride-label-4");
Consumer<Integer> precontextualizedConsumer4 = labelContext.contextualConsumer(i -> {
Assert.assertEquals(Label.get(), "contextualConsumerOverride-label-4",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
});
BiFunction<Void, Void, String> normalFunction5 = (unused1, unused2) -> {
Assert.assertEquals(Buffer.get().toString(), "contextualConsumerAndBiFunctionOverride-buffer-5",
"Previously captured context type not found on thread.");
Assert.assertEquals(Label.get(), "",
"Context type not cleared from thread.");
return "done";
};
Buffer.set(new StringBuffer("contextualConsumerAndBiFunctionOverride-buffer-5"));
Label.set("contextualConsumerAndBiFunctionOverride-label-5");
CompletableFuture<Integer> stage0 = executor.failedFuture(new ArrayIndexOutOfBoundsException("Expected error."));
CompletableFuture<Integer> stage1 = stage0.handleAsync(precontextualizedFunction1);
CompletableFuture<Integer> stage2 = executor.completedFuture(200).thenCombineAsync(stage1, precontextualizedFunction2);
CompletableFuture<Void> stage3 = stage2.thenAccept(precontextualizedConsumer3);
CompletableFuture<Void> stage4 = stage2.acceptEitherAsync(stage1, precontextualizedConsumer4);
CompletableFuture<String> stage5 = stage4.thenCombine(stage3, normalFunction5);
Assert.assertEquals(stage5.join(), "done",
"Unexpected result for completion stage.");
}
finally {
executor.shutdownNow();
// Restore original values
Buffer.set(null);
Label.set(null);
}
}
public <T,U,V> CompletableFuture<V> thenCombine
(CompletableFuture<T> f,
CompletionStage<? extends U> g,
BiFunction<? super T,? super U,? extends V> a) {
return f.thenCombine(g, a);
}
public <T,U,V> CompletableFuture<V> thenCombine
(CompletableFuture<T> f,
CompletionStage<? extends U> g,
BiFunction<? super T,? super U,? extends V> a) {
return f.thenCombine(g, a);
}