下面列出了 java.util.concurrent.CompletableFuture#thenAcceptAsync() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Triggers a checkpoint on all given coordinators and, once the combined snapshots are
 * available, acknowledges them against the pending checkpoint.
 *
 * @param coordinators the coordinator checkpoint contexts to trigger
 * @param checkpoint the pending checkpoint; its id is used for triggering and it is handed
 *     to the acknowledgement step
 * @param acknowledgeExecutor the executor on which the acknowledgement step is run
 * @return a future that completes after the acknowledgement step has run; if acknowledging
 *     throws, the exception is wrapped in a {@link CompletionException} and the future
 *     completes exceptionally
 * @throws Exception declared by the signature (presumably raised while triggering the
 *     coordinators; not visible from this method alone)
 */
public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpoints(
        final Collection<OperatorCoordinatorCheckpointContext> coordinators,
        final PendingCheckpoint checkpoint,
        final Executor acknowledgeExecutor) throws Exception {

    return triggerAllCoordinatorCheckpoints(coordinators, checkpoint.getCheckpointId())
        .thenAcceptAsync(
            (triggered) -> {
                try {
                    acknowledgeAllCoordinators(checkpoint, triggered.snapshots);
                } catch (Exception acknowledgeFailure) {
                    // checked exceptions cannot escape the consumer, so tunnel them
                    throw new CompletionException(acknowledgeFailure);
                }
            },
            acknowledgeExecutor);
}
/**
 * Hands the request to the transformation engine and, when the output is to be zipped
 * without modifying the original folder, schedules compression of the result.
 *
 * <p>The returned future is the engine's own future, not the compression stage. It may
 * therefore complete before compression has finished, and a failure inside the compression
 * handler is not reflected in it.
 *
 * @param transformationRequest the request to execute
 * @return the future produced by the transformation engine
 */
private CompletableFuture<TransformationResult> transform(TransformationRequest transformationRequest) {
    final Configuration requestConfig = transformationRequest.getConfiguration();
    if (logger.isDebugEnabled()) {
        logger.debug("Transformation request configuration: {}", requestConfig);
    }

    final CompletableFuture<TransformationResult> pendingResult =
        transformationEngine.perform(transformationRequest);

    // compression only applies to zipped output written outside the original folder
    if (requestConfig.isModifyOriginalFolder() || !requestConfig.isZipOutput()) {
        return pendingResult;
    }

    pendingResult.thenAcceptAsync(compressionHandler::compress);
    return pendingResult;
}
/**
 * Verifies that requests submitted on the same session are processed in submission order:
 * a slow request ("five", which sleeps for five seconds) is submitted before a fast one
 * ("zero"), and the results must still arrive as "five" followed by "zero".
 *
 * <p>The single-thread executor and the cluster are released in a {@code finally} block so
 * that neither outlives the test, even when an assertion fails.
 */
@Test
public void shouldProcessSessionRequestsInOrder() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    // a single thread guarantees the callbacks append to the list one at a time
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        final Client client = cluster.connect(name.getMethodName());

        final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'");
        final ResultSet rsZero = client.submit("'zero'");

        final CompletableFuture<List<Result>> futureFive = rsFive.all();
        final CompletableFuture<List<Result>> futureZero = rsZero.all();

        final CountDownLatch latch = new CountDownLatch(2);
        final List<String> order = new ArrayList<>();

        futureFive.thenAcceptAsync(r -> {
            order.add(r.get(0).getString());
            latch.countDown();
        }, executor);

        futureZero.thenAcceptAsync(r -> {
            order.add(r.get(0).getString());
            latch.countDown();
        }, executor);

        // wait for both results; a timeout surfaces through the size assertion below
        latch.await(30000, TimeUnit.MILLISECONDS);

        // should be two results
        assertEquals(2, order.size());

        // ensure that "five" is first then "zero"
        assertThat(order, contains("five", "zero"));
    } finally {
        // release the callback thread and the driver connections regardless of outcome
        executor.shutdown();
        cluster.close();
    }
}
/**
 * Forwards a removal notification to the delegate once the removed future has produced its
 * value. A {@code null} future is ignored. The delegate is invoked on {@code executor} and
 * only if the future completes normally.
 *
 * @param key the removed key, possibly {@code null}
 * @param future the removed future, possibly {@code null}
 * @param cause the reason the entry was removed
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onRemoval(@Nullable K key,
    @Nullable CompletableFuture<V> future, RemovalCause cause) {
  if (future == null) {
    return;
  }
  future.thenAcceptAsync(resolved -> delegate.onRemoval(key, resolved, cause), executor);
}
/***************************************
 * {@inheritDoc}
 *
 * <p>When the previous execution completes normally, its result is handed to
 * {@code loopAsync} together with the next step and the continuation. No
 * executor is supplied, so the callback runs on the default asynchronous
 * execution facility of {@code CompletableFuture}.</p>
 */
