下面列出了 io.vertx.core.Promise#promise() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Checks the Kafka pods for a user-requested pod and PVC deletion (requested through an annotation)
 * and triggers the cleaning where needed.
 * <p>
 * The Kafka StatefulSet is looked up first; its old storage configuration is then used to compute the
 * PVCs which are handed over to {@code maybeCleanPodAndPvc} together with the currently existing PVCs.
 *
 * @return The future obtained by passing the cleaning future through {@code withVoid}. The cleaning
 *         future fails when the StatefulSet lookup fails, when preparing the PVC lists throws an
 *         exception or when {@code maybeCleanPodAndPvc} fails.
 */
Future<ReconciliationState> kafkaManualPodCleaning() {
    String stsName = KafkaCluster.kafkaClusterName(name);

    // compose() is used instead of a hand-completed promise: an exception thrown while preparing the
    // PVC lists now fails the returned future instead of leaving it uncompleted forever.
    Future<Void> cleaning = kafkaSetOperations.getAsync(namespace, stsName)
            .compose(sts -> {
                // The storage can change when the JBOD volumes are added / removed etc.
                // At this point, the STS has not been updated yet. So we use the old storage configuration to get the old PVCs.
                // This is needed because the restarted pod will be created from old statefulset with old storage configuration.
                List<PersistentVolumeClaim> desiredPvcs = kafkaCluster.generatePersistentVolumeClaims(getOldStorage(sts));
                Future<List<PersistentVolumeClaim>> existingPvcsFuture = pvcOperations.listAsync(namespace, kafkaCluster.getSelectorLabels());

                return maybeCleanPodAndPvc(kafkaSetOperations, sts, desiredPvcs, existingPvcsFuture);
            });

    return withVoid(cleaning);
}
/**
 * Removes a resource by sending an HTTP DELETE request.
 * <p>
 * The request is issued on the client's vert.x context and always carries an {@code Origin} header.
 *
 * @param requestOptions The options to use for the request.
 * @param successPredicate A predicate on the returned HTTP status code for determining success.
 * @return A future that will be failed if the predicate evaluates to {@code false} or if the request
 *         itself fails. Otherwise the CORS headers of the response are checked and the future is
 *         completed (unless the CORS check has already settled it).
 * @throws NullPointerException if options or predicate are {@code null}.
 */
public Future<Void> delete(final RequestOptions requestOptions, final IntPredicate successPredicate) {

    Objects.requireNonNull(requestOptions);
    Objects.requireNonNull(successPredicate);

    final Promise<Void> outcome = Promise.promise();

    context.runOnContext(ignored -> {
        @SuppressWarnings("deprecation")
        final HttpClientRequest request = client.delete(requestOptions)
                .handler(httpResponse -> {
                    final int status = httpResponse.statusCode();
                    LOGGER.debug("got response [status: {}]", status);
                    if (!successPredicate.test(status)) {
                        outcome.tryFail(newUnexpectedResponseStatusException(status));
                        return;
                    }
                    // the try* variants are used because the CORS check may already have settled the outcome
                    checkCorsHeaders(httpResponse, outcome);
                    outcome.tryComplete();
                })
                .exceptionHandler(outcome::tryFail);
        request.headers().add(HttpHeaders.ORIGIN, ORIGIN_URI);
        request.end();
    });

    return outcome.future();
}
/**
 * Asks the user on the console for the name, the payload and the content type of a command.
 * <p>
 * Reading from the scanner blocks, therefore the dialog is run by the worker executor.
 *
 * @return A future that is completed with the command built from the three lines entered by the user.
 */
