下面列出了 java.util.concurrent.CompletableFuture#thenApply() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Resolves an aggregate root for the current command context.
 *
 * <p>If an instance is already tracked under {@code id.toString()}, it is returned as an
 * already-completed future. Otherwise the aggregate is requested from {@code repository}
 * (when {@code firstFromCache} is {@code true}) or from {@code aggregateRootStorage}
 * (when it is {@code false}). A non-null result is registered in the tracking dictionary
 * under its unique id, unless an entry for that id already exists.
 *
 * @param id                identifier of the aggregate root; must not be {@code null}
 * @param firstFromCache    {@code true} to load through {@code repository}, {@code false} to
 *                          load through {@code aggregateRootStorage}
 * @param aggregateRootType class token of the aggregate root to load
 * @param <T>               concrete aggregate root type
 * @return a future yielding the aggregate root, or {@code null} if the lookup yielded none
 * @throws IllegalArgumentException if {@code id} is {@code null}
 */
@Override
public <T extends IAggregateRoot> CompletableFuture<T> getAsync(Object id, boolean firstFromCache, Class<T> aggregateRootType) {
    if (id == null) {
        throw new IllegalArgumentException("id");
    }
    final String aggregateRootId = id.toString();

    // Fast path: the aggregate is already tracked by this context.
    final T tracked = (T) trackingAggregateRootDict.get(aggregateRootId);
    if (tracked != null) {
        return CompletableFuture.completedFuture(tracked);
    }

    // Slow path: pick the source, then remember whatever it returns.
    final CompletableFuture<T> loading;
    if (!firstFromCache) {
        loading = aggregateRootStorage.getAsync(aggregateRootType, aggregateRootId);
    } else {
        loading = repository.getAsync(aggregateRootType, id);
    }
    return loading.thenApply(loaded -> {
        if (loaded != null) {
            trackingAggregateRootDict.putIfAbsent(loaded.getUniqueId(), loaded);
        }
        return loaded;
    });
}
/**
 * Verifies that a {@code null} result is cached: the supplier runs once for the first lookup
 * of {@code "key"} and is not invoked again for the second lookup, while both lookups
 * resolve to {@code null}.
 */
@Test
public void cacheNulls() {
    final AtomicInteger invocations = new AtomicInteger();
    final CompletableFuture<Void> gate = new CompletableFuture<>();
    // Counts every invocation and resolves to null once the gate is opened.
    final Supplier<CompletableFuture<String>> loader = () -> {
        invocations.incrementAndGet();
        return gate.thenApply(ignored -> null);
    };
    final AsyncLoadingCache<String, String> cache = new AsyncLoadingCache<>(100);

    final CompletableFuture<String> firstLookup = cache.orElseGet("key", loader);
    assertEquals(1, invocations.get());

    gate.complete(null);
    assertNull(firstLookup.join());

    final CompletableFuture<String> secondLookup = cache.orElseGet("key", loader);
    assertEquals(1, invocations.get()); // the loader must not have been called again
    assertNull(secondLookup.join());
}
/**
 * Creates a directory layer that is either root-level or scoped to a key space path.
 *
 * <p>When both {@code path} and {@code resolvedPathFuture} are {@code null} the layer is
 * root-level and uses the default base, node and content subspaces. Otherwise the base
 * subspace is derived from the resolved path, the node subspace is the base key followed by
 * {@code DirectoryLayer.DEFAULT_NODE_SUBSPACE}'s key, and the content subspace uses
 * {@code RESERVED_CONTENT_SUBSPACE_PREFIX}. In both cases the state subspace is the node
 * subspace extended with {@code STATE_SUBSPACE_KEY_SUFFIX}.
 *
 * @param database           database handed to the superclass constructor
 * @param path               key space path, or {@code null}
 * @param resolvedPathFuture future of the resolved path, or {@code null}
 */
