下面列出了 java.util.concurrent.CompletableFuture#thenRun() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Replaces the existing milestone with {@code newMilestone} in the local issue model, refreshes the UI
 * once the local replacement has finished, and sends the same change to the server.
 * The local replacement is started before the server request to reduce the lag between the user action
 * and the UI response.
 *
 * @param issue The issue object whose milestone is to be replaced
 * @param newMilestone The new milestone to be assigned to the issue
 * @return a future completed with the value returned by {@code handleIssueMilestoneUpdateOnServerResult}
 *         (per the original contract: true if the replacements locally and on GitHub were successful).
 *         It completes exceptionally if the server update, the local replacement or the result handling
 *         fails, instead of staying incomplete forever.
 */
public CompletableFuture<Boolean> replaceIssueMilestone(TurboIssue issue, Optional<Integer> newMilestone) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();

    logger.info("Changing milestone for " + issue + " in models");
    // Remember the previous milestone; it is handed to the result handler below.
    Optional<Integer> oldMilestone = issue.getMilestone();

    CompletableFuture<Optional<TurboIssue>> localMilestoneReplaceFuture =
            repoOpControl.replaceIssueMilestoneLocally(issue, newMilestone);
    localMilestoneReplaceFuture.thenRun(this::refreshUI);

    updateIssueMilestonesOnServer(issue, newMilestone)
            .exceptionally((e) -> {
                // Report the server failure to the caller, then continue the chain with "false"
                // so that the result handler still runs.
                result.completeExceptionally(e);
                return false;
            })
            .thenCombine(localMilestoneReplaceFuture, (isUpdateSuccessful, locallyModifiedIssue) ->
                    handleIssueMilestoneUpdateOnServerResult(
                            isUpdateSuccessful, locallyModifiedIssue, oldMilestone))
            .whenComplete((outcome, failure) -> {
                // Completing an already completed future is a no-op, so a server failure reported
                // above is never overwritten here.
                if (failure != null) {
                    result.completeExceptionally(failure);
                } else {
                    result.complete(outcome);
                }
            });

    return result;
}
/**
 * Replaces the existing assignee with the new assignee in the local issue model, refreshes the UI
 * once the local replacement has finished, and sends the same change to the server.
 * The local replacement is started before the server request to reduce the lag between the user action
 * and the UI response.
 *
 * @param issue The issue object whose assignee is to be replaced
 * @param newAssigneeLoginName The login name of the new assignee
 *                             (presumably empty to clear the assignee - confirm against callers)
 * @return a future completed with the value returned by {@code handleIssueAssigneeUpdateResult}.
 *         It completes exceptionally if the server update, the local replacement or the result handling
 *         fails, instead of staying incomplete forever.
 */
public CompletableFuture<Boolean> replaceIssueAssignee(TurboIssue issue, Optional<String> newAssigneeLoginName) {
    logger.info("Changing assignee for " + issue + " on UI");
    CompletableFuture<Boolean> result = new CompletableFuture<>();

    CompletableFuture<Optional<TurboIssue>> localAssigneeReplaceFuture =
            repoOpControl.replaceIssueAssigneeLocally(issue, newAssigneeLoginName);
    localAssigneeReplaceFuture.thenRun(this::refreshUI);

    updateIssueAssigneesOnServer(issue, newAssigneeLoginName)
            .exceptionally((e) -> {
                // Report the server failure to the caller, then continue the chain with "false"
                // so that the result handler still runs.
                result.completeExceptionally(e);
                return false;
            })
            .thenCombine(localAssigneeReplaceFuture, this::handleIssueAssigneeUpdateResult)
            .whenComplete((outcome, failure) -> {
                // Completing an already completed future is a no-op, so a server failure reported
                // above is never overwritten here.
                if (failure != null) {
                    result.completeExceptionally(failure);
                } else {
                    result.complete(outcome);
                }
            });

    return result;
}
/**
 * Replaces the existing labels with the new labels in the local issue model, refreshes the UI
 * once the local replacement has finished, and sends the same change to the server.
 * The local replacement is started before the server request to reduce the lag between the user action
 * and the UI response.
 *
 * @param issue The issue object whose labels are to be replaced.
 * @param newLabels The list of new labels to be assigned to the issue.
 * @return a future completed with the value returned by {@code handleIssueLabelsUpdateResult}
 *         (per the original contract: true if label replacement on GitHub was a success, false otherwise).
 *         It completes exceptionally if the server update, the local replacement or the result handling
 *         fails, instead of staying incomplete forever.
 */