private Future<Command> getCommandFromUser() {

    final Promise<Command> enteredCommand = Promise.promise();

    workerExecutor.executeBlocking(dialogOutcome -> {
        System.out.println();
        System.out.println();
        System.out.printf(
                ">>>>>>>>> Enter name of command for device [%s] in tenant [%s] (prefix with 'ow:' to send one-way command):",
                deviceId, tenantId);
        System.out.println();
        final String commandName = scanner.nextLine();

        System.out.println(">>>>>>>>> Enter command payload:");
        final String commandPayload = scanner.nextLine();

        System.out.println(">>>>>>>>> Enter content type:");
        final String commandContentType = scanner.nextLine();

        System.out.println();
        dialogOutcome.complete(new Command(commandName, commandPayload, commandContentType));
    }, enteredCommand);

    return enteredCommand.future();
}
/**
 * Extracts the device from the body of a request.
 *
 * @param ctx The context to retrieve the request body from.
 * @return A future indicating the outcome of the operation.
 *         The future will be succeeded with a new, empty device if the context contains no request body,
 *         or with the device that the JSON object from the request body maps to.
 *         If the JSON object cannot be mapped to a Device, the future will be failed with a
 *         {@link org.eclipse.hono.client.ClientErrorException} having status code 400.
 * @throws NullPointerException If the context is {@code null}.
 * @throws ClassCastException If the request body stored in the context is not a JSON object.
 */
private static Future<Device> fromPayload(final RoutingContext ctx) {

    Objects.requireNonNull(ctx);

    final Object body = ctx.get(KEY_REQUEST_BODY);
    if (body == null) {
        // payload was empty
        return Future.succeededFuture(new Device());
    }

    final JsonObject json = JsonObject.class.cast(body);
    try {
        // validate payload
        return Future.succeededFuture(json.mapTo(Device.class));
    } catch (final DecodeException | IllegalArgumentException e) {
        return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
                "request does not contain a valid Device object", e));
    }
}
/**
 * Reconciles the credentials of a user using the credentials manager.
 * <p>
 * A non-null password is created or updated. A {@code null} password removes the credentials of the
 * user if they exist and is a no-op otherwise. The work is run by the shared
 * "kubernetes-ops-pool" worker executor (unordered).
 *
 * @param username The name of the user.
 * @param password The desired password or {@code null} if the credentials should not exist.
 * @return A future which completes once the blocking code has finished.
 */
Future<Void> reconcile(String username, String password) {
    Promise<Void> reconciled = Promise.promise();

    vertx.createSharedWorkerExecutor("kubernetes-ops-pool").executeBlocking(
        blocking -> {
            // the existence check is done up-front in every case
            boolean userExists = credsManager.exists(username);

            if (password != null) {
                credsManager.createOrUpdate(username, password);
            } else if (userExists) {
                credsManager.delete(username);
            }

            blocking.complete(null);
        },
        false,
        reconciled);

    return reconciled.future();
}
/**
 * {@inheritDoc}
 * <p>
 * This method uses the {@linkplain #createMessageId() message ID} to correlate the response received
 * from a device with the request.
 * <p>
 * A response which is not OK is turned into the exception produced by {@code StatusCodeMapper}.
 *
 * @throws NullPointerException if device ID or command are {@code null}.
 */
@Override
public Future<BufferResult> sendCommand(final String deviceId, final String command, final String contentType,
        final Buffer data, final Map<String, Object> properties) {

    Objects.requireNonNull(deviceId);
    Objects.requireNonNull(command);

    final Span span = newChildSpan(null, command);
    TracingHelper.setDeviceTags(span, getTenantId(), deviceId);

    final Promise<BufferResult> responseTracker = Promise.promise();
    final String targetAddress = getTargetAddress(getTenantId(), deviceId);
    createAndSendRequest(command, targetAddress, properties, data, contentType, responseTracker, null, span);

    return mapResultAndFinishSpan(
            responseTracker.future(),
            response -> {
                if (!response.isOk()) {
                    throw StatusCodeMapper.from(response);
                }
                return response;
            },
            span);
}
/**
 * Rolls back the current transaction.
 *
 * @return a <code>Future</code> that is completed by the delegate's rollback operation.
 * @throws IllegalStateException if the delegate is not a transaction, i.e. <code>beginTransaction</code>
 *         has not been called before.
 */
