下面列出了 java.util.concurrent.CompletableFuture#handle() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Refreshes the delegation token and returns a future that yields the refreshed token value.
 *
 * <p>If a refresh future is already registered in {@code tokenRefreshFuture}, that one is reused;
 * otherwise a new refresh is started via {@code recreateToken()} and registered with a compare-and-set.
 * When the refresh future completes, the registration is cleared again.
 *
 * @return a future completed with the current delegation token value, or completed exceptionally
 *         with a {@link CompletionException} if the refresh failed.
 */
@VisibleForTesting
CompletableFuture<String> refreshToken() {
    final long traceId = LoggerHelpers.traceEnter(log, "refreshToken", this.scopeName, this.streamName);

    CompletableFuture<Void> inFlight = tokenRefreshFuture.get();
    if (inFlight != null) {
        log.debug("Token is already under refresh for scope {} and stream {}", this.scopeName, this.streamName);
    } else {
        log.debug("Initiating token refresh for scope {} and stream {}", this.scopeName, this.streamName);
        inFlight = this.recreateToken();
        this.tokenRefreshFuture.compareAndSet(null, inFlight);
    }

    // Lambdas need an effectively final reference to the future being awaited.
    final CompletableFuture<Void> awaited = inFlight;
    return awaited.handle((ignored, failure) -> {
        // Clear the registration only if it still points at the future we waited on.
        this.tokenRefreshFuture.compareAndSet(awaited, null);
        LoggerHelpers.traceLeave(log, "refreshToken", traceId, this.scopeName, this.streamName);

        if (failure == null) {
            return delegationToken.get().getValue();
        }

        log.warn("Encountered an exception in when refreshing token for scope {} and stream {}",
                this.scopeName, this.streamName, Exceptions.unwrap(failure));
        if (failure instanceof CompletionException) {
            throw (CompletionException) failure;
        }
        throw new CompletionException(failure);
    });
}
/**
 * Tests ordered failure of future callbacks.
 *
 * <p>Callbacks are registered on an incomplete future, which is then completed exceptionally.
 * Every callback that is expected to run on the failure path asserts its position in the
 * invocation sequence (1 through 4); the {@code thenRun} and {@code thenAccept} callbacks
 * fail the test if they are ever invoked.
 */
public void testOrderedFailure() throws Throwable {
    CompletableFuture<String> future = new OrderedCompletableFuture<>();
    AtomicInteger order = new AtomicInteger();
    future.whenComplete((r, e) -> assertEquals(1, order.incrementAndGet()));
    future.whenComplete((r, e) -> assertEquals(2, order.incrementAndGet()));
    future.handle((r, e) -> {
        assertEquals(3, order.incrementAndGet());
        return "bar";
    });
    // Success-only stages: must be skipped when the future fails.
    future.thenRun(() -> fail());
    future.thenAccept(r -> fail());
    future.exceptionally(e -> {
        // Fourth failure-path callback: both whenComplete callbacks and handle precede it,
        // so the counter must read 4 here (the previous expectation of 3 could never hold).
        assertEquals(4, order.incrementAndGet());
        return "bar";
    });
    future.completeExceptionally(new RuntimeException("foo"));
}
/**
 * Writes the given data to the response without blocking.
 *
 * <p>When {@code last} is set and there is no data, the response is simply ended and the raw
 * completion future is returned. Otherwise the data is converted to a buffer and either written
 * or used to end the response; on failure the buffer is released (if still referenced) and the
 * failure is rethrown wrapped in an {@link IOException}.
 *
 * @param data the bytes to write, may be {@code null} when {@code last} is {@code true}
 * @param last whether this is the final chunk of the response
 * @return a stage completed once the write has been handled
 */
@Override
public CompletionStage<Void> writeNonBlocking(ByteBuf data, boolean last) {
    final CompletableFuture<Void> written = new CompletableFuture<>();

    if (last && data == null) {
        // Nothing to send: just terminate the response.
        request.response().end(handler(written));
        return written;
    }

    final Buffer payload = createBuffer(data);
    if (!last) {
        request.response().write(payload, handler(written));
    } else {
        request.response().end(payload, handler(written));
    }

    return written.handle((result, failure) -> {
        if (failure == null) {
            return result;
        }
        // Release the buffer only while it is still referenced.
        if (data != null && data.refCnt() > 0) {
            data.release();
        }
        rethrow(new IOException("Failed to write", failure));
        return result;
    });
}
/**
 * Demonstrates completing a pending future exceptionally and observing the failure both
 * directly (via {@code join}) and through a dependent {@code handle} stage.
 */
