下面列出了 io.vertx.core.Promise#complete() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Checks whether the configured file exists and optionally creates it.
 *
 * @param createIfMissing {@code true} if a missing file should be created.
 * @return A future that is failed if no filename is configured and succeeded if the
 *         file exists or if it does not exist and is not supposed to be created.
 *         If the file is being created, the future reflects the outcome of the
 *         file system's create operation.
 */
private Future<Void> checkFileExists(final boolean createIfMissing) {
    final Promise<Void> outcome = Promise.promise();
    final String filename = getConfig().getFilename();
    if (filename == null) {
        outcome.fail("no filename set");
        return outcome.future();
    }
    if (vertx.fileSystem().existsBlocking(filename)) {
        // nothing to do, the file is already there
        outcome.complete();
    } else if (createIfMissing) {
        // the promise is handed over as the handler for the create operation's result
        vertx.fileSystem().createFile(filename, outcome);
    } else {
        LOG.debug("no such file [{}]", filename);
        outcome.complete();
    }
    return outcome.future();
}
/**
 * Writes all queued buffers to the output stream, one after the other.
 * <p>
 * Buffers are polled from the queue until it is empty, at which point the given
 * promise is completed. After each successful write the pending buffer counter is
 * decremented and either the deferred close action (if one is registered and no
 * buffers are pending anymore) or the drain check is run.
 * <p>
 * If a write fails, the counter is decremented as well, the promise is failed with
 * the exception and processing stops.
 *
 * @param future The promise to complete once the queue has been emptied or to fail
 *               if writing a buffer to the stream fails.
 */
