Below are example usages of io.vertx.core.Promise#future(), collected from real projects.
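As a quick orientation before the snippets, here is a minimal sketch of the pattern they all share: create a Promise, complete or fail it from an asynchronous callback, and return the read-only view obtained via Promise#future() to the caller. The fetchGreeting method and the timer-based delay below are illustrative stand-ins, not code from any of the projects quoted further down.

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

public class PromiseFutureExample {

    // Hypothetical helper: the Promise stays private to the producer of the
    // result, while callers only ever see the Future returned by future().
    static Future<String> fetchGreeting(Vertx vertx) {
        Promise<String> promise = Promise.promise();
        vertx.setTimer(100, timerId -> {
            // complete (or fail) the promise exactly once when the async work is done
            promise.complete("hello");
        });
        return promise.future();
    }

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        fetchGreeting(vertx)
                .onSuccess(msg -> System.out.println("got: " + msg))
                .onFailure(Throwable::printStackTrace);
    }
}

Most of the examples below follow this shape and differ mainly in how the Promise gets completed: from an event-loop callback, from executeBlocking, or by passing the Promise itself (it implements Handler<AsyncResult<T>>) to a callback-style API.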
private Future<Void> bindInsecureServer() {
    if (isInsecurePortEnabled()) {
        final ProtonServerOptions options =
                new ProtonServerOptions()
                        .setHost(getConfig().getInsecurePortBindAddress())
                        .setPort(determineInsecurePort())
                        .setMaxFrameSize(getConfig().getMaxFrameSize())
                        // set heart beat to half the idle timeout
                        .setHeartbeat(getConfig().getIdleTimeout() >> 1);
        final Promise<Void> result = Promise.promise();
        insecureServer = createServer(insecureServer, options);
        insecureServer.connectHandler(this::onConnectRequest).listen(ar -> {
            if (ar.succeeded()) {
                log.info("insecure AMQP server listening on [{}:{}]", getConfig().getInsecurePortBindAddress(), getActualInsecurePort());
                result.complete();
            } else {
                result.fail(ar.cause());
            }
        });
        return result.future();
    } else {
        return Future.succeededFuture();
    }
}
@Override
public Future<KafkaTopic> updateResource(KafkaTopic topicResource) {
    Promise<KafkaTopic> handler = Promise.promise();
    vertx.executeBlocking(future -> {
        try {
            KafkaTopic kafkaTopic = operation().inNamespace(namespace).withName(topicResource.getMetadata().getName()).patch(topicResource);
            LOGGER.debug("KafkaTopic {} updated with version {}->{}",
                    kafkaTopic != null && kafkaTopic.getMetadata() != null ? kafkaTopic.getMetadata().getName() : null,
                    topicResource.getMetadata() != null ? topicResource.getMetadata().getResourceVersion() : null,
                    kafkaTopic != null && kafkaTopic.getMetadata() != null ? kafkaTopic.getMetadata().getResourceVersion() : null);
            future.complete(kafkaTopic);
        } catch (Exception e) {
            future.fail(e);
        }
    }, handler);
    return handler.future();
}
/**
 * Creates a sender based on the connection to the AMQP adapter.
 *
 * @param target The target address to create the sender for or {@code null}
 *               if an anonymous sender should be created.
 * @return A future succeeding with the created sender.
 */
protected Future<ProtonSender> createProducer(final String target) {
    final Promise<ProtonSender> result = Promise.promise();
    if (context == null) {
        result.fail(new IllegalStateException("not connected"));
    } else {
        context.runOnContext(go -> {
            final ProtonSender sender = connection.createSender(target);
            // vertx-proton doesn't support MIXED yet
            sender.setQoS(ProtonQoS.AT_LEAST_ONCE);
            sender.closeHandler(remoteClose -> {
                if (remoteClose.failed()) {
                    log.info("peer closed sender link [exception: {}]", remoteClose.cause().getClass().getName());
                    result.tryFail(remoteClose.cause());
                }
            });
            sender.openHandler(remoteAttach -> {
                if (remoteAttach.failed()) {
                    log.info("peer rejects opening of sender link", remoteAttach.cause());
                    result.fail(remoteAttach.cause());
                } else if (sender.getRemoteTarget() == null) {
                    log.info("peer wants to immediately close sender link");
                    result.fail("could not open sender link");
                } else {
                    result.complete(sender);
                }
            });
            sender.open();
        });
    }
    return result.future();
}
private Future<Void> stopServer() {
    final Promise<Void> secureTracker = Promise.promise();
    if (server != null) {
        log.info("stopping secure AMQP server [{}:{}]", getBindAddress(), getActualPort());
        server.close(secureTracker);
    } else {
        secureTracker.complete();
    }
    return secureTracker.future();
}
Future<Void> saveToFile() {
    final Promise<Void> result = Promise.promise();
    if (!getConfig().isSaveToFile()) {
        result.complete();
    } else if (dirty.get()) {
        checkFileExists(true)
                .compose(s -> {
                    final JsonArray tenantsJson = new JsonArray();
                    tenants.forEach((tenantId, versionedTenant) -> {
                        final JsonObject json = JsonObject.mapFrom(versionedTenant.getValue());
                        json.put(TenantConstants.FIELD_PAYLOAD_TENANT_ID, tenantId);
                        tenantsJson.add(json);
                    });
                    final Promise<Void> writeHandler = Promise.promise();
                    vertx.fileSystem().writeFile(getConfig().getFilename(),
                            Buffer.factory.buffer(tenantsJson.encodePrettily()), writeHandler);
                    return writeHandler.future().map(tenantsJson);
                })
                .map(tenantsJson -> {
                    dirty.set(false);
                    LOG.trace("successfully wrote {} tenants to file {}", tenantsJson.size(),
                            getConfig().getFilename());
                    return (Void) null;
                })
                .onFailure(t -> {
                    LOG.warn("could not write tenants to file {}", getConfig().getFilename(), t);
                })
                .onComplete(result);
    } else {
        LOG.trace("tenants registry does not need to be persisted");
        result.complete();
    }
    return result.future();
}
private Future<Void> startDeployment() {
    Promise<Void> promise = Promise.promise();
    if (deploymentManager == null) {
        promise.complete();
    } else {
        logger.info("Starting deployment");
        deploymentManager.init(promise::handle);
    }
    return promise.future();
}
protected Future<Collection<TopicDescription>> describeTopics(Set<String> names) {
    Promise<Collection<TopicDescription>> descPromise = Promise.promise();
    ac.describeTopics(names).all()
            .whenComplete((tds, error) -> {
                if (error != null) {
                    descPromise.fail(error);
                } else {
                    log.debug("Got topic descriptions for {} topics", tds.size());
                    descPromise.complete(tds.values());
                }
            });
    return descPromise.future();
}
private static Future<ProtonReceiver> openReceiver(final ProtonConnection openConnection, final ProtonMessageHandler messageHandler) {
    final Promise<ProtonReceiver> result = Promise.promise();
    final ProtonReceiver recv = openConnection.createReceiver(AuthenticationConstants.ENDPOINT_NAME_AUTHENTICATION);
    recv.openHandler(result);
    recv.handler(messageHandler);
    recv.open();
    return result.future();
}
@Override
public Future<Void> close(long timeout) {
    ContextInternal ctx = (ContextInternal) context.owner().getOrCreateContext();
    Promise<Void> trampolineProm = ctx.promise();
    this.context.<Void>executeBlocking(prom -> {
        if (timeout > 0) {
            this.producer.close(timeout, TimeUnit.MILLISECONDS);
        } else {
            this.producer.close();
        }
        prom.complete();
    }).onComplete(trampolineProm);
    return trampolineProm.future(); // Trampoline on caller context
}
@Override
public Future<Void> close(long timeout) {
    ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
    Promise<Void> promise = ctx.promise();
    ctx.executeBlocking(prom -> {
        if (timeout > 0) {
            adminClient.close(Duration.ofMillis(timeout));
        } else {
            adminClient.close();
        }
        prom.complete();
    }).onComplete(promise);
    return promise.future();
}
@Override
public Future<Void> update(Topic topic) {
    Promise<Void> handler = Promise.promise();
    byte[] data = TopicSerialization.toJson(topic);
    // TODO pass a non-zero version
    String topicPath = getTopicPath(topic.getTopicName());
    LOGGER.debug("update znode {}", topicPath);
    zk.setData(topicPath, data, -1, handler);
    return handler.future();
}
@Override
public Future<Void> commit() {
    switch (status) {
        case ST_BEGIN:
        case ST_PENDING:
        case ST_PROCESSING:
            Promise<Void> promise = context.promise();
            schedule__(doQuery(COMMIT, promise));
            return promise.future();
        case ST_COMPLETED:
            return context.failedFuture("Transaction already completed");
        default:
            throw new IllegalStateException();
    }
}
@Override
public Future<Void> seekToBeginning(Set<TopicPartition> topicPartitions) {
    Promise<Void> promise = Promise.promise();
    this.seekToBeginning(topicPartitions, promise);
    return promise.future();
}
private Future<Void> bindSecureHttpServer() {
    if (config.isSecurePortEnabled()) {
        if (Constants.LOOPBACK_DEVICE_ADDRESS.equals(config.getBindAddress())) {
            if (bindSecureServerToLoopbackDeviceAllowed) {
                LOG.warn("secure health checks HTTP server will bind to loopback device only");
            } else {
                LOG.info("won't start secure health checks HTTP server: no bind address configured.");
                return Future.failedFuture("no bind address configured for secure server");
            }
        }
        final Promise<Void> result = Promise.promise();
        final HttpServerOptions options = new HttpServerOptions()
                .setPort(config.getPort(DEFAULT_PORT))
                .setHost(config.getBindAddress())
                .setKeyCertOptions(config.getKeyCertOptions())
                .setSsl(true);
        server = vertx.createHttpServer(options);
        router.get(URI_READINESS_PROBE).handler(readinessHandler);
        router.get(URI_LIVENESS_PROBE).handler(livenessHandler);
        server.requestHandler(router).listen(startAttempt -> {
            if (startAttempt.succeeded()) {
                LOG.info("successfully started secure health checks HTTP server");
                LOG.info("readiness probe available at https://{}:{}{}", options.getHost(), server.actualPort(),
                        URI_READINESS_PROBE);
                LOG.info("liveness probe available at https://{}:{}{}", options.getHost(), server.actualPort(),
                        URI_LIVENESS_PROBE);
                result.complete();
            } else {
                LOG.warn("failed to start secure health checks HTTP server: {}", startAttempt.cause().getMessage());
                result.fail(startAttempt.cause());
            }
        });
        return result.future();
    } else {
        LOG.warn("cannot start secure health checks HTTP server: no key material configured");
        return Future.failedFuture("no key material configured for secure server");
    }
}
@Override
public Future<Void> end() {
    Promise<Void> promise = Promise.promise();
    registration.unregister(promise);
    return promise.future();
}
@Override
public Future<Long> endOffsets(TopicPartition topicPartition) {
    Promise<Long> promise = Promise.promise();
    endOffsets(topicPartition, promise);
    return promise.future();
}
/**
 * Checks if this client is currently connected to the server.
 *
 * @return A succeeded future if this client is connected.
 */
protected final Future<Void> checkConnected() {
    final Promise<Void> result = Promise.promise();
    checkConnected(result);
    return result.future();
}
@Override
public Future<Void> maybeRollingUpdate(StatefulSet sts, Function<Pod, String> podRestart, Secret clusterCaSecret, Secret coKeySecret) {
    String namespace = sts.getMetadata().getNamespace();
    String name = sts.getMetadata().getName();
    final int replicas = sts.getSpec().getReplicas();
    log.debug("Considering rolling update of {}/{}", namespace, name);

    boolean zkRoll = false;
    ArrayList<Pod> pods = new ArrayList<>(replicas);
    String cluster = sts.getMetadata().getLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
    for (int i = 0; i < replicas; i++) {
        Pod pod = podOperations.get(sts.getMetadata().getNamespace(), KafkaResources.zookeeperPodName(cluster, i));
        String zkPodRestart = podRestart.apply(pod);
        zkRoll |= zkPodRestart != null && !zkPodRestart.isEmpty();
        pods.add(pod);
    }

    final Future<Void> rollFuture;
    if (zkRoll) {
        // Find the leader
        Promise<Void> promise = Promise.promise();
        rollFuture = promise.future();
        Future<Integer> leaderFuture = leaderFinder.findZookeeperLeader(cluster, namespace, pods, coKeySecret);
        leaderFuture.compose(leader -> {
            log.debug("Zookeeper leader is " + (leader == ZookeeperLeaderFinder.UNKNOWN_LEADER ? "unknown" : "pod " + leader));
            Future<Void> fut = Future.succeededFuture();
            // Then roll each non-leader pod
            for (int i = 0; i < replicas; i++) {
                String podName = KafkaResources.zookeeperPodName(cluster, i);
                if (i != leader) {
                    log.debug("Possibly restarting non-leader pod {}", podName);
                    // roll the pod and wait until it is ready
                    // this prevents rolling into faulty state (note: this applies just for ZK pods)
                    fut = fut.compose(ignore -> maybeRestartPod(sts, podName, podRestart));
                } else {
                    log.debug("Deferring restart of leader {}", podName);
                }
            }
            if (leader == ZookeeperLeaderFinder.UNKNOWN_LEADER) {
                return fut;
            } else {
                // Finally roll the leader pod
                return fut.compose(ar -> {
                    // the leader is rolled as the last
                    log.debug("Possibly restarting leader pod (previously deferred) {}", leader);
                    return maybeRestartPod(sts, KafkaResources.zookeeperPodName(cluster, leader), podRestart);
                });
            }
        }).onComplete(promise);
    } else {
        rollFuture = Future.succeededFuture();
    }
    return rollFuture;
}
/**
 * Stop the shell service; this is an asynchronous stop.
 */
default Future<Void> stop() {
    Promise<Void> promise = Promise.promise();
    stop(promise);
    return promise.future();
}
/**
 * @see #createRolePermission(String, String, Handler)
 */
default Future<Void> createRolePermission(String role, String permission) {
    Promise<Void> promise = Promise.promise();
    createRolePermission(role, permission, promise);
    return promise.future();
}