@Override
public void runAsync(CompletableFuture<T> fPreviousExecution,
                     CoroutineStep<T, ?> rNextStep,
                     Continuation<?> rContinuation)
{
    fPreviousExecution.thenAcceptAsync(
        rPreviousResult ->
        {
            loopAsync(rPreviousResult, rNextStep, rContinuation);
        });
}
/***************************************
 * {@inheritDoc}
 *
 * <p>When the previous execution completes normally, its result is handed to
 * {@code collectAsync} together with the next step and the continuation. No
 * executor is supplied, so the callback runs on the default asynchronous
 * execution facility of {@code CompletableFuture}.</p>
 */
@Override
public void runAsync(CompletableFuture<I> fPreviousExecution,
                     CoroutineStep<Collection<O>, ?> rNextStep,
                     Continuation<?> rContinuation)
{
    fPreviousExecution.thenAcceptAsync(
        rPreviousResult ->
        {
            collectAsync(rPreviousResult, rNextStep, rContinuation);
        });
}
/***************************************
 * {@inheritDoc}
 *
 * <p>The result collection is obtained from the collection factory (or is
 * {@code null} if no factory is set) at the time this method is called, i.e.
 * before the previous execution has necessarily completed. When the previous
 * execution completes normally, an iterator over its result is handed to
 * {@code iterateAsync} together with that collection, the next step and the
 * continuation.</p>
 */
@Override
public void runAsync(CompletableFuture<I> fPreviousExecution,
                     CoroutineStep<C, ?> rNextStep,
                     Continuation<?> rContinuation)
{
    final C aResults;

    if (fCollectionFactory == null)
    {
        aResults = null;
    }
    else
    {
        aResults = fCollectionFactory.get();
    }

    fPreviousExecution.thenAcceptAsync(
        rInput ->
        {
            iterateAsync(rInput.iterator(), aResults, rNextStep, rContinuation);
        });
}
/***************************************
 * {@inheritDoc}
 *
 * <p>When the previous execution completes normally, the resulting buffer is
 * passed to {@code connectAsync} together with the suspension obtained from
 * {@code rContinuation.suspend(this, rNextStep)}. The continuation itself is
 * supplied as the executor for the callback, and the suspension is created
 * inside the callback rather than when this method is called.</p>
 */
@Override
public void runAsync(CompletableFuture<ByteBuffer> fPreviousExecution,
                     CoroutineStep<ByteBuffer, ?> rNextStep,
                     Continuation<?> rContinuation)
{
    fPreviousExecution.thenAcceptAsync(
        rData ->
        {
            connectAsync(rData, rContinuation.suspend(this, rNextStep));
        },
        rContinuation);
}
/**
 * Subscribes to the input on the source-thread executor and re-emits every received event
 * on the target-thread executor.
 *
 * <p>Because the subscription is created asynchronously, it is held in a future. The
 * returned handle unsubscribes on the source-thread executor once that future has completed.
 *
 * @return a subscription whose {@code unsubscribe} defers to the (possibly still pending)
 *     input subscription
 */
@Override
protected Subscription observeInputs() {
    final CompletableFuture<Subscription> pendingSubscription = new CompletableFuture<>();

    sourceThreadExecutor.execute(() -> {
        final Subscription inputSubscription = input.subscribe(event -> {
            targetThreadExecutor.execute(() -> emit(event));
        });
        pendingSubscription.complete(inputSubscription);
    });

    return () -> {
        pendingSubscription.thenAcceptAsync(Subscription::unsubscribe, sourceThreadExecutor);
    };
}
/**
 * Returns a publisher that, on subscription, detaches the container future from this
 * instance and signals completion to the subscriber.
 *
 * <p>If no future was set, completion is signalled immediately. Otherwise a stop callback
 * is registered on the future first, and then a task is queued on {@code EXECUTOR} that
 * cancels the future and signals completion. In both paths {@code onComplete} is skipped
 * when the subscription has been cancelled.
 *
 * <p>NOTE(review): callbacks registered through {@code thenAcceptAsync} only run on normal
 * completion, so the stop callback has no effect if the future is still incomplete when
 * the cancel task runs. Confirm this is intended.
 *
 * @return a publisher that completes without emitting items
 */
