下面列出了 java.util.concurrent.CompletableFuture#isDone() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Keeps the given peers answering requests until {@code result} is done.
 *
 * <p>With exactly one peer the wait is delegated to that peer's
 * {@code respondWhileOtherThreadsWork}; otherwise every peer is polled in turn and
 * other threads are given a chance to run between rounds.
 *
 * @param peers the peers that should respond
 * @param responder the responder each peer uses to answer
 * @param result the future whose completion ends the loop
 */
private void respondUntilDone(
    final List<RespondingEthPeer> peers,
    final RespondingEthPeer.Responder responder,
    final CompletableFuture<?> result) {
  final boolean singlePeer = peers.size() == 1;
  if (!singlePeer) {
    // Several (or zero) peers: poll each of them, then yield, until the result is in.
    while (!result.isDone()) {
      for (final RespondingEthPeer candidate : peers) {
        candidate.respond(responder);
      }
      giveOtherThreadsAGo();
    }
    return;
  }
  // Use a blocking approach to waiting for the next message when we can.
  final RespondingEthPeer onlyPeer = peers.get(0);
  onlyPeer.respondWhileOtherThreadsWork(responder, () -> !result.isDone());
}
/**
 * Sends {@code requestMessage} and returns a future holding every response received for it.
 *
 * <p>A callback is installed on {@code callbackResponseHandler} that accumulates responses
 * and completes the future as soon as a response whose status code is not
 * {@code PARTIAL_CONTENT} arrives. Receiving a response after that point is reported as a
 * {@link RuntimeException} from the callback.
 *
 * @param requestMessage the request to write and flush
 * @return a future completed with all collected responses once the terminating one arrives
 * @throws Exception declared by the overridden method
 */
@Override
public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessage requestMessage) throws Exception {
    final CompletableFuture<List<ResponseMessage>> pending = new CompletableFuture<>();
    final List<ResponseMessage> collected = new ArrayList<>();
    callbackResponseHandler.callback = response -> {
        if (pending.isDone()) {
            throw new RuntimeException("A terminating message was already encountered - no more messages should have been received");
        }
        collected.add(response);
        // Anything other than PARTIAL_CONTENT terminates the exchange, so the future can be completed.
        final boolean morePartsExpected =
            response.getStatus().getCode().equals(ResponseStatusCode.PARTIAL_CONTENT);
        if (!morePartsExpected) {
            pending.complete(collected);
        }
    };
    writeAndFlush(requestMessage);
    return pending;
}
/**
 * Issues one asynchronous write per record and returns the futures in record order.
 *
 * <p>The second argument passed to {@code asyncWrite} is {@code true} only for the final
 * record. If a write is already exceptionally completed when it returns, no further
 * records are written and {@code appendCancelledFutures} is asked to fill in the
 * remaining slots so the result has one entry per record.
 *
 * @param records the records to write
 * @return one future per input record
 */
private List<CompletableFuture<DLSN>> asyncWriteBulk(List<LogRecord> records) {
    final ArrayList<CompletableFuture<DLSN>> results = new ArrayList<>(records.size());
    for (Iterator<LogRecord> cursor = records.iterator(); cursor.hasNext(); ) {
        final LogRecord current = cursor.next();
        final boolean isLastRecord = !cursor.hasNext();
        final CompletableFuture<DLSN> writeFuture = asyncWrite(current, isLastRecord);
        results.add(writeFuture);
        // Abort early if an individual write has already failed.
        if (writeFuture.isDone() && writeFuture.isCompletedExceptionally()) {
            break;
        }
    }
    final int unwritten = records.size() - results.size();
    if (unwritten > 0) {
        appendCancelledFutures(results, unwritten);
    }
    return results;
}
/**
 * Delivers {@code response} to the future registered under its request id.
 *
 * <p>The registration is removed from {@code futureMap} in the process. Nothing happens
 * when the response is {@code null}, when no registration exists for its request id, or
 * when the registered future is already done.
 *
 * @param response the response to deliver; may be {@code null}
 */
