java.util.concurrent.CompletableFuture#thenCompose() Source Code Examples

Listed below are example usages of java.util.concurrent.CompletableFuture#thenCompose(), collected from a range of open-source projects; each example names the project and file it was taken from.
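As a quick orientation before the project examples: thenCompose is the flattening counterpart of thenApply. When the mapping function itself returns a CompletableFuture, thenApply yields a nested future while thenCompose flattens it. A minimal sketch, not taken from any of the projects below; ThenComposeDemo and findUserName are illustrative names:

import java.util.concurrent.CompletableFuture;

public class ThenComposeDemo {

    // Hypothetical async lookup, used only for illustration.
    static CompletableFuture<String> findUserName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    public static void main(String[] args) {
        // thenApply with a future-returning function nests the result:
        CompletableFuture<CompletableFuture<String>> nested =
                CompletableFuture.completedFuture(42).thenApply(ThenComposeDemo::findUserName);

        // thenCompose flattens it into a single future:
        CompletableFuture<String> flat =
                CompletableFuture.completedFuture(42).thenCompose(ThenComposeDemo::findUserName);

        System.out.println(flat.join()); // prints "user-42"
    }
}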

Example 1 - Project: centraldogma, File: MigrationUtilTest.java
@Override
public <T> CompletableFuture<T> execute(Command<T> command) {
    if (!(command instanceof CreateProjectCommand)) {
        return super.execute(command);
    }

    final CreateProjectCommand c = (CreateProjectCommand) command;
    final String projectName = c.projectName();
    final long creationTimeMillis = c.timestamp();
    final Author author = c.author();

    // Do not generate sample files because they are not necessary for the migration test.
    CompletableFuture<?> f = delegate().execute(c);
    f = f.thenCompose(unused -> delegate().execute(
            Command.push(creationTimeMillis, author, projectName, REPO_DOGMA, Revision.HEAD,
                         "Delete /metadata.json to mimic the legacy project",
                         "", Markup.PLAINTEXT, Change.ofRemoval(METADATA_JSON))));
    f = f.exceptionally(unused -> null);
    return f.thenCompose(unused -> delegate().execute(
            Command.createRepository(creationTimeMillis, author, projectName, REPO_FOO)))
            .thenApply(unused -> null);
}
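The f.exceptionally(unused -> null) step above deliberately swallows a failure so that the rest of the chain still runs. A minimal standalone sketch of that recover-and-continue shape, with illustrative names and JDK 8 compatible APIs only:

import java.util.concurrent.CompletableFuture;

public class RecoverAndContinue {
    public static void main(String[] args) {
        CompletableFuture<Void> step1 = new CompletableFuture<>();
        step1.completeExceptionally(new RuntimeException("step 1 failed"));

        CompletableFuture<Void> result = step1
                .exceptionally(ignored -> null) // swallow the failure...
                .thenCompose(ignored ->         // ...so this step still runs
                        CompletableFuture.runAsync(() -> System.out.println("step 2 ran")));

        result.join();
    }
}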
 
Example 2 - Project: NioImapClient, File: ImapClientFactory.java
private CompletableFuture<ImapClient> handleProxyConnect(ImapClientConfiguration clientConfiguration,
                                                         CompletableFuture<ImapClient> connectFuture) {
  if (!clientConfiguration.proxyConfig().isPresent()) {
    return connectFuture;
  }
  ProxyCommand proxyCommand = new ProxyCommand(
      clientConfiguration.hostAndPort(),
      clientConfiguration.proxyConfig().get().proxyLocalIpAddress()
  );
  return connectFuture.thenCompose(imapClient ->
      imapClient.send(proxyCommand)
          .thenApply(ignored -> {
            if (clientConfiguration.useSsl()) {
              imapClient.addTlsToChannel();
            }
            return imapClient;
          })
  );
}
 
Example 3 - Project: smarthome, File: HomeAssistantThingHandler.java
/**
 * Start a background discovery for the configured HA MQTT object-id.
 */
@Override
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
    connection.setRetain(true);
    connection.setQos(1);

    updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "No response from the device yet");

    // Start all known components and channels within the components, and put the Thing offline
    // if any subscription failed (== broker connection lost)
    CompletableFuture<@Nullable Void> future = haComponents.values().stream()
            .map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
            .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to one
            .exceptionally(e -> {
                updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
                return null;
            });

    return future
            .thenCompose(b -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantID, this));
}
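The reduce call above collapses a stream of futures into a single future that completes only after the last one has, chaining each onto the previous with thenCompose. The same pattern as a standalone helper; FutureChains and sequence are illustrative names, not from the smarthome codebase:

import java.util.List;
import java.util.concurrent.CompletableFuture;

final class FutureChains {

    // Reduces a list of futures to one future that completes after every
    // element has completed. Unlike CompletableFuture.allOf, the futures are
    // observed strictly in list order, and a failure anywhere fails the result.
    static CompletableFuture<Void> sequence(List<CompletableFuture<Void>> futures) {
        return futures.stream()
                .reduce(CompletableFuture.<Void>completedFuture(null),
                        (acc, next) -> acc.thenCompose(ignored -> next));
    }
}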
 
Example 4 - Project: ditto, File: DittoPublicKeyProvider.java
private CompletableFuture<PublicKey> loadPublicKey(final PublicKeyIdWithIssuer publicKeyIdWithIssuer,
        final Executor executor) {

    final String issuer = publicKeyIdWithIssuer.getIssuer();
    final String keyId = publicKeyIdWithIssuer.getKeyId();
    LOGGER.debug("Loading public key with id <{}> from issuer <{}>.", keyId, issuer);

    final JwtSubjectIssuerConfig subjectIssuerConfig =
            jwtSubjectIssuersConfig.getConfigItem(issuer)
                    .orElseThrow(() -> GatewayJwtIssuerNotSupportedException.newBuilder(issuer).build());

    final String discoveryEndpoint = getDiscoveryEndpoint(subjectIssuerConfig.getIssuer());
    final CompletableFuture<HttpResponse> responseFuture =
            CompletableFuture.supplyAsync(() -> getPublicKeysFromDiscoveryEndpoint(discoveryEndpoint));
    final CompletableFuture<JsonArray> publicKeysFuture =
            responseFuture.thenCompose(this::mapResponseToJsonArray);
    return publicKeysFuture.thenApply(publicKeysArray -> mapToPublicKey(publicKeysArray, keyId, discoveryEndpoint))
            .toCompletableFuture();
}
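Example 5 - Project: fdb-record-layer, File: TimeWindowLeaderboardIndexMaintainer.java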
 
private CompletableFuture<Tuple> evaluateEqualRange(@Nonnull TupleRange range,
                                                    @Nonnull EvaluateEqualRange function) {
    final Tuple tuple = range.getLow();
    final int type = (int) tuple.getLong(0);
    final long timestamp = tuple.getLong(1);
    final int groupingCount = getGroupingCount();
    final Tuple groupKey = TupleHelpers.subTuple(tuple, 2, 2 + groupingCount);
    final Tuple values = TupleHelpers.subTuple(tuple, 2 + groupingCount, tuple.size());
    final CompletableFuture<TimeWindowLeaderboard> leaderboardFuture = oldestLeaderboardMatching(type, timestamp);
    return leaderboardFuture.thenCompose(leaderboard -> {
        if (leaderboard == null) {
            return CompletableFuture.completedFuture(null);
        }
        final Tuple leaderboardGroupKey = leaderboard.getSubspaceKey().addAll(groupKey);
        final Subspace extraSubspace = getSecondarySubspace();
        final Subspace rankSubspace = extraSubspace.subspace(leaderboardGroupKey);
        final RankedSet.Config leaderboardConfig = config.toBuilder().setNLevels(leaderboard.getNLevels()).build();
        final RankedSet rankedSet = new RankedSetIndexHelper.InstrumentedRankedSet(state, rankSubspace, leaderboardConfig);
        return function.apply(leaderboard, rankedSet, groupKey, values);
    });
}
 
Example 6 - Project: flink, File: ApplicationDispatcherBootstrap.java
/**
 * Runs the user program entrypoint by scheduling a task on the given {@code scheduledExecutor}.
 * The returned {@link CompletableFuture} completes when all jobs of the user application have
 * succeeded, or completes exceptionally if any of them fails or if job submission fails.
 */
private CompletableFuture<Void> runApplicationAsync(
		final DispatcherGateway dispatcher,
		final ScheduledExecutor scheduledExecutor,
		final boolean enforceSingleJobExecution) {
	final CompletableFuture<List<JobID>> applicationExecutionFuture = new CompletableFuture<>();

	// we need to hand in a future as the return value because we need to get those JobIDs out
	// from the scheduled task that executes the user program
	applicationExecutionTask = scheduledExecutor.schedule(
			() -> runApplicationEntryPoint(
					applicationExecutionFuture,
					dispatcher,
					scheduledExecutor,
					enforceSingleJobExecution),
			0L,
			TimeUnit.MILLISECONDS);

	return applicationExecutionFuture.thenCompose(
			jobIds -> getApplicationResult(dispatcher, jobIds, scheduledExecutor));
}
 
Example 7 - Project: pravega, File: CommitRequestHandler.java
/**
 * This method loops over each transaction in the list and commits them in order.
 * At the end of this method's execution, all transactions in the list will have been committed into the given list of segments.
 */
private CompletableFuture<Void> commitTransactions(String scope, String stream, List<Long> segments,
                                                   List<UUID> transactionsToCommit, OperationContext context, Timer timer) {
    // Chain all transaction commit futures one after the other. This ensures that the commit
    // order is honoured and follows the order of the list.
    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
    for (UUID txnId : transactionsToCommit) {
        log.info("Committing transaction {} on stream {}/{}", txnId, scope, stream);
        // commit transaction in segment store
        future = future
                // Note, we can use the same segments and transaction id as only
                // primary id is taken for creation of txn-segment name and secondary part is erased and replaced with
                // transaction's epoch.
                // And we are creating duplicates of txn epoch keeping the primary same.
                // After committing transactions, we collect the current sizes of segments and update the offset 
                // at which the transaction was committed into ActiveTxnRecord in an idempotent fashion. 
                // Note: if it's a rerun, the transaction commit offsets may already have been updated
                // in a previous iteration, so this will not update/modify them.
                .thenCompose(v -> streamMetadataTasks.notifyTxnCommit(scope, stream, segments, txnId))
                .thenCompose(v -> streamMetadataTasks.getCurrentSegmentSizes(scope, stream, segments))
                .thenCompose(map -> streamMetadataStore.recordCommitOffsets(scope, stream, txnId, map, context, executor))
                .thenRun(() -> TransactionMetrics.getInstance().commitTransaction(scope, stream, timer.getElapsed()));
    }
    
    return future
            .thenCompose(v -> bucketStore.addStreamToBucketStore(BucketStore.ServiceType.WatermarkingService, scope, stream, executor));
}
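The loop above builds the chain lazily: each commit is created inside thenCompose and therefore only starts once the previous future has completed. A generic sketch of that shape; SequentialRunner and runInOrder are illustrative names, not Pravega code:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class SequentialRunner {

    // Starts one async task per item, strictly one after another: each task
    // is created inside thenCompose, so it only starts once the previous
    // future has completed. Mirrors the commitTransactions loop above.
    static <T> CompletableFuture<Void> runInOrder(List<T> items,
                                                  Function<T, CompletableFuture<Void>> task) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (T item : items) {
            chain = chain.thenCompose(ignored -> task.apply(item));
        }
        return chain;
    }
}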
 
Example 8 - Project: distributedlog, File: BKAsyncLogWriter.java
@Override
protected CompletableFuture<Void> asyncCloseAndComplete() {
    CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture;
    synchronized (this) {
        logSegmentWriterFuture = this.rollingFuture;
    }

    if (null == logSegmentWriterFuture) {
        return super.asyncCloseAndComplete();
    } else {
        return logSegmentWriterFuture.thenCompose(segmentWriter1 -> super.asyncCloseAndComplete());
    }
}
 
Example 9 - Project: jdk8u60, File: ThenComposeAsyncTest.java
public void testThenComposeAsync() throws Exception {
    // Composing CompletableFuture is complete
    CompletableFuture<String> cf1 = CompletableFuture.completedFuture("one");

    // Composing function returns a CompletableFuture executed asynchronously
    CountDownLatch cdl = new CountDownLatch(1);
    CompletableFuture<String> cf2 = cf1.thenCompose(str -> CompletableFuture.supplyAsync(() -> {
        while (true) {
            try {
                cdl.await();
                break;
            }
            catch (InterruptedException e) {
            }
        }
        return str + ", two";
    }));

    // Ensure returned CompletableFuture completes after call to thenCompose
    // This guarantees that any premature internal completion will be
    // detected
    cdl.countDown();

    String val = cf2.get();
    Assert.assertNotNull(val);
    Assert.assertEquals(val, "one, two");
}
 
Example 10 - Project: ua-server-sdk, File: BrowsePathsHelper.java
private CompletableFuture<ExpandedNodeId> next(NodeId nodeId, RelativePathElement element) {
    NodeId referenceTypeId = element.getReferenceTypeId();
    boolean includeSubtypes = element.getIncludeSubtypes();
    QualifiedName targetName = element.getTargetName();

    Namespace namespace = namespaceManager.getNamespace(nodeId.getNamespaceIndex());

    CompletableFuture<List<Reference>> future = namespace.getReferences(nodeId);

    return future.thenCompose(references -> {
        List<ExpandedNodeId> targetNodeIds = references.stream()
                /* Filter for references of the requested type or its subtype, if allowed... */
                .filter(r -> referenceTypeId.isNull() ||
                        r.getReferenceTypeId().equals(referenceTypeId) ||
                        (includeSubtypes && r.subtypeOf(referenceTypeId, server.getReferenceTypes())))

                /* Filter for reference direction... */
                .filter(r -> r.isInverse() == element.getIsInverse())

                /* Map to target ExpandedNodeId... */
                .map(Reference::getTargetNodeId)
                .collect(toList());

        return readTargetBrowseNames(targetNodeIds).thenApply(browseNames -> {
            for (int i = 0; i < targetNodeIds.size(); i++) {
                ExpandedNodeId targetNodeId = targetNodeIds.get(i);
                QualifiedName browseName = browseNames.get(i);
                if (browseName.equals(targetName)) {
                    return targetNodeId;
                }
            }

            return ExpandedNodeId.NULL_VALUE;
        });
    });
}
 
Example 11 - Project: flink, File: RestClusterClient.java
@Override
public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
	final SavepointDisposalRequest savepointDisposalRequest = new SavepointDisposalRequest(savepointPath);

	final CompletableFuture<TriggerResponse> savepointDisposalTriggerFuture = sendRequest(
		SavepointDisposalTriggerHeaders.getInstance(),
		savepointDisposalRequest);

	final CompletableFuture<AsynchronousOperationInfo> savepointDisposalFuture = savepointDisposalTriggerFuture.thenCompose(
		(TriggerResponse triggerResponse) -> {
			final TriggerId triggerId = triggerResponse.getTriggerId();
			final SavepointDisposalStatusHeaders savepointDisposalStatusHeaders = SavepointDisposalStatusHeaders.getInstance();
			final SavepointDisposalStatusMessageParameters savepointDisposalStatusMessageParameters = savepointDisposalStatusHeaders.getUnresolvedMessageParameters();
			savepointDisposalStatusMessageParameters.triggerIdPathParameter.resolve(triggerId);

			return pollResourceAsync(
				() -> sendRequest(
					savepointDisposalStatusHeaders,
					savepointDisposalStatusMessageParameters));
		});

	return savepointDisposalFuture.thenApply(
		(AsynchronousOperationInfo asynchronousOperationInfo) -> {
			if (asynchronousOperationInfo.getFailureCause() == null) {
				return Acknowledge.get();
			} else {
				throw new CompletionException(asynchronousOperationInfo.getFailureCause());
			}
		});
}
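pollResourceAsync above repeatedly queries the cluster until the asynchronous operation has finished; its implementation is Flink-internal. The following is an illustrative stand-in, not Flink's code, for the general poll-until-present pattern, chaining every retry through thenCompose so no thread ever blocks:

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class Polling {

    // Keeps re-asking an async resource until a value is present. Each retry
    // is scheduled after a delay and chained via thenCompose, so the returned
    // future completes (or fails) with the eventual poll result.
    static <T> CompletableFuture<T> pollUntilPresent(
            Supplier<CompletableFuture<Optional<T>>> poll,
            ScheduledExecutorService scheduler,
            long retryDelayMillis) {
        return poll.get().thenCompose(maybe -> {
            if (maybe.isPresent()) {
                return CompletableFuture.completedFuture(maybe.get());
            }
            CompletableFuture<T> retry = new CompletableFuture<>();
            scheduler.schedule(() -> {
                pollUntilPresent(poll, scheduler, retryDelayMillis).whenComplete((v, t) -> {
                    if (t != null) {
                        retry.completeExceptionally(t);
                    } else {
                        retry.complete(v);
                    }
                });
            }, retryDelayMillis, TimeUnit.MILLISECONDS);
            return retry;
        });
    }
}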
 
Example 12 - Project: pravega, File: StreamTransactionMetadataTasks.java
/**
 * Creates txn on the specified stream.
 *
 * Post-condition:
 * 1. If txn creation succeeds, then
 *     (a) txn node is created in the store,
 *     (b) txn segments are successfully created on respective segment stores,
 *     (c) txn is present in the host-txn index of current host,
 *     (d) txn's timeout is being tracked in timeout service.
 *
 * 2. If process fails after creating txn node, but before responding to the client, then since txn is
 * present in the host-txn index, some other controller process shall abort the txn after maxLeaseValue
 *
 * 3. If timeout service tracks timeout of specified txn,
 * then txn is also present in the host-txn index of current process.
 *
 * Invariant:
 * The following invariants are maintained throughout the execution of createTxn, pingTxn and sealTxn methods.
 * 1. If timeout service tracks timeout of a txn, then txn is also present in the host-txn index of current process.
 * 2. If txn znode is updated, then txn is also present in the host-txn index of current process.
 *
 * @param scope               scope name.
 * @param stream              stream name.
 * @param lease               txn lease.
 * @param ctx                 context.
 * @return                    identifier of the created txn.
 */
CompletableFuture<Pair<VersionedTransactionData, List<StreamSegmentRecord>>> createTxnBody(final String scope,
                                                                               final String stream,
                                                                               final long lease,
                                                                               final OperationContext ctx) {
    // Step 1. Validate parameters.
    CompletableFuture<Void> validate = validate(lease);
    long maxExecutionPeriod = Math.min(MAX_EXECUTION_TIME_MULTIPLIER * lease, Duration.ofDays(1).toMillis());

    // 1. get latest epoch from history
    // 2. generateNewTransactionId.. this step can throw WriteConflictException
    // 3. txn id = 32 bit epoch + 96 bit counter
    // 4. if while creating txn epoch no longer exists, then we will get DataNotFoundException.
    // 5. Retry if we get WriteConflict or DataNotFoundException, from step 1.
    // Note: both exceptions are of low probability:
    // - WriteConflict: if multiple controllers try to get a new range at the same time, we can get a write conflict.
    // - DataNotFoundException: happens only in the rare case where we generate the transaction id against the
    //   latest epoch (if there is an ongoing scale, this is the new epoch) and the epoch node is then deleted
    //   as the scale starts and completes.
    return validate.thenCompose(validated -> RetryHelper.withRetriesAsync(() ->
            streamMetadataStore.generateTransactionId(scope, stream, ctx, executor)
            .thenCompose(txnId -> {
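                // Step 2. Add txn to the host-transaction index.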
                CompletableFuture<Void> addIndex = addTxnToIndex(scope, stream, txnId);

                // Step 3. Create txn node in the store.
                CompletableFuture<VersionedTransactionData> txnFuture = createTxnInStore(scope, stream, lease,
                        ctx, maxExecutionPeriod, txnId, addIndex);

                // Step 4. Notify segment stores about new txn.
                CompletableFuture<List<StreamSegmentRecord>> segmentsFuture = txnFuture.thenComposeAsync(txnData ->
                        streamMetadataStore.getSegmentsInEpoch(scope, stream, txnData.getEpoch(), ctx, executor), executor);

                CompletableFuture<Void> notify = segmentsFuture.thenComposeAsync(activeSegments ->
                        notifyTxnCreation(scope, stream, activeSegments, txnId), executor).whenComplete((v, e) ->
                        // Method notifyTxnCreation ensures that notification completes
                        // even in the presence of n/w or segment store failures.
                        log.trace("Txn={}, notified segments stores", txnId));

                // Step 5. Start tracking txn in timeout service
                return notify.whenCompleteAsync((result, ex) -> {
                    addTxnToTimeoutService(scope, stream, lease, maxExecutionPeriod, txnId, txnFuture);
                }, executor).thenApplyAsync(v -> {
                    List<StreamSegmentRecord> segments = segmentsFuture.join().stream().map(x -> {
                        long generalizedSegmentId = RecordHelper.generalizedSegmentId(x.segmentId(), txnId);
                        int epoch = NameUtils.getEpoch(generalizedSegmentId);
                        int segmentNumber = NameUtils.getSegmentNumber(generalizedSegmentId);
                        return StreamSegmentRecord.builder().creationEpoch(epoch).segmentNumber(segmentNumber)
                                .creationTime(x.getCreationTime()).keyStart(x.getKeyStart()).keyEnd(x.getKeyEnd()).build();
                    }).collect(Collectors.toList());

                    return new ImmutablePair<>(txnFuture.join(), segments);
                }, executor);
            }), e -> {
        Throwable unwrap = Exceptions.unwrap(e);
        return unwrap instanceof StoreException.WriteConflictException || unwrap instanceof StoreException.DataNotFoundException;
    }, 5, executor));
}
 
Example 13 - Project: Flink-CEPplus, File: Dispatcher.java
@Override
public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
	final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);

	return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.cancel(timeout));
}
 
Example 14 - Project: flink, File: LegacyScheduler.java
@Override
public CompletableFuture<String> stopWithSavepoint(final String targetDirectory, final boolean advanceToEndOfEventTime) {
	mainThreadExecutor.assertRunningInMainThread();

	final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();

	if (checkpointCoordinator == null) {
		return FutureUtils.completedExceptionally(new IllegalStateException(
			String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
	}

	if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
		log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

		return FutureUtils.completedExceptionally(new IllegalStateException(
			"No savepoint directory configured. You can either specify a directory " +
				"while cancelling via -s :targetDirectory or configure a cluster-wide " +
				"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
	}

	// we stop the checkpoint coordinator so that we are guaranteed
	// to have only the data of the synchronous savepoint committed.
	// in case of failure, and if the job restarts, the coordinator
	// will be restarted by the CheckpointCoordinatorDeActivator.
	checkpointCoordinator.stopCheckpointScheduler();

	final long now = System.currentTimeMillis();
	final CompletableFuture<String> savepointFuture = checkpointCoordinator
			.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
			.thenApply(CompletedCheckpoint::getExternalPointer);

	final CompletableFuture<JobStatus> terminationFuture = executionGraph
		.getTerminationFuture()
		.handle((jobstatus, throwable) -> {

			if (throwable != null) {
				log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
				throw new CompletionException(throwable);
			} else if (jobstatus != JobStatus.FINISHED) {
				log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", jobGraph.getJobID(), jobstatus);
				throw new CompletionException(new FlinkException("Reached state " + jobstatus + " instead of FINISHED."));
			}
			return jobstatus;
		});

	return savepointFuture.thenCompose((path) ->
		terminationFuture.thenApply((jobStatus -> path)));
}
 
Example 15 - Project: soabase-halva, File: FutureForFactory.java
@SuppressWarnings("unchecked")
public <A> CompletableFuture flatMap(CompletableFuture m, Function<A, ? extends CompletableFuture> flatMapper)
{
    return m.thenCompose(flatMapper);
}
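This adapter reflects the usual monadic reading: thenCompose is CompletableFuture's flatMap, with completedFuture playing the role of unit. A small sketch checking the left-identity law under that reading; FlatMapLawDemo is an illustrative name:

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class FlatMapLawDemo {
    public static void main(String[] args) {
        Function<Integer, CompletableFuture<Integer>> f =
                x -> CompletableFuture.completedFuture(x * 2);

        // Left identity: unit(a).flatMap(f) == f.apply(a), with
        // completedFuture as unit and thenCompose as flatMap.
        CompletableFuture<Integer> lhs = CompletableFuture.completedFuture(21).thenCompose(f);
        CompletableFuture<Integer> rhs = f.apply(21);

        System.out.println(lhs.join().equals(rhs.join())); // true
    }
}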
 
Example 16 - Project: fdb-record-layer, File: KeySpaceDirectory.java
@Nonnull
@SuppressWarnings("squid:S2095") // SonarQube doesn't realize that the cursor is wrapped and returned
protected RecordCursor<ResolvedKeySpacePath> listSubdirectoryAsync(@Nullable KeySpacePath listFrom,
                                                                   @Nonnull FDBRecordContext context,
                                                                   @Nonnull String subdirName,
                                                                   @Nullable ValueRange<?> valueRange,
                                                                   @Nullable byte[] continuation,
                                                                   @Nonnull ScanProperties scanProperties) {
    if (listFrom != null && listFrom.getDirectory() != this) {
        throw new RecordCoreException("Provided path does not belong to this directory")
                .addLogInfo("path", listFrom, "directory", this.getName());
    }

    final KeySpaceDirectory subdir = getSubdirectory(subdirName);
    final CompletableFuture<ResolvedKeySpacePath> resolvedFromFuture = listFrom == null
            ? CompletableFuture.completedFuture(null)
            : listFrom.toResolvedPathAsync(context);

    // The chained cursor cannot implement reverse scan, so we implement it by having the
    // inner key value cursor do the reversing but telling the chained cursor we are moving
    // forward.
    final ScanProperties chainedCursorScanProperties;
    if (scanProperties.isReverse()) {
        chainedCursorScanProperties = scanProperties.setReverse(false);
    } else {
        chainedCursorScanProperties = scanProperties;
    }

    // For the read of the individual row keys, we only want to read a single key. In addition,
    // the ChainedCursor is going to do counting of our reads to apply any limits that were specified
    // on the ScanProperties.  We don't want the inner KeyValueCursor in nextTuple() to ALSO count those
    // same reads so we clear out its limits.
    final ScanProperties keyReadScanProperties = scanProperties.with(
            props -> props.clearState().setReturnedRowLimit(1));

    return new LazyCursor<>(
            resolvedFromFuture.thenCompose(resolvedFrom -> {
                final Subspace subspace = resolvedFrom == null ? new Subspace() : resolvedFrom.toSubspace();
                return subdir.getValueRange(context, valueRange, subspace).thenApply(range -> {
                    final RecordCursor<Tuple> cursor = new ChainedCursor<>(
                            context,
                            lastKey -> nextTuple(context, subspace, range, lastKey, keyReadScanProperties),
                            Tuple::pack,
                            Tuple::fromBytes,
                            continuation,
                            chainedCursorScanProperties);

                    return cursor.mapPipelined(tuple -> {
                        final Tuple key = Tuple.fromList(tuple.getItems());
                        return findChildForKey(context, resolvedFrom, key, 1, 0);
                    }, 1);
                });
            }),
            context.getExecutor()
    );
}
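Example 17 - Project: infinispan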
 
public static void main(String[] args) throws Exception {
   DefaultCacheManager cacheManager = new DefaultCacheManager();
   cacheManager.defineConfiguration("local", new ConfigurationBuilder().build());
   AdvancedCache<String, String> cache = cacheManager.<String, String>getCache("local").getAdvancedCache();
   FunctionalMapImpl<String, String> functionalMap = FunctionalMapImpl.create(cache);
   FunctionalMap.WriteOnlyMap<String, String> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);
   FunctionalMap.ReadOnlyMap<String, String> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

   // Execute two parallel write-only operations to store key/value pairs
   CompletableFuture<Void> writeFuture1 = writeOnlyMap.eval("key1", "value1",
      (v, writeView) -> writeView.set(v));
   CompletableFuture<Void> writeFuture2 = writeOnlyMap.eval("key2", "value2",
      (v, writeView) -> writeView.set(v));

   // When each write-only operation completes, execute a read-only operation to retrieve the value
   CompletableFuture<String> readFuture1 =
      writeFuture1.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::get));
   CompletableFuture<String> readFuture2 =
      writeFuture2.thenCompose(r -> readOnlyMap.eval("key2", EntryView.ReadEntryView::get));

   // When both read-only operations complete, print the retrieved values
   System.out.printf("Created entries: %n");
   CompletableFuture<Void> end = readFuture1.thenAcceptBoth(readFuture2, (v1, v2) ->
      System.out.printf("key1 = %s%nkey2 = %s%n", v1, v2));

   // Wait for this read/write combination to finish
   end.get();

   // Create a read-write map
   FunctionalMap.ReadWriteMap<String, String> readWriteMap = ReadWriteMapImpl.create(functionalMap);

   // Use read-write multi-key based operation to write new values
   // together with lifespan and return previous values
   Map<String, String> data = new HashMap<>();
   data.put("key1", "newValue1");
   data.put("key2", "newValue2");
   Traversable<String> previousValues = readWriteMap.evalMany(data, (v, readWriteView) -> {
      String prev = readWriteView.find().orElse(null);
      readWriteView.set(v, new MetaLifespan(Duration.ofHours(1).toMillis()));
      return prev;
   });

   // Use read-only multi-key operation to read current values for multiple keys
   Traversable<EntryView.ReadEntryView<String, String>> entryViews =
      readOnlyMap.evalMany(data.keySet(), readOnlyView -> readOnlyView);
   System.out.printf("Updated entries: %n");
   entryViews.forEach(view -> System.out.printf("%s%n", view));

   // Finally, print out the previous entry values
   System.out.printf("Previous entry values: %n");
   previousValues.forEach(prev -> System.out.printf("%s%n", prev));
}
 
Example 18 - Project: incubator-nemo, File: PipeManagerWorker.java
public CompletableFuture<DataUtil.IteratorWithNumBytes> read(final int srcTaskIndex,
                                                             final RuntimeEdge runtimeEdge,
                                                             final int dstTaskIndex) {
  final String runtimeEdgeId = runtimeEdge.getId();
  // Get the location of the src task (blocking call)
  final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
    .getMessageSender(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
      ControlMessage.Message.newBuilder()
        .setId(RuntimeIdManager.generateMessageId())
        .setListenerId(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID)
        .setType(ControlMessage.MessageType.RequestPipeLoc)
        .setRequestPipeLocMsg(
          ControlMessage.RequestPipeLocationMessage.newBuilder()
            .setExecutorId(executorId)
            .setRuntimeEdgeId(runtimeEdgeId)
            .setSrcTaskIndex(srcTaskIndex)
            .build())
        .build());


  return responseFromMasterFuture.thenCompose(responseFromMaster -> {
    // Get executor id
    if (responseFromMaster.getType() != ControlMessage.MessageType.PipeLocInfo) {
      throw new RuntimeException("Response message type mismatch!");
    }
    final ControlMessage.PipeLocationInfoMessage pipeLocInfo = responseFromMaster.getPipeLocInfoMsg();
    if (!pipeLocInfo.hasExecutorId()) {
      throw new IllegalStateException();
    }
    final String targetExecutorId = responseFromMaster.getPipeLocInfoMsg().getExecutorId();

    // Descriptor
    final ControlMessage.PipeTransferContextDescriptor descriptor =
      ControlMessage.PipeTransferContextDescriptor.newBuilder()
        .setRuntimeEdgeId(runtimeEdgeId)
        .setSrcTaskIndex(srcTaskIndex)
        .setDstTaskIndex(dstTaskIndex)
        .setNumPipeToWait(getNumOfPipeToWait(runtimeEdge))
        .build();

    // Connect to the executor
    return byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray(), true)
      .thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
        serializerManager.getSerializer(runtimeEdgeId)));
  });
}
 
Example 19 - Project: Flink-CEPplus, File: FutureUtils.java
/**
 * This function takes a {@link CompletableFuture} and a function to compose with it. If the input future
 * is already done, the composition is applied via {@link CompletableFuture#thenCompose(Function)} on the
 * calling thread. Otherwise it is applied via {@link CompletableFuture#thenComposeAsync(Function, Executor)}
 * with the given executor.
 *
 * @param completableFuture the completable future for which we want to compose.
 * @param executor the executor to run the compose function if the future is not yet done.
 * @param composeFun the function to compose.
 * @param <IN> type of the input future.
 * @param <OUT> type of the output future.
 * @return a completable future that is a composition of the input future and the function.
 */
public static <IN, OUT> CompletableFuture<OUT> thenComposeAsyncIfNotDone(
	CompletableFuture<IN> completableFuture,
	Executor executor,
	Function<? super IN, ? extends CompletionStage<OUT>> composeFun) {
	return completableFuture.isDone() ?
		completableFuture.thenCompose(composeFun) :
		completableFuture.thenComposeAsync(composeFun, executor);
}
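A hedged usage sketch of the utility above, assuming the FutureUtils class shown is on the classpath; ComposeIfNotDoneDemo is an illustrative name. The point of the optimization: composing onto an already-completed future runs the function directly on the calling thread, so the hand-off to the executor is paid only when the future is still pending:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ComposeIfNotDoneDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Already done: the compose function runs immediately on the
            // calling thread; nothing is handed to the executor.
            CompletableFuture<Integer> done = CompletableFuture.completedFuture(1);
            CompletableFuture<Integer> r1 = FutureUtils.thenComposeAsyncIfNotDone(
                    done, executor, x -> CompletableFuture.completedFuture(x + 1));

            // Still pending: the compose function will run on the executor
            // once the future completes.
            CompletableFuture<Integer> pending = new CompletableFuture<>();
            CompletableFuture<Integer> r2 = FutureUtils.thenComposeAsyncIfNotDone(
                    pending, executor, x -> CompletableFuture.completedFuture(x * 10));
            pending.complete(4);

            System.out.println(r1.get() + " " + r2.get()); // 2 40
        } finally {
            executor.shutdown();
        }
    }
}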