下面列出了 java.util.concurrent.CompletableFuture#completeExceptionally() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Subscribes to the "accepted" topic of the provisioning template named in the request.
 *
 * <p>Each incoming message payload is decoded as UTF-8 JSON into a
 * {@link RegisterThingResponse} and passed to {@code handler}. Any exception raised while
 * decoding or handling a message is forwarded to {@code exceptionHandler} when one is supplied,
 * and dropped otherwise.
 *
 * @param request subscription request; its {@code templateName} selects the topic
 * @param qos quality of service passed through to the connection's subscribe call
 * @param handler callback receiving each decoded response
 * @param exceptionHandler optional callback for per-message failures, may be null
 * @return the future returned by the connection's subscribe call, or a future failed with an
 *     {@link MqttException} when {@code request.templateName} is null
 */
public CompletableFuture<Integer> SubscribeToRegisterThingAccepted(
        RegisterThingSubscriptionRequest request,
        QualityOfService qos,
        Consumer<RegisterThingResponse> handler,
        Consumer<Exception> exceptionHandler) {
    // Reject the request up front: the topic cannot be built without a template name.
    if (request.templateName == null) {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new MqttException("RegisterThingSubscriptionRequest must have a non-null templateName"));
        return failed;
    }

    final String resolvedTopic = "$aws/provisioning-templates/{templateName}/provision/json/accepted"
            .replace("{templateName}", request.templateName);

    Consumer<MqttMessage> onMessage = (mqttMessage) -> {
        try {
            String json = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
            handler.accept(gson.fromJson(json, RegisterThingResponse.class));
        } catch (Exception failure) {
            if (exceptionHandler != null) {
                exceptionHandler.accept(failure);
            }
        }
    };

    return connection.subscribe(resolvedTopic, qos, onMessage);
}
/**
 * Used to get table regions' info and server.
 * @param metaTable the table handle handed to {@code scanMeta} to perform the scan
 * @param tableName table we're looking for, can be null for getting all regions
 * @param excludeOfflinedSplitParents don't return split parents
 * @return the list of regioninfos and server. The return value will be wrapped by a
 *         {@link CompletableFuture}. The future is failed with an {@link IOException} when
 *         {@code tableName} is the meta table itself, and with the scan's error when the
 *         scan fails.
 */
private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
    final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName,
    final boolean excludeOfflinedSplitParents) {
  CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
  if (TableName.META_TABLE_NAME.equals(tableName)) {
    future.completeExceptionally(new IOException(
      "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
    // Stop here: the request has already been rejected, so issuing the meta scan below would
    // be wasted work whose callback could only try to complete an already-failed future.
    return future;
  }
  // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
  CollectRegionLocationsVisitor visitor =
    new CollectRegionLocationsVisitor(excludeOfflinedSplitParents);
  addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> {
    if (error != null) {
      future.completeExceptionally(error);
      return;
    }
    future.complete(visitor.getResults());
  });
  return future;
}
/**
 * Issues a time RPC and exposes its outcome as a future.
 *
 * @return a future completed with the first response delivered to the observer, or failed
 *     with the error reported by the RPC
 */
@Override
public CompletableFuture<TimeResponse> time() {
    final CompletableFuture<TimeResponse> result = new CompletableFuture<>();
    // The TimeRequest message is an empty one, so the default instance is the whole request.
    final TimeRequest request = TimeRequest.getDefaultInstance();

    final StreamObserver<TimeResponse> responseObserver = new StreamObserver<TimeResponse>() {
        @Override
        public void onNext(TimeResponse response) {
            result.complete(response);
        }

        @Override
        public void onError(Throwable error) {
            // Let the shared RPC error handling run before the caller is notified.
            handleRpcError(error, "gNOI time request");
            result.completeExceptionally(error);
        }

        @Override
        public void onCompleted() {
            // Nothing to do: the future is settled by onNext/onError.
        }
    };

    execRpc(stub -> stub.time(request, responseObserver));
    return result;
}
/**
 * Marks a meter for deletion in the store.
 *
 * <p>The returned future is registered under the meter's key before the store is touched.
 * If the meter is not present in the store the future is completed successfully right away;
 * if the store update throws a {@link StorageException} the registration is removed and the
 * future is failed with that exception. Otherwise the future is returned still pending.
 *
 * @param meter the meter to delete
 * @return the future tracking the delete operation
 */
@Override
public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
    final CompletableFuture<MeterStoreResult> pending = new CompletableFuture<>();
    final MeterKey meterKey = MeterKey.key(meter.deviceId(), meter.id());
    // Remember the future associated with this operation.
    futures.put(meterKey, pending);

    // Replacement entry for the meter. It will be pruned by observing
    // that it has been removed from the dataplane.
    final MeterData replacement = new MeterData(meter, null, local);
    try {
        boolean absent = meters.computeIfPresent(meterKey, (k, current) -> replacement) == null;
        if (absent) {
            // Nothing stored for this key, so there is nothing to wait for.
            pending.complete(MeterStoreResult.success());
        }
    } catch (StorageException storageFailure) {
        futures.remove(meterKey);
        pending.completeExceptionally(storageFailure);
    }
    return pending;
}
/**
 * Closes the client channel whose remote address equals the given address.
 *
 * @param connection remote address identifying the channel to close
 * @param type how the channel should be closed
 * @return a stage yielding a report for the closed connection, or a stage failed with an
 *     {@link IllegalArgumentException} when no channel has that remote address
 */