private ExtendedDirectoryLayer(@Nonnull FDBDatabase database,
                               @Nullable KeySpacePath path,
                               @Nullable CompletableFuture<ResolvedKeySpacePath> resolvedPathFuture) {
    super(database, path, resolvedPathFuture);
    if (path != null || resolvedPathFuture != null) {
        this.isRootLevel = false;
        // NOTE(review): resolvedPathFuture is dereferenced here even if only `path` is
        // non-null; presumably callers always supply it in that case -- confirm.
        this.baseSubspaceFuture = resolvedPathFuture.thenApply(ResolvedKeySpacePath::toSubspace);
        this.nodeSubspaceFuture = baseSubspaceFuture.thenApply(baseSubspace ->
                new Subspace(Bytes.concat(baseSubspace.getKey(), DirectoryLayer.DEFAULT_NODE_SUBSPACE.getKey())));
        this.contentSubspace = new Subspace(RESERVED_CONTENT_SUBSPACE_PREFIX);
    } else {
        this.isRootLevel = true;
        this.baseSubspaceFuture = CompletableFuture.completedFuture(DEFAULT_BASE_SUBSPACE);
        this.nodeSubspaceFuture = CompletableFuture.completedFuture(DEFAULT_NODE_SUBSPACE);
        this.contentSubspace = DEFAULT_CONTENT_SUBSPACE;
    }
    this.stateSubspaceFuture = nodeSubspaceFuture.thenApply(nodeSubspace -> nodeSubspace.get(STATE_SUBSPACE_KEY_SUFFIX));
}
/**
 * Sends a {@code ListUserRequest} built from the given filter, sorting and paging values
 * through {@code rpcClient} and exposes the users carried by the response.
 *
 * @param login        value passed to {@code setLogin}
 * @param loginPattern value passed to {@code setLoginPattern}
 * @param role         value passed to {@code setRole}
 * @param status       value passed to {@code setStatus}
 * @param sortField    value passed to {@code setSortField}
 * @param sortOrder    value passed to {@code setSortOrder}
 * @param take         value passed to {@code setTake}
 * @param skip         value passed to {@code setSkip}
 * @return a future yielding the users of the {@code ListUserResponse} body
 */
public CompletableFuture<List<UserVO>> list(String login, String loginPattern, Integer role, Integer status, String sortField,
        String sortOrder, Integer take, Integer skip) {
    final ListUserRequest listUserRequest = new ListUserRequest();
    // Filter criteria.
    listUserRequest.setLogin(login);
    listUserRequest.setLoginPattern(loginPattern);
    listUserRequest.setRole(role);
    listUserRequest.setStatus(status);
    // Sorting and paging.
    listUserRequest.setSortField(sortField);
    listUserRequest.setSortOrder(sortOrder);
    listUserRequest.setTake(take);
    listUserRequest.setSkip(skip);

    // The ResponseConsumer is handed this future so the RPC reply can be delivered through it.
    final CompletableFuture<Response> responseFuture = new CompletableFuture<>();
    rpcClient.call(
            Request.newBuilder().withBody(listUserRequest).build(),
            new ResponseConsumer(responseFuture));

    return responseFuture.thenApply(response -> {
        final ListUserResponse body = (ListUserResponse) response.getBody();
        return body.getUsers();
    });
}
/**
 * Offers option-name completions when the cursor lies inside this element's URI range.
 *
 * @param camelCatalog       future of the Camel catalog the completions are computed from
 * @param positionInCamelUri cursor position within the Camel URI
 * @param docItem            the document being edited (not used by this implementation)
 * @return completions computed from the catalog when the position is between the start and
 *         end positions (inclusive); otherwise a completed future holding an empty list
 */
@Override
public CompletableFuture<List<CompletionItem>> getCompletions(CompletableFuture<CamelCatalog> camelCatalog, int positionInCamelUri, TextDocumentItem docItem) {
    // Outside [start, end]: nothing to propose.
    if (positionInCamelUri < getStartPositionInUri() || positionInCamelUri > getEndPositionInUri()) {
        return CompletableFuture.completedFuture(Collections.emptyList());
    }
    return camelCatalog.thenApply(new CamelOptionNamesCompletionsFuture(
            this,
            getComponentName(),
            optionParamURIInstance.isProducer(),
            getFilter(positionInCamelUri),
            positionInCamelUri,
            getAlreadyDefinedUriOptions()));
}
/**
 * Waits until all the given futures complete and folds their {@code JSONObject} results,
 * in list order, into a single object using the supplied identity and binary operator.
 *
 * @param futures  the futures whose results are combined
 * @param identity the starting value of the fold; also the result when {@code futures} is empty
 * @param reduceOp the operator applied as {@code reduceOp.apply(accumulated, next)}
 * @return a future yielding the folded result once every input future has completed; it
 *         completes exceptionally if any input future does
 */
