下面列出了 com.google.common.util.concurrent.ListeningExecutorService#shutdown() 的实例代码，您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Submits a read against a {@code RingBufferInputStream} before anything has been written, checks
 * that the read has not completed, then writes one byte and checks that the read returns it.
 *
 * <p>The executor is torn down in a {@code finally} block so that a failing assertion does not
 * leave the single worker thread behind.
 */
@Test
public void writesUnblockReads() throws ExecutionException, InterruptedException {
  ListeningExecutorService service = listeningDecorator(newSingleThreadExecutor());
  try {
    AtomicInteger counter = new AtomicInteger();
    RingBufferInputStream buffer = new RingBufferInputStream(1);
    ListenableFuture<Integer> readFuture =
        service.submit(
            () -> {
              counter.getAndIncrement();
              return buffer.read();
            });
    byte[] content = new byte[1];
    content[0] = 42;
    // Spin until the reader task has started running on the worker thread.
    while (counter.get() != 1) {
      MICROSECONDS.sleep(10);
    }
    assertThat(readFuture.isDone()).isFalse();
    buffer.write(content);
    assertThat(readFuture.get()).isEqualTo(content[0]);
  } finally {
    // On the success path the task has already finished, so this behaves like shutdown().
    // On a failure path it also interrupts a reader that may still be waiting
    // (NOTE(review): assumes RingBufferInputStream.read() reacts to interruption - confirm).
    service.shutdownNow();
    service.awaitTermination(10, MICROSECONDS);
  }
}
/**
 * Counts yield and q30 of fastqs in the fastqsPerSample multimap. Each file is submitted as its own
 * task to a fixed pool of {@code threadCount} threads, so at most {@code threadCount} files are
 * processed at the same time.
 * The yield and q30 of the Undetermined sample will count towards the total yield and q30 of the flowcell.
 * A file whose processing fails is logged and skipped; the failure is not propagated to the caller.
 *
 * @param fastqsPerSample multimap of sampleName and fastqs to process
 * @param threadCount     number of maximum threads
 * @return FastqTracker with yield and q30 stats for the fastqs processed.
 * @throws InterruptedException if the calling thread is interrupted while waiting for the pool to finish
 */
@NotNull
static FastqTracker processFastqs(@NotNull final Multimap<String, File> fastqsPerSample, final int threadCount)
        throws InterruptedException {
    LOGGER.info("Using {} threads. Processing {} fastQ files.", threadCount, fastqsPerSample.size());
    final FastqTrackerWrapper tracker = new FastqTrackerWrapper();
    final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
    try {
        for (final String sampleName : fastqsPerSample.keySet()) {
            final Collection<File> fastqs = fastqsPerSample.get(sampleName);
            for (final File fastq : fastqs) {
                final String laneName = getLaneName(fastq);
                final ListenableFuture<FastqData> futureResult = threadPool.submit(() -> processFile(fastq));
                addCallback(futureResult, (data) -> tracker.addDataFromSampleFile(sampleName, laneName, data),
                        (error) -> LOGGER.error("Failed to process file: {}", fastq.getName(), error));
            }
        }
    } finally {
        // Always stop accepting work, even if scheduling failed half-way, so the pool's
        // threads are released once the tasks already submitted have finished.
        threadPool.shutdown();
    }
    threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    return tracker.tracker();
}
/**
 * Submits a task that sleeps for 100 ms and immediately attaches a listener to its future; the
 * listener creates a trace entry and is scheduled on the same executor. The method then waits
 * 200 ms before shutting the executor down and awaiting termination for up to 10 seconds.
 */
@Override
public void transactionMarker() throws Exception {
    ListeningExecutorService pool =
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    Callable<Void> sleepingTask = new Callable<Void>() {
        @Override
        public Void call() throws InterruptedException {
            MILLISECONDS.sleep(100);
            return null;
        }
    };
    ListenableFuture<Void> pending = pool.submit(sleepingTask);
    Runnable traceEntryListener = new Runnable() {
        @Override
        public void run() {
            new CreateTraceEntry().traceEntryMarker();
        }
    };
    // Registered right after submission, while the task above is still expected to be sleeping.
    pending.addListener(traceEntryListener, pool);
    MILLISECONDS.sleep(200);
    pool.shutdown();
    pool.awaitTermination(10, SECONDS);
}
/**
 * Submits a task that returns immediately, waits 100 ms, and only then attaches a listener to the
 * task's future; the listener creates a trace entry and is scheduled on the same executor. After a
 * further 100 ms the executor is shut down and termination is awaited for up to 10 seconds.
 */
