下面列出了 java.util.concurrent.CompletionStage 这个 API 类的实例代码及用法，也可以点击链接到 GitHub 查看源代码。
/**
 * Checks close propagation through a pipeline: running the terminal operation must leave the
 * source iterator open, while closing the intermediate iterator must close the source.
 */
@Test
public void testClosePipeline() {
    final AtomicBoolean sourceClosed = new AtomicBoolean();

    // Source that always yields 1 and records any call to close().
    final AsyncIterator<Long> recordingSource =
        new AsyncIterator<Long>() {
            @Override
            public CompletionStage<Either<End, Long>> nextStage() {
                return StageSupport.completedStage(Either.right(1L));
            }

            @Override
            public CompletionStage<Void> close() {
                sourceClosed.set(true);
                return StageSupport.voidStage();
            }
        };
    final AsyncIterator<Long> bounded = recordingSource.take(10);

    final AsyncIterator<?> pipeline = this.intermediate.apply(bounded);
    this.terminal.apply(pipeline).toCompletableFuture().join();
    // The terminal operation alone is expected not to close the source.
    Assert.assertFalse(sourceClosed.get());

    pipeline.close().toCompletableFuture().join();
    Assert.assertTrue(sourceClosed.get());
}
/**
 * Requests {@code MARKDOWN_PITCH_NOT_FOUND} from a test server on port 3333 and checks that the
 * response has status {@code OK} and content type {@code CONTENT_TYPE_MD}.
 *
 * <p>A failed request fails the test instead of only being logged, and the WS client is closed
 * on every path.
 *
 * @throws Exception declared for the test framework; failures inside the server callback surface
 *     as {@link AssertionError}
 */
@Test
public void testMarkdownPitchNotFound() throws Exception {
    TestServer server = testServer(3333);
    running(server, () -> {
        WSClient ws = play.libs.ws.WS.newClient(3333);
        try {
            CompletionStage<WSResponse> completionStage =
                ws.url(MARKDOWN_PITCH_NOT_FOUND).get();
            WSResponse response = completionStage.toCompletableFuture().get();
            assertEquals(OK, response.getStatus());
            assertEquals(CONTENT_TYPE_MD, response.getHeader(CONTENT_TYPE));
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever runs this callback.
                Thread.currentThread().interrupt();
            }
            log.error(e.getMessage(), e);
            // Swallowing the exception would let the test pass without any assertion running.
            throw new AssertionError("Request to " + MARKDOWN_PITCH_NOT_FOUND + " failed", e);
        } finally {
            try {
                ws.close();
            } catch (Exception closeFailure) {
                // A failure to close must not mask the outcome of the assertions above.
                log.error(closeFailure.getMessage(), closeFailure);
            }
        }
    });
}
/**
 * Prepares the given SQL (filters, limit handling, then {@code preprocessSQL}) and executes it
 * through the reactive connection obtained from the session.
 *
 * @param sqlStatement the SQL passed to filter processing
 * @param queryParameters supplies the filtered SQL, the row selection and the bind parameters
 * @param afterLoadActions passed on to {@code preprocessSQL}
 * @param session the session, unwrapped to a {@code ReactiveSession} for execution
 * @return the stage returned by {@code selectJdbc} for the final SQL
 */
default CompletionStage<ResultSet> executeReactiveQueryStatement(
        String sqlStatement,
        QueryParameters queryParameters,
        List<AfterLoadAction> afterLoadActions,
        SessionImplementor session) {

    // Filters are processed first; the filtered SQL is read back from queryParameters below.
    queryParameters.processFilters( sqlStatement, session );

    // Let the limit handler rewrite the filtered SQL.
    final LimitHandler limitHandler = limitHandler( queryParameters.getRowSelection(), session );
    final String limitedSql =
            limitHandler.processSql( queryParameters.getFilteredSQL(), queryParameters.getRowSelection() );

    // Locks and comments are added by preprocessSQL.
    final String finalSql =
            preprocessSQL( limitedSql, queryParameters, session.getSessionFactory(), afterLoadActions );

    final ReactiveSession reactiveSession = session.unwrap(ReactiveSession.class);
    return reactiveSession
            .getReactiveConnection()
            .selectJdbc( finalSql, toParameterArray(queryParameters, session) );
}
/**
 * Starts unregistering every cluster currently present in {@code clusters}.
 *
 * @return the result of {@code failByClose()} if this instance is closed; otherwise a stage that
 *     completes, once all individual unregistrations have completed, with the number of
 *     unregistrations that were started.
 */