@Test
public void completeExceptionallyExample() {
    CompletableFuture<String> upperCased = CompletableFuture
            .completedFuture("message")
            .thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture<String> fallback =
            upperCased.handle((value, error) -> error == null ? "" : "message upon cancel");

    // Fail the stage explicitly before the delayed executor gets to run the transformation.
    upperCased.completeExceptionally(new RuntimeException("completed exceptionally"));
    assertTrue("Was not completed exceptionally", upperCased.isCompletedExceptionally());

    try {
        upperCased.join();
        fail("Should have thrown an exception");
    } catch (CompletionException expected) { // just for testing
        assertEquals("completed exceptionally", expected.getCause().getMessage());
    }

    assertEquals("message upon cancel", fallback.join());
}
/**
 * Requests creation of a protection endpoint on the given device and logs the outcome.
 *
 * @param did   device on which to create the protection endpoint
 * @param added protection configuration that was added
 */
private void addProtection(DeviceId did, ProtectionConfig added) {
    final ProtectedTransportEndpointDescription endpoint = added.asDescription();
    log.info("adding protection {}-{}", did, endpoint);

    final CompletableFuture<ConnectPoint> creation =
            getBehaviour(did).createProtectionEndpoint(endpoint);

    creation.handle((port, failure) -> {
        if (port == null) {
            // No port was produced; report whatever failure (if any) came with it.
            log.error("Protection {} exceptionally failed.", added, failure);
            return port;
        }
        log.info("Virtual Port {} created for {}", port, endpoint);
        log.debug("{}", deviceService.getPort(port));
        return port;
    });
}
/**
 * {@inheritDoc}
 */
@Override
public synchronized void send(String source, OutgoingMessageEnvelope envelope) {
    checkForSendCallbackErrors("Received exception on message send");

    final String physicalStream = envelope.getSystemStream().getStream();
    final String streamId = physicalToStreamIds.getOrDefault(physicalStream, physicalStream);

    // Time only the hand-off to sendAsync; completion is timed separately in the callback.
    final long sendStartedMs = System.currentTimeMillis();
    final CompletableFuture<Void> pendingSend = sendAsync(source, envelope);
    final long sendReturnedMs = System.currentTimeMillis();

    final long handOffLatencyMs = sendReturnedMs - sendStartedMs;
    sendLatency.get(streamId).update(handOffLatencyMs);
    aggSendLatency.update(handOffLatencyMs);

    pendingFutures.add(pendingSend);

    // Update the metrics, and record the first failure, once the send completes.
    pendingSend.handle((ignored, failure) -> {
        final long completionLatencyMs = System.currentTimeMillis() - sendReturnedMs;
        sendCallbackLatency.get(streamId).update(completionLatencyMs);
        aggSendCallbackLatency.update(completionLatencyMs);

        if (failure == null) {
            return ignored;
        }

        sendErrors.get(streamId).inc();
        aggSendErrors.inc();
        LOG.error("Send message to event hub: {} failed with exception: ", streamId, failure);
        sendExceptionOnCallback.compareAndSet(null, failure);
        return ignored;
    });
}
/**
 * Returns a future that mirrors {@code input} after letting {@code tapper} observe its outcome.
 *
 * <p>The tapper is invoked with the result and the failure of {@code input} (one of which is
 * {@code null}). If the tapper itself throws, the returned future is still completed instead of
 * being left pending forever: when {@code input} succeeded it fails with the tapper's exception;
 * when {@code input} failed it keeps the original failure and the tapper's exception is attached
 * as suppressed.
 *
 * @param input  the future to observe
 * @param tapper the observer invoked once {@code input} completes
 * @param <T>    the result type
 * @return a new future carrying the outcome of {@code input}
 */