@Override
public CompletionStage<NodeConnectionReport> closeConnectionAsync(
        SocketAddress connection, CloseType type) {
    Optional<Channel> match =
            this.clientChannelGroup
                    .stream()
                    .filter(candidate -> candidate.remoteAddress().equals(connection))
                    .findFirst();

    if (!match.isPresent()) {
        CompletableFuture<NodeConnectionReport> notFound = new CompletableFuture<>();
        notFound.completeExceptionally(new IllegalArgumentException("Not found"));
        return notFound;
    }

    // Wrap the single channel in its own group so the group-close helper can be reused.
    ChannelGroup toClose = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    toClose.add(match.get());

    NodeConnectionReport report =
            new ClusterConnectionReport(getCluster().getId())
                    .addNode(this, Collections.singletonList(connection), getAddress());
    return closeChannelGroup(toClose, type).thenApply(ignored -> report);
}
/**
 * Returns either an already-failed stage or an asynchronously supplied value.
 *
 * @param throwException when true the returned stage is failed with an {@link IOException}
 * @return a stage failed with {@code IOException("Test Message")}, or one supplying
 *     {@code "test"} asynchronously
 */
@Retry(name = RetryDummyService.RETRY_BACKEND_B)
@Override
public CompletionStage<String> doSomethingAsync(boolean throwException) throws IOException {
    if (!throwException) {
        return CompletableFuture.supplyAsync(() -> "test");
    }
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IOException("Test Message"));
    return failed;
}
/**
 * Unregisters a cluster and closes every node that belongs to it.
 *
 * <p>If the id is null, or no cluster is registered under it, the returned future fails with
 * an {@link IllegalArgumentException}.
 *
 * @param clusterId id of the cluster.
 * @return A future that when completed provides the unregistered cluster as it existed in the
 *     registry, may not be the same object as the input.
 */
public CompletionStage<BoundCluster> unregisterAsync(Long clusterId) {
    if (isClosed()) {
        return failByClose();
    }

    CompletableFuture<BoundCluster> result = new CompletableFuture<>();
    if (clusterId == null) {
        result.completeExceptionally(new IllegalArgumentException("Null id provided"));
        return result;
    }

    BoundCluster removed = clusters.remove(clusterId);
    if (removed == null) {
        result.completeExceptionally(new IllegalArgumentException("ClusterSpec not found."));
        return result;
    }

    // Close each node of every data center and wait for all of them.
    List<CompletableFuture<BoundNode>> pendingCloses = new ArrayList<>();
    for (BoundDataCenter dataCenter : removed.getDataCenters()) {
        for (BoundNode node : dataCenter.getNodes()) {
            pendingCloses.add(close(node));
        }
    }
    CompletableFuture.allOf(pendingCloses.toArray(new CompletableFuture[] {}))
            .whenComplete(
                    (ignored, failure) -> {
                        if (failure == null) {
                            result.complete(removed);
                        } else {
                            result.completeExceptionally(failure);
                        }
                    });
    return result;
}
/**
 * Registers a receive-response event with the client.
 *
 * @return the future handed to the event; it is failed immediately with the
 *     {@link IOException} if registering the event throws one
 */
@Override
CompletableFuture<Void> whenReceivingResponse() {
    final CompletableFuture<Void> responseReady = new MinimalFuture<>();
    try {
        client.registerEvent(new ReceiveResponseEvent(responseReady));
    } catch (IOException registrationFailure) {
        responseReady.completeExceptionally(registrationFailure);
    }
    return responseReady;
}
/**
 * Converts an object to the requested type and wraps the outcome in an already-completed
 * future, which makes the conversion easy to use in CompletableFuture chains.
 *
 * @param obj The object to deserialize.
 * @param classType Class information to convert to.
 * @param <R> The return Type.
 * @return A CompletableFuture holding the converted value, or failed with a
 *     {@link CompletionException} wrapping the {@link JsonProcessingException}.
 */