@Override
public Publisher<Void> close() {
    return subscriber -> {
        // take ownership of the future so it is handled at most once
        final CompletableFuture<R2DBCDatabaseContainer> detachedFuture;
        synchronized (this) {
            detachedFuture = this.future;
            this.future = null;
        }

        final CancellableSubscription cancellable = new CancellableSubscription();
        subscriber.onSubscribe(cancellable);

        if (detachedFuture != null) {
            detachedFuture.thenAcceptAsync(Startable::stop, EXECUTOR);
            EXECUTOR.execute(() -> {
                detachedFuture.cancel(true);
                if (!cancellable.isCancelled()) {
                    subscriber.onComplete();
                }
            });
        } else {
            if (!cancellable.isCancelled()) {
                subscriber.onComplete();
            }
        }
    };
}
/**
 * Asks the plugin to reload its configuration and logs a message once the reload future
 * completes normally. The confirmation is logged asynchronously on the default executor
 * of {@code CompletableFuture}; a failed reload produces no log entry here.
 *
 * @param plugin the plugin to reload; it must report a name, otherwise
 *     {@code orElseThrow()} raises an exception
 */
private void reloadPluginConfig(final BesuPlugin plugin) {
    final String pluginName = plugin.getName().orElseThrow();
    LOG.info("Reloading plugin configuration: {}.", pluginName);

    final CompletableFuture<Void> reloadFuture = plugin.reloadConfiguration();
    reloadFuture.thenAcceptAsync(
        ignored -> LOG.info("Plugin {} has been reloaded.", pluginName));
}
/**
 * Performs one registration attempt against the given gateway and, depending on the
 * outcome, either completes the registration or arranges another attempt.
 *
 * <ul>
 *   <li>Success response: {@code completionFuture} is completed with the gateway and the
 *       response.</li>
 *   <li>Decline or unknown response: {@code registerLater} is called with attempt 1, the
 *       initial timeout and the configured refused delay.</li>
 *   <li>Timeout failure: the next attempt starts immediately with a doubled timeout,
 *       capped at the configured maximum.</li>
 *   <li>Any other failure: {@code registerLater} is called with attempt 1, the initial
 *       timeout and the configured error delay.</li>
 * </ul>
 *
 * <p>Nothing happens once the registration has been canceled. A throwable raised while
 * setting up the attempt fails {@code completionFuture} and cancels the registration.
 *
 * @param gateway the gateway to register at
 * @param attempt the number of this attempt (used for logging and incremented on timeout)
 * @param timeoutMillis the timeout for this attempt, in milliseconds
 */
@SuppressWarnings("unchecked")
private void register(final G gateway, final int attempt, final long timeoutMillis) {
    // bail out early when already canceled to avoid unnecessary work
    if (canceled) {
        return;
    }

    try {
        log.debug("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
        final CompletableFuture<RegistrationResponse> responseFuture =
            invokeRegistration(gateway, fencingToken, timeoutMillis);

        // a response arrived: complete on success, otherwise pause and start over
        final CompletableFuture<Void> responseHandledFuture = responseFuture.thenAcceptAsync(
            (RegistrationResponse response) -> {
                if (isCanceled()) {
                    return;
                }

                if (response instanceof RegistrationResponse.Success) {
                    // registration successful!
                    final S success = (S) response;
                    completionFuture.complete(Tuple2.of(gateway, success));
                    return;
                }

                // registration refused or unknown
                if (response instanceof RegistrationResponse.Decline) {
                    final RegistrationResponse.Decline declined = (RegistrationResponse.Decline) response;
                    log.info("Registration at {} was declined: {}", targetName, declined.getReason());
                } else {
                    log.error("Received unknown response to registration attempt: {}", response);
                }

                log.info("Pausing and re-attempting registration in {} ms", retryingRegistrationConfiguration.getRefusedDelayMillis());
                registerLater(
                    gateway,
                    1,
                    retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(),
                    retryingRegistrationConfiguration.getRefusedDelayMillis());
            },
            rpcService.getExecutor());

        // the attempt failed: retry
        responseHandledFuture.whenCompleteAsync(
            (Void ignored, Throwable failure) -> {
                if (failure == null || isCanceled()) {
                    return;
                }

                if (ExceptionUtils.stripCompletionException(failure) instanceof TimeoutException) {
                    // no response arrived in time. The timeout may have been very low
                    // (initial fast registration attempts), or the target endpoint may
                    // currently be down.
                    if (log.isDebugEnabled()) {
                        log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
                            targetName, targetAddress, attempt, timeoutMillis);
                    }

                    long nextTimeoutMillis = Math.min(2 * timeoutMillis, retryingRegistrationConfiguration.getMaxRegistrationTimeoutMillis());
                    register(gateway, attempt + 1, nextTimeoutMillis);
                    return;
                }

                // a serious failure occurred. Do not give up, but keep trying after a pause.
                log.error("Registration at {} failed due to an error", targetName, failure);
                log.info("Pausing and re-attempting registration in {} ms", retryingRegistrationConfiguration.getErrorDelayMillis());
                registerLater(
                    gateway,
                    1,
                    retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(),
                    retryingRegistrationConfiguration.getErrorDelayMillis());
            },
            rpcService.getExecutor());
    } catch (Throwable t) {
        completionFuture.completeExceptionally(t);
        cancel();
    }
}
/**
 * Resolves the target address to a callable gateway and, once resolved, starts the
 * registration attempts.
 *
 * <p>If the target type is a {@code FencedRpcGateway}, the connection is made with the
 * fencing token; otherwise the plain connect call is used. When resolution (or the first
 * registration call) fails and the registration has not been canceled, the failure is
 * logged and {@code startRegistrationLater} is called with the configured error delay.
 * A throwable raised while setting this up fails {@code completionFuture} and cancels the
 * registration.
 */
