下面列出了 java.util.concurrent.CompletableFuture#handleAsync() 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Submits the given job by running {@code persistAndRunJob} through
 * {@code waitForTerminatingJobManager} and mapping a successful outcome to an {@link Acknowledge}.
 *
 * <p>The outcome is post-processed on the executor obtained from {@code getRpcService().getExecutor()}:
 * on failure the job data is cleaned up, the failure is logged, and the returned future is completed
 * exceptionally with a {@code JobSubmissionException} wrapped in a {@link CompletionException}.
 *
 * @param jobGraph the job to submit
 * @return a future holding the acknowledgement, or failing with the wrapped submission exception
 */
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> submission =
        waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

    return submission.handleAsync(
        (ack, failure) -> {
            // Happy path first: nothing to clean up, just forward the acknowledgement.
            if (failure == null) {
                return ack;
            }

            cleanUpJobData(jobGraph.getJobID(), true);

            // Drop the CompletionException wrapper so the log and the reported cause show the real error.
            final Throwable cause = ExceptionUtils.stripCompletionException(failure);
            log.error("Failed to submit job {}.", jobGraph.getJobID(), cause);
            throw new CompletionException(
                new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", cause));
        },
        getRpcService().getExecutor());
}
/**
 * Submits the given job by running {@code persistAndRunJob} through
 * {@code waitForTerminatingJobManager} and mapping a successful outcome to an {@link Acknowledge}.
 *
 * <p>The outcome is post-processed on the executor obtained from {@code getRpcService().getExecutor()}:
 * on failure the job data is cleaned up, the failure is logged, and the returned future is completed
 * exceptionally with a {@code JobSubmissionException} wrapped in a {@link CompletionException}.
 *
 * @param jobGraph the job to submit
 * @return a future holding the acknowledgement, or failing with the wrapped submission exception
 */
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> submission =
        waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

    return submission.handleAsync(
        (ack, failure) -> {
            // Happy path first: nothing to clean up, just forward the acknowledgement.
            if (failure == null) {
                return ack;
            }

            cleanUpJobData(jobGraph.getJobID(), true);

            // Drop the CompletionException wrapper so the log and the reported cause show the real error.
            final Throwable cause = ExceptionUtils.stripCompletionException(failure);
            log.error("Failed to submit job {}.", jobGraph.getJobID(), cause);
            throw new CompletionException(
                new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", cause));
        },
        getRpcService().getExecutor());
}
/**
 * Issues the asynchronous request and forwards its outcome to {@code result}, re-issuing the
 * request when it fails with an {@code OptimisticLockingException}.
 *
 * <p>A retry happens only while {@code attempt < 10}; any other failure, or an optimistic locking
 * failure once that limit is reached, completes {@code result} exceptionally with the original
 * throwable. The outcome is handled via {@link CompletableFuture#handleAsync(java.util.function.BiFunction)},
 * i.e. without an explicit executor.
 *
 * <p>Anything thrown while handling the outcome (for example the supplier throwing or returning
 * {@code null} during a retry) is routed into {@code result}. Without this, the exception would only
 * end up in the discarded future returned by {@code handleAsync} and {@code result} would never
 * complete.
 *
 * @param asyncRequest supplies a fresh future for every attempt
 * @param result the future that receives the final value or failure
 * @param attempt the number of the current attempt, incremented by one for each retry
 * @param <T> the type of the request's result
 */