public CompletionStage<Integer> unregisterAllAsync() {
    if (isClosed()) {
        return failByClose();
    }

    // One future per registered key.
    List<CompletableFuture<BoundCluster>> pending =
        clusters.keySet().stream()
            .map(key -> unregisterAsync(key).toCompletableFuture())
            .collect(Collectors.toList());

    CompletableFuture<Void> allDone =
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
    return allDone.thenApply(ignored -> pending.size());
}
/**
 * Variant of {@link CompletableFuture#supplyAsync(Supplier)} that accepts a
 * {@link ThrowingSupplier}.
 *
 * <p>The supplier runs on {@code executorService}. Its value completes the returned stage; any
 * throwable it raises completes the stage exceptionally. On {@link InterruptedException} the
 * worker thread's interrupt flag is set again after the failure has been recorded.
 *
 * @param supplier the computation to run
 * @param executorService the executor that runs the computation
 * @return a stage carrying the supplier's value or failure
 */
public static <T> CompletionStage<T> supplyAsync(
    ThrowingSupplier<T> supplier, ExecutorService executorService) {
    final CompletableFuture<T> outcome = new CompletableFuture<>();

    final Runnable task =
        () -> {
            try {
                outcome.complete(supplier.get());
            } catch (InterruptedException interrupted) {
                outcome.completeExceptionally(interrupted);
                Thread.currentThread().interrupt();
            } catch (Throwable failure) {
                outcome.completeExceptionally(failure);
            }
        };

    // The task never throws, so the composed stage simply hands over to `outcome` when it ends.
    final CompletionStage<Void> execution = CompletableFuture.runAsync(task, executorService);
    return execution.thenCompose(done -> outcome);
}
/**
 * Bridges {@code stage} into a new promise, re-invoking the intercepted method when the stage
 * fails.
 *
 * <p>On success {@code context.onComplete()} is called and the promise receives the value. On
 * failure {@code context.onError} is called, the invocation is proceeded again and this method
 * recurses on the new stage; anything thrown on that path is handed to
 * {@code completeFailedFuture} together with the recovery function.
 *
 * @param invocation the intercepted invocation, proceeded again on failure
 * @param stage the stage produced by the latest attempt
 * @param context the retry context notified of the outcome
 * @param recoveryFunction passed to {@code completeFailedFuture} when the retry path throws
 * @return the promise that receives the final outcome
 */
