io.vertx.core.Promise#future() 源码实例 Demo

下面列出了 io.vertx.core.Promise#future() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: hono   文件: VertxBasedAmqpProtocolAdapter.java
/**
 * Starts the insecure AMQP server if the insecure port is enabled.
 *
 * @return A future that is already succeeded if the insecure port is disabled.
 *         Otherwise the future succeeds once the server is listening and fails
 *         with the cause reported by the listen attempt.
 */
private Future<Void> bindInsecureServer() {
    if (!isInsecurePortEnabled()) {
        return Future.succeededFuture();
    }

    final ProtonServerOptions serverOptions = new ProtonServerOptions();
    serverOptions.setHost(getConfig().getInsecurePortBindAddress());
    serverOptions.setPort(determineInsecurePort());
    serverOptions.setMaxFrameSize(getConfig().getMaxFrameSize());
    // heart beat interval is half the idle timeout
    serverOptions.setHeartbeat(getConfig().getIdleTimeout() >> 1);

    final Promise<Void> bindTracker = Promise.promise();
    insecureServer = createServer(insecureServer, serverOptions);
    insecureServer.connectHandler(this::onConnectRequest).listen(bindAttempt -> {
        if (bindAttempt.failed()) {
            bindTracker.fail(bindAttempt.cause());
        } else {
            log.info("insecure AMQP server listening on [{}:{}]", getConfig().getInsecurePortBindAddress(), getActualInsecurePort());
            bindTracker.complete();
        }
    });
    return bindTracker.future();
}
 
源代码2 项目: strimzi-kafka-operator   文件: K8sImpl.java
/**
 * Patches the given topic resource on a worker thread via
 * {@code vertx.executeBlocking}.
 *
 * @param topicResource The resource to patch; its metadata name selects the resource.
 * @return A future completed with the patched resource, or failed with the
 *         exception thrown while patching.
 */
@Override
public Future<KafkaTopic> updateResource(KafkaTopic topicResource) {
    Promise<KafkaTopic> resultPromise = Promise.promise();
    vertx.executeBlocking(blockingPromise -> {
        try {
            KafkaTopic patched = operation()
                    .inNamespace(namespace)
                    .withName(topicResource.getMetadata().getName())
                    .patch(topicResource);

            // Resolve the values for the log statement defensively: the patched
            // resource or its metadata may be absent.
            final boolean patchedHasMetadata = patched != null && patched.getMetadata() != null;
            final String patchedName = patchedHasMetadata ? patched.getMetadata().getName() : null;
            final String previousVersion = topicResource.getMetadata() != null
                    ? topicResource.getMetadata().getResourceVersion()
                    : null;
            final String patchedVersion = patchedHasMetadata ? patched.getMetadata().getResourceVersion() : null;

            LOGGER.debug("KafkaTopic {} updated with version {}->{}", patchedName, previousVersion, patchedVersion);
            blockingPromise.complete(patched);
        } catch (Exception e) {
            blockingPromise.fail(e);
        }
    }, resultPromise);
    return resultPromise.future();
}
 
源代码3 项目: hono   文件: AmqpAdapterTestBase.java
/**
 * Creates a sender based on the connection to the AMQP adapter.
 * <p>
 * The sender link is created and opened on {@code context} using
 * {@code AT_LEAST_ONCE} delivery semantics.
 *
 * @param target The target address to create the sender for or {@code null}
 *               if an anonymous sender should be created.
 * @return A future succeeding with the created sender. The future fails with an
 *         {@link IllegalStateException} if there is no context (not connected),
 *         or with the cause reported by the peer if the link is rejected or closed.
 */
protected Future<ProtonSender> createProducer(final String target) {

    final Promise<ProtonSender> result = Promise.promise();
    if (context == null) {
        result.fail(new IllegalStateException("not connected"));
    } else {
        context.runOnContext(go -> {
            final ProtonSender sender = connection.createSender(target);
            // vertx-proton doesn't support MIXED yet
            sender.setQoS(ProtonQoS.AT_LEAST_ONCE);
            // The open and the close handler may both try to settle the promise.
            // Using the try* variants everywhere ensures that whichever handler
            // fires second does not raise an IllegalStateException.
            sender.closeHandler(remoteClose -> {
                if (remoteClose.failed()) {
                    log.info("peer closed sender link [exception: {}]", remoteClose.cause().getClass().getName());
                    result.tryFail(remoteClose.cause());
                }
            });
            sender.openHandler(remoteAttach -> {
                if (remoteAttach.failed()) {
                    log.info("peer rejects opening of sender link", remoteAttach.cause());
                    result.tryFail(remoteAttach.cause());
                } else if (sender.getRemoteTarget() == null) {
                    log.info("peer wants to immediately close sender link");
                    result.tryFail("could not open sender link");
                } else {
                    result.tryComplete(sender);
                }
            });
            sender.open();
        });
    }

    return result.future();
}
 
