下面列出了 java.util.concurrent.CompletableFuture#join() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Lists the DynamoDB tables reachable through the default async client and prints their names.
 *
 * <p>The method blocks on the stage returned by {@code whenComplete}, so the callback (printing
 * and closing the client) has finished before {@code main} returns.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException kept for signature compatibility; nothing in this method throws it
 */
public static void main(String[] args) throws InterruptedException {
    // Creates a default async client with credentials and regions loaded from the environment
    DynamoDbAsyncClient client = DynamoDbAsyncClient.create();
    CompletableFuture<ListTablesResponse> response = client.listTables(ListTablesRequest.builder()
                                                                                        .build());
    // Map the response to another CompletableFuture containing just the table names
    CompletableFuture<List<String>> tableNames = response.thenApply(ListTablesResponse::tableNames);
    // When future is complete (either successfully or in error) handle the response
    CompletableFuture<List<String>> handled = tableNames.whenComplete((tables, err) -> {
        try {
            if (tables != null) {
                tables.forEach(System.out::println);
            } else if (err != null) {
                // Handle error
                err.printStackTrace();
            }
        } finally {
            // Lets the application shut down. Only close the client when you are completely done with it.
            client.close();
        }
    });
    // Wait on the dependent stage, not on tableNames: tableNames can be complete while the
    // callback above is still running, which would let main return before the output is
    // written and the client is closed.
    handled.join();
}
/**
 * Calls {@code /upcheck} first with {@code fooClient} and then with {@code foobarClient},
 * checking after each call the three lines stored in {@code knownClientsFile}. The
 * {@code foobar.com} entry is expected to hold {@code DUMMY_FINGERPRINT} after the first call
 * and {@code foobarFingerprint} after the second.
 */
@Test
void shouldReplaceFingerprint() throws Exception {
    // Step 1: request issued by the "foo" client.
    final HttpClientRequest fooRequest = fooClient.get(httpServer.actualPort(), "localhost", "/upcheck");
    final CompletableFuture<HttpClientResponse> fooResponse = new CompletableFuture<>();
    fooRequest.handler(fooResponse::complete)
        .exceptionHandler(fooResponse::completeExceptionally)
        .end();
    assertEquals(200, fooResponse.join().statusCode());

    final List<String> linesAfterFoo = Files.readAllLines(knownClientsFile);
    assertEquals(3, linesAfterFoo.size(), String.join("\n", linesAfterFoo));
    assertEquals("#First line", linesAfterFoo.get(0));
    assertEquals("foobar.com " + DUMMY_FINGERPRINT, linesAfterFoo.get(1));
    assertEquals("foo.com " + fooFingerprint, linesAfterFoo.get(2));

    // Step 2: request issued by the "foobar" client.
    final HttpClientRequest foobarRequest = foobarClient.get(httpServer.actualPort(), "localhost", "/upcheck");
    final CompletableFuture<HttpClientResponse> foobarResponse = new CompletableFuture<>();
    foobarRequest.handler(foobarResponse::complete)
        .exceptionHandler(foobarResponse::completeExceptionally)
        .end();
    assertEquals(200, foobarResponse.join().statusCode());

    final List<String> linesAfterFoobar = Files.readAllLines(knownClientsFile);
    assertEquals(3, linesAfterFoobar.size(), String.join("\n", linesAfterFoobar));
    assertEquals("#First line", linesAfterFoobar.get(0));
    assertEquals("foobar.com " + foobarFingerprint, linesAfterFoobar.get(1));
    assertEquals("foo.com " + fooFingerprint, linesAfterFoobar.get(2));
}
/**
 * Checks if the given object is readable to the guest user.
 *
 * @param objId MCRObjectID as String
 * @return {@code false} when {@code objId} is {@code null} or not a valid MCRObjectID, or when
 *         waiting for the permission check ends in a {@link CancellationException} or
 *         {@link CompletionException}; otherwise the result of the permission check
 */