protected void writeInWorker(Promise<Void> future) {
    while (true) {
        Buffer buffer = buffers.poll();
        if (buffer == null) {
            // queue is empty, we are done
            future.complete();
            return;
        }
        try {
            outputStream.write(buffer.getBytes());
            synchronized (OutputStreamToWriteStream.this) {
                currentBufferCount--;
                Runnable action = (currentBufferCount == 0 && closedDeferred != null) ? closedDeferred : this::checkDrained;
                action.run();
            }
        } catch (IOException e) {
            // the counter is guarded by the same monitor on the success path,
            // so the failure path must not modify it without holding the monitor
            synchronized (OutputStreamToWriteStream.this) {
                currentBufferCount--;
            }
            future.fail(e);
            return;
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * If the service instance implements {@link Lifecycle}, its
 * {@linkplain Lifecycle#start() start method} is invoked and the given promise
 * is completed with that invocation's outcome. Otherwise the promise is
 * completed right away.
 */
@Override
protected void doStart(final Promise<Void> startPromise) {
    if (!(service instanceof Lifecycle)) {
        // nothing to start
        startPromise.complete();
        return;
    }
    final Lifecycle lifecycle = (Lifecycle) service;
    lifecycle.start().onComplete(startPromise);
}
/**
 * Creates the client for the Authentication service.
 * <p>
 * Fails the given promise if no connection factory has been set. Otherwise
 * the client is created, the supported SASL mechanisms are logged (at INFO level)
 * and the promise is completed.
 *
 * @param startFuture The promise to complete or fail.
 */
@Override
protected final void doStart(final Promise<Void> startFuture) {
    if (factory == null) {
        startFuture.fail(new IllegalStateException("no connection factory for Authentication service set"));
        return;
    }
    client = new AuthenticationServerClient(vertx, factory);
    if (log.isInfoEnabled()) {
        // only build the list of mechanisms if it is actually going to be logged
        final String saslMechanisms = getConfig().getSupportedSaslMechanisms().stream()
                .collect(Collectors.joining(", "));
        log.info("starting {} with support for SASL mechanisms: {}", toString(), saslMechanisms);
    }
    startFuture.complete();
}
/**
 * {@inheritDoc}
 * <p>
 * If the service instance implements {@link Lifecycle}, its
 * {@linkplain Lifecycle#stop() stop method} is invoked and the given promise
 * is completed with that invocation's outcome. Otherwise the promise is
 * completed right away.
 */
@Override
protected void doStop(final Promise<Void> stopPromise) {
    if (!(service instanceof Lifecycle)) {
        // nothing to stop
        stopPromise.complete();
        return;
    }
    final Lifecycle lifecycle = (Lifecycle) service;
    lifecycle.stop().onComplete(stopPromise);
}
/**
 * Stops this service.
 * <p>
 * If the service is currently running, it is marked as stopped and its data is
 * saved to file. Otherwise this method does nothing.
 *
 * @return A future reflecting the outcome of saving the data, or a succeeded
 *         future if the service was not running.
 */
@Override
public Future<Void> stop() {
    final Promise<Void> outcome = Promise.promise();
    final boolean wasRunning = running.compareAndSet(true, false);
    if (!wasRunning) {
        outcome.complete();
    } else {
        saveToFile().onComplete(outcome);
    }
    return outcome.future();
}
/**
 * Runs the error handling for a failed execution and, unless the promise has
 * already been completed in the meantime, completes it with the value produced
 * by the error handling.
 *
 * @param _blockingHandler The promise to complete with the execution result.
 * @param _errorHandler The intermediate error handler, passed on to {@code handleError}.
 * @param _onFailureRespond The fallback function, passed on to {@code handleError}.
 * @param _errorMethodHandler The error method handler, passed on to {@code handleError}.
 * @param cause The failure that triggered the error handling.
 * @param <T> The type of the execution's result value.
 */
private static <T> void handleErrorExecution(
        Promise<ExecutionResult<T>> _blockingHandler,
        Consumer<Throwable> _errorHandler,
        ThrowableFunction<Throwable, T> _onFailureRespond,
        Consumer<Throwable> _errorMethodHandler,
        Throwable cause) {
    final T fallbackValue =
            handleError(_errorHandler, _onFailureRespond, _errorMethodHandler, _blockingHandler, cause);
    if (_blockingHandler.future().isComplete()) {
        // the promise has already been completed, nothing left to report
        return;
    }
    _blockingHandler.complete(new ExecutionResult<>(fallbackValue, true, true, null));
}
/**
 * Maps the payload of a message using the mapper configured in the registration information.
 * <p>
 * The message is passed through unaltered (target address and payload as received) if the
 * registration information contains no non-blank mapper name of type {@code String} or if
 * no mapper endpoint is configured for that name. Otherwise a mapping request is issued
 * for the endpoint.
 *
 * @param ctx The context of the message to map.
 * @param targetAddress The address the message is targeted at.
 * @param registrationInfo The registration information to read the mapper name from.
 * @return A future indicating the outcome of the mapping.
 * @throws NullPointerException if ctx or registrationInfo are {@code null}.
 */
@Override
public Future<MappedMessage> mapMessage(
        final MqttContext ctx,
        final ResourceIdentifier targetAddress,
        final JsonObject registrationInfo) {
    Objects.requireNonNull(ctx);
    Objects.requireNonNull(registrationInfo);
    final Promise<MappedMessage> result = Promise.promise();
    final Object mapperObject = registrationInfo.getValue(RegistrationConstants.FIELD_MAPPER);
    if (!(mapperObject instanceof String) || ((String) mapperObject).isBlank()) {
        // neither a missing, a non-string nor a blank mapper name selects a mapper
        LOG.debug("no payload mapping configured for {}", ctx.authenticatedDevice());
        result.complete(new MappedMessage(targetAddress, ctx.message().payload()));
        return result.future();
    }
    final String mapper = (String) mapperObject;
    final MapperEndpoint mapperEndpoint = mqttProtocolAdapterProperties.getMapperEndpoint(mapper);
    if (mapperEndpoint != null) {
        mapMessageRequest(ctx, targetAddress, registrationInfo, mapperEndpoint, result);
    } else {
        LOG.debug("no mapping endpoint [name: {}] found for {}", mapper, ctx.authenticatedDevice());
        result.complete(new MappedMessage(targetAddress, ctx.message().payload()));
    }
    return result.future();
}
/**
 * Evaluates the outcome of a retry attempt.
 * <p>
 * Completes the promise if the attempt has succeeded and triggers another
 * download attempt otherwise.
 *
 * @param retryInterval The retry interval, passed on to {@code retryDownload}.
 * @param next The value passed on to {@code retryDownload} as its last argument.
 * @param retryResult The outcome of the retry attempt.
 * @param promise The promise to complete on success.
 */
private void handleRetryResult(long retryInterval, long next, AsyncResult<Void> retryResult,
        Promise<Void> promise) {
    if (!retryResult.succeeded()) {
        retryDownload(promise, retryInterval, next);
        return;
    }
    promise.complete();
}
/**
 * Starts this service.
 * <p>
 * Does nothing if the service is already running. Otherwise the service is marked
 * as running and, if a filename is configured, the registration data is loaded
 * (after the file's existence has been checked). If saving to file is enabled,
 * a periodic task is scheduled which saves the data every 3 seconds.
 * <p>
 * If loading fails, the service is marked as not running again.
 *
 * @return A future indicating the outcome of the start-up.
 */
@Override
public Future<Void> start() {
    final Promise<Void> startup = Promise.promise();
    if (!running.compareAndSet(false, true)) {
        // already running
        startup.complete();
        return startup.future();
    }
    if (!getConfig().isModificationEnabled()) {
        LOG.info("modification of registered devices has been disabled");
    }
    if (getConfig().getFilename() == null) {
        LOG.debug("device identity filename is not set, no identity information will be loaded");
        startup.complete();
        return startup.future();
    }
    checkFileExists(getConfig().isSaveToFile())
            .compose(ok -> loadRegistrationData())
            .onSuccess(ok -> {
                if (!getConfig().isSaveToFile()) {
                    LOG.info("persistence is disabled, will not save device identities to file");
                } else {
                    LOG.info("saving device identities to file every 3 seconds");
                    vertx.setPeriodic(3000, tid -> saveToFile());
                }
            })
            .onFailure(t -> {
                LOG.error("failed to start up service", t);
                // allow for another start attempt
                running.set(false);
            })
            .onComplete(startup);
    return startup.future();
}
/**
 * Creates a handler which applies a function to the result of an asynchronous
 * operation and closes the given SQL connection afterwards.
 * <p>
 * The promise is completed with the function's return value if the operation has
 * succeeded. It is failed with the operation's cause if the operation has failed,
 * or with whatever is thrown while processing the outcome. The connection is
 * closed in any case.
 *
 * @param func The function to apply to the operation's result.
 * @param sqlConnection The connection to close once the outcome has been processed.
 * @param resultFuture The promise to report the outcome to.
 * @param <V> The type of the operation's result.
 * @param <U> The type of the function's return value.
 * @return The handler.
 */
protected <V,U> Handler<AsyncResult<V>> executeAndClose(Function<V, U> func, SQLConnection sqlConnection, Promise<U> resultFuture) {
    return outcome -> {
        try {
            if (!outcome.succeeded()) {
                resultFuture.fail(outcome.cause());
            } else {
                final U mapped = func.apply(outcome.result());
                resultFuture.complete(mapped);
            }
        } catch (Throwable e) {
            resultFuture.fail(e);
        } finally {
            sqlConnection.close();
        }
    };
}
/**
 * Gets the value of a request parameter and validates it.
 * <p>
 * The value is looked up in the context's request by name and is then passed
 * to the given predicate as is.
 * NOTE(review): presumably the looked up value is {@code null} if the request contains
 * no parameter with the given name, in which case it is up to the validator to decide
 * whether a missing parameter is acceptable - confirm.
 *
 * @param ctx The routing context to get the parameter from.
 * @param paramName The name of the parameter.
 * @param validator A predicate to use for validating the parameter value.
 *                  The predicate may throw an {@code IllegalArgumentException}
 *                  instead of returning {@code false} in order to convey additional
 *                  information about why the test failed.
 * @return A future indicating the outcome of the operation. The future will be
 *         <ul>
 *         <li>completed with the parameter value if the predicate evaluates to {@code true},</li>
 *         <li>failed with a {@link ClientErrorException} with status 400 if the predicate
 *         evaluates to {@code false}, or</li>
 *         <li>failed with a {@link ClientErrorException} with status 400, containing the
 *         exception's message and having the exception as its cause, if the predicate
 *         throws an {@code IllegalArgumentException}.</li>
 *         </ul>
 * @throws NullPointerException If ctx, paramName or validator are {@code null}.
 */
protected final Future<String> getRequestParameter(
        final RoutingContext ctx,
        final String paramName,
        final Predicate<String> validator) {
    Objects.requireNonNull(ctx);
    Objects.requireNonNull(paramName);
    Objects.requireNonNull(validator);
    final Promise<String> outcome = Promise.promise();
    final String paramValue = ctx.request().getParam(paramName);
    try {
        final boolean valid = validator.test(paramValue);
        if (!valid) {
            final String msg = String.format(
                    "request parameter [name: %s, value: %s] failed validation", paramName, paramValue);
            outcome.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, msg));
        } else {
            outcome.complete(paramValue);
        }
    } catch (final IllegalArgumentException e) {
        // the validator has provided details about the reason for the failure
        final String msg = String.format(
                "request parameter [name: %s, value: %s] failed validation: %s", paramName, paramValue, e.getMessage());
        outcome.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, msg, e));
    }
    return outcome.future();
}
/**
 * Verifies that both the action passed to {@code MoreFutures.whenComplete} and a
 * handler registered on the returned future are invoked when the promise is
 * completed after the handlers have been registered.
 */
