下面列出了java.util.concurrent.CompletableFuture#get ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void updateRemainingValidationResults(CompletableFuture<Void> future,
ValidationResult.ValidationType validationType,
List<PartitionValidationResults> partitionsValidationResultList) throws Exception {
String error = "No Server Endpoint found.";
try {
future.get(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException exception) {
error = exception.getMessage();
}
failMissingValidationResults(
partitionsValidationResultList,
validationType,
error
);
}
public BigInteger requestNonce(final JsonRpcRequest request) {
final CompletableFuture<BigInteger> result = getNonceFromWeb3Provider(request, headers);
try {
final BigInteger nonce = result.get();
LOG.debug("Supplying nonce of {}", nonce.toString());
return nonce;
} catch (final InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to retrieve nonce:" + e.getMessage(), e.getCause());
}
}
/**
* Test request for new workers.
*/
@Test
public void testRequestNewWorkers() throws Exception {
new Context() {{
startResourceManager();
// allocate a worker
when(rmServices.workerStore.newTaskID()).thenReturn(task1).thenThrow(new AssertionFailedError());
rmServices.slotManagerStarted.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
CompletableFuture<Void> allocateResourceFuture = resourceManager.callAsync(
() -> {
rmServices.rmActions.allocateResource(resourceProfile1);
return null;
},
timeout);
// check for exceptions
allocateResourceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
// verify that a new worker was persisted, the internal state was updated, the task router was notified,
// and the launch coordinator was asked to launch a task
MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(task1, resourceProfile1);
verify(rmServices.workerStore, Mockito.timeout(timeout.toMilliseconds())).putWorker(expected);
assertThat(resourceManager.workersInNew, hasEntry(extractResourceID(task1), expected));
resourceManager.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
}};
}
/**
* Gets the result of a completable future without any exception thrown.
*
* @param future the completable future specified.
* @param <T> the type of result
* @return the result of completable future,
* or null if it's unfinished or finished exceptionally
*/
@Nullable
public static <T> T getWithoutException(CompletableFuture<T> future) {
if (future.isDone() && !future.isCompletedExceptionally()) {
try {
return future.get();
} catch (InterruptedException | ExecutionException ignored) {
}
}
return null;
}
@Test
public void test() throws InterruptedException, ExecutionException {
final CountDownLatch latch = new CountDownLatch(1);
final ClientBuilder builder = Client.builder().endpoints(cluster.getClientEndpoints())
.header("MyHeader1", "MyHeaderVal1").header("MyHeader2", "MyHeaderVal2").interceptor(new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(responseListener, headers);
assertThat(headers.get(Metadata.Key.of("MyHeader1", Metadata.ASCII_STRING_MARSHALLER)))
.isEqualTo("MyHeaderVal1");
assertThat(headers.get(Metadata.Key.of("MyHeader2", Metadata.ASCII_STRING_MARSHALLER)))
.isEqualTo("MyHeaderVal2");
latch.countDown();
}
};
}
});
try (Client client = builder.build()) {
CompletableFuture<PutResponse> future = client.getKVClient().put(bytesOf("sample_key"), bytesOf("sample_key"));
latch.await(1, TimeUnit.MINUTES);
future.get();
}
}
@Test public void testDocumentLink_02() throws Exception, ExecutionException {
LanguageServer languageServer = wrap(LanguageServer.class, new MockLanguageServer() {
@Override
public CompletableFuture<List<DocumentLink>> documentLink(DocumentLinkParams params) {
return CompletableFutures.computeAsync(canceler -> {
return null;
});
}
});
CompletableFuture<List<DocumentLink>> future = languageServer.getTextDocumentService().documentLink(new DocumentLinkParams(new TextDocumentIdentifier("test")));
List<DocumentLink> list = future.get(TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertNull(list);
}
private void assertFutureTimeout(final CompletableFuture<List<Result>> futureFirst) {
try
{
futureFirst.get();
fail("Should have timed out");
}
catch (Exception ex)
{
final Throwable root = ExceptionUtils.getRootCause(ex);
assertThat(root, instanceOf(ResponseException.class));
assertThat(root.getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 250 ms"));
}
}
/**
* Tests that a bad handler request (HandlerRequest cannot be created) is reported as a BAD_REQUEST
* and not an internal server error.
*
* <p>See FLINK-7663
*/
@Test
public void testBadHandlerRequest() throws Exception {
final FaultyTestParameters parameters = new FaultyTestParameters();
parameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID);
((TestParameters) parameters).jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
CompletableFuture<TestResponse> response = restClient.sendRequest(
serverAddress.getHostName(),
serverAddress.getPort(),
new TestHeaders(),
parameters,
new TestRequest(2));
try {
response.get();
fail("The request should fail with a bad request return code.");
} catch (ExecutionException ee) {
Throwable t = ExceptionUtils.stripExecutionException(ee);
assertTrue(t instanceof RestClientException);
RestClientException rce = (RestClientException) t;
assertEquals(HttpResponseStatus.BAD_REQUEST, rce.getHttpResponseStatus());
}
}
@Override
protected void run() throws Exception {
ExecutorService executor = newSingleThreadExecutor();
CompletableFuture<Void> future = eventloop.submit(() ->
ChannelSupplier.ofPromise(client.download(REQUIRED_FILE))
.streamTo(ChannelFileWriter.open(executor, clientStorage.resolve(DOWNLOADED_FILE)))
.whenResult(() -> System.out.printf("\nFile '%s' successfully downloaded to '%s'\n\n",
REQUIRED_FILE, clientStorage))
);
future.get();
executor.shutdown();
}
@Override
public Boolean call() throws Exception {
try {
boolean isContinue = checkFirstBlock();
if (!isContinue) {
Log.info("dowload done 1.");
return true;
}
} catch (NulsRuntimeException e) {
Log.error(e);
return false;
}
List<Node> nodes = newestInfos.getNodes();
long netBestHeight = newestInfos.getNetBestHeight();
long localBestHeight = blockService.getBestBlock().getData().getHeader().getHeight();
RequestThread requestThread = new RequestThread(nodes, localBestHeight + 1, netBestHeight);
CompletableFuture<Boolean> future = new CompletableFuture<>();
CollectThread collectThread = CollectThread.getInstance();
collectThread.setConfiguration(localBestHeight + 1, netBestHeight, requestThread, future);
TaskManager.createAndRunThread(ProtocolConstant.MODULE_ID_PROTOCOL, "download-collect", collectThread);
TaskManager.createAndRunThread(ProtocolConstant.MODULE_ID_PROTOCOL, "download-request", requestThread);
boolean result = future.get();
// Log.info(Thread.currentThread().getId() + "-" + collectThread.getStartHeight() + "-" + collectThread.getRequestStartHeight() + "::::" + result);
return result;
}
private static int waitAndComputeTotalLineCountResult(List<CompletableFuture<Integer>> requests) {
int count = 0;
for (CompletableFuture<Integer> result : requests) {
try {
count += result.get();
} catch (Throwable e) {
System.out.println("Failed count lines");
e.printStackTrace();
}
}
return count;
}
private void testNormalConsumerLifecycle(FlinkKafkaConsumerBase<String> testKafkaConsumer) throws Exception {
setupConsumer(testKafkaConsumer);
final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
testKafkaConsumer.close();
runFuture.get();
}
/**
* Tests that the a JM connects to the leading RM after regaining leadership.
*/
@Test
public void testResourceManagerConnectionAfterRegainingLeadership() throws Exception {
final JobMaster jobMaster = createJobMaster(
configuration,
jobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder().build(),
heartbeatServices);
CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
try {
// wait for the start to complete
startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
final TestingResourceManagerGateway testingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway();
final BlockingQueue<JobMasterId> registrationQueue = new ArrayBlockingQueue<>(1);
testingResourceManagerGateway.setRegisterJobManagerFunction((jobMasterId, resourceID, s, jobID) -> {
registrationQueue.offer(jobMasterId);
return CompletableFuture.completedFuture(testingResourceManagerGateway.getJobMasterRegistrationSuccess());
});
notifyResourceManagerLeaderListeners(testingResourceManagerGateway);
final JobMasterId firstRegistrationAttempt = registrationQueue.take();
assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
jobMaster.suspend(new FlinkException("Test exception.")).get();
final JobMasterId jobMasterId2 = JobMasterId.generate();
jobMaster.start(jobMasterId2).get();
final JobMasterId secondRegistrationAttempt = registrationQueue.take();
assertThat(secondRegistrationAttempt, equalTo(jobMasterId2));
} finally {
RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
}
}
/**
* Tests that a job is removed from the JobLeaderService once a TaskExecutor has
* no more slots assigned to this job.
*
* <p>See FLINK-8504
*/
@Test
public void testRemoveJobFromJobLeaderService() throws Exception {
final TaskSlotTable taskSlotTable = new TaskSlotTable(
Collections.singleton(ResourceProfile.UNKNOWN),
timerService);
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
.setTaskSlotTable(taskSlotTable)
.setTaskStateManager(localStateStoresManager)
.build();
final TestingTaskExecutor taskExecutor = new TestingTaskExecutor(
rpc,
taskManagerConfiguration,
haServices,
taskManagerServices,
HEARTBEAT_SERVICES,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
dummyBlobCacheService,
testingFatalErrorHandler);
try {
final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
final CompletableFuture<Void> initialSlotReport = new CompletableFuture<>();
resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
initialSlotReport.complete(null);
return CompletableFuture.completedFuture(Acknowledge.get());
});
final ResourceManagerId resourceManagerId = resourceManagerGateway.getFencingToken();
rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerId.toUUID());
final CompletableFuture<LeaderRetrievalListener> startFuture = new CompletableFuture<>();
final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
final StartStopNotifyingLeaderRetrievalService jobMasterLeaderRetriever = new StartStopNotifyingLeaderRetrievalService(
startFuture,
stopFuture);
haServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever);
taskExecutor.start();
taskExecutor.waitUntilStarted();
final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0);
final AllocationID allocationId = new AllocationID();
assertThat(startFuture.isDone(), is(false));
final JobLeaderService jobLeaderService = taskManagerServices.getJobLeaderService();
assertThat(jobLeaderService.containsJob(jobId), is(false));
// wait for the initial slot report
initialSlotReport.get();
taskExecutorGateway.requestSlot(
slotId,
jobId,
allocationId,
"foobar",
resourceManagerId,
timeout).get();
// wait until the job leader retrieval service for jobId is started
startFuture.get();
assertThat(jobLeaderService.containsJob(jobId), is(true));
taskExecutorGateway.freeSlot(allocationId, new FlinkException("Test exception"), timeout).get();
// wait that the job leader retrieval service for jobId stopped becaue it should get removed
stopFuture.get();
assertThat(jobLeaderService.containsJob(jobId), is(false));
} finally {
RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
}
}
/**
* Test delayed registration of task executor where the delay is introduced during connection from resource manager
* to the registering task executor.
*/
@Test
public void testDelayedRegisterTaskExecutor() throws Exception {
final Time fastTimeout = Time.milliseconds(1L);
try {
final OneShotLatch startConnection = new OneShotLatch();
final OneShotLatch finishConnection = new OneShotLatch();
// first registration is with blocking connection
rpcService.setRpcGatewayFutureFunction(rpcGateway ->
CompletableFuture.supplyAsync(
() -> {
startConnection.trigger();
try {
finishConnection.await();
} catch (InterruptedException ignored) {}
return rpcGateway;
},
TestingUtils.defaultExecutor()));
TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
taskExecutorGateway.getAddress(),
taskExecutorResourceID,
dataPort,
hardwareDescription,
ResourceProfile.ZERO,
ResourceProfile.ZERO);
CompletableFuture<RegistrationResponse> firstFuture =
rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
try {
firstFuture.get();
fail("Should have failed because connection to taskmanager is delayed beyond timeout");
} catch (Exception e) {
final Throwable cause = ExceptionUtils.stripExecutionException(e);
assertThat(cause, instanceOf(TimeoutException.class));
assertThat(cause.getMessage(), containsString("ResourceManagerGateway.registerTaskExecutor"));
}
startConnection.await();
// second registration after timeout is with no delay, expecting it to be succeeded
rpcService.resetRpcGatewayFutureFunction();
CompletableFuture<RegistrationResponse> secondFuture =
rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT);
RegistrationResponse response = secondFuture.get();
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
// on success, send slot report for taskmanager registration
final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.ANY));
rmGateway.sendSlotReport(taskExecutorResourceID,
((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
// let the remaining part of the first registration proceed
finishConnection.trigger();
Thread.sleep(1L);
// verify that the latest registration is valid not being unregistered by the delayed one
final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
taskExecutorResourceID,
TIMEOUT).get();
assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
} finally {
rpcService.resetRpcGatewayFutureFunction();
}
}
/**
* Tests the updateGlobalAggregate functionality.
*/
@Test
public void testJobMasterAggregatesValuesCorrectly() throws Exception {
final JobMaster jobMaster = createJobMaster(
configuration,
jobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder().build(),
heartbeatServices);
CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId);
final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
try {
// wait for the start to complete
startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
CompletableFuture<Object> updateAggregateFuture;
AggregateFunction<Integer, Integer, Integer> aggregateFunction = createAggregateFunction();
ClosureCleaner.clean(aggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
byte[] serializedAggregateFunction = InstantiationUtil.serializeObject(aggregateFunction);
updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedAggregateFunction);
assertThat(updateAggregateFuture.get(), equalTo(1));
updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 2, serializedAggregateFunction);
assertThat(updateAggregateFuture.get(), equalTo(3));
updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 3, serializedAggregateFunction);
assertThat(updateAggregateFuture.get(), equalTo(6));
updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 4, serializedAggregateFunction);
assertThat(updateAggregateFuture.get(), equalTo(10));
updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg2", 10, serializedAggregateFunction);
assertThat(updateAggregateFuture.get(), equalTo(10));
updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg2", 23, serializedAggregateFunction);
assertThat(updateAggregateFuture.get(), equalTo(33));
} finally {
RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
}
}
private static Policy createPolicy() throws InterruptedException, ExecutionException, TimeoutException {
final Policy policy = PoliciesModelFactory.newPolicy(policyId, getDefaultPolicyEntry());
final CompletableFuture<Policy> createPromise = dittoClient.policies().create(policy);
return createPromise.get(CLIENT_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
}
/**
* Tests that the TaskManager fails the task if the partition update fails.
*/
@Test
public void testUpdateTaskInputPartitionsFailure() throws Exception {
final ExecutionAttemptID eid = new ExecutionAttemptID();
final TaskDeploymentDescriptor tdd = createTestTaskDeploymentDescriptor("test task", eid, BlockingNoOpInvokable.class);
final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
final ShuffleEnvironment<?, ?> shuffleEnvironment = mock(ShuffleEnvironment.class, Mockito.RETURNS_MOCKS);
try (TaskSubmissionTestEnvironment env =
new TaskSubmissionTestEnvironment.Builder(jobId)
.setShuffleEnvironment(shuffleEnvironment)
.setSlotSize(1)
.addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
.addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
.build()) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
TaskSlotTable taskSlotTable = env.getTaskSlotTable();
taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
taskRunningFuture.get();
final ResourceID producerLocation = env.getTaskExecutor().getResourceID();
NettyShuffleDescriptor shuffleDescriptor =
createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);
final PartitionInfo partitionUpdate = new PartitionInfo(new IntermediateDataSetID(), shuffleDescriptor);
doThrow(new IOException()).when(shuffleEnvironment).updatePartitionInfo(eid, partitionUpdate);
final CompletableFuture<Acknowledge> updateFuture = tmGateway.updatePartitions(
eid,
Collections.singletonList(partitionUpdate),
timeout);
updateFuture.get();
taskFailedFuture.get();
Task task = taskSlotTable.getTask(tdd.getExecutionAttemptId());
assertThat(task.getExecutionState(), is(ExecutionState.FAILED));
assertThat(task.getFailureCause(), instanceOf(IOException.class));
}
}
/**
* Tests that a remote rpc call with a non-serializable argument fails with an
* {@link IOException} (or an {@link java.lang.reflect.UndeclaredThrowableException} if the
* the method declaration does not include the {@link IOException} as throwable).
*/
@Test(expected = IOException.class)
public void testNonSerializableRemoteMessageTransfer() throws Exception {
LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
testEndpoint.start();
String address = testEndpoint.getAddress();
CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
remoteGateway.foobar(new Object());
fail("Should have failed because Object is not serializable.");
}
@Test
public void testConnectFailure() throws Exception {
RestClient restClient = null;
RestServerEndpoint serverEndpoint = null;
try {
RestServerEndpointConfiguration restServerConfig = RestServerEndpointConfiguration.fromConfiguration(serverConfig);
RestClientConfiguration restClientConfig = RestClientConfiguration.fromConfiguration(clientConfig);
RestfulGateway restfulGateway = new TestingRestfulGateway.Builder().build();
RestServerEndpointITCase.TestVersionHandler testVersionHandler = new RestServerEndpointITCase.TestVersionHandler(
() -> CompletableFuture.completedFuture(restfulGateway),
RpcUtils.INF_TIMEOUT);
serverEndpoint = new RestServerEndpointITCase.TestRestServerEndpoint(
restServerConfig,
Arrays.asList(Tuple2.of(testVersionHandler.getMessageHeaders(), testVersionHandler)));
restClient = new RestServerEndpointITCase.TestRestClient(restClientConfig);
serverEndpoint.start();
CompletableFuture<EmptyResponseBody> response = restClient.sendRequest(
serverEndpoint.getServerAddress().getHostName(),
serverEndpoint.getServerAddress().getPort(),
RestServerEndpointITCase.TestVersionHeaders.INSTANCE,
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance(),
Collections.emptyList()
);
response.get(60, TimeUnit.SECONDS);
fail("should never complete normally");
} catch (ExecutionException exception) {
// that is what we want
assertTrue(ExceptionUtils.findThrowable(exception, SSLException.class).isPresent());
} finally {
if (restClient != null) {
restClient.shutdown(timeout);
}
if (serverEndpoint != null) {
serverEndpoint.close();
}
}
}