源代码4 项目: hono   文件: AmqpServiceBase.java
/**
 * Closes the secure AMQP server, if one has been created.
 *
 * @return A future that is completed immediately if there is no server,
 *         otherwise with the outcome of closing the server.
 */
private Future<Void> stopServer() {

    final Promise<Void> closeTracker = Promise.promise();

    if (server == null) {
        // nothing to shut down
        closeTracker.complete();
    } else {
        log.info("stopping secure AMQP server [{}:{}]", getBindAddress(), getActualPort());
        server.close(closeTracker);
    }
    return closeTracker.future();
}
 
源代码5 项目: hono   文件: FileBasedTenantService.java
/**
 * Writes the registered tenants to the configured file.
 * <p>
 * Nothing is written if saving to file is disabled in the configuration or if
 * the {@code dirty} flag is not set. After a successful write the
 * {@code dirty} flag is reset.
 *
 * @return A future that succeeds if the tenants have been written or nothing
 *         needed to be written, and fails if writing the file failed.
 */
Future<Void> saveToFile() {

    final Promise<Void> outcome = Promise.promise();
    final boolean persistenceEnabled = getConfig().isSaveToFile();

    if (persistenceEnabled && dirty.get()) {
        checkFileExists(true)
            .compose(ignored -> {
                // serialize every tenant, adding its identifier to the JSON document
                final JsonArray serializedTenants = new JsonArray();
                tenants.forEach((tenantId, versionedTenant) -> serializedTenants.add(
                        JsonObject.mapFrom(versionedTenant.getValue())
                            .put(TenantConstants.FIELD_PAYLOAD_TENANT_ID, tenantId)));

                final Promise<Void> writeTracker = Promise.promise();
                vertx.fileSystem().writeFile(
                        getConfig().getFilename(),
                        Buffer.factory.buffer(serializedTenants.encodePrettily()),
                        writeTracker);
                // hand the written array to the next stage for logging
                return writeTracker.future().map(serializedTenants);
            })
            .map(writtenTenants -> {
                dirty.set(false);
                LOG.trace("successfully wrote {} tenants to file {}", writtenTenants.size(),
                        getConfig().getFilename());
                return (Void) null;
            })
            .onFailure(t -> LOG.warn("could not write tenants to file {}", getConfig().getFilename(), t))
            .onComplete(outcome);
    } else {
        if (persistenceEnabled) {
            LOG.trace("tenants registry does not need to be persisted");
        }
        outcome.complete();
    }

    return outcome.future();
}
 
源代码6 项目: okapi   文件: MainVerticle.java
/**
 * Initializes the deployment manager, if one is present.
 *
 * @return A future that is completed immediately when there is no deployment
 *         manager, otherwise with the result handed to the promise by
 *         {@code deploymentManager.init}.
 */
private Future<Void> startDeployment() {
  Promise<Void> initTracker = Promise.promise();
  if (deploymentManager != null) {
    logger.info("Starting deployment");
    deploymentManager.init(initTracker::handle);
  } else {
    initTracker.complete();
  }
  return initTracker.future();
}
 
/**
 * Requests the descriptions of the given topics from the admin client and
 * adapts the result to a Vert.x future.
 *
 * @param names The names of the topics to describe.
 * @return A future completed with the descriptions of all requested topics,
 *         or failed with the error reported by the admin client.
 */
protected Future<Collection<TopicDescription>> describeTopics(Set<String> names) {
    Promise<Collection<TopicDescription>> descriptions = Promise.promise();
    ac.describeTopics(names).all()
            .whenComplete((byName, failure) -> {
                if (failure == null) {
                    log.debug("Got topic descriptions for {} topics", byName.size());
                    descriptions.complete(byName.values());
                } else {
                    descriptions.fail(failure);
                }
            });
    return descriptions.future();
}
 