@SuppressWarnings("unchecked")
private CompletionStage<?> executeCompletionStage(MethodInvocation invocation,
    CompletionStage<?> stage, io.github.resilience4j.retry.Retry.Context context,
    RecoveryFunction<?> recoveryFunction) {
    final CompletableFuture promise = new CompletableFuture();
    stage.whenComplete((value, failure) -> {
        if (failure == null) {
            context.onComplete();
            promise.complete(value);
            return;
        }
        try {
            // NOTE(review): onError presumably throws once retries are exhausted — confirm.
            context.onError((Exception) failure);
            final CompletionStage retried = (CompletionStage) invocation.proceed();
            final CompletableFuture retriedFuture =
                executeCompletionStage(invocation, retried, context, recoveryFunction)
                    .toCompletableFuture();
            // Waits for the nested attempt inside this callback.
            promise.complete(retriedFuture.join());
        } catch (Throwable retryFailure) {
            completeFailedFuture(retryFailure, recoveryFunction, promise);
        }
    });
    return promise;
}
/**
 * Resolves the custom type reference of a draft builder of type {@code B}.
 *
 * <p>If the builder has no {@link CustomFieldsDraft}, it is returned unchanged in an already
 * completed stage. Otherwise the type id is looked up through {@code getCustomTypeId}. When an
 * id is found, {@code customSetter} is applied with a new custom fields draft built from that id
 * and the existing fields; when none is found, the builder is returned as passed in.
 *
 * @param draftBuilder the draft builder whose custom type reference should be resolved.
 * @param customGetter a function returning the CustomFieldsDraft of the draft builder.
 * @param customSetter a function that sets the CustomFieldsDraft on the builder and returns it.
 * @param errorMessage the error message passed to {@code getCustomTypeId}; per the original
 *                     documentation it is used for a {@link ReferenceResolutionException}.
 * @return a {@link CompletionStage} holding the builder, with the custom type reference resolved
 *         when a type id was found.
 */
@Nonnull
protected CompletionStage<B> resolveCustomTypeReference(
    @Nonnull final B draftBuilder,
    @Nonnull final Function<B, CustomFieldsDraft> customGetter,
    @Nonnull final BiFunction<B, CustomFieldsDraft, B> customSetter,
    @Nonnull final String errorMessage) {

    final CustomFieldsDraft custom = customGetter.apply(draftBuilder);
    if (custom == null) {
        // Nothing to resolve.
        return CompletableFuture.completedFuture(draftBuilder);
    }

    return getCustomTypeId(custom, errorMessage)
        .thenApply(typeIdOptional -> typeIdOptional
            .map(typeId ->
                customSetter.apply(draftBuilder, ofTypeIdAndJson(typeId, custom.getFields())))
            .orElse(draftBuilder));
}
/**
 * Asks the things shard region for the given thing and exposes the answer as a source.
 *
 * @param thingId the ID of the thing to retrieve
 * @return a source emitting the {@code SudoRetrieveThingResponse} if one was received, or an
 *     empty source otherwise; failures and unexpected responses are logged.
 */
private Source<SudoRetrieveThingResponse, NotUsed> sudoRetrieveThing(final ThingId thingId) {
    final SudoRetrieveThing command =
            SudoRetrieveThing.withOriginalSchemaVersion(thingId, DittoHeaders.empty());

    final CompletionStage<Source<SudoRetrieveThingResponse, NotUsed>> responseFuture =
            // using default thread-pool for asking Things shard region
            Patterns.ask(thingsShardRegion, command, thingsTimeout)
                    .handle((response, error) -> {
                        if (response instanceof SudoRetrieveThingResponse) {
                            return Source.single((SudoRetrieveThingResponse) response);
                        }
                        if (error != null) {
                            log.error("Failed " + command, error);
                        } else if (!(response instanceof ThingNotAccessibleException)) {
                            // A ThingNotAccessibleException response is not logged.
                            log.error("Unexpected response for <{}>: <{}>", command, response);
                        }
                        return Source.empty();
                    });

    return Source.fromSourceCompletionStage(responseFuture)
            .viaMat(Flow.create(), Keep.none());
}
/**
 * Perform a POST operation on an LDP Resource.
 *
 * <p>The parent and the prospective child resource are both fetched, then the
 * {@code PostHandler} initializes, creates the resource and updates the memento. Failures are
 * mapped through {@code handleException}.
 *
 * @param request the request
 * @param uriInfo the URI info
 * @param headers the HTTP headers
 * @param secContext the security context
 * @param body the body
 * @return the async response
 */
