java.util.concurrent.CompletableFuture#allOf ( )源码实例Demo

下面列出了 java.util.concurrent.CompletableFuture#allOf() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: j2objc   文件: CompletableFutureTest.java
/**
 * Verifies that {@code allOf} stays incomplete while any component future is
 * pending, and completes normally with the value {@code null} once every
 * component has completed normally. Exercised for 1 to 9 components.
 */
public void testAllOf_normal() throws Exception {
    for (int size = 1; size < 10; size++) {
        CompletableFuture<Integer>[] components
            = (CompletableFuture<Integer>[]) new CompletableFuture[size];
        for (int idx = 0; idx < components.length; idx++) {
            components[idx] = new CompletableFuture<>();
        }
        CompletableFuture<Void> combined = CompletableFuture.allOf(components);
        for (CompletableFuture<Integer> component : components) {
            // Neither the early aggregate nor a freshly built one may be done
            // while this component is still pending.
            checkIncomplete(combined);
            checkIncomplete(CompletableFuture.allOf(components));
            component.complete(one);
        }
        checkCompletedNormally(combined, null);
        checkCompletedNormally(CompletableFuture.allOf(components), null);
    }
}
 
源代码2 项目: crate   文件: Session.java
/**
 * Executes everything queued in {@code deferredExecutionsByStmt} and empties the map.
 *
 * @return an already completed future when nothing is deferred, the future of the
 *         single execution when exactly one entry is deferred, otherwise a future
 *         that completes once all executions have completed
 */
private CompletableFuture<?> triggerDeferredExecutions() {
    int deferredCount = deferredExecutionsByStmt.size();
    if (deferredCount == 0) {
        LOGGER.debug("method=sync deferredExecutions=0");
        return CompletableFuture.completedFuture(null);
    }
    if (deferredCount == 1) {
        // Single entry: no need to wrap the execution in allOf.
        var only = deferredExecutionsByStmt.entrySet().iterator().next();
        deferredExecutionsByStmt.clear();
        return exec(only.getKey(), only.getValue());
    }
    var executions = Lists2.map(
        deferredExecutionsByStmt.entrySet(),
        deferred -> exec(deferred.getKey(), deferred.getValue()));
    deferredExecutionsByStmt.clear();
    CompletableFuture<?>[] pending = executions.toArray(new CompletableFuture[0]);
    return CompletableFuture.allOf(pending);
}
 
源代码3 项目: ehcache3   文件: ClusterTierManagement.java
/**
 * Registers the server store binding (and, when a dedicated resource pool exists,
 * a pool binding) with the management registry. Once the registrations complete
 * normally, the registry is refreshed and an EHCACHE_SERVER_STORE_CREATED
 * notification is pushed for the store binding.
 */
private void init() {
  ServerSideServerStore store = ehcacheStateService.getStore(storeIdentifier);
  ServerStoreBinding storeBinding = new ServerStoreBinding(storeIdentifier, store);
  CompletableFuture<Void> storeRegistration = managementRegistry.register(storeBinding);
  ServerSideConfiguration.Pool dedicatedPool = ehcacheStateService.getDedicatedResourcePool(storeIdentifier);
  // Only wait on a pool registration when the store actually has a dedicated pool.
  CompletableFuture<Void> registrations = (dedicatedPool == null)
      ? storeRegistration
      : CompletableFuture.allOf(
          storeRegistration,
          managementRegistry.register(new PoolBinding(storeIdentifier, dedicatedPool, PoolBinding.AllocationType.DEDICATED)));
  registrations.thenRun(() -> {
    managementRegistry.refresh();
    managementRegistry.pushServerEntityNotification(storeBinding, EHCACHE_SERVER_STORE_CREATED.name());
  });
}
 
源代码4 项目: openjdk-jdk9   文件: RandomStreamTest.java
/**
 * Runs the supplier in ten asynchronous tasks and asserts that all ten results
 * are distinct.
 *
 * @param s supplier invoked once per asynchronous task
 */
