下面列出了 java.util.concurrent.CompletableFuture#exceptionally() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Defines a stage that runs once all currently active futures have finished, aggregates
 * their results with {@code collector} and applies {@code fn} to the aggregate.
 *
 * <p>The last active futures are combined with {@code CompletableFuture.allOf}. If the
 * combined future fails, the exception is handed to {@code BlockingStreamHelper.capture}
 * together with this stream's error handler, and the value of
 * {@code BlockingStreamHelper.block} is used in its place. A follow-up stage, scheduled on
 * the task executor, submits the aggregation and the call to {@code fn} through a
 * {@code StageWithResults}.
 *
 * @param collector
 * to perform aggregation / reduction operation on the results
 * from active stage (e.g. to Collect into a List or String)
 * @param fn
 * Function that receives the results of all currently active
 * tasks as input
 * @return A new builder object that can be used to define the next stage in
 * the dataflow
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
default <R1, R2> SimpleReactStream<R2> allOf(final Collector<? super U, ?, R1> collector, final Function<? super R1, ? extends R2> fn) {
// Snapshot of the futures of the currently active stage.
final CompletableFuture[] array = lastActiveArray(getLastActive());
// Completes once every future in the snapshot has completed.
final CompletableFuture cf = CompletableFuture.allOf(array);
// Failure path: report the exception to the error handler, then fall back to the blocking helper's result.
final Function<Exception, R2> f = (final Exception e) -> {
BlockingStreamHelper.capture(e, getErrorHandler());
return BlockingStreamHelper.block(this, Collectors.toList(), new EagerStreamWrapper(
Stream.of(array), getErrorHandler()));
};
final CompletableFuture onFail = cf.exceptionally(f);
// Runs on the task executor after allOf (or its fallback) has produced a value.
final CompletableFuture onSuccess = onFail.thenApplyAsync((result) -> {
return new StageWithResults(
getTaskExecutor(), null,
result).submit(() -> fn.apply(BlockingStreamHelper.aggregateResultsCompletable(collector, Stream.of(array)
.collect(Collectors.toList()),
getErrorHandler())));
} , getTaskExecutor());
// The new stream's last active stage wraps the single aggregated future.
return (SimpleReactStream<R2>) withLastActive(new EagerStreamWrapper(
onSuccess, getErrorHandler()));
}
/**
 * Requests the {@link ArchivedExecutionGraph} of the given job from its job master and,
 * if that request fails, falls back to the archived execution graph store.
 *
 * @param jobId id of the job to look up
 * @param timeout timeout passed to the job master request
 * @return future with the execution graph; it fails with a {@link CompletionException}
 *     wrapping the stripped original failure when the store has no entry for the job
 */
@Override
public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
    final CompletableFuture<ArchivedExecutionGraph> liveGraphFuture =
        getJobMasterGatewayFuture(jobId)
            .thenCompose((JobMasterGateway gateway) -> gateway.requestJob(timeout));

    return liveGraphFuture.exceptionally(
        (Throwable failure) -> {
            // The live request failed; a completed job may still be present in the store.
            final ArchivedExecutionGraph archivedGraph = archivedExecutionGraphStore.get(jobId);
            if (archivedGraph != null) {
                return archivedGraph;
            }
            throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
        });
}
/**
 * Sends the stop RPC call to the task manager of the currently assigned slot.
 *
 * <p>Does nothing when no slot is assigned. The call is retried up to
 * {@code NUM_STOP_CALL_TRIES} times; a final failure is only logged.
 */
public void stop() {
    assertRunningInJobMasterMainThread();

    final LogicalSlot currentSlot = assignedResource;
    if (currentSlot == null) {
        // No slot assigned, so there is no task manager to contact.
        return;
    }

    final TaskManagerGateway gateway = currentSlot.getTaskManagerGateway();
    final CompletableFuture<Acknowledge> stopAttempt = FutureUtils.retry(
        () -> gateway.stopTask(attemptId, rpcTimeout),
        NUM_STOP_CALL_TRIES,
        vertex.getExecutionGraph().getJobMasterMainThreadExecutor());

    stopAttempt.exceptionally(
        cause -> {
            LOG.info("Stopping task was not successful.", cause);
            return null;
        });
}
/**
 * Requests the {@link JobStatus} of the given job from its job master and, if that request
 * fails, falls back to the job details kept in the archived execution graph store.
 *
 * @param jobId id of the job to look up
 * @param timeout timeout passed to the job master request
 * @return future with the job status; it fails with a {@link CompletionException}
 *     wrapping the stripped original failure when the store has no details for the job
 */
