下面列出了 java.util.concurrent.CompletableFuture#thenAccept() 的实例代码。您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Tests ordered failure of future callbacks.
 *
 * <p>Registers four callbacks that observe a failure (two {@code whenComplete}, one
 * {@code handle}, one {@code exceptionally}) plus two success-only callbacks that must
 * never run, then completes the future exceptionally and checks that the failure-aware
 * callbacks ran in registration order.
 */
public void testOrderedFailure() throws Throwable {
    CompletableFuture<String> future = new OrderedFuture<>();
    AtomicInteger order = new AtomicInteger();
    future.whenComplete((r, e) -> assertEquals(1, order.incrementAndGet()));
    future.whenComplete((r, e) -> assertEquals(2, order.incrementAndGet()));
    future.handle((r, e) -> {
        assertEquals(3, order.incrementAndGet());
        return "bar";
    });
    // Success-only stages: they must be skipped when the future fails.
    future.thenRun(() -> fail());
    future.thenAccept(r -> fail());
    future.exceptionally(e -> {
        // handle() above already took position 3, so this is the fourth callback to run.
        assertEquals(4, order.incrementAndGet());
        return "bar";
    });
    future.completeExceptionally(new RuntimeException("foo"));
    // An assertion that fails inside a callback only fails the (unobserved) dependent
    // stage, so also verify here that all four failure-aware callbacks ran.
    // NOTE(review): assumes the callbacks run synchronously inside completeExceptionally,
    // as non-async stages of a plain CompletableFuture do - confirm for OrderedFuture.
    assertEquals(4, order.get());
}
/**
 * Sends an asynchronous HTTP/1.1 GET request to {@code http://httpbin.org/get}, prints the
 * status code and body of the response, and blocks until the printing stage has completed.
 *
 * @param args unused
 * @throws Exception if the URI cannot be parsed
 */
public static void main(String[] args) throws Exception {
    HttpClient httpClient = HttpClient.newBuilder().build();

    URI target = new URI("http://httpbin.org/get");
    HttpRequest getRequest = HttpRequest.newBuilder(target)
        .GET()
        .version(HttpClient.Version.HTTP_1_1)
        .build();

    CompletableFuture<HttpResponse<String>> pendingReply =
        httpClient.sendAsync(getRequest, HttpResponse.BodyHandlers.ofString());

    // Print the reply as soon as it arrives.
    CompletableFuture<Void> printed = pendingReply.thenAccept(reply -> {
        System.out.println("Status code: " + reply.statusCode());
        System.out.println("Response Body: " + reply.body());
    });

    // Block the main thread until the reply has been received and printed.
    CompletableFuture.allOf(printed).join();
}
/**
 * Wires up the main response and the push-promise group, then triggers execution.
 *
 * <p>The main response is reported to {@code multiResponseHandler} and registered with
 * {@code pushGroup}; the returned future is the one produced by
 * {@code multiResponseHandler.completion(...)}.
 *
 * @return the future returned by the multi-response handler's completion function
 */
CompletableFuture<U> multiResponseAsync() {
    // Nothing runs until this trigger future is completed at the end of the method.
    CompletableFuture<Void> trigger = new MinimalFuture<>();
    CompletableFuture<HttpResponseImpl<T>> rawResponse = responseAsync0(trigger);

    CompletableFuture<HttpResponse<T>> mainResponse =
        rawResponse.thenApply((HttpResponseImpl<T> impl) -> {
            multiResponseHandler.onResponse(impl);
            return (HttpResponse<T>) impl;
        });
    pushGroup.setMainResponse(mainResponse);

    // House-keeping related to multi-response:
    // all push promises have been received by the time the main response is available.
    mainResponse.thenAccept(ignored -> {
        pushGroup.noMorePushes(true);
    });

    CompletableFuture<U> completion =
        multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());

    // Trigger execution on the executor.
    trigger.completeAsync(() -> null, executor);
    return completion;
}
/**
 * Starts a {@code java -version} child process, waits for its exit future, checks that the
 * process is no longer alive and then attaches a logging callback to the exit future.
 */
