java.util.concurrent.CompletableFuture#thenAccept ( )源码实例Demo

下面列出了 java.util.concurrent.CompletableFuture#thenAccept() 的实例代码,你也可以点击链接到 GitHub 查看源代码,或在右侧发表评论。

源代码1 项目: atomix   文件: OrderedFutureTest.java
/**
 * Tests ordered failure of future callbacks.
 *
 * <p>Callbacks are registered on an {@code OrderedFuture}, the future is then completed
 * exceptionally, and a shared counter checks that the failure-aware callbacks
 * ({@code whenComplete}, {@code handle}, {@code exceptionally}) run in registration order.
 * The success-only callbacks ({@code thenRun}, {@code thenAccept}) must never run.
 */
public void testOrderedFailure() throws Throwable {
  CompletableFuture<String> future = new OrderedFuture<>();
  AtomicInteger order = new AtomicInteger();
  future.whenComplete((r, e) -> assertEquals(1, order.incrementAndGet()));
  future.whenComplete((r, e) -> assertEquals(2, order.incrementAndGet()));
  future.handle((r, e) -> {
    assertEquals(3, order.incrementAndGet());
    return "bar";
  });
  future.thenRun(() -> fail());
  future.thenAccept(r -> fail());
  future.exceptionally(e -> {
    // Fourth failure-aware callback: handle() above already consumed the value 3.
    assertEquals(4, order.incrementAndGet());
    return "bar";
  });
  future.completeExceptionally(new RuntimeException("foo"));
  // Assertion errors thrown inside the callbacks are captured by the (discarded) derived futures,
  // so additionally verify on the test thread that all four failure-aware callbacks actually ran.
  // NOTE(review): assumes OrderedFuture runs its callbacks synchronously on completion - confirm.
  assertEquals(4, order.get());
}
 
/**
 * Sends an asynchronous HTTP/1.1 GET request to {@code http://httpbin.org/get} and prints the
 * status code and body once the response arrives.
 *
 * @param args command-line arguments (unused)
 * @throws Exception kept in the signature for backward compatibility; a failed request surfaces
 *     as a {@link java.util.concurrent.CompletionException} from {@code join()}
 */
public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();

    HttpRequest request = HttpRequest
        .newBuilder(URI.create("http://httpbin.org/get"))
        .GET()
        .version(HttpClient.Version.HTTP_1_1)
        .build();

    CompletableFuture<HttpResponse<String>> responseFuture =
        client.sendAsync(request, HttpResponse.BodyHandlers.ofString());

    CompletableFuture<Void> processedFuture =
        responseFuture.thenAccept(response -> {
            System.out.println("Status code: " + response.statusCode());
            System.out.println("Response Body: " + response.body());
        });

    // Wait for the CompletableFuture to complete; wrapping a single future in allOf() adds nothing.
    processedFuture.join();
}
 
源代码3 项目: openjdk-jdk9   文件: MultiExchange.java
/**
 * Wires up the asynchronous multi-response pipeline and returns the future produced by the
 * multi-response handler.
 *
 * <p>The main response is reported to {@code multiResponseHandler}, registered with
 * {@code pushGroup}, and once it completes normally the push group is told that no more pushes
 * will arrive. The pipeline is started last, by completing the {@code trigger} future on
 * {@code executor}.
 */
CompletableFuture<U> multiResponseAsync() {
    CompletableFuture<Void> trigger = new MinimalFuture<>();
    CompletableFuture<HttpResponseImpl<T>> implResponse = responseAsync0(trigger);
    CompletableFuture<HttpResponse<T>> mainResponse = implResponse.thenApply(impl -> {
        multiResponseHandler.onResponse(impl);
        return (HttpResponse<T>) impl;
    });
    pushGroup.setMainResponse(mainResponse);

    // House-keeping for multi-response: all push promises have been received by the time the
    // main response is available.
    mainResponse.thenAccept(ignored -> pushGroup.noMorePushes(true));

    CompletableFuture<U> result =
            multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());

    // Trigger execution only after every stage above has been attached.
    trigger.completeAsync(() -> null, executor);
    return result;
}
 