public CompletableFuture<Boolean> replaceIssueLabels(TurboIssue issue, List<String> newLabels) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    // Remember the labels as they were before the change; they are handed to the result handler below.
    List<String> originalLabels = issue.getLabels();

    logger.info("Changing labels for " + issue + " on UI");
    CompletableFuture<Optional<TurboIssue>> localLabelsReplaceFuture =
            repoOpControl.replaceIssueLabelsLocally(issue, newLabels);
    localLabelsReplaceFuture.thenRun(this::refreshUI);

    updateIssueLabelsOnServer(issue, newLabels)
            .exceptionally((e) -> {
                // Report the server failure to the caller, then continue the chain with "false"
                // so that the result handler still runs.
                result.completeExceptionally(e);
                return false;
            })
            .thenCombine(localLabelsReplaceFuture, (isUpdateSuccessful, locallyModifiedIssue) ->
                    handleIssueLabelsUpdateResult(isUpdateSuccessful, locallyModifiedIssue, originalLabels))
            .whenComplete((outcome, failure) -> {
                // Completing an already completed future is a no-op, so a server failure reported
                // above is never overwritten here.
                if (failure != null) {
                    result.completeExceptionally(failure);
                } else {
                    result.complete(outcome);
                }
            });

    return result;
}
/**
 * Ensures a scheduler is ready for use and then starts scheduling.
 *
 * <p>If the current scheduler still reports {@code JobStatus.CREATED} it is reused as is. Otherwise the
 * scheduler fields are suspended and cleared, a new scheduler is created, and it is assigned once the
 * termination future of the current scheduler has completed (normally or exceptionally).
 *
 * @throws Exception if checking the thread, suspending the old scheduler or creating the new one fails
 */
private void resetAndStartScheduler() throws Exception {
    validateRunsInMainThread();

    final CompletableFuture<Void> schedulerReady;

    if (schedulerNG.requestJobStatus() != JobStatus.CREATED) {
        suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));

        final JobManagerJobMetricGroup freshMetricGroup = jobMetricGroupFactory.create(jobGraph);
        final SchedulerNG replacementScheduler = createScheduler(freshMetricGroup);

        // handle() runs for both outcomes of the termination future, so the replacement is always assigned.
        schedulerReady = schedulerNG.getTerminationFuture().handle(
            (ignoredResult, ignoredFailure) -> {
                replacementScheduler.setMainThreadExecutor(getMainThreadExecutor());
                assignScheduler(replacementScheduler, freshMetricGroup);
                return null;
            });
    } else {
        // The scheduler has not been used yet: nothing to wait for.
        schedulerReady = CompletableFuture.completedFuture(null);
        schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
    }

    schedulerReady.thenRun(this::startScheduling);
}
/**
 * Reacts to the failure of a single task execution.
 *
 * <p>A {@code NoResourceAvailableException} is escalated to a global failure of the execution graph.
 * Any other cause is counted and leads to a restart of the affected vertex once the execution's
 * terminal-state future completes.
 *
 * @param taskExecution the execution that failed
 * @param cause the reason for the failure
 */
@Override
public void onTaskFailure(Execution taskExecution, Throwable cause) {
    executionGraph.getJobMasterMainThreadExecutor().assertRunningInMainThread();

    if (cause instanceof NoResourceAvailableException) {
        // Missing resources (potentially caused by a scale-in) are handled better by a
        // global failure than by an individual restart.
        LOG.info("Not enough resources to schedule {} - triggering full recovery.", taskExecution);
        executionGraph.failGlobal(cause);
    } else {
        final String failedTaskName = taskExecution.getVertex().getTaskNameWithSubtaskIndex();
        final int failedAttempt = taskExecution.getAttemptNumber();
        LOG.info("Recovering task failure for {} (#{}) via individual restart.", failedTaskName, failedAttempt);

        numTaskFailures.inc();

        // The restart is triggered once the task has reached its terminal state.
        // Note: currently every task passed here is already terminal, so the future could be avoided.
        // It is used anyway because it is cheap and makes testing easier.
        taskExecution.getTerminalStateFuture().thenRun(
            () -> performExecutionVertexRestart(taskExecution.getVertex(), taskExecution.getGlobalModVersion()));
    }
}
/**
 * Tests ordered failure of future callbacks.
 *
 * <p>Every callback that is expected to run bumps a shared counter and asserts its own position in
 * the registration order. The success-only callbacks ({@code thenRun}, {@code thenAccept}) must not run
 * at all when the future is completed exceptionally.
 */