@POST
@Timed
@Operation(summary = "Create a linked data resource")
public CompletionStage<Response> createResource(@Context final Request request, @Context final UriInfo uriInfo,
        @Context final HttpHeaders headers, @Context final SecurityContext secContext,
        @RequestBody(description = "The new resource") final InputStream body) {
    final TrellisRequest trellisRequest = new TrellisRequest(request, uriInfo, headers, secContext);
    final String baseUrl = getBaseUrl(trellisRequest);
    final String parentPath = trellisRequest.getPath();
    final String childSegment = getIdentifier(trellisRequest);

    // An empty parent path needs no separator before the child segment.
    final String joiner = parentPath.isEmpty() ? "" : "/";
    final IRI parentIri = buildTrellisIdentifier(parentPath);
    final IRI childIri = buildTrellisIdentifier(parentPath + joiner + childSegment);

    final PostHandler handler =
            new PostHandler(trellisRequest, parentIri, childSegment, body, trellis, extensions, baseUrl);

    return trellis.getResourceService().get(parentIri)
        .thenCombine(trellis.getResourceService().get(childIri), handler::initialize)
        .thenCompose(handler::createResource)
        .thenCompose(handler::updateMemento)
        .thenApply(ResponseBuilder::build)
        .exceptionally(this::handleException);
}
/**
 * Loads two keys through {@code loadMany} on an identity loader and checks that, after
 * dispatch, the combined stage yields both values.
 */
@Test
public void should_Support_loading_multiple_keys_in_one_call() {
    AtomicBoolean callbackRan = new AtomicBoolean();
    DataLoader<Integer, Integer> identityLoader = new DataLoader<>(keysAsValues());

    CompletionStage<List<Integer>> futureAll = identityLoader.loadMany(asList(1, 2));
    futureAll.thenAccept(loadedValues -> {
        assertThat(loadedValues.size(), is(2));
        callbackRan.set(true);
    });

    identityLoader.dispatch();

    // Wait for the callback registered above before inspecting the result.
    await().untilAtomic(callbackRan, is(true));
    assertThat(futureAll.toCompletableFuture().join(), equalTo(asList(1, 2)));
}
/**
 * Indicates that the node should resume accepting connections.
 *
 * <p>The reject state is replaced with a fresh {@code RejectState}. If the channel is not open,
 * the node is rebound.
 *
 * @return an already completed future if the channel is open, otherwise the result of
 *     {@code rebind()}.
 */
@Override
public CompletionStage<Void> acceptConnectionsAsync() {
    logger.debug("Accepting New Connections");
    rejectState.set(new RejectState());

    if (channel.get().isOpen()) {
        // Already listening.
        return CompletableFuture.completedFuture(null);
    }
    // Reopen listening interface since it is not currently open.
    return rebind();
}
/**
 * Creates a new future {@code d} that is tied to this future and to {@code o} through the
 * function {@code f}.
 *
 * @param e the executor passed to the {@code BiApply} completion; may be {@code null}
 * @param o the other stage, converted with {@code toCompletableFuture()}
 * @param f the function handed to {@code biApply} / {@code BiApply}
 * @return the newly created dependent future
 * @throws NullPointerException if {@code f} is null or {@code o.toCompletableFuture()} returns
 *         null
 */
private <U,V> CompletableFuture<V> biApplyStage(
Executor e, CompletionStage<U> o,
BiFunction<? super T,? super U,? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
// With an executor, biApply is not attempted here (short-circuit); without one it is tried
// immediately and the BiApply completion is only created when that attempt returns false.
if (e != null || !d.biApply(this, b, f, null)) {
BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
// Register the completion, then fire it once in SYNC mode.
bipush(b, c);
c.tryFire(SYNC);
}
return d;
}
/**
 * Calls {@code delegate.applyToEitherAsync(other, fn, executor)} and wraps the result with
 * {@code wrap}, passing along the value of {@code originAndParam(other, enlistOptions)}.
 *
 * @param other the other stage, forwarded to the delegate and to {@code originAndParam}
 * @param fn the function forwarded to the delegate
 * @param executor the executor forwarded to the delegate
 * @param enlistOptions forwarded to {@code originAndParam}; presumably selects which origins are
 *        enlisted with the returned promise — confirm against {@code originAndParam}
 * @return the wrapped dependent promise
 */