@Override
public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
    final CompletableFuture<JobStatus> liveStatusFuture =
        getJobMasterGatewayFuture(jobId)
            .thenCompose((JobMasterGateway gateway) -> gateway.requestJobStatus(timeout));

    return liveStatusFuture.exceptionally(
        (Throwable failure) -> {
            // The live request failed; a completed job may still be present in the store.
            final JobDetails archivedDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
            if (archivedDetails != null) {
                return archivedDetails.getStatus();
            }
            throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
        });
}
/**
 * Requests the {@link ArchivedExecutionGraph} of the given job from its job master and,
 * if that request fails, falls back to the archived execution graph store.
 *
 * @param jobId id of the job to look up
 * @param timeout timeout passed to the job master request
 * @return future with the execution graph; it fails with a {@link CompletionException}
 *     wrapping the stripped original failure when the store has no entry for the job
 */
@Override
public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
    final CompletableFuture<ArchivedExecutionGraph> liveGraphFuture =
        getJobMasterGatewayFuture(jobId)
            .thenCompose((JobMasterGateway gateway) -> gateway.requestJob(timeout));

    return liveGraphFuture.exceptionally(
        (Throwable failure) -> {
            // The live request failed; a completed job may still be present in the store.
            final ArchivedExecutionGraph archivedGraph = archivedExecutionGraphStore.get(jobId);
            if (archivedGraph != null) {
                return archivedGraph;
            }
            throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
        });
}
/**
 * Requests up to {@code limit} members whose names start with {@code prefix} through the
 * member chunk manager.
 *
 * <p>Chunks are accumulated into one list; the returned task completes with that list once
 * the chunk flagged as last has been received. If the chunk request itself fails or is
 * cancelled, the failure is logged and forwarded to the task so it does not stay pending.
 *
 * @param prefix non-empty name prefix to search for
 * @param limit maximum number of members to request, between 1 and 100
 * @return task providing the collected members; cancelling it cancels the chunk request
 */
@Nonnull
@Override
@CheckReturnValue
public Task<List<Member>> retrieveMembersByPrefix(@Nonnull String prefix, int limit)
{
    Checks.notEmpty(prefix, "Prefix");
    Checks.positive(limit, "Limit");
    Checks.check(limit <= 100, "Limit must not be greater than 100");

    MemberChunkManager chunkManager = api.getClient().getChunkManager();

    List<Member> collect = new ArrayList<>(limit);
    CompletableFuture<List<Member>> result = new CompletableFuture<>();
    CompletableFuture<Void> handle = chunkManager.chunkGuild(this, prefix, limit, (last, list) -> {
        collect.addAll(list);
        if (last)
            result.complete(collect);
    });

    // The handler must observe the chunk request: "result" is only ever completed normally
    // above, so a handler attached to it would never run and the task would never finish.
    handle.exceptionally(ex -> {
        WebSocketClient.LOG.error("Encountered exception trying to handle member chunk response", ex);
        result.completeExceptionally(ex);
        return null;
    });

    return new GatewayTask<>(result, () -> handle.cancel(false));
}
/**
 * Requests the {@link JobStatus} of the given job from its job master and, if that request
 * fails, falls back to the job details kept in the archived execution graph store.
 *
 * @param jobId id of the job to look up
 * @param timeout timeout passed to the job master request
 * @return future with the job status; it fails with a {@link CompletionException}
 *     wrapping the stripped original failure when the store has no details for the job
 */