public void notifyResponse(Response response) {
    if (response == null) {
        return;
    }
    RequestWithFuture pending = futureMap.remove(response.getRequestId());
    if (pending != null) {
        CompletableFuture<Response> target = pending.getFuture();
        boolean alreadyDone = target.isDone();
        if (!alreadyDone) {
            target.complete(response);
        }
    }
}
/**
 * Arms a timeout on the <i>promise</i>: if it is still pending once the <i>timeout</i>
 * elapses, it is completed exceptionally with <i>cause</i>.
 * A promise that is satisfied before the timeout fires keeps its state, and its completion
 * attempts to cancel the scheduled timeout task.
 *
 * <p>The promise is returned untouched when <i>timeout</i> is negative or the promise is
 * already done.
 *
 * @param promise promise to raise exception
 * @param timeout timeout period
 * @param unit timeout period unit
 * @param cause cause to raise
 * @param scheduler scheduler to execute raising exception
 * @param key the submit key used by the scheduler
 * @return the same promise instance, with the raise logic applied
 */
public static <T> CompletableFuture<T> within(final CompletableFuture<T> promise,
                                              final long timeout,
                                              final TimeUnit unit,
                                              final Throwable cause,
                                              final OrderedScheduler scheduler,
                                              final Object key) {
    final boolean timeoutDisabled = timeout < 0;
    if (timeoutDisabled || promise.isDone()) {
        return promise;
    }
    // The action run by the scheduler once the timeout period has elapsed.
    final Runnable raiseOnTimeout = () -> {
        if (promise.isDone()) {
            return;
        }
        if (promise.completeExceptionally(cause)) {
            log.info("Raise exception", cause);
        }
    };
    final java.util.concurrent.ScheduledFuture<?> task =
        scheduler.schedule(key, raiseOnTimeout, timeout, unit);
    // Once the promise is satisfied the timeout task is no longer needed.
    promise.whenComplete((value, throwable) -> {
        final boolean cancelled = task.cancel(true);
        if (!cancelled) {
            log.debug("Failed to cancel the timeout task");
        }
    });
    return promise;
}
/**
 * Matches a stage that has completed normally with a value accepted by {@code matcher}.
 *
 * <p>Every other outcome (not yet done, cancelled, completed exceptionally, or completed
 * with a non-matching value) is a mismatch and is described in {@code mismatchDescription}.
 *
 * @param stage the stage under test
 * @param mismatchDescription receives the explanation when the stage does not match
 * @return {@code true} only if the stage completed with a matching value
 */
@Override
protected boolean matchesSafely(final CompletionStage<? extends T> stage,
                                final Description mismatchDescription) {
  final CompletableFuture<? extends T> future = stage.toCompletableFuture();

  if (!future.isDone()) {
    mismatchDescription.appendText("a stage that was not done");
    return false;
  }

  if (future.isCancelled()) {
    mismatchDescription.appendText("a stage that was cancelled");
    return false;
  }

  if (future.isCompletedExceptionally()) {
    try {
      // Expected to throw; the wrapped cause is what gets reported.
      future.getNow(null);
    } catch (CompletionException e) {
      mismatchDescription
          .appendText("a stage that completed exceptionally with ")
          .appendText(getStackTraceAsString(e.getCause()));
      return false;
    }
    throw new AssertionError(
        "This should never happen because the future has completed exceptionally.");
  }

  final T item = future.getNow(null);
  final boolean matched = matcher.matches(item);
  if (!matched) {
    mismatchDescription.appendText("a stage that completed to a value that ");
    matcher.describeMismatch(item, mismatchDescription);
  }
  return matched;
}
/**
 * Associates {@code valueFuture} with {@code key} in the cache.
 *
 * <p>A future that has already completed exceptionally, or has already completed with
 * {@code null}, is treated as a failed load: the failure is recorded and any existing
 * entry for the key is removed. Otherwise the future is stored and
 * {@code handleCompletion} is invoked with the time read from the stats ticker.
 *
 * @param key the key to associate the future with
 * @param valueFuture the future supplying the value
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored")
default void put(K key, CompletableFuture<V> valueFuture) {
  boolean loadFailed = valueFuture.isCompletedExceptionally();
  if (!loadFailed && valueFuture.isDone()) {
    // Join only after ruling out exceptional completion above.
    loadFailed = (valueFuture.join() == null);
  }

  if (loadFailed) {
    cache().statsCounter().recordLoadFailure(0L);
    cache().remove(key);
    return;
  }

  final long startTime = cache().statsTicker().read();
  cache().put(key, valueFuture);
  handleCompletion(key, valueFuture, startTime, /* recordMiss */ false);
}
/**
 * Inserts each document into the configured event collection and reports the combined outcome.
 *
 * <p>One {@code insertOne} is subscribed per document. A latch sized to the number of
 * documents is counted down on every {@code onNext} and {@code onError}; when an insert
 * signals {@code onComplete} and the latch has reached zero, the returned future is
 * completed with {@link EventAppendStatus#Success}. Any insert error completes the future
 * exceptionally, and no further inserts are subscribed once the future is observed as done.
 *
 * <p>An empty {@code documents} list completes immediately with
 * {@link EventAppendStatus#Success}. Previously no subscriber was created in that case,
 * so the returned future never completed.
 *
 * @param documents the documents to insert; must not be {@code null}
 * @return a future completed with the append result, or exceptionally with the first insert error
 */