public static boolean isWorldReadable(String objId) {
    final boolean usableId = objId != null && MCRObjectID.isValid(objId);
    if (!usableId) {
        return false;
    }
    final MCRObjectID parsedId = MCRObjectID.getInstance(objId);
    // Evaluate the READ permission with the guest user's information.
    final CompletableFuture<Boolean> guestReadCheck = MCRAccessManager.checkPermission(
        MCRSystemUserInformation.getGuestInstance(),
        () -> MCRAccessManager.checkPermission(parsedId, MCRAccessManager.PERMISSION_READ));
    try {
        return guestReadCheck.join();
    } catch (CancellationException | CompletionException e) {
        LOGGER.error("Error while retriving ACL information for Object {}", objId, e);
        return false;
    }
}
/**
 * Calls {@code /upcheck} with {@code fooClient}, expects HTTP 200, and then checks that
 * {@code knownClientsFile} holds exactly three lines: the header line, the {@code foobar.com}
 * entry with {@code DUMMY_FINGERPRINT}, and the {@code foo.com} entry with {@code fooFingerprint}.
 */
@Test
void shouldValidateOnFirstUse() throws Exception {
    final HttpClientRequest request = fooClient.get(httpServer.actualPort(), "localhost", "/upcheck");
    final CompletableFuture<HttpClientResponse> pendingResponse = new CompletableFuture<>();
    request.handler(pendingResponse::complete)
        .exceptionHandler(pendingResponse::completeExceptionally)
        .end();
    final int status = pendingResponse.join().statusCode();
    assertEquals(200, status);

    final List<String> storedLines = Files.readAllLines(knownClientsFile);
    assertEquals(3, storedLines.size());
    assertEquals("#First line", storedLines.get(0));
    assertEquals("foobar.com " + DUMMY_FINGERPRINT, storedLines.get(1));
    assertEquals("foo.com " + fooFingerprint, storedLines.get(2));
}
/**
 * Maps a two-element set through a function that always yields the same completed value and
 * expects the collected result to be an {@link ArrayList} that keeps both duplicates.
 */
@Test
void multiple_SetToListWithSameTypeMappingDuplicates_ReturnsFutureOfListOfMappedValuesWithDuplicates() {
    final Set<String> inputs = new HashSet<>();
    inputs.add("foo");
    inputs.add("bar");

    // The declared future type is kept so generic inference is unchanged.
    final CompletableFuture<List<String>> pending = mapValuesToFutureOfCompletedValues(
        inputs,
        element -> completedFuture("constantResult"),
        toList());
    final List<String> mapped = pending.join();

    assertThat(mapped)
        .containsExactly("constantResult", "constantResult")
        .isExactlyInstanceOf(ArrayList.class);
}
/**
 * Calls {@code /upcheck} with {@code caClient} and expects an HTTP 200 response.
 */
@Test
void shouldValidateUsingCertificate() {
    final HttpClientRequest request = caClient.get(httpServer.actualPort(), "localhost", "/upcheck");
    final CompletableFuture<HttpClientResponse> pendingResponse = new CompletableFuture<>();
    request.handler(pendingResponse::complete)
        .exceptionHandler(pendingResponse::completeExceptionally)
        .end();
    final int status = pendingResponse.join().statusCode();
    assertEquals(200, status);
}
/**
 * Drives {@code SegmentHelper.abortTransaction} against a mock connection and feeds it four
 * kinds of replies: an auth-token failure (expected to surface as a
 * {@code WireCommandFailedException} caused by an {@code AuthenticationException}),
 * {@code SegmentDeleted} and {@code NoSuchSegment} (both expected to let {@code join()} return),
 * and finally the shared processing-failure and connection-failure checks.
 */
