java.util.concurrent.CompletableFuture#runAfterBothAsync() 源码实例 Demo

下面列出了 java.util.concurrent.CompletableFuture#runAfterBothAsync() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: besu   文件: RlpBlockImporter.java
/**
 * Imports blocks that are stored as concatenated RLP sections in the given file into Besu's block
 * storage.
 *
 * <p>Blocks are read sequentially from the file. The genesis block and blocks whose hash is
 * already present in the blockchain are skipped. For every remaining block, header validation and
 * signature extraction are started asynchronously; the block is evaluated on the import executor
 * only after both have completed and the evaluation of the previously scheduled block has
 * finished.
 *
 * <p>If the calling thread is interrupted while waiting for room in the block backlog, reading
 * stops and the thread's interrupt status is restored. The method still waits for the blocks that
 * were already scheduled before it returns.
 *
 * @param blocks Path to the file containing the blocks
 * @param besuController the BesuController that defines blockchain behavior
 * @param skipPowValidation Skip proof of work validation (correct mix hash and difficulty)
 * @return the import result, carrying the chain head's total difficulty and the number of blocks
 *     scheduled for import by this call
 * @throws IOException On Failure
 */
public RlpBlockImporter.ImportResult importBlockchain(
    final Path blocks, final BesuController besuController, final boolean skipPowValidation)
    throws IOException {
  final ProtocolSchedule protocolSchedule = besuController.getProtocolSchedule();
  final ProtocolContext context = besuController.getProtocolContext();
  final MutableBlockchain blockchain = context.getBlockchain();
  int count = 0;

  try (final RawBlockIterator iterator =
      new RawBlockIterator(
          blocks,
          rlp ->
              BlockHeader.readFrom(
                  rlp, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)))) {
    BlockHeader previousHeader = null;
    CompletableFuture<Void> previousBlockFuture = null;
    while (iterator.hasNext()) {
      final Block block = iterator.next();
      final BlockHeader header = block.getHeader();
      if (header.getNumber() == BlockHeader.GENESIS_BLOCK_NUMBER) {
        continue;
      }
      if (header.getNumber() % 100 == 0) {
        LOG.info("Import at block {}", header.getNumber());
      }
      if (blockchain.contains(header.getHash())) {
        continue;
      }
      // The first block that actually gets imported has no in-file predecessor recorded yet,
      // so its parent header is looked up in the existing chain.
      if (previousHeader == null) {
        previousHeader = lookupPreviousHeader(blockchain, header);
      }
      final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(header.getNumber());
      // Lambdas may only capture effectively final locals, hence the copy.
      final BlockHeader lastHeader = previousHeader;

      final CompletableFuture<Void> validationFuture =
          CompletableFuture.runAsync(
              () -> validateBlock(protocolSpec, context, lastHeader, header, skipPowValidation),
              validationExecutor);

      final CompletableFuture<Void> extractingFuture =
          CompletableFuture.runAsync(() -> extractSignatures(block));

      // Evaluation of this block must also wait for the previous block's evaluation, which
      // keeps blocks being evaluated in file order.
      final CompletableFuture<Void> calculationFutures;
      if (previousBlockFuture == null) {
        calculationFutures = extractingFuture;
      } else {
        calculationFutures = CompletableFuture.allOf(extractingFuture, previousBlockFuture);
      }

      try {
        // NOTE(review): presumably the permit is released once the block has been evaluated;
        // the release is not visible in this method.
        blockBacklog.acquire();
      } catch (final InterruptedException e) {
        LOG.error("Interrupted adding to backlog.", e);
        // Restore the interrupt status so that callers can observe the interruption.
        // CompletableFuture.join() below is not interruptible, so already-scheduled blocks
        // are still awaited.
        Thread.currentThread().interrupt();
        break;
      }
      previousBlockFuture =
          validationFuture.runAfterBothAsync(
              calculationFutures,
              () -> evaluateBlock(context, block, header, protocolSpec, skipPowValidation),
              importExecutor);

      ++count;
      previousHeader = header;
    }
    // Wait for the tail of the evaluation chain before reading the chain head.
    if (previousBlockFuture != null) {
      previousBlockFuture.join();
    }
    return new RlpBlockImporter.ImportResult(
        blockchain.getChainHead().getTotalDifficulty(), count);
  }
}
 
