下面列出了 java.util.concurrent.CompletableFuture#thenCompose() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Executes the given command. Anything other than a {@link CreateProjectCommand} is handed to
 * the superclass unchanged. For a project creation the following commands are sent to the
 * delegate one after another:
 * <ol>
 *   <li>the original create-project command,</li>
 *   <li>a push that removes {@code METADATA_JSON} from {@code REPO_DOGMA},</li>
 *   <li>the creation of the {@code REPO_FOO} repository.</li>
 * </ol>
 * A failure of the first two steps is swallowed on purpose, so step 3 is always attempted.
 * The returned future completes with {@code null}.
 */
@Override
public <T> CompletableFuture<T> execute(Command<T> command) {
    final boolean isProjectCreation = command instanceof CreateProjectCommand;
    if (!isProjectCreation) {
        return super.execute(command);
    }

    final CreateProjectCommand createCommand = (CreateProjectCommand) command;
    final String projectName = createCommand.projectName();
    final long creationTimeMillis = createCommand.timestamp();
    final Author author = createCommand.author();

    // Sample files are deliberately not generated: the migration test does not need them.
    final CompletableFuture<?> projectCreated = delegate().execute(createCommand);

    final CompletableFuture<?> metadataRemoved = projectCreated.thenCompose(ignored ->
            delegate().execute(Command.push(
                    creationTimeMillis, author, projectName, REPO_DOGMA, Revision.HEAD,
                    "Delete /metadata.json to mimic the legacy project",
                    "", Markup.PLAINTEXT, Change.ofRemoval(METADATA_JSON))));

    // Turn any failure so far into a normal (null) completion so the chain keeps going.
    final CompletableFuture<?> recovered = metadataRemoved.exceptionally(cause -> null);

    return recovered
            .thenCompose(ignored -> delegate().execute(
                    Command.createRepository(creationTimeMillis, author, projectName, REPO_FOO)))
            .thenApply(ignored -> null);
}
/**
 * Sends a {@link ProxyCommand} on the freshly connected client when a proxy is configured.
 *
 * @param clientConfiguration configuration that may carry a proxy config and the SSL flag
 * @param connectFuture       future of the connected client
 * @return {@code connectFuture} itself when no proxy is configured; otherwise a future that
 *         yields the same client after the proxy command has been sent and, if SSL is enabled,
 *         TLS has been added to the channel
 */
private CompletableFuture<ImapClient> handleProxyConnect(ImapClientConfiguration clientConfiguration,
                                                         CompletableFuture<ImapClient> connectFuture) {
    final boolean proxyConfigured = clientConfiguration.proxyConfig().isPresent();
    if (!proxyConfigured) {
        return connectFuture;
    }

    final ProxyCommand proxyCommand = new ProxyCommand(
            clientConfiguration.hostAndPort(),
            clientConfiguration.proxyConfig().get().proxyLocalIpAddress());

    return connectFuture.thenCompose(client -> {
        return client.send(proxyCommand).thenApply(response -> {
            // TLS is only layered on once the proxy command has been answered.
            if (clientConfiguration.useSsl()) {
                client.addTlsToChannel();
            }
            return client;
        });
    });
}
/**
 * Configures the connection (retain on, QoS 1), marks the Thing as offline until the device
 * answers, starts every known HA component and finally starts a background discovery for the
 * configured HA MQTT object-id.
 *
 * @param connection the broker connection to subscribe on
 * @return the future returned by the discovery start, composed after the component start-up
 */
@Override
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
    connection.setRetain(true);
    connection.setQos(1);
    updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "No response from the device yet");

    // Start all known components (and the channels within them) and fold their futures into
    // a single one. If any subscription fails ( == broker connection lost) the Thing is put
    // offline and the failure is converted into a normal completion.
    CompletableFuture<@Nullable Void> componentsStarted = haComponents.values().stream()
            .map(component -> component.start(connection, scheduler, attributeReceiveTimeout))
            .reduce(CompletableFuture.completedFuture(null),
                    (chain, next) -> chain.thenCompose(done -> next))
            .exceptionally(failure -> {
                updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
                        failure.getMessage());
                return null;
            });

    return componentsStarted.thenCompose(
            done -> discoverComponents.startDiscovery(connection, 0, discoveryHomeAssistantID, this));
}
/**
 * Asynchronously loads the public key identified by the given key id / issuer pair from the
 * issuer's discovery endpoint.
 *
 * <p>The HTTP lookup is submitted to the supplied {@code executor}. Previously the parameter
 * was ignored and the lookup always ran on {@code ForkJoinPool.commonPool()}.
 *
 * @param publicKeyIdWithIssuer the key id together with the issuer that should provide it
 * @param executor the executor on which the discovery endpoint is queried
 * @return a future holding the result of mapping the fetched key array to a public key
 * @throws GatewayJwtIssuerNotSupportedException synchronously, if no configuration exists for
 *         the issuer
 */