@Test
public void abortTransaction() {
    final MockConnectionFactory connectionFactory = new MockConnectionFactory();
    final SegmentHelper segmentHelper = new SegmentHelper(connectionFactory, new MockHostControllerStore());

    // Reply 1: auth token check failed.
    final CompletableFuture<Controller.TxnStatus> authFailure =
        segmentHelper.abortTransaction("", "", 0L, new UUID(0, 0L), "");
    final long authRequestId = ((MockConnection) (connectionFactory.connection)).getRequestId();
    connectionFactory.rp.process(new WireCommands.AuthTokenCheckFailed(authRequestId, "SomeException"));
    AssertExtensions.assertThrows("",
        () -> authFailure.join(),
        ex -> ex instanceof WireCommandFailedException
            && ex.getCause() instanceof AuthenticationException
    );

    // Reply 2: segment deleted.
    final CompletableFuture<Controller.TxnStatus> afterSegmentDeleted =
        segmentHelper.abortTransaction("", "", 1L, new UUID(0L, 0L), "");
    final long deletedRequestId = ((MockConnection) (connectionFactory.connection)).getRequestId();
    connectionFactory.rp.process(
        new WireCommands.SegmentDeleted(deletedRequestId, getQualifiedStreamSegmentName("", "", 0L)));
    afterSegmentDeleted.join();

    // Reply 3: no such segment.
    final CompletableFuture<Controller.TxnStatus> afterNoSuchSegment =
        segmentHelper.abortTransaction("", "", 1L, new UUID(0L, 0L), "");
    final long missingRequestId = ((MockConnection) (connectionFactory.connection)).getRequestId();
    connectionFactory.rp.process(
        new WireCommands.NoSuchSegment(missingRequestId, getQualifiedStreamSegmentName("", "", 0L), "", 0L));
    afterNoSuchSegment.join();

    // Shared failure scenarios, each starting a fresh abortTransaction call.
    final Supplier<CompletableFuture<?>> freshAbort =
        () -> segmentHelper.abortTransaction("", "", 0L, new UUID(0, 0L), "");
    validateProcessingFailureCFE(connectionFactory, freshAbort);
    testConnectionFailure(connectionFactory, freshAbort);
}
/**
 * Executes {@code underTest} with a mocked request carrying {@code upmid} and {@code id}
 * headers and expects a 200 response whose body equals the contents of
 * {@code badNonNikeShoesList.json}.
 */
@Test
public void happy_path_feature_flag_off() {
    @SuppressWarnings("unchecked")
    RequestInfo<Void> mockedRequest = mock(RequestInfo.class);
    when(mockedRequest.getHeaders()).thenReturn(mock(HttpHeaders.class));
    when(mockedRequest.getHeaders().get("upmid")).thenReturn(upmId);
    when(mockedRequest.getHeaders().get("id")).thenReturn("99");

    final String expectedBody = getFile("badNonNikeShoesList.json");

    final CompletableFuture<ResponseInfo<String>> pending =
        underTest.execute(mockedRequest, executor, mockContext);
    final ResponseInfo<String> actual = pending.join();

    assertThat(actual.getHttpStatusCode()).isEqualTo(200);
    assertThat(actual.getContentForFullResponse()).isEqualTo(expectedBody);
}
/**
 * Starts an RPC actor under a supervisor, sends it a {@code Fail} message, and expects the
 * actor's termination future to complete exceptionally with that exact cause. It then waits
 * for the actor system's own termination future.
 */
@Test
public void completesTerminationFutureExceptionallyIfActorFails() throws Exception {
    final ActorSystem system = actorSystemResource.getActorSystem();
    final ActorRef supervisorRef = SupervisorActor.startSupervisorActor(system, system.getDispatcher());
    final SupervisorActor.ActorRegistration registration = startAkkaRpcActor(supervisorRef, "foobar");

    final CompletableFuture<Void> actorTermination = registration.getTerminationFuture();
    assertThat(actorTermination.isDone(), is(false));

    final CompletableFuture<Terminated> systemTermination =
        system.getWhenTerminated().toCompletableFuture();

    final FlinkException injectedCause = new FlinkException("Test cause.");
    registration.getActorRef().tell(Fail.exceptionally(injectedCause), ActorRef.noSender());

    try {
        actorTermination.get();
        fail("Expected the termination future being completed exceptionally");
    } catch (ExecutionException expected) {
        // The injected cause must appear somewhere in the reported exception chain.
        ExceptionUtils.findThrowable(expected, candidate -> candidate.equals(injectedCause))
            .orElseThrow(() -> new FlinkException("Unexpected exception", expected));
    }

    // make sure that the supervisor actor has stopped --> terminating the actor system
    systemTermination.join();
}
/**
 * Waits for {@code future} and returns the exception that {@code join()} raised.
 *
 * @param future the future that is expected to fail
 * @return the {@link Exception} thrown by {@code join()}
 * @throws AssertionError if {@code join()} returns normally
 */