@Override
public void transactionMarker() throws Exception {
    ListeningExecutorService pool =
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    Callable<Void> immediateTask = new Callable<Void>() {
        @Override
        public Void call() {
            return null;
        }
    };
    ListenableFuture<Void> finished = pool.submit(immediateTask);
    // Give the trivial task time to complete before the listener is registered.
    MILLISECONDS.sleep(100);
    Runnable traceEntryListener = new Runnable() {
        @Override
        public void run() {
            new CreateTraceEntry().traceEntryMarker();
        }
    };
    finished.addListener(traceEntryListener, pool);
    MILLISECONDS.sleep(100);
    pool.shutdown();
    pool.awaitTermination(10, SECONDS);
}
/**
 * Runs a 100 ms sleeping task on a cached thread pool and registers, straight after submission, a
 * listener that creates a trace entry (the listener itself is executed on the same pool). The
 * caller thread sleeps 200 ms, then shuts the pool down and waits up to 10 seconds for it to stop.
 */
@Override
public void transactionMarker() throws Exception {
    ListeningExecutorService listeningPool =
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    Callable<Void> delayedNoOp = new Callable<Void>() {
        @Override
        public Void call() throws InterruptedException {
            MILLISECONDS.sleep(100);
            return null;
        }
    };
    ListenableFuture<Void> inFlight = listeningPool.submit(delayedNoOp);
    Runnable markTraceEntry = new Runnable() {
        @Override
        public void run() {
            new CreateTraceEntry().traceEntryMarker();
        }
    };
    // The listener is attached before the caller's own 200 ms wait begins.
    inFlight.addListener(markTraceEntry, listeningPool);
    MILLISECONDS.sleep(200);
    listeningPool.shutdown();
    listeningPool.awaitTermination(10, SECONDS);
}
/**
 * Runs a no-op task on a cached thread pool, sleeps 100 ms, and then registers a listener on the
 * task's future that creates a trace entry (the listener is executed on the same pool). After
 * another 100 ms the pool is shut down and the method waits up to 10 seconds for it to stop.
 */
@Override
public void transactionMarker() throws Exception {
    ListeningExecutorService listeningPool =
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    Callable<Void> noOp = new Callable<Void>() {
        @Override
        public Void call() {
            return null;
        }
    };
    ListenableFuture<Void> alreadyDone = listeningPool.submit(noOp);
    // The listener is deliberately attached only after this pause.
    MILLISECONDS.sleep(100);
    Runnable markTraceEntry = new Runnable() {
        @Override
        public void run() {
            new CreateTraceEntry().traceEntryMarker();
        }
    };
    alreadyDone.addListener(markTraceEntry, listeningPool);
    MILLISECONDS.sleep(100);
    listeningPool.shutdown();
    listeningPool.awaitTermination(10, SECONDS);
}
/**
 * Demonstrates reloading a {@code ReloadableSparkeyReader}: creates a first dummy file pair, opens
 * the reader on it, asks the reader to load the same file again, then creates a second file pair
 * and loads that one.
 *
 * <p>The reader, the executor and the files on disk are cleaned up in {@code finally} blocks so
 * that a failure in any step does not leave a running thread or stray files behind.
 *
 * @throws IOException          if creating or reading the files fails
 * @throws InterruptedException if the thread is interrupted while waiting for the reader or the executor
 * @throws ExecutionException   if opening the reader fails asynchronously
 */
private static void run() throws IOException, InterruptedException, ExecutionException {
    ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    final File logFile = new File("reloadabletest.spl");
    final File logFile2 = new File("reloadabletest2.spl");
    try {
        // create dummy log/index files, and load the reader from them
        create(Sparkey.getIndexFile(logFile));
        final ReloadableSparkeyReader reader = ReloadableSparkeyReader.fromLogFile(logFile, executorService).toCompletableFuture().get();
        try {
            // should be ignored (same file)
            reader.load(logFile);
            // should load from second file now
            create(Sparkey.getIndexFile(logFile2));
            reader.load(logFile2);
        } finally {
            reader.close();
        }
    } finally {
        executorService.shutdown();
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } finally {
            // Best-effort cleanup: delete() simply returns false for a file that was never created.
            Sparkey.getIndexFile(logFile).delete();
            logFile.delete();
            Sparkey.getIndexFile(logFile2).delete();
            logFile2.delete();
        }
    }
    System.out.println("Done!");
}
/**
 * Submits ten {@code EnvironmentRequest} tasks to a pool of ten threads, waits for all of them,
 * and asserts that none of them failed with an {@link ExecutionException}.
 *
 * <p>The pool is shut down in a {@code finally} block (its threads are non-daemon), and an
 * interruption received while waiting for termination is re-asserted on the current thread
 * instead of being swallowed.
 */