private CompletableFuture<PublicKey> loadPublicKey(final PublicKeyIdWithIssuer publicKeyIdWithIssuer,
        final Executor executor) {
    final String issuer = publicKeyIdWithIssuer.getIssuer();
    final String keyId = publicKeyIdWithIssuer.getKeyId();
    LOGGER.debug("Loading public key with id <{}> from issuer <{}>.", keyId, issuer);

    final JwtSubjectIssuerConfig subjectIssuerConfig =
            jwtSubjectIssuersConfig.getConfigItem(issuer)
                    .orElseThrow(() -> GatewayJwtIssuerNotSupportedException.newBuilder(issuer).build());
    final String discoveryEndpoint = getDiscoveryEndpoint(subjectIssuerConfig.getIssuer());

    // Run the fetch on the caller-provided executor instead of the common pool.
    final CompletableFuture<HttpResponse> responseFuture = CompletableFuture.supplyAsync(
            () -> getPublicKeysFromDiscoveryEndpoint(discoveryEndpoint), executor);
    final CompletableFuture<JsonArray> publicKeysFuture =
            responseFuture.thenCompose(this::mapResponseToJsonArray);

    // thenApply on a CompletableFuture already yields a CompletableFuture; no conversion needed.
    return publicKeysFuture.thenApply(
            publicKeysArray -> mapToPublicKey(publicKeysArray, keyId, discoveryEndpoint));
}
/**
 * Evaluates {@code function} against the ranked set of the oldest leaderboard matching the
 * type and timestamp encoded in the low endpoint of {@code range}.
 *
 * <p>The low tuple is read as: position 0 = leaderboard type, position 1 = timestamp, followed
 * by {@code getGroupingCount()} grouping columns, followed by the remaining value columns.
 *
 * @param range the range whose low endpoint carries the lookup key
 * @param function the evaluation to run once the leaderboard and its ranked set are known
 * @return the future produced by {@code function}, or a future completed with {@code null}
 *         when no matching leaderboard exists
 */
private CompletableFuture<Tuple> evaluateEqualRange(@Nonnull TupleRange range,
                                                    @Nonnull EvaluateEqualRange function) {
    final Tuple low = range.getLow();
    final int leaderboardType = (int) low.getLong(0);
    final long leaderboardTimestamp = low.getLong(1);

    final int valuesStart = 2 + getGroupingCount();
    final Tuple groupKey = TupleHelpers.subTuple(low, 2, valuesStart);
    final Tuple values = TupleHelpers.subTuple(low, valuesStart, low.size());

    return oldestLeaderboardMatching(leaderboardType, leaderboardTimestamp).thenCompose(leaderboard -> {
        if (leaderboard == null) {
            return CompletableFuture.completedFuture(null);
        }
        // The ranked set lives under <secondary subspace>/<leaderboard key + group key>.
        final Tuple leaderboardGroupKey = leaderboard.getSubspaceKey().addAll(groupKey);
        final Subspace rankSubspace = getSecondarySubspace().subspace(leaderboardGroupKey);
        final RankedSet.Config leaderboardConfig =
                config.toBuilder().setNLevels(leaderboard.getNLevels()).build();
        final RankedSet rankedSet =
                new RankedSetIndexHelper.InstrumentedRankedSet(state, rankSubspace, leaderboardConfig);
        return function.apply(leaderboard, rankedSet, groupKey, values);
    });
}
/**
 * Runs the user program entrypoint by scheduling a task on the given {@code scheduledExecutor}.
 * The job IDs reported by that task are then handed to {@code getApplicationResult}, whose
 * future is returned. It is expected to complete when all jobs of the user application
 * succeeded, and to fail if any of them fails or if job submission fails.
 */
private CompletableFuture<Void> runApplicationAsync(
        final DispatcherGateway dispatcher,
        final ScheduledExecutor scheduledExecutor,
        final boolean enforceSingleJobExecution) {
    // The scheduled task cannot return a value to us, so the job IDs are handed out of the
    // task through this future.
    final CompletableFuture<List<JobID>> submittedJobIds = new CompletableFuture<>();

    applicationExecutionTask = scheduledExecutor.schedule(
            () -> runApplicationEntryPoint(
                    submittedJobIds, dispatcher, scheduledExecutor, enforceSingleJobExecution),
            0L,
            TimeUnit.MILLISECONDS);

    return submittedJobIds.thenCompose(
            ids -> getApplicationResult(dispatcher, ids, scheduledExecutor));
}
/**
 * Commits the given transactions one after another, in list order, and afterwards registers
 * the stream with the bucket store for the watermarking service.
 *
 * <p>For every transaction the chain (1) notifies the segment stores of the commit,
 * (2) fetches the current sizes of the segments, (3) records those sizes as the commit offsets
 * of the transaction and (4) reports the commit to {@link TransactionMetrics}.
 */