<T> void testRandomResultSupplierConcurrently(Supplier<T> s) throws InterruptedException, ExecutionException, TimeoutException {
    final int tasks = 10;

    // Launch the tasks.
    List<CompletableFuture<T>> pending = Stream.generate(() -> CompletableFuture.supplyAsync(s))
            .limit(tasks)
            .collect(toList());

    // Wait for every task; a minute is far longer than completion should
    // ever take unless something is wrong.
    CompletableFuture<?>[] pendingArray = pending.toArray(new CompletableFuture[0]);
    CompletableFuture.allOf(pendingArray).get(1, TimeUnit.MINUTES);

    // Each task must have produced a different value.
    long distinctResults = pending.stream()
            .map(CompletableFuture::join)
            .distinct()
            .count();
    assertEquals(distinctResults, tasks);
}
 
源代码5 项目: samza   文件: AsyncSystemProducer.java
/**
 * Default implementation of the flush that just waits for all the pendingFutures to be complete.
 * SystemProducer should override this, if the underlying system provides flush semantics.
 *
 * <p>Send-callback errors are checked before and after the wait. If the wait is interrupted,
 * the thread's interrupt status is restored before the failure is reported.
 *
 * @param source String representing the source of the message.
 * @throws SamzaException if the wait is interrupted, times out, or a pending send failed
 */
@Override
public synchronized void flush(String source) {
  long incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
  LOG.info("Trying to flush pending {} sends.", incompleteSends);
  checkForSendCallbackErrors("Received exception on message send.");
  // A zero-length array lets toArray size the result itself; a pre-sized array could be left
  // with trailing null slots if the collection shrank after size(), and allOf rejects nulls.
  CompletableFuture<Void> future =
      CompletableFuture.allOf(pendingFutures.toArray(new CompletableFuture[0]));

  try {
    // Block until all the pending sends are complete or timeout.
    future.get(DEFAULT_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
  } catch (InterruptedException | ExecutionException | TimeoutException e) {
    if (e instanceof InterruptedException) {
      // Keep the interruption visible to callers further up the stack.
      Thread.currentThread().interrupt();
    }
    incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
    String msg = String.format("Flush failed with error. Total pending sends %d", incompleteSends);
    LOG.error(msg, e);
    throw new SamzaException(msg, e);
  }

  pendingFutures.clear();

  checkForSendCallbackErrors("Sending one or more of the messages failed during flush.");
}
 
源代码6 项目: jdk8u-jdk   文件: RandomStreamTest.java
/**
 * Checks that ten concurrent invocations of the supplier yield ten distinct values.
 *
 * @param s supplier run by each asynchronous task
 */
<T> void testRandomResultSupplierConcurrently(Supplier<T> s) throws InterruptedException, ExecutionException, TimeoutException {
    final int tasks = 10;

    // Kick off one async computation per task.
    Stream<CompletableFuture<T>> launched = Stream.generate(() -> CompletableFuture.supplyAsync(s));
    List<CompletableFuture<T>> futures = launched.limit(tasks).collect(toList());

    // Block until all of them finish; the generous timeout only trips when
    // something has genuinely gone wrong.
    CompletableFuture<Void> barrier =
            CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new));
    barrier.get(1, TimeUnit.MINUTES);

    // The number of distinct results must match the number of tasks.
    long uniqueCount = futures.stream().map(CompletableFuture::join).distinct().count();
    assertEquals(uniqueCount, tasks);
}
 
源代码7 项目: flink   文件: MesosResourceManager.java
/**
 * Requests a stop of the task monitor, connection monitor, launch coordinator and
 * reconciliation coordinator actors (5 second timeout each) and clears each field
 * right after its stop has been requested.
 *
 * @return future that completes once all four stop futures have completed
 */
private CompletableFuture<Void> stopSupportingActorsAsync() {
	final FiniteDuration timeout = new FiniteDuration(5L, TimeUnit.SECONDS);

	final CompletableFuture<Boolean> taskMonitorStopped = stopActor(taskMonitor, timeout);
	taskMonitor = null;

	final CompletableFuture<Boolean> connectionMonitorStopped = stopActor(connectionMonitor, timeout);
	connectionMonitor = null;

	final CompletableFuture<Boolean> launchCoordinatorStopped = stopActor(launchCoordinator, timeout);
	launchCoordinator = null;

	final CompletableFuture<Boolean> reconciliationCoordinatorStopped = stopActor(reconciliationCoordinator, timeout);
	reconciliationCoordinator = null;

	final CompletableFuture<?>[] allStops = {
		taskMonitorStopped,
		connectionMonitorStopped,
		launchCoordinatorStopped,
		reconciliationCoordinatorStopped
	};
	return CompletableFuture.allOf(allStops);
}
 