public static CompletableFuture<JSONObject> all(
        List<CompletableFuture<JSONObject>> futures
        , JSONObject identity
        , BinaryOperator<JSONObject> reduceOp
) {
    final CompletableFuture<?>[] pending = futures.toArray(new CompletableFuture<?>[0]);
    return CompletableFuture.allOf(pending).thenApply(done -> {
        // Every future has completed here, so join() returns immediately.
        JSONObject accumulated = identity;
        for (CompletableFuture<JSONObject> completed : futures) {
            accumulated = reduceOp.apply(accumulated, completed.join());
        }
        return accumulated;
    });
}
/**
 * Benchmark: chains {@code N.n} {@code thenApply(mapF)} stages onto {@code constFuture}
 * and waits for the final stage.
 *
 * @return the value produced by the last stage
 * @throws InterruptedException if the wait is interrupted
 * @throws ExecutionException   if any stage completes exceptionally
 */
@Benchmark
public String mapConstN() throws InterruptedException, ExecutionException {
    CompletableFuture<String> chained = constFuture;
    for (int step = 0; step < N.n; step++) {
        chained = chained.thenApply(mapF);
    }
    return chained.get();
}
/**
 * Benchmark: chains {@code N.n} {@code thenApply(mapF)} stages onto a not-yet-completed
 * future, then completes that future with {@code string} and waits for the final stage.
 *
 * @return the value produced by the last stage
 * @throws InterruptedException if the wait is interrupted
 * @throws ExecutionException   if any stage completes exceptionally
 */
@Benchmark
public String mapPromiseN() throws InterruptedException, ExecutionException {
    final CompletableFuture<String> promise = new CompletableFuture<>();
    CompletableFuture<String> chained = promise;
    for (int step = 0; step < N.n; step++) {
        chained = chained.thenApply(mapF);
    }
    // The stages are attached before completion, so they run when the promise is fulfilled.
    promise.complete(string);
    return chained.get();
}
/**
 * Requests community badge progress and converts the {@code response.quests} array of the
 * reply into a list of {@code SteamQuestStatus}.
 *
 * @param steamId id forwarded to the {@code GetCommunityBadgeProgress} request
 * @param badgeId badge id forwarded to the {@code GetCommunityBadgeProgress} request
 * @return a future yielding the parsed quests, or an empty list when the reply has no
 *         {@code quests} array
 */
public CompletableFuture<List<SteamQuestStatus>> getCommunityBadgeProgress(long steamId, int badgeId) {
    CompletableFuture<JsonObject> reply = sendRequest(new GetCommunityBadgeProgress(VERSION_1, steamId, badgeId));
    return reply.thenApply(root -> {
        JsonArray questArray = root.getAsJsonObject("response").getAsJsonArray("quests");
        if (questArray == null) {
            return new ArrayList<>();
        }
        // Anonymous TypeToken subclass captures the generic list type for deserialization.
        Type questListType = new TypeToken<List<SteamQuestStatus>>() {
        }.getType();
        return builder().fromJson(questArray, questListType);
    });
}
/**
 * Invokes {@code deductStockByApp} on the async service with the request taken from the
 * argument struct and wraps the eventual response in a {@code deductStockByApp_result}.
 *
 * @param iface                 the async service implementation to call
 * @param deductStockByApp_args argument struct carrying the request
 * @return a future yielding the result struct whose success value is the service response
 * @throws SoaException declared by the interface
 */
@Override
public CompletableFuture<deductStockByApp_result> apply(StockOpenServiceAsync iface, deductStockByApp_args deductStockByApp_args) throws SoaException
{
    CompletableFuture<com.today.api.stock.response.DeductStockByAppResponse> responseFuture =
            (CompletableFuture<com.today.api.stock.response.DeductStockByAppResponse>) iface.deductStockByApp(deductStockByApp_args.getRequest());
    return responseFuture.thenApply((com.today.api.stock.response.DeductStockByAppResponse response) -> {
        deductStockByApp_result wrapped = new deductStockByApp_result();
        wrapped.setSuccess(response);
        return wrapped;
    });
}
/**
 * Sends the given device listing request through {@code rpcClient} and exposes the devices
 * carried by the response.
 *
 * @param listDeviceRequest the request used as the RPC body
 * @return a future yielding the devices of the {@code ListDeviceResponse} body
 */