/**
 * When an already-contextualized Runnable is specified as the action/task,
 * the action/task runs with its already-captured context rather than
 * capturing and applying context per the configuration of the managed executor.
 *
 * <p>The test builds a ThreadContext configured to propagate only the Label context and a
 * ManagedExecutor configured to propagate only the Buffer context. Runnables wrapped by the
 * ThreadContext are expected to observe the Label value that was set on the thread when they
 * were wrapped (and an empty Buffer), while a plain Runnable run through the executor's stages
 * is expected to observe the Buffer value (and an empty Label).
 *
 * <p>The order of the {@code Buffer.set}/{@code Label.set} calls relative to the
 * {@code contextualRunnable} calls is significant: each expected value in the assertions
 * corresponds to the value that was current when the respective task was wrapped.
 *
 * @throws ExecutionException indicates test failure
 * @throws InterruptedException indicates test failure
 * @throws TimeoutException indicates test failure
 */
@Test
public void contextOfContextualRunnableOverridesContextOfManagedExecutor() throws ExecutionException, InterruptedException, TimeoutException {
    // Context configuration used for pre-contextualizing tasks: Label is propagated,
    // everything else is cleared.
    ThreadContext labelContext = ThreadContext.builder()
            .propagated(Label.CONTEXT_NAME)
            .unchanged()
            .cleared(ThreadContext.ALL_REMAINING)
            .build();

    // The executor under test deliberately uses a different configuration:
    // Buffer is propagated, everything else is cleared.
    ManagedExecutor executor = ManagedExecutor.builder()
            .propagated(Buffer.CONTEXT_NAME)
            .cleared(ThreadContext.ALL_REMAINING)
            .build();
    try {
        // Values current while task 1 is wrapped.
        Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-1"));
        Label.set("contextualRunnableOverride-label-1");

        // Task 1 expects label-1 and an empty Buffer when it runs.
        Runnable precontextualizedTask1 = labelContext.contextualRunnable(() -> {
            Assert.assertEquals(Label.get(), "contextualRunnableOverride-label-1",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
        });

        // Values current while task 2 is wrapped.
        Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-2"));
        Label.set("contextualRunnableOverride-label-2");

        // Task 2 expects label-2 and an empty Buffer when it runs.
        Runnable precontextualizedTask2 = labelContext.contextualRunnable(() -> {
            Assert.assertEquals(Label.get(), "contextualRunnableOverride-label-2",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Buffer.get().toString(), "",
                    "Context type not cleared from thread.");
        });

        // Values current while the tasks are submitted and the stages below are created.
        Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-3"));
        Label.set("contextualRunnableOverride-label-3");

        // Not wrapped by labelContext: expects the executor's configuration to apply,
        // i.e. buffer-3 visible and Label empty.
        Runnable normalTask = () -> {
            Assert.assertEquals(Buffer.get().toString(), "contextualRunnableOverride-buffer-3",
                    "Previously captured context type not found on thread.");
            Assert.assertEquals(Label.get(), "",
                    "Context type not cleared from thread.");
        };

        // Plain submit of a pre-contextualized task; a failed assertion inside the task
        // would surface from future.get as an ExecutionException.
        Future<Integer> future = executor.submit(precontextualizedTask1, 1);
        Assert.assertEquals(future.get(MAX_WAIT_NS, TimeUnit.NANOSECONDS), Integer.valueOf(1),
                "Unexpected result of task.");

        // Exercise pre-contextualized tasks through a mix of sync and async dependent stages;
        // stage5 finally runs the plain task. join() rethrows any assertion failure from the chain.
        CompletableFuture<Void> stage0 = executor.runAsync(precontextualizedTask1);
        CompletableFuture<Void> stage1 = stage0.thenRunAsync(precontextualizedTask1);
        CompletableFuture<Void> stage2 = stage0.thenRun(precontextualizedTask2);
        CompletableFuture<Void> stage3 = stage1.runAfterEither(stage2, precontextualizedTask2);
        CompletableFuture<Void> stage4 = stage1.runAfterBothAsync(stage2, precontextualizedTask1);
        CompletableFuture<Void> stage5 = stage4.runAfterBoth(stage3, normalTask);
        stage5.join();

        // Task 3 is wrapped while label-3 is still current and reports the Label it sees.
        LinkedBlockingQueue<String> results = new LinkedBlockingQueue<String>();
        Runnable precontextualizedTask3 = labelContext.contextualRunnable(() -> results.add(Label.get()));

        // Change the values after wrapping; execute() is still expected to yield label-3.
        Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-4"));
        Label.set("contextualRunnableOverride-label-4");

        executor.execute(precontextualizedTask3);
        Assert.assertEquals(results.poll(MAX_WAIT_NS, TimeUnit.NANOSECONDS), "contextualRunnableOverride-label-3",
                "Previously captured context type not found on thread.");
    }
    finally {
        executor.shutdownNow();
        // Restore original values
        Buffer.set(null);
        Label.set(null);
    }
}
 
源代码3 项目: openjdk-jdk9   文件: CompletableFutureTest.java
/**
 * Runs {@code a} after both {@code f} and {@code g} have completed, delegating to
 * {@code f.runAfterBothAsync(g, a)} (the overload without an explicit executor).
 *
 * @param f the first stage
 * @param g the second stage
 * @param a the action to run once both stages are complete
 * @return the dependent future produced by {@code runAfterBothAsync}
 */
public <T,U> CompletableFuture<Void> runAfterBoth(
    CompletableFuture<T> f, CompletableFuture<U> g, Runnable a) {
    final CompletableFuture<Void> dependent = f.runAfterBothAsync(g, a);
    return dependent;
}
 
源代码4 项目: openjdk-jdk9   文件: CompletableFutureTest.java
/**
 * Runs {@code a} after both {@code f} and {@code g} have completed, delegating to
 * {@code f.runAfterBothAsync(g, a, executor)} with a newly created {@code ThreadExecutor}.
 *
 * @param f the first stage
 * @param g the second stage
 * @param a the action to run once both stages are complete
 * @return the dependent future produced by {@code runAfterBothAsync}
 */
public <T,U> CompletableFuture<Void> runAfterBoth(
    CompletableFuture<T> f, CompletableFuture<U> g, Runnable a) {
    // A fresh executor instance is created for every call, as in the original.
    final ThreadExecutor executor = new ThreadExecutor();
    return f.runAfterBothAsync(g, a, executor);
}
 
源代码5 项目: j2objc   文件: CompletableFutureTest.java
/**
 * Schedules {@code a} to run once {@code f} and {@code g} are both complete by calling
 * {@code f.runAfterBothAsync(g, a)}; no executor is passed explicitly.
 *
 * @param f the first stage
 * @param g the second stage
 * @param a the action to run once both stages are complete
 * @return the future returned by {@code runAfterBothAsync}
 */
public <T,U> CompletableFuture<Void> runAfterBoth(
    CompletableFuture<T> f, CompletableFuture<U> g, Runnable a) {
    final CompletableFuture<Void> afterBoth = f.runAfterBothAsync(g, a);
    return afterBoth;
}
 
源代码6 项目: j2objc   文件: CompletableFutureTest.java
/**
 * Schedules {@code a} to run once {@code f} and {@code g} are both complete by calling
 * {@code f.runAfterBothAsync(g, a, executor)}, where the executor is a new
 * {@code ThreadExecutor} created for this call.
 *
 * @param f the first stage
 * @param g the second stage
 * @param a the action to run once both stages are complete
 * @return the future returned by {@code runAfterBothAsync}
 */
public <T,U> CompletableFuture<Void> runAfterBoth(
    CompletableFuture<T> f, CompletableFuture<U> g, Runnable a) {
    final ThreadExecutor perCallExecutor = new ThreadExecutor();
    return f.runAfterBothAsync(g, a, perCallExecutor);
}