@Test
public void testTwoHandlersAfter() {
    final Promise<String> promise = Promise.promise();
    final AtomicInteger invocations = new AtomicInteger();
    MoreFutures.whenComplete(promise.future(), () -> invocations.incrementAndGet())
            .onComplete(ar -> invocations.incrementAndGet());
    // complete only after both handlers have been registered
    promise.complete("Foo");
    assertEquals(2, invocations.get());
}
/**
 * Stops the web server and then signals that stopping has finished.
 *
 * @param stopPromise The promise that is completed once {@code webServer.stop()} has returned.
 * @throws Exception if stopping fails.
 *                   NOTE(review): whether {@code webServer.stop()} can actually throw is not visible here.
 */
@Override
public void stop(Promise<Void> stopPromise) throws Exception {
webServer.stop();
// only reached if stopping the web server did not throw
stopPromise.complete();
}
/**
 * Processes a peer's request to open a connection.
 * <p>
 * The span attached to the connection is reused if there is one, otherwise a new span
 * is started. If authentication is required, anonymous peers are rejected and for
 * authenticated devices the device registration, the tenant configuration, the adapter's
 * enablement for the tenant and the connection limits are checked.
 * <p>
 * If all checks succeed, a <em>connected</em> event is sent and the connection is opened.
 * Otherwise the connection is closed with an error condition derived from the failure.
 * The span is finished in both cases.
 *
 * @param con The connection that the peer wants to open.
 */