@SuppressWarnings("unchecked")
public void startRegistration() {
    if (canceled) {
        // nothing to do once the registration has been canceled
        return;
    }

    try {
        // trigger resolution of the target address to a callable gateway
        final CompletableFuture<G> gatewayFuture;

        if (!FencedRpcGateway.class.isAssignableFrom(targetType)) {
            gatewayFuture = rpcService.connect(targetAddress, targetType);
        } else {
            gatewayFuture = (CompletableFuture<G>) rpcService.connect(
                targetAddress,
                fencingToken,
                targetType.asSubclass(FencedRpcGateway.class));
        }

        // upon success, start the registration attempts
        final CompletableFuture<Void> registrationStartedFuture = gatewayFuture.thenAcceptAsync(
            (G resolvedGateway) -> {
                log.info("Resolved {} address, beginning registration", targetName);
                register(resolvedGateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
            },
            rpcService.getExecutor());

        // upon failure, retry, unless this is cancelled
        registrationStartedFuture.whenCompleteAsync(
            (Void ignored, Throwable failure) -> {
                if (failure == null || canceled) {
                    return;
                }

                final Throwable cause = ExceptionUtils.stripCompletionException(failure);
                if (log.isDebugEnabled()) {
                    // at debug level the throwable is passed along as the trailing argument
                    log.debug(
                        "Could not resolve {} address {}, retrying in {} ms.",
                        targetName,
                        targetAddress,
                        retryingRegistrationConfiguration.getErrorDelayMillis(),
                        cause);
                } else {
                    log.info(
                        "Could not resolve {} address {}, retrying in {} ms: {}.",
                        targetName,
                        targetAddress,
                        retryingRegistrationConfiguration.getErrorDelayMillis(),
                        cause.getMessage());
                }

                startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
            },
            rpcService.getExecutor());
    } catch (Throwable t) {
        completionFuture.completeExceptionally(t);
        cancel();
    }
}
/**
 * Runs a single registration attempt at the given gateway and reacts to its outcome by
 * either finishing the registration or scheduling a further attempt.
 *
 * <ul>
 *   <li>On a success response, {@code completionFuture} is completed with the gateway and
 *       the response.</li>
 *   <li>On a decline or an unknown response, {@code registerLater} is invoked with attempt
 *       1, the initial timeout and the configured refused delay.</li>
 *   <li>On a timeout, the next attempt is made right away with twice the timeout, bounded
 *       by the configured maximum.</li>
 *   <li>On any other failure, {@code registerLater} is invoked with attempt 1, the initial
 *       timeout and the configured error delay.</li>
 * </ul>
 *
 * <p>All reactions are skipped after cancellation. If setting up the attempt throws,
 * {@code completionFuture} is failed and the registration is canceled.
 *
 * @param gateway the gateway to register at
 * @param attempt the number of this attempt (logged, and incremented after a timeout)
 * @param timeoutMillis the timeout of this attempt in milliseconds
 */
@SuppressWarnings("unchecked")
private void register(final G gateway, final int attempt, final long timeoutMillis) {
    // eager cancellation check to skip needless work
    if (canceled) {
        return;
    }

    try {
        log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
        final CompletableFuture<RegistrationResponse> pendingResponse =
            invokeRegistration(gateway, fencingToken, timeoutMillis);

        // react to the response: finish on success, otherwise pause and start over
        final CompletableFuture<Void> responseProcessed = pendingResponse.thenAcceptAsync(
            (RegistrationResponse answer) -> {
                if (isCanceled()) {
                    return;
                }

                final boolean succeeded = answer instanceof RegistrationResponse.Success;
                if (succeeded) {
                    // registration successful!
                    final S success = (S) answer;
                    completionFuture.complete(Tuple2.of(gateway, success));
                    return;
                }

                // registration refused or unknown
                if (answer instanceof RegistrationResponse.Decline) {
                    final RegistrationResponse.Decline refusal = (RegistrationResponse.Decline) answer;
                    log.info("Registration at {} was declined: {}", targetName, refusal.getReason());
                } else {
                    log.error("Received unknown response to registration attempt: {}", answer);
                }

                log.info("Pausing and re-attempting registration in {} ms", retryingRegistrationConfiguration.getRefusedDelayMillis());
                registerLater(
                    gateway,
                    1,
                    retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(),
                    retryingRegistrationConfiguration.getRefusedDelayMillis());
            },
            rpcService.getExecutor());

        // react to a failed attempt: retry
        responseProcessed.whenCompleteAsync(
            (Void unused, Throwable failure) -> {
                if (failure == null || isCanceled()) {
                    return;
                }

                final boolean timedOut =
                    ExceptionUtils.stripCompletionException(failure) instanceof TimeoutException;
                if (!timedOut) {
                    // a serious failure occurred. Keep trying, but only after a pause.
                    log.error("Registration at {} failed due to an error", targetName, failure);
                    log.info("Pausing and re-attempting registration in {} ms", retryingRegistrationConfiguration.getErrorDelayMillis());
                    registerLater(
                        gateway,
                        1,
                        retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(),
                        retryingRegistrationConfiguration.getErrorDelayMillis());
                    return;
                }

                // the response did not arrive in time. Either the timeout was very low
                // (initial fast registration attempts) or the target endpoint is
                // currently down.
                if (log.isDebugEnabled()) {
                    log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
                        targetName, targetAddress, attempt, timeoutMillis);
                }

                long extendedTimeoutMillis = Math.min(2 * timeoutMillis, retryingRegistrationConfiguration.getMaxRegistrationTimeoutMillis());
                register(gateway, attempt + 1, extendedTimeoutMillis);
            },
            rpcService.getExecutor());
    } catch (Throwable t) {
        completionFuture.completeExceptionally(t);
        cancel();
    }
}
/**
 * Starts the registration by resolving the target address to a callable gateway and then
 * launching the first registration attempt.
 *
 * <p>A target type that is a {@code FencedRpcGateway} is connected using the fencing
 * token; any other type uses the plain connect call. If the resolution stage (or the first
 * registration call) fails while the registration is not canceled, the failure is logged
 * and {@code startRegistrationLater} is invoked with the configured error delay. If
 * setting this up throws, {@code completionFuture} is failed and the registration is
 * canceled.
 */