源代码8 项目: openjdk-jdk9   文件: CompletableFutureTest.java
/**
 * For every component count from 1 to 9: {@code allOf} must remain incomplete
 * until the last component completes, then complete normally with {@code null}.
 */
public void testAllOf_normal() throws Exception {
    for (int count = 1; count < 10; count++) {
        CompletableFuture<Integer>[] parts
            = (CompletableFuture<Integer>[]) new CompletableFuture[count];
        for (int n = 0; n < count; n++) {
            parts[n] = new CompletableFuture<>();
        }
        CompletableFuture<Void> whole = CompletableFuture.allOf(parts);
        int next = 0;
        while (next < count) {
            // Still at least one pending part, so no aggregate may be done yet.
            checkIncomplete(whole);
            checkIncomplete(CompletableFuture.allOf(parts));
            parts[next].complete(one);
            next++;
        }
        checkCompletedNormally(whole, null);
        checkCompletedNormally(CompletableFuture.allOf(parts), null);
    }
}
 
源代码9 项目: problematic-microservices   文件: LoadWorker.java
/**
 * Runs one full scenario inside a "fullSystemTest" span: registers a random customer while
 * fetching the available robot types and colors in parallel, posts a random order, and then
 * sets up the asynchronous steps that await the realized order and remove the customer.
 * The span is finished by the asynchronous chain on success, or here if this method fails.
 *
 * @throws InterruptedException if interrupted while waiting for the first stage
 * @throws ExecutionException if one of the first-stage tasks completed exceptionally
 */
private void doFullRegisterOrderAndRemove() throws InterruptedException, ExecutionException {
	SpanBuilder spanBuilder = getTracer().buildSpan("fullSystemTest");
	final Span span = spanBuilder.start();
	try {
		SpanContext parentContext = span.context();

		// Register
		CompletableFuture<Customer> newCustomer = CompletableFuture
				.supplyAsync(() -> registerRandomCustomer(parentContext));
		// Maybe not get the types and colors over and over. Looks pretty in the traces though...
		CompletableFuture<RobotType[]> availableTypes = CompletableFuture
				.supplyAsync(() -> getAllTypes(parentContext));
		CompletableFuture<Color[]> availableColors = CompletableFuture
				.supplyAsync(() -> getAllColors(parentContext));
		// allOf only builds a combined future; it has to be waited on to act as a barrier
		// for the first stage.
		CompletableFuture.allOf(newCustomer, availableTypes, availableColors).get();

		Customer customer = newCustomer.get();

		// First completion stage done. Now we can create the order
		List<RobotOrderLineItem> lineItems = createRandomOrder(availableTypes.get(), availableColors.get());
		CompletableFuture<RobotOrder> robotOrderCompletable = CompletableFuture
				.supplyAsync(() -> postOrder(customer, lineItems, parentContext));

		// Rest will happen asynchronously when data is available...
		CompletableFuture<RealizedOrder> realizedOrderFuture = new CompletableFuture<RealizedOrder>();
		// When we have the order, we schedule the polling for an available order...
		robotOrderCompletable
				.thenAccept((order) -> awaitOrderCompletion(order, realizedOrderFuture, parentContext));
		// Once the order is realized, we will remove the customer.
		realizedOrderFuture.thenApply((realizedOrder) -> removeOwner(realizedOrder, parentContext))
				.thenAccept((customerId) -> span.finish());
	} catch (Throwable t) {
		span.log(OpenTracingUtil.getSpanLogMap(t));
		// The async chain that normally finishes the span never runs on this path,
		// so finish it here to keep the failed trace from being left open.
		span.finish();
		throw t;
	}
}
 
源代码10 项目: pravega   文件: Consumer.java
/**
 * Combines the futures returned by {@code processTailReads()},
 * {@code processCatchupReads()} and {@code processStorageReads()}.
 *
 * @return a future that completes once all three have completed
 */
@Override
protected CompletableFuture<Void> run() {
    CompletableFuture<?> tailReads = processTailReads();
    CompletableFuture<?> catchupReads = processCatchupReads();
    CompletableFuture<?> storageReads = processStorageReads();
    return CompletableFuture.allOf(tailReads, catchupReads, storageReads);
}
 