private void processRemoteOpen(final ProtonConnection con) {
    final Span span = Optional
            // try to pick up span that has been created during SASL handshake
            .ofNullable(con.attachments().get(AmqpAdapterConstants.KEY_CURRENT_SPAN, Span.class))
            // or create a fresh one if no SASL handshake has been performed
            // (created lazily: an eagerly started span would never be finished
            // if the span from the SASL handshake is being used)
            .orElseGet(() -> tracer.buildSpan("open connection")
                    .ignoreActiveSpan()
                    .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
                    .withTag(Tags.COMPONENT.getKey(), getTypeName())
                    .start());
    final Device authenticatedDevice = getAuthenticatedDevice(con);
    TracingHelper.TAG_AUTHENTICATED.set(span, authenticatedDevice != null);
    if (authenticatedDevice != null) {
        TracingHelper.setDeviceTags(span, authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
    }
    final Promise<Void> connectAuthorizationCheck = Promise.promise();
    if (getConfig().isAuthenticationRequired()) {
        if (authenticatedDevice == null) {
            connectAuthorizationCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_UNAUTHORIZED,
                    "anonymous devices not supported"));
        } else {
            log.trace("received connection request from {}", authenticatedDevice);
            // the SASL handshake will already have authenticated the device
            // we still need to verify that
            // the adapter is enabled for the tenant,
            // the device/gateway exists and is enabled and
            // that the connection limit for the tenant and the adapter are not exceeded.
            CompositeFuture.all(checkDeviceRegistration(authenticatedDevice, span.context()),
                    getTenantConfiguration(authenticatedDevice.getTenantId(), span.context())
                            .compose(tenantConfig -> CompositeFuture.all(
                                    isAdapterEnabled(tenantConfig),
                                    checkConnectionLimitForAdapter()
                                            .onFailure(ex -> {
                                                metrics.reportConnectionAttempt(
                                                        ConnectionAttemptOutcome.ADAPTER_CONNECTION_LIMIT_EXCEEDED);
                                            }),
                                    checkConnectionLimit(tenantConfig, span.context()))))
                    .map(ok -> {
                        log.debug("{} is registered and enabled", authenticatedDevice);
                        span.log(String.format("device [%s] is registered and enabled", authenticatedDevice));
                        return (Void) null;
                    }).onComplete(connectAuthorizationCheck);
        }
    } else {
        log.trace("received connection request from anonymous device [container: {}]", con.getRemoteContainer());
        connectAuthorizationCheck.complete();
    }
    connectAuthorizationCheck.future()
            .compose(ok -> sendConnectedEvent(
                    Optional.ofNullable(con.getRemoteContainer()).orElse("unknown"),
                    authenticatedDevice))
            .map(ok -> {
                con.setContainer(getTypeName());
                con.setOfferedCapabilities(new Symbol[] {Constants.CAP_ANONYMOUS_RELAY});
                con.open();
                log.debug("connection with device [container: {}] established", con.getRemoteContainer());
                span.log("connection established");
                return null;
            }).otherwise(t -> {
                // any failed check results in the connection being closed
                con.setCondition(getErrorCondition(t));
                con.close();
                TracingHelper.logError(span, t);
                return null;
            }).onComplete(s -> span.finish());
}
/**
 * Succeeds the given {@link Promise} with the given result and returns the
 * corresponding {@link Future}.
 * <p>
 * NOTE(review): presumably callers pass in a promise that has not been completed yet - confirm.
 *
 * @param result The value to complete the promise with.
 * @param promise The promise to complete.
 * @param <T> The type of the result.
 * @return The promise's future.
 */