@SuppressWarnings("unchecked")
public void startRegistration() {
    if (canceled) {
        // already canceled, so there is nothing to start
        return;
    }

    try {
        // kick off resolution of the target address to a callable gateway
        final boolean fenced = FencedRpcGateway.class.isAssignableFrom(targetType);
        final CompletableFuture<G> resolutionFuture;

        if (fenced) {
            resolutionFuture = (CompletableFuture<G>) rpcService.connect(
                targetAddress,
                fencingToken,
                targetType.asSubclass(FencedRpcGateway.class));
        } else {
            resolutionFuture = rpcService.connect(targetAddress, targetType);
        }

        // once resolved, begin the registration attempts
        final CompletableFuture<Void> resolutionHandled = resolutionFuture.thenAcceptAsync(
            (G connectedGateway) -> {
                log.info("Resolved {} address, beginning registration", targetName);
                register(connectedGateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
            },
            rpcService.getExecutor());

        // on failure, try again later unless the registration was canceled
        resolutionHandled.whenCompleteAsync(
            (Void unused, Throwable failure) -> {
                if (failure == null || canceled) {
                    return;
                }

                final Throwable rootFailure = ExceptionUtils.stripCompletionException(failure);
                if (!log.isDebugEnabled()) {
                    log.info(
                        "Could not resolve {} address {}, retrying in {} ms: {}",
                        targetName,
                        targetAddress,
                        retryingRegistrationConfiguration.getErrorDelayMillis(),
                        rootFailure.getMessage());
                } else {
                    // at debug level the throwable is passed along as the trailing argument
                    log.debug(
                        "Could not resolve {} address {}, retrying in {} ms.",
                        targetName,
                        targetAddress,
                        retryingRegistrationConfiguration.getErrorDelayMillis(),
                        rootFailure);
                }

                startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
            },
            rpcService.getExecutor());
    } catch (Throwable t) {
        completionFuture.completeExceptionally(t);
        cancel();
    }
}
/**
 * Attaches {@code a} as an asynchronous consumer of {@code f}'s result, run on a newly
 * created {@code ThreadExecutor}.
 *
 * @param f the future whose result is consumed
 * @param a the consumer to run once {@code f} completes normally
 * @param <T> the result type of {@code f}
 * @return the dependent stage returned by {@code thenAcceptAsync}
 */
public <T> CompletableFuture<Void> thenAccept
    (CompletableFuture<T> f, Consumer<? super T> a) {
    final ThreadExecutor executor = new ThreadExecutor();
    return f.thenAcceptAsync(a, executor);
}
/**
 * Registers the consumer {@code a} on {@code f} via {@code thenAcceptAsync}, using a fresh
 * {@code ThreadExecutor} instance for this call.
 *
 * @param f the source future
 * @param a the action applied to the result of {@code f} on normal completion
 * @param <T> the result type of {@code f}
 * @return the stage produced by {@code thenAcceptAsync}
 */
public <T> CompletableFuture<Void> thenAccept
    (CompletableFuture<T> f, Consumer<? super T> a) {
    final ThreadExecutor perCallExecutor = new ThreadExecutor();
    final CompletableFuture<Void> dependent = f.thenAcceptAsync(a, perCallExecutor);
    return dependent;
}
/**
 * Attaches {@code a} as an asynchronous consumer of {@code f}'s result, using the default
 * asynchronous execution facility of {@code CompletableFuture}.
 *
 * @param f the future whose result is consumed
 * @param a the consumer to run once {@code f} completes normally
 * @param <T> the result type of {@code f}
 * @return the dependent stage returned by {@code thenAcceptAsync}
 */
public <T> CompletableFuture<Void> thenAccept
    (CompletableFuture<T> f, Consumer<? super T> a) {
    final CompletableFuture<Void> dependent = f.thenAcceptAsync(a);
    return dependent;
}
/**
 * Attaches a consumer to the given {@link CompletableFuture}, choosing between the
 * synchronous and asynchronous variant based on whether the future is already done.
 *
 * <p>When the future is done at call time, the result of
 * {@link CompletableFuture#thenAccept(Consumer)} is returned. Otherwise the result of
 * {@link CompletableFuture#thenAcceptAsync(Consumer, Executor)} with the given executor is
 * returned.
 *
 * @param completableFuture the future whose result should be consumed
 * @param executor the executor that runs the consumer if the future is not yet done
 * @param consumer the consumer to invoke with the future's result
 * @param <IN> type of the input future
 * @return the new completion stage
 */
public static <IN> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
    CompletableFuture<IN> completableFuture,
    Executor executor,
    Consumer<? super IN> consumer) {
    if (completableFuture.isDone()) {
        return completableFuture.thenAccept(consumer);
    }
    return completableFuture.thenAcceptAsync(consumer, executor);
}
/**
 * Registers a consumer for the result of the given {@link CompletableFuture}.
 *
 * <p>If the future is not yet done when this method is called, the consumer is attached
 * with {@link CompletableFuture#thenAcceptAsync(Consumer, Executor)} and the given
 * executor. If it is already done, the consumer is attached with
 * {@link CompletableFuture#thenAccept(Consumer)} instead.
 *
 * @param completableFuture the future for which the consumer is registered
 * @param executor the executor used for the consumer when the future is not yet done
 * @param consumer the function to call with the result once the future is completed
 * @param <IN> type of the input future
 * @return the new completion stage
 */
public static <IN> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
    CompletableFuture<IN> completableFuture,
    Executor executor,
    Consumer<? super IN> consumer) {
    final CompletableFuture<Void> stage;
    if (!completableFuture.isDone()) {
        stage = completableFuture.thenAcceptAsync(consumer, executor);
    } else {
        stage = completableFuture.thenAccept(consumer);
    }
    return stage;
}