public CompletableFuture<AggregateEventAppendResult> insertOneByOneAsync(List<Document> documents) {
    CompletableFuture<AggregateEventAppendResult> future = new CompletableFuture<>();
    if (documents.isEmpty()) {
        // Nothing to insert: without this guard no callback would ever complete the future.
        AggregateEventAppendResult emptyResult = new AggregateEventAppendResult();
        emptyResult.setEventAppendStatus(EventAppendStatus.Success);
        future.complete(emptyResult);
        return future;
    }
    CountDownLatch latch = new CountDownLatch(documents.size());
    for (Document document : documents) {
        mongoClient.getDatabase(mongoConfiguration.getDatabaseName()).getCollection(mongoConfiguration.getEventCollectionName())
            .insertOne(document).subscribe(new Subscriber<InsertOneResult>() {
                @Override
                public void onSubscribe(Subscription s) {
                    // A single insert yields a single result.
                    s.request(1);
                }

                @Override
                public void onNext(InsertOneResult insertOneResult) {
                    latch.countDown();
                }

                @Override
                public void onError(Throwable t) {
                    latch.countDown();
                    future.completeExceptionally(t);
                }

                @Override
                public void onComplete() {
                    // Only the insert that observes every document accounted for reports success.
                    if (latch.getCount() == 0) {
                        AggregateEventAppendResult appendResult = new AggregateEventAppendResult();
                        appendResult.setEventAppendStatus(EventAppendStatus.Success);
                        future.complete(appendResult);
                    }
                }
            });
        // Stop issuing inserts once the overall outcome is already decided.
        if (future.isDone()) {
            break;
        }
    }
    return future;
}
/**
 * Reports whether every future in {@code futures} is done.
 *
 * @return {@code false} as soon as one pending future is found, {@code true} otherwise
 */
@Override
public boolean isDone() {
    for (final CompletableFuture<V> member : this.futures) {
        final boolean finished = member.isDone();
        if (!finished) {
            return false;
        }
    }
    return true;
}
/**
 * Creates a union over the given input gates.
 *
 * <p>Each gate is assigned an index offset equal to the number of input channels of the
 * gates registered before it, and is added to the set of gates with remaining data.
 * Gates whose availability future is already done are added to {@code inputGatesWithData}
 * directly; the others are queued through {@code queueInputGate} once that future completes.
 *
 * @param inputGates the gates to union; more than one is required, and none may itself
 *     be a {@code UnionInputGate}
 * @throws UnsupportedOperationException if one of the gates is a {@code UnionInputGate}
 */
public UnionInputGate(InputGate... inputGates) {
this.inputGates = checkNotNull(inputGates);
checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates.");
this.inputGateToIndexOffsetMap = Maps.newHashMapWithExpectedSize(inputGates.length);
this.inputGatesWithRemainingData = Sets.newHashSetWithExpectedSize(inputGates.length);
// Running total of channels seen so far; doubles as the index offset of the next gate.
int currentNumberOfInputChannels = 0;
// Registration happens while holding the inputGatesWithData monitor.
synchronized (inputGatesWithData) {
for (InputGate inputGate : inputGates) {
if (inputGate instanceof UnionInputGate) {
// if we want to add support for this, we need to implement pollNext()
throw new UnsupportedOperationException("Cannot union a union of input gates.");
}
// The offset to use for buffer or event instances received from this input gate.
inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels);
inputGatesWithRemainingData.add(inputGate);
currentNumberOfInputChannels += inputGate.getNumberOfInputChannels();
CompletableFuture<?> available = inputGate.isAvailable();
if (available.isDone()) {
// Availability future already completed: register the gate right away.
inputGatesWithData.add(inputGate);
} else {
// Otherwise queue the gate when its availability future completes.
available.thenRun(() -> queueInputGate(inputGate));
}
}
// At least one gate was registered above, so mark the union itself as available.
if (!inputGatesWithData.isEmpty()) {
isAvailable = AVAILABLE;
}
}
this.totalNumberOfInputChannels = currentNumberOfInputChannels;
}
/**
 * Tells whether {@code future} has completed normally.
 *
 * <p>Cancellation needs no separate check:
 * {@link CompletableFuture#isCompletedExceptionally()} already returns {@code true} for a
 * cancelled future.
 *
 * @param future the future to inspect; must not be {@code null}
 * @return {@code true} if the future is done and did not fail and was not cancelled
 */