public void testOrderedFailure() throws Throwable {
    CompletableFuture<String> future = new OrderedCompletableFuture<>();
    AtomicInteger order = new AtomicInteger();

    future.whenComplete((r, e) -> assertEquals(1, order.incrementAndGet()));
    future.whenComplete((r, e) -> assertEquals(2, order.incrementAndGet()));
    future.handle((r, e) -> {
        assertEquals(3, order.incrementAndGet());
        return "bar";
    });

    // Success-only callbacks: they are skipped on exceptional completion and do not touch the counter.
    future.thenRun(() -> fail());
    future.thenAccept(r -> fail());

    // handle() above already took position 3, so this is the fourth callback that actually runs.
    future.exceptionally(e -> {
        assertEquals(4, order.incrementAndGet());
        return "bar";
    });

    future.completeExceptionally(new RuntimeException("foo"));
}
/**
 * Truncates the given segment, giving the configured truncate interceptor (if any) a chance to run first.
 *
 * <p>{@code truncateDirectly} is only invoked if the interceptor's future completes normally.
 * The {@code timeout} argument is not used by this method.
 *
 * @param handle handle of the segment to truncate
 * @param offset the offset passed on to the interceptor and to {@code truncateDirectly}
 * @param timeout not used here
 * @return a future that completes after {@code truncateDirectly} has run
 */
@Override
public CompletableFuture<Void> truncate(SegmentHandle handle, long offset, Duration timeout) {
    // Read the field once so that the null check and the call see the same interceptor.
    TruncateInterceptor interceptor = this.truncateInterceptor;

    CompletableFuture<Void> beforeTruncate = (interceptor == null)
            ? CompletableFuture.completedFuture(null)
            : interceptor.apply(handle.getSegmentName(), offset, this.wrappedStorage);

    return beforeTruncate.thenRun(() -> truncateDirectly(handle, offset));
}
/**
 * Chains a {@code thenRun} action onto an asynchronous computation and waits for that action to finish.
 */
@Test
public void whenAddingThenRunToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> afterComputation = CompletableFuture
            .supplyAsync(() -> "Hello")
            .thenRun(() -> LOG.debug("Computation finished."));

    // Blocks until the chained action has run; rethrows any failure from the chain.
    afterComputation.get();
}
/**
 * Creates a union over the given input gates.
 *
 * <p>For every gate the constructor records the channel index offset at which that gate's channels
 * start within the union, marks the gate as having remaining data, and either queues it right away
 * (if its availability future is already done) or registers a callback that queues it later.
 *
 * @param inputGates the gates to union; at least two are required and none may itself be a union
 * @throws UnsupportedOperationException if one of the gates is a {@code UnionInputGate}
 */
public UnionInputGate(InputGate... inputGates) {
    this.inputGates = checkNotNull(inputGates);
    checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates.");

    this.inputGateToIndexOffsetMap = Maps.newHashMapWithExpectedSize(inputGates.length);
    this.inputGatesWithRemainingData = Sets.newHashSetWithExpectedSize(inputGates.length);

    // Running total of channels seen so far; doubles as the index offset of the next gate.
    int channelOffset = 0;

    synchronized (inputGatesWithData) {
        for (InputGate gate : inputGates) {
            if (gate instanceof UnionInputGate) {
                // if we want to add support for this, we need to implement pollNext()
                throw new UnsupportedOperationException("Cannot union a union of input gates.");
            }

            // The offset to use for buffer or event instances received from this input gate.
            inputGateToIndexOffsetMap.put(checkNotNull(gate), channelOffset);
            inputGatesWithRemainingData.add(gate);
            channelOffset += gate.getNumberOfInputChannels();

            CompletableFuture<?> availability = gate.isAvailable();
            if (!availability.isDone()) {
                // Not available yet: queue the gate as soon as it becomes available.
                availability.thenRun(() -> queueInputGate(gate));
            } else {
                inputGatesWithData.add(gate);
            }
        }

        if (!inputGatesWithData.isEmpty()) {
            isAvailable = AVAILABLE;
        }
    }

    this.totalNumberOfInputChannels = channelOffset;
}
/**
 * Tests ordered completion of future callbacks.
 *
 * <p>Seven callbacks are registered; each bumps a shared counter and asserts that it runs at the
 * position at which it was registered. Callbacks that receive the result also check that it is the
 * value the future was completed with.
 */