public static <R> CompletableFuture<R> futureGetAs(Object obj, Class<R> classType) {
    try {
        return CompletableFuture.completedFuture(Serialization.safeGetAs(obj, classType));
    } catch (JsonProcessingException jpe) {
        CompletableFuture<R> failed = new CompletableFuture<>();
        failed.completeExceptionally(new CompletionException("Unable to deserialize", jpe));
        return failed;
    }
}
/**
 * String-id variant: parses both ids as longs and delegates to the numeric overload.
 *
 * @param channelId channel id in decimal string form
 * @param messageId message id in decimal string form
 * @param emoji the emoji whose reactors are requested
 * @return the delegate's future, or a future failed with the {@link NumberFormatException}
 *     raised while parsing
 */
@Override
public CompletableFuture<List<User>> getUsersWhoReactedWithEmoji(String channelId, String messageId, Emoji emoji) {
    try {
        final long channel = Long.parseLong(channelId);
        final long message = Long.parseLong(messageId);
        return getUsersWhoReactedWithEmoji(channel, message, emoji);
    } catch (NumberFormatException badId) {
        CompletableFuture<List<User>> failed = new CompletableFuture<>();
        failed.completeExceptionally(badId);
        return failed;
    }
}
/**
 * Opens the provider.
 *
 * <p>The state is first moved from {@code EXPORTED} to {@code OPENING}; if that transition
 * fails the future is failed with an {@link InitializationException}. Otherwise the
 * controller is opened and, once it completes, the future is completed normally, failed with
 * the controller's error, or failed with an {@link InitializationException}, depending on the
 * checks in the completion callback below.
 *
 * @param future the future to complete with the outcome of the open operation
 */
protected void doOpen(final CompletableFuture<Void> future) {
// Only a provider currently in EXPORTED may start opening.
if (!STATE_UPDATER.compareAndSet(this, Status.EXPORTED, Status.OPENING)) {
logger.info(String.format("Failed opening provider %s. caused by state is illegal.", name()));
future.completeExceptionally(new InitializationException("state is illegal."));
} else {
logger.info(String.format("Start opening provider %s.", name()));
controller.open().whenComplete((v, t) -> {
// Note the precedence: && binds tighter than ||. This branch is taken when openFuture is
// no longer this future, OR when the open succeeded (t == null) but the state could not be
// moved from OPENING to OPENED. The compareAndSet is only attempted when the first two
// checks allow it (short-circuit evaluation).
if (openFuture != future || t == null && !STATE_UPDATER.compareAndSet(this, Status.OPENING, Status.OPENED)) {
logger.info(String.format("Failed exporting provider %s. caused by state is illegal", name()));
future.completeExceptionally(new InitializationException("Status is illegal."));
controller.close();
} else if (t != null) {
// Will be closed automatically
logger.info(String.format("Failed exporting provider %s. caused by %s", name(), t.getMessage()));
// Roll back the state
STATE_UPDATER.compareAndSet(this, Status.OPENING, Status.EXPORTED);
future.completeExceptionally(t);
} else {
logger.info(String.format("Success opening provider %s.", name()));
future.complete(null);
// Trigger a configuration update
controller.update();
}
});
}
}
/**
 * Walks the reported compaction status through NOT_RUN, RUNNING, SUCCESS and ERROR by
 * stubbing the compactor with futures that the test completes by hand.
 */
@Test
public void testCompactionStatus() throws Exception {
    String topic = "persistent://prop-xyz/ns1/topic1";

    // Creating (and immediately closing) a producer is enough to create the topic.
    pulsarClient.newProducer(Schema.BYTES).topic(topic).create().close();
    assertNotNull(pulsar.getBrokerService().getTopicReference(topic));

    assertEquals(admin.topics().compactionStatus(topic).status,
            LongRunningProcessStatus.Status.NOT_RUN);

    // Stub the compactor so no real compaction runs; the test controls the outcome.
    CompletableFuture<Long> successfulRun = new CompletableFuture<>();
    Compactor compactor = pulsar.getCompactor();
    doReturn(successfulRun).when(compactor).compact(topic);
    admin.topics().triggerCompaction(topic);

    assertEquals(admin.topics().compactionStatus(topic).status,
            LongRunningProcessStatus.Status.RUNNING);

    successfulRun.complete(1L);

    assertEquals(admin.topics().compactionStatus(topic).status,
            LongRunningProcessStatus.Status.SUCCESS);

    // Second run: same stubbing, but this time the compaction fails.
    CompletableFuture<Long> failingRun = new CompletableFuture<>();
    doReturn(failingRun).when(compactor).compact(topic);
    admin.topics().triggerCompaction(topic);
    failingRun.completeExceptionally(new Exception("Failed at something"));

    assertEquals(admin.topics().compactionStatus(topic).status,
            LongRunningProcessStatus.Status.ERROR);
    assertTrue(admin.topics().compactionStatus(topic).lastError.contains("Failed at something"));
}
/**
 * Checks that the message of an exception is written to the captured output when a future
 * handed to the target is failed afterwards.
 */