private Exception perform(final CompletableFuture<ClientHttpResponse> future) {
    try {
        future.join();
    } catch (final Exception thrown) {
        return thrown;
    }
    // Reached only when join() returned without throwing.
    throw new AssertionError("Expected exception");
}
/**
 * Maps a single-element list of strings to their lengths and expects the collected result to
 * be an {@link ArrayList} containing exactly that one length.
 */
@Test
void single_ListToListWithDiffTypeMapping_ReturnsFutureOfListOfMappedValue() {
    // The declared future type is kept so generic inference is unchanged.
    final CompletableFuture<List<Integer>> pending = mapValuesToFutureOfCompletedValues(
        singletonList("foo"),
        element -> completedFuture(element.length()),
        toList());
    final List<Integer> lengths = pending.join();

    assertThat(lengths)
        .containsExactly(3)
        .isExactlyInstanceOf(ArrayList.class);
}
/**
 * Benchmark body: creates a future, arms a one-second timeout on it, completes it with
 * {@code 1} straight away, and returns the joined value.
 *
 * @return the value the future was completed with
 */
@Benchmark
@BenchmarkMode({ Mode.Throughput })
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Object completableFutureWithTimeout() {
    final CompletableFuture<Integer> timed = new CompletableFuture<>();
    timed.orTimeout(1, TimeUnit.SECONDS);
    timed.complete(1);
    final Integer outcome = timed.join();
    return outcome;
}
/**
 * Waits for {@code future} and, if {@code join()} fails with a {@link CompletionException},
 * rethrows that exception's cause instead of the wrapper.
 *
 * @param future the future to wait for
 * @param <T> the future's value type
 * @throws Throwable the cause of the {@link CompletionException}, or the
 *         {@link CompletionException} itself when it carries no cause
 */
private static <T> void unwrap(final CompletableFuture<T> future) throws Throwable {
    try {
        future.join();
    } catch (final CompletionException e) {
        final Throwable cause = e.getCause();
        // Throwing a null reference would raise NullPointerException and hide the real
        // failure, so fall back to the wrapper when there is nothing to unwrap.
        throw cause != null ? cause : e;
    }
}
/**
 * Stubs every POST to answer 200 with body {@code {}} after {@code DELAY_AFTER_TIMEOUT} ms,
 * issues a request whose own API call timeout is that delay plus one second, and expects a
 * non-null response.
 */
@Test
public void increaseTimeoutInRequestOverrideConfig_shouldTakePrecedence() {
    stubFor(post(anyUrl())
        .willReturn(aResponse()
            .withStatus(200)
            .withBody("{}")
            .withFixedDelay(DELAY_AFTER_TIMEOUT)));

    // Request-level timeout that is longer than the stubbed delay.
    final Duration requestTimeout = Duration.ofMillis(DELAY_AFTER_TIMEOUT + 1000);
    final CompletableFuture<AllTypesResponse> pending =
        client.allTypes(b -> b.overrideConfiguration(c -> c.apiCallTimeout(requestTimeout)));

    final AllTypesResponse received = pending.join();
    assertThat(received).isNotNull();
}
/**
 * Runs two overlapping {@code createStreamRetryOnLockFailure} calls for the same stream.
 *
 * <p>The first call is held inside {@code createStream} until the test releases it. While it is
 * held, a call with a retry count of 1 is expected to fail with {@code LockFailedException}.
 * A third call with a retry count of 10 is then started; once its first lock failure has been
 * seen, the first call is released and expected to report {@code SUCCESS}, after which the
 * retrying call is expected to report {@code STREAM_EXISTS}.
 */