@Test
public void givenProcess_whenAddExitCallback_thenSuccess() throws Exception {
    String javaCmd = ProcessUtils.getJavaCmd().getAbsolutePath();
    Process child = new ProcessBuilder(javaCmd, "-version")
        .inheritIO()
        .start();

    ProcessHandle handle = child.toHandle();
    log.info("PID: {} has started", handle.pid());

    CompletableFuture<ProcessHandle> exited = handle.onExit();
    // Block until the child process has terminated.
    exited.get();
    assertEquals(false, handle.isAlive());

    // The exit future is already complete here, so this callback is attached after the fact.
    exited.thenAccept(ph -> {
        log.info("PID: {} has stopped", ph.pid());
    });
}
/**
 * Helper method to translate Player UUIDs to names.
 *
 * <p>Looks up the game profiles for the given UUIDs. For every result whose
 * {@code Cause} value equals the string form of a profile's UUID, the cause is replaced
 * by the profile's name, or by {@code "unknown"} when the profile has no name. The
 * {@code results} list is modified in place.
 *
 * @param results List of results
 * @param uuidsPendingLookup Lists of UUIDs pending lookup
 * @return CompletableFuture completed with the same {@code results} list once the names
 *         have been filled in; it completes exceptionally if the profile lookup or the
 *         translation fails
 */
public static CompletableFuture<List<Result>> translateUuidsToNames(List<Result> results, List<UUID> uuidsPendingLookup) {
    CompletableFuture<Collection<GameProfile>> lookup = Sponge.getServer().getGameProfileManager().getAllById(uuidsPendingLookup, true);

    // Derive the returned future from the lookup so that a failed lookup (or an exception
    // thrown below) reaches the caller instead of leaving the future pending forever.
    return lookup.thenApply((profiles) -> {
        for (GameProfile profile : profiles) {
            // Invariant for the inner loop, so compute it once per profile.
            String uuid = profile.getUniqueId().toString();

            for (Result r : results) {
                Optional<Object> cause = r.data.get(DataQueries.Cause);
                if (cause.isPresent() && cause.get().equals(uuid)) {
                    r.data.set(DataQueries.Cause, profile.getName().orElse("unknown"));
                }
            }
        }

        return results;
    });
}
/**
 * Builds a {@link CompletableFuture} suitable for use as a completion callback. When completed, each of the
 * provided {@link Pending}'s {@link CompletableFuture} will be completed.
 * <p>
 * It is assumed that the size of the list this future is completed with matches the number of provided {@link
 * Pending}s. If it does not, if the list is {@code null}, or if the returned future is completed exceptionally,
 * every {@link Pending}'s {@link CompletableFuture} is completed exceptionally instead (with a
 * {@link RuntimeException} for the size/null cases, or with the original failure).
 *
 * @param pending A list of {@link Pending} operations.
 * @param <I>     Input parameter of {@link Pending} operations.
 * @param <O>     Output parameter of {@link Pending} operations.
 * @return A {@link CompletableFuture} that, when completed, then completes each of the given {@link Pending}'s
 * {@link CompletableFuture}.
 */
public static <I, O> CompletableFuture<List<O>> callback(List<? extends Pending<I, O>> pending) {
    CompletableFuture<List<O>> future = new CompletableFuture<>();
    // whenComplete (not thenAccept) so that failures are seen too; an exception thrown from a
    // thenAccept action would only fail a stage nobody holds, leaving the pending futures hanging.
    future.whenComplete((results, error) -> {
        Throwable failure = error;
        if (failure == null && results == null) {
            failure = new RuntimeException("result list is null");
        }
        if (failure == null && results.size() != pending.size()) {
            String message = String.format("result size (%s) does not match pending size (%s)", results.size(), pending.size());
            failure = new RuntimeException(message);
        }

        if (failure != null) {
            // Completing an already-completed future is a no-op, so this is safe even if a
            // caller has failed some of them itself.
            for (Pending<I, O> p : pending) {
                p.getFuture().completeExceptionally(failure);
            }
            return;
        }

        Iterator<? extends Pending<I, O>> pi = pending.iterator();
        Iterator<O> ri = results.iterator();
        while (pi.hasNext() && ri.hasNext()) {
            pi.next().getFuture().complete(ri.next());
        }
    });
    return future;
}
/**
 * Fetches the runtime information of the given partitions of a stream and converts it into
 * per-partition metadata.
 *
 * <p>All lookups are issued first and then awaited together, bounded by the configured
 * runtime-info wait time. The result map is built on the calling thread only after every
 * lookup has completed, so it is never mutated from completion callbacks and is never
 * returned partially filled.
 *
 * @param streamName   name of the stream whose partitions are queried
 * @param partitionIds ids of the partitions to query; each id is parsed as an integer
 * @return metadata (start-of-stream, last enqueued offset, end-of-stream) keyed by partition
 * @throws SamzaException if the lookups fail, time out, or the wait is interrupted
 */
private Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadata(String streamName, String[] partitionIds) {
    EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName);

    // futureList.get(i) is the lookup for partitionIds[i].
    List<CompletableFuture<PartitionRuntimeInformation>> futureList = new ArrayList<>();
    for (String partition : partitionIds) {
        CompletableFuture<PartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager
            .getEventHubClient()
            .getPartitionRuntimeInformation(partition);
        futureList.add(partitionRuntimeInfo);
    }

    CompletableFuture<Void> futureGroup =
        CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
    long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
    try {
        futureGroup.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
        String msg = String.format(
            "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s",
            systemName, streamName);
        LOG.error(msg, e);
        throw new SamzaException(msg, e);
    }

    // Every future completed normally at this point, so join() returns immediately.
    Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = new HashMap<>();
    for (int i = 0; i < partitionIds.length; i++) {
        PartitionRuntimeInformation ehPartitionInfo = futureList.get(i).join();
        LOG.info(printPartitionRuntimeInfo(ehPartitionInfo));

        // Set offsets
        String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
        String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
        String upcomingOffset = EventHubSystemConsumer.END_OF_STREAM;
        SystemStreamPartitionMetadata sspMetadata = new SystemStreamPartitionMetadata(startingOffset, newestOffset,
            upcomingOffset);
        sspMetadataMap.put(new Partition(Integer.parseInt(partitionIds[i])), sspMetadata);
    }
    return sspMetadataMap;
}
/**
 * Processes the request through the two-argument overload, sending each reply on via
 * {@code responseNext}, and tracks the resulting future under the request's call id until
 * that future completes.
 *
 * @param request the client request to process
 */
@Override
void processClientRequest(RaftClientRequest request) {
    final CompletableFuture<Void> f = processClientRequest(request, reply -> {
        if (!reply.isSuccess()) {
            LOG.info("Failed " + request + ", reply=" + reply);
        }
        final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply);
        responseNext(proto);
    });
    final long callId = request.getCallId();
    put(callId, f);
    // whenComplete rather than thenAccept: the entry must also be removed when the future
    // completes exceptionally, otherwise it would stay registered forever.
    f.whenComplete((ignored, error) -> remove(callId));
}
/**
 * Wraps this iterator in a new {@link AsyncIterator} whose {@link #getNext()} calls are serialized: they are
 * handed to a {@code SequentialProcessor} built on the given executor, so that no two executions of
 * {@link #getNext()} overlap and each one starts only after the previous one has completed, in order of
 * invocation. The processor is closed once an invocation yields {@code null}.
 *
 * @param executor An Executor to run async tasks on.
 * @return A new {@link AsyncIterator}.
 */
default AsyncIterator<T> asSequential(Executor executor) {
    final SequentialProcessor sequencer = new SequentialProcessor(executor);
    return () -> {
        final CompletableFuture<T> next = sequencer.add(AsyncIterator.this::getNext);
        // A null item is the signal to close the processor.
        next.thenAccept(item -> {
            if (item != null) {
                return;
            }
            sequencer.close();
        });
        return next;
    };
}
/**
 * Once the blob server address is known, extracts and uploads the files of the given job
 * graph using a {@code BlobClient} created for that address.
 *
 * @param blobServerAddressFuture future providing the blob server address
 * @param job job graph whose files are uploaded
 * @return future that completes when the upload has finished; a {@code FlinkException}
 *         raised by the upload is rethrown wrapped in a {@link CompletionException}
 */
private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
    return blobServerAddressFuture.thenAccept(address -> {
        try {
            ClientUtils.extractAndUploadJobGraphFiles(
                job,
                () -> new BlobClient(address, miniClusterConfiguration.getConfiguration()));
        } catch (FlinkException uploadFailure) {
            // Checked exceptions cannot leave the lambda, so wrap it for the future.
            throw new CompletionException(uploadFailure);
        }
    });
}
/**
 * Delegates the request to {@code handleRequest} and sends its result as the HTTP response.
 * A {@code RestHandlerException} thrown by {@code handleRequest} is turned into an
 * exceptionally completed future instead of propagating to the caller.
 *
 * @return future that completes after the response-sending step has been invoked
 */