public static <T> CompletableFuture<T> tap(CompletableFuture<T> input, BiConsumer<T, Throwable> tapper) {
    CompletableFuture<T> outcome = new CompletableFuture<>();
    input.handle((result, throwable) -> {
        try {
            tapper.accept(result, throwable);
        } catch (Throwable tapFailure) {
            // Never let a misbehaving tapper strand the caller on an incomplete future.
            if (throwable == null) {
                outcome.completeExceptionally(tapFailure);
                return null;
            }
            // Self-suppression is illegal, so skip it if the tapper rethrew the same instance.
            if (tapFailure != throwable) {
                throwable.addSuppressed(tapFailure);
            }
        }
        completeSomehow(outcome, result, throwable);
        return null;
    });
    return outcome;
}
/**
 * Verifies that clearing a key from the loader's cache in a failure callback causes the next
 * load of that key to hit the batch function again.
 */
@Test
public void should_Clear_values_from_cache_after_errors() {
    List<Collection<Integer>> recordedLoads = new ArrayList<>();
    DataLoader<Integer, Integer> failingLoader = idLoaderBlowsUps(new DataLoaderOptions(), recordedLoads);

    // First attempt: fails, and the callback evicts the key.
    CompletableFuture<Integer> firstAttempt = failingLoader.load(1);
    firstAttempt.handle((ignored, error) -> {
        // Presumably determine if this error is transient, and only clear the cache in that case.
        if (error != null) {
            failingLoader.clear(1);
        }
        return null;
    });
    failingLoader.dispatch();

    await().until(firstAttempt::isDone);
    assertThat(firstAttempt.isCompletedExceptionally(), is(true));
    assertThat(cause(firstAttempt), instanceOf(IllegalStateException.class));

    // Second attempt: same key, same handling.
    CompletableFuture<Integer> secondAttempt = failingLoader.load(1);
    secondAttempt.handle((ignored, error) -> {
        // Again, only do this if you can determine the error is transient.
        if (error != null) {
            failingLoader.clear(1);
        }
        return null;
    });
    failingLoader.dispatch();

    await().until(secondAttempt::isDone);
    assertThat(secondAttempt.isCompletedExceptionally(), is(true));
    assertThat(cause(secondAttempt), instanceOf(IllegalStateException.class));

    // Both attempts must have reached the batch loader.
    assertThat(recordedLoads, equalTo(asList(singletonList(1), singletonList(1))));
}
/**
 * Bridges the completion of {@code future} to {@code resultHandler}, discarding any result value.
 *
 * @param future        the future to observe
 * @param resultHandler callback notified with {@code null} on success or a converted cause on failure
 */
private static void handleAsVoidResult(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
    future.handle((ignored, failure) -> {
        if (failure == null) {
            resultHandler.onComplete(null);
            return null;
        }
        resultHandler.onError(convert(failure));
        return null;
    });
}
/**
 * Verifies that callbacks attached to a future returned by the session run on the executor
 * supplied to the session when the underlying response fails.
 */
@Test
public void itExecutesReturnedExceptionalFuturesOnTheProvidedExecutor() {
    ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("SmtpSessionTestExecutor").build());
    try {
        SmtpSession session = new SmtpSession(channel, responseHandler, CONFIG, executorService, SSL_ENGINE_SUPPLIER);
        CompletableFuture<SmtpClientResponse> future = session.send(SMTP_REQUEST);
        CompletableFuture<Boolean> assertionFuture = future.handle((r, e) -> {
            assertThat(Thread.currentThread().getName()).contains("SmtpSessionTestExecutor");
            return true;
        });
        responseFuture.completeExceptionally(new RuntimeException());
        // join() rethrows any assertion failure raised inside the callback.
        assertionFuture.join();
    } finally {
        // Release the worker thread so the test does not leak it.
        executorService.shutdown();
    }
}
/**
 * Combines the given futures into one that yields all results, but fails as soon as any single
 * input future fails.
 *
 * @param futures the futures to combine
 * @param <T>     the element type
 * @return the combined future
 */
public static <T> CompletableFuture<List<T>> allOfOrException(Collection<CompletableFuture<T>> futures) {
    final CompletableFuture<List<T>> combined = allOf(futures);
    futures.forEach(each -> each.handle((ignored, failure) -> {
        if (failure == null) {
            return true;
        }
        // Short-circuit: propagate the first failure without waiting for the remaining futures.
        return combined.completeExceptionally(failure);
    }));
    return combined;
}
/**
 * Combines the given futures into one that yields their results in iteration order, but fails
 * as soon as any single input future fails.
 *
 * @param futures the futures to combine
 * @param <T>     the element type
 * @return a future of all results, completed exceptionally on the first observed failure
 */