public Future<Void> rollback() {
    if (delegate instanceof Transaction) {
        Promise<Void> rolledBack = Promise.promise();
        ((Transaction) delegate).rollback(rolledBack);
        return rolledBack.future();
    }
    throw new IllegalStateException("Not in transaction");
}
/**
 * Closes the insecure server, if there is one.
 *
 * @return A succeeded future if no insecure server is set, otherwise a future
 *         that is completed by the server's close operation.
 */
private Future<Void> stopInsecureServer() {

    if (insecureServer == null) {
        // nothing to shut down
        return Future.succeededFuture();
    }

    final Promise<Void> closeTracker = Promise.promise();
    log.info("Shutting down insecure server");
    insecureServer.close(closeTracker);
    return closeTracker.future();
}
/**
 * Asynchronously brings the resource with the given name in line with the desired resource.
 * <p>
 * Depending on whether the resource currently exists and whether a desired state is given, the resource
 * is created, patched, deleted or left alone. The work is run by the shared "kubernetes-ops-pool"
 * worker executor (unordered).
 *
 * @param name The name of the resource to reconcile.
 * @param desired The desired state of the resource, {@code null} if the resource should not exist.
 * @return A future which completes when the resource was reconciled. The future is failed right away
 *         if the name of the desired resource differs from the given name.
 */
public Future<ReconcileResult<T>> reconcile(String name, T desired) {
    if (desired != null && !name.equals(desired.getMetadata().getName())) {
        return Future.failedFuture("Given name " + name + " incompatible with desired name "
                + desired.getMetadata().getName());
    }

    Promise<ReconcileResult<T>> reconciled = Promise.promise();

    vertx.createSharedWorkerExecutor("kubernetes-ops-pool").executeBlocking(
        blocking -> {
            T existing = operation().withName(name).get();

            if (desired == null && existing == null) {
                log.debug("{} {} does not exist, noop", resourceKind, name);
                blocking.complete(ReconcileResult.noop(null));
            } else if (desired == null) {
                // Deletion is desired
                log.debug("{} {} exist, deleting it", resourceKind, name);
                internalDelete(name).onComplete(blocking);
            } else if (existing == null) {
                log.debug("{} {} does not exist, creating it", resourceKind, name);
                internalCreate(name, desired).onComplete(blocking);
            } else {
                log.debug("{} {} already exists, patching it", resourceKind, name);
                internalPatch(name, existing, desired).onComplete(blocking);
            }
        },
        false,
        reconciled
    );

    return reconciled.future();
}
/**
 * Verifies that the secrets of credentials that have been set for a device contain an ID
 * when the credentials are read back.
 *
 * @param ctx The vert.x test context.
 */
@Test
public void testReturnedSecretContainAnId(final VertxTestContext ctx) {

    final String tenantId = UUID.randomUUID().toString();
    final String deviceId = UUID.randomUUID().toString();
    final String authId = UUID.randomUUID().toString();

    final CommonCredential passwordCredential = createPasswordCredential(authId, "bar");
    final List<CommonCredential> credentialsToSet = Arrays.asList(passwordCredential);

    // step 1: create the device and set its credentials
    final Promise<?> credentialsSet = Promise.promise();

    getDeviceManagementService()
            .createDevice(tenantId, Optional.of(deviceId), new Device(), NoopSpan.INSTANCE)
            .onComplete(ctx.succeeding(created -> {
                getCredentialsManagementService()
                        .updateCredentials(tenantId, deviceId, credentialsToSet, Optional.empty(), NoopSpan.INSTANCE)
                        .onComplete(ctx.succeeding(updated -> credentialsSet.complete()));
            }));

    // step 2: read the credentials back and make sure that the secret carries an ID
    final Promise<?> credentialsChecked = Promise.promise();

    credentialsSet.future().onComplete(ctx.succeeding(ok -> {
        getCredentialsService()
                .get(tenantId, CredentialsConstants.SECRETS_TYPE_HASHED_PASSWORD, authId)
                .onComplete(ctx.succeeding(response -> ctx.verify(() -> {

                    assertEquals(HttpURLConnection.HTTP_OK, response.getStatus());

                    final CredentialsObject retrieved = response.getPayload().mapTo(CredentialsObject.class);
                    assertEquals(authId, retrieved.getAuthId());
                    assertEquals(CredentialsConstants.SECRETS_TYPE_HASHED_PASSWORD, retrieved.getType());
                    assertEquals(1, retrieved.getSecrets().size());
                    assertNotNull(
                            retrieved.getSecrets().getJsonObject(0).getString(RegistryManagementConstants.FIELD_ID));

                    credentialsChecked.complete();
                })));
    }));

    // step 3: the test is done once the check has passed
    credentialsChecked.future().onComplete(ctx.succeeding(done -> ctx.completeNow()));
}
/**
 * Enqueues an action which deletes the given topic from the topic store.
 *
 * @param logContext The log context passed on to the action.
 * @param involvedObject The object passed on to the action.
 * @param topicName The name of the topic to delete.
 * @return The future of the promise which is handed over to the enqueued action.
 */