@Test
public void errorInCompletableLoggingTest() {
    // Do: hand over a pending future first, fail it afterwards.
    final CompletableFuture<List<TextMessage>> pending = new CompletableFuture<>();
    target.accept(pending);
    final GeneralLineMessagingException failure =
            new GeneralLineMessagingException("EXCEPTION HAPPEN!", null, null);
    pending.completeExceptionally(failure);

    // Verify
    assertThat(systemOut.getLogWithNormalizedLineSeparator())
            .contains("EXCEPTION HAPPEN!");
}
/**
 * Puts a future that has already failed into the cache and checks that no entry is kept for
 * the key and that the estimated size is unchanged.
 */
@CheckNoWriter
@Test(dataProvider = "caches")
@CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING })
public void put_insert_failure_before(
        AsyncCache<Integer, Integer> cache, CacheContext context) {
    // Start from an incomplete future. completeExceptionally has no effect on a future that is
    // already completed, so building it with completedFuture(null) would leave it completed
    // normally with null and the "failure" scenario would never actually be exercised.
    CompletableFuture<Integer> failedFuture = new CompletableFuture<>();
    failedFuture.completeExceptionally(new IllegalStateException());

    cache.put(context.absentKey(), failedFuture);
    assertThat(cache.getIfPresent(context.absentKey()), is(nullValue()));
    assertThat(cache.synchronous().estimatedSize(), is(context.initialSize()));
}
/**
 * Converting a future that already failed with a checked exception must yield a listenable
 * future whose {@code get()} throws an {@link ExecutionException} caused by that exception.
 */
@Test
public void testFinishedCheckedExceptionStageToListenable() throws ExecutionException, InterruptedException {
    final Exception failure = new Exception("something went wrong");
    final CompletableFuture<String> alreadyFailed = new CompletableFuture<>();
    alreadyFailed.completeExceptionally(failure);

    // Expectations must be registered before the call that throws.
    expectedException.expect(ExecutionException.class);
    expectedException.expectCause(is(failure));

    FutureConverter.toListenableFuture(alreadyFailed).get();
}
/**
 * Obtains a Future from the given Supplier and mirrors its outcome onto another future: the
 * target is completed with the same value on normal completion, or exceptionally with the same
 * exception on failure. If the supplier itself throws, the target is failed with that
 * exception and the exception is rethrown to the caller.
 *
 * @param futureSupplier A Supplier returning a Future to listen to.
 * @param toComplete     A CompletableFuture that has not yet been completed, which will be
 *                       completed with the result of the Future from futureSupplier.
 * @param <T>            Return type of Future.
 */
public static <T> void completeAfter(Supplier<CompletableFuture<? extends T>> futureSupplier, CompletableFuture<T> toComplete) {
    Preconditions.checkArgument(!toComplete.isDone(), "toComplete is already completed.");
    try {
        CompletableFuture<? extends T> source = futureSupplier.get();

        // Asynchronous outcome: forward the value or the failure once the source settles.
        source.thenAccept(value -> toComplete.complete(value));
        Futures.exceptionListener(source, failure -> toComplete.completeExceptionally(failure));
    } catch (Throwable syncFailure) {
        // Synchronous outcome: the supplier (or the wiring above) threw.
        toComplete.completeExceptionally(syncFailure);
        throw syncFailure;
    }
}
/**
 * Creates a source by POSTing a multipart request.
 *
 * <p>The request always carries a {@code sourceConfig} part holding the JSON form of the
 * configuration. A {@code data} file part is added only when {@code fileName} is non-null and
 * does not start with {@code builtin://}.
 *
 * @param sourceConfig configuration of the source; tenant, namespace and name form the path
 * @param fileName path of the package to upload, a {@code builtin://} reference, or null
 * @return a future completed with null on a 2xx response, and failed with the API exception
 *     built from the response, from the request failure, or from an error raised while
 *     building the request
 */