public <U> DependentPromise<U> applyToEitherAsync(CompletionStage<? extends T> other,
Function<? super T, U> fn,
Executor executor,
Set<PromiseOrigin> enlistOptions) {
return wrap(delegate.applyToEitherAsync(other, fn, executor), originAndParam(other, enlistOptions));
}
/**
 * Creates a new future {@code d} that is tied to this future and to {@code o} through the
 * function {@code f}.
 *
 * @param e the executor passed to the {@code BiApply} completion; may be {@code null}
 * @param o the other stage, converted with {@code toCompletableFuture()}
 * @param f the function handed to {@code biApply} / {@code BiApply}
 * @return the newly created dependent future
 * @throws NullPointerException if {@code f} is null or {@code o.toCompletableFuture()} returns
 *         null
 */
private <U,V> CompletableFuture<V> biApplyStage(
Executor e, CompletionStage<U> o,
BiFunction<? super T,? super U,? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
// With an executor, biApply is not attempted here (short-circuit); without one it is tried
// immediately and the BiApply completion is only created when that attempt returns false.
if (e != null || !d.biApply(this, b, f, null)) {
BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
// Register the completion, then fire it once in SYNC mode.
bipush(b, c);
c.tryFire(SYNC);
}
return d;
}
/**
 * Implemented on top of {@code thenCombineAsync}: the consumer is adapted to a function that
 * runs the action and yields {@code null}.
 *
 * @param other the other stage
 * @param action the action to run with both results; must not be null
 * @param executor the executor forwarded to {@code thenCombineAsync}
 * @return the stage returned by {@code thenCombineAsync}
 * @throws NullPointerException if {@code action} is null
 */
@Override
public <U> CompletionStage<Void> thenAcceptBothAsync(final CompletionStage<? extends U> other,
        final BiConsumer<? super T, ? super U> action, final Executor executor) {
    Objects.requireNonNull(action);
    return thenCombineAsync(
            other,
            (left, right) -> {
                action.accept(left, right);
                return null;
            },
            executor);
}
/**
 * Action method used to free up a user identifier in the user DB.
 *
 * <p>The JSON body is passed to {@code handleRequest} for the
 * {@code FREEUP_USER_IDENTITY} operation; the request is validated with
 * {@code UserFreeUpRequestValidator} first.
 *
 * @param httpRequest the incoming HTTP request
 * @return the stage returned by {@code handleRequest}
 */
public CompletionStage<Result> freeUpIdentifier(Http.Request httpRequest) {
    return handleRequest(
        ActorOperations.FREEUP_USER_IDENTITY.getValue(),
        httpRequest.body().asJson(),
        requestObject -> {
            // The validation callback only validates; it has nothing to return.
            UserFreeUpRequestValidator.getInstance((Request) requestObject).validate();
            return null;
        },
        null,
        null,
        true,
        httpRequest);
}
/**
 * Retried service method that always completes the supplied stage exceptionally.
 *
 * <p>Each call increments {@code countInvocationsServBFailExceptionally} and then attempts to
 * complete the stage's future with an {@link IOException}.
 *
 * @param future the stage to fail; the same instance is returned
 * @return the given {@link CompletionStage}
 */
@Retry(maxRetries = 2)
public CompletionStage<String> serviceBFailExceptionally(final CompletionStage future) {
    countInvocationsServBFailExceptionally++;

    // always fail
    final IOException simulatedError = new IOException("Simulated error");
    future.toCompletableFuture().completeExceptionally(simulatedError);
    return future;
}
/**
 * Builds the {@code backfills} URL with the given filters and executes the request.
 *
 * @param componentId added as query parameter {@code component} when present
 * @param workflowId added as query parameter {@code workflow} when present
 * @param showAll sent as query parameter {@code showAll}
 * @param includeStatus sent as query parameter {@code status}
 * @return the stage returned by {@code execute} for a {@code BackfillsPayload}
 */