/**
 * Starts a {@code java -version} child process, waits for it to exit through
 * {@link ProcessHandle#onExit()}, and checks that the handle reports the process as no longer
 * alive. An exit callback that logs the PID is attached afterwards.
 */
@Test
public void givenProcess_whenAddExitCallback_thenSuccess() throws Exception {
    String javaCmd = ProcessUtils.getJavaCmd().getAbsolutePath();
    Process child = new ProcessBuilder(javaCmd, "-version")
      .inheritIO()
      .start();
    ProcessHandle handle = child.toHandle();

    log.info("PID: {} has started", handle.pid());

    CompletableFuture<ProcessHandle> exitFuture = handle.onExit();
    // Block until the child process has terminated.
    exitFuture.get();
    assertEquals(false, handle.isAlive());

    // The future is already complete here, so this callback runs immediately.
    exitFuture.thenAccept(exited -> log.info("PID: {} has stopped", exited.pid()));
}
 
源代码5 项目: Prism   文件: DataUtil.java
/**
 * Helper method to translate Player UUIDs to names.
 *
 * <p>Looks up the game profiles for all pending UUIDs and, for every result whose
 * {@code DataQueries.Cause} value equals a profile's UUID string, replaces that value with the
 * profile's name (or {@code "unknown"} when the profile has no name). The {@code results} list is
 * modified in place and is also the value of the returned future.
 *
 * @param results List of results
 * @param uuidsPendingLookup Lists of UUIDs pending lookup
 * @return CompletableFuture completed with {@code results} once translation is done; completed
 *     exceptionally if the profile lookup or the translation fails
 */
public static CompletableFuture<List<Result>> translateUuidsToNames(List<Result> results, List<UUID> uuidsPendingLookup) {
    CompletableFuture<Collection<GameProfile>> lookup = Sponge.getServer().getGameProfileManager().getAllById(uuidsPendingLookup, true);

    // Return the dependent stage itself instead of completing a hand-made future from a callback:
    // a failed lookup (or an exception while rewriting) now completes the returned future
    // exceptionally rather than leaving it pending forever.
    return lookup.thenApply((profiles) -> {
        for (GameProfile profile : profiles) {
            String uuid = profile.getUniqueId().toString();
            for (Result r : results) {
                Optional<Object> cause = r.data.get(DataQueries.Cause);
                if (cause.isPresent() && cause.get().equals(uuid)) {
                    r.data.set(DataQueries.Cause, profile.getName().orElse("unknown"));
                }
            }
        }

        return results;
    });
}
 
源代码6 项目: ua-server-sdk   文件: Pending.java
/**
 * Creates a {@link CompletableFuture} intended to be used as a completion callback for a batch of
 * {@link Pending} operations. When the returned future completes normally with a list of results,
 * the i-th result is used to complete the i-th {@link Pending}'s {@link CompletableFuture}.
 * <p>
 * The result list is expected to have exactly as many elements as there are {@link Pending}s; if it
 * does not, a {@link RuntimeException} is thrown from inside the completion callback and none of
 * the pending futures is completed.
 * <p>
 * NOTE(review): that exception is only captured by the dependent stage created by
 * {@code thenAccept}, which is discarded here, so callers never observe it - confirm this is intended.
 *
 * @param pending A list of {@link Pending} operations.
 * @param <I>     Input parameter of {@link Pending} operations.
 * @param <O>     Output parameter of {@link Pending} operations.
 * @return A {@link CompletableFuture} that, when completed, then completes each of the given {@link Pending}'s
 * {@link CompletableFuture}.
 */