@Test(timeout = 30000)
public void concurrentCreateStreamTest() {
    TaskMetadataStore taskMetadataStore = spy(TaskStoreFactory.createZKStore(zkClient, executor));
    StreamMetadataTasks metadataTask = new StreamMetadataTasks(streamStorePartialMock, bucketStore, taskMetadataStore,
            SegmentHelperMock.getSegmentHelperMock(), executor, "host",
            new GrpcAuthHelper(authEnabled, "key", 300), requestTracker);
    final ScalingPolicy policy = ScalingPolicy.fixed(2);
    String stream = "concurrent";
    final StreamConfiguration config = StreamConfiguration.builder().scalingPolicy(policy).build();
    CompletableFuture<Void> createStreamCalled = new CompletableFuture<>();
    CompletableFuture<Void> waitOnCreateStream = new CompletableFuture<>();
    // Signal that createStream was entered, then block it until the test releases it.
    doAnswer(x -> {
        createStreamCalled.complete(null);
        waitOnCreateStream.join();
        return x.callRealMethod();
    }).when(streamStorePartialMock).createStream(anyString(), anyString(), any(), anyLong(), any(), any());
    CompletableFuture<Controller.CreateStreamStatus.Status> createStreamFuture1 = metadataTask.createStreamRetryOnLockFailure(
            SCOPE, stream, config, System.currentTimeMillis(), 10);
    // wait until create stream is called. let create stream be blocked on `wait` future.
    createStreamCalled.join();
    // start a new create stream with a retry count of 1.
    // this request should fail with LockFailedException as we have not asked for a retry.
    AssertExtensions.assertFutureThrows("Lock Failed Exception should be thrown",
            metadataTask.createStreamRetryOnLockFailure(SCOPE, stream, config, System.currentTimeMillis(), 1),
            e -> Exceptions.unwrap(e) instanceof LockFailedException);
    CompletableFuture<Void> signalLockFailed = new CompletableFuture<>();
    CompletableFuture<Void> waitOnLockFailed = new CompletableFuture<>();
    // first time lock failed exception is thrown, we will complete `signalLockFailed` to indicate lock failed exception is
    // being thrown.
    // For all subsequent times we will wait on waitOnLockFailed future.
    doAnswer(x -> {
        // Same cast as before; annotated so the unchecked warning is scoped to this one declaration.
        @SuppressWarnings("unchecked")
        CompletableFuture<Void> future = (CompletableFuture<Void>) x.callRealMethod();
        return future.exceptionally(e -> {
            if (Exceptions.unwrap(e) instanceof LockFailedException) {
                if (!signalLockFailed.isDone()) {
                    signalLockFailed.complete(null);
                } else {
                    waitOnLockFailed.join();
                }
            }
            // Always propagate the original failure to the caller of lock().
            throw new CompletionException(e);
        });
    }).when(taskMetadataStore).lock(any(), any(), anyString(), anyString(), any(), any());
    // start a new create stream with retries.
    CompletableFuture<Controller.CreateStreamStatus.Status> createStreamFuture2 =
            metadataTask.createStreamRetryOnLockFailure(SCOPE, stream, config, System.currentTimeMillis(), 10);
    // wait until lock failed exception is thrown
    signalLockFailed.join();
    // now complete first createStream request
    waitOnCreateStream.complete(null);
    // assertEquals takes (expected, actual); this order keeps failure messages the right way round.
    assertEquals(Controller.CreateStreamStatus.Status.SUCCESS, createStreamFuture1.join());
    // now let the lock failed exception be thrown for second request for subsequent retries
    waitOnLockFailed.complete(null);
    // second request should also succeed now but with stream exists
    assertEquals(Controller.CreateStreamStatus.Status.STREAM_EXISTS, createStreamFuture2.join());
}
public ResolverLocation(@Nullable KeySpacePath path, @Nullable CompletableFuture<ResolvedKeySpacePath> resolvedFuture) {
if (resolvedFuture != null && resolvedFuture.isDone() && !resolvedFuture.isCompletedExceptionally()) {
this.resolvedKeySpacePath = resolvedFuture.join();
}
this.path = path;
}
/**
 * Builds two replicas configured with the same two-entry ZooKeeper server map, starts both,
 * creates a web client for each, and connects a Curator client to replica 1's ZooKeeper
 * client port. Fails if Curator is not connected within 10 seconds.
 */
