下面列出了java.util.concurrent.CompletableFuture#runAfterBothAsync ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Imports blocks that are stored as concatenated RLP sections in the given file into Besu's block
* storage.
*
* @param blocks Path to the file containing the blocks
* @param besuController the BesuController that defines blockchain behavior
* @param skipPowValidation Skip proof of work validation (correct mix hash and difficulty)
* @return the import result
* @throws IOException On Failure
*/
public RlpBlockImporter.ImportResult importBlockchain(
final Path blocks, final BesuController besuController, final boolean skipPowValidation)
throws IOException {
final ProtocolSchedule protocolSchedule = besuController.getProtocolSchedule();
final ProtocolContext context = besuController.getProtocolContext();
final MutableBlockchain blockchain = context.getBlockchain();
int count = 0;
try (final RawBlockIterator iterator =
new RawBlockIterator(
blocks,
rlp ->
BlockHeader.readFrom(
rlp, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)))) {
BlockHeader previousHeader = null;
CompletableFuture<Void> previousBlockFuture = null;
while (iterator.hasNext()) {
final Block block = iterator.next();
final BlockHeader header = block.getHeader();
if (header.getNumber() == BlockHeader.GENESIS_BLOCK_NUMBER) {
continue;
}
if (header.getNumber() % 100 == 0) {
LOG.info("Import at block {}", header.getNumber());
}
if (blockchain.contains(header.getHash())) {
continue;
}
if (previousHeader == null) {
previousHeader = lookupPreviousHeader(blockchain, header);
}
final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(header.getNumber());
final BlockHeader lastHeader = previousHeader;
final CompletableFuture<Void> validationFuture =
CompletableFuture.runAsync(
() -> validateBlock(protocolSpec, context, lastHeader, header, skipPowValidation),
validationExecutor);
final CompletableFuture<Void> extractingFuture =
CompletableFuture.runAsync(() -> extractSignatures(block));
final CompletableFuture<Void> calculationFutures;
if (previousBlockFuture == null) {
calculationFutures = extractingFuture;
} else {
calculationFutures = CompletableFuture.allOf(extractingFuture, previousBlockFuture);
}
try {
blockBacklog.acquire();
} catch (final InterruptedException e) {
LOG.error("Interrupted adding to backlog.", e);
break;
}
previousBlockFuture =
validationFuture.runAfterBothAsync(
calculationFutures,
() -> evaluateBlock(context, block, header, protocolSpec, skipPowValidation),
importExecutor);
++count;
previousHeader = header;
}
if (previousBlockFuture != null) {
previousBlockFuture.join();
}
return new RlpBlockImporter.ImportResult(
blockchain.getChainHead().getTotalDifficulty(), count);
}
}
/**
* When an already-contextualized Runnable is specified as the action/task,
* the action/task runs with its already-captured context rather than
* capturing and applying context per the configuration of the managed executor.
*
* @throws ExecutionException indicates test failure
* @throws InterruptedException indicates test failure
* @throws TimeoutException indicates test failure
*/
@Test
public void contextOfContextualRunnableOverridesContextOfManagedExecutor() throws ExecutionException, InterruptedException, TimeoutException {
ThreadContext labelContext = ThreadContext.builder()
.propagated(Label.CONTEXT_NAME)
.unchanged()
.cleared(ThreadContext.ALL_REMAINING)
.build();
ManagedExecutor executor = ManagedExecutor.builder()
.propagated(Buffer.CONTEXT_NAME)
.cleared(ThreadContext.ALL_REMAINING)
.build();
try {
Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-1"));
Label.set("contextualRunnableOverride-label-1");
Runnable precontextualizedTask1 = labelContext.contextualRunnable(() -> {
Assert.assertEquals(Label.get(), "contextualRunnableOverride-label-1",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
});
Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-2"));
Label.set("contextualRunnableOverride-label-2");
Runnable precontextualizedTask2 = labelContext.contextualRunnable(() -> {
Assert.assertEquals(Label.get(), "contextualRunnableOverride-label-2",
"Previously captured context type not found on thread.");
Assert.assertEquals(Buffer.get().toString(), "",
"Context type not cleared from thread.");
});
Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-3"));
Label.set("contextualRunnableOverride-label-3");
Runnable normalTask = () -> {
Assert.assertEquals(Buffer.get().toString(), "contextualRunnableOverride-buffer-3",
"Previously captured context type not found on thread.");
Assert.assertEquals(Label.get(), "",
"Context type not cleared from thread.");
};
Future<Integer> future = executor.submit(precontextualizedTask1, 1);
Assert.assertEquals(future.get(MAX_WAIT_NS, TimeUnit.NANOSECONDS), Integer.valueOf(1),
"Unexpected result of task.");
CompletableFuture<Void> stage0 = executor.runAsync(precontextualizedTask1);
CompletableFuture<Void> stage1 = stage0.thenRunAsync(precontextualizedTask1);
CompletableFuture<Void> stage2 = stage0.thenRun(precontextualizedTask2);
CompletableFuture<Void> stage3 = stage1.runAfterEither(stage2, precontextualizedTask2);
CompletableFuture<Void> stage4 = stage1.runAfterBothAsync(stage2, precontextualizedTask1);
CompletableFuture<Void> stage5 = stage4.runAfterBoth(stage3, normalTask);
stage5.join();
LinkedBlockingQueue<String> results = new LinkedBlockingQueue<String>();
Runnable precontextualizedTask3 = labelContext.contextualRunnable(() -> results.add(Label.get()));
Buffer.set(new StringBuffer("contextualRunnableOverride-buffer-4"));
Label.set("contextualRunnableOverride-label-4");
executor.execute(precontextualizedTask3);
Assert.assertEquals(results.poll(MAX_WAIT_NS, TimeUnit.NANOSECONDS), "contextualRunnableOverride-label-3",
"Previously captured context type not found on thread.");
}
finally {
executor.shutdownNow();
// Restore original values
Buffer.set(null);
Label.set(null);
}
}
public <T,U> CompletableFuture<Void> runAfterBoth
(CompletableFuture<T> f, CompletableFuture<U> g, Runnable a) {
return f.runAfterBothAsync(g, a);
}
public <T,U> CompletableFuture<Void> runAfterBoth
(CompletableFuture<T> f, CompletableFuture<U> g, Runnable a) {
return f.runAfterBothAsync(g, a, new ThreadExecutor());
}
public <T,U> CompletableFuture<Void> runAfterBoth
(CompletableFuture<T> f, CompletableFuture<U> g, Runnable a) {
return f.runAfterBothAsync(g, a);
}
public <T,U> CompletableFuture<Void> runAfterBoth
(CompletableFuture<T> f, CompletableFuture<U> g, Runnable a) {
return f.runAfterBothAsync(g, a, new ThreadExecutor());
}