public static <I, O> CompletableFuture<List<O>> callback(List<? extends Pending<I, O>> pending) {
    CompletableFuture<List<O>> callbackFuture = new CompletableFuture<>();

    callbackFuture.thenAccept(results -> {
        int resultCount = results.size();
        int pendingCount = pending.size();
        if (resultCount != pendingCount) {
            throw new RuntimeException(
                String.format("result size (%s) does not match pending size (%s)", resultCount, pendingCount));
        }

        // Walk both lists in lock-step, handing each result to its pending operation.
        Iterator<O> resultIterator = results.iterator();
        for (Pending<I, O> operation : pending) {
            if (!resultIterator.hasNext()) {
                break;
            }
            operation.getFuture().complete(resultIterator.next());
        }
    });

    return callbackFuture;
}
 
源代码7 项目: samza   文件: EventHubSystemAdmin.java
/**
 * Fetches the runtime information of every given partition of an Event Hub stream and converts it
 * into per-partition offset metadata.
 *
 * <p>All lookups are started up front and awaited together, bounded by the configured runtime-info
 * timeout. The result map is populated on the calling thread only after every lookup has
 * completed, so the returned map is never mutated concurrently and is always complete.
 *
 * @param streamName name of the stream whose partitions are queried
 * @param partitionIds partition ids to query; each must be parseable as an integer
 * @return metadata keyed by partition
 * @throws SamzaException if a lookup fails, times out, or the waiting thread is interrupted
 */
private Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadata(String streamName, String[] partitionIds) {
  EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName);
  List<CompletableFuture<PartitionRuntimeInformation>> futureList = new ArrayList<>();

  // Start all lookups first; futureList.get(i) belongs to partitionIds[i].
  for (String partition : partitionIds) {
    futureList.add(eventHubClientManager
            .getEventHubClient()
            .getPartitionRuntimeInformation(partition));
  }

  CompletableFuture<Void> futureGroup =
      CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
  long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
  try {
    futureGroup.get(timeoutMs, TimeUnit.MILLISECONDS);
  } catch (InterruptedException | ExecutionException | TimeoutException e) {
    if (e instanceof InterruptedException) {
      // Preserve the interrupt for callers further up the stack.
      Thread.currentThread().interrupt();
    }
    String msg = String.format(
        "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s",
        systemName, streamName);
    LOG.error(msg, e);
    throw new SamzaException(msg, e);
  }

  // Every future has completed successfully at this point, so join() returns immediately.
  // Building the map here (instead of inside completion callbacks on other threads) avoids both
  // unsynchronized HashMap writes and returning before the callbacks have run.
  Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = new HashMap<>();
  for (int i = 0; i < partitionIds.length; i++) {
    PartitionRuntimeInformation ehPartitionInfo = futureList.get(i).join();
    LOG.info(printPartitionRuntimeInfo(ehPartitionInfo));
    // Set offsets
    String startingOffset = EventHubSystemConsumer.START_OF_STREAM;
    String newestOffset = ehPartitionInfo.getLastEnqueuedOffset();
    String upcomingOffset = EventHubSystemConsumer.END_OF_STREAM;
    SystemStreamPartitionMetadata sspMetadata = new SystemStreamPartitionMetadata(startingOffset, newestOffset,
      upcomingOffset);
    sspMetadataMap.put(new Partition(Integer.parseInt(partitionIds[i])), sspMetadata);
  }
  return sspMetadataMap;
}
 
/**
 * Processes a client request: the reply is converted to its proto form and passed to
 * {@code responseNext}, with unsuccessful replies logged first. The in-flight future is registered
 * under the request's call id and unregistered again once it completes normally.
 */
@Override
void processClientRequest(RaftClientRequest request) {
  final CompletableFuture<Void> inFlight = processClientRequest(request, reply -> {
    final boolean failed = !reply.isSuccess();
    if (failed) {
      LOG.info("Failed " + request + ", reply=" + reply);
    }
    responseNext(ClientProtoUtils.toRaftClientReplyProto(reply));
  });
  final long callId = request.getCallId();
  // Register before attaching the cleanup stage so an already-completed future is still removed.
  put(callId, inFlight);
  inFlight.thenAccept(ignored -> remove(callId));
}
 