源代码11 项目: flink   文件: LocalFileSystemTest.java
/**
 * Calls {@code mkdirs} on the local file system from several threads that are
 * released together by a barrier, and expects every call to report success.
 */
@Test
public void testConcurrentMkdirs() throws Exception {
	final FileSystem fs = FileSystem.getLocalFileSystem();
	final File root = temporaryFolder.getRoot();
	final int directoryDepth = 10;
	final int concurrentOperations = 10;

	final Collection<File> targetDirectories = createTargetDirectories(root, directoryDepth, concurrentOperations);

	final ExecutorService executor = Executors.newFixedThreadPool(concurrentOperations);
	final CyclicBarrier startBarrier = new CyclicBarrier(concurrentOperations);

	try {
		final Collection<CompletableFuture<Void>> pendingMkdirs = new ArrayList<>(concurrentOperations);
		for (File directory : targetDirectories) {
			final Runnable mkdirsTask = () -> {
				try {
					// Hold every worker here so the mkdirs calls start together.
					startBarrier.await();
					assertThat(fs.mkdirs(Path.fromLocalFile(directory)), is(true));
				} catch (Exception e) {
					throw new CompletionException(e);
				}
			};
			pendingMkdirs.add(CompletableFuture.runAsync(mkdirsTask, executor));
		}

		final CompletableFuture<?>[] pendingArray =
			pendingMkdirs.toArray(new CompletableFuture[concurrentOperations]);
		CompletableFuture.allOf(pendingArray).get();
	} finally {
		ExecutorUtils.gracefulShutdown(10000L, TimeUnit.MILLISECONDS, executor);
	}
}
 
/**
 * Builds a function that asynchronously loads a run of consecutive records from a store.
 * The run has as many records as the store's KEY_TO_RECORD pipeline size, with
 * primary keys {@code start}, {@code start + 1}, and so on.
 *
 * @param start primary key value of the first record to load
 * @return function yielding a future that completes once all loads have completed
 */
protected static Function<FDBRecordStore, CompletableFuture<?>> loadRecords(int start) {
    return store -> {
        final int loadCount = store.getPipelineSize(PipelineOperation.KEY_TO_RECORD);
        final CompletableFuture<?>[] loads = new CompletableFuture<?>[loadCount];
        int offset = 0;
        while (offset < loadCount) {
            loads[offset] = store.loadRecordAsync(Tuple.from(start + offset));
            offset++;
        }
        return CompletableFuture.allOf(loads);
    };
}
 
源代码13 项目: flink   文件: StreamMultipleInputProcessor.java
/**
 * Asks every input processor, in order, to prepare its snapshot.
 *
 * @param channelStateWriter writer handed to each input processor
 * @param checkpointId id of the checkpoint being prepared
 * @return future that completes once all input processors' futures have completed
 */
@Override
public CompletableFuture<Void> prepareSnapshot(
		ChannelStateWriter channelStateWriter,
		long checkpointId) throws IOException {
	final int inputCount = inputProcessors.length;
	final CompletableFuture<?>[] snapshots = new CompletableFuture[inputCount];
	int input = 0;
	while (input < inputCount) {
		snapshots[input] = inputProcessors[input].prepareSnapshot(channelStateWriter, checkpointId);
		input++;
	}
	return CompletableFuture.allOf(snapshots);
}
 
源代码14 项目: flink   文件: TestRestartStrategy.java
/**
 * While holding the lock on {@code actionsQueue}, calls {@code triggerNextAction()}
 * once for every element the queue held when the method was entered.
 *
 * @return an already completed future if the queue is empty, otherwise a future
 *         that completes once all triggered actions' futures have completed
 */
public CompletableFuture<Void> triggerAll() {
	synchronized (actionsQueue) {
		if (actionsQueue.isEmpty()) {
			return CompletableFuture.completedFuture(null);
		}

		// The count is fixed up front; the queue is not re-read while triggering.
		final int pendingActions = actionsQueue.size();
		final CompletableFuture<?>[] triggered = new CompletableFuture[pendingActions];
		for (int slot = 0; slot < pendingActions; slot++) {
			triggered[slot] = triggerNextAction();
		}
		return CompletableFuture.allOf(triggered);
	}
}
 