@Override
public CompletionStage<BackfillsPayload> backfillList(Optional<String> componentId,
                                                      Optional<String> workflowId,
                                                      boolean showAll,
                                                      boolean includeStatus) {
    var backfillsUrl = urlBuilder("backfills");

    // Optional filters are only sent when a value is present.
    componentId.ifPresent(component -> backfillsUrl.addQueryParameter("component", component));
    workflowId.ifPresent(workflow -> backfillsUrl.addQueryParameter("workflow", workflow));

    backfillsUrl.addQueryParameter("showAll", String.valueOf(showAll));
    backfillsUrl.addQueryParameter("status", String.valueOf(includeStatus));

    return execute(forUri(backfillsUrl), BackfillsPayload.class);
}
/**
 * Returns a copy of the posted JSON map with the entry {@code "foo" -> "bar1"} added (or
 * overwritten), wrapped in an already completed stage.
 *
 * @param requestContent the posted content; it is copied, not modified
 * @return a completed stage holding the copy
 */
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/json")
@POST
public CompletionStage<Map<String, Object>> postJson(final Map<String, Object> requestContent) {
    final Map<String, Object> reply = new HashMap<>();
    reply.putAll(requestContent);
    reply.put("foo", "bar1");
    return completedFuture(reply);
}
/**
 * Schedules a retry when the plan covers the given throwable, and rethrows it otherwise.
 *
 * @param e the failure to examine
 * @param retryExecutor passed to {@code scheduleRetry}
 * @param supplier passed to {@code scheduleRetry}
 * @param future passed to {@code scheduleRetry}
 * @throws E the given throwable, if the plan does not cover it
 */
private <E extends Throwable, T> void retryIfCovered(
        E e, ScheduledExecutorService retryExecutor,
        CheckedSupplier<? extends CompletionStage<T>, ?> supplier, CompletableFuture<T> future)
        throws E {
    if (!plan.covers(e)) {
        throw e;
    }
    scheduleRetry(e, retryExecutor, supplier, future);
}
/**
 * Posts a serialized halt event to the {@code /events} endpoint and verifies that the state
 * manager received an equal event.
 */
@Test
public void testInjectHaltEvent() throws Exception {
    Event haltEvent = Event.halt(WFI);
    ByteString serializedEvent = serialize(haltEvent);

    CompletionStage<Response<ByteString>> pendingResponse =
        serviceHelper.request("POST", BASE + "/events", serializedEvent);

    // Block until the request has completed (at most one minute).
    pendingResponse.toCompletableFuture().get(1, MINUTES);

    verify(stateManager).receive(Event.halt(WFI));
}
/**
 * Completing a toCompletableFuture copy of a minimal stage does not complete its source,
 * whereas completing the source is reflected in later copies.
 */
public void testMinimalCompletionStage_toCompletableFuture_oneWayPropagation() {
    CompletableFuture<Integer> source = new CompletableFuture<>();
    CompletionStage<Integer> minimal = source.minimalCompletionStage();

    // Each completion attempt targets a separate toCompletableFuture() result and is
    // expected to succeed.
    assertTrue(minimal.toCompletableFuture().complete(1));
    assertTrue(minimal.toCompletableFuture().complete(null));
    assertTrue(minimal.toCompletableFuture().cancel(true));
    assertTrue(minimal.toCompletableFuture().cancel(false));
    assertTrue(minimal.toCompletableFuture().completeExceptionally(new CFException()));

    // None of the attempts above leaves a later copy completed.
    checkIncomplete(minimal.toCompletableFuture());

    source.complete(1);
    checkCompletedNormally(minimal.toCompletableFuture(), 1);
}
/**
 * Clears the state for {@code key} and {@code window} through the handler and answers with a
 * clear response carrying the request's id.
 *
 * @param request the state request whose id is echoed in the response
 * @param key the key to clear
 * @param window the window to clear
 * @param handler the handler performing the clear
 * @return an already completed stage holding the response builder
 */
