下面列出了java.util.concurrent.CompletableFuture#allOf ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* allOf returns a future completed normally with the value null
* when all components complete normally
*/
public void testAllOf_normal() throws Exception {
for (int k = 1; k < 10; k++) {
CompletableFuture<Integer>[] fs
= (CompletableFuture<Integer>[]) new CompletableFuture[k];
for (int i = 0; i < k; i++)
fs[i] = new CompletableFuture<>();
CompletableFuture<Void> f = CompletableFuture.allOf(fs);
for (int i = 0; i < k; i++) {
checkIncomplete(f);
checkIncomplete(CompletableFuture.allOf(fs));
fs[i].complete(one);
}
checkCompletedNormally(f, null);
checkCompletedNormally(CompletableFuture.allOf(fs), null);
}
}
private CompletableFuture<?> triggerDeferredExecutions() {
switch (deferredExecutionsByStmt.size()) {
case 0:
LOGGER.debug("method=sync deferredExecutions=0");
return CompletableFuture.completedFuture(null);
case 1: {
var entry = deferredExecutionsByStmt.entrySet().iterator().next();
deferredExecutionsByStmt.clear();
return exec(entry.getKey(), entry.getValue());
}
default: {
var futures = Lists2.map(deferredExecutionsByStmt.entrySet(), x -> exec(x.getKey(), x.getValue()));
deferredExecutionsByStmt.clear();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
}
}
private void init() {
ServerSideServerStore serverStore = ehcacheStateService.getStore(storeIdentifier);
ServerStoreBinding serverStoreBinding = new ServerStoreBinding(storeIdentifier, serverStore);
CompletableFuture<Void> r1 = managementRegistry.register(serverStoreBinding);
ServerSideConfiguration.Pool pool = ehcacheStateService.getDedicatedResourcePool(storeIdentifier);
CompletableFuture<Void> allOf;
if (pool != null) {
allOf = CompletableFuture.allOf(r1, managementRegistry.register(new PoolBinding(storeIdentifier, pool, PoolBinding.AllocationType.DEDICATED)));
} else {
allOf = r1;
}
allOf.thenRun(() -> {
managementRegistry.refresh();
managementRegistry.pushServerEntityNotification(serverStoreBinding, EHCACHE_SERVER_STORE_CREATED.name());
});
}
<T> void testRandomResultSupplierConcurrently(Supplier<T> s) throws InterruptedException, ExecutionException, TimeoutException {
// Produce 10 completable future tasks
final int tasks = 10;
List<CompletableFuture<T>> cfs = Stream.generate(() -> CompletableFuture.supplyAsync(s)).
limit(tasks).collect(toList());
// Wait for all tasks to complete
// Timeout is beyond reasonable doubt that completion should
// have occurred unless there is an issue
CompletableFuture<Void> all = CompletableFuture.allOf(cfs.stream().toArray(CompletableFuture[]::new));
all.get(1, TimeUnit.MINUTES);
// Count the distinct results, which should equal the number of tasks
long rc = cfs.stream().map(CompletableFuture::join).distinct().count();
assertEquals(rc, tasks);
}
/**
* Default implementation of the flush that just waits for all the pendingFutures to be complete.
* SystemProducer should override this, if the underlying system provides flush semantics.
* @param source String representing the source of the message.
*/
@Override
public synchronized void flush(String source) {
long incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
LOG.info("Trying to flush pending {} sends.", incompleteSends);
checkForSendCallbackErrors("Received exception on message send.");
CompletableFuture<Void> future =
CompletableFuture.allOf(pendingFutures.toArray(new CompletableFuture[pendingFutures.size()]));
try {
// Block until all the pending sends are complete or timeout.
future.get(DEFAULT_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
String msg = String.format("Flush failed with error. Total pending sends %d", incompleteSends);
LOG.error(msg, e);
throw new SamzaException(msg, e);
}
pendingFutures.clear();
checkForSendCallbackErrors("Sending one or more of the messages failed during flush.");
}
<T> void testRandomResultSupplierConcurrently(Supplier<T> s) throws InterruptedException, ExecutionException, TimeoutException {
// Produce 10 completable future tasks
final int tasks = 10;
List<CompletableFuture<T>> cfs = Stream.generate(() -> CompletableFuture.supplyAsync(s)).
limit(tasks).collect(toList());
// Wait for all tasks to complete
// Timeout is beyond reasonable doubt that completion should
// have occurred unless there is an issue
CompletableFuture<Void> all = CompletableFuture.allOf(cfs.stream().toArray(CompletableFuture[]::new));
all.get(1, TimeUnit.MINUTES);
// Count the distinct results, which should equal the number of tasks
long rc = cfs.stream().map(CompletableFuture::join).distinct().count();
assertEquals(rc, tasks);
}
private CompletableFuture<Void> stopSupportingActorsAsync() {
FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
CompletableFuture<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout);
taskMonitor = null;
CompletableFuture<Boolean> stopConnectionMonitorFuture = stopActor(connectionMonitor, stopTimeout);
connectionMonitor = null;
CompletableFuture<Boolean> stopLaunchCoordinatorFuture = stopActor(launchCoordinator, stopTimeout);
launchCoordinator = null;
CompletableFuture<Boolean> stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
reconciliationCoordinator = null;
return CompletableFuture.allOf(
stopTaskMonitorFuture,
stopConnectionMonitorFuture,
stopLaunchCoordinatorFuture,
stopReconciliationCoordinatorFuture);
}
/**
* allOf returns a future completed normally with the value null
* when all components complete normally
*/
public void testAllOf_normal() throws Exception {
for (int k = 1; k < 10; k++) {
CompletableFuture<Integer>[] fs
= (CompletableFuture<Integer>[]) new CompletableFuture[k];
for (int i = 0; i < k; i++)
fs[i] = new CompletableFuture<>();
CompletableFuture<Void> f = CompletableFuture.allOf(fs);
for (int i = 0; i < k; i++) {
checkIncomplete(f);
checkIncomplete(CompletableFuture.allOf(fs));
fs[i].complete(one);
}
checkCompletedNormally(f, null);
checkCompletedNormally(CompletableFuture.allOf(fs), null);
}
}
private void doFullRegisterOrderAndRemove() throws InterruptedException, ExecutionException {
SpanBuilder spanBuilder = getTracer().buildSpan("fullSystemTest");
final Span span = spanBuilder.start();
try {
SpanContext parentContext = span.context();
// Register
CompletableFuture<Customer> newCustomer = CompletableFuture
.supplyAsync(() -> registerRandomCustomer(parentContext));
// Maybe not get the types and colors over and over. Looks pretty in the traces though...
CompletableFuture<RobotType[]> availableTypes = CompletableFuture
.supplyAsync(() -> getAllTypes(parentContext));
CompletableFuture<Color[]> availableColors = CompletableFuture
.supplyAsync(() -> getAllColors(parentContext));
CompletableFuture.allOf(newCustomer, availableTypes, availableColors);
Customer customer = newCustomer.get();
// First completion stage done. Now we can create the order
List<RobotOrderLineItem> lineItems = createRandomOrder(availableTypes.get(), availableColors.get());
CompletableFuture<RobotOrder> robotOrderCompletable = CompletableFuture
.supplyAsync(() -> postOrder(customer, lineItems, parentContext));
// Rest will happen asynchrously when data is available...
CompletableFuture<RealizedOrder> realizedOrderFuture = new CompletableFuture<RealizedOrder>();
// When we have the order, we schedule the polling for an available order...
robotOrderCompletable
.thenAccept((order) -> awaitOrderCompletion(order, realizedOrderFuture, parentContext));
// Once the order is realized, we will remove the customer.
realizedOrderFuture.thenApply((realizedOrder) -> removeOwner(realizedOrder, parentContext))
.thenAccept((customerId) -> span.finish());
} catch (Throwable t) {
span.log(OpenTracingUtil.getSpanLogMap(t));
throw t;
}
}
@Override
protected CompletableFuture<Void> run() {
return CompletableFuture.allOf(
processTailReads(),
processCatchupReads(),
processStorageReads());
}
@Test
public void testConcurrentMkdirs() throws Exception {
final FileSystem fs = FileSystem.getLocalFileSystem();
final File root = temporaryFolder.getRoot();
final int directoryDepth = 10;
final int concurrentOperations = 10;
final Collection<File> targetDirectories = createTargetDirectories(root, directoryDepth, concurrentOperations);
final ExecutorService executor = Executors.newFixedThreadPool(concurrentOperations);
final CyclicBarrier cyclicBarrier = new CyclicBarrier(concurrentOperations);
try {
final Collection<CompletableFuture<Void>> mkdirsFutures = new ArrayList<>(concurrentOperations);
for (File targetDirectory : targetDirectories) {
final CompletableFuture<Void> mkdirsFuture = CompletableFuture.runAsync(
() -> {
try {
cyclicBarrier.await();
assertThat(fs.mkdirs(Path.fromLocalFile(targetDirectory)), is(true));
} catch (Exception e) {
throw new CompletionException(e);
}
}, executor);
mkdirsFutures.add(mkdirsFuture);
}
final CompletableFuture<Void> allFutures = CompletableFuture.allOf(
mkdirsFutures.toArray(new CompletableFuture[concurrentOperations]));
allFutures.get();
} finally {
final long timeout = 10000L;
ExecutorUtils.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, executor);
}
}
protected static Function<FDBRecordStore, CompletableFuture<?>> loadRecords(int start) {
return store -> {
final int pipelineSize = store.getPipelineSize(PipelineOperation.KEY_TO_RECORD);
CompletableFuture<?>[] futures = new CompletableFuture<?>[pipelineSize];
for (int i = 0; i < pipelineSize; i++) {
futures[i] = store.loadRecordAsync(Tuple.from(start + i));
}
return CompletableFuture.allOf(futures);
};
}
@Override
public CompletableFuture<Void> prepareSnapshot(
ChannelStateWriter channelStateWriter,
long checkpointId) throws IOException {
CompletableFuture<?>[] inputFutures = new CompletableFuture[inputProcessors.length];
for (int index = 0; index < inputFutures.length; index++) {
inputFutures[index] = inputProcessors[index].prepareSnapshot(channelStateWriter, checkpointId);
}
return CompletableFuture.allOf(inputFutures);
}
public CompletableFuture<Void> triggerAll() {
synchronized (actionsQueue) {
if (actionsQueue.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<?>[] completableFutures = new CompletableFuture[actionsQueue.size()];
for (int i = 0; i < completableFutures.length; ++i) {
completableFutures[i] = triggerNextAction();
}
return CompletableFuture.allOf(completableFutures);
}
}
@Subscribe
public void handleAsync(TransferTransactionStartedEvent evnt) {
ValidateAccountCommand command = new ValidateAccountCommand(evnt.TransactionInfo.SourceAccountId, evnt.getAggregateRootId());
command.setId(evnt.getId());
ValidateAccountCommand targetCommand = new ValidateAccountCommand(evnt.TransactionInfo.TargetAccountId, evnt.getAggregateRootId());
targetCommand.setId(evnt.getId());
CompletableFuture task1 = commandService.sendAsync(command);
CompletableFuture task2 = commandService.sendAsync(targetCommand);
CompletableFuture.allOf(task1, task2);
}
@Override
public CompletableFuture<Void> close(final boolean gracefully) {
Map<String, ConsumerConfig<?>> configs = new HashMap<>(configMap);
CompletableFuture<Void>[] futures = new CompletableFuture[configs.size()];
int i = 0;
for (ConsumerConfig<?> config : configs.values()) {
futures[i++] = config.unrefer(gracefully);
}
return CompletableFuture.allOf(futures);
}
@Override
protected <M extends Message> CompletableFuture<Void> updateIndexKeys(@Nonnull final FDBIndexableRecord<M> savedRecord,
final boolean remove,
@Nonnull final List<IndexEntry> indexEntries) {
final int groupPrefixSize = getGroupingCount();
final Subspace extraSubspace = getSecondarySubspace();
final List<CompletableFuture<Void>> ordinaryIndexFutures = new ArrayList<>(indexEntries.size());
final Map<Subspace, CompletableFuture<Void>> rankFutures = Maps.newHashMapWithExpectedSize(indexEntries.size());
for (IndexEntry indexEntry : indexEntries) {
// Maintain an ordinary B-tree index by score.
CompletableFuture<Void> updateOrdinaryIndex = updateOneKeyAsync(savedRecord, remove, indexEntry);
if (!MoreAsyncUtil.isCompletedNormally(updateOrdinaryIndex)) {
ordinaryIndexFutures.add(updateOrdinaryIndex);
}
final Subspace rankSubspace;
final Tuple scoreKey;
if (groupPrefixSize > 0) {
final List<Object> keyValues = indexEntry.getKey().getItems();
rankSubspace = extraSubspace.subspace(Tuple.fromList(keyValues.subList(0, groupPrefixSize)));
scoreKey = Tuple.fromList(keyValues.subList(groupPrefixSize, keyValues.size()));
} else {
rankSubspace = extraSubspace;
scoreKey = indexEntry.getKey();
}
// It is unsafe to have two concurrent updates to the same ranked set, so ensure that at most
// one update per grouping key is ongoing at any given time
final Function<Void, CompletableFuture<Void>> futureSupplier = vignore -> RankedSetIndexHelper.updateRankedSet(
state, rankSubspace, config, indexEntry.getKey(), scoreKey, remove
);
CompletableFuture<Void> existingFuture = rankFutures.get(rankSubspace);
if (existingFuture == null) {
rankFutures.put(rankSubspace, futureSupplier.apply(null));
} else {
rankFutures.put(rankSubspace, existingFuture.thenCompose(futureSupplier));
}
}
return CompletableFuture.allOf(AsyncUtil.whenAll(ordinaryIndexFutures), AsyncUtil.whenAll(rankFutures.values()));
}
@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
}
isEncryptionRequired = data.encryption_required;
setSchemaCompatibilityStrategy(data);
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
schemaValidationEnforced = data.schema_validation_enforced;
maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(data);
maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(data);
if (data.delayed_delivery_policies != null) {
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
}
initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
this.updateMaxPublishRate(data);
producers.values().forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
}
});
replicators.forEach((name, replicator) ->
replicator.getRateLimiter().get().onPoliciesUpdate(data)
);
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
// update rate-limiter if policies updated
if (this.dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().onPoliciesUpdate(data);
}
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().onPoliciesUpdate(data);
}
getManagedLedger().getConfig().setLedgerOffloader(
brokerService.pulsar().getManagedLedgerOffloader(
TopicName.get(topic).getNamespaceObject(), data.offload_policies));
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
}
@Override
public @NonNull CompletableFuture<Void> reload() {
Set<QueryOptions> keys = this.cache.asMap().keySet();
return CompletableFuture.allOf(keys.stream().map(this::reload).toArray(CompletableFuture[]::new));
}
public static final CompletableFuture<?> submitJob(ExecutorService service, KeyToPageIndex index, Bytes key, Long newPage, Long expectedPage) {
final ConcurrentPutIfTask[] tasks = createTasks(index, key, newPage, expectedPage);
@SuppressWarnings("unchecked") final CompletableFuture<Long>[] futures = new CompletableFuture[tasks.length];
for (int i = 0; i < tasks.length; ++i) {
futures[i] = CompletableFuture.supplyAsync(tasks[i]::call, service);
}
return CompletableFuture.allOf(futures);
}