@Override
public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
    final CompletableFuture<JobStatus> liveStatusFuture =
        getJobMasterGatewayFuture(jobId)
            .thenCompose((JobMasterGateway gateway) -> gateway.requestJobStatus(timeout));

    return liveStatusFuture.exceptionally(
        (Throwable failure) -> {
            // The live request failed; a completed job may still be present in the store.
            final JobDetails archivedDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
            if (archivedDetails != null) {
                return archivedDetails.getStatus();
            }
            throw new CompletionException(ExceptionUtils.stripCompletionException(failure));
        });
}
/**
 * Verifies that an exception set on the source future reaches an {@code exceptionally}
 * handler on the converted {@link CompletableFuture}, and that the handler's return value
 * becomes the result.
 */
@Test
public void forwardExceptionShouldPropagate() throws Exception {
    final String fallback = UUID.randomUUID().toString();
    final SettableFuture<String> source = SettableFuture.create();
    final Exception expectedFailure = new IntentionalException();
    final AtomicReference<Throwable> observedFailure = new AtomicReference<>();

    // Recover from any failure with the fallback value while remembering what was seen.
    final CompletableFuture<String> recovered =
        MoreFutures.toCompletableFuture(source).exceptionally(failure -> {
            observedFailure.set(failure);
            return fallback;
        });

    source.setException(expectedFailure);

    assertThat(recovered).isDone();
    assertThat(recovered).isNotCancelled();
    assertThat(recovered).isNotCompletedExceptionally();
    assertThat(recovered).isCompletedWithValue(fallback);
    assertThat(observedFailure.get()).isSameAs(expectedFailure);
}
/**
 * Tests ordered failure of future callbacks.
 *
 * <p>Callbacks are registered in the order they are expected to run; each one that must
 * run on failure checks its position through a shared counter. The futures derived from
 * {@code handle} and {@code exceptionally} are joined at the end because an assertion
 * error thrown inside a callback only fails the derived future, not the test itself.
 */
public void testOrderedFailure() throws Throwable {
    CompletableFuture<String> future = new OrderedCompletableFuture<>();
    AtomicInteger order = new AtomicInteger();
    future.whenComplete((r, e) -> assertEquals(1, order.incrementAndGet()));
    future.whenComplete((r, e) -> assertEquals(2, order.incrementAndGet()));
    CompletableFuture<String> handled = future.handle((r, e) -> {
        assertEquals(3, order.incrementAndGet());
        return "bar";
    });
    // Success-only callbacks must never run for a failed future.
    future.thenRun(() -> fail());
    future.thenAccept(r -> fail());
    CompletableFuture<String> recovered = future.exceptionally(e -> {
        // Fourth counting callback: the handle() callback above already consumed position 3.
        assertEquals(4, order.incrementAndGet());
        return "bar";
    });
    future.completeExceptionally(new RuntimeException("foo"));

    // join() rethrows any assertion failure raised inside the two callbacks above.
    handled.join();
    recovered.join();
}
/**
 * Starts the world state download and the chain download for the given sync state and
 * links them so that a failure of one cancels the other.
 *
 * @param currentState sync state whose pivot block header selects the world state to fetch
 * @return future that yields {@code currentState} once both downloads have finished, or an
 *     already failed future carrying a {@link CancellationException} when not running
 */
private CompletableFuture<FastSyncState> downloadChainAndWorldState(
    final FastSyncState currentState) {
    // Synchronized ensures that stop isn't called while we're in the process of starting a
    // world state and chain download. If it did we might wind up starting a new download
    // after the stop method had called cancel.
    synchronized (this) {
        final boolean stopped = !running.get();
        if (stopped) {
            return CompletableFuture.failedFuture(
                new CancellationException("FastSyncDownloader stopped"));
        }

        final CompletableFuture<Void> worldStateDownload =
            worldStateDownloader.run(currentState.getPivotBlockHeader().get());
        final ChainDownloader chainDownloader = fastSyncActions.createChainDownloader(currentState);
        final CompletableFuture<Void> chainDownload = chainDownloader.start();

        // If either download fails, cancel the other one.
        chainDownload.exceptionally(
            failure -> {
                worldStateDownload.cancel(true);
                return null;
            });
        worldStateDownload.exceptionally(
            failure -> {
                chainDownloader.cancel();
                return null;
            });

        final CompletableFuture<Void> bothDownloads =
            CompletableFuture.allOf(worldStateDownload, chainDownload);
        return bothDownloads.thenApply(
            ignored -> {
                trailingPeerRequirements = Optional.empty();
                return currentState;
            });
    }
}
/**
 * Starts the fetch-data pipeline and the completion pipeline on the given scheduler and
 * ties their lifecycles together.
 *
 * <p>When fetching ends normally, {@code requestsToComplete} is closed; when it fails, the
 * completion pipeline is aborted. When the completion pipeline fails, the fetch-data
 * pipeline is aborted. Failures whose root cause is a {@link CancellationException} are
 * not logged.
 *
 * @param ethScheduler scheduler used to start both pipelines
 * @return the future of the completion pipeline
 */