private static <T> Future<T> succeedBreaker(T result, Promise<T> promise) {
promise.complete(result);
return promise.future();
}
/**
 * Executes a supplier, retrying it on failure, and reports the outcome to a promise.
 * <p>
 * The supplier is invoked (with a timeout, if one is set) until it returns normally or
 * the retries are used up. After each failed attempt that leaves retries, the error
 * handler is notified and the configured delay is applied. Once the retries are used up,
 * the error handling is run to determine a fallback value; if the error handling itself
 * throws an exception, the promise is failed with that exception.
 * <p>
 * Unless the promise has been completed already, it is finally completed with an
 * execution result whose second argument indicates whether a non-null value has been
 * determined and whose third argument indicates whether the value stems from the
 * error handling.
 *
 * @param _supplier The supplier to execute.
 * @param _blockingHandler The promise to report the outcome to.
 * @param errorHandler The handler to notify about failed attempts.
 * @param onFailureRespond The fallback function, passed on to {@code handleError}.
 * @param errorMethodHandler The error method handler, passed on to {@code handleError}.
 * @param vxmsShared The shared context, passed on to {@code executeWithTimeout}.
 * @param _retry The number of retries.
 * @param timeout The timeout to apply to an attempt, if greater than {@code DEFAULT_LONG_VALUE}.
 * @param delay The delay, passed on to {@code handleDelay} after a failed attempt.
 * @param <T> The type of the supplied value.
 */