@Override
protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
    CompletableFuture<P> pendingResponse;

    try {
        pendingResponse = handleRequest(handlerRequest, gateway);
    } catch (RestHandlerException handlerFailure) {
        pendingResponse = FutureUtils.completedExceptionally(handlerFailure);
    }

    return pendingResponse.thenAccept(payload -> {
        HandlerUtils.sendResponse(
            ctx,
            httpRequest,
            payload,
            messageHeaders.getResponseStatusCode(),
            responseHeaders);
    });
}
/**
 * Removal callback: once the removed entry's blob key future completes, deletes that key
 * via {@code transientBlobService.deleteFromCache}. Entries without a value are ignored.
 *
 * @param removalNotification notification describing the removed entry
 */
private void removeBlob(RemovalNotification<ResourceID, CompletableFuture<TransientBlobKey>> removalNotification) {
    log.debug("Remove cached file for TaskExecutor {}.", removalNotification.getKey());

    final CompletableFuture<TransientBlobKey> pendingKey = removalNotification.getValue();
    if (pendingKey == null) {
        // Nothing was stored for this entry, so there is nothing to delete.
        return;
    }

    pendingKey.thenAccept(transientBlobService::deleteFromCache);
}
/**
 * Supplies {@code "Hello"} asynchronously, logs it from a {@code thenAccept} stage and
 * waits for that stage to finish.
 */
@Test
public void whenAddingThenAcceptToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> logged = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenAccept(greeting -> LOG.debug("Computation returned: " + greeting));

    // Block until the logging stage has run.
    logged.get();
}
/**
 * Collects the metrics of every group in {@code detected_groups_} over the connection
 * obtained from {@code connection.getConnection(threadpool)}, racing that connection
 * against the supplied timeout.
 *
 * @param threadpool executor used to obtain the connection and to run the collection step
 * @param timeout future whose completion signals that the scan has run out of time
 * @return a singleton collection holding the future list of collected metric groups
 */
@Override
public Collection<CompletableFuture<? extends Collection<? extends MetricGroup>>> getGroups(Executor threadpool, CompletableFuture<TimeoutObject> timeout) {
/*
* Force the connection to open, even if there are no groups to scan.
*
* If there are no groups registered, the connection opening won't be triggered otherwise.
* And if that isn't triggered, registering groups won't happen either.
*/
CompletableFuture<List<MetricGroup>> future = connection.getConnection(threadpool)
.applyToEitherAsync(
// If the timeout completes first, the function below receives a fresh GCCloseable
// instead of a real connection - presumably one whose get() returns null (TODO confirm).
timeout.thenApply(timeoutObject -> new GCCloseable<JMXConnector>()),
conn -> {
// No usable connector (see the timeout branch above): fail the scan.
if (conn.get() == null)
throw new RuntimeException("connection unavailable");
// Hold this object's monitor while iterating over detected_groups_.
synchronized (this) {
try {
List<MetricGroup> result = new ArrayList<>();
for (MBeanGroup group
: detected_groups_.values()) {
// Re-check the timeout before each group so the scan stops once time is up.
if (timeout.isDone())
throw new RuntimeException("timed out");
group.getMetrics(conn.get().getMBeanServerConnection()).ifPresent(result::add);
}
return result;
} catch (IOException ex) {
throw new RuntimeException("unable to get MBeanServerConnection", ex);
}
}
},
threadpool);
// Once the timeout completes normally, cancel the scan future (a no-op if it already finished).
timeout.thenAccept(timeoutObject -> future.cancel(true));
return singleton(future);
}
/**
 * Uploads the files of the given job graph as soon as the blob server address becomes
 * available, creating a {@code BlobClient} for that address on demand.
 *
 * @param blobServerAddressFuture future providing the blob server address
 * @param job job graph whose files are uploaded
 * @return future that completes when the upload has finished; a {@code FlinkException}
 *         from the upload surfaces wrapped in a {@link CompletionException}
 */