public CompletableFuture<Void> start(final EthScheduler ethScheduler) {
    final CompletableFuture<Void> fetchDone = ethScheduler.startPipeline(fetchDataPipeline);
    final CompletableFuture<Void> completionDone = ethScheduler.startPipeline(completionPipeline);

    fetchDone.whenComplete(
        (ignored, failure) -> {
            if (failure == null) {
                // No more data to fetch, so propagate the pipe closure onto the completion pipe.
                requestsToComplete.close();
                return;
            }
            final boolean cancelled =
                ExceptionUtils.rootCause(failure) instanceof CancellationException;
            if (!cancelled) {
                LOG.error("Pipeline failed", failure);
            }
            completionPipeline.abort();
        });

    completionDone.exceptionally(
        failure -> {
            final boolean cancelled =
                ExceptionUtils.rootCause(failure) instanceof CancellationException;
            if (!cancelled) {
                LOG.error("Pipeline failed", failure);
            }
            fetchDataPipeline.abort();
            return null;
        });

    return completionDone;
}
/**
 * Asserts that {@code future} has completed exceptionally with a {@link FastSyncException}
 * whose error equals {@code expectedError}.
 *
 * <p>The failure is extracted from the future first and checked in the calling thread. An
 * assertion made inside an {@code exceptionally} callback would only fail the future that
 * {@code exceptionally} returns, so a mismatch would go unnoticed by the test.
 *
 * @param future the future expected to have failed
 * @param expectedError the error the failure is expected to carry
 */
private <T> void assertCompletedExceptionally(
    final CompletableFuture<T> future, final FastSyncError expectedError) {
    assertThat(future).isCompletedExceptionally();
    // handle() turns the failure into a normal value; the future is already complete, so
    // join() returns immediately.
    final Throwable actualError = future.<Throwable>handle((result, error) -> error).join();
    assertThat(actualError)
        .isInstanceOf(FastSyncException.class)
        .extracting(ex -> ((FastSyncException) ex).getError())
        .isEqualTo(expectedError);
}
/**
 * Forwards cancellation, and nothing else, from one future to another.
 *
 * <p>If <code>from</code> completes with a {@link java.util.concurrent.CancellationException},
 * {@link java.util.concurrent.Future#cancel(boolean)} is invoked on <code>to</code> with
 * interruption allowed. Normal completion and any other failure of <code>from</code> leave
 * <code>to</code> untouched.
 *
 * @param from the CompletableFuture to take cancellation from
 * @param to the CompletableFuture to propagate cancellation to
 */
public static void propagateCancellation(
    final CompletableFuture<?> from, final CompletableFuture<?> to) {
    from.exceptionally(
        failure -> {
            final boolean wasCancelled = failure instanceof CancellationException;
            if (wasCancelled) {
                to.cancel(true);
            }
            return null;
        });
}
/**
 * Reads the published version stored for the given processor and aggregate root.
 *
 * <p>The future completes with the first column of the returned row, or with 0 when the
 * query yields no row. A failed query is logged and rethrown as
 * {@code IORuntimeException} when caused by a {@link SQLException}, otherwise as
 * {@code EnodeRuntimeException}.
 *
 * @param processorName name of the processor, first query parameter
 * @param aggregateRootTypeName not used by this query
 * @param aggregateRootId id of the aggregate root, second query parameter
 * @return future with the published version
 */