private static <W extends BoundedWindow>
    CompletionStage<StateResponse.Builder> handleClearRequest(
        StateRequest request,
        ByteString key,
        W window,
        BagUserStateHandler<ByteString, ByteString, W> handler) {
    handler.clear(key, window);

    StateResponse.Builder clearResponse =
        StateResponse.newBuilder()
            .setId(request.getId())
            .setClear(StateClearResponse.getDefaultInstance());
    return CompletableFuture.completedFuture(clearResponse);
}
/**
 * Creates an AsyncIterator that never ends, yielding consecutive longs beginning at
 * {@code start}.
 *
 * @param start the first value produced (inclusive)
 * @return an AsyncIterator producing {@code start}, {@code start + 1}, and so on
 */
static AsyncIterator<Long> infiniteRange(final long start) {
    return new AsyncIterator<Long>() {
        private long nextValue = start;

        @Override
        public CompletionStage<Either<End, Long>> nextStage() {
            // Hand out the current value and advance by one.
            final long current = this.nextValue;
            this.nextValue = current + 1;
            return StageSupport.completedStage(Either.right(current));
        }
    };
}
/**
 * Processes one batch of cart discount drafts.
 *
 * <p>Drafts failing {@code validateDraft} are dropped. If none remain, the batch is counted as
 * processed right away. Otherwise the existing cart discounts are fetched by key: on a fetch
 * failure the error is reported through {@code handleError}, on success the batch is synced.
 * In both cases the batch size is added to the processed counter afterwards.
 *
 * @param batch the drafts to process
 * @return a stage completing with the sync statistics
 */
@Override
protected CompletionStage<CartDiscountSyncStatistics> processBatch(@Nonnull final List<CartDiscountDraft> batch) {
    final Set<CartDiscountDraft> validDrafts =
        batch.stream().filter(this::validateDraft).collect(toSet());

    if (validDrafts.isEmpty()) {
        statistics.incrementProcessed(batch.size());
        return completedFuture(statistics);
    }

    final Set<String> keys = validDrafts.stream().map(CartDiscountDraft::getKey).collect(toSet());

    return cartDiscountService
        .fetchMatchingCartDiscountsByKeys(keys)
        // Pair up result and failure so both can be inspected in the next step.
        .handle(ImmutablePair::new)
        .thenCompose(fetchOutcome -> {
            final Set<CartDiscount> fetchedCartDiscounts = fetchOutcome.getKey();
            final Throwable fetchFailure = fetchOutcome.getValue();
            if (fetchFailure != null) {
                final String errorMessage = format(CTP_CART_DISCOUNT_FETCH_FAILED, keys);
                handleError(errorMessage, fetchFailure, keys.size());
                return CompletableFuture.completedFuture(null);
            }
            return syncBatch(fetchedCartDiscounts, validDrafts);
        })
        .thenApply(ignored -> {
            statistics.incrementProcessed(batch.size());
            return statistics;
        });
}
/**
 * Runs the action immediately with this stage's result unless the stage holds an exception.
 *
 * @param action the consumer to run; must not be null
 * @return {@code VOID} if the action ran without throwing, a stage built by
 *     {@code CompletedStage.completionException} if it threw, or {@code typedException()} if
 *     this stage holds an exception
 * @throws NullPointerException if {@code action} is null
 */
@Override
public CompletionStage<Void> thenAccept(final Consumer<? super T> action) {
    Objects.requireNonNull(action);

    if (this.exception != null) {
        // The action is not invoked for an exceptional stage.
        return typedException();
    }

    try {
        action.accept(this.result);
        return VOID;
    } catch (final Throwable failure) {
        return CompletedStage.completionException(failure);
    }
}
/**
 * Returns the public stage, creating it as a new {@link CompletableFuture} on first access.
 *
 * @return the stored stage; later calls return the same instance
 */