public CompletableFuture<List<DeviceVO>> list(ListDeviceRequest listDeviceRequest) {
    // The ResponseConsumer is handed this future so the RPC reply can be delivered through it.
    final CompletableFuture<Response> responseFuture = new CompletableFuture<>();
    rpcClient.call(
            Request.newBuilder().withBody(listDeviceRequest).build(),
            new ResponseConsumer(responseFuture));
    return responseFuture.thenApply(reply -> {
        final ListDeviceResponse body = (ListDeviceResponse) reply.getBody();
        return body.getDevices();
    });
}
/**
 * Invokes {@code queryPoints} on the async service with the condition and the begin/end
 * timestamps taken from the argument struct, and wraps the eventual list of data points in
 * a {@code queryPoints_result}.
 *
 * @param iface            the async service implementation to call
 * @param queryPoints_args argument struct carrying condition and time range
 * @return a future yielding the result struct whose success value is the returned list
 * @throws SoaException declared by the interface
 */
@Override
public CompletableFuture<queryPoints_result> apply(CounterServiceAsync iface, queryPoints_args queryPoints_args) throws SoaException
{
    CompletableFuture<java.util.List<com.github.dapeng.basic.api.counter.domain.DataPoint>> pointsFuture =
            (CompletableFuture<java.util.List<com.github.dapeng.basic.api.counter.domain.DataPoint>>) iface.queryPoints(
                    queryPoints_args.getCondition(),
                    queryPoints_args.getBeginTimeStamp(),
                    queryPoints_args.getEndTimeStamp());
    return pointsFuture.thenApply((java.util.List<com.github.dapeng.basic.api.counter.domain.DataPoint> points) -> {
        queryPoints_result wrapped = new queryPoints_result();
        wrapped.setSuccess(points);
        return wrapped;
    });
}
/**
 * Requests the store status and extracts {@code result.store_status} from the reply as an int.
 *
 * @param appId   application id forwarded to the {@code GetStoreStatus} request
 * @param version version forwarded to the {@code GetStoreStatus} request
 * @return a future yielding the {@code store_status} value
 */
public CompletableFuture<Integer> getStoreStatus(int appId, int version) {
    CompletableFuture<JsonObject> reply = sendRequest(new GetStoreStatus(appId, version));
    return reply.thenApply(root ->
            root.getAsJsonObject("result").getAsJsonPrimitive("store_status").getAsInt());
}
/**
 * Registers every partition produced by the vertex with the shuffle master and collects the
 * resulting deployment descriptors.
 *
 * <p>For each produced partition a registration is issued with a producer descriptor built
 * from {@code location} and {@code attemptId}; the returned shuffle descriptor is combined
 * with the partition descriptor, the partition's max parallelism and the lazy-deployment
 * flag of the schedule mode.
 *
 * @param vertex    the execution vertex whose produced partitions are registered
 * @param location  task manager location used for the producer descriptor
 * @param attemptId execution attempt id used for the producer descriptor
 * @return a future yielding the deployment descriptors keyed by partition id, once all
 *         registrations have completed
 */