private CompletableFuture<Void> commitTransactions(String scope, String stream, List<Long> segments,
                                                   List<UUID> transactionsToCommit, OperationContext context, Timer timer) {
    // Each commit is appended to the previous one, so the commit order follows the list order.
    CompletableFuture<Void> commitChain = CompletableFuture.completedFuture(null);

    for (UUID txnId : transactionsToCommit) {
        log.info("Committing transaction {} on stream {}/{}", txnId, scope, stream);

        // The same segments and transaction id can be reused here: only the primary id goes
        // into the txn-segment name, the secondary part is erased and replaced with the
        // transaction's epoch, i.e. duplicates of the txn epoch are created with the same
        // primary.
        // Once the transaction is committed in the segment store, the current segment sizes
        // are collected and stored in the ActiveTxnRecord as the offsets at which the
        // transaction was committed. That update is idempotent: on a rerun the offsets may
        // already be present from a previous iteration and are then left untouched.
        commitChain = commitChain
                .thenCompose(ignored -> streamMetadataTasks.notifyTxnCommit(scope, stream, segments, txnId))
                .thenCompose(ignored -> streamMetadataTasks.getCurrentSegmentSizes(scope, stream, segments))
                .thenCompose(sizes ->
                        streamMetadataStore.recordCommitOffsets(scope, stream, txnId, sizes, context, executor))
                .thenRun(() ->
                        TransactionMetrics.getInstance().commitTransaction(scope, stream, timer.getElapsed()));
    }

    return commitChain.thenCompose(ignored -> bucketStore.addStreamToBucketStore(
            BucketStore.ServiceType.WatermarkingService, scope, stream, executor));
}
/**
 * Closes and completes the writer. If a rolling future has been recorded, the superclass close
 * is only started after that future has completed; otherwise it is started immediately.
 */
@Override
protected CompletableFuture<Void> asyncCloseAndComplete() {
    final CompletableFuture<BKLogSegmentWriter> pendingRoll;
    // Take a snapshot of the field under the monitor; the composition happens outside of it.
    synchronized (this) {
        pendingRoll = this.rollingFuture;
    }

    if (pendingRoll != null) {
        return pendingRoll.thenCompose(rolledWriter -> super.asyncCloseAndComplete());
    }
    return super.asyncCloseAndComplete();
}
/**
 * Checks that {@code thenCompose} on an already completed future yields a future that only
 * completes once the asynchronously running inner future has finished.
 */
public void testThenComposeAsync() throws Exception {
    // The outer future is complete before thenCompose is called.
    final CompletableFuture<String> first = CompletableFuture.completedFuture("one");

    // The inner future runs asynchronously and is held back by this latch.
    final CountDownLatch gate = new CountDownLatch(1);
    final CompletableFuture<String> composed = first.thenCompose(prefix ->
            CompletableFuture.supplyAsync(() -> {
                boolean released = false;
                while (!released) {
                    try {
                        gate.await();
                        released = true;
                    } catch (InterruptedException e) {
                        // keep waiting until the latch is released
                    }
                }
                return prefix + ", two";
            }));

    // The latch is opened only after thenCompose has returned, so a premature internal
    // completion of the composed future would be detected.
    gate.countDown();

    final String result = composed.get();
    Assert.assertNotNull(result);
    Assert.assertEquals(result, "one, two");
}
/**
 * Verifies that composing a completed future with an asynchronously supplied one produces a
 * future that completes with the combined value after the inner supplier has been released.
 */
public void testThenComposeAsync() throws Exception {
    // Already-complete source future.
    final CompletableFuture<String> source = CompletableFuture.completedFuture("one");

    // Latch that blocks the asynchronous supplier until the composition has been set up.
    final CountDownLatch release = new CountDownLatch(1);
    final CompletableFuture<String> combined = source.thenCompose(head ->
            CompletableFuture.supplyAsync(() -> {
                for (boolean waiting = true; waiting; ) {
                    try {
                        release.await();
                        waiting = false;
                    } catch (InterruptedException e) {
                        // interrupted while waiting: try again
                    }
                }
                return head + ", two";
            }));

    // Release the supplier only now, after thenCompose has returned; any premature internal
    // completion of the combined future would therefore be caught below.
    release.countDown();

    final String value = combined.get();
    Assert.assertNotNull(value);
    Assert.assertEquals(value, "one, two");
}
/**
 * Follows one element of a relative path starting at {@code nodeId}.
 *
 * <p>The references of the node are narrowed down to those matching the element's reference
 * type (optionally including subtypes) and direction. The browse names of the remaining
 * targets are then read, and the first target whose browse name equals the element's target
 * name is returned.
 *
 * @param nodeId the node to start from
 * @param element the path element to follow
 * @return a future with the matching target, or {@code ExpandedNodeId.NULL_VALUE} if none
 *         matches
 */