源代码8 项目: hono   文件: AuthenticationServerClient.java
/**
 * Creates and opens a receiver link for the authentication endpoint on the
 * given connection.
 *
 * @param openConnection The connection to create the receiver on.
 * @param messageHandler The handler to register for messages arriving on the link.
 * @return A future completed with the result passed to the receiver's open handler.
 */
private static Future<ProtonReceiver> openReceiver(final ProtonConnection openConnection, final ProtonMessageHandler messageHandler) {

    final ProtonReceiver receiver = openConnection.createReceiver(AuthenticationConstants.ENDPOINT_NAME_AUTHENTICATION);
    final Promise<ProtonReceiver> openTracker = Promise.promise();

    // register both handlers before opening the link
    receiver.handler(messageHandler);
    receiver.openHandler(openTracker);
    receiver.open();

    return openTracker.future();
}
 
源代码9 项目: vertx-kafka-client   文件: KafkaWriteStreamImpl.java
/**
 * Closes the underlying producer on a worker thread.
 *
 * @param timeout The time to wait for the producer to close, in milliseconds;
 *                a value that is not positive closes without an explicit timeout.
 * @return A future completed with the outcome of the blocking close operation.
 */
@Override
public Future<Void> close(long timeout) {
  ContextInternal callerContext = (ContextInternal) context.owner().getOrCreateContext();
  Promise<Void> callerPromise = callerContext.promise();

  Future<Void> closed = this.context.<Void>executeBlocking(blocking -> {
    if (timeout <= 0) {
      this.producer.close();
    } else {
      this.producer.close(timeout, TimeUnit.MILLISECONDS);
    }
    blocking.complete();
  });
  closed.onComplete(callerPromise);

  // Trampoline on caller context
  return callerPromise.future();
}
 
源代码10 项目: vertx-kafka-client   文件: KafkaAdminClientImpl.java
/**
 * Closes the underlying admin client on a worker thread.
 *
 * @param timeout The time to wait for the admin client to close, in milliseconds;
 *                a value that is not positive closes without an explicit timeout.
 * @return A future completed with the outcome of the blocking close operation.
 */
@Override
public Future<Void> close(long timeout) {
  ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
  Promise<Void> promise = ctx.promise();

  ctx.<Void>executeBlocking(prom -> {
    if (timeout > 0) {
      adminClient.close(Duration.ofMillis(timeout));
    } else {
      adminClient.close();
    }
    prom.complete();
  // Propagate the outcome of the blocking code to the returned future;
  // without this link the promise is never completed.
  }).onComplete(promise);
  return promise.future();
}
 
源代码11 项目: strimzi-kafka-operator   文件: ZkTopicStore.java
/**
 * Overwrites the data of the znode belonging to the given topic with the
 * topic's JSON serialization.
 *
 * @param topic The topic to store.
 * @return A future completed with the outcome reported by {@code zk.setData}.
 */
@Override
public Future<Void> update(Topic topic) {
    final byte[] serialized = TopicSerialization.toJson(topic);
    // TODO pass a non-zero version
    final String znodePath = getTopicPath(topic.getTopicName());
    LOGGER.debug("update znode {}", znodePath);

    final Promise<Void> writeTracker = Promise.promise();
    zk.setData(znodePath, serialized, -1, writeTracker);
    return writeTracker.future();
}
 
源代码12 项目: vertx-sql-client   文件: TransactionImpl.java
/**
 * Schedules a {@code COMMIT} query unless the transaction has already completed.
 *
 * @return A future tracking the scheduled commit, or a failed future if the
 *         transaction is already completed.
 * @throws IllegalStateException if the transaction is in an unexpected state.
 */
@Override
public Future<Void> commit() {
  switch (status) {
    case ST_COMPLETED:
      return context.failedFuture("Transaction already completed");
    case ST_BEGIN:
    case ST_PENDING:
    case ST_PROCESSING: {
      Promise<Void> commitTracker = context.promise();
      schedule__(doQuery(COMMIT, commitTracker));
      return commitTracker.future();
    }
    default:
      throw new IllegalStateException();
  }
}
 