源代码9 项目: pravega   文件: AsyncIterator.java
/**
 * Wraps this instance in a new {@link AsyncIterator} whose {@link #getNext()} calls are serialized:
 * no two executions of {@link #getNext()} overlap, and they run in invocation order, each one
 * starting only after the previous one has completed.
 *
 * <p>When a call yields {@code null}, the underlying sequential processor is closed.
 *
 * @param executor An Executor to run async tasks on.
 * @return A new {@link AsyncIterator}.
 */
default AsyncIterator<T> asSequential(Executor executor) {
    final SequentialProcessor sequencer = new SequentialProcessor(executor);
    return () -> {
        CompletableFuture<T> next = sequencer.add(AsyncIterator.this::getNext);
        next.thenAccept(item -> {
            // A null item shuts the sequencer down (presumably it marks the end of iteration).
            boolean exhausted = item == null;
            if (exhausted) {
                sequencer.close();
            }
        });
        return next;
    };
}
 
源代码10 项目: flink   文件: MiniCluster.java
/**
 * Uploads the files of the given job graph once the blob server address is known.
 *
 * @param blobServerAddressFuture future providing the blob server address
 * @param job job graph whose files are extracted and uploaded
 * @return future that completes when the upload has finished; a {@code FlinkException} raised by
 *     the upload is rethrown wrapped in a {@link CompletionException}
 */
private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
	final CompletableFuture<Void> uploadFuture = blobServerAddressFuture.thenAccept(address -> {
		try {
			ClientUtils.extractAndUploadJobGraphFiles(
				job,
				() -> new BlobClient(address, miniClusterConfiguration.getConfiguration()));
		} catch (FlinkException uploadFailure) {
			// Checked exceptions cannot leave the lambda; wrap so the stage fails with the cause.
			throw new CompletionException(uploadFailure);
		}
	});
	return uploadFuture;
}
 
源代码11 项目: flink   文件: AbstractRestHandler.java
/**
 * Delegates the request to {@code handleRequest} and sends its result back over the channel.
 * A {@code RestHandlerException} thrown synchronously by {@code handleRequest} is turned into an
 * exceptionally completed future, so both failure modes take the same asynchronous path.
 */
@Override
protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
	CompletableFuture<P> responseFuture;
	try {
		responseFuture = handleRequest(handlerRequest, gateway);
	} catch (RestHandlerException handlerFailure) {
		responseFuture = FutureUtils.completedExceptionally(handlerFailure);
	}

	return responseFuture.thenAccept(payload -> {
		HandlerUtils.sendResponse(
			ctx,
			httpRequest,
			payload,
			messageHeaders.getResponseStatusCode(),
			responseHeaders);
	});
}
 
源代码12 项目: flink   文件: AbstractTaskManagerFileHandler.java
/**
 * Removal listener for cached file futures: once the removed entry's future yields a blob key,
 * that key is deleted from the transient blob service's cache. Entries without a value are ignored.
 *
 * @param removalNotification notification describing the removed cache entry
 */
private void removeBlob(RemovalNotification<ResourceID, CompletableFuture<TransientBlobKey>> removalNotification) {
	log.debug("Remove cached file for TaskExecutor {}.", removalNotification.getKey());

	final CompletableFuture<TransientBlobKey> blobKeyFuture = removalNotification.getValue();
	if (blobKeyFuture == null) {
		return;
	}

	blobKeyFuture.thenAccept(transientBlobService::deleteFromCache);
}
 
/**
 * Chains a {@code thenAccept} consumer onto an asynchronously supplied value and waits for the
 * resulting stage, showing that the consumer runs after the computation has finished.
 */