private CompletableFuture<ExpandedNodeId> next(NodeId nodeId, RelativePathElement element) {
    NodeId wantedTypeId = element.getReferenceTypeId();
    boolean subtypesAllowed = element.getIncludeSubtypes();
    QualifiedName wantedName = element.getTargetName();

    Namespace namespace = namespaceManager.getNamespace(nodeId.getNamespaceIndex());
    CompletableFuture<List<Reference>> referencesFuture = namespace.getReferences(nodeId);

    return referencesFuture.thenCompose(references -> {
        List<ExpandedNodeId> candidates = references.stream()
            .filter(ref -> {
                // A null reference type id means "any type"; otherwise the type must match
                // exactly, or be a subtype when subtypes are allowed.
                boolean typeMatches = wantedTypeId.isNull() ||
                    ref.getReferenceTypeId().equals(wantedTypeId) ||
                    (subtypesAllowed && ref.subtypeOf(wantedTypeId, server.getReferenceTypes()));

                // The reference direction has to match as well.
                return typeMatches && ref.isInverse() == element.getIsInverse();
            })
            .map(Reference::getTargetNodeId)
            .collect(toList());

        return readTargetBrowseNames(candidates).thenApply(browseNames -> {
            // browseNames is index-aligned with candidates.
            int index = 0;
            for (ExpandedNodeId candidate : candidates) {
                QualifiedName browseName = browseNames.get(index++);
                if (browseName.equals(wantedName)) {
                    return candidate;
                }
            }
            return ExpandedNodeId.NULL_VALUE;
        });
    });
}
/**
 * Triggers the disposal of the savepoint at {@code savepointPath} and polls the resulting
 * asynchronous operation until it is finished.
 *
 * @param savepointPath path of the savepoint to dispose
 * @return a future that yields {@link Acknowledge} when the operation reports no failure
 *         cause, and otherwise fails with a {@link CompletionException} wrapping that cause
 */
@Override
public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
    final SavepointDisposalRequest request = new SavepointDisposalRequest(savepointPath);

    final CompletableFuture<TriggerResponse> triggerFuture = sendRequest(
        SavepointDisposalTriggerHeaders.getInstance(),
        request);

    final CompletableFuture<AsynchronousOperationInfo> operationFuture = triggerFuture.thenCompose(
        triggerResponse -> {
            // Bind the trigger id to the status endpoint parameters, then poll that endpoint.
            final SavepointDisposalStatusHeaders statusHeaders = SavepointDisposalStatusHeaders.getInstance();
            final SavepointDisposalStatusMessageParameters statusParameters =
                statusHeaders.getUnresolvedMessageParameters();
            final TriggerId triggerId = triggerResponse.getTriggerId();
            statusParameters.triggerIdPathParameter.resolve(triggerId);

            return pollResourceAsync(() -> sendRequest(statusHeaders, statusParameters));
        });

    return operationFuture.thenApply(
        operationInfo -> {
            if (operationInfo.getFailureCause() != null) {
                throw new CompletionException(operationInfo.getFailureCause());
            }
            return Acknowledge.get();
        });
}
/**
 * Creates txn on the specified stream.
 *
 * Post-condition:
 * 1. If txn creation succeeds, then
 *     (a) txn node is created in the store,
 *     (b) txn segments are successfully created on respective segment stores,
 *     (c) txn is present in the host-txn index of current host,
 *     (d) txn's timeout is being tracked in timeout service.
 *
 * 2. If process fails after creating txn node, but before responding to the client, then since txn is
 * present in the host-txn index, some other controller process shall abort the txn after maxLeaseValue
 *
 * 3. If timeout service tracks timeout of specified txn,
 * then txn is also present in the host-txn index of current process.
 *
 * Invariant:
 * The following invariants are maintained throughout the execution of createTxn, pingTxn and sealTxn methods.
 * 1. If timeout service tracks timeout of a txn, then txn is also present in the host-txn index of current process.
 * 2. If txn znode is updated, then txn is also present in the host-txn index of current process.
 *
 * The id generation and everything that follows it is wrapped in a retry that is triggered when the
 * unwrapped failure is a WriteConflictException or a DataNotFoundException.
 *
 * @param scope scope name.
 * @param stream stream name.
 * @param lease txn lease.
 * @param ctx context.
 * @return future of a pair holding the versioned data of the created txn and the segment records
 * derived for it (one per segment of the txn's epoch, re-keyed with the txn id).
 */