private <T> void attemptOperation(Supplier<CompletableFuture<T>> asyncRequest, CompletableFuture<T> result, int attempt) {
    CompletableFuture<T> f = asyncRequest.get();
    f.handleAsync((val, throwable) -> {
        try {
            if (throwable == null) {
                result.complete(val);
            } else if (attempt < 10 && CompletableFutureUtil.unwrap(throwable) instanceof OptimisticLockingException) {
                logger.debug("got optimistic locking exception - retrying", throwable);
                attemptOperation(asyncRequest, result, attempt + 1);
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("got exception - NOT retrying: " + attempt, throwable);
                }
                result.completeExceptionally(throwable);
            }
        } catch (Throwable unexpected) {
            // The future returned by handleAsync is dropped, so this is the only place where a
            // failure of the handler itself can still reach the caller waiting on 'result'.
            result.completeExceptionally(unexpected);
        }
        return null;
    });
}
/**
 * Submits the given job by running {@code persistAndRunJob} through
 * {@code waitForTerminatingJobManager} and mapping a successful outcome to an {@link Acknowledge}.
 *
 * <p>The outcome is post-processed on the executor obtained from {@code getRpcService().getExecutor()}:
 * on failure the job data is cleaned up, the failure is logged, and the returned future is completed
 * exceptionally with a {@code JobSubmissionException} wrapped in a {@link CompletionException}.
 *
 * @param jobGraph the job to submit
 * @return a future holding the acknowledgement, or failing with the wrapped submission exception
 */
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> submission =
        waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

    return submission.handleAsync(
        (ack, failure) -> {
            // Happy path first: nothing to clean up, just forward the acknowledgement.
            if (failure == null) {
                return ack;
            }

            cleanUpJobData(jobGraph.getJobID(), true);

            // Drop the CompletionException wrapper so the log and the reported cause show the real error.
            final Throwable cause = ExceptionUtils.stripCompletionException(failure);
            log.error("Failed to submit job {}.", jobGraph.getJobID(), cause);
            throw new CompletionException(
                new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", cause));
        },
        getRpcService().getExecutor());
}
/**
 * Connects to the task executor described by the registration and, once the connection attempt
 * finishes, registers it or declines the registration.
 *
 * <p>The pending connection future is stored in {@code taskExecutorGatewayFutures} under the
 * executor's resource id. When the connection completes, the outcome is handled on the executor
 * returned by {@code getMainThreadExecutor()}. If the stored future is no longer this very instance,
 * the attempt is treated as outdated and declined. Otherwise the entry is removed and the
 * registration either proceeds or is declined with the connection failure's message.
 *
 * @param taskExecutorRegistration the registration data of the task executor
 * @param timeout not referenced by this method body
 * @return a future with the registration response
 */
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
        final TaskExecutorRegistration taskExecutorRegistration,
        final Time timeout) {

    final CompletableFuture<TaskExecutorGateway> gatewayFuture =
        getRpcService().connect(taskExecutorRegistration.getTaskExecutorAddress(), TaskExecutorGateway.class);
    taskExecutorGatewayFutures.put(taskExecutorRegistration.getResourceId(), gatewayFuture);

    return gatewayFuture.handleAsync(
        (TaskExecutorGateway gateway, Throwable failure) -> {
            final ResourceID resourceId = taskExecutorRegistration.getResourceId();

            // Identity comparison on purpose: only the most recently stored future may proceed.
            if (gatewayFuture != taskExecutorGatewayFutures.get(resourceId)) {
                log.debug("Ignoring outdated TaskExecutorGateway connection for {}.", resourceId);
                return new RegistrationResponse.Decline("Decline outdated task executor registration.");
            }

            taskExecutorGatewayFutures.remove(resourceId);

            if (failure != null) {
                return new RegistrationResponse.Decline(failure.getMessage());
            }
            return registerTaskExecutorInternal(gateway, taskExecutorRegistration);
        },
        getMainThreadExecutor());
}
/**
 * Connects to the task executor at the given address and, once the connection attempt finishes,
 * registers it or declines the registration.
 *
 * <p>The pending connection future is stored in {@code taskExecutorGatewayFutures} under the given
 * resource id. When the connection completes, the outcome is handled on the executor returned by
 * {@code getMainThreadExecutor()}. If the stored future is no longer this very instance, the attempt
 * is treated as outdated and declined. Otherwise the entry is removed and the registration either
 * proceeds or is declined with the connection failure's message.
 *
 * @param taskExecutorAddress the address to connect to
 * @param taskExecutorResourceId the resource id under which the connection attempt is tracked
 * @param dataPort passed through to the internal registration
 * @param hardwareDescription passed through to the internal registration
 * @param timeout not referenced by this method body
 * @return a future with the registration response
 */
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
        final String taskExecutorAddress,
        final ResourceID taskExecutorResourceId,
        final int dataPort,
        final HardwareDescription hardwareDescription,
        final Time timeout) {

    final CompletableFuture<TaskExecutorGateway> gatewayFuture =
        getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    taskExecutorGatewayFutures.put(taskExecutorResourceId, gatewayFuture);

    return gatewayFuture.handleAsync(
        (TaskExecutorGateway gateway, Throwable failure) -> {
            // Identity comparison on purpose: only the most recently stored future may proceed.
            if (gatewayFuture != taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
                log.info("Ignoring outdated TaskExecutorGateway connection.");
                return new RegistrationResponse.Decline("Decline outdated task executor registration.");
            }

            taskExecutorGatewayFutures.remove(taskExecutorResourceId);

            if (failure != null) {
                return new RegistrationResponse.Decline(failure.getMessage());
            }
            return registerTaskExecutorInternal(
                gateway,
                taskExecutorAddress,
                taskExecutorResourceId,
                dataPort,
                hardwareDescription);
        },
        getMainThreadExecutor());
}
/**
 * Triggers a stack trace sample for an operator to gather the back pressure
 * statistics. The call is ignored if a sample for the operator is already pending,
 * if the tracker has been shut down, if the job is in a globally terminal state,
 * or if the job's graph provides no future executor.
 *
 * <p>Must be called while holding {@code lock} (checked via {@code assert}).
 *
 * @param vertex Operator to get the stats for.
 * @return Flag indicating whether a sample was triggered.
 */
private boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) {
    assert Thread.holdsLock(lock);

    if (shutDown) {
        return false;
    }

    if (pendingStats.contains(vertex) || vertex.getGraph().getState().isGloballyTerminalState()) {
        return false;
    }

    final Executor futureExecutor = vertex.getGraph().getFutureExecutor();

    // Only trigger if still active job
    if (futureExecutor == null) {
        return false;
    }

    pendingStats.add(vertex);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
    }

    final CompletableFuture<StackTraceSample> sampleFuture = coordinator.triggerStackTraceSample(
        vertex.getTaskVertices(),
        numSamples,
        delayBetweenSamples,
        MAX_STACK_TRACE_DEPTH);

    // The callback receives the sample or the failure on the job's future executor.
    sampleFuture.handleAsync(new StackTraceSampleCompletionCallback(vertex), futureExecutor);

    return true;
}
/**
 * Connects to the task executor at the given address and, once the connection attempt finishes,
 * registers it or declines the registration.
 *
 * <p>The pending connection future is stored in {@code taskExecutorGatewayFutures} under the given
 * resource id. When the connection completes, the outcome is handled on the executor returned by
 * {@code getMainThreadExecutor()}. If the stored future is no longer this very instance, the attempt
 * is treated as outdated and declined. Otherwise the entry is removed and the registration either
 * proceeds or is declined with the connection failure's message.
 *
 * @param taskExecutorAddress the address to connect to
 * @param taskExecutorResourceId the resource id under which the connection attempt is tracked
 * @param dataPort passed through to the internal registration
 * @param hardwareDescription passed through to the internal registration
 * @param timeout not referenced by this method body
 * @return a future with the registration response
 */
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
        final String taskExecutorAddress,
        final ResourceID taskExecutorResourceId,
        final int dataPort,
        final HardwareDescription hardwareDescription,
        final Time timeout) {

    final CompletableFuture<TaskExecutorGateway> gatewayFuture =
        getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    taskExecutorGatewayFutures.put(taskExecutorResourceId, gatewayFuture);

    return gatewayFuture.handleAsync(
        (TaskExecutorGateway gateway, Throwable failure) -> {
            // Identity comparison on purpose: only the most recently stored future may proceed.
            if (gatewayFuture != taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
                log.info("Ignoring outdated TaskExecutorGateway connection.");
                return new RegistrationResponse.Decline("Decline outdated task executor registration.");
            }

            taskExecutorGatewayFutures.remove(taskExecutorResourceId);

            if (failure != null) {
                return new RegistrationResponse.Decline(failure.getMessage());
            }
            return registerTaskExecutorInternal(
                gateway,
                taskExecutorAddress,
                taskExecutorResourceId,
                dataPort,
                hardwareDescription);
        },
        getMainThreadExecutor());
}
/**
 * Triggers a stack trace sample for an operator to gather the back pressure
 * statistics. The call is ignored if a sample for the operator is already pending,
 * if the tracker has been shut down, if the job is in a globally terminal state,
 * or if the job's graph provides no future executor.
 *
 * <p>Must be called while holding {@code lock} (checked via {@code assert}).
 *
 * @param vertex Operator to get the stats for.
 * @return Flag indicating whether a sample was triggered.
 */
private boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) {
    assert Thread.holdsLock(lock);

    if (shutDown) {
        return false;
    }

    if (pendingStats.contains(vertex) || vertex.getGraph().getState().isGloballyTerminalState()) {
        return false;
    }

    final Executor futureExecutor = vertex.getGraph().getFutureExecutor();

    // Only trigger if still active job
    if (futureExecutor == null) {
        return false;
    }

    pendingStats.add(vertex);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
    }

    final CompletableFuture<StackTraceSample> sampleFuture = coordinator.triggerStackTraceSample(
        vertex.getTaskVertices(),
        numSamples,
        delayBetweenSamples,
        MAX_STACK_TRACE_DEPTH);

    // The callback receives the sample or the failure on the job's future executor.
    sampleFuture.handleAsync(new StackTraceSampleCompletionCallback(vertex), futureExecutor);

    return true;
}
/**
 * Maps the result of {@code eventLoopFuture} with {@code mapper}, running the mapping on the
 * configured {@code executor} unless that executor is the direct executor.
 *
 * <p>With the direct executor the mapper is attached via {@code thenApply}. Otherwise
 * {@code handleAsync} is used so that failures, too, are delivered on the executor; a failure is
 * rethrown through {@code Throwables.propagate}.
 *
 * @param eventLoopFuture the future whose result should be mapped
 * @param mapper the mapping function applied to a successful result
 * @param <R> the mapped result type
 * @param <T> the source result type
 * @return a future holding the mapped result or the propagated failure
 */
private <R, T> CompletableFuture<R> applyOnExecutor(CompletableFuture<T> eventLoopFuture, Function<T, R> mapper) {
    if (executor != SmtpSessionFactoryConfig.DIRECT_EXECUTOR) {
        // use handleAsync to ensure exceptions and other callbacks are completed on the ExecutorService thread
        return eventLoopFuture.handleAsync((value, failure) -> {
            if (failure == null) {
                return mapper.apply(value);
            }
            throw Throwables.propagate(failure);
        }, executor);
    }

    return eventLoopFuture.thenApply(mapper);
}
/**
 * Triggers a back pressure request for a vertex to gather the back pressure
 * statistics. The call is ignored if a request for the vertex is already pending,
 * if the tracker has been shut down, if the job is in a globally terminal state,
 * or if the job's graph provides no future executor.
 *
 * <p>Must be called while holding {@code lock} (checked via {@code assert}).
 *
 * @param vertex Vertex to get the stats for.
 */
private void triggerBackPressureRequestInternal(final ExecutionJobVertex vertex) {
    assert Thread.holdsLock(lock);

    if (shutDown) {
        return;
    }

    if (pendingStats.contains(vertex) || vertex.getGraph().getState().isGloballyTerminalState()) {
        return;
    }

    final Executor futureExecutor = vertex.getGraph().getFutureExecutor();

    // Only trigger for still active job
    if (futureExecutor == null) {
        return;
    }

    pendingStats.add(vertex);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Triggering back pressure request for tasks: " + Arrays.toString(vertex.getTaskVertices()));
    }

    final CompletableFuture<BackPressureStats> pendingRequest =
        coordinator.triggerBackPressureRequest(vertex.getTaskVertices());

    // The callback receives the stats or the failure on the job's future executor.
    pendingRequest.handleAsync(new BackPressureRequestCompletionCallback(vertex), futureExecutor);
}
/**
 * When an already-contextualized Consumer or BiFunction is specified as the action/task,
 * the action/task runs with its already-captured context rather than
 * capturing and applying context per the configuration of the managed executor.
 * <p>
 * Every pre-contextualized action asserts that it observes the Label value that was set immediately
 * before it was wrapped by {@code labelContext}, together with an empty Buffer. The single plain
 * BiFunction asserts the opposite (Buffer present, Label empty), which matches the configuration of
 * the managed executor that creates the stages.
 *
 * @throws ExecutionException indicates test failure
 * @throws InterruptedException indicates test failure
 * @throws TimeoutException indicates test failure
 */