@Override
public CompletableFuture<Integer> getPublishedVersionAsync(String processorName, String aggregateRootTypeName, String aggregateRootId) {
    final CompletableFuture<Integer> versionFuture = new CompletableFuture<>();
    final String query = String.format(SELECT_SQL, tableName);
    final JsonArray params = new JsonArray();
    params.add(processorName);
    params.add(aggregateRootId);

    sqlClient.querySingleWithParams(query, params, reply -> {
        if (!reply.succeeded()) {
            versionFuture.completeExceptionally(reply.cause());
            return;
        }
        // A missing or empty row means nothing has been published yet.
        int version = 0;
        if (reply.result() != null && reply.result().size() > 0) {
            version = reply.result().getInteger(0);
        }
        versionFuture.complete(version);
    });

    return versionFuture.exceptionally(failure -> {
        if (!(failure instanceof SQLException)) {
            logger.error("Get aggregate published version has unknown exception.", failure);
            throw new EnodeRuntimeException(failure);
        }
        final SQLException sqlFailure = (SQLException) failure;
        logger.error("Get aggregate published version has sql exception.", sqlFailure);
        throw new IORuntimeException(failure);
    });
}
/**
 * Requests the members with the given ids through the member chunk manager.
 *
 * <p>Chunks are accumulated into one list; the returned task completes with that list once
 * the chunk flagged as last has been received. If the chunk request itself fails or is
 * cancelled, the failure is logged and forwarded to the task so it does not stay pending.
 *
 * @param includePresence whether presences are requested; requires the GUILD_PRESENCES intent
 * @param ids ids of the members to request, at most 100; an empty array yields an empty list
 * @return task providing the collected members; cancelling it cancels the chunk request
 */
@Nonnull
@Override
public Task<List<Member>> retrieveMembersByIds(boolean includePresence, @Nonnull long... ids)
{
    Checks.notNull(ids, "ID Array");
    Checks.check(!includePresence || api.isIntent(GatewayIntent.GUILD_PRESENCES),
        "Cannot retrieve presences of members without GUILD_PRESENCES intent!");

    if (ids.length == 0)
        return new GatewayTask<>(CompletableFuture.completedFuture(Collections.emptyList()), () -> {});
    Checks.check(ids.length <= 100, "You can only request 100 members at once");

    MemberChunkManager chunkManager = api.getClient().getChunkManager();

    List<Member> collect = new ArrayList<>(ids.length);
    CompletableFuture<List<Member>> result = new CompletableFuture<>();
    CompletableFuture<Void> handle = chunkManager.chunkGuild(this, includePresence, ids, (last, list) -> {
        collect.addAll(list);
        if (last)
            result.complete(collect);
    });

    // The handler must observe the chunk request: "result" is only ever completed normally
    // above, so a handler attached to it would never run and the task would never finish.
    handle.exceptionally(ex -> {
        WebSocketClient.LOG.error("Encountered exception trying to handle member chunk response", ex);
        result.completeExceptionally(ex);
        return null;
    });

    return new GatewayTask<>(result, () -> handle.cancel(false));
}
/**
 * Sends a signed discovery request through the discovery future stub.
 *
 * @param signedRequest the request to send
 * @return future with the response; already failed with a {@code PeerException} when this
 *     client is shut down, otherwise any failure is rethrown as a {@link CompletionException}
 *     whose message is prefixed with this client's description
 */
public CompletableFuture<Protocol.Response> sendDiscoveryRequestAsync(Protocol.SignedRequest signedRequest) {
    if (shutdown) {
        final CompletableFuture<Protocol.Response> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new PeerException("Shutdown " + toString()));
        return rejected;
    }

    final CompletableFuture<Protocol.Response> pending =
        CompletableFuturesExtra.toCompletableFuture(discoveryFutureStub.discover(signedRequest));
    // Re-wrap the failure so that its message identifies this client.
    return pending.exceptionally(cause -> {
        throw new CompletionException(format("%s %s", toString, cause.getMessage()), cause);
    });
}
/**
 * Links this future to the outcome of the supplied {@link CompletableFuture}.
 *
 * <p>A normal result is passed to {@code set}; a failure is passed to
 * {@code completedExceptionally}.
 *
 * @param cf the future whose outcome is forwarded to this instance
 * @return this instance
 */
