java.util.concurrent.CompletableFuture#whenComplete() source code examples

The following code examples show java.util.concurrent.CompletableFuture#whenComplete() as used in the projects named above each example.
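
Before the project examples, a minimal sketch of the callback contract may help. The callback passed to whenComplete receives the value and a null error on success, or a null value and the error on failure. The class and method names here (WhenCompleteBasics, observe) are made up for illustration and do not come from any of the listed projects.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteBasics {

    // Attaches a whenComplete callback and reports what the callback observed.
    // For an already completed future the callback runs immediately on the calling thread.
    static String observe(CompletableFuture<String> source) {
        AtomicReference<String> seen = new AtomicReference<>("not called");
        source.whenComplete((value, error) -> {
            if (error == null) {
                seen.set("value=" + value);
            } else {
                seen.set("error=" + error.getMessage());
            }
        });
        return seen.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("hello");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(observe(ok));     // prints "value=hello"
        System.out.println(observe(failed)); // prints "error=boom"
    }
}
```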

Example 1   Project: pravega   File: Futures.java
/**
 * Returns a CompletableFuture that will complete with the same outcome or result as the given source, but when
 * cancelled, will apply a consumer to the eventual result of the original future.
 * <p>
 * If the returned CompletableFuture is NOT cancelled ({@link CompletableFuture#cancel}):
 * - If source completes normally, the result CompletableFuture will complete with the same result.
 * - If source completes exceptionally, the result CompletableFuture will complete with the same exception.
 * <p>
 * If the returned CompletableFuture is cancelled ({@link CompletableFuture#cancel}):
 * - If the source has already completed, the result CompletableFuture will also be completed with the same outcome.
 * - If the source has not already been completed and it later completes normally, then `onCancel` will be applied to
 * the result when it eventually completes. If the source completes exceptionally, nothing will happen.
 *
 * @param source   The CompletableFuture to wrap.
 * @param onCancel A Consumer to invoke on source's eventual completion result if the result of this method is cancelled.
 * @param <T>      Result type.
 * @return A CompletableFuture that will complete with the same outcome or result as the given source.
 */
public static <T> CompletableFuture<T> cancellableFuture(CompletableFuture<T> source, Consumer<T> onCancel) {
    if (source == null) {
        return null;
    }

    val result = new CompletableFuture<T>();
    source.whenComplete((r, ex) -> {
        if (ex == null) {
            result.complete(r);
        } else {
            result.completeExceptionally(ex);
        }
    });
    Futures.exceptionListener(result, ex -> {
        if (ex instanceof CancellationException && !source.isCancelled()) {
            source.thenAccept(onCancel);
        }
    });
    return result;
}
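
The snippet above relies on Lombok's val and on a Pravega helper, Futures.exceptionListener. A JDK-only sketch of the same idea follows. It assumes the helper simply invokes its callback when the future completes exceptionally, and replaces it with a second whenComplete on the result; the class name CancellableFutureSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CancellableFutureSketch {

    static <T> CompletableFuture<T> cancellableFuture(CompletableFuture<T> source, Consumer<T> onCancel) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Mirror the source's outcome into the result (a no-op if the result is already cancelled).
        source.whenComplete((r, ex) -> {
            if (ex == null) {
                result.complete(r);
            } else {
                result.completeExceptionally(ex);
            }
        });
        // Assumed stand-in for Futures.exceptionListener: react when the result itself was cancelled.
        result.whenComplete((r, ex) -> {
            if (ex instanceof CancellationException && !source.isCancelled()) {
                source.thenAccept(onCancel);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        List<String> cleanedUp = new ArrayList<>();
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> wrapped = cancellableFuture(source, cleanedUp::add);

        wrapped.cancel(true);         // the caller gives up on the wrapper
        source.complete("resource");  // the source still finishes later
        System.out.println(cleanedUp); // prints "[resource]"
    }
}
```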
 
Example 2   Project: joyrpc   File: CallbackManager.java
@Override
public CompletableFuture<Result> invoke(final RequestMessage<Invocation> request) {
    // Callbacks are rare, so use reflection directly
    CompletableFuture<Result> result = new CompletableFuture<>();
    try {
        Invocation payLoad = request.getPayLoad();
        Method method = getPublicMethod(callbackClass, payLoad.getMethodName());
        Object value = method.invoke(callback, payLoad.getArgs());
        // Asynchronous callback
        CompletableFuture<?> future = isReturnFuture(callbackClass, method) ?
                (CompletableFuture<?>) value : CompletableFuture.completedFuture(value);
        future.whenComplete((v, t) -> {
            if (t != null) {
                result.complete(new Result(request.getContext(), t));
            } else {
                result.complete(new Result(request.getContext(), v));
            }
        });
    } catch (Throwable e) {
        // complete(...) finishes this future; the static completedFuture(...) factory would leave it pending
        result.complete(new Result(request.getContext(), e));
    }
    return result;
}
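
The pattern above folds both outcomes into a normally completed future, so callers never see an exceptional completion. A JDK-only sketch follows; Outcome is a hypothetical stand-in for joyrpc's Result, and FoldOutcomeSketch and invoke are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class FoldOutcomeSketch {

    // Hypothetical stand-in for the Result type: carries either a value or an error.
    static final class Outcome {
        final Object value;
        final Throwable error;

        Outcome(Object value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        boolean isError() {
            return error != null;
        }
    }

    static CompletableFuture<Outcome> invoke(Supplier<CompletableFuture<?>> call) {
        CompletableFuture<Outcome> result = new CompletableFuture<>();
        try {
            // Asynchronous success and failure both complete `result` normally.
            call.get().whenComplete((v, t) -> result.complete(new Outcome(v, t)));
        } catch (Throwable e) {
            // A synchronous failure of the call itself is folded in the same way.
            result.complete(new Outcome(null, e));
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("remote error"));

        Outcome ok = invoke(() -> CompletableFuture.completedFuture("pong")).join();
        Outcome failed = invoke(() -> failing).join();
        Outcome thrown = invoke(() -> { throw new IllegalArgumentException("bad call"); }).join();

        System.out.println(ok.value);                  // prints "pong"
        System.out.println(failed.error.getMessage()); // prints "remote error"
        System.out.println(thrown.isError());          // prints "true"
    }
}
```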
 
Example 3   Project: joyrpc   File: AbstractTraceFilter.java
@Override
public CompletableFuture<Result> invoke(final Invoker invoker, final RequestMessage<Invocation> request) {
    if (factory == null) {
        return invoker.invoke(request);
    }
    Invocation invocation = request.getPayLoad();
    InterfaceOption.MethodOption option = request.getOption();
    if (!option.isTrace()) {
        return invoker.invoke(request);
    }
    // Build the trace tags
    Map<String, String> tags = new HashMap<>();
    createTags(request, tags);
    // Create the tracer
    Tracer trace = factory.create(request);
    // Start the trace
    trace.begin(option.getTraceSpanId(invocation), component, tags);
    // Snapshot
    trace.snapshot();
    // Remote call
    CompletableFuture<Result> future = invoker.invoke(request);
    // The call on the main thread has finished
    trace.prepare();
    future.whenComplete((result, throwable) -> {
        // Restore on the asynchronous thread
        trace.restore();
        // Finish on the asynchronous thread
        trace.end(throwable == null ? result.getException() : throwable);
    });
    return future;
}
 
Example 4   Project: flink   File: SchedulerImpl.java
private void internalAllocateSlot(
		CompletableFuture<LogicalSlot> allocationResultFuture,
		SlotRequestId slotRequestId,
		ScheduledUnit scheduledUnit,
		SlotProfile slotProfile,
		boolean allowQueuedScheduling,
		Time allocationTimeout) {
	CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
		allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout) :
		allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);

	allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
		if (failure != null) {
			Optional<SharedSlotOversubscribedException> sharedSlotOverAllocatedException =
					ExceptionUtils.findThrowable(failure, SharedSlotOversubscribedException.class);
			if (sharedSlotOverAllocatedException.isPresent() &&
					sharedSlotOverAllocatedException.get().canRetry()) {

				// Retry the allocation
				internalAllocateSlot(
						allocationResultFuture,
						slotRequestId,
						scheduledUnit,
						slotProfile,
						allowQueuedScheduling,
						allocationTimeout);
			} else {
				cancelSlotRequest(
						slotRequestId,
						scheduledUnit.getSlotSharingGroupId(),
						failure);
				allocationResultFuture.completeExceptionally(failure);
			}
		} else {
			allocationResultFuture.complete(slot);
		}
	});
}
 
Example 5   Project: zeppelin   File: ClusterManagerServer.java
public void unicastClusterEvent(String host, int port, String topic, String msg) {
  LOGGER.info("send unicastClusterEvent host:{} port:{} topic:{} message:{}",
      host, port, topic, msg);

  Address address = Address.from(host, port);
  CompletableFuture<byte[]> response = messagingService.sendAndReceive(address,
      topic, msg.getBytes(), Duration.ofSeconds(2));
  response.whenComplete((r, e) -> {
    if (null != e) {
      LOGGER.error(e.getMessage(), e);
    }
  });
}
 
Example 6   Project: joyrpc   File: ConsumerConfig.java
@Override
protected CompletableFuture<Void> doOpen() {
    CompletableFuture<Void> future = new CompletableFuture<>();
    // Create the registry
    registryRef = REGISTRY.get(registryUrl.getProtocol()).getRegistry(registryUrl);
    // Build the proxy
    config.proxy();
    // Subscribe, then wait for the initial configuration
    chain(subscribe(), future, (v) -> chain(waitingConfig, future, (url) -> {
        // Check whether the dynamic configuration changed the alias, which requires resubscribing
        serviceUrl = url;
        registerUrl = config.register ? buildRegisteredUrl(registryRef, url) : null;
        resubscribe(buildSubscribedUrl(configureRef, url), false);
        try {
            refer = ServiceManager.refer(url, config, registryRef, registerUrl, configureRef, subscribeUrl, configHandler);
            // Open
            chain(refer.open(), future, s -> {
                // Build the invoker
                invokeHandler = new ConsumerInvokeHandler(refer, proxyClass, refer.getUrl());
                future.complete(null);
            });
        } catch (Throwable ex) {
            future.completeExceptionally(ex);
        }
    }));
    future.whenComplete((v, err) -> latch.countDown());
    return future;
}
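
The last step of the snippet above releases a latch whatever the outcome of the future. A small sketch of just that step, with hypothetical names (LatchOnCompletion, releaseWhenDone):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

class LatchOnCompletion {

    // Returns a latch that is released once the future completes, successfully or not.
    static CountDownLatch releaseWhenDone(CompletableFuture<?> future) {
        CountDownLatch latch = new CountDownLatch(1);
        future.whenComplete((v, err) -> latch.countDown());
        return latch;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> opening = new CompletableFuture<>();
        CountDownLatch latch = releaseWhenDone(opening);

        System.out.println(latch.getCount()); // prints "1"
        opening.completeExceptionally(new RuntimeException("open failed"));
        System.out.println(latch.getCount()); // prints "0"
    }
}
```

Because whenComplete fires on both paths, a thread blocked in latch.await() is not left waiting when the open fails.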
 
Example 7   Project: flink   File: AkkaRpcService.java
@Override
public CompletableFuture<Void> stopService() {
	final CompletableFuture<Void> akkaRpcActorsTerminationFuture;

	synchronized (lock) {
		if (stopped) {
			return terminationFuture;
		}

		LOG.info("Stopping Akka RPC service.");

		stopped = true;

		akkaRpcActorsTerminationFuture = terminateAkkaRpcActors();
	}

	final CompletableFuture<Void> supervisorTerminationFuture = FutureUtils.composeAfterwards(
		akkaRpcActorsTerminationFuture,
		supervisor::closeAsync);

	final CompletableFuture<Void> actorSystemTerminationFuture = FutureUtils.composeAfterwards(
		supervisorTerminationFuture,
		() -> FutureUtils.toJava(actorSystem.terminate()));

	actorSystemTerminationFuture.whenComplete(
		(Void ignored, Throwable throwable) -> {
			if (throwable != null) {
				terminationFuture.completeExceptionally(throwable);
			} else {
				terminationFuture.complete(null);
			}

			LOG.info("Stopped Akka RPC service.");
		});

	return terminationFuture;
}
 
Example 8   Project: atomix   File: DocumentTreeResource.java
@PUT
@Path("/{name}/{path: .*}")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
public void set(
    @PathParam("name") String name,
    @PathParam("path") List<PathSegment> path,
    String value,
    @QueryParam("version") Long version,
    @Suspended AsyncResponse response) {
  CompletableFuture<Boolean> future;
  if (version != null) {
    future = getPrimitive(name).thenCompose(tree -> tree.replace(getDocumentPath(path), value, version));
  } else {
    future = getPrimitive(name).thenCompose(tree -> tree.set(getDocumentPath(path), value).thenApply(v -> Boolean.TRUE));
  }

  future.whenComplete((result, error) -> {
    if (error == null) {
      response.resume(Response.ok(result).build());
    } else {
      if (error.getCause() != null) {
        error = error.getCause();
      }
      if (error instanceof IllegalDocumentModificationException || error instanceof NoSuchDocumentPathException) {
        response.resume(Response.ok(false).build());
      } else {
        LOGGER.warn("{}", error);
        response.resume(Response.serverError().build());
      }
    }
  });
}
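
The getCause() call above matters because a callback attached to a dependent stage (one produced by thenCompose or thenApply) sees the original failure wrapped in a CompletionException, while a callback on the failing future itself sees it unwrapped. A sketch with hypothetical names follows. Note that unwrap here is narrower than the snippet above: it only strips a CompletionException, whereas the snippet takes the cause of any error that has one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

class UnwrapCause {

    // Strips the CompletionException wrapper added by dependent stages, if present.
    static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    // Returns the error that a whenComplete callback on the given completed stage receives.
    static Throwable seenBy(CompletableFuture<String> stage) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        stage.whenComplete((result, error) -> seen.set(error));
        return seen.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        source.completeExceptionally(new IllegalStateException("boom"));

        Throwable direct = seenBy(source);
        Throwable dependent = seenBy(source.thenApply(s -> s));

        System.out.println(direct.getClass().getSimpleName());            // prints "IllegalStateException"
        System.out.println(dependent.getClass().getSimpleName());         // prints "CompletionException"
        System.out.println(unwrap(dependent).getClass().getSimpleName()); // prints "IllegalStateException"
    }
}
```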
 
Example 9   Project: flink   File: CompletedOperationCache.java
/**
 * Registers an ongoing operation with the cache.
 *
 * @param operationResultFuture A future containing the operation result.
 */
public void registerOngoingOperation(
		final K operationKey,
		final CompletableFuture<R> operationResultFuture) {
	final ResultAccessTracker<R> inProgress = ResultAccessTracker.inProgress();
	registeredOperationTriggers.put(operationKey, inProgress);
	operationResultFuture.whenComplete((result, error) -> {
		if (error == null) {
			completedOperations.put(operationKey, inProgress.finishOperation(Either.Right(result)));
		} else {
			completedOperations.put(operationKey, inProgress.finishOperation(Either.Left(error)));
		}
		registeredOperationTriggers.remove(operationKey);
	});
}
 
Example 10   Project: flink   File: FutureUtils.java
/**
 * Schedule the operation with the given delay.
 *
 * @param operation to schedule
 * @param delay delay to schedule
 * @param scheduledExecutor executor to be used for the operation
 * @param <T> type of the result
 * @return Future which schedules the given operation with given delay.
 */
public static <T> CompletableFuture<T> scheduleWithDelay(
		final Supplier<T> operation,
		final Time delay,
		final ScheduledExecutor scheduledExecutor) {
	final CompletableFuture<T> resultFuture = new CompletableFuture<>();

	ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
		() -> {
			try {
				resultFuture.complete(operation.get());
			} catch (Throwable t) {
				resultFuture.completeExceptionally(t);
			}
		},
		delay.getSize(),
		delay.getUnit()
	);

	resultFuture.whenComplete(
		(t, throwable) -> {
			if (!scheduledFuture.isDone()) {
				scheduledFuture.cancel(false);
			}
		});
	return resultFuture;
}
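
The same technique can be sketched with JDK types only. This version assumes a plain ScheduledExecutorService in place of Flink's ScheduledExecutor and a delay plus TimeUnit in place of Flink's Time; the class name is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class ScheduleWithDelaySketch {

    static <T> CompletableFuture<T> scheduleWithDelay(
            Supplier<T> operation, long delay, TimeUnit unit, ScheduledExecutorService executor) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();

        ScheduledFuture<?> scheduled = executor.schedule(() -> {
            try {
                resultFuture.complete(operation.get());
            } catch (Throwable t) {
                resultFuture.completeExceptionally(t);
            }
        }, delay, unit);

        // If the result is completed or cancelled before the delay elapses, drop the pending task.
        resultFuture.whenComplete((value, error) -> {
            if (!scheduled.isDone()) {
                scheduled.cancel(false);
            }
        });
        return resultFuture;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Integer> answer = scheduleWithDelay(() -> 42, 50, TimeUnit.MILLISECONDS, executor);
            System.out.println(answer.get(5, TimeUnit.SECONDS)); // prints "42"

            CompletableFuture<Integer> abandoned = scheduleWithDelay(() -> 1, 1, TimeUnit.HOURS, executor);
            abandoned.cancel(false); // the whenComplete callback cancels the one-hour task
        } finally {
            executor.shutdownNow();
        }
    }
}
```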
 
Example 11   Project: distributedlog   File: FutureUtils.java
public static <T> void proxyTo(CompletableFuture<T> src,
                               CompletableFuture<T> target) {
    src.whenComplete((value, cause) -> {
        if (null == cause) {
            target.complete(value);
        } else {
            target.completeExceptionally(cause);
        }
    });
}
 
Example 12   Project: flink   File: ExecutionGraph.java
public void scheduleForExecution() throws JobException {

		assertRunningInJobMasterMainThread();

		if (isLegacyScheduling()) {
			LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
		}

		final long currentGlobalModVersion = globalModVersion;

		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

			final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule(
				scheduleMode,
				getAllExecutionVertices(),
				this);

			if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
				schedulingFuture = newSchedulingFuture;
				newSchedulingFuture.whenComplete(
					(Void ignored, Throwable throwable) -> {
						if (throwable != null) {
							final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

							if (!(strippedThrowable instanceof CancellationException)) {
								// only fail if the scheduling future was not canceled
								failGlobal(strippedThrowable);
							}
						}
					});
			} else {
				newSchedulingFuture.cancel(false);
			}
		}
		else {
			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
		}
	}
 
Example 13   Project: distributedlog   File: FutureUtils.java
public static <T> CompletableFuture<T> rescue(CompletableFuture<T> future,
                                              Function<Throwable, CompletableFuture<T>> rescueFuc) {
    CompletableFuture<T> result = FutureUtils.createFuture();
    future.whenComplete((value, cause) -> {
        if (null == cause) {
            result.complete(value);
            return;
        }
        proxyTo(rescueFuc.apply(cause), result);
    });
    return result;
}
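
A self-contained usage sketch of rescue together with proxyTo from Example 11. It assumes that FutureUtils.createFuture() just returns a new CompletableFuture; the class name RescueSketch and the fallback string are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class RescueSketch {

    // Copies the outcome of src into target.
    static <T> void proxyTo(CompletableFuture<T> src, CompletableFuture<T> target) {
        src.whenComplete((value, cause) -> {
            if (cause == null) {
                target.complete(value);
            } else {
                target.completeExceptionally(cause);
            }
        });
    }

    // On failure, asks rescueFunc for a replacement future and forwards its outcome.
    static <T> CompletableFuture<T> rescue(CompletableFuture<T> future,
                                           Function<Throwable, CompletableFuture<T>> rescueFunc) {
        CompletableFuture<T> result = new CompletableFuture<>(); // assumed equivalent of createFuture()
        future.whenComplete((value, cause) -> {
            if (cause == null) {
                result.complete(value);
                return;
            }
            proxyTo(rescueFunc.apply(cause), result);
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> primary = new CompletableFuture<>();
        primary.completeExceptionally(new IllegalStateException("primary down"));

        CompletableFuture<String> rescued = rescue(primary,
                cause -> CompletableFuture.completedFuture("fallback for " + cause.getMessage()));

        System.out.println(rescued.join()); // prints "fallback for primary down"
    }
}
```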
 
Example 14   Project: distributedlog   File: TestAsyncReaderWriter.java
/**
 * Test Case: starting reading when the streams don't exist.
 * {@link https://issues.apache.org/jira/browse/DL-42}
 */
@DistributedLogAnnotations.FlakyTest
@Ignore
@Test(timeout = 120000)
public void testSimpleAsyncReadWriteStartEmptyFactory() throws Exception {
    // int count = 50;
    int count = 1;
    String name = runtime.getMethodName();
    DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
    confLocal.loadConf(testConf);
    confLocal.setReadAheadWaitTime(10);
    confLocal.setReadAheadBatchSize(10);
    confLocal.setOutputBufferSize(1024);

    int numLogSegments = 3;
    int numRecordsPerLogSegment = 1;

    URI uri = createDLMURI("/" + name);
    ensureURICreated(uri);
    Namespace namespace = NamespaceBuilder.newBuilder()
            .conf(confLocal).uri(uri).build();
    final DistributedLogManager[] dlms = new DistributedLogManager[count];
    final TestReader[] readers = new TestReader[count];
    final CountDownLatch readyLatch = new CountDownLatch(count);
    final CountDownLatch[] syncLatches = new CountDownLatch[count];
    final CountDownLatch[] readerDoneLatches = new CountDownLatch[count];
    for (int s = 0; s < count; s++) {
        dlms[s] = namespace.openLog(name + String.format("%d", s));
        readerDoneLatches[s] = new CountDownLatch(1);
        syncLatches[s] = new CountDownLatch(numLogSegments * numRecordsPerLogSegment);
        readers[s] = new TestReader("reader-" + s,
                dlms[s], DLSN.InitialDLSN, false, 0, readyLatch, syncLatches[s], readerDoneLatches[s]);
        readers[s].start();
    }

    // wait until all readers have been positioned at least once
    readyLatch.await();

    final CountDownLatch writeLatch = new CountDownLatch(3 * count);
    final AtomicBoolean writeErrors = new AtomicBoolean(false);

    int txid = 1;
    for (long i = 0; i < 3; i++) {
        final long currentLogSegmentSeqNo = i + 1;
        BKAsyncLogWriter[] writers = new BKAsyncLogWriter[count];
        for (int s = 0; s < count; s++) {
            writers[s] = (BKAsyncLogWriter) (dlms[s].startAsyncLogSegmentNonPartitioned());
        }
        for (long j = 0; j < 1; j++) {
            final long currentEntryId = j;
            final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
            for (int s = 0; s < count; s++) {
                CompletableFuture<DLSN> dlsnFuture = writers[s].write(record);
                dlsnFuture.whenComplete(new WriteFutureEventListener(
                        record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
            }
        }
        for (int s = 0; s < count; s++) {
            writers[s].closeAndComplete();
        }
    }

    writeLatch.await();
    assertFalse("All writes should succeed", writeErrors.get());

    for (int s = 0; s < count; s++) {
        readerDoneLatches[s].await();
        assertFalse("Reader " + s + " should not encounter errors", readers[s].areErrorsFound());
        syncLatches[s].await();
        assertEquals(numLogSegments * numRecordsPerLogSegment, readers[s].getNumReads().get());
        assertTrue("Reader " + s + " should position at least once", readers[s].getNumReaderPositions().get() > 0);
    }

    for (int s = 0; s < count; s++) {
        readers[s].stop();
        dlms[s].close();
    }
}
 
Example 15   Project: pravega   File: ScaleTest.java
public static void main(String[] args) throws Exception {
    try {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        @Cleanup
        TestingServer zkTestServer = new TestingServerStarter().start();

        ServiceBuilder serviceBuilder = ServiceBuilder.newInMemoryBuilder(ServiceBuilderConfig.getDefaultConfig());
        serviceBuilder.initialize();
        StreamSegmentStore store = serviceBuilder.createStreamSegmentService();
        TableStore tableStore = serviceBuilder.createTableStoreService();
        int port = Config.SERVICE_PORT;
        @Cleanup
        PravegaConnectionListener server = new PravegaConnectionListener(false, port, store, tableStore,
                serviceBuilder.getLowPriorityExecutor());
        server.startListening();

        // Create controller object for testing against a separate controller report.
        @Cleanup
        ControllerWrapper controllerWrapper = new ControllerWrapper(zkTestServer.getConnectString(), port);
        Controller controller = controllerWrapper.getController();

        final String scope = "scope";
        controllerWrapper.getControllerService().createScope(scope).get();

        final String streamName = "stream1";
        final StreamConfiguration config =
                StreamConfiguration.builder().scalingPolicy(
                        ScalingPolicy.fixed(1)).build();

        Stream stream = new StreamImpl(scope, streamName);

        log.info("Creating stream {}/{}", scope, streamName);
        if (!controller.createStream(scope, streamName, config).get()) {
            log.error("Stream already existed, exiting");
            return;
        }

        // Test 1: scale stream: split one segment into two
        log.info("Scaling stream {}/{}, splitting one segment into two", scope, streamName);
        Map<Double, Double> map = new HashMap<>();
        map.put(0.0, 0.5);
        map.put(0.5, 1.0);

        if (!controller.scaleStream(stream, Collections.singletonList(0L), map, executor).getFuture().get()) {
            log.error("Scale stream: splitting segment into two failed, exiting");
            return;
        }

        // Test 2: scale stream: merge two segments into one
        log.info("Scaling stream {}/{}, merging two segments into one", scope, streamName);
        CompletableFuture<Boolean> scaleResponseFuture = controller.scaleStream(stream, Arrays.asList(1L, 2L),
                Collections.singletonMap(0.0, 1.0), executor).getFuture();

        if (!scaleResponseFuture.get()) {
            log.error("Scale stream: merging two segments into one failed, exiting");
            return;
        }

        // Test 3: create a transaction, and try scale operation, it should fail with precondition check failure
        CompletableFuture<TxnSegments> txnFuture = controller.createTransaction(stream, 5000);
        TxnSegments transaction = txnFuture.get();
        if (transaction == null) {
            log.error("Create transaction failed, exiting");
            return;
        }

        log.info("Scaling stream {}/{}, splitting one segment into two, while transaction is ongoing",
                scope, streamName);
        scaleResponseFuture = controller.scaleStream(stream, Collections.singletonList(3L), map, executor).getFuture();
        CompletableFuture<Boolean> future = scaleResponseFuture.whenComplete((r, e) -> {
            if (e != null) {
                log.error("Failed: scale with ongoing transaction.", e);
            } else if (getAndHandleExceptions(controller.checkTransactionStatus(stream, transaction.getTxnId()),
                    RuntimeException::new) != Transaction.Status.OPEN) {
                log.info("Success: scale with ongoing transaction.");
            } else {
                log.error("Failed: scale with ongoing transaction.");
            }
        });

        CompletableFuture<Void> statusFuture = controller.abortTransaction(stream, transaction.getTxnId());
        statusFuture.get();
        future.get();

        log.info("All scaling test PASSED");
        ExecutorServiceHelpers.shutdown(executor);
        System.exit(0);
    } catch (Throwable t) {
        log.error("test failed with {}", t);
        System.exit(-1);
    }
}
 
Example 16
@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
    CompletableFuture<Void> result = new CompletableFuture<>();

    createSystemTopicFactoryIfNeeded();
    SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory.createSystemTopic(topicName.getNamespaceObject(),
            EventType.TOPIC_POLICY);

    CompletableFuture<SystemTopicClient.Writer> writerFuture = systemTopicClient.newWriterAsync();
    writerFuture.whenComplete((writer, ex) -> {
        if (ex != null) {
            result.completeExceptionally(ex);
        } else {
            writer.writeAsync(
                PulsarEvent.builder()
                    .actionType(ActionType.UPDATE)
                    .eventType(EventType.TOPIC_POLICY)
                    .topicPoliciesEvent(
                        TopicPoliciesEvent.builder()
                            .domain(topicName.getDomain().toString())
                            .tenant(topicName.getTenant())
                            .namespace(topicName.getNamespaceObject().getLocalName())
                            .topic(topicName.getLocalName())
                            .policies(policies)
                            .build())
                    .build()).whenComplete(((messageId, e) -> {
                        if (e != null) {
                            result.completeExceptionally(e);
                        } else {
                            if (messageId != null) {
                                result.complete(null);
                            } else {
                                result.completeExceptionally(new RuntimeException("Got message id is null."));
                            }
                        }
                        writer.closeAsync().whenComplete((v, cause) -> {
                            if (cause != null) {
                                log.error("[{}] Close writer error.", topicName, cause);
                            } else {
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}] Close writer success.", topicName);
                                }
                            }
                        });
                })
            );
        }
    });
    return result;
}
 
Example 17   Project: servicecomb-samples   File: AuthHandler.java
@Override
public void handle(Invocation invocation, AsyncResponse asyncResponse) throws Exception {
  if (invocation.getMicroserviceName().equals("user-service")
      && (invocation.getOperationName().equals("login")
          || (invocation.getOperationName().equals("getSession")))) {
    // login:return session id, set cookie by javascript
    invocation.next(asyncResponse);
  } else {
    // check session
    String sessionId = invocation.getContext("session-id");
    if (sessionId == null) {
      throw new InvocationException(403, "", "session is not valid.");
    }

    String sessionInfo = sessionCache.getIfPresent(sessionId);
    if (sessionInfo != null) {
      try {
        // session info stored in InvocationContext. Microservices can get it. 
        invocation.addContext("session-id", sessionId);
        invocation.addContext("session-info", sessionInfo);
        invocation.next(asyncResponse);
      } catch (Exception e) {
        asyncResponse.complete(Response.failResp(new InvocationException(500, "", e.getMessage())));
      }
      return;
    }

    // In edge, the handler is executed reactively, so it must contain no blocking logic.
    CompletableFuture<SessionInfo> result = userServiceClient.getGetSessionOperation().getSession(sessionId);
    result.whenComplete((info, e) -> {
      if (result.isCompletedExceptionally()) {
        asyncResponse.complete(Response.failResp(new InvocationException(403, "", "session is not valid.")));
      } else {
        if (info == null) {
          asyncResponse.complete(Response.failResp(new InvocationException(403, "", "session is not valid.")));
          return;
        }
        try {
          // session info stored in InvocationContext. Microservices can get it. 
          invocation.addContext("session-id", sessionId);
          String sessionInfoStr = JsonUtils.writeValueAsString(info);
          invocation.addContext("session-info", sessionInfoStr);
          invocation.next(asyncResponse);
          sessionCache.put(sessionId, sessionInfoStr);
        } catch (Exception ee) {
          asyncResponse.complete(Response.failResp(new InvocationException(500, "", ee.getMessage())));
        }
      }
    });
  }
}
 
Example 18   Project: fastjgame   File: FutureUtils.java
public static <V> Promise<V> fromCompletableFuture(CompletableFuture<V> completableFuture) {
    final Promise<V> promise = new JdkPromise<>(completableFuture);
    completableFuture.whenComplete(new UniRelay<>(promise));
    return promise;
}
 
Example 19   Project: distributedlog   File: TestAsyncReaderWriter.java
private static void readNext(final AsyncLogReader reader,
                             final DLSN startPosition,
                             final long startSequenceId,
                             final boolean monotonic,
                             final CountDownLatch syncLatch,
                             final CountDownLatch completionLatch,
                             final AtomicBoolean errorsFound) {
    CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
    record.whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
        @Override
        public void onSuccess(LogRecordWithDLSN value) {
            try {
                if (monotonic) {
                    assertEquals(startSequenceId, value.getSequenceId());
                } else {
                    assertTrue(value.getSequenceId() < 0);
                    assertTrue(value.getSequenceId() > startSequenceId);
                }
                LOG.info("Received record {} from {}", value, reader.getStreamName());
                assertTrue(!value.isControl());
                assertTrue(value.getDlsn().getSlotId() == 0);
                assertTrue(value.getDlsn().compareTo(startPosition) >= 0);
                DLMTestUtil.verifyLargeLogRecord(value);
            } catch (Exception exc) {
                LOG.debug("Exception Encountered when verifying log record {} : ", value.getDlsn(), exc);
                errorsFound.set(true);
                completionLatch.countDown();
                return;
            }
            syncLatch.countDown();
            if (syncLatch.getCount() <= 0) {
                completionLatch.countDown();
            } else {
                TestAsyncReaderWriter.readNext(
                        reader,
                        value.getDlsn().getNextDLSN(),
                        monotonic ? value.getSequenceId() + 1 : value.getSequenceId(),
                        monotonic,
                        syncLatch,
                        completionLatch,
                        errorsFound);
            }
        }
        @Override
        public void onFailure(Throwable cause) {
            LOG.error("Encountered Exception on reading {}", reader.getStreamName(), cause);
            errorsFound.set(true);
            completionLatch.countDown();
        }
    });
}
 
Example 20   Project: flink   File: JobMaster.java
/**
 * Suspends the job: all running tasks will be cancelled, and communication with other components
 * will be disposed.
 *
 * <p>A job is mostly suspended because leadership has been revoked. The job can be restarted by
 * calling the {@link #start(JobMasterId)} method once leadership is regained.
 *
 * <p>This method is executed asynchronously.
 *
 * @param cause The reason why this job has been suspended.
 * @return Future acknowledge indicating that the job has been suspended. Otherwise the future contains an exception
 */
public CompletableFuture<Acknowledge> suspend(final Exception cause) {
	CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(
			() -> suspendExecution(cause),
			RpcUtils.INF_TIMEOUT);

	return suspendFuture.whenComplete((acknowledge, throwable) -> stop());
}
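
Returning the stage produced by whenComplete, as above, means callers only see completion after the callback has run, and they still receive the original result. A sketch with hypothetical names (WhenCompleteReturnValue, withCleanup); it also shows that if the source succeeded but the callback throws, the returned stage completes exceptionally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class WhenCompleteReturnValue {

    // Returns a stage with the source's outcome that completes only after cleanup has run.
    static CompletableFuture<String> withCleanup(CompletableFuture<String> source, Runnable cleanup) {
        return source.whenComplete((value, error) -> cleanup.run());
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> returned = withCleanup(source, () -> log.add("stop"));

        source.complete("ack");
        System.out.println(returned.join() + " " + log); // prints "ack [stop]"

        // The source succeeded, but the callback throws: the returned stage fails.
        CompletableFuture<String> broken = withCleanup(
                CompletableFuture.completedFuture("ack"),
                () -> { throw new IllegalStateException("stop failed"); });
        System.out.println(broken.isCompletedExceptionally()); // prints "true"
    }
}
```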