下面列出了 java.util.concurrent.ForkJoinPool#awaitTermination() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Calls the metrics API on each host and aggregates the metrics
 * into a single value, grouped by cluster.
 *
 * <p>The hosts are processed through a parallel stream that is submitted to a dedicated
 * pool of 10 threads. The call waits at most one minute for the pool to terminate; if the
 * wait times out, the outstanding work is cancelled and the map returned contains only
 * what has been gathered so far.
 *
 * @param hosts the hosts to query
 * @return the aggregated metrics per cluster; possibly incomplete after a timeout
 * @throws RuntimeException if the calling thread is interrupted while waiting; the
 *         interrupt status is restored before the exception is thrown
 */
public Map<ClusterInfo, MetricsAggregator> requestMetricsGroupedByCluster(Collection<URI> hosts) {
    Map<ClusterInfo, MetricsAggregator> clusterMetricsMap = new ConcurrentHashMap<>();

    long startTime = System.currentTimeMillis();
    // The stream is started from inside a task of the dedicated pool (presumably so the
    // parallel work runs on that pool's workers instead of the common pool).
    Runnable retrieveMetricsJob = () ->
            hosts.parallelStream().forEach(host ->
                    getHostMetrics(host, clusterMetricsMap)
            );

    ForkJoinPool threadPool = new ForkJoinPool(10);
    threadPool.submit(retrieveMetricsJob);
    threadPool.shutdown();

    try {
        if (!threadPool.awaitTermination(1, TimeUnit.MINUTES)) {
            // Stop the stragglers so they do not keep mutating the map after it is returned.
            threadPool.shutdownNow();
            log.log(Level.WARNING, () ->
                    String.format("Metric retrieval for %d nodes did not finish within 1 minute, returning partial result", hosts.size())
            );
        }
    } catch (InterruptedException e) {
        threadPool.shutdownNow();
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }

    log.log(Level.FINE, () ->
            String.format("Metric retrieval for %d nodes took %d milliseconds", hosts.size(), System.currentTimeMillis() - startTime)
    );

    return clusterMetricsMap;
}
/**
 * Subscribes twice to the observable that {@code pushCollection} builds over the values
 * 1, 2 and 3 on the common fork/join pool, then runs the shutdown sequence on that pool
 * while printing progress messages.
 */