public FastFuture<T> populateFromCompletableFuture(final CompletableFuture<T> cf) {
    // Success path.
    cf.thenAccept(value -> this.set(value));
    // Failure path; the callback's return value goes to a future nobody observes.
    cf.exceptionally(failure -> {
        completedExceptionally(failure);
        return join();
    });
    return this;
}
/**
 * Reads the published version stored for the given processor and aggregate root from the
 * published-version collection.
 *
 * <p>The future completes with the {@code version} field of the first document received,
 * or with 0 when the query finishes without a document. A failed query is logged and
 * rethrown as {@code IORuntimeException} when caused by a {@link SQLException}, otherwise
 * as {@code EnodeRuntimeException}.
 *
 * @param processorName value matched against the {@code processorName} field
 * @param aggregateRootTypeName not used by this query
 * @param aggregateRootId value matched against the {@code aggregateRootId} field
 * @return future with the published version
 */
@Override
public CompletableFuture<Integer> getPublishedVersionAsync(String processorName, String aggregateRootTypeName, String aggregateRootId) {
    final CompletableFuture<Integer> versionFuture = new CompletableFuture<>();
    final Bson filter = Filters.and(
        Filters.eq("processorName", processorName),
        Filters.eq("aggregateRootId", aggregateRootId)
    );

    mongoClient.getDatabase(configuration.getDatabaseName())
        .getCollection(configuration.getPublishedVersionCollectionName())
        .find(filter)
        .subscribe(new Subscriber<Document>() {
            // Reported on completion when no document arrives.
            private Integer latestVersion = 0;

            @Override
            public void onSubscribe(Subscription subscription) {
                // Only the first matching document is needed.
                subscription.request(1);
            }

            @Override
            public void onNext(Document found) {
                latestVersion = found.getInteger("version");
                versionFuture.complete(latestVersion);
            }

            @Override
            public void onError(Throwable error) {
                versionFuture.completeExceptionally(error);
            }

            @Override
            public void onComplete() {
                // Has no effect if onNext already completed the future.
                versionFuture.complete(latestVersion);
            }
        });

    return versionFuture.exceptionally(failure -> {
        if (!(failure instanceof SQLException)) {
            logger.error("Get aggregate published version has unknown exception.", failure);
            throw new EnodeRuntimeException(failure);
        }
        final SQLException sqlFailure = (SQLException) failure;
        logger.error("Get aggregate published version has sql exception.", sqlFailure);
        throw new IORuntimeException(failure);
    });
}
/**
 * Stores the published version for the given processor and aggregate root.
 *
 * <p>Version 1 is written with the insert statement; any other version is written with the
 * update statement, whose last parameter is the previous version
 * ({@code publishedVersion - 1}).
 *
 * <p>The future completes with the number of updated rows. An insert that fails with the
 * configured SQL state and a message naming the unique index completes with 0 instead of
 * failing. Other {@link SQLException}s are logged and rethrown as
 * {@code IORuntimeException}; any other failure as {@code EnodeRuntimeException}.
 *
 * @param processorName name of the processor
 * @param aggregateRootTypeName type name of the aggregate root (only used on insert)
 * @param aggregateRootId id of the aggregate root
 * @param publishedVersion version to store
 * @return future with the number of affected rows
 */