private static <T> void executeStateless(
        ThrowableSupplier<T> _supplier,
        Promise<ExecutionResult<T>> _blockingHandler,
        Consumer<Throwable> errorHandler,
        ThrowableFunction<Throwable, T> onFailureRespond,
        Consumer<Throwable> errorMethodHandler,
        VxmsShared vxmsShared,
        int _retry,
        long timeout,
        long delay) {
    T value = null;
    boolean fromErrorHandling = false;
    while (_retry >= DEFAULT_VALUE) {
        fromErrorHandling = false;
        try {
            if (timeout > DEFAULT_LONG_VALUE) {
                value = executeWithTimeout(_supplier, vxmsShared, timeout);
            } else {
                value = _supplier.get();
            }
            // the attempt has succeeded, leave the loop
            _retry = STOP_CONDITION;
        } catch (Throwable e) {
            _retry--;
            if (_retry >= DEFAULT_VALUE) {
                // there are retries left: report the failure and wait before the next attempt
                org.jacpfx.vxms.event.response.basic.ResponseExecution.handleError(errorHandler, e);
                handleDelay(delay);
            } else {
                try {
                    value = handleError(errorHandler, onFailureRespond, errorMethodHandler,
                            _blockingHandler, e);
                    fromErrorHandling = true;
                } catch (Exception ee) {
                    _blockingHandler.fail(ee);
                }
            }
        }
    }
    if (!_blockingHandler.future().isComplete()) {
        _blockingHandler.complete(new ExecutionResult<>(value, value != null, fromErrorHandling, null));
    }
}
/**
 * This method handles the transition from {@code PendingProposal} state.
 * It starts a periodic timer in order to check the status of the ongoing rebalance proposal processing on Cruise Control side.
 * In order to do that, it calls the Cruise Control API for requesting the rebalance proposal.
 * When the proposal is ready, the next state is {@code ProposalReady}.
 * If the user sets the strimzi.io/rebalance=stop annotation, it stops polling the Cruise Control API for requesting the rebalance proposal.
 * If the user sets any other value for the strimzi.io/rebalance annotation when this method is invoked, no timer is started
 * and the returned future is completed with the resource's current status.
 * <p>
 * The timer is cancelled whenever the returned future gets completed. Because timer callbacks and Cruise Control
 * responses may overlap, every completion attempt made from within the timer is tolerant of the future
 * having been completed already.
 * <p>
 * NOTE(review): the original documentation states that this method holds the lock until the rebalance proposal
 * is ready or any exception is raised; the lock is not visible in this method - confirm against the caller.
 *
 * @param reconciliation Reconciliation information
 * @param host Cruise Control service to which sending the REST API requests
 * @param apiClient Cruise Control REST API client instance
 * @param kafkaRebalance Current {@code KafkaRebalance} resource
 * @param rebalanceAnnotation The current value for the strimzi.io/rebalance annotation
 * @param rebalanceOptionsBuilder builder for the Cruise Control REST API client options
 * @return a Future with the next {@code KafkaRebalanceStatus} bringing the state
 */