public static boolean isSuccessFuture(CompletableFuture<?> future) {
    return future.isDone() && !future.isCompletedExceptionally();
}
/**
 * Releases the current pause by completing its future, unless that future is already done.
 */
protected void unpause() {
    final CompletableFuture<Void> activePause = pause.get();
    if (activePause.isDone()) {
        return;
    }
    activePause.complete(null);
}
/**
 * Starts a storage plugin for every source returned by the dataset listing and logs a
 * summary of the outcome.
 *
 * <p>All plugins are started asynchronously; this method then waits up to
 * {@code startupWait} milliseconds for them to finish. Plugins that finished are reported
 * as success, started-in-bad-state, or failed (a failed plugin also gets
 * {@code initiateFixFailedStartTask()} called); plugins still starting are reported as
 * pending and get a follow-up runnable attached to their future. When this node has the
 * {@code COORDINATOR} role, the periodic catalog refresher is scheduled afterwards.
 *
 * @throws NamespaceException declared by this method; presumably raised while listing
 *     sources -- TODO confirm
 */
public void start() throws NamespaceException {
// Since this is run inside the system startup, no one should be able to interact with it until we've already
// started everything. Thus no locking is necessary.
ImmutableMap.Builder<String, CompletableFuture<SourceState>> futuresBuilder = ImmutableMap.builder();
for (SourceConfig source : datasetListing.getSources(SystemUser.SYSTEM_USERNAME)) {
ManagedStoragePlugin plugin = newPlugin(source);
// Kick off startup without blocking; the futures are inspected after the wait below.
futuresBuilder.put(source.getName(), plugin.startAsync());
plugins.put(c(source.getName()), plugin);
}
Map<String, CompletableFuture<SourceState>> futures = futuresBuilder.build();
final CompletableFuture<Void> futureWait = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[futures.size()]));
try {
// wait up to startupWait milliseconds or until all plugins have started/failed to start.
futureWait.get(startupWait, TimeUnit.MILLISECONDS);
} catch (Exception e) {
// ignore since we're going to evaluate individually below.
}
// Build a single multi-line report covering every plugin.
final StringBuilder sb = new StringBuilder();
int count = 0;
sb.append("Result of storage plugin startup: \n");
for(final ManagedStoragePlugin p : plugins.values()) {
count++;
String name = p.getName().getRoot();
final CompletableFuture<SourceState> future = futures.get(name);
Preconditions.checkNotNull(future, "Unexpected failure to retrieve source %s from available futures %s.", name, futures.keySet());
if(future.isDone()) {
try {
// The future is done, so get() returns or throws without waiting.
SourceState state = future.get();
String result = state.getStatus() == SourceStatus.bad ? "started in bad state" : "success";
sb.append(String.format("\t%s: %s (%dms). %s\n", name, result, p.getStartupTime(), state));
}catch (Exception ex) {
// Startup failed: log it, report it, and hand the plugin to its fix-failed-start task.
logger.error("Failure while starting plugin {} after {}ms.", p.getName(), p.getStartupTime(), ex);
sb.append(String.format("\t%s: failed (%dms). %s\n", name, p.getStartupTime(), p.getState()));
p.initiateFixFailedStartTask();
}
} else {
// not finished, let's get a log entry later.
future.thenRun(Runnables.combo(new LateSourceRunnable(future, p)));
sb.append(String.format("\t%s: pending.\n", name));
}
}
// for coordinator, ensure catalog synchronization. Don't start this until the plugins manager is started.
if(context.getRoles().contains(Role.COORDINATOR)) {
refresher = scheduler.schedule(Schedule.Builder.everyMillis(CatalogServiceImpl.CATALOG_SYNC).build(), Runnables.combo(new Refresher()));
}
// Only emit the report when at least one plugin was processed.
if(count > 0) {
logger.info(sb.toString());
}
}
/**
 * Advances the rank lookup by one step.
 *
 * <p>When no range iterator is active, the lookup descends one level and opens a range
 * read on that level's subspace from the current {@code rankKey} up to and including
 * {@code key}. Each call then consumes one entry from the iterator, adding its decoded
 * count to {@code rank}. When the iterator is exhausted, the count of the last entry read
 * is subtracted from {@code rank} again and the iterator is cleared so the following call
 * moves to the next level.
 *
 * @param tr the transaction used to open the range read
 * @return a future that is {@code false} when the lookup has finished (level below zero,
 *     or {@code rankKey} equals {@code key} at the end of a level) and {@code true} when
 *     another call is needed
 */