源代码15 项目: enode   文件: TransferTransactionProcessManager.java
/**
 * Reacts to a started transfer transaction by sending one account-validation command for the
 * source account and one for the target account. Both commands carry the event's id.
 *
 * @param evnt the event describing the started transfer transaction
 */
@Subscribe
public void handleAsync(TransferTransactionStartedEvent evnt) {
    ValidateAccountCommand command = new ValidateAccountCommand(evnt.TransactionInfo.SourceAccountId, evnt.getAggregateRootId());
    command.setId(evnt.getId());
    ValidateAccountCommand targetCommand = new ValidateAccountCommand(evnt.TransactionInfo.TargetAccountId, evnt.getAggregateRootId());
    targetCommand.setId(evnt.getId());
    // Wildcard-typed instead of raw, so no unchecked operations leak into this method.
    CompletableFuture<?> task1 = commandService.sendAsync(command);
    CompletableFuture<?> task2 = commandService.sendAsync(targetCommand);
    // NOTE(review): the combined future is discarded, so this method neither waits for the
    // sends nor observes their failures; the void signature cannot hand it to the caller.
    CompletableFuture.allOf(task1, task2);
}
 
源代码16 项目: joyrpc   文件: AbstractGroupInvoker.java
/**
 * Calls {@code unrefer} on every consumer config in a snapshot of {@code configMap}.
 *
 * @param gracefully flag forwarded unchanged to each {@code unrefer} call
 * @return future that completes once every unrefer future has completed
 */
@Override
public CompletableFuture<Void> close(final boolean gracefully) {
    // Work on a copy so the set of configs is fixed for the duration of the call.
    Map<String, ConsumerConfig<?>> snapshot = new HashMap<>(configMap);
    CompletableFuture<Void>[] pending = new CompletableFuture[snapshot.size()];
    int next = 0;
    for (ConsumerConfig<?> consumer : snapshot.values()) {
        pending[next] = consumer.unrefer(gracefully);
        next++;
    }
    return CompletableFuture.allOf(pending);
}
 
源代码17 项目: fdb-record-layer   文件: RankIndexMaintainer.java
/**
 * Applies the given index entries for a saved record. Each entry updates the ordinary
 * index via {@code updateOneKeyAsync} and the ranked set kept in the secondary subspace
 * (narrowed by the grouping prefix of the entry's key when a grouping is configured).
 * Updates that target the same ranked-set subspace are chained one after another.
 *
 * @param savedRecord the record the entries belong to
 * @param remove flag forwarded to both the ordinary index update and the ranked-set update
 * @param indexEntries the entries to apply
 * @return future that completes once all ordinary and ranked-set updates have completed
 */
@Override
protected <M extends Message> CompletableFuture<Void> updateIndexKeys(@Nonnull final FDBIndexableRecord<M> savedRecord,
                                                                      final boolean remove,
                                                                      @Nonnull final List<IndexEntry> indexEntries) {
    final int groupingCount = getGroupingCount();
    final Subspace secondarySubspace = getSecondarySubspace();
    final List<CompletableFuture<Void>> pendingOrdinaryUpdates = new ArrayList<>(indexEntries.size());
    final Map<Subspace, CompletableFuture<Void>> rankUpdatesBySubspace = Maps.newHashMapWithExpectedSize(indexEntries.size());
    for (IndexEntry entry : indexEntries) {
        // Ordinary B-tree index by score; only updates not yet completed normally need tracking.
        CompletableFuture<Void> ordinaryUpdate = updateOneKeyAsync(savedRecord, remove, entry);
        if (!MoreAsyncUtil.isCompletedNormally(ordinaryUpdate)) {
            pendingOrdinaryUpdates.add(ordinaryUpdate);
        }

        final Subspace rankSubspace;
        final Tuple scoreKey;
        if (groupingCount <= 0) {
            // Ungrouped: the whole key is the score and lives directly in the secondary subspace.
            rankSubspace = secondarySubspace;
            scoreKey = entry.getKey();
        } else {
            // Grouped: the leading items select the ranked set, the remainder is the score.
            final List<Object> keyItems = entry.getKey().getItems();
            rankSubspace = secondarySubspace.subspace(Tuple.fromList(keyItems.subList(0, groupingCount)));
            scoreKey = Tuple.fromList(keyItems.subList(groupingCount, keyItems.size()));
        }

        // Two concurrent updates to the same ranked set are unsafe, so each update for a
        // subspace is chained behind the previous one for that same subspace.
        final Function<Void, CompletableFuture<Void>> rankUpdate = ignored -> RankedSetIndexHelper.updateRankedSet(
                state, rankSubspace, config, entry.getKey(), scoreKey, remove
        );
        final CompletableFuture<Void> previousUpdate = rankUpdatesBySubspace.get(rankSubspace);
        rankUpdatesBySubspace.put(
                rankSubspace,
                previousUpdate == null ? rankUpdate.apply(null) : previousUpdate.thenCompose(rankUpdate));
    }
    final CompletableFuture<Void> ordinaryDone = AsyncUtil.whenAll(pendingOrdinaryUpdates);
    final CompletableFuture<Void> rankDone = AsyncUtil.whenAll(rankUpdatesBySubspace.values());
    return CompletableFuture.allOf(ordinaryDone, rankDone);
}
 