@Test
public void whenAddingThenAcceptToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
    CompletableFuture<String> greeting = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<Void> consumed = greeting.thenAccept(value -> {
        LOG.debug("Computation returned: " + value);
    });

    // Blocks until the consumer has run.
    consumed.get();
}
 
源代码14 项目: monsoon   文件: MetricListener.java
/**
 * Collects the metrics of all detected MBean groups over the JMX connection.
 *
 * <p>The collection races against {@code timeout}: whichever of the connection future and the
 * timeout-derived future completes first feeds the collector function, and when the timeout
 * completes normally the resulting future is additionally cancelled.
 *
 * @param threadpool executor used to obtain the connection and to run the collection
 * @param timeout future whose completion signals that the scan has timed out
 * @return a singleton collection holding the future list of collected metric groups
 */
@Override
public Collection<CompletableFuture<? extends Collection<? extends MetricGroup>>> getGroups(Executor threadpool, CompletableFuture<TimeoutObject> timeout) {
    /*
     * Force the connection to open, even if there are no groups to scan.
     *
     * If there are no groups registered, the connection opening won't be triggered otherwise.
     * And if that isn't triggered, registering groups won't happen either.
     */
    CompletableFuture<List<MetricGroup>> future = connection.getConnection(threadpool)
            .applyToEitherAsync(
                    // On timeout, substitute a default-constructed GCCloseable
                    // (presumably one whose get() returns null - TODO confirm).
                    timeout.thenApply(timeoutObject -> new GCCloseable<JMXConnector>()),
                    conn -> {
                        if (conn.get() == null)
                            throw new RuntimeException("connection unavailable");

                        // Collect while holding this listener's monitor.
                        synchronized (this) {
                            try {
                                List<MetricGroup> result = new ArrayList<>();
                                for (MBeanGroup group
                                             : detected_groups_.values()) {
                                    // Re-check the timeout before each group so a timed-out scan stops early.
                                    if (timeout.isDone())
                                        throw new RuntimeException("timed out");
                                    group.getMetrics(conn.get().getMBeanServerConnection()).ifPresent(result::add);
                                }
                                return result;
                            } catch (IOException ex) {
                                throw new RuntimeException("unable to get MBeanServerConnection", ex);
                            }
                        }
                    },
                    threadpool);
    // Cancel the collection future once the timeout completes normally.
    timeout.thenAccept(timeoutObject -> future.cancel(true));
    return singleton(future);
}
 
源代码15 项目: flink   文件: MiniCluster.java
/**
 * Uploads the files of the given job graph once the blob server address is known.
 *
 * @param blobServerAddressFuture future providing the blob server address
 * @param job job graph whose files are extracted and uploaded
 * @return future that completes when the upload has finished; a {@code FlinkException} raised by
 *     the upload is rethrown wrapped in a {@link CompletionException}
 */
private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
	final CompletableFuture<Void> uploadFuture = blobServerAddressFuture.thenAccept(address -> {
		try {
			ClientUtils.extractAndUploadJobGraphFiles(
				job,
				() -> new BlobClient(address, miniClusterConfiguration.getConfiguration()));
		} catch (FlinkException uploadFailure) {
			// Checked exceptions cannot leave the lambda; wrap so the stage fails with the cause.
			throw new CompletionException(uploadFailure);
		}
	});
	return uploadFuture;
}
 
源代码16 项目: problematic-microservices   文件: LoadWorker.java
/**
 * Runs one full load-test scenario inside a single "fullSystemTest" span: register a random
 * customer, fetch the available robot types and colors, post a random order, wait for the order
 * to be realized and finally remove the customer.
 *
 * <p>Registration and the two lookups run concurrently and are awaited synchronously; everything
 * after posting the order happens asynchronously. The span is finished exactly once on every
 * path: in the catch block for synchronous failures, otherwise when the asynchronous chain ends,
 * successfully or not.
 *
 * @throws InterruptedException if interrupted while waiting for the first stage
 * @throws ExecutionException if registration or one of the lookups fails
 */