@VisibleForTesting
static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>> registerProducedPartitions(
        ExecutionVertex vertex,
        TaskManagerLocation location,
        ExecutionAttemptID attemptId) {
    final ProducerDescriptor producer = ProducerDescriptor.create(location, attemptId);
    final boolean lazyDeployment = vertex.getExecutionGraph().getScheduleMode().allowLazyDeployment();
    final Collection<IntermediateResultPartition> producedPartitions = vertex.getProducedPartitions().values();

    final Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> pendingRegistrations =
            new ArrayList<>(producedPartitions.size());
    for (IntermediateResultPartition produced : producedPartitions) {
        final PartitionDescriptor descriptor = PartitionDescriptor.from(produced);
        final int maxParallelism = getPartitionMaxParallelism(produced);
        final CompletableFuture<? extends ShuffleDescriptor> shuffleRegistration = vertex
                .getExecutionGraph()
                .getShuffleMaster()
                .registerPartitionWithProducer(descriptor, producer);
        final CompletableFuture<ResultPartitionDeploymentDescriptor> deploymentDescriptor =
                shuffleRegistration.thenApply(shuffle ->
                        new ResultPartitionDeploymentDescriptor(descriptor, shuffle, maxParallelism, lazyDeployment));
        pendingRegistrations.add(deploymentDescriptor);
    }

    return FutureUtils.combineAll(pendingRegistrations).thenApply(registered -> {
        // LinkedHashMap keeps the descriptors in the order the combined future delivers them.
        final Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> byPartitionId =
                new LinkedHashMap<>(producedPartitions.size());
        for (ResultPartitionDeploymentDescriptor rpdd : registered) {
            byPartitionId.put(rpdd.getPartitionId(), rpdd);
        }
        return byPartitionId;
    });
}
/**
 * Blocks until every start-pipeline future has completed and assembles the combined output.
 *
 * <p>The output map contains the collected pipeline ids under
 * {@code Constants.PIPELINE_IDS_FIELD}, the per-pipeline result fields (keyed by pipeline id)
 * under {@code Constants.PIPELINE_RESULTS_FIELD}, and, unless {@code conf.runInBackground}
 * is set, an overall success flag under {@code Constants.SUCCESS_FIELD}. A {@code null}
 * result marks the overall run as unsuccessful.
 *
 * <p>An exception raised while processing an individual result is logged and reported to
 * {@code errorRecordHandler} as {@code START_PIPELINE_04}.
 * NOTE(review): such an exception does not touch the success flag -- confirm that is intended.
 * Because the aggregation stage is attached with {@code thenApply} to {@code allOf}, it only
 * runs when all input futures completed normally; otherwise the final {@code get()} throws.
 *
 * @param startPipelineFutures futures of the individual start-pipeline outputs
 * @return the combined output fields
 * @throws ExecutionException   if the combined future completed exceptionally
 * @throws InterruptedException if the waiting thread is interrupted
 */
public LinkedHashMap<String, Field> startPipelineInParallel(
        List<CompletableFuture<Field>> startPipelineFutures
) throws ExecutionException, InterruptedException {
    // A single future that completes once every start-pipeline future has completed.
    CompletableFuture<Void> allStarted = CompletableFuture.allOf(
            startPipelineFutures.toArray(new CompletableFuture[0])
    );

    CompletableFuture<LinkedHashMap<String, Field>> aggregated = allStarted.thenApply(v -> {
        boolean success = true;
        List<Field> pipelineIds = new ArrayList<>();
        LinkedHashMap<String, Field> resultsByPipelineId = new LinkedHashMap<>();

        for (CompletableFuture<Field> startFuture : startPipelineFutures) {
            try {
                Field output = startFuture.get();
                if (output == null) {
                    success = false;
                    continue;
                }
                LinkedHashMap<String, Field> outputFields = output.getValueAsListMap();
                Field idField = outputFields.get("pipelineId");
                if (idField != null) {
                    pipelineIds.add(Field.create(idField.getValueAsString()));
                    resultsByPipelineId.put(idField.getValueAsString(), output);
                }
                if (!conf.runInBackground) {
                    Field finished = outputFields.get(Constants.FINISHED_SUCCESSFULLY_FIELD);
                    if (finished != null) {
                        success &= finished.getValueAsBoolean();
                    }
                }
            } catch (Exception ex) {
                LOG.error(ex.toString(), ex);
                errorRecordHandler.onError(StartPipelineErrors.START_PIPELINE_04, ex.toString(), ex);
            }
        }

        LinkedHashMap<String, Field> combined = new LinkedHashMap<>();
        combined.put(Constants.PIPELINE_IDS_FIELD, Field.create(pipelineIds));
        combined.put(Constants.PIPELINE_RESULTS_FIELD, Field.createListMap(resultsByPipelineId));
        if (!conf.runInBackground) {
            // The success flag is only reported when the task does not run in the background.
            combined.put(Constants.SUCCESS_FIELD, Field.create(success));
        }
        return combined;
    });

    return aggregated.get();
}
/**
 * Handles one HTTP request by streaming its payload to a per-request actor and completing
 * the route with the actor's eventual {@code HttpResponse}.
 *
 * <p>The payload is concatenated, decoded as UTF-8 and converted to a command by
 * {@code requestJsonToCommandFunction}. If the command does not implement the schema version
 * requested in the headers (or its own implemented version when the headers carry none), a
 * {@code CommandNotSupportedException} is forwarded instead. When
 * {@code responseTransformFunction} is given, the body of every successful, non-empty
 * response is parsed as JSON, transformed, and written back as {@code application/json}.
 *
 * @param ctx                          the request context passed to the per-request actor
 * @param dittoHeaders                 headers supplying the schema version and attached to errors
 * @param payloadSource                source of the raw request payload
 * @param requestJsonToCommandFunction converts the payload string into a command
 * @param responseTransformFunction    optional transformation of the JSON response body
 * @return the route completing with the (possibly transformed) response
 */