@Test
public void contextOfContextualConsumerAndBiFunctionOverrideContextOfManagedExecutor()
throws ExecutionException, InterruptedException, TimeoutException {
// Thread context used for pre-contextualizing actions: propagates Label, clears all remaining types.
ThreadContext labelContext = ThreadContext.builder()
.propagated(Label.CONTEXT_NAME)
.unchanged()
.cleared(ThreadContext.ALL_REMAINING)
.build();
// Managed executor configured the other way round: propagates Buffer, clears all remaining types.
ManagedExecutor executor = ManagedExecutor.builder()
.propagated(Buffer.CONTEXT_NAME)
.cleared(ThreadContext.ALL_REMAINING)
.build();
try {
// Buffer and Label get fresh, distinct values before each action is contextualized, so that each
// action's assertions can tell exactly which capture it is running with.
Buffer.set(new StringBuffer("contextualBiFunctionOverride-buffer-1"));
Label.set("contextualBiFunctionOverride-label-1");
BiFunction<Integer, Throwable, Integer> precontextualizedFunction1 = labelContext.contextualFunction((result, failure) -> {
Assert.assertEquals(Label.get(), "contextualBiFunctionOverride-label-1",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
// Maps any failure of the previous stage to the value 100.
return failure == null ? result : 100;
});
Buffer.set(new StringBuffer("contextualBiFunctionOverride-buffer-2"));
Label.set("contextualBiFunctionOverride-label-2");
BiFunction<Integer, Integer, Integer> precontextualizedFunction2 = labelContext.contextualFunction((i, j) -> {
Assert.assertEquals(Label.get(), "contextualBiFunctionOverride-label-2",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
return i - j;
});
Buffer.set(new StringBuffer("contextualConsumerOverride-buffer-3"));
Label.set("contextualConsumerOverride-label-3");
Consumer<Integer> precontextualizedConsumer3 = labelContext.contextualConsumer(i -> {
Assert.assertEquals(Label.get(), "contextualConsumerOverride-label-3",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
});
// NOTE(review): "Consuemr" in the next literal is misspelled; the value is never compared against,
// since the consumer below only asserts that Buffer is empty.
Buffer.set(new StringBuffer("contextualConsuemrOverride-buffer-4"));
Label.set("contextualConsumerOverride-label-4");
Consumer<Integer> precontextualizedConsumer4 = labelContext.contextualConsumer(i -> {
Assert.assertEquals(Label.get(), "contextualConsumerOverride-label-4",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
});
// Plain (not pre-contextualized) function: it expects the Buffer value that is set just below,
// before the stages are created, and an empty Label.
BiFunction<Void, Void, String> normalFunction5 = (unused1, unused2) -> {
Assert.assertEquals(Buffer.get().toString(), "contextualConsumerAndBiFunctionOverride-buffer-5",
"Previously captured context type not found on thread.");
Assert.assertEquals(Label.get(), "",
"Context type not cleared from thread.");
return "done";
};
Buffer.set(new StringBuffer("contextualConsumerAndBiFunctionOverride-buffer-5"));
Label.set("contextualConsumerAndBiFunctionOverride-label-5");
// stage0 is created already failed; stage1 handles that failure with function 1.
CompletableFuture<Integer> stage0 = executor.failedFuture(new ArrayIndexOutOfBoundsException("Expected error."));
CompletableFuture<Integer> stage1 = stage0.handleAsync(precontextualizedFunction1);
// stage2 combines the completed value 200 (as i) with stage1's result (as j) using function 2.
CompletableFuture<Integer> stage2 = executor.completedFuture(200).thenCombineAsync(stage1, precontextualizedFunction2);
CompletableFuture<Void> stage3 = stage2.thenAccept(precontextualizedConsumer3);
CompletableFuture<Void> stage4 = stage2.acceptEitherAsync(stage1, precontextualizedConsumer4);
// stage5 runs the plain function once both consumer stages have completed.
CompletableFuture<String> stage5 = stage4.thenCombine(stage3, normalFunction5);
// join() surfaces a failed assertion from any upstream action as an exceptional completion.
Assert.assertEquals(stage5.join(), "done",
"Unexpected result for completion stage.");
}
finally {
executor.shutdownNow();
// Restore original values
Buffer.set(null);
Label.set(null);
}
}
/**
 * Attaches the given handler to the future using
 * {@link CompletableFuture#handleAsync(BiFunction)}, i.e. without supplying an executor.
 *
 * @param f the future to attach the handler to
 * @param a the handler receiving the result or the failure of {@code f}
 * @param <T> the result type of {@code f}
 * @param <U> the result type produced by the handler
 * @return the dependent future holding the handler's result
 */
public <T, U> CompletableFuture<U> handle(
        CompletableFuture<T> f,
        BiFunction<? super T, Throwable, ? extends U> a) {
    final CompletableFuture<U> dependent = f.handleAsync(a);
    return dependent;
}
/**
 * Attaches the given handler to the future using
 * {@link CompletableFuture#handleAsync(BiFunction, java.util.concurrent.Executor)} with a newly
 * created {@code ThreadExecutor} for each call.
 *
 * @param f the future to attach the handler to
 * @param a the handler receiving the result or the failure of {@code f}
 * @param <T> the result type of {@code f}
 * @param <U> the result type produced by the handler
 * @return the dependent future holding the handler's result
 */
public <T, U> CompletableFuture<U> handle(
        CompletableFuture<T> f,
        BiFunction<? super T, Throwable, ? extends U> a) {
    final CompletableFuture<U> dependent = f.handleAsync(a, new ThreadExecutor());
    return dependent;
}
/**
 * Evaluate a script and allow for the submission of alterations to the entire evaluation execution lifecycle.
 * <p>
 * The evaluation itself runs in a task submitted to {@code executorService}. If the effective timeout is
 * greater than zero, a second task is scheduled on {@code scheduledExecutorService} that cancels the
 * evaluation once the timeout elapses; that scheduled task is in turn cancelled when the evaluation finishes.
 *
 * @param script the script to evaluate
 * @param language the language to evaluate it in; {@code null} falls back to "gremlin-groovy"
 * @param boundVars the bindings to evaluate in the context of the script; they are added on top of the global bindings
 * @param lifeCycle a set of functions that can be applied at various stages of the evaluation process
 * @return a future completed with the (optionally transformed) script result, or completed exceptionally with
 *         a {@link TimeoutException} on interruption/timeout, or with the root cause of any other failure
 */
public CompletableFuture<Object> eval(final String script, final String language, final Bindings boundVars, final LifeCycle lifeCycle) {
final String lang = Optional.ofNullable(language).orElse("gremlin-groovy");
logger.debug("Preparing to evaluate script - {} - in thread [{}]", script, Thread.currentThread().getName());
// start from the global bindings and then add the request bindings, so the latter win on key collisions
final Bindings bindings = new SimpleBindings();
bindings.putAll(globalBindings);
bindings.putAll(boundVars);
// override the timeout if the lifecycle has a value assigned
final long scriptEvalTimeOut = lifeCycle.getEvaluationTimeoutOverride().orElse(evaluationTimeout);
// the future handed back to the caller; it is completed from within the evaluation task or by the timeout task
final CompletableFuture<Object> evaluationFuture = new CompletableFuture<>();
final FutureTask<Void> evalFuture = new FutureTask<>(() -> {
try {
lifeCycle.getBeforeEval().orElse(beforeEval).accept(bindings);
logger.debug("Evaluating script - {} - in thread [{}]", script, Thread.currentThread().getName());
// look up the script engine for the requested language and evaluate the script with the merged bindings
// NOTE(review): the previous comment here mentioned a "check" tied to the deprecated ScriptEngines, but
// no such check is present in this block
final Object o = gremlinScriptEngineManager.getEngineByName(lang).eval(script, bindings);
// apply a transformation before sending back the result - useful when trying to force serialization
// in the same thread that the eval took place given ThreadLocal nature of graphs as well as some
// transactional constraints
final Object result = lifeCycle.getTransformResult().isPresent() ?
lifeCycle.getTransformResult().get().apply(o) : o;
// a mechanism for taking the final result and doing something with it in the same thread, but
// AFTER the eval and transform are done and that future completed. this provides a final means
// for working with the result in the same thread as it was eval'd
if (lifeCycle.getWithResult().isPresent()) lifeCycle.getWithResult().get().accept(result);
lifeCycle.getAfterSuccess().orElse(afterSuccess).accept(bindings);
// the evaluationFuture must be completed after all processing as an exception in lifecycle events
// that must raise as an exception to the caller who has the returned evaluationFuture. in other words,
// if it occurs before this point, then the handle() method won't be called again if there is an
// exception that ends up below trying to completeExceptionally()
evaluationFuture.complete(result);
} catch (Throwable ex) {
// report the root cause when the exception has a cause, otherwise the exception itself
final Throwable root = null == ex.getCause() ? ex : ExceptionUtils.getRootCause(ex);
// thread interruptions will typically come as the result of a timeout, so in those cases,
// check for that situation and convert to TimeoutException
if (root instanceof InterruptedException
|| root instanceof TraversalInterruptedException
|| root instanceof InterruptedIOException) {
lifeCycle.getAfterTimeout().orElse(afterTimeout).accept(bindings);
evaluationFuture.completeExceptionally(new TimeoutException(
String.format("Evaluation exceeded the configured 'evaluationTimeout' threshold of %s ms or evaluation was otherwise cancelled directly for request [%s]: %s", scriptEvalTimeOut, script, root.getMessage())));
} else {
lifeCycle.getAfterFailure().orElse(afterFailure).accept(bindings, root);
evaluationFuture.completeExceptionally(root);
}
}
return null;
});
// the timeout task below only holds the result future weakly
// NOTE(review): presumably so a long-lived scheduled task does not keep the future reachable - confirm
final WeakReference<CompletableFuture<Object>> evaluationFutureRef = new WeakReference<>(evaluationFuture);
final Future<?> executionFuture = executorService.submit(evalFuture);
if (scriptEvalTimeOut > 0) {
// Schedule a timeout in the thread pool for future execution
final ScheduledFuture<?> sf = scheduledExecutorService.schedule(() -> {
// only report a timeout if this cancellation actually took effect
if (executionFuture.cancel(true)) {
final CompletableFuture<Object> ef = evaluationFutureRef.get();
if (ef != null) {
ef.completeExceptionally(new TimeoutException(
String.format("Evaluation exceeded the configured 'evaluationTimeout' threshold of %s ms or evaluation was otherwise cancelled directly for request [%s]", scriptEvalTimeOut, script)));
}
}
}, scriptEvalTimeOut, TimeUnit.MILLISECONDS);
// Cancel the scheduled timeout if the eval future is complete or the script evaluation failed with exception
evaluationFuture.handleAsync((v, t) -> {
if (!sf.isDone()) {
logger.debug("Killing scheduled timeout on script evaluation - {} - as the eval completed (possibly with exception).", script);
sf.cancel(true);
}
// no return is necessary - nothing downstream is concerned with what happens in here
return null;
}, scheduledExecutorService);
}
return evaluationFuture;
}
/**
 * Attaches the given handler to the future using
 * {@link CompletableFuture#handleAsync(BiFunction)}, i.e. without supplying an executor.
 *
 * @param f the future to attach the handler to
 * @param a the handler receiving the result or the failure of {@code f}
 * @param <T> the result type of {@code f}
 * @param <U> the result type produced by the handler
 * @return the dependent future holding the handler's result
 */
public <T, U> CompletableFuture<U> handle(
        CompletableFuture<T> f,
        BiFunction<? super T, Throwable, ? extends U> a) {
    final CompletableFuture<U> dependent = f.handleAsync(a);
    return dependent;
}
/**
 * Attaches the given handler to the future using
 * {@link CompletableFuture#handleAsync(BiFunction, java.util.concurrent.Executor)} with a newly
 * created {@code ThreadExecutor} for each call.
 *
 * @param f the future to attach the handler to
 * @param a the handler receiving the result or the failure of {@code f}
 * @param <T> the result type of {@code f}
 * @param <U> the result type produced by the handler
 * @return the dependent future holding the handler's result
 */
public <T, U> CompletableFuture<U> handle(
        CompletableFuture<T> f,
        BiFunction<? super T, Throwable, ? extends U> a) {
    final CompletableFuture<U> dependent = f.handleAsync(a, new ThreadExecutor());
    return dependent;
}
/**
 * Attaches a handler to the given {@link CompletableFuture}, picking the synchronous or the asynchronous
 * variant depending on whether the future is already done when this method is called.
 *
 * <p>For a future that is already done, the result of {@link CompletableFuture#handle(BiFunction)} is
 * returned and the executor is not used. For a future that is not yet done, the result of
 * {@link CompletableFuture#handleAsync(BiFunction, Executor)} with the given executor is returned.
 *
 * @param completableFuture the completable future for which we want to call #handle.
 * @param executor the executor to run the handle function if the future is not yet done.
 * @param handler the handler function to call when the future is completed.
 * @param <IN> type of the handler input argument.
 * @param <OUT> type of the handler return value.
 * @return the new completion stage.
 */
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
        CompletableFuture<IN> completableFuture,
        Executor executor,
        BiFunction<? super IN, Throwable, ? extends OUT> handler) {
    if (!completableFuture.isDone()) {
        return completableFuture.handleAsync(handler, executor);
    }
    return completableFuture.handle(handler);
}
/**
 * Attaches a handler to the given {@link CompletableFuture}, picking the synchronous or the asynchronous
 * variant depending on whether the future is already done when this method is called.
 *
 * <p>For a future that is already done, the result of {@link CompletableFuture#handle(BiFunction)} is
 * returned and the executor is not used. For a future that is not yet done, the result of
 * {@link CompletableFuture#handleAsync(BiFunction, Executor)} with the given executor is returned.
 *
 * @param completableFuture the completable future for which we want to call #handle.
 * @param executor the executor to run the handle function if the future is not yet done.
 * @param handler the handler function to call when the future is completed.
 * @param <IN> type of the handler input argument.
 * @param <OUT> type of the handler return value.
 * @return the new completion stage.
 */
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
        CompletableFuture<IN> completableFuture,
        Executor executor,
        BiFunction<? super IN, Throwable, ? extends OUT> handler) {
    if (!completableFuture.isDone()) {
        return completableFuture.handleAsync(handler, executor);
    }
    return completableFuture.handle(handler);
}
/**
 * Attaches a handler to the given {@link CompletableFuture}, picking the synchronous or the asynchronous
 * variant depending on whether the future is already done when this method is called.
 *
 * <p>For a future that is already done, the result of {@link CompletableFuture#handle(BiFunction)} is
 * returned and the executor is not used. For a future that is not yet done, the result of
 * {@link CompletableFuture#handleAsync(BiFunction, Executor)} with the given executor is returned.
 *
 * @param completableFuture the completable future for which we want to call #handle.
 * @param executor the executor to run the handle function if the future is not yet done.
 * @param handler the handler function to call when the future is completed.
 * @param <IN> type of the handler input argument.
 * @param <OUT> type of the handler return value.
 * @return the new completion stage.
 */
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
        CompletableFuture<IN> completableFuture,
        Executor executor,
        BiFunction<? super IN, Throwable, ? extends OUT> handler) {
    if (!completableFuture.isDone()) {
        return completableFuture.handleAsync(handler, executor);
    }
    return completableFuture.handle(handler);
}