private void doFullRegisterOrderAndRemove() throws InterruptedException, ExecutionException {
	SpanBuilder spanBuilder = getTracer().buildSpan("fullSystemTest");
	final Span span = spanBuilder.start();
	try {
		SpanContext parentContext = span.context();

		// Register 
		CompletableFuture<Customer> newCustomer = CompletableFuture
				.supplyAsync(() -> registerRandomCustomer(parentContext));
		// Maybe not get the types and colors over and over. Looks pretty in the traces though...
		CompletableFuture<RobotType[]> availableTypes = CompletableFuture
				.supplyAsync(() -> getAllTypes(parentContext));
		CompletableFuture<Color[]> availableColors = CompletableFuture
				.supplyAsync(() -> getAllColors(parentContext));

		// The three get() calls below already wait for every first-stage future, so the former
		// CompletableFuture.allOf(...) call, whose result was discarded, has been dropped.
		Customer customer = newCustomer.get();

		// First completion stage done. Now we can create the order
		List<RobotOrderLineItem> lineItems = createRandomOrder(availableTypes.get(), availableColors.get());
		CompletableFuture<RobotOrder> robotOrderCompletable = CompletableFuture
				.supplyAsync(() -> postOrder(customer, lineItems, parentContext));

		// Rest will happen asynchronously when data is available...
		CompletableFuture<RealizedOrder> realizedOrderFuture = new CompletableFuture<RealizedOrder>();
		// When we have the order, we schedule the polling for an available order...
		robotOrderCompletable
				.thenAccept((order) -> awaitOrderCompletion(order, realizedOrderFuture, parentContext))
				.exceptionally((failure) -> {
					// Posting or scheduling failed: fail the downstream future too, otherwise it
					// would stay pending forever and the span would never be finished.
					realizedOrderFuture.completeExceptionally(failure);
					return null;
				});
		// Once the order is realized, we will remove the customer.
		realizedOrderFuture.thenApply((realizedOrder) -> removeOwner(realizedOrder, parentContext))
				.whenComplete((customerId, failure) -> {
					if (failure != null) {
						span.log(OpenTracingUtil.getSpanLogMap(failure));
					}
					span.finish();
				});
	} catch (Throwable t) {
		span.log(OpenTracingUtil.getSpanLogMap(t));
		// The asynchronous chain that normally finishes the span was never set up.
		span.finish();
		throw t;
	}
}
 
/**
   * Executes a non blocking call to the GraphQL query processor and executes the query.
   *
   * <p>The result handler is always invoked exactly once per completion of the execution future:
   * with a succeeded result when the execution result could be converted, and with a failed
   * result when the execution itself failed or the conversion (or the handler's success path)
   * threw an exception.
   *
   * @param graphqlQuery the graphql query; must not be null
   * @param variables    the variables to pass to the query; may be null
   * @param resultHandler vertx result handler
   * @throws NullPointerException if {@code graphqlQuery} is null
   */
  public void queryNonBlocking(String graphqlQuery, JsonObject variables, Handler<AsyncResult<QueryResult>> resultHandler) {
      Objects.requireNonNull(graphqlQuery, "GraphQL query cannot be null");

      GraphQL graphQL = new GraphQL.Builder(schema()).build();

      ExecutionInput.Builder asyncExecBuilder = ExecutionInput.newExecutionInput().query(graphqlQuery);
      if (variables != null) {
          asyncExecBuilder.variables(variables.getMap());
      }

      CompletableFuture<ExecutionResult> promise = graphQL.executeAsync(asyncExecBuilder.build());

      // whenComplete (not thenAccept) so an exceptionally completed execution is reported to the
      // handler instead of being silently dropped.
      promise.whenComplete((result, error) -> {
          if (error != null) {
              resultHandler.handle(Future.failedFuture(error));
              return;
          }
          try {
              QueryResult queryResult = convertToQueryResult(result);

              resultHandler.handle(Future.succeededFuture(queryResult));
          } catch (Exception e) {
              resultHandler.handle(Future.failedFuture(e));
          }
      });
  }
 