源代码13 项目: vertx-kafka-client   文件: KafkaConsumerImpl.java
/**
 * Future-returning variant of the handler-based {@code seekToBeginning}.
 *
 * @param topicPartitions The partitions to pass on to the handler-based variant.
 * @return A future completed with the result passed to the handler.
 */
@Override
public Future<Void> seekToBeginning(Set<TopicPartition> topicPartitions) {
  final Promise<Void> seekTracker = Promise.promise();
  // delegate to the overload accepting a completion handler
  this.seekToBeginning(topicPartitions, seekTracker);
  return seekTracker.future();
}
 
源代码14 项目: hono   文件: VertxBasedHealthCheckServer.java
/**
 * Starts the secure (TLS) health checks HTTP server exposing the readiness
 * and liveness probes.
 *
 * @return A future that succeeds once the server is listening. The future fails
 *         if the secure port is not enabled, if the server would bind to the
 *         loopback device although that is not allowed, or if the server could
 *         not be started.
 */
private Future<Void> bindSecureHttpServer() {

    if (!config.isSecurePortEnabled()) {
        LOG.warn("cannot start secure health checks HTTP server: no key material configured");
        return Future.failedFuture("no key material configured for secure server");
    }

    final boolean bindsToLoopback = Constants.LOOPBACK_DEVICE_ADDRESS.equals(config.getBindAddress());
    if (bindsToLoopback && !bindSecureServerToLoopbackDeviceAllowed) {
        LOG.info("won't start secure health checks HTTP server: no bind address configured.");
        return Future.failedFuture("no bind address configured for secure server");
    }
    if (bindsToLoopback) {
        LOG.warn("secure health checks HTTP server will bind to loopback device only");
    }

    final HttpServerOptions serverOptions = new HttpServerOptions()
            .setPort(config.getPort(DEFAULT_PORT))
            .setHost(config.getBindAddress())
            .setKeyCertOptions(config.getKeyCertOptions())
            .setSsl(true);
    server = vertx.createHttpServer(serverOptions);

    router.get(URI_READINESS_PROBE).handler(readinessHandler);
    router.get(URI_LIVENESS_PROBE).handler(livenessHandler);

    final Promise<Void> startTracker = Promise.promise();
    server.requestHandler(router).listen(startAttempt -> {
        if (startAttempt.failed()) {
            LOG.warn("failed to start secure health checks HTTP server: {}", startAttempt.cause().getMessage());
            startTracker.fail(startAttempt.cause());
        } else {
            LOG.info("successfully started secure health checks HTTP server");
            LOG.info("readiness probe available at https://{}:{}{}", serverOptions.getHost(), server.actualPort(),
                    URI_READINESS_PROBE);
            LOG.info("liveness probe available at https://{}:{}{}", serverOptions.getHost(), server.actualPort(),
                    URI_LIVENESS_PROBE);
            startTracker.complete();
        }
    });
    return startTracker.future();
}
 
源代码15 项目: vertx-web   文件: SockJSSocketBase.java
/**
 * Ends this socket by unregistering its {@code registration}.
 *
 * @return A future completed with the outcome of the unregistration.
 */
@Override
public Future<Void> end() {
  final Promise<Void> unregistered = Promise.promise();
  registration.unregister(unregistered);
  return unregistered.future();
}
 
源代码16 项目: vertx-kafka-client   文件: KafkaConsumerImpl.java
/**
 * Future-returning variant of the handler-based {@code endOffsets}.
 *
 * @param topicPartition The partition to pass on to the handler-based variant.
 * @return A future completed with the result passed to the handler.
 */
@Override
public Future<Long> endOffsets(TopicPartition topicPartition) {
  final Promise<Long> offsetTracker = Promise.promise();
  // delegate to the overload accepting a completion handler
  endOffsets(topicPartition, offsetTracker);
  return offsetTracker.future();
}
 
源代码17 项目: hono   文件: HonoConnectionImpl.java
/**
 * Checks if this client is currently connected to the server.
 * <p>
 * Delegates to the handler-based {@code checkConnected} variant and exposes
 * its outcome as a future.
 *
 * @return A succeeded future if this client is connected.
 */
protected final Future<Void> checkConnected() {
    final Promise<Void> connectionCheck = Promise.promise();
    checkConnected(connectionCheck);
    return connectionCheck.future();
}
 