CompletableFuture<Pair<VersionedTransactionData, List<StreamSegmentRecord>>> createTxnBody(final String scope,
final String stream,
final long lease,
final OperationContext ctx) {
// Step 1. Validate parameters.
CompletableFuture<Void> validate = validate(lease);
// Upper bound for the txn's execution time: a multiple of the lease, capped at one day (in millis).
long maxExecutionPeriod = Math.min(MAX_EXECUTION_TIME_MULTIPLIER * lease, Duration.ofDays(1).toMillis());
// 1. get latest epoch from history
// 2. generateNewTransactionId.. this step can throw WriteConflictException
// 3. txn id = 32 bit epoch + 96 bit counter
// 4. if while creating txn epoch no longer exists, then we will get DataNotFoundException.
// 5. Retry if we get WriteConflict or DataNotFoundException, from step 1.
// Note: this is a low probability for either exceptions:
// - WriteConflict, if multiple controllers are trying to get new range at the same time then we can get write conflict
// - DataNotFoundException because it will only happen in rare case
// when we generate the transactionid against latest epoch (if there is ongoing scale then this is new epoch)
// and then epoch node is deleted as scale starts and completes.
return validate.thenCompose(validated -> RetryHelper.withRetriesAsync(() ->
streamMetadataStore.generateTransactionId(scope, stream, ctx, executor)
.thenCompose(txnId -> {
// Step 2. Add the txn to the index (presumably the host-txn index described above - TODO confirm).
// The resulting future is handed to createTxnInStore below.
CompletableFuture<Void> addIndex = addTxnToIndex(scope, stream, txnId);
// Step 3. Create txn node in the store.
CompletableFuture<VersionedTransactionData> txnFuture = createTxnInStore(scope, stream, lease,
ctx, maxExecutionPeriod, txnId, addIndex);
// Step 4. Notify segment stores about new txn.
// The segments are looked up for the epoch recorded in the created txn data.
CompletableFuture<List<StreamSegmentRecord>> segmentsFuture = txnFuture.thenComposeAsync(txnData ->
streamMetadataStore.getSegmentsInEpoch(scope, stream, txnData.getEpoch(), ctx, executor), executor);
CompletableFuture<Void> notify = segmentsFuture.thenComposeAsync(activeSegments ->
notifyTxnCreation(scope, stream, activeSegments, txnId), executor).whenComplete((v, e) ->
// Method notifyTxnCreation ensures that notification completes
// even in the presence of n/w or segment store failures.
log.trace("Txn={}, notified segments stores", txnId));
// Step 5. Start tracking txn in timeout service
// whenCompleteAsync runs for both outcomes of notify, so the timeout service is called
// whether or not the preceding steps succeeded.
return notify.whenCompleteAsync((result, ex) -> {
addTxnToTimeoutService(scope, stream, lease, maxExecutionPeriod, txnId, txnFuture);
}, executor).thenApplyAsync(v -> {
// This stage only runs when notify succeeded. segmentsFuture and txnFuture are upstream of
// notify, so they are already complete here and join() does not block.
List<StreamSegmentRecord> segments = segmentsFuture.join().stream().map(x -> {
// Rebuild each segment record with the epoch and segment number taken from the
// generalized segment id computed from the segment id and the txn id.
long generalizedSegmentId = RecordHelper.generalizedSegmentId(x.segmentId(), txnId);
int epoch = NameUtils.getEpoch(generalizedSegmentId);
int segmentNumber = NameUtils.getSegmentNumber(generalizedSegmentId);
return StreamSegmentRecord.builder().creationEpoch(epoch).segmentNumber(segmentNumber)
.creationTime(x.getCreationTime()).keyStart(x.getKeyStart()).keyEnd(x.getKeyEnd()).build();
}).collect(Collectors.toList());
return new ImmutablePair<>(txnFuture.join(), segments);
}, executor);
}), e -> {
// Retry predicate: only write conflicts and missing data are treated as retryable.
Throwable unwrap = Exceptions.unwrap(e);
return unwrap instanceof StoreException.WriteConflictException || unwrap instanceof StoreException.DataNotFoundException;
// NOTE(review): 5 is presumably the maximum number of attempts - confirm against RetryHelper.
}, 5, executor));
}
/**
 * Cancels the job with the given id by forwarding the request to its job master gateway.
 *
 * @param jobId id of the job to cancel
 * @param timeout timeout passed on to the gateway's cancel call
 * @return the future returned by the gateway's cancel call
 */