@Override
public CompletableFuture<Boolean> next(ReadTransaction tr) {
// A null iterator means the previous level is finished (or nothing has started yet).
final boolean newIterator = asyncIterator == null;
if (newIterator) {
level--;
if (level < 0) {
// Finest level: rank is accurate.
return READY_FALSE;
}
levelSubspace = subspace.get(level);
asyncIterator = lookupIterator(tr.getRange(
KeySelector.firstGreaterOrEqual(levelSubspace.pack(rankKey)),
KeySelector.firstGreaterThan(levelSubspace.pack(key)),
ReadTransaction.ROW_LIMIT_UNLIMITED,
false,
StreamingMode.WANT_ALL));
lastCount = 0;
}
final long startTime = System.nanoTime();
final CompletableFuture<Boolean> onHasNext = asyncIterator.onHasNext();
// Remember whether the answer was already available, so only real waits are reported below.
final boolean wasDone = onHasNext.isDone();
return onHasNext.thenApply(hasNext -> {
if (!wasDone) {
nextLookupKey(System.nanoTime() - startTime, newIterator, hasNext, level, true);
}
if (!hasNext) {
// Totalled this level: move to next.
asyncIterator = null;
// Take back the count of the last entry read on this level.
rank -= lastCount;
if (Arrays.equals(rankKey, key)) {
// Exact match on this level: no need for finer.
return false;
}
if (!keyShouldBePresent && level == 0 && lastCount > 0) {
// If the key need not be present and we are on the finest level, then if it wasn't an exact
// match, key would have the next rank after the last one. Except in the case where key is less
// than the lowest key in the set, in which case it takes rank 0. This is recognizable because
// at level 0, only the leftmost empty array has a count of zero; every other key has a count of one
// (or the number of duplicates if those are counted separately).
rank++;
}
return true;
}
// Consume one entry: it becomes the new rank key and its count is added to the rank.
KeyValue kv = asyncIterator.next();
rankKey = levelSubspace.unpack(kv.getKey()).getBytes(0);
lastCount = decodeLong(kv.getValue());
rank += lastCount;
return true;
});
}
/**
 * Tries one randomly chosen chunk in the search for a valid location, re-invoking itself
 * until a location is found or {@code maxTries} is reached.
 *
 * <p>A chunk offset that has not been tried before and lies within the configured radius
 * is picked, the chunk is requested asynchronously, and the candidate positions from
 * {@code RANDOM_LIST} inside it are run through all validators. The first position that
 * passes completes {@code future}, raised by one block in y. If none passes, another
 * attempt is started, delayed through the server scheduler when less than
 * {@code checkDelay} has passed since this attempt began.
 *
 * @param future the future to complete with the found location, or exceptionally with a
 *     {@code NotFoundException} when the tries are used up or no chunk is loaded
 */