@Test
public void testOrderedCompletion() throws Throwable {
    CompletableFuture<String> future = new OrderedFuture<>();
    AtomicInteger order = new AtomicInteger();

    future.whenComplete((r, e) -> assertEquals(1, order.incrementAndGet()));
    future.whenComplete((r, e) -> assertEquals(2, order.incrementAndGet()));
    future.handle((r, e) -> {
        assertEquals(3, order.incrementAndGet());
        assertEquals("foo", r);
        return "bar";
    });
    // Fourth registered callback: handle() above took position 3, thenAccept() below expects 5.
    future.thenRun(() -> assertEquals(4, order.incrementAndGet()));
    future.thenAccept(r -> {
        assertEquals(5, order.incrementAndGet());
        assertEquals("foo", r);
    });
    future.thenApply(r -> {
        assertEquals(6, order.incrementAndGet());
        assertEquals("foo", r);
        return "bar";
    });
    future.whenComplete((r, e) -> {
        assertEquals(7, order.incrementAndGet());
        assertEquals("foo", r);
    });

    future.complete("foo");
}
/**
 * Creates a non-persistent topic and completes the returned future once its replication check is done.
 *
 * <p>If non-persistent topics are disabled in the broker configuration, the future is completed
 * exceptionally with a {@code NotAllowedException}. If the replication check fails, the replication
 * producers are stopped, the topic future is removed from {@code topics}, and the future is completed
 * exceptionally with the replication failure.
 *
 * @param topic name of the topic to create
 * @return a future holding the created topic, or completed exceptionally as described above
 */
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
    CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline();

    if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
        if (log.isDebugEnabled()) {
            log.debug("Broker is unable to load non-persistent topic {}", topic);
        }
        // Message fixed: it used to read "not unable", which stated the opposite of what happened.
        topicFuture.completeExceptionally(
                new NotAllowedException("Broker is unable to load non-persistent topic"));
        return topicFuture;
    }

    // Taken before the topic object is built so that the recorded load latency covers its construction.
    final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);

    CompletableFuture<Void> replicationFuture = nonPersistentTopic.checkReplication();
    replicationFuture.thenRun(() -> {
        log.info("Created topic {}", nonPersistentTopic);
        long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
        pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
        addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
        topicFuture.complete(Optional.of(nonPersistentTopic));
    });
    replicationFuture.exceptionally((ex) -> {
        log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
        // Whether or not stopping the producers succeeds, unregister the topic and report the failure.
        nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
            pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
            topicFuture.completeExceptionally(ex);
        });
        return null;
    });

    return topicFuture;
}
/**
 * Benchmarks a chain of {@code N.n} {@code thenRun} stages attached to a promise that is completed
 * only after the whole chain has been built.
 *
 * @return the result of the last stage in the chain
 */
@Benchmark
public Void ensurePromiseN() throws InterruptedException, ExecutionException {
    CompletableFuture<Void> promise = new CompletableFuture<>();

    // Each iteration hangs one more stage onto the end of the chain.
    CompletableFuture<Void> tail = promise;
    for (int stage = 0; stage < N.n; stage++) {
        tail = tail.thenRun(ensureF);
    }

    promise.complete(null);
    return tail.get();
}
/**
 * Creates a result future together with a list pre-sized for {@code amount} elements, hands both to
 * {@code converter}, and completes the result future with the list once the future returned by the
 * converter completes normally.
 *
 * @param amount initial capacity of the result list
 * @param converter receives the result future and the list; returns the future to wait for
 * @return the result future
 */