private Future<KafkaRebalanceStatus> onPendingProposal(Reconciliation reconciliation,
        String host, CruiseControlApi apiClient,
        KafkaRebalance kafkaRebalance,
        RebalanceAnnotation rebalanceAnnotation,
        RebalanceOptions.RebalanceOptionsBuilder rebalanceOptionsBuilder) {
    Promise<KafkaRebalanceStatus> p = Promise.promise();
    if (rebalanceAnnotation == RebalanceAnnotation.none) {
        log.debug("{}: Arming Cruise Control rebalance proposal request timer", reconciliation);
        vertx.setPeriodic(REBALANCE_POLLING_TIMER_MS, t -> {
            kafkaRebalanceOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()).onComplete(getResult -> {
                if (getResult.succeeded()) {
                    KafkaRebalance freshKafkaRebalance = getResult.result();
                    // checking that the resource wasn't deleted before the timer fired
                    if (freshKafkaRebalance != null) {
                        // checking it is in the right state because the timer could be called again (from a delayed timer firing)
                        // and the previous execution set the status and completed the future
                        if (state(freshKafkaRebalance) == State.PendingProposal) {
                            if (rebalanceAnnotation(freshKafkaRebalance) == RebalanceAnnotation.stop) {
                                log.debug("{}: Stopping current Cruise Control proposal request timer", reconciliation);
                                vertx.cancelTimer(t);
                                p.tryComplete(buildRebalanceStatus(null, State.Stopped));
                            } else {
                                requestRebalance(reconciliation, host, apiClient, true, rebalanceOptionsBuilder,
                                        freshKafkaRebalance.getStatus().getSessionId()).onComplete(rebalanceResult -> {
                                            if (rebalanceResult.succeeded()) {
                                                // If the returned status has an optimization result then the rebalance proposal
                                                // is ready, so stop the polling
                                                if (rebalanceResult.result().getOptimizationResult() != null &&
                                                        !rebalanceResult.result().getOptimizationResult().isEmpty()) {
                                                    vertx.cancelTimer(t);
                                                    log.debug("{}: Optimization proposal ready", reconciliation);
                                                    // a response to an earlier request may have completed the future already
                                                    p.tryComplete(rebalanceResult.result());
                                                } else {
                                                    // The rebalance proposal is still not ready yet, keep the timer for polling
                                                    log.debug("{}: Waiting for optimization proposal to be ready", reconciliation);
                                                }
                                            } else {
                                                log.error("{}: Cruise Control getting rebalance proposal failed", reconciliation, rebalanceResult.cause());
                                                vertx.cancelTimer(t);
                                                p.tryFail(rebalanceResult.cause());
                                            }
                                        });
                            }
                        } else {
                            // the resource has left the PendingProposal state, so there is nothing left to poll for;
                            // without cancelling, the timer would keep firing and try to complete the future over and over
                            vertx.cancelTimer(t);
                            p.tryComplete(freshKafkaRebalance.getStatus());
                        }
                    } else {
                        log.debug("{}: Rebalance resource was deleted, stopping the request time", reconciliation);
                        vertx.cancelTimer(t);
                        p.tryComplete();
                    }
                } else {
                    log.error("{}: Cruise Control getting rebalance resource failed", reconciliation, getResult.cause());
                    vertx.cancelTimer(t);
                    p.tryFail(getResult.cause());
                }
            });
        });
    } else {
        p.complete(kafkaRebalance.getStatus());
    }
    return p.future();
}
/**
 * Retrieves a token from the Authentication service over an open connection.
 * <p>
 * A receiver link is opened on the connection. The first message received on the link
 * determines the outcome: the future is succeeded with a user wrapping the message's payload
 * if the message is of type {@code AuthenticationConstants.TYPE_AMQP_JWT} and has a payload,
 * and failed with a {@link ServerErrorException} otherwise.
 * <p>
 * If no message arrives within 5 seconds after the link has been opened, the future is
 * failed with a {@link ServerErrorException} with status 503. Messages arriving after the
 * future has been completed (e.g. after the time-out) have no effect on the outcome.
 *
 * @param openCon The open connection to the Authentication service.
 * @return A future indicating the outcome of the operation.
 */
private Future<HonoUser> getToken(final ProtonConnection openCon) {
    final Promise<HonoUser> result = Promise.promise();
    final ProtonMessageHandler messageHandler = (delivery, message) -> {
        // the handler competes with the time-out timer for completing the promise,
        // so it must not assume that the promise is still open
        final String type = MessageHelper.getApplicationProperty(
                message.getApplicationProperties(),
                AuthenticationConstants.APPLICATION_PROPERTY_TYPE,
                String.class);
        if (AuthenticationConstants.TYPE_AMQP_JWT.equals(type)) {
            final String payload = MessageHelper.getPayloadAsString(message);
            if (payload != null) {
                final HonoUser user = new HonoUserAdapter() {
                    @Override
                    public String getToken() {
                        return payload;
                    }
                };
                LOG.debug("successfully retrieved token from Authentication service");
                result.tryComplete(user);
            } else {
                result.tryFail(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR,
                        "message from Authentication service contains no body"));
            }
        } else {
            result.tryFail(new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR,
                    "Authentication service issued unsupported token [type: " + type + "]"));
        }
    };
    openReceiver(openCon, messageHandler)
            .onComplete(attempt -> {
                if (attempt.succeeded()) {
                    // give up waiting for the token after 5 seconds
                    vertx.setTimer(5000, tid -> {
                        result.tryFail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE,
                                "time out reached while waiting for token from Authentication service"));
                    });
                    LOG.debug("opened receiver link to Authentication service, waiting for token ...");
                } else {
                    result.fail(attempt.cause());
                }
            });
    return result.future();
}
/**
 * Subclasses should override this method to perform any work required on start-up of this service.
 * <p>
 * This default implementation does nothing but complete the promise.
 * <p>
 * This method is invoked by {@link #start()} as part of the {@code Verticle} deployment process.
 *
 * @param startPromise The promise to complete once start-up has succeeded.
 */
protected void doStart(final Promise<Void> startPromise) {
// should be overridden by subclasses
startPromise.complete();
}