private void checkRandom(CompletableFuture<Location> future) {
if (checks >= maxTries) {
future.completeExceptionally(new NotFoundException("location"));
return;
}
// NOTE(review): isDone() alone already covers cancelled and exceptional completion.
if (future.isCancelled() || future.isDone() || future.isCompletedExceptionally()) {
return;
}
lastCheck = center.getWorld().getTime();
Location randomLoc = center.clone();
randomLoc.setY(0);
// Block radii converted to chunk units (16 blocks per chunk).
int minChunk = minRadius >> 4;
int maxChunk = maxRadius >> 4;
int randChunkX;
int randChunkZ;
Chunk[] loadedChunks = new Chunk[0];
if (loadedOnly) {
loadedChunks = randomLoc.getWorld().getLoadedChunks();
if (loadedChunks.length == 0) {
future.completeExceptionally(new NotFoundException("loaded chunk"));
return;
}
}
// Draw chunk coordinates until an untried pair within the radius turns up; every draw counts as a check.
do {
checks++;
if (checks >= maxTries) {
future.completeExceptionally(new NotFoundException("location"));
return;
}
if (loadedOnly) {
Chunk chunk = loadedChunks[random.nextInt(loadedChunks.length)];
randChunkX = chunk.getX();
randChunkZ = chunk.getZ();
} else {
randChunkX = (random.nextBoolean() ? 1 : -1) * random.nextInt(maxChunk + 1);
randChunkZ = (random.nextBoolean() ? 1 : -1) * random.nextInt(maxChunk + 1);
}
} while (!checked.put(randChunkX, randChunkZ) || !inRadius(Math.abs(randChunkX), Math.abs(randChunkZ), minChunk, maxChunk));
// Turn the chosen chunk coordinates into block coordinates relative to the center's chunk.
randomLoc.setX(((center.getBlockX() >> 4) + randChunkX) * 16);
randomLoc.setZ(((center.getBlockZ() >> 4) + randChunkZ) * 16);
PaperLib.getChunkAtAsync(randomLoc, generatedOnly).thenApply(c -> {
// NOTE(review): checks is incremented here as well as in the selection loop above -- confirm this is intended.
checks++;
if (c == null) {
// Chunk not generated, test another one
checkRandom(future);
return false;
}
// Walk RANDOM_LIST starting from a random index so candidates are tried in rotated order.
int indexOffset = random.nextInt(RANDOM_LIST.size());
Location foundLoc = null;
for (int i = 0; i < RANDOM_LIST.size(); i++) {
int index = (i + indexOffset) % RANDOM_LIST.size();
boolean validated = true;
Location loc = randomLoc.clone().add(RANDOM_LIST.get(index)[0], 0, RANDOM_LIST.get(index)[1]);
if (!inRadius(loc)) {
continue;
}
// A candidate is accepted only if every validator approves it.
for (LocationValidator validator : getValidators().getAll()) {
if (!validator.validate(this, loc)) {
validated = false;
break;
}
}
if (validated) {
foundLoc = loc;
break;
}
}
if (foundLoc != null) {
// all checks are for the top block, but we want a location above that so add 1 to y
future.complete(foundLoc.add(0, 1, 0));
return true;
}
// Nothing valid in this chunk: retry, waiting out the rest of checkDelay if it has not elapsed yet.
long diff = center.getWorld().getTime() - lastCheck;
if (diff < checkDelay) {
plugin.getServer().getScheduler().runTaskLater(plugin, () -> checkRandom(future), checkDelay - diff);
} else {
checkRandom(future);
}
return false;
// Any exception thrown in the callback above is forwarded to the caller's future.
}).exceptionally(future::completeExceptionally);
}
/**
 * Runs each {@code CustomerAsyncs} demonstration in sequence, logging a heading before
 * every one, and finally polls the taxes future until it is done and logs its result.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException declared for the final {@code get()} call and any demo that declares it
 * @throws ExecutionException declared for the final {@code get()} call and any demo that declares it
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
// Log line layout: [time] [level] message
System.setProperty("java.util.logging.SimpleFormatter.format",
"[%1$tT] [%4$-7s] %5$s %n");
logger.info("Printing customer order ...");
CustomerAsyncs.printOrder();
logger.info("Fetch order summary ...");
CustomerAsyncs.fetchOrderSummary();
logger.info("Fetch order summary (Executor) ...");
CustomerAsyncs.fetchOrderSummaryExecutor();
logger.info("Fetch order, compute total and sign ...");
CustomerAsyncs.fetchInvoiceTotalSign();
logger.info("Fetch and print order ...");
CustomerAsyncs.fetchAndPrintOrder();
logger.info("Deliver order and notify customer ...");
CustomerAsyncs.deliverOrderNotifyCustomer();
logger.info("exceptionally() ...");
CustomerAsyncs.fetchOrderTotalException();
logger.info("Chain exceptionally() ...");
CustomerAsyncs.fetchInvoiceTotalSignChainOfException();
logger.info("Global exceptionally() ...");
CustomerAsyncs.fetchInvoiceTotalSignGlobalException();
logger.info("exceptionallyCompose() ...");
CustomerAsyncs.printInvoiceException();
logger.info("exceptionallyAsync() ...");
CustomerAsyncs.fetchOrderTotalExceptionAsync();
logger.info("exceptionallyHandle() ...");
CustomerAsyncs.fetchOrderTotalHandle();
logger.info("Computing taxes ...");
CompletableFuture<Integer> cfTaxes = CustomerAsyncs.taxes();
// NOTE(review): this loop spins without pausing and logs on every pass until the future is done.
while (!cfTaxes.isDone()) {
logger.info("Still computing ...");
}
// The future is done at this point, so get() returns (or throws) without waiting.
int result = cfTaxes.get();
logger.info(() -> "Result: " + result);
}
/**
 * Cancels {@code promise} if it is still pending.
 *
 * <p>A {@code null} promise or one that is already done is left untouched.
 *
 * @param promise the future to cancel; may be {@code null}
 */