/**
 * Performs a rolling update of the ZooKeeper pods of the given StatefulSet if
 * at least one pod needs to be restarted.
 * <p>
 * All non-leader pods are handled first, one after the other; the leader pod
 * (if it could be determined) is handled last.
 *
 * @param sts The StatefulSet whose pods should be considered.
 * @param podRestart Function returning a non-empty string for a pod that needs to be restarted.
 * @param clusterCaSecret The cluster CA secret (not used by this method).
 * @param coKeySecret The secret passed to the leader finder.
 * @return A future that is already succeeded if no pod needs to be restarted,
 *         otherwise a future completed with the outcome of the rolling update.
 */
@Override
public Future<Void> maybeRollingUpdate(StatefulSet sts, Function<Pod, String> podRestart, Secret clusterCaSecret, Secret coKeySecret) {
    String namespace = sts.getMetadata().getNamespace();
    String name = sts.getMetadata().getName();
    final int replicas = sts.getSpec().getReplicas();
    log.debug("Considering rolling update of {}/{}", namespace, name);

    boolean zkRoll = false;
    ArrayList<Pod> pods = new ArrayList<>(replicas);
    String cluster = sts.getMetadata().getLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
    for (int i = 0; i < replicas; i++) {
        Pod pod = podOperations.get(namespace, KafkaResources.zookeeperPodName(cluster, i));
        String zkPodRestart = podRestart.apply(pod);
        zkRoll |= zkPodRestart != null && !zkPodRestart.isEmpty();
        pods.add(pod);
    }

    if (!zkRoll) {
        // no pod needs to be restarted
        return Future.succeededFuture();
    }

    // The promise (not its future) is registered as the completion handler:
    // a Future is only a Handler in Vert.x 3.x, a Promise is one in all versions.
    Promise<Void> promise = Promise.promise();
    // Find the leader
    Future<Integer> leaderFuture = leaderFinder.findZookeeperLeader(cluster, namespace, pods, coKeySecret);
    leaderFuture.compose(leader -> {
        log.debug("Zookeeper leader is {}", leader == ZookeeperLeaderFinder.UNKNOWN_LEADER ? "unknown" : "pod " + leader);
        Future<Void> fut = Future.succeededFuture();
        // Then roll each non-leader pod
        for (int i = 0; i < replicas; i++) {
            String podName = KafkaResources.zookeeperPodName(cluster, i);
            if (i != leader) {
                log.debug("Possibly restarting non-leader pod {}", podName);
                // roll the pod and wait until it is ready
                // this prevents rolling into faulty state (note: this applies just for ZK pods)
                fut = fut.compose(ignore -> maybeRestartPod(sts, podName, podRestart));
            } else {
                log.debug("Deferring restart of leader {}", podName);
            }
        }
        if (leader == ZookeeperLeaderFinder.UNKNOWN_LEADER) {
            return fut;
        } else {
            // Finally roll the leader pod
            return fut.compose(ar -> {
                // the leader is rolled as the last
                log.debug("Possibly restarting leader pod (previously deferred) {}", leader);
                return maybeRestartPod(sts, KafkaResources.zookeeperPodName(cluster, leader), podRestart);
            });
        }
    }).onComplete(promise);
    return promise.future();
}
 
源代码19 项目: vertx-shell   文件: ShellService.java
/**
 * Stop the shell service, this is an asynchronous stop.
 * <p>
 * Delegates to the handler-based {@code stop} variant.
 *
 * @return A future completed with the result passed to the handler.
 */
default Future<Void> stop() {
  final Promise<Void> stopTracker = Promise.promise();
  stop(stopTracker);
  return stopTracker.future();
}
 
源代码20 项目: vertx-auth   文件: JDBCUserUtil.java
/**
 * Future-returning variant of {@link #createRolePermission(String, String, Handler)}.
 *
 * @param role The role, passed on unchanged to the handler-based variant.
 * @param permission The permission, passed on unchanged to the handler-based variant.
 * @return A future completed with the result passed to the handler.
 * @see #createRolePermission(String, String, Handler)
 */
default Future<Void> createRolePermission(String role, String permission) {
  final Promise<Void> creationTracker = Promise.promise();
  createRolePermission(role, permission, creationTracker);
  return creationTracker.future();
}