@Test(timeout = 5000)
public void testConcurrentRequests() throws InterruptedException {
    final int timeoutSecond = 5;
    final int concurThread = 10;
    int exceptionCount = 0;
    List<ListenableFuture<Object>> pendingTasks = new ArrayList<ListenableFuture<Object>>(concurThread);
    final ExecutorService callbackExecutor = Executors.newFixedThreadPool(concurThread,
            new ThreadFactoryBuilder().setDaemon(false).setNameFormat("CallbackExecutor").build());
    ListeningExecutorService taskExecutorService =
            MoreExecutors.listeningDecorator(callbackExecutor);
    try {
        for (int i = 0; i < concurThread; i++) {
            ListenableFuture<Object> runningTaskFuture =
                    taskExecutorService.submit(new EnvironmentRequest());
            pendingTasks.add(runningTaskFuture);
        }
        //waiting for all threads submitted to thread pool
        for (ListenableFuture<Object> future : pendingTasks) {
            try {
                future.get();
            } catch (ExecutionException e) {
                exceptionCount++;
            }
        }
    } finally {
        //stop accepting new threads and shutdown threadpool
        taskExecutorService.shutdown();
        try {
            if (!taskExecutorService.awaitTermination(timeoutSecond, TimeUnit.SECONDS)) {
                taskExecutorService.shutdownNow();
            }
        } catch (InterruptedException ie) {
            taskExecutorService.shutdownNow();
            // Preserve the interrupt for whoever is running this test.
            Thread.currentThread().interrupt();
        }
    }
    assertEquals(0, exceptionCount);
}
/**
 * Downloads every artifact in {@code graph} on a dedicated fixed-size pool (one thread per
 * available processor) and collects the resulting path/prebuilt pairs into a multimap whose
 * values are naturally ordered.
 *
 * <p>All work, including task submission, happens inside the {@code try} so that the pool is shut
 * down even when {@code invokeAll} is interrupted or a download fails.
 *
 * @param graph dependency graph whose nodes are the artifacts to download
 * @param specifiedDependencies passed through unchanged to {@code downloadArtifact}
 * @return multimap built from the entries returned by the individual downloads
 * @throws ExecutionException if any download fails
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private ImmutableSetMultimap<Path, Prebuilt> downloadArtifacts(
    MutableDirectedGraph<Artifact> graph, ImmutableMap<String, Dependency> specifiedDependencies)
    throws ExecutionException, InterruptedException {
  ListeningExecutorService exec =
      MoreExecutors.listeningDecorator(
          Executors.newFixedThreadPool(
              Runtime.getRuntime().availableProcessors(),
              new MostExecutors.NamedThreadFactory("artifact download")));
  try {
    // invokeAll is declared to return plain Futures; the double cast narrows them to the
    // ListenableFuture type that Futures.allAsList needs.
    @SuppressWarnings("unchecked")
    List<ListenableFuture<Map.Entry<Path, Prebuilt>>> results =
        (List<ListenableFuture<Map.Entry<Path, Prebuilt>>>)
            (List<?>)
                exec.invokeAll(
                    graph.getNodes().stream()
                        .map(
                            artifact ->
                                (Callable<Map.Entry<Path, Prebuilt>>)
                                    () -> downloadArtifact(artifact, graph, specifiedDependencies))
                        .collect(ImmutableList.toImmutableList()));
    return ImmutableSetMultimap.<Path, Prebuilt>builder()
        .orderValuesBy(Ordering.natural())
        .putAll(Futures.allAsList(results).get())
        .build();
  } finally {
    exec.shutdown();
  }
}
/**
 * Schedules 142 pieces of work against a pool limited to three parsers, using four executor
 * threads. Every mock parser tracks how many calls are inside it at once and fails a
 * {@code checkState} if that number is ever anything other than one.
 */
@Test
public void fuzzForConcurrentAccess() throws Exception {
  final int maxParsers = 3;
  Cells cells = new TestCellBuilder().build();
  ListeningExecutorService workers =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
  try (ProjectBuildFileParserPool pool =
      createParserPool(
          maxParsers,
          (bus, parserInput, watchmanClient, isThreadSafe) -> {
            // One counter per created parser: number of calls currently executing in it.
            AtomicInteger activeCalls = new AtomicInteger(0);
            return createMockParser(
                () -> {
                  Preconditions.checkState(activeCalls.incrementAndGet() == 1);
                  try {
                    Thread.sleep(10);
                  } finally {
                    activeCalls.decrementAndGet();
                  }
                  return EMPTY_BUILD_FILE_MANIFEST;
                });
          })) {
    Futures.allAsList(scheduleWork(cells.getRootCell(), pool, workers, 142)).get();
  } finally {
    workers.shutdown();
  }
}
/**
 * Schedules five jobs on a single-parser pool, cancels every returned future while the jobs are
 * still held back by a latch, then releases the latch and checks that all five jobs nevertheless
 * run to completion.
 */