private CompletableFuture<List<T>> takeAsync0(int amount, BiFunction<CompletableFuture<?>, List<T>, CompletableFuture<?>> converter)
{
    final List<T> collected = new ArrayList<>(amount);
    final CompletableFuture<List<T>> resultFuture = new CompletableFuture<>();

    converter.apply(resultFuture, collected)
             .thenRun(() -> resultFuture.complete(collected));

    return resultFuture;
}
/**
 * Default action of the task (e.g. processing one event from the input). Implementations should
 * (in general) be non-blocking.
 *
 * <p>When more input is available and the record writer is available, the method simply returns.
 * At the end of input it reports that all actions are completed. In every other case it suspends
 * the default action and resumes it once the joint input/output future completes.
 *
 * @param controller controller object for collaborative interaction between the action and the stream task.
 * @throws Exception on any problems in the action.
 */
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    final InputStatus inputStatus = inputProcessor.processInput();

    final boolean canContinueRightAway =
        inputStatus == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable();
    if (canContinueRightAway) {
        return;
    }

    if (inputStatus == InputStatus.END_OF_INPUT) {
        controller.allActionsCompleted();
        return;
    }

    // Neither side can make progress right now: park the default action until it can.
    final CompletableFuture<?> resumeSignal = getInputOutputJointFuture(inputStatus);
    final MailboxDefaultAction.Suspension suspension = controller.suspendDefaultAction();
    resumeSignal.thenRun(suspension::resume);
}
/**
 * Creates a callback for a pending reply, schedules its timeout, and registers it in {@code callbacks}.
 *
 * @param id key under which this callback is stored in {@code callbacks}
 * @param type message type; passed to {@code getTimeoutMillis} and {@code addReplyTime}
 * @param timeout requested timeout; the effective value is whatever {@code getTimeoutMillis} returns
 * @param future future that is kept as {@code replyFuture}
 */
Callback(long id, String type, Duration timeout, CompletableFuture<byte[]> future) {
this.id = id;
this.type = type;
this.timeout = getTimeoutMillis(type, timeout);
// The timeout task is scheduled before replyFuture is assigned and before the callback is registered.
this.scheduledFuture = executorService.schedule(this::timeout, this.timeout, TimeUnit.MILLISECONDS);
this.replyFuture = future;
// Runs only when the future completes normally.
// NOTE(review): "time" is not set in this constructor - presumably a field holding the creation timestamp; confirm.
future.thenRun(() -> addReplyTime(type, System.currentTimeMillis() - time));
callbacks.put(id, this);
}
/**
 * Starts the topic management service and, once its topic-partition result completes normally,
 * the produce and consume services.
 *
 * <p>The "started" message is logged right after the callback has been registered; it does not wait
 * for the produce and consume services to start.
 */
@Override
public void start() {
    _multiClusterTopicManagementService.start();

    _multiClusterTopicManagementService.topicPartitionResult().thenRun(() -> {
        _produceService.start();
        _consumeService.start();
    });

    LOG.info(_name + "/MultiClusterMonitor started.");
}
/**
 * Benchmarks a single {@code thenRun} stage attached to a promise that is completed afterwards.
 *
 * @return the result of the attached stage
 */
@Benchmark
public Void ensurePromise() throws InterruptedException, ExecutionException {
    CompletableFuture<Void> promise = new CompletableFuture<>();
    CompletableFuture<Void> ensured = promise.thenRun(ensureF);

    // Completing the promise triggers the stage registered above.
    promise.complete(null);
    return ensured.get();
}
/**
 * Stops this REST server endpoint.
 *
 * <p>Closes the server channel (if there is one) and afterwards shuts down the event loop groups of
 * the bootstrap (if there is one). The groups are shut down even if closing the channel failed, so
 * that the shut down is as complete as possible.
 *
 * @return Future which is completed once the shut down has been finished. It is completed
 *         exceptionally if closing the channel or shutting down one of the groups failed; a channel
 *         close failure takes precedence over a group shut down failure.
 */