static <T> CompletableFuture<List<T>> allOfOrException(Collection<CompletableFuture<T>> futures) {
    // Snapshot the inputs so the array passed to allOf and the later joins see the same elements.
    final List<CompletableFuture<T>> snapshot = futures.stream().collect(Collectors.toList());

    final CompletableFuture<List<T>> combined = CompletableFuture
            .allOf(snapshot.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> snapshot.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()));

    futures.forEach(each -> each.handle((ignored, failure) -> {
        if (failure == null) {
            return true;
        }
        // Short-circuit: propagate the first failure without waiting for the remaining futures.
        return combined.completeExceptionally(failure);
    }));
    return combined;
}
/**
 * Writes the given response to the channel and closes the channel afterwards, whether or not
 * the write succeeded.
 *
 * @param ctx      the channel handler context to write to and close
 * @param response the response to send
 */
private void respondAndClose(ChannelHandlerContext ctx, LiveHttpResponse response) {
    final CompletableFuture<Void> written = responseWriterFactory.create(ctx).write(response);
    written.handle((ignored, failure) -> {
        if (written.isCompletedExceptionally()) {
            LOGGER.error(warningMessage("message='Unable to send error', response=" + failure));
        }
        // Always close, regardless of the write outcome.
        ctx.close();
        return null;
    });
}
/**
* Allocates and assigns a slot obtained from the slot provider to the execution.
*
* <p>The execution is first moved from {@code CREATED} to {@code SCHEDULED}; only if that transition
* succeeds is a slot requested. A callback on {@code releaseFuture} cancels the pending slot request
* if the execution is released before the slot arrives.
*
* @param slotProviderStrategy to obtain a new slot from
* @param locationPreferenceConstraint constraint for the location preferences
* @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
* Can be empty if the allocation ids are not required for scheduling.
* @return Future which is completed with the allocated slot once it has been assigned
* or with an exception if an error occurred.
* @throws IllegalStateException if a co-location constraint is set but no slot sharing group exists
* @throws IllegalExecutionStateException if the execution could not be moved from CREATED to SCHEDULED
*/
private CompletableFuture<LogicalSlot> allocateAndAssignSlotForExecution(
SlotProviderStrategy slotProviderStrategy,
LocationPreferenceConstraint locationPreferenceConstraint,
@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
checkNotNull(slotProviderStrategy);
assertRunningInJobMasterMainThread();
final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
// sanity check
if (locationConstraint != null && sharingGroup == null) {
throw new IllegalStateException(
"Trying to schedule with co-location constraint but without slot sharing allowed.");
}
// this method only works if the execution is in the state 'CREATED'
if (transitionState(CREATED, SCHEDULED)) {
// a null slot sharing group id means the execution does not share its slot
final SlotSharingGroupId slotSharingGroupId = sharingGroup != null ? sharingGroup.getSlotSharingGroupId() : null;
// the co-location constraint is only passed along when one is configured
ScheduledUnit toSchedule = locationConstraint == null ?
new ScheduledUnit(this, slotSharingGroupId) :
new ScheduledUnit(this, slotSharingGroupId, locationConstraint);
// try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot
ExecutionVertex executionVertex = getVertex();
AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();
Collection<AllocationID> previousAllocationIDs =
lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();
// calculate the preferred locations
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
calculatePreferredLocations(locationPreferenceConstraint);
// the request id is created up front so that the release callback below can cancel the same request
final SlotRequestId slotRequestId = new SlotRequestId();
// the slot is requested only once the preferred locations are known
final CompletableFuture<LogicalSlot> logicalSlotFuture =
preferredLocationsFuture.thenCompose(
(Collection<TaskManagerLocation> preferredLocations) -> {
LOG.info("Allocating slot with SlotRequestID {} for the execution attempt {}.", slotRequestId, attemptId);
return slotProviderStrategy.allocateSlot(
slotRequestId,
toSchedule,
SlotProfile.priorAllocation(
vertex.getResourceProfile(),
getPhysicalSlotResourceProfile(vertex),
preferredLocations,
previousAllocationIDs,
allPreviousExecutionGraphAllocationIds));
});
// register call back to cancel slot request in case that the execution gets canceled
releaseFuture.whenComplete(
(Object ignored, Throwable throwable) -> {
// cancel(false) only returns true if the slot future was still pending,
// so an already fulfilled or failed request is not cancelled at the provider
if (logicalSlotFuture.cancel(false)) {
slotProviderStrategy.cancelSlotRequest(
slotRequestId,
slotSharingGroupId,
new FlinkException("Execution " + this + " was released."));
}
});
// This forces calls to the slot pool back into the main thread, for normal and exceptional completion
// NOTE(review): handle (not handleAsync) is used here, so the thread running this callback depends on
// where logicalSlotFuture is completed - confirm this matches the comment above.
return logicalSlotFuture.handle(
(LogicalSlot logicalSlot, Throwable failure) -> {
// propagate an allocation failure to the returned future
if (failure != null) {
throw new CompletionException(failure);
}
if (tryAssignResource(logicalSlot)) {
return logicalSlot;
} else {
// release the slot
logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
throw new CompletionException(
new FlinkException(
"Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
}
});
} else {
// call race, already deployed, or already done
throw new IllegalExecutionStateException(this, CREATED, state);
}
}
/**
 * Resolves the given address, consulting the cache first and issuing DNS queries only when no
 * entry (pending or completed) exists for the host name.
 *
 * @param unresolvedAddress the address to resolve
 * @param promise           the promise to fulfil with the resolved address or the failure cause
 */
@Override
protected void doResolve(InetSocketAddress unresolvedAddress, Promise<InetSocketAddress> promise)
        throws Exception {
    requireNonNull(unresolvedAddress, "unresolvedAddress");
    requireNonNull(promise, "promise");

    if (resolverClosed) {
        promise.tryFailure(new IllegalStateException("resolver is closed already."));
        return;
    }

    final String host = unresolvedAddress.getHostString();
    final int port = unresolvedAddress.getPort();

    // Fast path: an entry for this host is already cached.
    final CompletableFuture<CacheEntry> cached = cache.get(host);
    if (cached != null) {
        handleFromCache(cached, promise, port);
        return;
    }

    // Register a new future; if another caller registered first, piggyback on theirs.
    final CompletableFuture<CacheEntry> pending = new CompletableFuture<>();
    final CompletableFuture<CacheEntry> winner = cache.putIfAbsent(host, pending);
    if (winner != null) {
        handleFromCache(winner, promise, port);
        return;
    }

    final List<DnsQuestion> questions =
            dnsRecordTypes.stream()
                          .map(type -> DnsQuestionWithoutTrailingDot.of(host, type))
                          .collect(toImmutableList());
    sendQueries(questions, host, pending);

    pending.handle((entry, unused) -> {
        final Throwable cause = entry.cause();
        if (cause == null) {
            entry.scheduleRefresh(entry.ttlMillis());
            promise.trySuccess(new InetSocketAddress(entry.address(), port));
            return null;
        }

        // Failed lookup: keep it cached for negativeTtl seconds if cacheable, otherwise evict now.
        if (entry.hasCacheableCause() && negativeTtl > 0) {
            executor().schedule(() -> cache.remove(host), negativeTtl, TimeUnit.SECONDS);
        } else {
            cache.remove(host);
        }
        promise.tryFailure(cause);
        return null;
    });
}
/**
 * Handles a job termination request by cancelling the job and mapping failures to REST errors.
 *
 * @param request the incoming request carrying the job id and optional termination mode
 * @param gateway the gateway used to cancel the job
 * @return a future completed with an empty body on success, or failed with a
 *         {@link CompletionException} wrapping a {@code RestHandlerException}
 * @throws RestHandlerException if the removed "stop" termination mode is requested
 */
@Override
public CompletableFuture<EmptyResponseBody> handleRequest(HandlerRequest<EmptyRequestBody, JobCancellationMessageParameters> request, RestfulGateway gateway) throws RestHandlerException {
    final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
    final List<TerminationModeQueryParameter.TerminationMode> requestedModes = request.getQueryParameter(TerminationModeQueryParameter.class);

    // Fall back to the default when none is given; otherwise the first requested mode wins.
    final TerminationModeQueryParameter.TerminationMode terminationMode =
            requestedModes.isEmpty() ? defaultTerminationMode : requestedModes.get(0);

    final CompletableFuture<Acknowledge> terminationFuture;
    switch (terminationMode) {
        case CANCEL:
            terminationFuture = gateway.cancelJob(jobId, timeout);
            break;
        case STOP:
            throw new RestHandlerException("The termination mode \"stop\" has been removed. For " +
                "an ungraceful shutdown, please use \"cancel\" instead. For a graceful shutdown, " +
                "please use \"jobs/:jobId/stop\" instead." , HttpResponseStatus.PERMANENT_REDIRECT);
        default:
            terminationFuture = FutureUtils.completedExceptionally(new RestHandlerException("Unknown termination mode " + terminationMode + '.', HttpResponseStatus.BAD_REQUEST));
    }

    return terminationFuture.handle(
        (Acknowledge ack, Throwable throwable) -> {
            if (throwable == null) {
                return EmptyResponseBody.getInstance();
            }

            // Translate the underlying failure into the matching HTTP status.
            final Throwable error = ExceptionUtils.stripCompletionException(throwable);
            final RestHandlerException restFailure;
            if (error instanceof TimeoutException) {
                restFailure = new RestHandlerException(
                    "Job cancellation timed out.",
                    HttpResponseStatus.REQUEST_TIMEOUT, error);
            } else if (error instanceof FlinkJobNotFoundException) {
                restFailure = new RestHandlerException(
                    "Job could not be found.",
                    HttpResponseStatus.NOT_FOUND, error);
            } else {
                restFailure = new RestHandlerException(
                    "Job cancellation failed: " + error.getMessage(),
                    HttpResponseStatus.INTERNAL_SERVER_ERROR, error);
            }
            throw new CompletionException(restFailure);
        });
}
/**
 * Delegates to {@link CompletableFuture#handle(BiFunction)} on the given future.
 *
 * @param f the future whose completion is handled
 * @param a the function applied to the result and the failure of {@code f}
 * @param <T> the result type of {@code f}
 * @param <U> the result type produced by {@code a}
 * @return the future returned by {@code f.handle(a)}
 */
public <T,U> CompletableFuture<U> handle
(CompletableFuture<T> f,
BiFunction<? super T,Throwable,? extends U> a) {
return f.handle(a);
}
/**
* Asynchronously uploads the bytes currently buffered in the output stream as one block,
* using stageBlock, then reinitializes (resets) the output stream and records the upload
* future in {@code pendingUpload}.
*
* <p>Does nothing when no output stream is present or when it holds no bytes.
*
* @throws RuntimeException when
* - blob's stageBlock fails after MAX_ATTEMPTs (reported through the upload future)
* - number of blocks exceeds MAX_BLOCKS_IN_AZURE_BLOB (thrown directly from this method)
*/
private synchronized void uploadBlockAsync() {
// nothing to upload if the buffer does not exist or is empty
if (!byteArrayOutputStream.isPresent()) {
return;
}
long size = byteArrayOutputStream.get().size();
if (size == 0) {
return;
}
LOG.info("Blob: {} uploadBlock. Size:{}", blobAsyncClient.getBlobUrl().toString(), size);
// Azure sdk requires block Id to be encoded and all blockIds of a blob to be of the same length
// also, a block blob can have upto 50,000 blocks, hence using a 5 digit block id.
String blockId = String.format("%05d", blockNum);
// NOTE(review): getBytes() uses the platform default charset - confirm this is acceptable here.
String blockIdEncoded = Base64.getEncoder().encodeToString(blockId.getBytes());
blockList.add(blockIdEncoded);
// take a private copy of the buffered bytes before resetting the stream for further writes
byte[] localByte = byteArrayOutputStream.get().toByteArray();
byteArrayOutputStream.get().reset();
totalUploadedBlockSize += localByte.length;
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
// call async stageblock and add to future
@Override
public void run() {
int attemptCount = 0;
// compression happens once, outside the retry loop
byte[] compressedLocalByte = compression.compress(localByte);
int blockSize = compressedLocalByte.length;
// retry until stageBlock succeeds or MAX_ATTEMPT failures have occurred
while (attemptCount < MAX_ATTEMPT) {
try {
ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize);
metrics.updateCompressByteMetrics(blockSize);
LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize);
metrics.updateAzureUploadMetrics();
// StageBlock generates exception on Failure.
stageBlock(blockIdEncoded, outputStream, blockSize);
break;
} catch (Exception e) {
attemptCount += 1;
String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
+ " failed for blockid: " + blockId + " due to exception. AttemptCount: " + attemptCount;
LOG.error(msg, e);
// give up after the last allowed attempt; this fails the upload future
if (attemptCount == MAX_ATTEMPT) {
throw new AzureException("Exceeded number of attempts. Max attempts is: " + MAX_ATTEMPT, e);
}
}
}
}
}, blobThreadPool);
pendingUpload.add(future);
// On success the future is removed from pendingUpload; on failure it stays there.
// The exception thrown below only fails the future returned by handle, which is not retained.
future.handle((aVoid, throwable) -> {
if (throwable == null) {
LOG.info("Upload block for blob: {} with blockid: {} finished.", blobAsyncClient.getBlobUrl().toString(), blockId);
pendingUpload.remove(future);
return aVoid;
} else {
throw new AzureException("Blob upload failed for blob " + blobAsyncClient.getBlobUrl().toString()
+ " and block with id: " + blockId, throwable);
}
});
blockNum += 1;
// the limit is checked after the current block has already been submitted
if (blockNum >= MAX_BLOCKS_IN_AZURE_BLOB) {
throw new AzureException("Azure blob only supports 50000 blocks in a blob. Current number of blocks is " + blockNum);
}
}
/**
 * Merges the JSON files sequentially as specified in the {@link MergeQuery}.
 *
 * @param revision the revision to read the files at
 * @param query    the query listing the merge sources
 * @param <T>      the type of the merged content
 * @return a future completed with the merged entry; failures other than
 *         {@code CentralDogmaException} are wrapped in a {@code QueryExecutionException}
 */