@Test
public void ignoresCancellation() throws Exception {
  Cells cell = new TestCellBuilder().build();
  ListeningExecutorService executorService =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
  int numberOfJobs = 5;
  CountDownLatch waitTillAllWorkIsDone = new CountDownLatch(numberOfJobs);
  CountDownLatch waitTillCanceled = new CountDownLatch(1);
  try (ProjectBuildFileParserPool parserPool =
      createParserPool(
          /* maxParsers */ 1,
          createMockParserFactory(
              () -> {
                waitTillCanceled.await();
                waitTillAllWorkIsDone.countDown();
                return EMPTY_BUILD_FILE_MANIFEST;
              }))) {
    ImmutableSet<ListenableFuture<?>> futures =
        scheduleWork(cell.getRootCell(), parserPool, executorService, numberOfJobs);
    for (ListenableFuture<?> future : futures) {
      future.cancel(true);
    }
    waitTillCanceled.countDown();
    // We're making sure cancel is ignored by the pool by waiting for the supposedly canceled
    // work to go through. await() signals a timeout only through its return value, so that value
    // has to be checked for this test to be able to fail.
    if (!waitTillAllWorkIsDone.await(1, TimeUnit.SECONDS)) {
      throw new AssertionError(
          "Cancelled work did not run to completion; jobs still outstanding: "
              + waitTillAllWorkIsDone.getCount());
    }
  } finally {
    executorService.shutdown();
  }
}
/**
 * Checks that work whose parser throws surfaces the exception through the returned futures, and
 * that the same pool still completes work normally once the parser stops throwing.
 */
@Test
public void workThatThrows() throws Exception {
  Cells cells = new TestCellBuilder().build();
  ListeningExecutorService worker =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
  final String expectedMessage = "haha!";
  AtomicBoolean shouldThrow = new AtomicBoolean(true);
  try (ProjectBuildFileParserPool pool =
      createParserPool(
          /* maxParsers */ 2,
          createMockParserFactory(
              () -> {
                if (!shouldThrow.get()) {
                  return EMPTY_BUILD_FILE_MANIFEST;
                }
                throw new Exception(expectedMessage);
              }))) {
    // Phase 1: every scheduled job must fail with the parser's exception as its cause.
    ImmutableSet<ListenableFuture<?>> failingJobs =
        scheduleWork(cells.getRootCell(), pool, worker, 5);
    for (ListenableFuture<?> job : failingJobs) {
      try {
        job.get();
        fail("Expected ExecutionException to be thrown.");
      } catch (ExecutionException expected) {
        assertThat(expected.getCause().getMessage(), Matchers.equalTo(expectedMessage));
      }
    }
    // Phase 2: make sure it's still possible to do work.
    shouldThrow.set(false);
    Futures.allAsList(scheduleWork(cells.getRootCell(), pool, worker, 5)).get();
  } finally {
    worker.shutdown();
  }
}
/**
 * Issues one call of each shape (one-to-one, one-to-many, many-to-one, many-to-many) through the
 * same stub, then verifies the four responses concurrently on a cached thread pool and requires
 * all four verifications to report success within three seconds.
 */
