java.util.concurrent.CompletableFuture#handleAsync ( )源码实例Demo

下面列出了java.util.concurrent.CompletableFuture#handleAsync ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: Flink-CEPplus   文件: Dispatcher.java
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
	log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

	final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
		.thenApply(ignored -> Acknowledge.get());

	return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
		if (throwable != null) {
			cleanUpJobData(jobGraph.getJobID(), true);

			final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
			log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
			throw new CompletionException(
				new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
		} else {
			return acknowledge;
		}
	}, getRpcService().getExecutor());
}
 
源代码2 项目: flink   文件: Dispatcher.java
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
	log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

	final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
		.thenApply(ignored -> Acknowledge.get());

	return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
		if (throwable != null) {
			cleanUpJobData(jobGraph.getJobID(), true);

			final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
			log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
			throw new CompletionException(
				new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
		} else {
			return acknowledge;
		}
	}, getRpcService().getExecutor());
}
 
源代码3 项目: light-eventuate-4j   文件: AggregateRepository.java
private <T> void attemptOperation(Supplier<CompletableFuture<T>> asyncRequest, CompletableFuture<T> result, int attempt) {
  CompletableFuture<T> f = asyncRequest.get();
  f.handleAsync((val, throwable) -> {
    if (throwable != null) {
      if (attempt < 10 && CompletableFutureUtil.unwrap(throwable) instanceof OptimisticLockingException) {
        logger.debug("got optimistic locking exception - retrying", throwable);
        attemptOperation(asyncRequest, result, attempt + 1);
      } else {
        if (logger.isDebugEnabled())
          logger.debug("got exception - NOT retrying: " + attempt, throwable);
        result.completeExceptionally(throwable);
      }
    } else
      result.complete(val);
    return null;
  });
}
 
源代码4 项目: flink   文件: Dispatcher.java
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
	log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

	final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
		.thenApply(ignored -> Acknowledge.get());

	return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
		if (throwable != null) {
			cleanUpJobData(jobGraph.getJobID(), true);

			final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
			log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
			throw new CompletionException(
				new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
		} else {
			return acknowledge;
		}
	}, getRpcService().getExecutor());
}
 
源代码5 项目: flink   文件: ResourceManager.java
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
		final TaskExecutorRegistration taskExecutorRegistration,
		final Time timeout) {

	CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorRegistration.getTaskExecutorAddress(), TaskExecutorGateway.class);
	taskExecutorGatewayFutures.put(taskExecutorRegistration.getResourceId(), taskExecutorGatewayFuture);

	return taskExecutorGatewayFuture.handleAsync(
		(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
			final ResourceID resourceId = taskExecutorRegistration.getResourceId();
			if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(resourceId)) {
				taskExecutorGatewayFutures.remove(resourceId);
				if (throwable != null) {
					return new RegistrationResponse.Decline(throwable.getMessage());
				} else {
					return registerTaskExecutorInternal(taskExecutorGateway, taskExecutorRegistration);
				}
			} else {
				log.debug("Ignoring outdated TaskExecutorGateway connection for {}.", resourceId);
				return new RegistrationResponse.Decline("Decline outdated task executor registration.");
			}
		},
		getMainThreadExecutor());
}
 
源代码6 项目: Flink-CEPplus   文件: ResourceManager.java
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
		final String taskExecutorAddress,
		final ResourceID taskExecutorResourceId,
		final int dataPort,
		final HardwareDescription hardwareDescription,
		final Time timeout) {

	CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
	taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture);

	return taskExecutorGatewayFuture.handleAsync(
		(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
			if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
				taskExecutorGatewayFutures.remove(taskExecutorResourceId);
				if (throwable != null) {
					return new RegistrationResponse.Decline(throwable.getMessage());
				} else {
					return registerTaskExecutorInternal(
						taskExecutorGateway,
						taskExecutorAddress,
						taskExecutorResourceId,
						dataPort,
						hardwareDescription);
				}
			} else {
				log.info("Ignoring outdated TaskExecutorGateway connection.");
				return new RegistrationResponse.Decline("Decline outdated task executor registration.");
			}
		},
		getMainThreadExecutor());
}
 
/**
 * Triggers a stack trace sample for a operator to gather the back pressure
 * statistics. If there is a sample in progress for the operator, the call
 * is ignored.
 *
 * @param vertex Operator to get the stats for.
 * @return Flag indicating whether a sample with triggered.
 */
private boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) {
	assert(Thread.holdsLock(lock));

	if (shutDown) {
		return false;
	}

	if (!pendingStats.contains(vertex) &&
		!vertex.getGraph().getState().isGloballyTerminalState()) {

		Executor executor = vertex.getGraph().getFutureExecutor();

		// Only trigger if still active job
		if (executor != null) {
			pendingStats.add(vertex);

			if (LOG.isDebugEnabled()) {
				LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
			}

			CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample(
				vertex.getTaskVertices(),
				numSamples,
				delayBetweenSamples,
				MAX_STACK_TRACE_DEPTH);

			sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor);

			return true;
		}
	}

	return false;
}
 
源代码8 项目: flink   文件: ResourceManager.java
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(
		final String taskExecutorAddress,
		final ResourceID taskExecutorResourceId,
		final int dataPort,
		final HardwareDescription hardwareDescription,
		final Time timeout) {

	CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
	taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture);

	return taskExecutorGatewayFuture.handleAsync(
		(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
			if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
				taskExecutorGatewayFutures.remove(taskExecutorResourceId);
				if (throwable != null) {
					return new RegistrationResponse.Decline(throwable.getMessage());
				} else {
					return registerTaskExecutorInternal(
						taskExecutorGateway,
						taskExecutorAddress,
						taskExecutorResourceId,
						dataPort,
						hardwareDescription);
				}
			} else {
				log.info("Ignoring outdated TaskExecutorGateway connection.");
				return new RegistrationResponse.Decline("Decline outdated task executor registration.");
			}
		},
		getMainThreadExecutor());
}
 
源代码9 项目: flink   文件: BackPressureStatsTrackerImpl.java
/**
 * Triggers a stack trace sample for a operator to gather the back pressure
 * statistics. If there is a sample in progress for the operator, the call
 * is ignored.
 *
 * @param vertex Operator to get the stats for.
 * @return Flag indicating whether a sample with triggered.
 */
private boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) {
	assert(Thread.holdsLock(lock));

	if (shutDown) {
		return false;
	}

	if (!pendingStats.contains(vertex) &&
		!vertex.getGraph().getState().isGloballyTerminalState()) {

		Executor executor = vertex.getGraph().getFutureExecutor();

		// Only trigger if still active job
		if (executor != null) {
			pendingStats.add(vertex);

			if (LOG.isDebugEnabled()) {
				LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
			}

			CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample(
				vertex.getTaskVertices(),
				numSamples,
				delayBetweenSamples,
				MAX_STACK_TRACE_DEPTH);

			sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor);

			return true;
		}
	}

	return false;
}
 
源代码10 项目: NioSmtpClient   文件: SmtpSession.java
private <R, T> CompletableFuture<R> applyOnExecutor(CompletableFuture<T> eventLoopFuture, Function<T, R> mapper) {
  if (executor == SmtpSessionFactoryConfig.DIRECT_EXECUTOR) {
    return eventLoopFuture.thenApply(mapper);
  }

  // use handleAsync to ensure exceptions and other callbacks are completed on the ExecutorService thread
  return eventLoopFuture.handleAsync((rs, e) -> {
    if (e != null) {
      throw Throwables.propagate(e);
    }

    return mapper.apply(rs);
  }, executor);
}
 
源代码11 项目: flink   文件: BackPressureStatsTrackerImpl.java
/**
 * Triggers a back pressure request for a vertex to gather the back pressure
 * statistics. If there is a request in progress for the vertex, the call
 * is ignored.
 *
 * @param vertex Vertex to get the stats for.
 */
private void triggerBackPressureRequestInternal(final ExecutionJobVertex vertex) {
	assert(Thread.holdsLock(lock));

	if (shutDown) {
		return;
	}

	if (!pendingStats.contains(vertex) && !vertex.getGraph().getState().isGloballyTerminalState()) {

		Executor executor = vertex.getGraph().getFutureExecutor();

		// Only trigger for still active job
		if (executor != null) {
			pendingStats.add(vertex);

			if (LOG.isDebugEnabled()) {
				LOG.debug("Triggering back pressure request for tasks: " + Arrays.toString(vertex.getTaskVertices()));
			}

			CompletableFuture<BackPressureStats> statsFuture =
				coordinator.triggerBackPressureRequest(vertex.getTaskVertices());

			statsFuture.handleAsync(new BackPressureRequestCompletionCallback(vertex), executor);
		}
	}
}
 