private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
    final CompletableFuture<Void> uploaded = blobServerAddressFuture.thenAccept(serverAddress -> {
        try {
            ClientUtils.extractAndUploadJobGraphFiles(job, () -> new BlobClient(serverAddress, miniClusterConfiguration.getConfiguration()));
        } catch (FlinkException cause) {
            // Surface the checked exception through the returned future.
            throw new CompletionException(cause);
        }
    });
    return uploaded;
}
/**
 * Runs one full scenario under a single {@code fullSystemTest} span: registers a random
 * customer while fetching the available robot types and colors, posts a random order,
 * waits asynchronously for the order to be realized and finally removes the customer.
 *
 * <p>The span is finished exactly when the asynchronous chain ends, whether it succeeds or
 * fails, and also when the synchronous part of this method throws.
 *
 * @throws InterruptedException if interrupted while waiting for the first stage
 * @throws ExecutionException if registering the customer or fetching types/colors fails
 */
private void doFullRegisterOrderAndRemove() throws InterruptedException, ExecutionException {
    SpanBuilder spanBuilder = getTracer().buildSpan("fullSystemTest");
    final Span span = spanBuilder.start();
    try {
        SpanContext parentContext = span.context();
        // Register
        CompletableFuture<Customer> newCustomer = CompletableFuture
            .supplyAsync(() -> registerRandomCustomer(parentContext));
        // Maybe not get the types and colors over and over. Looks pretty in the traces though...
        CompletableFuture<RobotType[]> availableTypes = CompletableFuture
            .supplyAsync(() -> getAllTypes(parentContext));
        CompletableFuture<Color[]> availableColors = CompletableFuture
            .supplyAsync(() -> getAllColors(parentContext));
        // allOf() only builds a combined future; it has to be waited on to have any effect.
        CompletableFuture.allOf(newCustomer, availableTypes, availableColors).get();
        Customer customer = newCustomer.get();

        // First completion stage done. Now we can create the order
        List<RobotOrderLineItem> lineItems = createRandomOrder(availableTypes.get(), availableColors.get());
        CompletableFuture<RobotOrder> robotOrderCompletable = CompletableFuture
            .supplyAsync(() -> postOrder(customer, lineItems, parentContext));

        // Rest will happen asynchronously when data is available...
        CompletableFuture<RealizedOrder> realizedOrderFuture = new CompletableFuture<RealizedOrder>();
        // When we have the order, we schedule the polling for an available order...
        robotOrderCompletable
            .thenAccept((order) -> awaitOrderCompletion(order, realizedOrderFuture, parentContext))
            .exceptionally((failure) -> {
                // Posting or scheduling failed: fail the downstream future too, otherwise it
                // would stay pending forever and the span would never be finished.
                realizedOrderFuture.completeExceptionally(failure);
                return null;
            });
        // Once the order is realized, we will remove the customer.
        realizedOrderFuture.thenApply((realizedOrder) -> removeOwner(realizedOrder, parentContext))
            .whenComplete((customerId, failure) -> {
                if (failure != null) {
                    span.log(OpenTracingUtil.getSpanLogMap(failure));
                }
                // Finish on success and on failure alike.
                span.finish();
            });
    } catch (Throwable t) {
        span.log(OpenTracingUtil.getSpanLogMap(t));
        // The asynchronous chain that normally finishes the span is not reached on this path.
        span.finish();
        throw t;
    }
}
/**
 * Executes a non blocking call to the GraphQL query processor and executes the query.
 *
 * <p>The result handler is invoked exactly once: with a succeeded result when the query
 * executed and its result could be converted, or with a failed result when the execution
 * future completed exceptionally or the conversion threw.
 *
 * @param graphqlQuery the graphql query
 * @param variables the variables to pass to the query, may be {@code null}
 * @param resultHandler vertx result handler
 * @throws NullPointerException if {@code graphqlQuery} is {@code null}
 */