private Future<Void> deleteFromTopicStore(LogContext logContext, HasMetadata involvedObject, TopicName topicName) {
    Promise<Void> deletion = Promise.promise();
    DeleteFromTopicStore deleteAction = new DeleteFromTopicStore(logContext, topicName, involvedObject, deletion);
    enqueue(deleteAction);
    return deletion.future();
}
/**
 * Updates the Status field of the KafkaConnect or KafkaConnector CR. The current resource is fetched,
 * the desired status is diffed against its current status and the update is only called when the
 * diff is not empty.
 * <p>
 * No update is attempted (and the future succeeds) for a resource which is neither a KafkaConnector nor
 * a KafkaMirrorMaker2 and which {@code StatusUtils.isResourceV1alpha1} reports as v1alpha1.
 *
 * @param resourceOperator The operator used for fetching the resource and for updating its status
 * @param resource The CR of KafkaConnect or KafkaConnector
 * @param reconciliation Reconciliation information
 * @param desiredStatus The KafkaConnectStatus or KafkaConnectorStatus which should be set
 * @param copyWithStatus Function which creates the resource to send from the fetched resource and the desired status
 *
 * @return Future which succeeds when the status was updated or did not need updating and which fails when
 *         the current resource cannot be fetched, does not exist or when the status update fails
 */
protected <T extends CustomResource & HasStatus<S>, S extends Status, L extends CustomResourceList<T>, D extends Doneable<T>> Future<Void>
    maybeUpdateStatusCommon(CrdOperator<KubernetesClient, T, L, D> resourceOperator,
                            T resource,
                            Reconciliation reconciliation,
                            S desiredStatus,
                            BiFunction<T, S, T> copyWithStatus) {
    Promise<Void> statusUpdated = Promise.promise();

    resourceOperator.getAsync(resource.getMetadata().getNamespace(), resource.getMetadata().getName()).onComplete(lookup -> {
        if (!lookup.succeeded()) {
            log.error("{}: Failed to get the current {} resource and its status", reconciliation, resource.getKind(), lookup.cause());
            statusUpdated.fail(lookup.cause());
            return;
        }

        T latest = lookup.result();
        if (latest == null) {
            log.error("{}: Current {} resource not found", reconciliation, resource.getKind());
            statusUpdated.fail("Current " + resource.getKind() + " resource not found");
            return;
        }

        // the API version is only checked for kinds other than KafkaConnector and KafkaMirrorMaker2
        if (!(latest instanceof KafkaConnector)
                && !(latest instanceof KafkaMirrorMaker2)
                && StatusUtils.isResourceV1alpha1(latest)) {
            log.warn("{}: {} {} needs to be upgraded from version {} to 'v1beta1' to use the status field",
                    reconciliation, latest.getKind(), latest.getMetadata().getName(), latest.getApiVersion());
            statusUpdated.complete();
            return;
        }

        StatusDiff statusDiff = new StatusDiff(latest.getStatus(), desiredStatus);
        if (statusDiff.isEmpty()) {
            log.debug("{}: Status did not change", reconciliation);
            statusUpdated.complete();
            return;
        }

        T latestWithNewStatus = copyWithStatus.apply(latest, desiredStatus);
        resourceOperator.updateStatusAsync(latestWithNewStatus).onComplete(update -> {
            if (!update.succeeded()) {
                log.error("{}: Failed to update status", reconciliation, update.cause());
                statusUpdated.fail(update.cause());
            } else {
                log.debug("{}: Completed status update", reconciliation);
                statusUpdated.complete();
            }
        });
    });

    return statusUpdated.future();
}
/**
 * Verifies that for an "unsettled" telemetry message the adapter sends the message downstream and
 * does not update the device's delivery before the downstream peer has reported the outcome.
 */