protected CompletableFuture<Void> shutDownInternal() {
    synchronized (lock) {
        CompletableFuture<?> channelFuture = new CompletableFuture<>();
        if (serverChannel != null) {
            serverChannel.close().addListener(finished -> {
                if (finished.isSuccess()) {
                    channelFuture.complete(null);
                } else {
                    channelFuture.completeExceptionally(finished.cause());
                }
            });
            serverChannel = null;
        } else {
            // There is no channel to close. Complete the future right away; otherwise the
            // continuation below would never run and the returned future would never complete.
            channelFuture.complete(null);
        }

        final CompletableFuture<Void> channelTerminationFuture = new CompletableFuture<>();

        // whenComplete (rather than thenRun) so that a failed channel close still leads to the
        // groups being shut down and to the returned future being completed.
        channelFuture.whenComplete((ignoredValue, channelCloseFailure) -> {
            CompletableFuture<?> groupFuture = new CompletableFuture<>();
            CompletableFuture<?> childGroupFuture = new CompletableFuture<>();
            final Time gracePeriod = Time.seconds(10L);

            if (bootstrap != null) {
                final ServerBootstrapConfig config = bootstrap.config();
                final EventLoopGroup group = config.group();
                if (group != null) {
                    group.shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS)
                        .addListener(finished -> {
                            if (finished.isSuccess()) {
                                groupFuture.complete(null);
                            } else {
                                groupFuture.completeExceptionally(finished.cause());
                            }
                        });
                } else {
                    groupFuture.complete(null);
                }

                final EventLoopGroup childGroup = config.childGroup();
                if (childGroup != null) {
                    childGroup.shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS)
                        .addListener(finished -> {
                            if (finished.isSuccess()) {
                                childGroupFuture.complete(null);
                            } else {
                                childGroupFuture.completeExceptionally(finished.cause());
                            }
                        });
                } else {
                    childGroupFuture.complete(null);
                }

                bootstrap = null;
            } else {
                // complete the group futures since there is nothing to stop
                groupFuture.complete(null);
                childGroupFuture.complete(null);
            }

            CompletableFuture<Void> combinedFuture = FutureUtils.completeAll(Arrays.asList(groupFuture, childGroupFuture));

            combinedFuture.whenComplete(
                (Void ignored, Throwable groupFailure) -> {
                    // Report the channel close failure first; fall back to the group failure.
                    final Throwable failure = channelCloseFailure != null ? channelCloseFailure : groupFailure;
                    if (failure != null) {
                        channelTerminationFuture.completeExceptionally(failure);
                    } else {
                        channelTerminationFuture.complete(null);
                    }
                });
        });

        return channelTerminationFuture;
    }
}
/**
* When an already-contextualized Runnable is specified as the action/task,
* the action/task runs with its already-captured context rather than
* capturing and applying context per the configuration of the managed executor.
*
* The test uses a ThreadContext that propagates only Label and a ManagedExecutor that propagates
* only Buffer; both clear all remaining context. Runnables wrapped by the ThreadContext are expected
* to see the Label that was current when they were wrapped and an empty Buffer, no matter which
* executor method runs them.
*
* @throws ExecutionException indicates test failure
* @throws InterruptedException indicates test failure
* @throws TimeoutException indicates test failure
*/
@Test
public void contextOfContextualRunnableOverridesContextOfManagedExecutor() throws ExecutionException, InterruptedException, TimeoutException {
// Propagates Label, leaves nothing unchanged, clears everything else.
ThreadContext labelContext = ThreadContext.builder()
.propagated(Label.CONTEXT_NAME)
.unchanged()
.cleared(ThreadContext.ALL_REMAINING)
.build();
// Propagates Buffer, clears everything else (including Label).
ManagedExecutor executor = ManagedExecutor.builder()
.propagated(Buffer.CONTEXT_NAME)
.cleared(ThreadContext.ALL_REMAINING)
.build();
try {
Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-1"));
Label.set("contextualRunnableOverride-label-1");
// Wrapped while Label is "...label-1": expects that label and a cleared Buffer when run.
Runnable precontextualizedTask1 = labelContext.contextualRunnable(() -> {
Assert.assertEquals(Label.get(), "contextualRunnableOverride-label-1",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
});
Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-2"));
Label.set("contextualRunnableOverride-label-2");
// Wrapped while Label is "...label-2": expects that label and a cleared Buffer when run.
Runnable precontextualizedTask2 = labelContext.contextualRunnable(() -> {
Assert.assertEquals(Label.get(), "contextualRunnableOverride-label-2",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
});
Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-3"));
Label.set("contextualRunnableOverride-label-3");
// Not wrapped: expects the executor's configuration to apply, i.e. the Buffer that is
// current while the stages below are created ("...buffer-3") and a cleared Label.
Runnable normalTask = () -> {
Assert.assertEquals(Buffer.get().toString(), "contextualRunnableOverride-buffer-3",
"Previously captured context type not found on thread.");
Assert.assertEquals(Label.get(), "",
"Context type not cleared from thread.");
};
// Pre-contextualized task via submit(Runnable, result).
Future<Integer> future = executor.submit(precontextualizedTask1, 1);
Assert.assertEquals(future.get(MAX_WAIT_NS, TimeUnit.NANOSECONDS), Integer.valueOf(1),
"Unexpected result of task.");
// Pre-contextualized tasks as actions of completion stages; only stage5 runs the unwrapped task.
CompletableFuture<Void> stage0 = executor.runAsync(precontextualizedTask1);
CompletableFuture<Void> stage1 = stage0.thenRunAsync(precontextualizedTask1);
CompletableFuture<Void> stage2 = stage0.thenRun(precontextualizedTask2);
CompletableFuture<Void> stage3 = stage1.runAfterEither(stage2, precontextualizedTask2);
CompletableFuture<Void> stage4 = stage1.runAfterBothAsync(stage2, precontextualizedTask1);
CompletableFuture<Void> stage5 = stage4.runAfterBoth(stage3, normalTask);
// join() rethrows an assertion failure from any stage that stage5 depends on.
stage5.join();
LinkedBlockingQueue<String> results = new LinkedBlockingQueue<String>();
// Wrapped while Label is still "...label-3"; the label is changed to "...label-4" before execution.
Runnable precontextualizedTask3 = labelContext.contextualRunnable(() -> results.add(Label.get()));
Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-4"));
Label.set("contextualRunnableOverride-label-4");
executor.execute(precontextualizedTask3);
// The task must report the label captured at wrap time, not the one current at execute() time.
Assert.assertEquals(results.poll(MAX_WAIT_NS, TimeUnit.NANOSECONDS), "contextualRunnableOverride-label-3",
"Previously captured context type not found on thread.");
}
finally {
executor.shutdownNow();
// Restore original values
Buffer.set(null);
Label.set(null);
}
}
/**
 * Verify that the cleared and propagated attributes of a ManagedExecutor are defaulted
 * according to the defaults specified by the application in MicroProfile Config.
 *
 * <p>The executor is built without any explicit settings. An asynchronous stage checks that Label and
 * thread priority are propagated; a dependent stage, added after the first stage has completed,
 * checks that Buffer is cleared. The executor is shut down when the test ends.
 *
 * @throws ExecutionException indicates test failure
 * @throws InterruptedException indicates test failure
 * @throws TimeoutException indicates test failure
 */
@Test
public void defaultContextPropagationForManagedExecutorViaMPConfig()
        throws ExecutionException, InterruptedException, TimeoutException {
    // Expected config is maxAsync=1, maxQueued=4; propagated=Label,ThreadPriority; cleared=Remaining
    ManagedExecutor executor = ManagedExecutor.builder().build();

    int originalPriority = Thread.currentThread().getPriority();
    try {
        // Set non-default values
        int newPriority = originalPriority == 4 ? 3 : 4;
        Thread.currentThread().setPriority(newPriority);
        Buffer.set(new StringBuffer("defaultContextPropagationForManagedExecutorViaMPConfig-test-buffer"));
        Label.set("defaultContextPropagationForManagedExecutorViaMPConfig-test-label");

        // Run on separate thread to test propagated
        CompletableFuture<Void> stage1 = executor.completedFuture(newPriority)
            .thenAcceptAsync(expectedPriority -> {
                Assert.assertEquals(Label.get(), "defaultContextPropagationForManagedExecutorViaMPConfig-test-label",
                        "Context type (Label) that MicroProfile config defaults to be propagated was not correctly propagated.");
                Assert.assertEquals(Integer.valueOf(Thread.currentThread().getPriority()), expectedPriority,
                        "Context type (ThreadPriority) that MicroProfile config defaults to be propagated was not correctly propagated.");
            });

        Assert.assertNull(stage1.get(MAX_WAIT_NS, TimeUnit.NANOSECONDS),
                "Non-null value returned by stage that runs async Consumer.");

        // Run on current thread to test cleared
        // (stage1 has already completed at this point, see the get() above)
        CompletableFuture<Void> stage2 = stage1.thenRun(() ->
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type (Buffer) that MicroProfile config overrides to be cleared was not cleared.")
        );

        Assert.assertNull(stage2.get(MAX_WAIT_NS, TimeUnit.NANOSECONDS),
                "Non-null value returned by stage that runs Runnable.");
    }
    finally {
        // The executor was created by this test through the builder, so this test must release it.
        executor.shutdownNow();
        // Restore original values
        Buffer.set(null);
        Label.set(null);
        Thread.currentThread().setPriority(originalPriority);
    }
}