@Override
public CompletionStage<T> getCompletionStage() {
    if (publicStage != null) {
        return publicStage;
    }
    publicStage = new CompletableFuture<>();
    return publicStage;
}
/**
 * Cascades a persist to the child by calling {@code reactivePersist} on the session's
 * {@code ReactiveSession}.
 *
 * @param session the event source, unwrapped to a {@code ReactiveSession}
 * @param child the object to persist
 * @param entityName used for the trace log message only
 * @param context passed to {@code reactivePersist}
 * @param isCascadeDeleteEnabled not used by this implementation
 * @return the stage returned by {@code reactivePersist}
 */
@Override
public CompletionStage<?> cascade(
        EventSource session,
        Object child,
        String entityName,
        IdentitySet context,
        boolean isCascadeDeleteEnabled)
        throws HibernateException {
    LOG.tracev( "Cascading to persist: {0}", entityName );
    final ReactiveSession reactiveSession = session.unwrap(ReactiveSession.class);
    return reactiveSession.reactivePersist( child, context );
}
/**
 * Applies HTTP authentication when it is configured and enabled; otherwise passes the request
 * straight to the next filter.
 *
 * @param nextFilter the remainder of the filter chain
 * @param requestHeader the incoming request header
 * @return the result of {@code authenticate} or of the next filter
 */
@Override
public CompletionStage<Result> apply(final Function<Http.RequestHeader, CompletionStage<Result>> nextFilter,
                                     final Http.RequestHeader requestHeader) {
    if (httpAuthentication == null || !httpAuthentication.isEnabled()) {
        // Authentication is absent or switched off.
        return nextFilter.apply(requestHeader);
    }
    return authenticate(nextFilter, requestHeader, httpAuthentication);
}
/**
 * Creates a JSON artifact, sets a name and three labels on it, then checks that the metadata
 * is readable and that a label search for "open api" finds exactly that artifact. The artifact
 * is deleted afterwards in all cases.
 */
@RegistryServiceTest
public void testLabels(Supplier<RegistryService> supplier) throws Exception {
    String artifactId = generateArtifactId();
    RegistryService client = supplier.get();
    try {
        ByteArrayInputStream content =
            new ByteArrayInputStream("{\"name\":\"redhat\"}".getBytes(StandardCharsets.UTF_8));
        CompletionStage<ArtifactMetaData> creation =
            client.createArtifact(ArtifactType.JSON, artifactId, null, content);
        ConcurrentUtil.result(creation);
        this.waitForArtifact(artifactId);

        final List<String> artifactLabels = Arrays.asList("Open Api", "Awesome Artifact", "JSON");
        EditableMetaData metaDataUpdate = new EditableMetaData();
        metaDataUpdate.setName("myname");
        metaDataUpdate.setLabels(artifactLabels);
        client.updateArtifactMetaData(artifactId, metaDataUpdate);

        // The update is checked with retries.
        retry(() -> {
            ArtifactMetaData stored = client.getArtifactMetaData(artifactId);
            Assertions.assertNotNull(stored);
            Assertions.assertEquals("myname", stored.getName());
            Assertions.assertEquals(3, stored.getLabels().size());
            Assertions.assertTrue(stored.getLabels().containsAll(artifactLabels));
        });

        // The search term is lower case although the label was stored as "Open Api".
        retry(() -> {
            ArtifactSearchResults found = client
                .searchArtifacts("open api", 0, 2, SearchOver.labels, SortOrder.asc);
            Assertions.assertNotNull(found);
            Assertions.assertEquals(1, found.getCount());
            Assertions.assertEquals(1, found.getArtifacts().size());
            Assertions.assertTrue(found.getArtifacts().get(0).getLabels().containsAll(artifactLabels));
        });
    } finally {
        client.deleteArtifact(artifactId);
    }
}