@BeforeEach
void setUp() throws Exception {
    // Ports for replica 1, requested in this order: API, quorum, election, ZooKeeper client.
    final int apiPort1 = InstanceSpec.getRandomPort();
    final int quorumPort1 = InstanceSpec.getRandomPort();
    final int electionPort1 = InstanceSpec.getRandomPort();
    final int zkPort1 = InstanceSpec.getRandomPort();

    // Ports for replica 2, same order.
    final int apiPort2 = InstanceSpec.getRandomPort();
    final int quorumPort2 = InstanceSpec.getRandomPort();
    final int electionPort2 = InstanceSpec.getRandomPort();
    final int zkPort2 = InstanceSpec.getRandomPort();

    final Map<Integer, ZooKeeperAddress> zkServers = ImmutableMap.of(
            1, new ZooKeeperAddress("127.0.0.1", quorumPort1, electionPort1, zkPort1),
            2, new ZooKeeperAddress("127.0.0.1", quorumPort2, electionPort2, zkPort2));

    final AuthProviderFactory authFactory = new TestAuthProviderFactory();

    replica1 = new CentralDogmaBuilder(tempDir.newFolder().toFile())
            .port(apiPort1, SessionProtocol.HTTP)
            .authProviderFactory(authFactory)
            .webAppEnabled(true)
            .mirroringEnabled(false)
            .gracefulShutdownTimeout(new GracefulShutdownTimeout(0, 0))
            .replication(new ZooKeeperReplicationConfig(1, zkServers))
            .build();

    replica2 = new CentralDogmaBuilder(tempDir.newFolder().toFile())
            .port(apiPort2, SessionProtocol.HTTP)
            .authProviderFactory(authFactory)
            .webAppEnabled(true)
            .mirroringEnabled(false)
            .gracefulShutdownTimeout(new GracefulShutdownTimeout(0, 0))
            .replication(new ZooKeeperReplicationConfig(2, zkServers))
            .build();

    client1 = WebClient.of("http://127.0.0.1:" + apiPort1);
    client2 = WebClient.of("http://127.0.0.1:" + apiPort2);

    // Kick off both replicas before waiting on either one.
    final CompletableFuture<Void> started1 = replica1.start();
    final CompletableFuture<Void> started2 = replica2.start();
    started1.join();
    started2.join();

    curator = CuratorFrameworkFactory.newClient(
            "127.0.0.1:" + zkPort1, new RetryUntilElapsed(10000, 100));
    curator.start();
    final boolean connected = curator.blockUntilConnected(10, TimeUnit.SECONDS);
    assertThat(connected).isTrue();
}
/**
 * Blocks until {@code future} completes and returns its value.
 *
 * @param future the future to wait for
 * @param <T> the future's value type
 * @return the value produced by {@code future.join()}
 */
public static <T> T await(CompletableFuture<T> future) {
    final T value = future.join();
    return value;
}
/**
 * Fills a {@link ConcurrentHashMap} from one worker per available processor while a "foreman"
 * task repeatedly takes array snapshots of the key and value views.
 *
 * <p>The foreman checks that each snapshot is no smaller than the previous one and no larger
 * than the final size, and stops once a snapshot reaches the final size. Anything it throws
 * is captured and rethrown by this method after all tasks have finished.
 *
 * @throws Throwable whatever the foreman task threw
 */