private Route doHandlePerRequest(final RequestContext ctx,
        final DittoHeaders dittoHeaders,
        final Source<ByteString, ?> payloadSource,
        final Function<String, Command> requestJsonToCommandFunction,
        @Nullable final Function<JsonValue, JsonValue> responseTransformFunction) {
    final CompletableFuture<HttpResponse> httpResponseFuture = new CompletableFuture<>();
    payloadSource
            .fold(ByteString.empty(), ByteString::concat)
            .map(ByteString::utf8String)
            .map(requestJsonToCommandFunction)
            .map(command -> {
                final JsonSchemaVersion schemaVersion =
                        dittoHeaders.getSchemaVersion().orElse(command.getImplementedSchemaVersion());
                return command.implementsSchemaVersion(schemaVersion) ? command
                        : CommandNotSupportedException.newBuilder(schemaVersion.toInt())
                        .dittoHeaders(dittoHeaders)
                        .build();
            })
            .to(Sink.actorRef(createHttpPerRequestActor(ctx, httpResponseFuture),
                    AbstractHttpRequestActor.COMPLETE_MESSAGE))
            .run(materializer);
    // optional step: transform the response entity:
    if (responseTransformFunction != null) {
        final CompletableFuture<HttpResponse> transformedResponse = httpResponseFuture.thenApply(response -> {
            final boolean isSuccessfulResponse = response.status().isSuccess();
            // we have to check if response is empty, because otherwise we'll get an IOException when trying to
            // read it
            final boolean isEmptyResponse = response.entity().isKnownEmpty();
            if (isSuccessfulResponse && !isEmptyResponse) {
                final InputStream inputStream = response.entity()
                        .getDataBytes()
                        .fold(ByteString.empty(), ByteString::concat)
                        .runWith(StreamConverters.asInputStream(), materializer);
                // Decode explicitly as UTF-8: the single-argument InputStreamReader constructor
                // uses the platform default charset, which corrupts non-ASCII JSON on JVMs whose
                // default is not UTF-8.
                final JsonValue jsonValue = JsonFactory.readFrom(
                        new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8));
                try {
                    final JsonValue transformed = responseTransformFunction.apply(jsonValue);
                    return response.withEntity(ContentTypes.APPLICATION_JSON, transformed.toString());
                } catch (final Exception e) {
                    throw JsonParseException.newBuilder()
                            .message("Could not transform JSON: " + e.getMessage())
                            .cause(e)
                            .build();
                }
            } else {
                // for non-successful and empty responses, don't transform the response body
                return response;
            }
        });
        return completeWithFuture(preprocessResponse(transformedResponse));
    } else {
        return completeWithFuture(preprocessResponse(httpResponseFuture));
    }
}
/**
 * Asynchronously runs an init-state-and-execute entry processor for {@code key} and unwraps
 * the data of the resulting {@code CommandResult}.
 *
 * @param key           the key the entry processor is invoked on
 * @param configuration bucket configuration handed to the entry processor
 * @param command       the command handed to the entry processor
 * @param <T>           type of the command's result data
 * @return a future yielding {@code CommandResult.getData()} of the invocation
 */
@Override
public <T extends Serializable> CompletableFuture<T> createInitialStateAndExecuteAsync(K key, BucketConfiguration configuration, GridCommand<T> command) {
    JCacheEntryProcessor<K, T> processor = JCacheEntryProcessor.initStateAndExecuteProcessor(command, configuration);
    CompletableFuture<CommandResult<T>> commandResultFuture = invokeAsync(key, processor);
    return commandResultFuture.thenApply(commandResult -> commandResult.getData());
}
/**
 * Asynchronously sends a row-insert message built from this statement's schema name, table
 * name and insert parameters, and wraps the server acknowledgement in an {@code InsertResultImpl}.
 *
 * @return a future yielding the insert result
 */
