下面列出了java.util.concurrent.CompletableFuture#isCompletedExceptionally ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) {
checkArgument(state == State.Connected);
CompletableFuture<Consumer> consumerFuture = consumers.get(getLastMessageId.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
long requestId = getLastMessageId.getRequestId();
Topic topic = consumer.getSubscription().getTopic();
Position position = topic.getLastPosition();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) position,
partitionIndex,
requestId,
consumer.getSubscription().getName());
} else {
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
}
}
@Override
protected void handleFlow(CommandFlow flow) {
checkArgument(state == State.Connected);
if (log.isDebugEnabled()) {
log.debug("[{}] Received flow from consumer {} permits: {}", remoteAddress, flow.getConsumerId(),
flow.getMessagePermits());
}
CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (consumer != null) {
consumer.flowPermits(flow.getMessagePermits());
} else {
log.info("[{}] Couldn't find consumer {}", remoteAddress, flow.getConsumerId());
}
}
}
@Override
protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver) {
checkArgument(state == State.Connected);
if (log.isDebugEnabled()) {
log.debug("[{}] Received Resend Command from consumer {} ", remoteAddress, redeliver.getConsumerId());
}
CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
consumer.redeliverUnacknowledgedMessages();
}
}
}
private void addTxnToTimeoutService(String scope, String stream, long lease, long maxExecutionPeriod, UUID txnId,
CompletableFuture<VersionedTransactionData> txnFuture) {
Version version = null;
long executionExpiryTime = System.currentTimeMillis() + maxExecutionPeriod;
if (!txnFuture.isCompletedExceptionally()) {
version = txnFuture.join().getVersion();
executionExpiryTime = txnFuture.join().getMaxExecutionExpiryTime();
}
timeoutService.addTxn(scope, stream, txnId, version, lease, executionExpiryTime);
log.trace("Txn={}, added to timeout service on host={}", txnId, hostId);
}
@Override
protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
if (log.isDebugEnabled()) {
log.debug("Received Broker lookup response: {}", lookupResult.getResponse());
}
long requestId = lookupResult.getRequestId();
CompletableFuture<LookupDataResult> requestFuture = getAndRemovePendingLookupRequest(requestId);
if (requestFuture != null) {
if (requestFuture.isCompletedExceptionally()) {
if (log.isDebugEnabled()) {
log.debug("{} Request {} already timed-out", ctx.channel(), lookupResult.getRequestId());
}
return;
}
// Complete future with exception if : Result.response=fail/null
if (!lookupResult.hasResponse()
|| CommandLookupTopicResponse.LookupType.Failed.equals(lookupResult.getResponse())) {
if (lookupResult.hasError()) {
checkServerError(lookupResult.getError(), lookupResult.getMessage());
requestFuture.completeExceptionally(
getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
} else {
requestFuture
.completeExceptionally(new PulsarClientException.LookupException("Empty lookup response"));
}
} else {
requestFuture.complete(new LookupDataResult(lookupResult));
}
} else {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), lookupResult.getRequestId());
}
}
@Override
protected void handleAck(CommandAck ack) {
checkArgument(state == State.Connected);
CompletableFuture<Consumer> consumerFuture = consumers.get(ack.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumerFuture.getNow(null).messageAcked(ack);
}
}
/**
* Calculates the preferred locations based on the location preference constraint.
*
* @param locationPreferenceConstraint constraint for the location preference
* @return Future containing the collection of preferred locations. This might not be completed if not all inputs
* have been a resource assigned.
*/
@VisibleForTesting
public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocations();
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
switch(locationPreferenceConstraint) {
case ALL:
preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
break;
case ANY:
final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(preferredLocationFutures.size());
for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : preferredLocationFutures) {
if (preferredLocationFuture.isDone() && !preferredLocationFuture.isCompletedExceptionally()) {
final TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null);
if (taskManagerLocation == null) {
throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug.");
}
completedTaskManagerLocations.add(taskManagerLocation);
}
}
preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations);
break;
default:
throw new RuntimeException("Unknown LocationPreferenceConstraint " + locationPreferenceConstraint + '.');
}
return preferredLocationsFuture;
}
/** propagate an exception for the dependency to the current task by throwing it */
protected void propagateException(TaskType task) throws InterruptedException, ExecutionException {
CompletableFuture<?> depResult = task.getFuture();
if (!depResult.isCompletedExceptionally()) {
return;
}
depResult.get();
Verify.verify(false, "Should have completed exceptionally");
}
@SuppressWarnings("unchecked")
public <T> T getDataIfPresent(String path) {
CompletableFuture<Map.Entry<Object, Stat>> f = dataCache.getIfPresent(path);
if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
return (T) f.join().getKey();
} else {
return null;
}
}
/**
* Returns the latest value of {@code watchFile()} result.
*
* @param defaultValue the default value which is returned when the value is not available yet
*/
@Nullable
default T latestValue(@Nullable T defaultValue) {
final CompletableFuture<Latest<T>> initialValueFuture = initialValueFuture();
if (initialValueFuture.isDone() && !initialValueFuture.isCompletedExceptionally()) {
return latest().value();
} else {
return defaultValue;
}
}
/** Returns the function that indicates whether a failure occurred or not. */
private static <T> Function<T, CompletionStage<Void>> checkFailure(
CompletableFuture<Void> failure) {
return ignored -> {
if (failure.isCompletedExceptionally()) {
return failure;
}
return CompletableFuture.completedFuture(null);
};
}
private void checkRandom(CompletableFuture<Location> future) {
if (checks >= maxTries) {
future.completeExceptionally(new NotFoundException("location"));
return;
}
if (future.isCancelled() || future.isDone() || future.isCompletedExceptionally()) {
return;
}
lastCheck = center.getWorld().getTime();
Location randomLoc = center.clone();
randomLoc.setY(0);
int minChunk = minRadius >> 4;
int maxChunk = maxRadius >> 4;
int randChunkX;
int randChunkZ;
Chunk[] loadedChunks = new Chunk[0];
if (loadedOnly) {
loadedChunks = randomLoc.getWorld().getLoadedChunks();
if (loadedChunks.length == 0) {
future.completeExceptionally(new NotFoundException("loaded chunk"));
return;
}
}
do {
checks++;
if (checks >= maxTries) {
future.completeExceptionally(new NotFoundException("location"));
return;
}
if (loadedOnly) {
Chunk chunk = loadedChunks[random.nextInt(loadedChunks.length)];
randChunkX = chunk.getX();
randChunkZ = chunk.getZ();
} else {
randChunkX = (random.nextBoolean() ? 1 : -1) * random.nextInt(maxChunk + 1);
randChunkZ = (random.nextBoolean() ? 1 : -1) * random.nextInt(maxChunk + 1);
}
} while (!checked.put(randChunkX, randChunkZ) || !inRadius(Math.abs(randChunkX), Math.abs(randChunkZ), minChunk, maxChunk));
randomLoc.setX(((center.getBlockX() >> 4) + randChunkX) * 16);
randomLoc.setZ(((center.getBlockZ() >> 4) + randChunkZ) * 16);
PaperLib.getChunkAtAsync(randomLoc, generatedOnly).thenApply(c -> {
checks++;
if (c == null) {
// Chunk not generated, test another one
checkRandom(future);
return false;
}
int indexOffset = random.nextInt(RANDOM_LIST.size());
Location foundLoc = null;
for (int i = 0; i < RANDOM_LIST.size(); i++) {
int index = (i + indexOffset) % RANDOM_LIST.size();
boolean validated = true;
Location loc = randomLoc.clone().add(RANDOM_LIST.get(index)[0], 0, RANDOM_LIST.get(index)[1]);
if (!inRadius(loc)) {
continue;
}
for (LocationValidator validator : getValidators().getAll()) {
if (!validator.validate(this, loc)) {
validated = false;
break;
}
}
if (validated) {
foundLoc = loc;
break;
}
}
if (foundLoc != null) {
// all checks are for the top block, put we want a location above that so add 1 to y
future.complete(foundLoc.add(0, 1, 0));
return true;
}
long diff = center.getWorld().getTime() - lastCheck;
if (diff < checkDelay) {
plugin.getServer().getScheduler().runTaskLater(plugin, () -> checkRandom(future), checkDelay - diff);
} else {
checkRandom(future);
}
return false;
}).exceptionally(future::completeExceptionally);
}
@Override
public CompletableFuture<T> getFuture() {
final CompletableFuture<T> currentGatewayFuture = atomicGatewayFuture.get();
if (currentGatewayFuture.isCompletedExceptionally()) {
try {
currentGatewayFuture.get();
} catch (ExecutionException | InterruptedException executionException) {
String leaderAddress;
try {
Tuple2<String, UUID> leaderAddressSessionId = getLeaderNow()
.orElse(Tuple2.of("unknown address", HighAvailabilityServices.DEFAULT_LEADER_ID));
leaderAddress = leaderAddressSessionId.f0;
} catch (Exception e) {
log.warn("Could not obtain the current leader.", e);
leaderAddress = "unknown leader address";
}
if (log.isDebugEnabled() || log.isTraceEnabled()) {
// only log exceptions on debug or trace level
log.warn(
"Error while retrieving the leader gateway. Retrying to connect to {}.",
leaderAddress,
ExceptionUtils.stripExecutionException(executionException));
} else {
log.warn(
"Error while retrieving the leader gateway. Retrying to connect to {}.",
leaderAddress);
}
}
// we couldn't resolve the gateway --> let's try again
final CompletableFuture<T> newGatewayFuture = createGateway(getLeaderFuture());
// let's check if there was a concurrent createNewFuture call
if (atomicGatewayFuture.compareAndSet(currentGatewayFuture, newGatewayFuture)) {
return newGatewayFuture;
} else {
return atomicGatewayFuture.get();
}
} else {
return atomicGatewayFuture.get();
}
}
public static boolean isSuccessFuture(CompletableFuture future) {
return future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled();
}
@Override
protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
checkArgument(state == State.Connected);
log.info("[{}] Closing consumer: {}", remoteAddress, closeConsumer.getConsumerId());
long requestId = closeConsumer.getRequestId();
long consumerId = closeConsumer.getConsumerId();
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture == null) {
log.warn("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress);
ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Consumer not found"));
return;
}
if (!consumerFuture.isDone() && consumerFuture
.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
// We have received a request to close the consumer before it was actually completed, we have marked the
// consumer future as failed and we can tell the client the close operation was successful. When the actual
// create operation will complete, the new consumer will be discarded.
log.info("[{}] Closed consumer {} before its creation was completed", remoteAddress, consumerId);
ctx.writeAndFlush(Commands.newSuccess(requestId));
return;
}
if (consumerFuture.isCompletedExceptionally()) {
log.info("[{}] Closed consumer {} that already failed to be created", remoteAddress, consumerId);
ctx.writeAndFlush(Commands.newSuccess(requestId));
return;
}
// Proceed with normal consumer close
Consumer consumer = consumerFuture.getNow(null);
try {
consumer.close();
consumers.remove(consumerId, consumerFuture);
ctx.writeAndFlush(Commands.newSuccess(requestId));
log.info("[{}] Closed consumer {}", remoteAddress, consumer);
} catch (BrokerServiceException e) {
log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
ctx.writeAndFlush(
Commands.newError(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage()));
}
}
/** Returns if the future has successfully completed. */
static boolean isReady(@Nullable CompletableFuture<?> future) {
return (future != null) && future.isDone()
&& !future.isCompletedExceptionally()
&& (future.join() != null);
}
public static boolean succeeded(CompletableFuture future) {
return future.isDone() && !future.isCompletedExceptionally();
}
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
assertLifeCycleState(LifeCycle.States.RUNNING);
LOG.debug("{}: receive client request({})", getMemberId(), request);
Timer timer = raftServerMetrics.getClientRequestTimer(request);
final Timer.Context timerContext = (timer != null) ? timer.time() : null;
CompletableFuture<RaftClientReply> replyFuture;
if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
replyFuture = staleReadAsync(request);
} else {
// first check the server's leader state
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
if (reply != null) {
return reply;
}
// let the state machine handle read-only request from client
RaftClientRequest.Type type = request.getType();
if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
if (type.getStream().getEndOfRequest()) {
final CompletableFuture<RaftClientRequest> f = streamEndOfRequestAsync(request);
if (f.isCompletedExceptionally()) {
return f.thenApply(r -> null);
}
request = f.join();
type = request.getType();
}
}
if (type.is(RaftClientRequestProto.TypeCase.READ)) {
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
replyFuture = processQueryFuture(stateMachine.query(request.getMessage()), request);
} else if (type.is(RaftClientRequestProto.TypeCase.WATCH)) {
replyFuture = watchAsync(request);
} else if (type.is(RaftClientRequestProto.TypeCase.STREAM)) {
replyFuture = streamAsync(request);
} else {
// query the retry cache
RetryCache.CacheQueryResult previousResult = retryCache.queryCache(
request.getClientId(), request.getCallId());
if (previousResult.isRetry()) {
// if the previous attempt is still pending or it succeeded, return its
// future
replyFuture = previousResult.getEntry().getReplyFuture();
} else {
final RetryCache.CacheEntry cacheEntry = previousResult.getEntry();
// TODO: this client request will not be added to pending requests until
// later which means that any failure in between will leave partial state in
// the state machine. We should call cancelTransaction() for failed requests
TransactionContext context = stateMachine.startTransaction(request);
if (context.getException() != null) {
RaftClientReply exceptionReply = new RaftClientReply(request,
new StateMachineException(getMemberId(), context.getException()), getCommitInfos());
cacheEntry.failWithReply(exceptionReply);
replyFuture = CompletableFuture.completedFuture(exceptionReply);
} else {
replyFuture = appendTransaction(request, context, cacheEntry);
}
}
}
}
replyFuture.whenComplete((clientReply, exception) -> {
if (clientReply.isSuccess() && timerContext != null) {
timerContext.stop();
}
});
return replyFuture;
}
@Override
public CompletableFuture<T> getFuture() {
final CompletableFuture<T> currentGatewayFuture = atomicGatewayFuture.get();
if (currentGatewayFuture.isCompletedExceptionally()) {
try {
currentGatewayFuture.get();
} catch (ExecutionException | InterruptedException executionException) {
String leaderAddress;
try {
Tuple2<String, UUID> leaderAddressSessionId = getLeaderNow()
.orElse(Tuple2.of("unknown address", HighAvailabilityServices.DEFAULT_LEADER_ID));
leaderAddress = leaderAddressSessionId.f0;
} catch (Exception e) {
log.warn("Could not obtain the current leader.", e);
leaderAddress = "unknown leader address";
}
if (log.isDebugEnabled() || log.isTraceEnabled()) {
// only log exceptions on debug or trace level
log.warn(
"Error while retrieving the leader gateway. Retrying to connect to {}.",
leaderAddress,
ExceptionUtils.stripExecutionException(executionException));
} else {
log.warn(
"Error while retrieving the leader gateway. Retrying to connect to {}.",
leaderAddress);
}
}
// we couldn't resolve the gateway --> let's try again
final CompletableFuture<T> newGatewayFuture = createGateway(getLeaderFuture());
// let's check if there was a concurrent createNewFuture call
if (atomicGatewayFuture.compareAndSet(currentGatewayFuture, newGatewayFuture)) {
return newGatewayFuture;
} else {
return atomicGatewayFuture.get();
}
} else {
return atomicGatewayFuture.get();
}
}
/**
* Returns whether the given {@link CompletableFuture} has completed normally, i.e., not exceptionally.
* If the future is yet to complete or if the future completed with an error, then this
* will return <code>false</code>.
* @param future the future to check for normal completion
* @return whether the future has completed without exception
*/
@API(API.Status.MAINTAINED)
public static boolean isCompletedNormally(@Nonnull CompletableFuture<?> future) {
return future.isDone() && !future.isCompletedExceptionally();
}