@Override
public CompletableFuture<Integer> updatePublishedVersionAsync(String processorName, String aggregateRootTypeName, String aggregateRootId, int publishedVersion) {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    boolean isInsert = publishedVersion == 1;
    String sql;
    JsonArray array = new JsonArray();
    if (isInsert) {
        sql = String.format(INSERT_SQL, tableName);
        array.add(processorName);
        array.add(aggregateRootTypeName);
        array.add(aggregateRootId);
        array.add(1);
        array.add(new Date().toInstant());
    } else {
        sql = String.format(UPDATE_SQL, tableName);
        array.add(publishedVersion);
        array.add(new Date().toInstant());
        array.add(processorName);
        array.add(aggregateRootId);
        array.add(publishedVersion - 1);
    }
    sqlClient.updateWithParams(sql, array, x -> {
        if (x.succeeded()) {
            future.complete(x.result().getUpdated());
            return;
        }
        future.completeExceptionally(x.cause());
    });
    return future.exceptionally(throwable -> {
        if (throwable instanceof SQLException) {
            SQLException ex = (SQLException) throwable;
            // SQLState and message may both be null; checking them unguarded would raise a
            // NullPointerException here and hide the original failure.
            String state = ex.getSQLState();
            String message = ex.getMessage();
            boolean duplicateInsert = isInsert
                && state != null && state.equals(sqlState)
                && message != null && message.contains(uniqueIndexName);
            // insert duplicate return
            if (duplicateInsert) {
                return 0;
            }
            logger.error("Insert or update aggregate published version has sql exception.", ex);
            throw new IORuntimeException(throwable);
        }
        logger.error("Insert or update aggregate published version has unknown exception.", throwable);
        throw new EnodeRuntimeException(throwable);
    });
}
/**
 * It unloads the bundle by closing all topics concurrently under this bundle.
 *
 * <pre>
 * a. disable bundle ownership in memory and not in zk
 * b. close all the topics concurrently
 * c. delete ownership znode from zookeeper.
 * </pre>
 *
 * <p>Failures are reported through the returned future rather than thrown. If the calling
 * thread is interrupted while waiting for the write lock, its interrupt status is restored
 * and a failed future is returned.
 *
 * @param pulsar
 *            service giving access to the ownership cache and the broker service
 * @param timeout
 *            timeout for unloading bundle. It doesn't throw exception if it times out while waiting on closing all
 *            topics
 * @param timeoutUnit
 *            unit of {@code timeout}
 * @return future that completes once the ownership node has been removed; it fails when the
 *         namespace is not active, the thread is interrupted, or removing ownership fails
 */
public CompletableFuture<Void> handleUnloadRequest(PulsarService pulsar, long timeout, TimeUnit timeoutUnit) {
    long unloadBundleStartTime = System.nanoTime();
    // Need a per namespace ReentrantReadWriteLock
    // Here to do a writeLock to set the flag and proceed to check and close connections
    try {
        while (!this.nsLock.writeLock().tryLock(1, TimeUnit.SECONDS)) {
            // Using tryLock to avoid deadlocks caused by 2 threads trying to acquire 2 readlocks (eg: replicators)
            // while a handleUnloadRequest happens in the middle
            LOG.warn("Contention on OwnedBundle rw lock. Retrying to acquire lock write lock");
        }

        try {
            // set the flag locally s.t. no more producer/consumer to this namespace is allowed
            if (!IS_ACTIVE_UPDATER.compareAndSet(this, TRUE, FALSE)) {
                // An exception is thrown when the namespace is not in active state (i.e. another thread is
                // removing/have removed it)
                return FutureUtil.failedFuture(new IllegalStateException(
                        "Namespace is not active. ns:" + this.bundle + "; state:" + IS_ACTIVE_UPDATER.get(this)));
            }
        } finally {
            // no matter success or not, unlock
            this.nsLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the interrupt flag; restore it so code further
        // up the call stack can still observe the interruption.
        Thread.currentThread().interrupt();
        return FutureUtil.failedFuture(e);
    }

    AtomicInteger unloadedTopics = new AtomicInteger();
    LOG.info("Disabling ownership: {}", this.bundle);

    // close topics forcefully
    CompletableFuture<Void> future = pulsar.getNamespaceService().getOwnershipCache()
            .updateBundleState(this.bundle, false)
            .thenCompose(v -> pulsar.getBrokerService().unloadServiceUnit(bundle, true, timeout, timeoutUnit))
            .handle((numUnloadedTopics, ex) -> {
                if (ex != null) {
                    // ignore topic-close failure to unload bundle
                    LOG.error("Failed to close topics under namespace {}", bundle.toString(), ex);
                } else {
                    unloadedTopics.set(numUnloadedTopics);
                }
                return null;
            })
            .thenCompose(v -> {
                // delete ownership node on zk
                return pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
            }).thenAccept(v -> {
                double unloadBundleTime = TimeUnit.NANOSECONDS
                        .toMillis((System.nanoTime() - unloadBundleStartTime));
                LOG.info("Unloading {} namespace-bundle with {} topics completed in {} ms", this.bundle,
                        unloadedTopics, unloadBundleTime);
            });

    future.exceptionally(ex -> {
        // Failed to remove ownership node: enable namespace-bundle again so, it can serve new topics
        pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, true);
        return null;
    });
    return future;
}