下面列出了java.util.concurrent.CompletableFuture#whenComplete ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Returns a CompletableFuture that will complete with the same outcome or result as the given source, but when
* cancelled, will apply a consumer to the eventual result of the original future.
* <p>
* If the returned CompletableFuture is NOT cancelled ({@link CompletableFuture#cancel}):
* - If source completes normally, the result CompletableFuture will complete with the same result.
* - If source completes exceptionally, the result CompletableFuture will complete with the same result.
* <p>
* If the returned CompletableFuture is cancelled ({@link CompletableFuture#cancel}):
* - If the source has already completed, the result CompletableFuture will also be completed with the same outcome.
* - If the source has not already been completed, if it completes normally, then `onCancel` will be applied to
* the result when it eventually completes. The source completes exceptionally, nothing will happen.
*
* @param source The CompletableFuture to wrap.
* @param onCancel A Consumer to invoke on source's eventual completion result if the result of this method is cancelled.
* @param <T> Result type.
* @return A CompletableFuture that will complete with the same outcome or result as the given source.
*/
public static <T> CompletableFuture<T> cancellableFuture(CompletableFuture<T> source, Consumer<T> onCancel) {
if (source == null) {
return null;
}
val result = new CompletableFuture<T>();
source.whenComplete((r, ex) -> {
if (ex == null) {
result.complete(r);
} else {
result.completeExceptionally(ex);
}
});
Futures.exceptionListener(result, ex -> {
if (ex instanceof CancellationException && !source.isCancelled()) {
source.thenAccept(onCancel);
}
});
return result;
}
@Override
public CompletableFuture<Result> invoke(final RequestMessage<Invocation> request) {
// 回调的场景少,直接采用反射
CompletableFuture<Result> result = new CompletableFuture<>();
try {
Invocation payLoad = request.getPayLoad();
Method method = getPublicMethod(callbackClass, payLoad.getMethodName());
Object value = method.invoke(callback, payLoad.getArgs());
//异步回调
CompletableFuture<?> future = isReturnFuture(callbackClass, method) ?
(CompletableFuture<?>) value : CompletableFuture.completedFuture(value);
future.whenComplete((v, t) -> {
if (t != null) {
result.complete(new Result(request.getContext(), t));
} else {
result.complete(new Result(request.getContext(), v));
}
});
} catch (Throwable e) {
result.completedFuture(new Result(request.getContext(), e));
}
return result;
}
@Override
public CompletableFuture<Result> invoke(final Invoker invoker, final RequestMessage<Invocation> request) {
if (factory == null) {
return invoker.invoke(request);
}
Invocation invocation = request.getPayLoad();
InterfaceOption.MethodOption option = request.getOption();
if (!option.isTrace()) {
return invoker.invoke(request);
}
//构造跟踪标签
Map<String, String> tags = new HashMap<>();
createTags(request, tags);
//创建跟踪
Tracer trace = factory.create(request);
//启动跟踪
trace.begin(option.getTraceSpanId(invocation), component, tags);
//快照
trace.snapshot();
//远程调用
CompletableFuture<Result> future = invoker.invoke(request);
//主线程调用结束
trace.prepare();
future.whenComplete((result, throwable) -> {
//异步线程恢复
trace.restore();
//异步线程结束
trace.end(throwable == null ? result.getException() : throwable);
});
return future;
}
private void internalAllocateSlot(
CompletableFuture<LogicalSlot> allocationResultFuture,
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
Time allocationTimeout) {
CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout) :
allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
if (failure != null) {
Optional<SharedSlotOversubscribedException> sharedSlotOverAllocatedException =
ExceptionUtils.findThrowable(failure, SharedSlotOversubscribedException.class);
if (sharedSlotOverAllocatedException.isPresent() &&
sharedSlotOverAllocatedException.get().canRetry()) {
// Retry the allocation
internalAllocateSlot(
allocationResultFuture,
slotRequestId,
scheduledUnit,
slotProfile,
allowQueuedScheduling,
allocationTimeout);
} else {
cancelSlotRequest(
slotRequestId,
scheduledUnit.getSlotSharingGroupId(),
failure);
allocationResultFuture.completeExceptionally(failure);
}
} else {
allocationResultFuture.complete(slot);
}
});
}
public void unicastClusterEvent(String host, int port, String topic, String msg) {
LOGGER.info("send unicastClusterEvent host:{} port:{} topic:{} message:{}",
host, port, topic, msg);
Address address = Address.from(host, port);
CompletableFuture<byte[]> response = messagingService.sendAndReceive(address,
topic, msg.getBytes(), Duration.ofSeconds(2));
response.whenComplete((r, e) -> {
if (null == e) {
LOGGER.error(e.getMessage(), e);
}
});
}
@Override
protected CompletableFuture<Void> doOpen() {
CompletableFuture<Void> future = new CompletableFuture<>();
//创建注册中心
registryRef = REGISTRY.get(registryUrl.getProtocol()).getRegistry(registryUrl);
//构建代理
config.proxy();
//订阅,等到初始化配置
chain(subscribe(), future, (v) -> chain(waitingConfig, future, (url) -> {
//检查动态配置是否修改了别名,需要重新订阅
serviceUrl = url;
registerUrl = config.register ? buildRegisteredUrl(registryRef, url) : null;
resubscribe(buildSubscribedUrl(configureRef, url), false);
try {
refer = ServiceManager.refer(url, config, registryRef, registerUrl, configureRef, subscribeUrl, configHandler);
//打开
chain(refer.open(), future, s -> {
//构建调用器
invokeHandler = new ConsumerInvokeHandler(refer, proxyClass, refer.getUrl());
future.complete(null);
});
} catch (Throwable ex) {
future.completeExceptionally(ex);
}
}));
future.whenComplete((v, err) -> latch.countDown());
return future;
}
@Override
public CompletableFuture<Void> stopService() {
final CompletableFuture<Void> akkaRpcActorsTerminationFuture;
synchronized (lock) {
if (stopped) {
return terminationFuture;
}
LOG.info("Stopping Akka RPC service.");
stopped = true;
akkaRpcActorsTerminationFuture = terminateAkkaRpcActors();
}
final CompletableFuture<Void> supervisorTerminationFuture = FutureUtils.composeAfterwards(
akkaRpcActorsTerminationFuture,
supervisor::closeAsync);
final CompletableFuture<Void> actorSystemTerminationFuture = FutureUtils.composeAfterwards(
supervisorTerminationFuture,
() -> FutureUtils.toJava(actorSystem.terminate()));
actorSystemTerminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(null);
}
LOG.info("Stopped Akka RPC service.");
});
return terminationFuture;
}
@PUT
@Path("/{name}/{path: .*}")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
public void set(
@PathParam("name") String name,
@PathParam("path") List<PathSegment> path,
String value,
@QueryParam("version") Long version,
@Suspended AsyncResponse response) {
CompletableFuture<Boolean> future;
if (version != null) {
future = getPrimitive(name).thenCompose(tree -> tree.replace(getDocumentPath(path), value, version));
} else {
future = getPrimitive(name).thenCompose(tree -> tree.set(getDocumentPath(path), value).thenApply(v -> Boolean.TRUE));
}
future.whenComplete((result, error) -> {
if (error == null) {
response.resume(Response.ok(result).build());
} else {
if (error.getCause() != null) {
error = error.getCause();
}
if (error instanceof IllegalDocumentModificationException || error instanceof NoSuchDocumentPathException) {
response.resume(Response.ok(false).build());
} else {
LOGGER.warn("{}", error);
response.resume(Response.serverError().build());
}
}
});
}
/**
* Registers an ongoing operation with the cache.
*
* @param operationResultFuture A future containing the operation result.
*/
public void registerOngoingOperation(
final K operationKey,
final CompletableFuture<R> operationResultFuture) {
final ResultAccessTracker<R> inProgress = ResultAccessTracker.inProgress();
registeredOperationTriggers.put(operationKey, inProgress);
operationResultFuture.whenComplete((result, error) -> {
if (error == null) {
completedOperations.put(operationKey, inProgress.finishOperation(Either.Right(result)));
} else {
completedOperations.put(operationKey, inProgress.finishOperation(Either.Left(error)));
}
registeredOperationTriggers.remove(operationKey);
});
}
/**
* Schedule the operation with the given delay.
*
* @param operation to schedule
* @param delay delay to schedule
* @param scheduledExecutor executor to be used for the operation
* @param <T> type of the result
* @return Future which schedules the given operation with given delay.
*/
public static <T> CompletableFuture<T> scheduleWithDelay(
final Supplier<T> operation,
final Time delay,
final ScheduledExecutor scheduledExecutor) {
final CompletableFuture<T> resultFuture = new CompletableFuture<>();
ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
() -> {
try {
resultFuture.complete(operation.get());
} catch (Throwable t) {
resultFuture.completeExceptionally(t);
}
},
delay.getSize(),
delay.getUnit()
);
resultFuture.whenComplete(
(t, throwable) -> {
if (!scheduledFuture.isDone()) {
scheduledFuture.cancel(false);
}
});
return resultFuture;
}
public static <T> void proxyTo(CompletableFuture<T> src,
CompletableFuture<T> target) {
src.whenComplete((value, cause) -> {
if (null == cause) {
target.complete(value);
} else {
target.completeExceptionally(cause);
}
});
}
public void scheduleForExecution() throws JobException {
assertRunningInJobMasterMainThread();
if (isLegacyScheduling()) {
LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
}
final long currentGlobalModVersion = globalModVersion;
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule(
scheduleMode,
getAllExecutionVertices(),
this);
if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
schedulingFuture = newSchedulingFuture;
newSchedulingFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
if (!(strippedThrowable instanceof CancellationException)) {
// only fail if the scheduling future was not canceled
failGlobal(strippedThrowable);
}
}
});
} else {
newSchedulingFuture.cancel(false);
}
}
else {
throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
}
}
public static <T> CompletableFuture<T> rescue(CompletableFuture<T> future,
Function<Throwable, CompletableFuture<T>> rescueFuc) {
CompletableFuture<T> result = FutureUtils.createFuture();
future.whenComplete((value, cause) -> {
if (null == cause) {
result.complete(value);
return;
}
proxyTo(rescueFuc.apply(cause), result);
});
return result;
}
/**
* Test Case: starting reading when the streams don't exist.
* {@link https://issues.apache.org/jira/browse/DL-42}
*/
@DistributedLogAnnotations.FlakyTest
@Ignore
@Test(timeout = 120000)
public void testSimpleAsyncReadWriteStartEmptyFactory() throws Exception {
// int count = 50;
int count = 1;
String name = runtime.getMethodName();
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.loadConf(testConf);
confLocal.setReadAheadWaitTime(10);
confLocal.setReadAheadBatchSize(10);
confLocal.setOutputBufferSize(1024);
int numLogSegments = 3;
int numRecordsPerLogSegment = 1;
URI uri = createDLMURI("/" + name);
ensureURICreated(uri);
Namespace namespace = NamespaceBuilder.newBuilder()
.conf(confLocal).uri(uri).build();
final DistributedLogManager[] dlms = new DistributedLogManager[count];
final TestReader[] readers = new TestReader[count];
final CountDownLatch readyLatch = new CountDownLatch(count);
final CountDownLatch[] syncLatches = new CountDownLatch[count];
final CountDownLatch[] readerDoneLatches = new CountDownLatch[count];
for (int s = 0; s < count; s++) {
dlms[s] = namespace.openLog(name + String.format("%d", s));
readerDoneLatches[s] = new CountDownLatch(1);
syncLatches[s] = new CountDownLatch(numLogSegments * numRecordsPerLogSegment);
readers[s] = new TestReader("reader-" + s,
dlms[s], DLSN.InitialDLSN, false, 0, readyLatch, syncLatches[s], readerDoneLatches[s]);
readers[s].start();
}
// wait all readers were positioned at least once
readyLatch.await();
final CountDownLatch writeLatch = new CountDownLatch(3 * count);
final AtomicBoolean writeErrors = new AtomicBoolean(false);
int txid = 1;
for (long i = 0; i < 3; i++) {
final long currentLogSegmentSeqNo = i + 1;
BKAsyncLogWriter[] writers = new BKAsyncLogWriter[count];
for (int s = 0; s < count; s++) {
writers[s] = (BKAsyncLogWriter) (dlms[s].startAsyncLogSegmentNonPartitioned());
}
for (long j = 0; j < 1; j++) {
final long currentEntryId = j;
final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
for (int s = 0; s < count; s++) {
CompletableFuture<DLSN> dlsnFuture = writers[s].write(record);
dlsnFuture.whenComplete(new WriteFutureEventListener(
record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
}
}
for (int s = 0; s < count; s++) {
writers[s].closeAndComplete();
}
}
writeLatch.await();
assertFalse("All writes should succeed", writeErrors.get());
for (int s = 0; s < count; s++) {
readerDoneLatches[s].await();
assertFalse("Reader " + s + " should not encounter errors", readers[s].areErrorsFound());
syncLatches[s].await();
assertEquals(numLogSegments * numRecordsPerLogSegment, readers[s].getNumReads().get());
assertTrue("Reader " + s + " should position at least once", readers[s].getNumReaderPositions().get() > 0);
}
for (int s = 0; s < count; s++) {
readers[s].stop();
dlms[s].close();
}
}
public static void main(String[] args) throws Exception {
try {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
@Cleanup
TestingServer zkTestServer = new TestingServerStarter().start();
ServiceBuilder serviceBuilder = ServiceBuilder.newInMemoryBuilder(ServiceBuilderConfig.getDefaultConfig());
serviceBuilder.initialize();
StreamSegmentStore store = serviceBuilder.createStreamSegmentService();
TableStore tableStore = serviceBuilder.createTableStoreService();
int port = Config.SERVICE_PORT;
@Cleanup
PravegaConnectionListener server = new PravegaConnectionListener(false, port, store, tableStore,
serviceBuilder.getLowPriorityExecutor());
server.startListening();
// Create controller object for testing against a separate controller report.
@Cleanup
ControllerWrapper controllerWrapper = new ControllerWrapper(zkTestServer.getConnectString(), port);
Controller controller = controllerWrapper.getController();
final String scope = "scope";
controllerWrapper.getControllerService().createScope(scope).get();
final String streamName = "stream1";
final StreamConfiguration config =
StreamConfiguration.builder().scalingPolicy(
ScalingPolicy.fixed(1)).build();
Stream stream = new StreamImpl(scope, streamName);
log.info("Creating stream {}/{}", scope, streamName);
if (!controller.createStream(scope, streamName, config).get()) {
log.error("Stream already existed, exiting");
return;
}
// Test 1: scale stream: split one segment into two
log.info("Scaling stream {}/{}, splitting one segment into two", scope, streamName);
Map<Double, Double> map = new HashMap<>();
map.put(0.0, 0.5);
map.put(0.5, 1.0);
if (!controller.scaleStream(stream, Collections.singletonList(0L), map, executor).getFuture().get()) {
log.error("Scale stream: splitting segment into two failed, exiting");
return;
}
// Test 2: scale stream: merge two segments into one
log.info("Scaling stream {}/{}, merging two segments into one", scope, streamName);
CompletableFuture<Boolean> scaleResponseFuture = controller.scaleStream(stream, Arrays.asList(1L, 2L),
Collections.singletonMap(0.0, 1.0), executor).getFuture();
if (!scaleResponseFuture.get()) {
log.error("Scale stream: merging two segments into one failed, exiting");
return;
}
// Test 3: create a transaction, and try scale operation, it should fail with precondition check failure
CompletableFuture<TxnSegments> txnFuture = controller.createTransaction(stream, 5000);
TxnSegments transaction = txnFuture.get();
if (transaction == null) {
log.error("Create transaction failed, exiting");
return;
}
log.info("Scaling stream {}/{}, splitting one segment into two, while transaction is ongoing",
scope, streamName);
scaleResponseFuture = controller.scaleStream(stream, Collections.singletonList(3L), map, executor).getFuture();
CompletableFuture<Boolean> future = scaleResponseFuture.whenComplete((r, e) -> {
if (e != null) {
log.error("Failed: scale with ongoing transaction.", e);
} else if (getAndHandleExceptions(controller.checkTransactionStatus(stream, transaction.getTxnId()),
RuntimeException::new) != Transaction.Status.OPEN) {
log.info("Success: scale with ongoing transaction.");
} else {
log.error("Failed: scale with ongoing transaction.");
}
});
CompletableFuture<Void> statusFuture = controller.abortTransaction(stream, transaction.getTxnId());
statusFuture.get();
future.get();
log.info("All scaling test PASSED");
ExecutorServiceHelpers.shutdown(executor);
System.exit(0);
} catch (Throwable t) {
log.error("test failed with {}", t);
System.exit(-1);
}
}
@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
CompletableFuture<Void> result = new CompletableFuture<>();
createSystemTopicFactoryIfNeeded();
SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory.createSystemTopic(topicName.getNamespaceObject(),
EventType.TOPIC_POLICY);
CompletableFuture<SystemTopicClient.Writer> writerFuture = systemTopicClient.newWriterAsync();
writerFuture.whenComplete((writer, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
} else {
writer.writeAsync(
PulsarEvent.builder()
.actionType(ActionType.UPDATE)
.eventType(EventType.TOPIC_POLICY)
.topicPoliciesEvent(
TopicPoliciesEvent.builder()
.domain(topicName.getDomain().toString())
.tenant(topicName.getTenant())
.namespace(topicName.getNamespaceObject().getLocalName())
.topic(topicName.getLocalName())
.policies(policies)
.build())
.build()).whenComplete(((messageId, e) -> {
if (e != null) {
result.completeExceptionally(e);
} else {
if (messageId != null) {
result.complete(null);
} else {
result.completeExceptionally(new RuntimeException("Got message id is null."));
}
}
writer.closeAsync().whenComplete((v, cause) -> {
if (cause != null) {
log.error("[{}] Close writer error.", topicName, cause);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Close writer success.", topicName);
}
}
});
})
);
}
});
return result;
}
@Override
public void handle(Invocation invocation, AsyncResponse asyncResponse) throws Exception {
if (invocation.getMicroserviceName().equals("user-service")
&& (invocation.getOperationName().equals("login")
|| (invocation.getOperationName().equals("getSession")))) {
// login:return session id, set cookie by javascript
invocation.next(asyncResponse);
} else {
// check session
String sessionId = invocation.getContext("session-id");
if (sessionId == null) {
throw new InvocationException(403, "", "session is not valid.");
}
String sessionInfo = sessionCache.getIfPresent(sessionId);
if (sessionInfo != null) {
try {
// session info stored in InvocationContext. Microservices can get it.
invocation.addContext("session-id", sessionId);
invocation.addContext("session-info", sessionInfo);
invocation.next(asyncResponse);
} catch (Exception e) {
asyncResponse.complete(Response.failResp(new InvocationException(500, "", e.getMessage())));
}
return;
}
// In edge, handler is executed in reactively. Must have no blocking logic.
CompletableFuture<SessionInfo> result = userServiceClient.getGetSessionOperation().getSession(sessionId);
result.whenComplete((info, e) -> {
if (result.isCompletedExceptionally()) {
asyncResponse.complete(Response.failResp(new InvocationException(403, "", "session is not valid.")));
} else {
if (info == null) {
asyncResponse.complete(Response.failResp(new InvocationException(403, "", "session is not valid.")));
return;
}
try {
// session info stored in InvocationContext. Microservices can get it.
invocation.addContext("session-id", sessionId);
String sessionInfoStr = JsonUtils.writeValueAsString(info);
invocation.addContext("session-info", sessionInfoStr);
invocation.next(asyncResponse);
sessionCache.put(sessionId, sessionInfoStr);
} catch (Exception ee) {
asyncResponse.complete(Response.failResp(new InvocationException(500, "", ee.getMessage())));
}
}
});
}
}
public static <V> Promise<V> fromCompletableFuture(CompletableFuture<V> completableFuture) {
final Promise<V> promise = new JdkPromise<>(completableFuture);
completableFuture.whenComplete(new UniRelay<>(promise));
return promise;
}
private static void readNext(final AsyncLogReader reader,
final DLSN startPosition,
final long startSequenceId,
final boolean monotonic,
final CountDownLatch syncLatch,
final CountDownLatch completionLatch,
final AtomicBoolean errorsFound) {
CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
record.whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
@Override
public void onSuccess(LogRecordWithDLSN value) {
try {
if (monotonic) {
assertEquals(startSequenceId, value.getSequenceId());
} else {
assertTrue(value.getSequenceId() < 0);
assertTrue(value.getSequenceId() > startSequenceId);
}
LOG.info("Received record {} from {}", value, reader.getStreamName());
assertTrue(!value.isControl());
assertTrue(value.getDlsn().getSlotId() == 0);
assertTrue(value.getDlsn().compareTo(startPosition) >= 0);
DLMTestUtil.verifyLargeLogRecord(value);
} catch (Exception exc) {
LOG.debug("Exception Encountered when verifying log record {} : ", value.getDlsn(), exc);
errorsFound.set(true);
completionLatch.countDown();
return;
}
syncLatch.countDown();
if (syncLatch.getCount() <= 0) {
completionLatch.countDown();
} else {
TestAsyncReaderWriter.readNext(
reader,
value.getDlsn().getNextDLSN(),
monotonic ? value.getSequenceId() + 1 : value.getSequenceId(),
monotonic,
syncLatch,
completionLatch,
errorsFound);
}
}
@Override
public void onFailure(Throwable cause) {
LOG.error("Encountered Exception on reading {}", reader.getStreamName(), cause);
errorsFound.set(true);
completionLatch.countDown();
}
});
}
/**
* Suspending job, all the running tasks will be cancelled, and communication with other components
* will be disposed.
*
* <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by
* calling the {@link #start(JobMasterId)} method once we take the leadership back again.
*
* <p>This method is executed asynchronously
*
* @param cause The reason of why this job been suspended.
* @return Future acknowledge indicating that the job has been suspended. Otherwise the future contains an exception
*/
public CompletableFuture<Acknowledge> suspend(final Exception cause) {
CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(
() -> suspendExecution(cause),
RpcUtils.INF_TIMEOUT);
return suspendFuture.whenComplete((acknowledge, throwable) -> stop());
}