public static void cancel(CompletableFuture<?> promise) {
    final boolean nothingToCancel = promise == null || promise.isDone();
    if (nothingToCancel) {
        return;
    }
    promise.cancel(true);
}
/**
 * Attaches {@code handler} to {@code completableFuture}, choosing between the synchronous
 * and asynchronous variant based on the future's current state.
 *
 * <p>If the input future is already done, the result of
 * {@link CompletableFuture#handle(BiFunction)} is returned. Otherwise the result of
 * {@link CompletableFuture#handleAsync(BiFunction, Executor)} with the given executor is
 * returned.
 *
 * @param completableFuture the completable future for which we want to call #handle.
 * @param executor the executor to run the handle function if the future is not yet done.
 * @param handler the handler function to call when the future is completed.
 * @param <IN> type of the handler input argument.
 * @param <OUT> type of the handler return value.
 * @return the new completion stage.
 */
public static <IN, OUT> CompletableFuture<OUT> handleAsyncIfNotDone(
        CompletableFuture<IN> completableFuture,
        Executor executor,
        BiFunction<? super IN, Throwable, ? extends OUT> handler) {
    if (completableFuture.isDone()) {
        // Already completed: attach the handler without involving the executor.
        return completableFuture.handle(handler);
    }
    return completableFuture.handleAsync(handler, executor);
}
/**
 * Composes {@code composeFun} with {@code completableFuture}, choosing between the
 * synchronous and asynchronous variant based on the future's current state.
 *
 * <p>If the input future is already done, the result of
 * {@link CompletableFuture#thenCompose(Function)} is returned. Otherwise the result of
 * {@link CompletableFuture#thenComposeAsync(Function, Executor)} with the given executor
 * is returned.
 *
 * @param completableFuture the completable future for which we want to compose.
 * @param executor the executor to run the compose function if the future is not yet done.
 * @param composeFun the function to compose.
 * @param <IN> type of the input future.
 * @param <OUT> type of the output future.
 * @return a completable future that is a composition of the input future and the function.
 */
public static <IN, OUT> CompletableFuture<OUT> thenComposeAsyncIfNotDone(
        CompletableFuture<IN> completableFuture,
        Executor executor,
        Function<? super IN, ? extends CompletionStage<OUT>> composeFun) {
    if (completableFuture.isDone()) {
        // Already completed: compose without involving the executor.
        return completableFuture.thenCompose(composeFun);
    }
    return completableFuture.thenComposeAsync(composeFun, executor);
}
/**
 * Add timing instrumentation to an asynchronous operation.
 *
 * <p>When the future is already done, the elapsed time since {@code startTime} is recorded
 * against every event on the calling thread and the same future is returned. Otherwise
 * the work is handed to {@code instrumentAsync}.
 *
 * @param events the event types to use to record timing
 * @param future a future that will complete when the operation is finished
 * @param executor an asynchronous executor to use to run the recording
 * @param startTime the nanosecond time at which the operation started
 * @param <T> the type of the future
 *
 * @return the given future if it was already done, otherwise the future produced by {@code instrumentAsync}
 */
public <T> CompletableFuture<T> instrument(Set<Event> events, CompletableFuture<T> future, Executor executor, long startTime) {
    if (!future.isDone()) {
        return instrumentAsync(events, future, executor, startTime);
    }
    // Already finished: one elapsed-time measurement is shared by all events.
    final long elapsedNanos = System.nanoTime() - startTime;
    for (Event timedEvent : events) {
        record(timedEvent, elapsedNanos);
    }
    return future;
}