@Override
public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
    return getJobMasterGatewayFuture(jobId)
        .thenCompose(gateway -> gateway.cancel(timeout));
}
/**
 * Stops the job with a synchronous savepoint.
 *
 * <p>Fails fast with an {@link IllegalStateException} when the job has no checkpoint
 * coordinator, or when neither {@code targetDirectory} nor a default savepoint location is
 * available. Otherwise the checkpoint scheduler is stopped, a synchronous savepoint is
 * triggered, and the returned future yields the savepoint path once the job has terminated in
 * state {@code FINISHED}.
 *
 * @param targetDirectory directory for the savepoint, or {@code null} to use the default
 * @param advanceToEndOfEventTime passed through to the savepoint trigger
 * @return future with the external pointer of the savepoint; it fails if the savepoint fails,
 *         the termination future fails, or the job ends in a state other than FINISHED
 */
@Override
public CompletableFuture<String> stopWithSavepoint(final String targetDirectory, final boolean advanceToEndOfEventTime) {
    mainThreadExecutor.assertRunningInMainThread();

    final CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
    if (coordinator == null) {
        return FutureUtils.completedExceptionally(new IllegalStateException(
            String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
    }

    final boolean noSavepointLocation =
        targetDirectory == null && !coordinator.getCheckpointStorage().hasDefaultSavepointLocation();
    if (noSavepointLocation) {
        log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

        return FutureUtils.completedExceptionally(new IllegalStateException(
            "No savepoint directory configured. You can either specify a directory " +
                "while cancelling via -s :targetDirectory or configure a cluster-wide " +
                "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
    }

    // Stopping the checkpoint scheduler guarantees that only the data of the synchronous
    // savepoint gets committed. Should the job fail and restart, the
    // CheckpointCoordinatorDeActivator starts the coordinator again.
    coordinator.stopCheckpointScheduler();

    final long triggerTimestamp = System.currentTimeMillis();
    final CompletableFuture<String> savepointPathFuture = coordinator
        .triggerSynchronousSavepoint(triggerTimestamp, advanceToEndOfEventTime, targetDirectory)
        .thenApply(CompletedCheckpoint::getExternalPointer);

    final CompletableFuture<JobStatus> finishedFuture = executionGraph
        .getTerminationFuture()
        .handle((finalStatus, failure) -> {
            if (failure != null) {
                log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), failure.getMessage());
                throw new CompletionException(failure);
            }
            if (finalStatus != JobStatus.FINISHED) {
                log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", jobGraph.getJobID(), finalStatus);
                throw new CompletionException(new FlinkException("Reached state " + finalStatus + " instead of FINISHED."));
            }
            return finalStatus;
        });

    // Hand out the savepoint path only after the job has reached FINISHED.
    return savepointPathFuture.thenCompose(
        savepointPath -> finishedFuture.thenApply(ignored -> savepointPath));
}
/**
 * Monadic bind for {@link CompletableFuture}: feeds the result of {@code m} into
 * {@code flatMapper} and flattens the future it returns.
 *
 * @param m the source future
 * @param flatMapper function producing the follow-up future from the source's result
 * @param <A> result type expected from the source future
 * @return the composed future, i.e. {@code m.thenCompose(flatMapper)}
 */
@SuppressWarnings("unchecked")
public <A> CompletableFuture flatMap(CompletableFuture m, Function<A, ? extends CompletableFuture> flatMapper)
{
    // Raw types are part of the signature, hence the unchecked call.
    final CompletableFuture composed = m.thenCompose(flatMapper);
    return composed;
}
/**
 * Lists the entries of the sub-directory {@code subdirName} as a cursor of resolved paths.
 *
 * @param listFrom path to list from, or {@code null} to list from an empty subspace; it must
 *        belong to this directory
 * @param context the record context used for all reads
 * @param subdirName name of the sub-directory to list
 * @param valueRange optional range handed to the sub-directory's value range lookup
 * @param continuation optional continuation handed to the chained cursor
 * @param scanProperties scan properties; limits are applied by the chained cursor
 * @return a lazily created cursor over the resolved child paths
 * @throws RecordCoreException if {@code listFrom} does not belong to this directory
 */
@Nonnull
@SuppressWarnings("squid:S2095") // SonarQube doesn't realize that the cursor is wrapped and returned
protected RecordCursor<ResolvedKeySpacePath> listSubdirectoryAsync(@Nullable KeySpacePath listFrom,
                                                                   @Nonnull FDBRecordContext context,
                                                                   @Nonnull String subdirName,
                                                                   @Nullable ValueRange<?> valueRange,
                                                                   @Nullable byte[] continuation,
                                                                   @Nonnull ScanProperties scanProperties) {
    final boolean foreignPath = listFrom != null && listFrom.getDirectory() != this;
    if (foreignPath) {
        throw new RecordCoreException("Provided path does not belong to this directory")
                .addLogInfo("path", listFrom, "directory", this.getName());
    }

    final KeySpaceDirectory subdir = getSubdirectory(subdirName);

    final CompletableFuture<ResolvedKeySpacePath> resolvedFromFuture;
    if (listFrom == null) {
        resolvedFromFuture = CompletableFuture.completedFuture(null);
    } else {
        resolvedFromFuture = listFrom.toResolvedPathAsync(context);
    }

    // A chained cursor has no reverse mode. Reversal is therefore done by the inner key value
    // cursor, while the chained cursor is always told that the scan moves forward.
    final ScanProperties chainedCursorScanProperties = scanProperties.isReverse()
            ? scanProperties.setReverse(false)
            : scanProperties;

    // Each individual row-key read should fetch exactly one key. The ChainedCursor already
    // counts these reads against the limits in the ScanProperties, so the limits are cleared
    // for the inner KeyValueCursor in nextTuple() to avoid counting the same reads twice.
    final ScanProperties keyReadScanProperties = scanProperties.with(
            props -> props.clearState().setReturnedRowLimit(1));

    return new LazyCursor<>(
            resolvedFromFuture.thenCompose(resolvedFrom -> {
                final Subspace subspace;
                if (resolvedFrom == null) {
                    subspace = new Subspace();
                } else {
                    subspace = resolvedFrom.toSubspace();
                }

                return subdir.getValueRange(context, valueRange, subspace).thenApply(range -> {
                    final RecordCursor<Tuple> keyCursor = new ChainedCursor<>(
                            context,
                            lastKey -> nextTuple(context, subspace, range, lastKey, keyReadScanProperties),
                            Tuple::pack,
                            Tuple::fromBytes,
                            continuation,
                            chainedCursorScanProperties);

                    // Resolve every key tuple to the child path it belongs to.
                    return keyCursor.mapPipelined(
                            tuple -> findChildForKey(context, resolvedFrom, Tuple.fromList(tuple.getItems()), 1, 0),
                            1);
                });
            }),
            context.getExecutor()
    );
}
/**
 * Demonstrates the functional map API on a local cache: two write-only operations, each
 * followed by a read-only operation, and then a multi-key read-write and read-only operation.
 *
 * <p>The cache manager is now stopped in a {@code finally} block, so its resources are
 * released even when one of the operations fails.
 *
 * @param args unused
 * @throws Exception if waiting for the combined read/write future fails
 */
public static void main(String[] args) throws Exception {
    DefaultCacheManager cacheManager = new DefaultCacheManager();
    try {
        cacheManager.defineConfiguration("local", new ConfigurationBuilder().build());
        AdvancedCache<String, String> cache = cacheManager.<String, String>getCache("local").getAdvancedCache();
        FunctionalMapImpl<String, String> functionalMap = FunctionalMapImpl.create(cache);
        FunctionalMap.WriteOnlyMap<String, String> writeOnlyMap = WriteOnlyMapImpl.create(functionalMap);
        FunctionalMap.ReadOnlyMap<String, String> readOnlyMap = ReadOnlyMapImpl.create(functionalMap);

        // Execute two parallel write-only operations to store key/value pairs
        CompletableFuture<Void> writeFuture1 = writeOnlyMap.eval("key1", "value1",
                (v, writeView) -> writeView.set(v));
        CompletableFuture<Void> writeFuture2 = writeOnlyMap.eval("key2", "value2",
                (v, writeView) -> writeView.set(v));

        // When each write-only operation completes, execute a read-only operation to retrieve the value
        CompletableFuture<String> readFuture1 =
                writeFuture1.thenCompose(r -> readOnlyMap.eval("key1", EntryView.ReadEntryView::get));
        CompletableFuture<String> readFuture2 =
                writeFuture2.thenCompose(r -> readOnlyMap.eval("key2", EntryView.ReadEntryView::get));

        // When both read-only operations complete, print the values out
        System.out.printf("Created entries: %n");
        CompletableFuture<Void> end = readFuture1.thenAcceptBoth(readFuture2, (v1, v2) ->
                System.out.printf("key1 = %s%nkey2 = %s%n", v1, v2));

        // Wait for this read/write combination to finish
        end.get();

        // Create a read-write map
        FunctionalMap.ReadWriteMap<String, String> readWriteMap = ReadWriteMapImpl.create(functionalMap);

        // Use read-write multi-key based operation to write new values
        // together with lifespan and return previous values
        Map<String, String> data = new HashMap<>();
        data.put("key1", "newValue1");
        data.put("key2", "newValue2");
        Traversable<String> previousValues = readWriteMap.evalMany(data, (v, readWriteView) -> {
            String prev = readWriteView.find().orElse(null);
            readWriteView.set(v, new MetaLifespan(Duration.ofHours(1).toMillis()));
            return prev;
        });

        // Use read-only multi-key operation to read current values for multiple keys
        Traversable<EntryView.ReadEntryView<String, String>> entryViews =
                readOnlyMap.evalMany(data.keySet(), readOnlyView -> readOnlyView);
        System.out.printf("Updated entries: %n");
        entryViews.forEach(view -> System.out.printf("%s%n", view));

        // Finally, print out the previous entry values
        System.out.printf("Previous entry values: %n");
        previousValues.forEach(prev -> System.out.printf("%s%n", prev));
    } finally {
        // Always shut the cache manager down so that its resources are released.
        cacheManager.stop();
    }
}
/**
 * Asks the master where the source task's pipe lives and then opens an input context to that
 * executor.
 *
 * @param srcTaskIndex index of the source task
 * @param runtimeEdge edge connecting the source and the destination task
 * @param dstTaskIndex index of the destination task
 * @return a future with an iterator over the input streams of the opened context, decoded
 *         with the serializer registered for the edge; it fails if the master's reply has an
 *         unexpected type or carries no executor id
 */
public CompletableFuture<DataUtil.IteratorWithNumBytes> read(final int srcTaskIndex,
                                                             final RuntimeEdge runtimeEdge,
                                                             final int dstTaskIndex) {
    final String runtimeEdgeId = runtimeEdge.getId();

    // Request the location of the src task from the master; the reply arrives asynchronously.
    final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
        .getMessageSender(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID)
        .request(ControlMessage.Message.newBuilder()
            .setId(RuntimeIdManager.generateMessageId())
            .setListenerId(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID)
            .setType(ControlMessage.MessageType.RequestPipeLoc)
            .setRequestPipeLocMsg(ControlMessage.RequestPipeLocationMessage.newBuilder()
                .setExecutorId(executorId)
                .setRuntimeEdgeId(runtimeEdgeId)
                .setSrcTaskIndex(srcTaskIndex)
                .build())
            .build());

    return responseFromMasterFuture.thenCompose(responseFromMaster -> {
        // Validate the reply before reading the executor id from it.
        final boolean isPipeLocInfo = responseFromMaster.getType() == ControlMessage.MessageType.PipeLocInfo;
        if (!isPipeLocInfo) {
            throw new RuntimeException("Response message type mismatch!");
        }
        final ControlMessage.PipeLocationInfoMessage pipeLocInfo = responseFromMaster.getPipeLocInfoMsg();
        if (!pipeLocInfo.hasExecutorId()) {
            throw new IllegalStateException();
        }
        final String targetExecutorId = pipeLocInfo.getExecutorId();

        // Describe the transfer for the remote side.
        final ControlMessage.PipeTransferContextDescriptor descriptor =
            ControlMessage.PipeTransferContextDescriptor.newBuilder()
                .setRuntimeEdgeId(runtimeEdgeId)
                .setSrcTaskIndex(srcTaskIndex)
                .setDstTaskIndex(dstTaskIndex)
                .setNumPipeToWait(getNumOfPipeToWait(runtimeEdge))
                .build();

        // Connect to the executor and wrap its input streams in an iterator.
        return byteTransfer
            .newInputContext(targetExecutorId, descriptor.toByteArray(), true)
            .thenApply(inputContext -> new DataUtil.InputStreamIterator(
                inputContext.getInputStreams(),
                serializerManager.getSerializer(runtimeEdgeId)));
    });
}
/**
 * Composes {@code composeFun} onto {@code completableFuture}, choosing the execution mode by
 * the current state of the future: if it is already done, the function is attached with
 * {@link CompletableFuture#thenCompose(Function)}; if it is still pending, it is attached with
 * {@link CompletableFuture#thenComposeAsync(Function, Executor)} using the given executor.
 *
 * @param completableFuture the future to compose onto.
 * @param executor the executor that runs the compose function when the future is not yet done.
 * @param composeFun the function producing the follow-up stage.
 * @param <IN> result type of the input future.
 * @param <OUT> result type of the composed future.
 * @return the composition of the input future and the function.
 */
public static <IN, OUT> CompletableFuture<OUT> thenComposeAsyncIfNotDone(
        CompletableFuture<IN> completableFuture,
        Executor executor,
        Function<? super IN, ? extends CompletionStage<OUT>> composeFun) {
    if (completableFuture.isDone()) {
        // Result is already available: no need to involve the executor.
        return completableFuture.thenCompose(composeFun);
    }
    return completableFuture.thenComposeAsync(composeFun, executor);
}