@Test
public void testUploadTelemetryWithAtLeastOnceDeliverySemantics() {

    // GIVEN an adapter configured to use a user-defined server
    final VertxBasedAmqpProtocolAdapter amqpAdapter = givenAnAmqpAdapter();
    // and a telemetry sender whose outcome remains pending until it is completed further down
    final DownstreamSender sender = givenATelemetrySenderForAnyTenant();
    final Promise<ProtonDelivery> downstreamOutcome = Promise.promise();
    when(sender.sendAndWaitForOutcome(any(Message.class), (SpanContext) any()))
            .thenReturn(downstreamOutcome.future());

    // which is enabled for a tenant
    final TenantObject tenant = givenAConfiguredTenant(TEST_TENANT_ID, true);

    // WHEN a device sends telemetry data using an un-settled delivery
    final ProtonDelivery deviceDelivery = mock(ProtonDelivery.class);
    when(deviceDelivery.remotelySettled()).thenReturn(false);
    final Buffer telemetryData = Buffer.buffer("payload");
    final String targetAddress = ResourceIdentifier.from(TelemetryConstants.TELEMETRY_ENDPOINT_SHORT, TEST_TENANT_ID, TEST_DEVICE).toString();
    final Message message = getFakeMessage(targetAddress, telemetryData);

    amqpAdapter.onMessageReceived(AmqpContext.fromMessage(deviceDelivery, message, null));

    // THEN the sender sends the message
    verify(sender).sendAndWaitForOutcome(any(Message.class), (SpanContext) any());
    // using the canonical endpoint name
    verify(message).setAddress(TelemetryConstants.TELEMETRY_ENDPOINT + "/" + TEST_TENANT_ID);
    // and the device's delivery is left untouched while the downstream outcome is pending
    verify(deviceDelivery, never()).disposition(any(DeliveryState.class), anyBoolean());

    // until the downstream transfer is settled
    downstreamOutcome.complete(mock(ProtonDelivery.class));
    verify(deviceDelivery).disposition(any(Accepted.class), eq(true));

    // and the telemetry message has been reported
    verify(metrics).reportTelemetry(
            eq(EndpointType.TELEMETRY),
            eq(TEST_TENANT_ID),
            eq(tenant),
            eq(ProcessingOutcome.FORWARDED),
            eq(QoS.AT_LEAST_ONCE),
            eq(telemetryData.length()),
            any());
}
/**
 * Delegates closing to the close handler.
 *
 * @return The future of the promise which is handed over to the close handler.
 */
@Override
public Future<Void> close() {
    Promise<Void> closed = Promise.promise();
    closeHandler.close(closed);
    return closed.future();
}
/**
 * Creates a resource by sending an HTTP POST request.
 * <p>
 * The request is issued on the client's vert.x context.
 *
 * @param requestOptions The options to use for the request.
 * @param body The body to post (may be {@code null}, in which case the request is sent without a body).
 * @param requestHeaders The headers to include in the request (may be {@code null}).
 * @param successPredicate A predicate on the HTTP response for determining success.
 * @param checkCorsHeaders Whether to set the {@code Origin} header on the request and check the CORS
 *                         headers of the response.
 * @return A future that will be failed if the predicate evaluates to {@code false} or if the request
 *         itself fails. Otherwise the future is completed with the headers of the response
 *         (unless the CORS check has already settled it).
 * @throws NullPointerException if options or predicate are {@code null}.
 */