public CompletableFuture<InsertResult> executeAsync() {
    XMessageBuilder messageBuilder = (XMessageBuilder) this.mysqlxSession.<XMessage> getMessageBuilder();
    CompletableFuture<StatementExecuteOk> acknowledgement = this.mysqlxSession.asyncSendMessage(
            messageBuilder.buildRowInsert(this.schemaName, this.tableName, this.insertParams));
    return acknowledgement.thenApply(ok -> new InsertResultImpl(ok));
}
/**
 * Checks that the {@link Execution} termination future is only completed after the
 * assigned slot has been released.
 *
 * <p>The test schedules a single vertex, cancels it, completes the cancellation and then
 * asserts, in a stage chained onto the termination future, that the slot owner's
 * returned-slot future is already done.
 *
 * <p>NOTE: This test only fails spuriously without the fix of this commit. Thus, one has
 * to execute this test multiple times to see the failure.
 */
@Test
public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
    final JobVertex noOpVertex = createNoOpJobVertex();
    final JobVertexID noOpVertexId = noOpVertex.getID();

    final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
    final ProgrammedSlotProvider slotProvider =
            createProgrammedSlotProvider(1, Collections.singleton(noOpVertexId), slotOwner);

    final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph(
            new JobID(), slotProvider, new NoRestartStrategy(), noOpVertex);
    graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

    final ExecutionJobVertex executionJobVertex = graph.getJobVertex(noOpVertexId);
    final ExecutionVertex vertexUnderTest = executionJobVertex.getTaskVertices()[0];

    // Wait until scheduling has finished before cancelling.
    vertexUnderTest
            .scheduleForExecution(
                    graph.getSlotProviderStrategy(),
                    LocationPreferenceConstraint.ANY,
                    Collections.emptySet())
            .get();

    final Execution attempt = vertexUnderTest.getCurrentExecutionAttempt();
    final CompletableFuture<LogicalSlot> slotReturned = slotOwner.getReturnedSlotFuture();
    final CompletableFuture<?> termination = vertexUnderTest.cancel();
    attempt.completeCancelling();

    final CompletableFuture<Boolean> slotReturnedBeforeTermination = termination.thenApply(ignored -> {
        assertTrue(slotReturned.isDone());
        return true;
    });

    // check if the returned slot future was completed first
    slotReturnedBeforeTermination.get();
}
/**
 * Returns a lazily paged view over the Slack groups matching {@code filter}.
 *
 * <p>Each page issues a {@code groups_list} command with parameters copied from
 * {@code filter}. The page content is the response's group list; whether more pages exist
 * and the next offset are both derived from the cursor extracted from the same response.
 *
 * @param filter the parameters each page request is built from
 * @return an iterable of futures, one per page, each yielding the groups or a Slack error
 */
@Override
public Iterable<CompletableFuture<Result<List<SlackGroup>, SlackError>>> listGroups(
        GroupsListParams filter
) {
    return new AbstractPagedIterable<Result<List<SlackGroup>, SlackError>, String>() {
        /** Paging starts without an offset. */
        @Override
        protected String getInitialOffset() {
            return null;
        }

        /** Fetches one page and wires up its content, has-more flag and next cursor. */
        @Override
        protected LazyLoadingPage<Result<List<SlackGroup>, SlackError>, String> getPage(
                String offset
        ) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Fetching slack group page from {}", offset);
            }

            CompletableFuture<Result<GroupsListResponse, SlackError>> responseFuture = postSlackCommand(
                    SlackMethods.groups_list,
                    GroupsListParams.builder().from(filter).build(),
                    GroupsListResponse.class
            );

            CompletableFuture<Result<List<SlackGroup>, SlackError>> groupsFuture =
                    responseFuture.thenApply(response -> response.mapOk(GroupsListResponse::getGroups));

            // Both the has-more flag and the next offset come from the same extracted cursor.
            CompletableFuture<Optional<String>> cursorFuture = extractNextCursor(responseFuture);
            CompletableFuture<Boolean> hasMore = cursorFuture.thenApply(cursor -> cursor.isPresent());
            CompletableFuture<String> nextOffset = cursorFuture.thenApply(cursor -> cursor.orElse(null));

            return new LazyLoadingPage<>(groupsFuture, hasMore, nextOffset);
        }
    };
}