/**
 * When an already-contextualized Consumer or BiFunction is specified as the action/task,
 * the action/task runs with its already-captured context rather than
 * capturing and applying context per the configuration of the managed executor.
 *
 * @throws ExecutionException indicates test failure
 * @throws InterruptedException indicates test failure
 * @throws TimeoutException indicates test failure
 */
@Test
public void contextOfContextualConsumerAndBiFunctionOverrideContextOfManagedExecutor()
        throws ExecutionException, InterruptedException, TimeoutException {
    ThreadContext labelContext = ThreadContext.builder()
            .propagated(Label.CONTEXT_NAME)
            .unchanged()
            .cleared(ThreadContext.ALL_REMAINING)
            .build();

    ManagedExecutor executor = ManagedExecutor.builder()
            .propagated(Buffer.CONTEXT_NAME)
            .cleared(ThreadContext.ALL_REMAINING)
            .build();
    try {
        Buffer.set(new StringBuffer("contextualBiFunctionOverride-buffer-1"));
        Label.set("contextualBiFunctionOverride-label-1");

        BiFunction<Integer, Throwable, Integer> precontextualizedFunction1 = labelContext.contextualFunction((result, failure) -> {
            Assert.assertEquals(Label.get(), "contextualBiFunctionOverride-label-1",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
            return failure == null ? result : 100;
        });

        Buffer.set(new StringBuffer("contextualBiFunctionOverride-buffer-2"));
        Label.set("contextualBiFunctionOverride-label-2");

        BiFunction<Integer, Integer, Integer> precontextualizedFunction2 = labelContext.contextualFunction((i, j) -> {
            Assert.assertEquals(Label.get(), "contextualBiFunctionOverride-label-2",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
            return i - j;
        });

        Buffer.set(new StringBuffer("contextualConsumerOverride-buffer-3"));
        Label.set("contextualConsumerOverride-label-3");

        Consumer<Integer> precontextualizedConsumer3 = labelContext.contextualConsumer(i -> {
            Assert.assertEquals(Label.get(), "contextualConsumerOverride-label-3",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
        });

        Buffer.set(new StringBuffer("contextualConsuemrOverride-buffer-4"));
        Label.set("contextualConsumerOverride-label-4");

        Consumer<Integer> precontextualizedConsumer4 = labelContext.contextualConsumer(i -> {
            Assert.assertEquals(Label.get(), "contextualConsumerOverride-label-4",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
        });

        BiFunction<Void, Void, String> normalFunction5 = (unused1, unused2) -> {
            Assert.assertEquals(Buffer.get().toString(), "contextualConsumerAndBiFunctionOverride-buffer-5",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Label.get(), "",
                    "Context type not cleared from thread.");
            return "done";
        };

        Buffer.set(new StringBuffer("contextualConsumerAndBiFunctionOverride-buffer-5"));
        Label.set("contextualConsumerAndBiFunctionOverride-label-5");

        CompletableFuture<Integer> stage0 = executor.failedFuture(new ArrayIndexOutOfBoundsException("Expected error."));
        CompletableFuture<Integer> stage1 = stage0.handleAsync(precontextualizedFunction1);
        CompletableFuture<Integer> stage2 = executor.completedFuture(200).thenCombineAsync(stage1, precontextualizedFunction2);
        CompletableFuture<Void> stage3 = stage2.thenAccept(precontextualizedConsumer3);
        CompletableFuture<Void> stage4 = stage2.acceptEitherAsync(stage1, precontextualizedConsumer4);
        CompletableFuture<String> stage5 = stage4.thenCombine(stage3, normalFunction5);

        Assert.assertEquals(stage5.join(), "done",
                "Unexpected result for completion stage.");
    }
    finally {
        executor.shutdownNow();
        // Restore original values
        Buffer.set(null);
        Label.set(null);
    }
}
 
源代码13 项目: openjdk-jdk9   文件: CompletableFutureTest.java
public <T,U> CompletableFuture<U> handle
    (CompletableFuture<T> f,
     BiFunction<? super T,Throwable,? extends U> a) {
    return f.handleAsync(a);
}
 
源代码14 项目: openjdk-jdk9   文件: CompletableFutureTest.java
public <T,U> CompletableFuture<U> handle
    (CompletableFuture<T> f,
     BiFunction<? super T,Throwable,? extends U> a) {
    return f.handleAsync(a, new ThreadExecutor());
}
 
源代码15 项目: tinkerpop   文件: GremlinExecutor.java
/**
 * Evaluate a script and allow for the submission of alteration to the entire evaluation execution lifecycle.
 *
 * @param script the script to evaluate
 * @param language the language to evaluate it in
 * @param boundVars the bindings to evaluate in the context of the script
 * @param lifeCycle a set of functions that can be applied at various stages of the evaluation process
 */
public CompletableFuture<Object> eval(final String script, final String language, final Bindings boundVars,  final LifeCycle lifeCycle) {
    final String lang = Optional.ofNullable(language).orElse("gremlin-groovy");

    logger.debug("Preparing to evaluate script - {} - in thread [{}]", script, Thread.currentThread().getName());

    final Bindings bindings = new SimpleBindings();
    bindings.putAll(globalBindings);
    bindings.putAll(boundVars);

    // override the timeout if the lifecycle has a value assigned
    final long scriptEvalTimeOut = lifeCycle.getEvaluationTimeoutOverride().orElse(evaluationTimeout);

    final CompletableFuture<Object> evaluationFuture = new CompletableFuture<>();
    final FutureTask<Void> evalFuture = new FutureTask<>(() -> {
        try {
            lifeCycle.getBeforeEval().orElse(beforeEval).accept(bindings);

            logger.debug("Evaluating script - {} - in thread [{}]", script, Thread.currentThread().getName());

            // do this weirdo check until the now deprecated ScriptEngines is gutted
            final Object o = gremlinScriptEngineManager.getEngineByName(lang).eval(script, bindings);

            // apply a transformation before sending back the result - useful when trying to force serialization
            // in the same thread that the eval took place given ThreadLocal nature of graphs as well as some
            // transactional constraints
            final Object result = lifeCycle.getTransformResult().isPresent() ?
                    lifeCycle.getTransformResult().get().apply(o) : o;

            // a mechanism for taking the final result and doing something with it in the same thread, but
            // AFTER the eval and transform are done and that future completed.  this provides a final means
            // for working with the result in the same thread as it was eval'd
            if (lifeCycle.getWithResult().isPresent()) lifeCycle.getWithResult().get().accept(result);

            lifeCycle.getAfterSuccess().orElse(afterSuccess).accept(bindings);

            // the evaluationFuture must be completed after all processing as an exception in lifecycle events
            // that must raise as an exception to the caller who has the returned evaluationFuture. in other words,
            // if it occurs before this point, then the handle() method won't be called again if there is an
            // exception that ends up below trying to completeExceptionally()
            evaluationFuture.complete(result);
        } catch (Throwable ex) {
            final Throwable root = null == ex.getCause() ? ex : ExceptionUtils.getRootCause(ex);

            // thread interruptions will typically come as the result of a timeout, so in those cases,
            // check for that situation and convert to TimeoutException
            if (root instanceof InterruptedException
                    || root instanceof TraversalInterruptedException
                    || root instanceof InterruptedIOException) {
                lifeCycle.getAfterTimeout().orElse(afterTimeout).accept(bindings);
                evaluationFuture.completeExceptionally(new TimeoutException(
                        String.format("Evaluation exceeded the configured 'evaluationTimeout' threshold of %s ms or evaluation was otherwise cancelled directly for request [%s]: %s", scriptEvalTimeOut, script, root.getMessage())));
            } else {
                lifeCycle.getAfterFailure().orElse(afterFailure).accept(bindings, root);
                evaluationFuture.completeExceptionally(root);
            }
        }

        return null;
    });

    final WeakReference<CompletableFuture<Object>> evaluationFutureRef = new WeakReference<>(evaluationFuture);
    final Future<?> executionFuture = executorService.submit(evalFuture);
    if (scriptEvalTimeOut > 0) {
        // Schedule a timeout in the thread pool for future execution
        final ScheduledFuture<?> sf = scheduledExecutorService.schedule(() -> {
            if (executionFuture.cancel(true)) {
                final CompletableFuture<Object> ef = evaluationFutureRef.get();
                if (ef != null) {
                    ef.completeExceptionally(new TimeoutException(
                            String.format("Evaluation exceeded the configured 'evaluationTimeout' threshold of %s ms or evaluation was otherwise cancelled directly for request [%s]", scriptEvalTimeOut, script)));
                }
            }
        }, scriptEvalTimeOut, TimeUnit.MILLISECONDS);

        // Cancel the scheduled timeout if the eval future is complete or the script evaluation failed with exception
        evaluationFuture.handleAsync((v, t) -> {
            if (!sf.isDone()) {
                logger.debug("Killing scheduled timeout on script evaluation - {} - as the eval completed (possibly with exception).", script);
                sf.cancel(true);
            }

            // no return is necessary - nothing downstream is concerned with what happens in here
            return null;
        }, scheduledExecutorService);
    }

    return evaluationFuture;
}
 
源代码16 项目: j2objc   文件: CompletableFutureTest.java
public <T,U> CompletableFuture<U> handle
    (CompletableFuture<T> f,
     BiFunction<? super T,Throwable,? extends U> a) {
    return f.handleAsync(a);
}
 
源代码17 项目: j2objc   文件: CompletableFutureTest.java
public <T,U> CompletableFuture<U> handle
    (CompletableFuture<T> f,
     BiFunction<? super T,Throwable,? extends U> a) {
    return f.handleAsync(a, new ThreadExecutor());
}
 
源代码18 项目: Flink-CEPplus   文件: FutureUtils.java
/**
 * This function takes a {@link CompletableFuture} and a handler function for the result of this future. If the
 * input future is already done, this function returns {@link CompletableFuture#handle(BiFunction)}. Otherwise,
 * the return value is {@link CompletableFuture#handleAsync(BiFunction, Executor)} with the given executor.
 *
 * @param completableFuture the completable future for which we want to call #handle.
 * @param executor the executor to run the handle function if the future is not yet done.
 * @param handler the handler function to call when the future is completed.
 * @param <IN> type of the handler input argument.
 * @param <OUT> type of the handler return value.
 * @return the new completion stage.
 */
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
	CompletableFuture<IN> completableFuture,
	Executor executor,
	BiFunction<? super IN, Throwable, ? extends OUT> handler) {
	return completableFuture.isDone() ?
		completableFuture.handle(handler) :
		completableFuture.handleAsync(handler, executor);
}
 
源代码19 项目: flink   文件: FutureUtils.java
/**
 * This function takes a {@link CompletableFuture} and a handler function for the result of this future. If the
 * input future is already done, this function returns {@link CompletableFuture#handle(BiFunction)}. Otherwise,
 * the return value is {@link CompletableFuture#handleAsync(BiFunction, Executor)} with the given executor.
 *
 * @param completableFuture the completable future for which we want to call #handle.
 * @param executor the executor to run the handle function if the future is not yet done.
 * @param handler the handler function to call when the future is completed.
 * @param <IN> type of the handler input argument.
 * @param <OUT> type of the handler return value.
 * @return the new completion stage.
 */
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
	CompletableFuture<IN> completableFuture,
	Executor executor,
	BiFunction<? super IN, Throwable, ? extends OUT> handler) {
	return completableFuture.isDone() ?
		completableFuture.handle(handler) :
		completableFuture.handleAsync(handler, executor);
}
 
源代码20 项目: flink   文件: FutureUtils.java
/**
 * This function takes a {@link CompletableFuture} and a handler function for the result of this future. If the
 * input future is already done, this function returns {@link CompletableFuture#handle(BiFunction)}. Otherwise,
 * the return value is {@link CompletableFuture#handleAsync(BiFunction, Executor)} with the given executor.
 *
 * @param completableFuture the completable future for which we want to call #handle.
 * @param executor the executor to run the handle function if the future is not yet done.
 * @param handler the handler function to call when the future is completed.
 * @param <IN> type of the handler input argument.
 * @param <OUT> type of the handler return value.
 * @return the new completion stage.
 */
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
	CompletableFuture<IN> completableFuture,
	Executor executor,
	BiFunction<? super IN, Throwable, ? extends OUT> handler) {
	return completableFuture.isDone() ?
		completableFuture.handle(handler) :
		completableFuture.handleAsync(handler, executor);
}