@Test
public void fourKindsOfRequestAtOnce() throws Exception {
    RxGreeterGrpc.RxGreeterStub greeter = RxGreeterGrpc.newRxStub(channel);

    // == MAKE REQUESTS ==
    // One to One
    Single<HelloRequest> unaryRequest = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
    Single<HelloResponse> unaryResponse = unaryRequest.compose(greeter::sayHello);

    // One to Many
    Single<HelloRequest> serverStreamRequest = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
    Flowable<HelloResponse> serverStreamResponse = serverStreamRequest.as(greeter::sayHelloRespStream);

    // Many to One
    Flowable<HelloRequest> clientStreamRequest = Flowable.just(
            HelloRequest.newBuilder().setName("a").build(),
            HelloRequest.newBuilder().setName("b").build(),
            HelloRequest.newBuilder().setName("c").build());
    Single<HelloResponse> clientStreamResponse = clientStreamRequest.as(greeter::sayHelloReqStream);

    // Many to Many
    Flowable<HelloRequest> bidiRequest = Flowable.just(
            HelloRequest.newBuilder().setName("a").build(),
            HelloRequest.newBuilder().setName("b").build(),
            HelloRequest.newBuilder().setName("c").build(),
            HelloRequest.newBuilder().setName("d").build(),
            HelloRequest.newBuilder().setName("e").build());
    Flowable<HelloResponse> bidiResponse = bidiRequest.compose(greeter::sayHelloBothStream);

    // == VERIFY RESPONSES ==
    ListeningExecutorService verifiers = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    // Run all four verifications in parallel
    try {
        // One to One
        ListenableFuture<Boolean> unaryCheck = verifiers.submit(() -> {
            TestObserver<String> observer = unaryResponse.map(HelloResponse::getMessage).test();
            observer.awaitTerminalEvent(1, TimeUnit.SECONDS);
            observer.assertValue("Hello rxjava");
            return true;
        });

        // One to Many
        ListenableFuture<Boolean> serverStreamCheck = verifiers.submit(() -> {
            TestSubscriber<String> subscriber = serverStreamResponse.map(HelloResponse::getMessage).test();
            subscriber.awaitTerminalEvent(1, TimeUnit.SECONDS);
            subscriber.assertValues("Hello rxjava", "Hi rxjava", "Greetings rxjava");
            return true;
        });

        // Many to One
        ListenableFuture<Boolean> clientStreamCheck = verifiers.submit(() -> {
            TestObserver<String> observer = clientStreamResponse.map(HelloResponse::getMessage).test();
            observer.awaitTerminalEvent(1, TimeUnit.SECONDS);
            observer.assertValue("Hello a and b and c");
            return true;
        });

        // Many to Many
        ListenableFuture<Boolean> bidiCheck = verifiers.submit(() -> {
            TestSubscriber<String> subscriber = bidiResponse.map(HelloResponse::getMessage).test();
            subscriber.awaitTerminalEvent(1, TimeUnit.SECONDS);
            subscriber.assertValues("Hello a and b", "Hello c and d", "Hello e");
            subscriber.assertComplete();
            return true;
        });

        @SuppressWarnings("unchecked")
        ListenableFuture<List<Boolean>> combined =
                Futures.allAsList(Lists.newArrayList(unaryCheck, serverStreamCheck, clientStreamCheck, bidiCheck));
        // Block for response
        List<Boolean> outcomes = combined.get(3, TimeUnit.SECONDS);
        assertThat(outcomes).containsExactly(true, true, true, true);
    } finally {
        verifiers.shutdown();
    }
}
/**
 * Wires together the stores, the traverser and the processor thread pool that a
 * {@code ReadMaterializerViewFields} needs, and returns a handle to it.
 *
 * <p>The returned handle's {@code start()} does nothing and its {@code stop()} shuts down the
 * processor pool. {@code writtenEventProvider} is not used by this method.
 *
 * @param config supplies the number of view request processor threads
 * @param tasmoViewModel passed to the materializer
 * @param writtenEventProvider unused here
 * @param tasmoStorageProvider source of the concurrency, link, back-link and event storage
 * @return handle exposing the materializer and its lifecycle
 */
public static TasmoServiceHandle<ReadMaterializerViewFields> initialize(TasmoReadMaterializerConfig config,
    TasmoViewModel tasmoViewModel,
    WrittenEventProvider writtenEventProvider,
    TasmoStorageProvider tasmoStorageProvider) throws Exception {

    ConcurrencyStore concurrency = new HBaseBackedConcurrencyStore(tasmoStorageProvider.concurrencyStorage());
    ReferenceStore references = new ReferenceStore(concurrency,
        tasmoStorageProvider.multiLinksStorage(),
        tasmoStorageProvider.multiBackLinksStorage());

    // TODO add config option to switch between batching and serial.
    ReferenceTraverser traverser = new SerialReferenceTraverser(references);

    EventValueStore eventValues = new EventValueStore(concurrency, tasmoStorageProvider.eventStorage());
    FieldValueReader fieldReader = new EventValueStoreFieldValueReader(eventValues);

    // Processor threads get a recognisable name and log anything that escapes a task.
    Thread.UncaughtExceptionHandler logUncaught = new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            LOG.error("Thread " + t.getName() + " threw uncaught exception", e);
        }
    };
    ThreadFactory processorThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("view-read-materialization-processor-%d")
        .setUncaughtExceptionHandler(logUncaught)
        .build();

    final ListeningExecutorService viewRequestExecutor = MoreExecutors.listeningDecorator(
        Executors.newFixedThreadPool(config.getNumberOfViewRequestProcessorThreads(), processorThreadFactory));

    final ReadMaterializerViewFields materializer = new ReadMaterializerViewFields(traverser,
        fieldReader, concurrency, tasmoViewModel, viewRequestExecutor);

    return new TasmoServiceHandle<ReadMaterializerViewFields>() {
        @Override
        public ReadMaterializerViewFields getService() {
            return materializer;
        }

        @Override
        public void start() throws Exception {
        }

        @Override
        public void stop() throws Exception {
            viewRequestExecutor.shutdown();
        }
    };
}
/**
 * Fetches, on a dedicated single-thread executor, the first certificate presented by the TLS
 * endpoint behind {@code url}. The connection trusts any server certificate, since the point is
 * only to retrieve it. The executor is shut down as soon as the task has been submitted.
 *
 * @param url address of the TLS endpoint; its default port is used when none is given
 * @return future yielding the peer's leading X.509 certificate; it fails with an
 *     {@code IllegalConfigurationException} if the handshake yields no X.509 certificate or an
 *     I/O or security error occurs
 */