default <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, MergeQuery<T> query) {
    requireNonNull(revision, "revision");
    requireNonNull(query, "query");

    final List<MergeSource> sources = query.mergeSources();
    // Only JSON files can currently be merged.
    for (MergeSource source : sources) {
        validateJsonFilePath(source.path(), "path");
    }

    final Revision normalized;
    try {
        normalized = normalizeNow(revision);
    } catch (Exception e) {
        return CompletableFutures.exceptionallyCompletedFuture(e);
    }

    // Optional sources may resolve to null; mandatory ones must exist.
    final List<CompletableFuture<Entry<?>>> entryFutures = new ArrayList<>(sources.size());
    for (MergeSource source : sources) {
        if (source.isOptional()) {
            entryFutures.add(getOrNull(normalized, source.path()));
        } else {
            entryFutures.add(get(normalized, source.path()));
        }
    }

    final CompletableFuture<MergedEntry<?>> mergedEntryFuture = mergeEntries(entryFutures, revision,
                                                                             query);
    final CompletableFuture<MergedEntry<T>> future = new CompletableFuture<>();
    mergedEntryFuture.handle((mergedEntry, cause) -> {
        if (cause == null) {
            future.complete(unsafeCast(mergedEntry));
            return null;
        }
        final Throwable reported =
                cause instanceof CentralDogmaException ? cause : new QueryExecutionException(cause);
        future.completeExceptionally(reported);
        return null;
    });
    return future;
}
/**
 * Attaches a handler to the given future, choosing between synchronous and asynchronous
 * execution based on whether the future has already completed.
 *
 * <p>If the future is done at the time of the call, the handler is attached with
 * {@link CompletableFuture#handle(BiFunction)}; otherwise it is attached with
 * {@link CompletableFuture#handleAsync(BiFunction, Executor)} using the supplied executor.
 *
 * @param completableFuture the completable future for which we want to call #handle.
 * @param executor the executor to run the handle function if the future is not yet done.
 * @param handler the handler function to call when the future is completed.
 * @param <IN> type of the handler input argument.
 * @param <OUT> type of the handler return value.
 * @return the new completion stage.
 */
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
        CompletableFuture<IN> completableFuture,
        Executor executor,
        BiFunction<? super IN, Throwable, ? extends OUT> handler) {
    if (completableFuture.isDone()) {
        return completableFuture.handle(handler);
    }
    return completableFuture.handleAsync(handler, executor);
}