@Override
public CompletableFuture<Void> createSourceAsync(SourceConfig sourceConfig, String fileName) {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    try {
        RequestBuilder builder =
                post(source.path(sourceConfig.getTenant())
                        .path(sourceConfig.getNamespace()).path(sourceConfig.getName()).getUri().toASCIIString())
                        .addBodyPart(new StringPart("sourceConfig", ObjectMapperFactory.getThreadLocal()
                                .writeValueAsString(sourceConfig), MediaType.APPLICATION_JSON));
        if (fileName != null && !fileName.startsWith("builtin://")) {
            // If the function code is built in, we don't need to submit here
            builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
        }
        asyncHttpClient.executeRequest(addAuthHeaders(source, builder).build())
                .toCompletableFuture()
                .thenAccept(response -> {
                    if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
                        future.completeExceptionally(
                                getApiException(Response
                                        .status(response.getStatusCode())
                                        .entity(response.getResponseBody()).build()));
                    } else {
                        future.complete(null);
                    }
                })
                .exceptionally(error -> {
                    // Without this stage a failed request (or an exception thrown by the
                    // callback above) would leave the returned future pending forever.
                    // Dependent stages wrap the original failure in a CompletionException,
                    // so unwrap it to report the same cause the synchronous path would.
                    Throwable cause = error instanceof java.util.concurrent.CompletionException
                            && error.getCause() != null ? error.getCause() : error;
                    future.completeExceptionally(getApiException(cause));
                    return null;
                });
    } catch (Exception e) {
        future.completeExceptionally(getApiException(e));
    }
    return future;
}
/**
 * Reconciles the running replicators with the replication clusters configured in the
 * namespace policies.
 *
 * <p>Non-global topics need no work. For global topics, a replicator is started for every
 * configured remote cluster that lacks one, and replicators for remote clusters that are no
 * longer configured are removed.
 *
 * @return a future that completes once all removals have finished; it is failed with a
 *     {@link ServerMetadataException} when the policies cannot be read and with a
 *     {@link NamingException} when a replicator cannot be started
 */
@Override
public CompletableFuture<Void> checkReplication() {
    final TopicName topicName = TopicName.get(topic);
    if (!topicName.isGlobal()) {
        return CompletableFuture.completedFuture(null);
    }

    if (log.isDebugEnabled()) {
        log.debug("[{}] Checking replication status", topicName);
    }

    Policies policies;
    try {
        policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                .get(AdminResource.path(POLICIES, topicName.getNamespace()))
                .orElseThrow(() -> new KeeperException.NoNodeException());
    } catch (Exception e) {
        return FutureUtil.failedFuture(new ServerMetadataException(e));
    }

    final Set<String> configuredClusters = policies.replication_clusters != null
            ? policies.replication_clusters
            : Collections.emptySet();
    final String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

    // Start replicators for configured remote clusters that do not have one yet.
    for (String cluster : configuredClusters) {
        if (!cluster.equals(localCluster)
                && !replicators.containsKey(cluster)
                && !startReplicator(cluster)) {
            // it happens when global topic is a partitioned topic and replicator can't start on original
            // non partitioned-topic (topic without partition prefix)
            return FutureUtil
                    .failedFuture(new NamingException(topic + " failed to start replicator for " + cluster));
        }
    }

    // Stop replicators whose remote cluster is no longer configured.
    final List<CompletableFuture<Void>> removals = Lists.newArrayList();
    replicators.forEach((cluster, replicator) -> {
        if (!cluster.equals(localCluster) && !configuredClusters.contains(cluster)) {
            removals.add(removeReplicator(cluster));
        }
    });

    return FutureUtil.waitForAll(removals);
}
/**
 * Always rejects the request.
 *
 * @return a future already failed with an {@link IllegalStateException} carrying the message
 *     from {@code getExceptionMessage()}
 */
@Override
public <T> CompletableFuture<T> ask(Object message, Class<T> responseType, Boolean persistOnResponse) {
    final CompletableFuture<T> rejected = new CompletableFuture<>();
    final IllegalStateException reason = new IllegalStateException(getExceptionMessage());
    rejected.completeExceptionally(reason);
    return rejected;
}
/**
 * Builds a future that is already failed with the given exception.
 *
 * @param e the exception the future is completed with
 * @param <T> the value type of the returned future
 * @return a new, exceptionally completed future
 */
private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
    CompletableFuture<T> failed = new CompletableFuture<>();
    failed.completeExceptionally(e);
    return failed;
}