源代码18 项目: aws-sdk-java-v2   文件: Http2PingHandler.java
/**
 * Reads the protocol future from the channel attributes and calls {@code start} with the
 * protocol and this context once that future completes normally.
 *
 * @param ctx context of the channel this handler was added to
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
    CompletableFuture<Protocol> negotiatedProtocol =
        ctx.channel().attr(ChannelAttributeKey.PROTOCOL_FUTURE).get();
    Validate.validState(negotiatedProtocol != null, "Protocol future must be initialized before handler is added.");
    negotiatedProtocol.thenAccept(protocol -> {
        start(protocol, ctx);
    });
}
 
源代码19 项目: vertx-camel-bridge   文件: InOutTest.java
/**
 * Exercises an in-out exchange across the bridge: a Camel route sends a request to a Vert.x event
 * bus consumer, the consumer checks the incoming headers and replies with a body plus a header,
 * and the test waits until both the reply header was seen by the route and the reply body was
 * received by the producer.
 */
@Test
public void testInOut() throws Exception {
  AtomicBoolean sawReplyHeaders = new AtomicBoolean();
  AtomicBoolean sawReplyBody = new AtomicBoolean();

  camel.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
      from("direct:input")
        .inOut("direct:code-generator")
        .process()
        .body((body, headers) -> {
          assertThat(headers).contains(entry("foo", "bar"));
          sawReplyHeaders.set(true);
        });
    }
  });

  CamelBridgeOptions options = new CamelBridgeOptions(camel)
    .addInboundMapping(InboundMapping.fromCamel("direct:code-generator").toVertx("code-generator"));
  bridge = CamelBridge.create(vertx, options);

  vertx.eventBus().consumer("code-generator", message -> {
    assertThat(message.headers().names())
      .hasSize(2)
      .contains("headerA", "headerB");

    message.reply("OK !", new DeliveryOptions().addHeader("foo", "bar"));
  });

  camel.start();
  BridgeHelper.startBlocking(bridge);

  // Request headers that the event bus consumer expects to receive.
  Map<String, Object> requestHeaders = new HashMap<>();
  requestHeaders.put("headerA", "A");
  requestHeaders.put("headerB", "B");

  ProducerTemplate producer = camel.createProducerTemplate();
  CompletableFuture<Object> reply = producer.asyncRequestBodyAndHeaders("direct:input", "hello", requestHeaders);
  reply.thenAccept(body -> {
    assertThat(body).isEqualTo("OK !");
    sawReplyBody.set(true);
  });

  await().atMost(1, TimeUnit.MINUTES).untilAtomic(sawReplyHeaders, is(true));
  await().atMost(1, TimeUnit.MINUTES).untilAtomic(sawReplyBody, is(true));
}
 
源代码20 项目: flink   文件: FutureUtils.java
/**
 * Attaches a consumer to the given {@link CompletableFuture}, choosing between synchronous and
 * asynchronous execution based on the future's current state. If the future is already done, the
 * consumer is attached via {@link CompletableFuture#thenAccept(Consumer)}; otherwise it is attached
 * via {@link CompletableFuture#thenAcceptAsync(Consumer, Executor)} using the given executor.
 *
 * @param completableFuture the completable future for which we want to call #thenAccept.
 * @param executor the executor to run the thenAccept function if the future is not yet done.
 * @param consumer the consumer function to call when the future is completed.
 * @param <IN> type of the input future.
 * @return the new completion stage.
 */
public static <IN> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
	CompletableFuture<IN> completableFuture,
	Executor executor,
	Consumer<? super IN> consumer) {
	if (completableFuture.isDone()) {
		// Already complete: no need to hop onto the executor.
		return completableFuture.thenAccept(consumer);
	}
	return completableFuture.thenAcceptAsync(consumer, executor);
}