private ListenableFuture<X509Certificate> downloadCertificate(final String url)
{
    final Callable<X509Certificate> downloadTask = new Callable<X509Certificate>()
    {
        @Override
        public X509Certificate call()
        {
            try
            {
                final URL siteUrl = new URL(url);
                int port = siteUrl.getPort();
                if (port == -1)
                {
                    port = siteUrl.getDefaultPort();
                }
                SSLContext sslContext = SSLUtil.tryGetSSLContext();
                sslContext.init(new KeyManager[0], new TrustManager[]{new AlwaysTrustManager()}, null);
                try (SSLSocket socket = (SSLSocket) sslContext.getSocketFactory().createSocket())
                {
                    socket.setSoTimeout(_readTimeout);
                    socket.connect(new InetSocketAddress(siteUrl.getHost(), port), _connectTimeout);
                    socket.startHandshake();
                    final Certificate[] peerChain = socket.getSession().getPeerCertificates();
                    final boolean startsWithX509 = peerChain != null
                                                   && peerChain.length != 0
                                                   && peerChain[0] instanceof X509Certificate;
                    if (!startsWithX509)
                    {
                        throw new IllegalConfigurationException(String.format("TLS handshake for '%s' from '%s' "
                                                                              + "did not provide a X509Certificate",
                                                                              getName(),
                                                                              url));
                    }
                    final X509Certificate leaf = (X509Certificate) peerChain[0];
                    LOGGER.debug("Successfully downloaded X509Certificate with DN {} certificate from {}",
                                 leaf.getSubjectDN(), url);
                    return leaf;
                }
            }
            catch (IOException | GeneralSecurityException e)
            {
                throw new IllegalConfigurationException(String.format("Unable to get certificate for '%s' from '%s'",
                                                                      getName(),
                                                                      url), e);
            }
        }
    };
    final ListeningExecutorService worker = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
            getThreadFactory("download-certificate-worker-" + getName())));
    try
    {
        return worker.submit(downloadTask);
    }
    finally
    {
        // shutdown() still lets the task submitted above run to completion.
        worker.shutdown();
    }
}
/**
 * Trains the network on {@code sentences} using {@code config.numThreads} worker threads. The
 * sentences are partitioned into batches, and for each of {@code config.iterations} iterations
 * (counting down) one worker per batch is created and all workers of that iteration are awaited.
 *
 * @param sentences training input, one token list per sentence
 * @return Trained NN model exposing the configured layer size and the {@code syn0} vectors
 * @throws InterruptedException if interrupted while waiting for an iteration to finish
 * @throws IllegalStateException if any worker fails
 */
public NeuralNetworkModel train(Iterable<List<String>> sentences) throws InterruptedException {
    ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(config.numThreads));

    int sentenceCount = Iterables.size(sentences);
    numTrainedTokens += sentenceCount;

    // Partition the sentences evenly amongst the threads
    int batchSize = sentenceCount / config.numThreads + 1;
    Iterable<List<List<String>>> batches = Iterables.partition(sentences, batchSize);

    try {
        listener.update(Stage.TRAIN_NEURAL_NETWORK, 0.0);
        for (int iter = config.iterations; iter > 0; iter--) {
            // Build every worker of this iteration first; the worker id is its position in the list.
            List<CallableVoid> workers = new ArrayList<>();
            for (final List<List<String>> batch : batches) {
                workers.add(createWorker(workers.size(), iter, batch));
            }

            List<ListenableFuture<?>> pending = new ArrayList<>(workers.size());
            for (CallableVoid worker : workers) {
                pending.add(pool.submit(worker));
            }

            try {
                Futures.allAsList(pending).get();
            } catch (ExecutionException e) {
                throw new IllegalStateException("Error training neural network", e.getCause());
            }
        }
        pool.shutdown();
    } finally {
        pool.shutdownNow();
    }

    return new NeuralNetworkModel() {
        @Override public int layerSize() {
            return config.layerSize;
        }

        @Override public double[][] vectors() {
            return syn0;
        }
    };
}
/**
 * Builds a {@code ReadMaterializerViewFields} together with its stores, reference traverser and
 * fixed-size processor pool, and wraps it in a service handle.
 *
 * <p>On the returned handle, {@code start()} is empty and {@code stop()} shuts the processor pool
 * down. The {@code writtenEventProvider} argument is not read by this method.
 *
 * @param config provides the size of the view request processor pool
 * @param tasmoViewModel handed to the materializer
 * @param writtenEventProvider not used in this method
 * @param tasmoStorageProvider provides the concurrency, multi-link, multi-back-link and event storage
 * @return handle giving access to the materializer and controlling its pool
 */