源代码18 项目: pulsar   文件: PersistentTopic.java
/**
 * Applies the given policies to this topic: refreshes the cached policy-derived fields,
 * re-checks producers and consumers, forwards the policies to every rate limiter that is
 * present, and triggers the replication, deduplication and persistence-policy checks.
 *
 * @param data the updated policies
 * @return future that completes once the replication, deduplication and persistence-policy
 *         checks have all completed
 */
@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
    }
    isEncryptionRequired = data.encryption_required;

    setSchemaCompatibilityStrategy(data);
    isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;

    schemaValidationEnforced = data.schema_validation_enforced;

    maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(data);
    maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(data);

    if (data.delayed_delivery_policies != null) {
        delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
        delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
    }

    initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));

    this.updateMaxPublishRate(data);

    producers.values().forEach(producer -> {
        producer.checkPermissions();
        producer.checkEncryption();
    });
    subscriptions.forEach((subName, sub) -> {
        sub.getConsumers().forEach(Consumer::checkPermissions);
        Dispatcher dispatcher = sub.getDispatcher();
        if (dispatcher != null) {
            dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
        }
    });
    // Same optional handling as for dispatchers: a replicator without a rate limiter is
    // skipped instead of failing on an empty Optional.
    replicators.forEach((name, replicator) ->
        replicator.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data))
    );
    checkMessageExpiry();
    CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
    CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
    CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
    // update rate-limiter if policies updated
    this.dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
    this.subscribeRateLimiter.ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
    getManagedLedger().getConfig().setLedgerOffloader(
            brokerService.pulsar().getManagedLedgerOffloader(
                    TopicName.get(topic).getNamespaceObject(), data.offload_policies));

    return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
}
 
源代码19 项目: LuckPerms   文件: AbstractCachedDataManager.java
/**
 * Reloads the cached data for every {@link QueryOptions} key currently in the cache.
 *
 * @return future that completes once every per-key reload has completed
 */
@Override
public @NonNull CompletableFuture<Void> reload() {
    Set<QueryOptions> cachedKeys = this.cache.asMap().keySet();
    CompletableFuture<?>[] reloads = cachedKeys.stream()
            .map(this::reload)
            .toArray(CompletableFuture[]::new);
    return CompletableFuture.allOf(reloads);
}
 
源代码20 项目: herddb   文件: KeyToPageIndexTest.java
/**
 * Submits every task produced by {@code createTasks} for the given arguments to the
 * executor service.
 *
 * @param service executor that runs the tasks
 * @param index passed through to {@code createTasks}
 * @param key passed through to {@code createTasks}
 * @param newPage passed through to {@code createTasks}
 * @param expectedPage passed through to {@code createTasks}
 * @return future that completes once all submitted tasks have completed
 */
public static final CompletableFuture<?> submitJob(ExecutorService service, KeyToPageIndex index, Bytes key, Long newPage, Long expectedPage) {
    final ConcurrentPutIfTask[] jobs = createTasks(index, key, newPage, expectedPage);

    @SuppressWarnings("unchecked")
    final CompletableFuture<Long>[] pending = new CompletableFuture[jobs.length];

    int slot = 0;
    for (ConcurrentPutIfTask job : jobs) {
        pending[slot] = CompletableFuture.supplyAsync(job::call, service);
        slot++;
    }

    return CompletableFuture.allOf(pending);
}