下面列出了java.util.concurrent.CompletableFuture#whenCompleteAsync ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Run the given action after the completion of the given future. The given future can be
* completed normally or exceptionally. In case of an exceptional completion the, the
* action's exception will be added to the initial exception.
*
* @param future to wait for its completion
* @param runnable action which is triggered after the future's completion
* @param executor to run the given action
* @return Future which is completed after the action has completed. This future can contain an exception,
* if an error occurred in the given future or action.
*/
public static CompletableFuture<Void> runAfterwardsAsync(
CompletableFuture<?> future,
RunnableWithException runnable,
Executor executor) {
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
future.whenCompleteAsync(
(Object ignored, Throwable throwable) -> {
try {
runnable.run();
} catch (Throwable e) {
throwable = ExceptionUtils.firstOrSuppressed(e, throwable);
}
if (throwable != null) {
resultFuture.completeExceptionally(throwable);
} else {
resultFuture.complete(null);
}
},
executor);
return resultFuture;
}
@Test
public void testBuilder_withFuture() throws Exception {
setupSyncInterface();
when(proxyBuilderSync.build(Mockito.<io.joynr.proxy.ProxyBuilder.ProxyCreatedCallback<MyServiceSync>> any())).thenReturn(myJoynrProxy);
CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture<MyServiceSync> future = subject.builder(MyServiceSync.class, "local").useFuture().build();
future.whenCompleteAsync((proxy, error) -> {
if (proxy != null && error == null) {
countDownLatch.countDown();
}
});
verify(proxyBuilderSync).build(callbackSyncCaptor.capture());
ProxyBuilder.ProxyCreatedCallback<MyServiceSync> value = callbackSyncCaptor.getValue();
value.onProxyCreationFinished(myJoynrProxy);
countDownLatch.await(100L, TimeUnit.MILLISECONDS);
MyServiceSync myServiceSync = future.get();
assertNotNull(myServiceSync);
}
private RetryingRegistration<F, G, S> createNewRegistration() {
RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
CompletableFuture<Tuple2<G, S>> future = newRegistration.getFuture();
future.whenCompleteAsync(
(Tuple2<G, S> result, Throwable failure) -> {
if (failure != null) {
if (failure instanceof CancellationException) {
// we ignore cancellation exceptions because they originate from cancelling
// the RetryingRegistration
log.debug("Retrying registration towards {} was cancelled.", targetAddress);
} else {
// this future should only ever fail if there is a bug, not if the registration is declined
onRegistrationFailure(failure);
}
} else {
targetGateway = result.f0;
onRegistrationSuccess(result.f1);
}
}, executor);
return newRegistration;
}
/**
* Update the partition infos on the assigned resource.
*
* @param partitionInfos for the remote task
*/
private void sendUpdatePartitionInfoRpcCall(
final Iterable<PartitionInfo> partitionInfos) {
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);
updatePartitionsResultFuture.whenCompleteAsync(
(ack, failure) -> {
// fail if there was a failure
if (failure != null) {
fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
" failed due to:", failure));
}
}, getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
}
}
/**
* Run the given action after the completion of the given future. The given future can be
* completed normally or exceptionally. In case of an exceptional completion the, the
* action's exception will be added to the initial exception.
*
* @param future to wait for its completion
* @param runnable action which is triggered after the future's completion
* @param executor to run the given action
* @return Future which is completed after the action has completed. This future can contain an exception,
* if an error occurred in the given future or action.
*/
public static CompletableFuture<Void> runAfterwardsAsync(
CompletableFuture<?> future,
RunnableWithException runnable,
Executor executor) {
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
future.whenCompleteAsync(
(Object ignored, Throwable throwable) -> {
try {
runnable.run();
} catch (Throwable e) {
throwable = ExceptionUtils.firstOrSuppressed(e, throwable);
}
if (throwable != null) {
resultFuture.completeExceptionally(throwable);
} else {
resultFuture.complete(null);
}
},
executor);
return resultFuture;
}
@Override
public void writeRecord(T record) throws IOException {
checkWriteError();
this.pendingWritesCount.incrementAndGet();
final CompletableFuture<Void> future = pravegaWriter.writeEvent(eventRouter.getRoutingKey(record), record);
future.whenCompleteAsync(
(result, e) -> {
if (e != null) {
log.warn("Detected a write failure: {}", e);
// We will record only the first error detected, since this will mostly likely help with
// finding the root cause. Storing all errors will not be feasible.
writeError.compareAndSet(null, e);
}
synchronized (this) {
pendingWritesCount.decrementAndGet();
this.notify();
}
},
executorService
);
}
public CompletableFuture<KeyValue<Errors, byte[]>> handleSyncGroup(
String groupId,
int generation,
String memberId,
Map<String, byte[]> groupAssignment
) {
CompletableFuture<KeyValue<Errors, byte[]>> resultFuture = new CompletableFuture<>();
handleSyncGroup(
groupId,
generation,
memberId,
groupAssignment,
(assignment, errors) -> resultFuture.complete(
new KeyValue<>(errors, assignment))
);
resultFuture.whenCompleteAsync((kv, throwable) -> {
if (throwable == null && kv.getKey() == Errors.NONE) {
offsetAcker.addOffsetsTracker(groupId, kv.getValue());
}
});
return resultFuture;
}
private DevModeHandler(DeploymentConfiguration config, int runningPort,
File npmFolder, CompletableFuture<Void> waitFor) {
port = runningPort;
reuseDevServer = config.reuseDevServer();
devServerStartFuture = waitFor.whenCompleteAsync((value, exception) -> {
// this will throw an exception if an exception has been thrown by
// the waitFor task
waitFor.getNow(null);
runOnFutureComplete(config, npmFolder);
});
}
/**
* Helper method which retries the provided operation in case of a failure.
*
* @param resultFuture to complete
* @param operation to retry
* @param retries until giving up
* @param executor to run the futures
* @param <T> type of the future's result
*/
private static <T> void retryOperation(
final CompletableFuture<T> resultFuture,
final Supplier<CompletableFuture<T>> operation,
final int retries,
final Executor executor) {
if (!resultFuture.isDone()) {
final CompletableFuture<T> operationFuture = operation.get();
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
if (throwable instanceof CancellationException) {
resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
} else {
if (retries > 0) {
retryOperation(
resultFuture,
operation,
retries - 1,
executor);
} else {
resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
"has been exhausted.", throwable));
}
}
} else {
resultFuture.complete(t);
}
},
executor);
resultFuture.whenComplete(
(t, throwable) -> operationFuture.cancel(false));
}
}
protected void handleCreateClient ( final EndpointDescription endpointDescription )
{
logger.trace ( "handleCreateClient () - {}", endpointDescription );
final EndpointDescription usedEndpointDescription = new EndpointDescription ( endpointDescription.getEndpointUrl (), //
endpointDescription.getServer (), //
endpointDescription.getServerCertificate (), //
endpointDescription.getSecurityMode (), //
endpointDescription.getSecurityPolicyUri (), //
endpointDescription.getUserIdentityTokens (), //
endpointDescription.getTransportProfileUri (), //
endpointDescription.getSecurityLevel () );
logger.trace ( "handleCreateClient () - used endpoint = {}", usedEndpointDescription );
final OpcUaClientConfigBuilder clientConfigBuilder = new OpcUaClientConfigBuilder ();
clientConfigBuilder.setEndpoint ( usedEndpointDescription );
// FIXME: check why this was removed, if we need it at all
// clientConfigBuilder.setSecureChannelReauthenticationEnabled ( false );
this.client = new OpcUaClient ( clientConfigBuilder.build () );
this.client.addSessionActivityListener ( this );
this.client.addFaultListener ( this );
final CompletableFuture<UaClient> promise = this.client.connect ();
promise.whenCompleteAsync ( new BiConsumer<UaClient, Throwable> () {
@Override
public void accept ( final UaClient connectedClient, final Throwable t )
{
// FIXME: this is one of the options which will have to come from
if ( t == null )
{
logger.trace ( "handleCreateClient () - connected to '{}'", usedEndpointDescription );
connected ();
}
else
{
logger.info ( "handleCreateClient () - connect to '{}' failed", usedEndpointDescription, t );
OpcUaConnection.this.browserFolder.connectionLost ();
}
}
}, this.executor );
}
@TimeStep(prob = 0)
public void pipelinedGet(final ThreadState state, @StartNanos final long startNanos, final Probe probe) throws Exception {
if (state.pipeline == null) {
state.pipeline = new Pipelining<>(pipelineDepth);
}
CompletableFuture<String> f = map.getAsync(state.randomKey()).toCompletableFuture();
f.whenCompleteAsync((s, throwable) -> probe.done(startNanos), callerRuns);
state.pipeline.add(f);
state.i++;
if (state.i == pipelineIterations) {
state.i = 0;
state.pipeline.results();
state.pipeline = null;
}
}
/**
* Await a Future invoking the callback on completion on the UI thread only if the
* rhis ObjectGroup is still alive when the Future completes.
*/
public <T> void safeWhenComplete(CompletableFuture<T> future, BiConsumer<? super T, ? super Throwable> action) {
if (future == null) {
return;
}
future.whenCompleteAsync(
(T value, Throwable throwable) -> skipIfDisposed(() -> {
ApplicationManager.getApplication().invokeLater(() -> {
action.accept(value, throwable);
});
})
);
}
@Test
public void testBuilder_withFutureAndCallback_fails() throws Exception {
setupSyncInterface();
when(proxyBuilderSync.build(Mockito.<io.joynr.proxy.ProxyBuilder.ProxyCreatedCallback<MyServiceSync>> any())).thenReturn(myJoynrProxy);
CountDownLatch countDownLatch = new CountDownLatch(2);
ProxyBuilder.ProxyCreatedCallback<MyServiceSync> callback = new ProxyBuilder.ProxyCreatedCallback<MyServiceSync>() {
@Override
public void onProxyCreationFinished(MyServiceSync result) {
fail("Should never get called");
}
@Override
public void onProxyCreationError(JoynrRuntimeException error) {
countDownLatch.countDown();
}
};
CompletableFuture<MyServiceSync> future = subject.builder(MyServiceSync.class, "local")
.withCallback(callback)
.useFuture()
.build();
future.whenCompleteAsync((proxy, error) -> {
if (proxy == null && error != null) {
countDownLatch.countDown();
}
});
verify(proxyBuilderSync).build(callbackSyncCaptor.capture());
ProxyBuilder.ProxyCreatedCallback<MyServiceSync> capturedCallback = callbackSyncCaptor.getValue();
capturedCallback.onProxyCreationError(new JoynrRuntimeException());
countDownLatch.await(100L, TimeUnit.MILLISECONDS);
}
private void executeActionAsync(
final CompletableFuture<KvStateResponse> result,
final KvStateRequest request,
final boolean update) {
if (!result.isDone()) {
final CompletableFuture<KvStateResponse> operationFuture = getState(request, update);
operationFuture.whenCompleteAsync(
(t, throwable) -> {
if (throwable != null) {
if (
throwable.getCause() instanceof UnknownKvStateIdException ||
throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
throwable.getCause() instanceof ConnectException
) {
// These failures are likely to be caused by out-of-sync
// KvStateLocation. Therefore we retry this query and
// force look up the location.
LOG.debug("Retrying after failing to retrieve state due to: {}.", throwable.getCause().getMessage());
executeActionAsync(result, request, true);
} else {
result.completeExceptionally(throwable);
}
} else {
result.complete(t);
}
}, queryExecutor);
result.whenComplete(
(t, throwable) -> operationFuture.cancel(false));
}
}
@Test
public void testBuilder_withFutureAndCallback() throws Exception {
setupSyncInterface();
when(proxyBuilderSync.build(Mockito.<io.joynr.proxy.ProxyBuilder.ProxyCreatedCallback<MyServiceSync>> any())).thenReturn(myJoynrProxy);
CountDownLatch countDownLatch = new CountDownLatch(2);
ProxyBuilder.ProxyCreatedCallback<MyServiceSync> callback = new ProxyBuilder.ProxyCreatedCallback<MyServiceSync>() {
@Override
public void onProxyCreationFinished(MyServiceSync result) {
countDownLatch.countDown();
}
@Override
public void onProxyCreationError(JoynrRuntimeException error) {
fail("Should never get called");
}
};
CompletableFuture<MyServiceSync> future = subject.builder(MyServiceSync.class, "local")
.withCallback(callback)
.useFuture()
.build();
future.whenCompleteAsync((proxy, error) -> {
if (proxy != null && error == null) {
countDownLatch.countDown();
}
});
verify(proxyBuilderSync).build(callbackSyncCaptor.capture());
ProxyBuilder.ProxyCreatedCallback<MyServiceSync> capturedCallback = callbackSyncCaptor.getValue();
capturedCallback.onProxyCreationFinished(myJoynrProxy);
countDownLatch.await(100L, TimeUnit.MILLISECONDS);
}
@Override
public void write(String key, V value) throws IOException, InterruptedException {
final CompletableFuture<Void> future = writer.writeEvent(key, value);
future.whenCompleteAsync(
(v, e) -> {
if (e != null) {
log.warn("Detected a write failure: {}", e);
}
}
);
}
private void updateTaskExecutionState(
final JobMasterGateway jobMasterGateway,
final TaskExecutionState taskExecutionState) {
final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
futureAcknowledge.whenCompleteAsync(
(ack, throwable) -> {
if (throwable != null) {
failTask(executionAttemptID, throwable);
}
},
getMainThreadExecutor());
}
@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
acknowledgeFuture.whenCompleteAsync(
(Acknowledge ack, Throwable throwable) -> {
if (throwable != null) {
LOG.error("Could not schedule or update consumers at the JobManager.", throwable);
taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
}
},
executor);
}
@Test
public void testBuilder_withFuture_fails() throws Exception {
setupSyncInterface();
when(proxyBuilderSync.build(Mockito.<io.joynr.proxy.ProxyBuilder.ProxyCreatedCallback<MyServiceSync>> any())).thenReturn(myJoynrProxy);
CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture<MyServiceSync> future = subject.builder(MyServiceSync.class, "local").useFuture().build();
future.whenCompleteAsync((proxy, error) -> {
if (error != null && proxy == null) {
countDownLatch.countDown();
}
});
verify(proxyBuilderSync).build(callbackSyncCaptor.capture());
ProxyBuilder.ProxyCreatedCallback<MyServiceSync> value = callbackSyncCaptor.getValue();
value.onProxyCreationError(new JoynrRuntimeException("test"));
countDownLatch.await(100L, TimeUnit.MILLISECONDS);
try {
future.get();
fail("Should never get this far.");
} catch (ExecutionException e) {
if (!(e.getCause() instanceof JoynrRuntimeException)) {
fail("Nested exception not of expected type.");
}
}
}
/**
* This method performs a registration attempt and triggers either a success notification or a retry,
* depending on the result.
*/
@SuppressWarnings("unchecked")
private void register(final G gateway, final int attempt, final long timeoutMillis) {
// eager check for canceling to avoid some unnecessary work
if (canceled) {
return;
}
try {
log.debug("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
CompletableFuture<RegistrationResponse> registrationFuture = invokeRegistration(gateway, fencingToken, timeoutMillis);
// if the registration was successful, let the TaskExecutor know
CompletableFuture<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync(
(RegistrationResponse result) -> {
if (!isCanceled()) {
if (result instanceof RegistrationResponse.Success) {
// registration successful!
S success = (S) result;
completionFuture.complete(Tuple2.of(gateway, success));
}
else {
// registration refused or unknown
if (result instanceof RegistrationResponse.Decline) {
RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
log.info("Registration at {} was declined: {}", targetName, decline.getReason());
} else {
log.error("Received unknown response to registration attempt: {}", result);
}
log.info("Pausing and re-attempting registration in {} ms", retryingRegistrationConfiguration.getRefusedDelayMillis());
registerLater(gateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(), retryingRegistrationConfiguration.getRefusedDelayMillis());
}
}
},
rpcService.getExecutor());
// upon failure, retry
registrationAcceptFuture.whenCompleteAsync(
(Void v, Throwable failure) -> {
if (failure != null && !isCanceled()) {
if (ExceptionUtils.stripCompletionException(failure) instanceof TimeoutException) {
// we simply have not received a response in time. maybe the timeout was
// very low (initial fast registration attempts), maybe the target endpoint is
// currently down.
if (log.isDebugEnabled()) {
log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
targetName, targetAddress, attempt, timeoutMillis);
}
long newTimeoutMillis = Math.min(2 * timeoutMillis, retryingRegistrationConfiguration.getMaxRegistrationTimeoutMillis());
register(gateway, attempt + 1, newTimeoutMillis);
}
else {
// a serious failure occurred. we still should not give up, but keep trying
log.error("Registration at {} failed due to an error", targetName, failure);
log.info("Pausing and re-attempting registration in {} ms", retryingRegistrationConfiguration.getErrorDelayMillis());
registerLater(gateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(), retryingRegistrationConfiguration.getErrorDelayMillis());
}
}
},
rpcService.getExecutor());
}
catch (Throwable t) {
completionFuture.completeExceptionally(t);
cancel();
}
}