static void executeTest() throws Throwable {
    final Throwable[] failure = new Throwable[1];
    final ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();

    // One worker per processor; every worker writes its own disjoint key range.
    final int workerCount = Runtime.getRuntime().availableProcessors();
    final int keysPerWorker = 1024;
    final int finalSize = workerCount * keysPerWorker;

    // NOTE: the size constraints checked below are not specific to toArray and are
    // applicable to any form of traversal of the collection views
    final CompletableFuture<?> foreman = CompletableFuture.runAsync(new Runnable() {
        private int lastSeen = 0;

        /** Validates one snapshot; returns true once it has reached the final size. */
        private boolean reachedFinalSize(Object[] snapshot) {
            final int observed = snapshot.length;
            if (observed < lastSeen) {
                throw new RuntimeException("WRONG WAY");
            }
            if (observed > finalSize) {
                throw new RuntimeException("OVERSHOOT");
            }
            if (observed == finalSize) {
                return true;
            }
            lastSeen = observed;
            return false;
        }

        @Override
        public void run() {
            try {
                final Integer[] typed = new Integer[0];
                boolean done = false;
                while (!done) {
                    // Short-circuit evaluation keeps the same order and the same early exit.
                    done = reachedFinalSize(map.values().toArray())
                        || reachedFinalSize(map.keySet().toArray())
                        || reachedFinalSize(map.values().toArray(typed))
                        || reachedFinalSize(map.keySet().toArray(typed));
                }
            } catch (Throwable t) {
                failure[0] = t;
            }
        }
    });

    // Create workers
    final CompletableFuture<?>[] workers = new CompletableFuture<?>[workerCount];
    for (int w = 0; w < workerCount; w++) {
        final int base = w * keysPerWorker;
        workers[w] = CompletableFuture.runAsync(() -> {
            for (int i = 0; i < keysPerWorker; i++) {
                map.put(base + i, i);
            }
        });
    }

    // Wait for workers and then foreman to complete
    CompletableFuture.allOf(workers).join();
    foreman.join();

    if (failure[0] != null) {
        throw failure[0];
    }
}
/**
 * Populates a {@link ConcurrentHashMap} concurrently (one worker task per available
 * processor, each with its own key range) while a foreman task keeps taking array snapshots
 * of the map's key and value views.
 *
 * <p>Each snapshot must be at least as large as the previous one and at most the final size.
 * The foreman returns when a snapshot reaches the final size. Any throwable raised inside the
 * foreman is stored and rethrown here once the workers and the foreman are done.
 *
 * @throws Throwable the throwable captured from the foreman task, if any
 */
static void executeTest() throws Throwable {
    final Throwable[] captured = new Throwable[1];
    final ConcurrentHashMap<Integer, Integer> shared = new ConcurrentHashMap<>();

    // Number of workers equal to the number of processors
    final int workerTotal = Runtime.getRuntime().availableProcessors();
    final int perWorker = 1024;
    final int ceiling = workerTotal * perWorker;

    // NOTE: the size constraints checked by the foreman are not specific to toArray and are
    // applicable to any form of traversal of the collection views
    final Runnable foremanTask = new Runnable() {
        private int highWater = 0;

        /** Checks one snapshot; true means the map has reached its final size. */
        private boolean checkProgress(Object[] view) {
            final int len = view.length;
            if (len < highWater) {
                throw new RuntimeException("WRONG WAY");
            }
            if (len > ceiling) {
                throw new RuntimeException("OVERSHOOT");
            }
            final boolean full = (len == ceiling);
            if (!full) {
                highWater = len;
            }
            return full;
        }

        @Override
        public void run() {
            final Integer[] seed = new Integer[0];
            try {
                for (;;) {
                    if (checkProgress(shared.values().toArray())) {
                        break;
                    }
                    if (checkProgress(shared.keySet().toArray())) {
                        break;
                    }
                    if (checkProgress(shared.values().toArray(seed))) {
                        break;
                    }
                    if (checkProgress(shared.keySet().toArray(seed))) {
                        break;
                    }
                }
            } catch (Throwable t) {
                captured[0] = t;
            }
        }
    };
    final CompletableFuture<?> foreman = CompletableFuture.runAsync(foremanTask);

    // Each worker will put globally unique keys into the map
    final CompletableFuture<?>[] workers = IntStream.range(0, workerTotal)
        .mapToObj(w -> CompletableFuture.runAsync(() -> {
            final int offset = w * perWorker;
            for (int i = 0; i < perWorker; i++) {
                shared.put(offset + i, i);
            }
        }))
        .toArray(CompletableFuture<?>[]::new);

    // Wait for workers and then foreman to complete
    CompletableFuture.allOf(workers).join();
    foreman.join();

    final Throwable problem = captured[0];
    if (problem != null) {
        throw problem;
    }
}