@Test
void pool_pushCollection() {
    List<Integer> integers = new ArrayList<>();
    for (int value = 1; value <= 3; value++) {
        integers.add(value);
    }

    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        // Two independent subscriptions over the same source collection.
        for (int subscription = 0; subscription < 2; subscription++) {
            pushCollection(integers, forkJoinPool).subscribe(item -> log(item));
        }
    } finally {
        final int shutdownDelaySec = 1;
        try {
            forkJoinPool.shutdown();
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> pending = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + pending.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
/**
 * Processes the test zip archive through
 * {@code VirtualFileUtilities.asynchronouslyProcessAllFiles} on a two-thread pool and
 * checks that exactly two files were visited and no issues were recorded.
 */
@Test
public void testProcessAllFiles() {
    try {
        ForkJoinPool pool = new ForkJoinPool(2);
        // Both sets are handed to code running on the pool, so use thread-safe sets
        // (presumably the callback can run on both workers at once).
        Set<Throwable> issues = java.util.concurrent.ConcurrentHashMap.newKeySet();
        VirtualFile f = fs.resolveVirtualFile("zip://" + new File("src/test/resources/test.zip").getAbsolutePath());
        Set<VirtualFile> files = java.util.concurrent.ConcurrentHashMap.newKeySet();
        VirtualFileUtilities.asynchronouslyProcessAllFiles(pool, vf -> {
            assertNotNull(vf);
            files.add(vf);
        }, f, issues, true);
        pool.shutdown();
        // Fail right here on a timeout instead of asserting against half-finished work.
        assertTrue(pool.awaitTermination(3, TimeUnit.SECONDS));
        assertTrue(pool.isShutdown());
        assertTrue(pool.isQuiescent());
        assertEquals(2, files.size());
        assertTrue(issues.isEmpty());
    } catch (Exception e) {
        fail(e.getMessage());
    }
}
/**
 * Counts the values in [40000, 50000) accepted by {@code CpuContention.isPrime} using a
 * parallel stream run inside a dedicated six-thread pool, then prints the elapsed time
 * and the count.
 *
 * @param args unused
 * @throws InterruptedException if interrupted while waiting for the result or the pool
 * @throws ExecutionException if the counting task fails
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    ThreadUtils.timerStart();

    // A heavier alternative range is IntStream.range(10010000, 10020000).
    Stream<BigDecimal> primeStream = IntStream.range(40000, 50000)
            .parallel()
            .mapToObj(i -> new BigDecimal(i))
            .filter(CpuContention::isPrime);

    ForkJoinPool pool = new ForkJoinPool(6);
    long n;
    try {
        n = pool.submit(() -> primeStream.count()).get();
    } finally {
        // Release the worker threads even when the task fails or the wait is interrupted.
        pool.shutdownNow();
    }
    pool.awaitTermination(1, TimeUnit.MINUTES);

    ThreadUtils.timerEndPrint();
    System.out.println("Found: " + n);
}
/**
 * Deletes every file visited by {@code iterateFiles(directory, ...)} whose last-modified
 * time lies more than {@code timeDiff} milliseconds before the moment this method was
 * called. Deletions are submitted to a fork/join pool and the method blocks until that
 * pool has terminated.
 *
 * @param directory  the directory passed to {@code iterateFiles}
 * @param timeDiff   minimum age, in milliseconds, a file must exceed to be deleted
 * @param printDebug whether to send {@code BBC.FILE_DELETED} for each file scheduled
 *                   for deletion
 */
public static void deleteOlder(File directory, final long timeDiff, boolean printDebug) {
    final long now = System.currentTimeMillis();
    ForkJoinPool pool = new ForkJoinPool();
    try {
        iterateFiles(directory, new Consumer<File>() {
            @Override
            public void accept(File file) {
                long age = now - file.lastModified();
                if (age > timeDiff) {
                    pool.submit(() -> file.delete());
                    // Sent when the deletion is scheduled, not when it has succeeded.
                    if (printDebug) BBC.FILE_DELETED.send(null, file);
                }
            }
        });
    } finally {
        // Always stop accepting work so the pool's threads can wind down.
        pool.shutdown();
    }
    try {
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Keep the interrupt visible to the caller instead of silently dropping it.
        Thread.currentThread().interrupt();
    }
}
/**
 * Checks that {@code crawlingInfoHelper.generateId} produces the expected ID for a fixed
 * data map, first on the calling thread and then 1000 times from a pool of 10 threads.
 *
 * @throws Exception if waiting for the pool is interrupted
 */
public void test_generateId_multithread() throws Exception {
    final Map<String, Object> dataMap = new HashMap<String, Object>();
    dataMap.put("url", "http://example.com/");

    // "100", "99", ..., "1" in descending order
    final List<String> descending = new ArrayList<>();
    int next = 100;
    while (next > 0) {
        descending.add(String.valueOf(next));
        next--;
    }
    dataMap.put("role", descending);
    dataMap.put("virtual_host", descending);

    final String expectedId =
            "f8240bbae62b99960056c3a382844836c547c2ec73e019491bb7bbb02d92d98e876c8204b67a59ca8123b82d20986516b7d451f68dd634b39004c0d36c0eeca4";
    assertEquals(expectedId, crawlingInfoHelper.generateId(dataMap));

    final AtomicInteger completed = new AtomicInteger(0);
    final ForkJoinPool pool = new ForkJoinPool(10);
    // Only runs that pass the assertion reach the counter increment.
    final Runnable check = () -> {
        assertEquals(expectedId, crawlingInfoHelper.generateId(dataMap));
        completed.incrementAndGet();
    };
    for (int submitted = 0; submitted < 1000; submitted++) {
        pool.execute(check);
    }

    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    assertEquals(1000, completed.get());
}
/**
 * Accumulates a run of numbers in parallel using fork/join and prints the result
 * together with the elapsed time.
 *
 * @param count how many numbers to process (the sequence starts at 1)
 * @throws InterruptedException if interrupted while waiting for the pool to terminate
 */
private static void parallelCompute(int count)
        throws InterruptedException {
    // numbers[k] holds k + 1, i.e. the values 1..count
    int[] numbers = new int[count];
    for (int index = numbers.length - 1; index >= 0; index--) {
        numbers[index] = index + 1;
    }

    ForkJoinPool pool = new ForkJoinPool(4);
    HelloForkJoin task = new HelloForkJoin(numbers);

    long startTime = System.currentTimeMillis();
    pool.submit(task);
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);

    String report = String.format("并行计算结果:%d,耗时:%d毫秒", task._result, System.currentTimeMillis() - startTime);
    System.out.println(report);
}
/**
 * Publishes the observable built by {@code pushCollectionDANGER} over the values 1 and 2,
 * attaches two slow subscribers, connects, and after two seconds connects again with a
 * consumer that disposes the connection. Finally runs the shutdown sequence on the
 * common pool while printing progress messages.
 */
@Test
void pool_publish_pushCollection() {
    List<Integer> integers = new ArrayList<>();
    integers.add(1);
    integers.add(2);

    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        ObservablePublish<Integer> published =
                (ObservablePublish<Integer>) pushCollectionDANGER(integers, forkJoinPool).publish();

        published.subscribe(
                value -> {
                    log("一郎神: " + value);
                    sleep(2, TimeUnit.SECONDS);
                },
                Throwable::printStackTrace,
                () -> System.out.println("Emission completed"));
        published.subscribe(
                value -> {
                    log("二郎神: " + value);
                    sleep(2, TimeUnit.SECONDS);
                },
                Throwable::printStackTrace,
                () -> System.out.println("Emission completed"));

        published.connect();
        sleep(2, TimeUnit.SECONDS);
        // As mentioned when walking through the API: this overload receives a Disposable
        // and lets the caller implement what to do with it.
        published.connect(connection -> connection.dispose());
    } finally {
        final int shutdownDelaySec = 15;
        try {
            forkJoinPool.shutdown();
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> pending = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + pending.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
/**
 * Shares the observable built by {@code pushCollectionDANGER} over the values 1..9 via
 * {@code publish().refCount()}, attaches a first subscriber, a second one a second
 * later, then sleeps 20 seconds before running the shutdown sequence on the common pool.
 */
@Test
void infinite_refCount_publish_test() {
    List<Integer> integers = new ArrayList<>();
    for (int value = 1; value <= 9; value++) {
        integers.add(value);
    }

    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        Observable<Integer> shared = pushCollectionDANGER(integers, forkJoinPool).publish().refCount();

        // First subscriber: two seconds per item.
        shared.subscribe(
                value -> {
                    log("一郎神: " + value);
                    sleep(2, TimeUnit.SECONDS);
                },
                Throwable::printStackTrace,
                () -> System.out.println("Emission completed"));

        sleep(1, TimeUnit.SECONDS);

        // Second subscriber joins a second later: one second per item.
        shared.subscribe(
                value -> {
                    log("二郎神: " + value);
                    sleep(1, TimeUnit.SECONDS);
                },
                Throwable::printStackTrace,
                () -> System.out.println("Emission completed"));

        sleep(20, TimeUnit.SECONDS);
    } finally {
        final int shutdownDelaySec = 2;
        try {
            forkJoinPool.shutdown();
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> pending = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + pending.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
/**
 * Maps an iterator of {@code count} integers through {@code thenComposeAhead}, where each
 * element is produced by an async task on a dedicated pool that first sleeps for
 * {@code sleepMillis.get()} milliseconds, and asserts that the collected result is
 * 0..count-1 in order.
 *
 * @param count          number of elements in the source iterator
 * @param ahead          pool parallelism, also passed as the second argument of
 *                       {@code thenComposeAhead}
 * @param sleepMillis    supplies the per-element delay in milliseconds
 * @param finishInMillis timeout passed to {@code TestUtil.join}
 */
private void testThenComposeAhead(
        final int count, final int ahead, final Supplier<Long> sleepMillis, final long finishInMillis)
        throws InterruptedException, TimeoutException {
    final ForkJoinPool fjp = new ForkJoinPool(ahead);
    try {
        final AsyncIterator<Integer> it = intIterator(count);
        final AsyncIterator<Integer> mapped =
                it.thenComposeAhead(
                        i -> {
                            return CompletableFuture.supplyAsync(
                                    () -> {
                                        try {
                                            Thread.sleep(sleepMillis.get());
                                        } catch (final InterruptedException e) {
                                            // Still yield the value, but keep the interrupt visible.
                                            Thread.currentThread().interrupt();
                                        }
                                        return i;
                                    },
                                    fjp);
                        },
                        ahead);
        final List<Integer> lis =
                TestUtil.join(mapped.collect(Collectors.toList()), finishInMillis, TimeUnit.MILLISECONDS);
        Assert.assertEquals(IntStream.range(0, count).boxed().collect(Collectors.toList()), lis);
    } finally {
        // Shut the pool down even when the join times out or the assertion fails.
        fjp.shutdown();
    }
    fjp.awaitTermination(1, TimeUnit.SECONDS);
}
/**
 * Submits {@code r} to a dedicated fork/join pool of parallelism 10, additionally submits
 * a task that prints the single stream element {@code "a"}, and then waits up to 10
 * seconds for the pool to terminate.
 *
 * @param r the task to run on the pool
 * @throws InterruptedException if interrupted while waiting for the pool to terminate
 */
public static void executeInCustomFJP(Runnable r) throws InterruptedException {
    final ForkJoinPool customPool = new ForkJoinPool(10);
    customPool.submit(r);

    final Stream<Object> singleElement = Stream.of("a");
    final Runnable printElements = () -> singleElement.forEach(System.out::println);
    customPool.submit(printElements);

    customPool.shutdown();
    customPool.awaitTermination(10, TimeUnit.SECONDS);
}
/**
 * Applies {@code filter} over the save folder: creates a fork/join pool, hands it to the
 * filter, lets {@code traverser} walk the folder with the task built by
 * {@code filterFunction}, and then blocks until the pool has terminated.
 *
 * @param filter    the filter to run; also the return value
 * @param traverser invoked with the save folder path and the per-path task
 * @return the same {@code filter} instance that was passed in
 */
private <G, T extends MCAFilter<G>> T filterWorld(final T filter, RunnableVal2<Path, RunnableVal2<Path, BasicFileAttributes>> traverser) {
    File folder = getSaveFolder();
    final ForkJoinPool pool = new ForkJoinPool();
    filter.withPool(pool, this);
    RunnableVal2<Path, BasicFileAttributes> task = filterFunction(filter, pool);
    traverser.run(folder.toPath(), task);
    pool.shutdown();
    try {
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
    } catch (Throwable e) {
        e.printStackTrace();
    }
    return filter;
}
/**
 * Executes a {@code SimpleRunnable} on a fresh fork/join pool, waits on its latch, then
 * shuts the pool down and waits up to 10 seconds for it to terminate.
 *
 * @throws Exception if interrupted while waiting
 */
@Override
public void transactionMarker() throws Exception {
    ForkJoinPool pool = new ForkJoinPool();
    try {
        SimpleRunnable simpleRunnable = new SimpleRunnable();
        pool.execute(simpleRunnable);
        simpleRunnable.latch.await();
    } finally {
        // Shut down even if the latch wait is interrupted, so the workers are not leaked.
        pool.shutdown();
    }
    pool.awaitTermination(10, SECONDS);
}
/**
 * Executes a {@code SimpleTask} on a fresh fork/join pool, waits on its latch, then
 * shuts the pool down and waits up to 10 seconds for it to terminate.
 *
 * @throws Exception if interrupted while waiting
 */
@Override
public void transactionMarker() throws Exception {
    ForkJoinPool pool = new ForkJoinPool();
    try {
        SimpleTask simpleTask = new SimpleTask();
        pool.execute(simpleTask);
        simpleTask.latch.await();
    } finally {
        // Shut down even if the latch wait is interrupted, so the workers are not leaked.
        pool.shutdown();
    }
    pool.awaitTermination(10, SECONDS);
}
/**
 * Computes a brightness value for {@code image} by splitting its raw pixel bytes into one
 * slice per available processor, running a {@code BrightnessCalcTask} per slice on a
 * fork/join pool, and averaging the per-task results.
 *
 * <p>If the calling thread is interrupted while waiting, the method keeps waiting for the
 * tasks to finish and restores the interrupt status before returning.
 *
 * @param image the image to analyse
 * @return the mean of the per-task results
 * @throws RuntimeException if the image type is not one of the accepted types
 */
public static double calculateAverageBrightness(BufferedImage image) {
    double result = 0;
    int imageType = image.getType();
    if (imageType != BufferedImage.TYPE_INT_ARGB && imageType != BufferedImage.TYPE_INT_RGB && imageType != BufferedImage.TYPE_3BYTE_BGR
            && imageType != BufferedImage.TYPE_4BYTE_ABGR && imageType != BufferedImage.TYPE_4BYTE_ABGR_PRE && imageType != BufferedImage.TYPE_INT_ARGB_PRE
            && imageType != BufferedImage.TYPE_INT_BGR) {
        throw new RuntimeException("Unsupported image type: " + image.getType());
    }
    boolean hasAlpha = image.getAlphaRaster() != null;
    int pixelSize = hasAlpha ? 4 : 3;
    // NOTE(review): the TYPE_INT_* types accepted above are normally backed by an int
    // buffer, in which case this cast would fail - confirm which types callers pass.
    byte[] pixels = ((DataBufferByte) image.getRaster().getDataBuffer()).getData();

    int cpuCores = Runtime.getRuntime().availableProcessors();
    final ForkJoinPool pool = new ForkJoinPool(cpuCores);

    BrightnessCalcTask[] tasks = new BrightnessCalcTask[cpuCores];
    int subArraySize = (int) Math.ceil(((double) pixels.length) / cpuCores);
    // Round the slice size up to a whole number of pixels so no pixel is split in two.
    if (subArraySize % pixelSize != 0) {
        subArraySize += pixelSize - subArraySize % pixelSize;
    }
    for (int i = 0; i < cpuCores; i++) {
        tasks[i] = new BrightnessCalcTask(pixels, subArraySize * i, Math.min(subArraySize * (i + 1), pixels.length), pixelSize);
        pool.submit(tasks[i]);
    }
    pool.shutdown();

    boolean interrupted = false;
    while (!pool.isTerminated()) {
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // The task results are still needed, so keep waiting and re-assert the flag later.
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }

    // NOTE(review): every task counts equally here even though the last slices may be
    // shorter - confirm this is the intended average.
    for (BrightnessCalcTask task : tasks) {
        result += task.getRawResult();
    }
    result = result / tasks.length;
    return result;
}
/**
 * Runs {@code calcSumOfTen()} on a dedicated pool and checks that the result is 55.
 *
 * @throws Exception if interrupted while waiting for the pool to terminate
 */
@Test
public void test() throws Exception {
    ForkJoinPool pool = new ForkJoinPool(PARALLELISM);
    final int[] result = {0};
    pool.submit(() -> {
        result[0] = calcSumOfTen();
    });
    // shutdown() must come first: a pool that has not been shut down never terminates,
    // so awaiting it beforehand always burns the full timeout and returns false.
    pool.shutdown();
    assertThat(pool.awaitTermination(2, TimeUnit.SECONDS)).isTrue();
    assertThat(result[0]).isEqualTo(55);
}
/**
 * Attaches three subscribers to the {@code replay()} view of a {@code PublishSubject},
 * feeds the values 1..9 into the subject from a common-pool task (one per second),
 * connects, disposes the first subscriber after two seconds and completes the subject a
 * second later. Finally runs the shutdown sequence on the common pool.
 */
@Test
void replay_PublishSubject_test() {
    PublishSubject<Object> publishSubject = PublishSubject.create();
    ConnectableObservable<Object> replay = publishSubject.replay();
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

    List<Integer> integers = new ArrayList<>();
    for (int value = 1; value <= 9; value++) {
        integers.add(value);
    }

    Disposable first = replay.subscribe(
            value -> log("一郎神: " + value),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"));
    Disposable second = replay.subscribe(
            value -> log("二郎神: " + value),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"));
    Disposable third = replay.subscribe(
            value -> log("三郎神: " + value),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"));

    AtomicInteger remaining = new AtomicInteger(integers.size());
    try {
        // A single task pushes all values in order, completing after the last one.
        // (An earlier variant submitted one task per value with a 3-second sleep.)
        forkJoinPool.submit(() -> {
            for (Integer id : integers) {
                sleep(1, TimeUnit.SECONDS);
                publishSubject.onNext(id);
                if (remaining.decrementAndGet() == 0) {
                    publishSubject.onComplete();
                }
            }
        });

        replay.connect();
        sleep(2, TimeUnit.SECONDS);
        first.dispose();
        sleep(1, TimeUnit.SECONDS);
        // Alternative that was tried here: replay.connect(consumer -> consumer.dispose());
        publishSubject.onComplete();
    } finally {
        final int shutdownDelaySec = 2;
        try {
            forkJoinPool.shutdown();
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> pending = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + pending.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
/**
 * Runs the candidate finders for IML files and/or library XML files on a dedicated
 * fork/join pool, then shuts the pool down and waits for it to terminate.
 *
 * <p>Returns immediately when both {@code runPostGenerationCleaner} and
 * {@code removeOldLibraries} are false. The buck-out directory and every directory cache
 * entry are collected and passed to the IML finders (presumably as exclusions).
 *
 * @param buckConfig               configuration used to look up the directory cache entries
 * @param ideaConfigDir            IDEA config directory scanned for IML files
 * @param librariesXmlBase         base directory scanned for library XML files
 * @param runPostGenerationCleaner whether to scan the project root and
 *                                 {@code ideaConfigDir} for IML files
 * @param removeOldLibraries       together with {@code runPostGenerationCleaner} decides
 *                                 whether anything runs at all
 */
public void clean(
        BuckConfig buckConfig,
        Path ideaConfigDir,
        Path librariesXmlBase,
        boolean runPostGenerationCleaner,
        boolean removeOldLibraries) {
    if (!runPostGenerationCleaner && !removeOldLibraries) {
        return;
    }

    Set<File> buckDirectories = new HashSet<>();
    buckDirectories.add(
            convertPathToFile(
                    projectFilesystem.resolve(projectFilesystem.getBuckPaths().getBuckOut())));

    ArtifactCacheBuckConfig cacheBuckConfig = new ArtifactCacheBuckConfig(buckConfig);
    for (DirCacheEntry entry : cacheBuckConfig.getCacheEntries().getDirCacheEntries()) {
        buckDirectories.add(convertPathToFile(entry.getCacheDir()));
    }

    ForkJoinPool cleanExecutor = new ForkJoinPool(getParallelismLimit());
    try {
        cleanExecutor.invoke(
                new RecursiveAction() {
                    @Override
                    protected void compute() {
                        // Up to three tasks: two IML finders plus the library XML finder.
                        List<RecursiveAction> topLevelTasks = new ArrayList<>(3);
                        if (runPostGenerationCleaner) {
                            topLevelTasks.add(
                                    new CandidateFinderWithExclusions(
                                            convertPathToFile(projectFilesystem.resolve("")),
                                            IML_FILENAME_FILTER,
                                            buckDirectories));
                            topLevelTasks.add(
                                    new CandidateFinderWithExclusions(
                                            ideaConfigDir.toFile(), IML_FILENAME_FILTER, buckDirectories));
                        }
                        topLevelTasks.add(
                                new CandidateFinder(convertPathToFile(librariesXmlBase), XML_FILENAME_FILTER));
                        invokeAll(topLevelTasks);
                    }
                });
    } finally {
        cleanExecutor.shutdown();
        try {
            // awaitTermination reports a timeout through its return value, not an exception.
            if (!cleanExecutor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIME_UNIT)) {
                Logger.get(IJProjectCleaner.class).warn("Timeout during executor shutdown.");
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            Logger.get(IJProjectCleaner.class).warn("Interrupted during executor shutdown.", e);
        }
    }
}