public Future<MultiMap> create(
        final RequestOptions requestOptions,
        final Buffer body,
        final MultiMap requestHeaders,
        final Predicate<HttpClientResponse> successPredicate,
        final boolean checkCorsHeaders) {

    Objects.requireNonNull(requestOptions);
    Objects.requireNonNull(successPredicate);

    final Promise<MultiMap> outcome = Promise.promise();

    context.runOnContext(ignored -> {
        @SuppressWarnings("deprecation")
        final HttpClientRequest request = client.post(requestOptions)
                .handler(httpResponse -> {
                    LOGGER.trace("response status code {}", httpResponse.statusCode());
                    if (!successPredicate.test(httpResponse)) {
                        outcome.tryFail(newUnexpectedResponseStatusException(httpResponse.statusCode()));
                        return;
                    }
                    if (checkCorsHeaders) {
                        checkCorsHeaders(httpResponse, outcome);
                    }
                    // the try* variant is used because the CORS check may already have settled the outcome
                    outcome.tryComplete(httpResponse.headers());
                })
                .exceptionHandler(outcome::tryFail);

        if (requestHeaders != null) {
            request.headers().addAll(requestHeaders);
        }
        if (checkCorsHeaders) {
            request.headers().add(HttpHeaders.ORIGIN, ORIGIN_URI);
        }

        if (body != null) {
            request.end(body);
        } else {
            request.end();
        }
    });

    return outcome.future();
}
/**
 * Checks whether the number of connections of a tenant has reached the configured maximum.
 * <p>
 * The limit is considered not reached if the tenant has no resource limits, if its maximum number
 * of connections is {@code -1} or if the query for the current number of connections fails.
 * The outcome is logged to a tracing span which is finished afterwards.
 *
 * @param tenant The tenant to check the limit for.
 * @param spanContext The span context passed on when creating the span for this check.
 * @return A future containing {@code true} if the current number of connections is greater than or
 *         equal to the maximum, {@code false} otherwise.
 * @throws NullPointerException if tenant is {@code null}.
 */
@Override
public Future<Boolean> isConnectionLimitReached(final TenantObject tenant, final SpanContext spanContext) {

    Objects.requireNonNull(tenant);

    final Span span = createSpan("verify connection limit", spanContext, tenant);
    final Map<String, Object> logItems = new HashMap<>();
    final Future<Boolean> limitCheck;

    if (tenant.getResourceLimits() == null) {
        logItems.put(Fields.EVENT, "no resource limits configured");
        LOG.trace("no resource limits configured for tenant [{}]", tenant.getTenantId());
        limitCheck = Future.succeededFuture(Boolean.FALSE);
    } else {
        final long maxConnections = tenant.getResourceLimits().getMaxConnections();
        logItems.put(TenantConstants.FIELD_MAX_CONNECTIONS, maxConnections);
        LOG.trace("connection limit for tenant [{}] is [{}]", tenant.getTenantId(), maxConnections);

        if (maxConnections == -1) {
            logItems.put(Fields.EVENT, "no connection limit configured");
            limitCheck = Future.succeededFuture(Boolean.FALSE);
        } else {
            final String query = String.format("sum(%s{tenant=\"%s\"})", CONNECTIONS_METRIC_NAME,
                    tenant.getTenantId());
            limitCheck = executeQuery(query, span)
                    .map(currentConnections -> {
                        logItems.put("current-connections", currentConnections);
                        final boolean isExceeded = currentConnections >= maxConnections;
                        LOG.trace("connection limit {}exceeded [tenant: {}, current connections: {}, max-connections: {}]",
                                isExceeded ? "" : "not ", tenant.getTenantId(), currentConnections, maxConnections);
                        return isExceeded;
                    })
                    // a failed query must not lock the tenant out
                    .otherwise(failure -> Boolean.FALSE);
        }
    }

    return limitCheck.map(limitReached -> {
        logItems.put("limit exceeded", limitReached);
        span.log(logItems);
        span.finish();
        return limitReached;
    });
}
/**
 * Future-returning variant of the handler based {@code endOffsets} method.
 *
 * @param topicPartition The topic partition passed on to the handler based method.
 * @return The future of the promise which is handed over to the handler based method.
 */