public static TasmoServiceHandle<ReadMaterializerViewFields> initialize(TasmoReadMaterializerConfig config,
    TasmoViewModel tasmoViewModel,
    WrittenEventProvider writtenEventProvider,
    TasmoStorageProvider tasmoStorageProvider) throws Exception {

    ConcurrencyStore sharedConcurrencyStore = new HBaseBackedConcurrencyStore(tasmoStorageProvider.concurrencyStorage());
    ReferenceStore linkStore = new ReferenceStore(sharedConcurrencyStore,
        tasmoStorageProvider.multiLinksStorage(),
        tasmoStorageProvider.multiBackLinksStorage());

    // TODO add config option to switch between batching and serial.
    ReferenceTraverser serialTraverser = new SerialReferenceTraverser(linkStore);

    EventValueStore valueStore = new EventValueStore(sharedConcurrencyStore, tasmoStorageProvider.eventStorage());
    FieldValueReader valueReader = new EventValueStoreFieldValueReader(valueStore);

    // Anything thrown past a task on a processor thread is logged with the thread's name.
    Thread.UncaughtExceptionHandler uncaughtLogger = new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            LOG.error("Thread " + t.getName() + " threw uncaught exception", e);
        }
    };
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("view-read-materialization-processor-%d")
        .setUncaughtExceptionHandler(uncaughtLogger)
        .build();

    int processorThreadCount = config.getNumberOfViewRequestProcessorThreads();
    final ListeningExecutorService processorPool = MoreExecutors.listeningDecorator(
        Executors.newFixedThreadPool(processorThreadCount, namedThreadFactory));

    final ReadMaterializerViewFields viewFieldsMaterializer = new ReadMaterializerViewFields(serialTraverser,
        valueReader, sharedConcurrencyStore, tasmoViewModel, processorPool);

    return new TasmoServiceHandle<ReadMaterializerViewFields>() {
        @Override
        public ReadMaterializerViewFields getService() {
            return viewFieldsMaterializer;
        }

        @Override
        public void start() throws Exception {
        }

        @Override
        public void stop() throws Exception {
            processorPool.shutdown();
        }
    };
}
/**
 * Verifies that every parser the pool creates is closed again once the pool itself is closed.
 * A shared counter is incremented when a parser is created and decremented when it is closed;
 * it must read 4 while the pool is open and drop back to 0 afterwards.
 */
@Test
public void closesCreatedParsers() throws Exception {
  final int maxParsers = 4;
  AtomicInteger openParsers = new AtomicInteger(0);
  Cells cells = new TestCellBuilder().build();
  ListeningExecutorService workers =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxParsers));
  // Each parse call waits here until maxParsers calls have arrived.
  CountDownLatch allParsersBusy = new CountDownLatch(maxParsers);
  try (ProjectBuildFileParserPool pool =
      createParserPool(
          maxParsers,
          (bus, parserInput, watchmanClient, isThreadSafe) -> {
            openParsers.incrementAndGet();
            ProjectBuildFileParser mockParser = EasyMock.createMock(ProjectBuildFileParser.class);
            try {
              EasyMock.expect(mockParser.getManifest(EasyMock.anyObject(Path.class)))
                  .andAnswer(
                      () -> {
                        allParsersBusy.countDown();
                        allParsersBusy.await();
                        return EMPTY_BUILD_FILE_MANIFEST;
                      })
                  .anyTimes();
              mockParser.close();
              EasyMock.expectLastCall()
                  .andAnswer(
                      () -> {
                        openParsers.decrementAndGet();
                        return null;
                      });
            } catch (Exception e) {
              Throwables.throwIfUnchecked(e);
              throw new RuntimeException(e);
            }
            EasyMock.replay(mockParser);
            return mockParser;
          })) {
    Futures.allAsList(scheduleWork(cells.getRootCell(), pool, workers, maxParsers * 2)).get();
    assertThat(openParsers.get(), Matchers.is(4));
  } finally {
    workers.shutdown();
  }

  // Parser shutdown is async, so poll (at most 10 times, 100 ms apart) for the count to hit zero.
  int attempts = 0;
  while (attempts < 10 && openParsers.get() != 0) {
    Thread.sleep(100);
    attempts++;
  }
  assertThat(openParsers.get(), Matchers.is(0));
}
/**
 * Closes the parser pool while one job is running and four more are queued behind it on a
 * single-thread executor, then checks that exactly one job produced a result and that exactly
 * one success callback fired.
 */