public void queryNonBlocking(String graphqlQuery, JsonObject variables, Handler<AsyncResult<QueryResult>> resultHandler) {
    Objects.requireNonNull(graphqlQuery, "GraphQL query cannot be null");

    GraphQL graphQL = new GraphQL.Builder(schema()).build();
    ExecutionInput.Builder asyncExecBuilder = ExecutionInput.newExecutionInput().query(graphqlQuery);
    if (variables != null) {
        asyncExecBuilder.variables(variables.getMap());
    }

    CompletableFuture<ExecutionResult> promise = graphQL.executeAsync(asyncExecBuilder.build());
    // whenComplete rather than thenAccept: with thenAccept the handler was never called when
    // the execution future itself completed exceptionally.
    promise.whenComplete((result, error) -> {
        AsyncResult<QueryResult> outcome;
        if (error != null) {
            outcome = Future.failedFuture(error);
        } else {
            try {
                outcome = Future.succeededFuture(convertToQueryResult(result));
            } catch (Exception e) {
                outcome = Future.failedFuture(e);
            }
        }
        // Called outside the try block so that an exception thrown by the handler itself
        // cannot cause the handler to be invoked a second time with a failure.
        resultHandler.handle(outcome);
    });
}
/**
 * Reads the protocol future from the channel's {@code PROTOCOL_FUTURE} attribute, requires
 * it to be present, and calls {@code start} once the protocol is available.
 *
 * @param ctx context of the channel this handler was added to
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
    CompletableFuture<Protocol> negotiatedProtocol =
        ctx.channel().attr(ChannelAttributeKey.PROTOCOL_FUTURE).get();

    Validate.validState(
        negotiatedProtocol != null,
        "Protocol future must be initialized before handler is added.");

    negotiatedProtocol.thenAccept(protocol -> {
        start(protocol, ctx);
    });
}
/**
 * Checks an in-out exchange through the bridge: a Camel route sends a request to the
 * {@code code-generator} endpoint, which is mapped to a Vert.x event bus consumer. The
 * consumer must see both request headers and replies with body {@code "OK !"} and header
 * {@code foo=bar}; the route must see that header and the producer must receive the body.
 */
@Test
public void testInOut() throws Exception {
    AtomicBoolean headersSeen = new AtomicBoolean();
    AtomicBoolean replySeen = new AtomicBoolean();

    // Camel side: forward the input to the bridged endpoint and inspect the reply headers.
    camel.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:input")
                .inOut("direct:code-generator")
                .process()
                .body((body, headers) -> {
                    assertThat(headers).contains(entry("foo", "bar"));
                    headersSeen.set(true);
                });
        }
    });

    bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:code-generator").toVertx("code-generator")));

    // Vert.x side: verify the incoming headers and reply with an extra header.
    vertx.eventBus().consumer("code-generator", message -> {
        assertThat(message.headers().names()).hasSize(2)
            .contains("headerA", "headerB");
        message.reply("OK !", new DeliveryOptions().addHeader("foo", "bar"));
    });

    camel.start();
    BridgeHelper.startBlocking(bridge);

    // The map is freshly created, so put() stores exactly what putIfAbsent() would.
    Map<String, Object> requestHeaders = new HashMap<>();
    requestHeaders.put("headerA", "A");
    requestHeaders.put("headerB", "B");

    ProducerTemplate template = camel.createProducerTemplate();
    CompletableFuture<Object> reply = template.asyncRequestBodyAndHeaders("direct:input", "hello", requestHeaders);
    reply.thenAccept(body -> {
        assertThat(body).isEqualTo("OK !");
        replySeen.set(true);
    });

    await().atMost(1, TimeUnit.MINUTES).untilAtomic(headersSeen, is(true));
    await().atMost(1, TimeUnit.MINUTES).untilAtomic(replySeen, is(true));
}
/**
 * Attaches a consumer to a {@link CompletableFuture}, choosing where it runs based on the future's state at the
 * time of the call. When the future is already done, the consumer is attached with
 * {@link CompletableFuture#thenAccept(Consumer)}; otherwise it is attached with
 * {@link CompletableFuture#thenAcceptAsync(Consumer, Executor)} using the given executor.
 *
 * @param completableFuture the completable future for which we want to call #thenAccept.
 * @param executor the executor to run the thenAccept function if the future is not yet done.
 * @param consumer the consumer function to call when the future is completed.
 * @param <IN> type of the input future.
 * @return the new completion stage.
 */
public static <IN> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
        CompletableFuture<IN> completableFuture,
        Executor executor,
        Consumer<? super IN> consumer) {
    if (completableFuture.isDone()) {
        // Already completed: no need to involve the executor.
        return completableFuture.thenAccept(consumer);
    }
    return completableFuture.thenAcceptAsync(consumer, executor);
}