@Override
public Future<Long> endOffsets(TopicPartition topicPartition) {
    Promise<Long> offset = Promise.promise();
    endOffsets(topicPartition, offset);
    return offset.future();
}
/**
 * Reconciles the Acl rules of the given user.
 * <p>
 * The existing rules are compared with the desired ones and the rules are deleted, created, updated
 * or left alone accordingly. The work is run by the shared "kubernetes-ops-pool" worker executor
 * (unordered).
 *
 * @param username User name of the reconciled user. When using TLS client auth, the username should be already in the Kafka format, e.g. CN=my-user
 * @param desired The list of desired Acl rules, {@code null} or empty if the user should have no rules
 * @return the Future with reconcile result. The future completes without a result if the existing rules
 *         cannot be read because of an {@code InvalidResourceException} while no rules are desired,
 *         and fails for any other exception thrown while reading the existing rules.
 */
public Future<ReconcileResult<Set<SimpleAclRule>>> reconcile(String username, Set<SimpleAclRule> desired) {
    Promise<ReconcileResult<Set<SimpleAclRule>>> reconciled = Promise.promise();

    vertx.createSharedWorkerExecutor("kubernetes-ops-pool").executeBlocking(
        blocking -> {
            boolean nothingDesired = desired == null || desired.isEmpty();

            Set<SimpleAclRule> existing;
            try {
                existing = getAcls(username);
            } catch (Exception e) {
                // if authorization is not enabled in the Kafka resource, but the KafkaUser resource doesn't
                // have ACLs, the UO can just ignore the corresponding exception
                if (e instanceof InvalidResourceException && nothingDesired) {
                    blocking.complete();
                } else {
                    log.error("Reconciliation failed for user {}", username, e);
                    blocking.fail(e);
                }
                return;
            }

            if (nothingDesired && existing.isEmpty()) {
                log.debug("User {}: No expected Acl rules and no existing Acl rules -> NoOp", username);
                blocking.complete(ReconcileResult.noop(desired));
            } else if (nothingDesired) {
                log.debug("User {}: No expected Acl rules, but {} existing Acl rules -> Deleting rules", username, existing.size());
                internalDelete(username, existing).onComplete(blocking);
            } else if (existing.isEmpty()) {
                log.debug("User {}: {} expected Acl rules, but no existing Acl rules -> Adding rules", username, desired.size());
                internalCreate(username, desired).onComplete(blocking);
            } else {
                log.debug("User {}: {} expected Acl rules and {} existing Acl rules -> Reconciling rules", username, desired.size(), existing.size());
                internalUpdate(username, desired, existing).onComplete(blocking);
            }
        },
        false,
        reconciled
    );

    return reconciled.future();
}
/**
 * Schedules a pre-login command.
 *
 * @param ssl The flag passed on to the pre-login command.
 * @param completionHandler The handler registered on the future of the promise
 *                          which is scheduled together with the command.
 */
void sendPreLoginMessage(boolean ssl, Handler<AsyncResult<Void>> completionHandler) {
    PreLoginCommand preLogin = new PreLoginCommand(ssl);
    Promise<Void> preLoginOutcome = Promise.promise();
    // the handler is registered before the command gets scheduled
    preLoginOutcome.future().onComplete(completionHandler);
    schedule(preLogin, preLoginOutcome);
}
/**
 * Creates a restart context with a new promise and a back-off obtained from the given supplier.
 *
 * @param backOffSupplier The supplier of the back-off to use.
 */
RestartContext(Supplier<BackOff> backOffSupplier) {
    this.promise = Promise.promise();
    this.backOff = backOffSupplier.get();
    // The returned delay is deliberately ignored.
    // NOTE(review): presumably this call just advances the back-off past its first step - confirm.
    this.backOff.delayMs();
}