@Test
public void closeWhenRunningJobs() throws Exception {
  Cells cells = new TestCellBuilder().build();
  ListeningExecutorService worker =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
  CountDownLatch poolClosed = new CountDownLatch(1);
  CountDownLatch firstJobStarted = new CountDownLatch(1);
  AtomicInteger successCallbacks = new AtomicInteger(0);

  // The callback only touches the shared counter, so one instance serves every future.
  FutureCallback<Object> countSuccesses =
      new FutureCallback<Object>() {
        @Override
        public void onSuccess(@Nullable Object result) {
          successCallbacks.incrementAndGet();
        }

        @Override
        public void onFailure(Throwable t) {}
      };

  ImmutableSet<ListenableFuture<?>> jobs;
  try (ProjectBuildFileParserPool pool =
      createParserPool(
          /* maxParsers */ 1,
          createMockParserFactory(
              () -> {
                firstJobStarted.countDown();
                poolClosed.await();
                return EMPTY_BUILD_FILE_MANIFEST;
              }))) {
    jobs = scheduleWork(cells.getRootCell(), pool, worker, 5);
    for (ListenableFuture<?> job : jobs) {
      Futures.addCallback(job, countSuccesses);
    }
    firstJobStarted.await(1, TimeUnit.SECONDS);
  }
  poolClosed.countDown();

  List<Object> results = Futures.successfulAsList(jobs).get(1, TimeUnit.SECONDS);

  // The threadpool is of size 1, so we had 1 job in the 'running' state. That one job completed
  // normally, the rest should have been cancelled.
  final int expectedCompletedJobs = 1;
  int completedJobs = 0;
  for (Object result : results) {
    if (result != null) {
      completedJobs++;
    }
  }
  assertThat(completedJobs, Matchers.equalTo(expectedCompletedJobs));

  worker.shutdown();
  assertThat(worker.awaitTermination(1, TimeUnit.SECONDS), Matchers.is(true));
  assertThat(successCallbacks.get(), Matchers.equalTo(expectedCompletedJobs));
}
/**
 * Builds two independent rules that share a step which blocks on an {@link Exchanger} until the
 * other rule reaches the same step. Both builds can only succeed if the engine runs the two rules
 * at the same time; otherwise the exchange times out after six seconds.
 *
 * <p>The executor is shut down in a {@code finally} block so that its threads are released even
 * when a build or an assertion fails.
 */
@Test
public void multipleTopLevelRulesDontBlockEachOther() throws Exception {
  Exchanger<Boolean> exchanger = new Exchanger<>();
  Step exchangerStep =
      new AbstractExecutionStep("interleaved_step") {
        @Override
        public StepExecutionResult execute(ExecutionContext context)
            throws InterruptedException {
          try {
            // Forces both rules to wait for the other at this point.
            exchanger.exchange(true, 6, TimeUnit.SECONDS);
          } catch (TimeoutException e) {
            throw new RuntimeException(e);
          }
          return StepExecutionResults.SUCCESS;
        }
      };
  BuildRule interleavedRuleOne =
      createRule(
          filesystem,
          graphBuilder,
          /* deps */ ImmutableSortedSet.of(),
          /* buildSteps */ ImmutableList.of(exchangerStep),
          /* postBuildSteps */ ImmutableList.of(),
          /* pathToOutputFile */ null,
          ImmutableList.of(InternalFlavor.of("interleaved-1")));
  graphBuilder.addToIndex(interleavedRuleOne);
  BuildRule interleavedRuleTwo =
      createRule(
          filesystem,
          graphBuilder,
          /* deps */ ImmutableSortedSet.of(),
          /* buildSteps */ ImmutableList.of(exchangerStep),
          /* postBuildSteps */ ImmutableList.of(),
          /* pathToOutputFile */ null,
          ImmutableList.of(InternalFlavor.of("interleaved-2")));
  graphBuilder.addToIndex(interleavedRuleTwo);
  // The engine needs a couple of threads to ensure that it can schedule multiple steps at the
  // same time.
  ListeningExecutorService executorService =
      listeningDecorator(Executors.newFixedThreadPool(4));
  try (CachingBuildEngine cachingBuildEngine =
      cachingBuildEngineFactory().setExecutorService(executorService).build()) {
    BuildEngine.BuildEngineResult engineResultOne =
        cachingBuildEngine.build(
            buildContext, TestExecutionContext.newInstance(), interleavedRuleOne);
    BuildEngine.BuildEngineResult engineResultTwo =
        cachingBuildEngine.build(
            buildContext, TestExecutionContext.newInstance(), interleavedRuleTwo);
    assertThat(engineResultOne.getResult().get().getStatus(), equalTo(BuildRuleStatus.SUCCESS));
    assertThat(engineResultTwo.getResult().get().getStatus(), equalTo(BuildRuleStatus.SUCCESS));
  } finally {
    // Runs after the engine has been closed, i.e. in the same order as before on success.
    executorService.shutdown();
  }
}