java.util.concurrent.ForkJoinPool#awaitTermination() 源码实例 Demo

下面列出了 java.util.concurrent.ForkJoinPool#awaitTermination() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: vespa   文件: ClusterMetricsRetriever.java
/**
 * Calls the metrics API on each host and aggregates the metrics
 * into a single value, grouped by cluster.
 *
 * <p>The hosts are processed by a parallel stream started from a dedicated pool of
 * 10 threads, and the whole retrieval is given at most one minute. When that deadline
 * passes, the outstanding work is cancelled so that no worker keeps modifying the
 * returned map after this method has returned.
 *
 * @param hosts the metrics endpoints to query
 * @return the aggregators filled in by {@code getHostMetrics}, keyed by cluster; may be
 *         incomplete if the one-minute deadline was reached
 * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
 *         interrupted while waiting; the pool is stopped and the interrupt flag restored first
 */
public Map<ClusterInfo, MetricsAggregator> requestMetricsGroupedByCluster(Collection<URI> hosts) {
    Map<ClusterInfo, MetricsAggregator> clusterMetricsMap = new ConcurrentHashMap<>();

    long startTime = System.currentTimeMillis();
    Runnable retrieveMetricsJob = () ->
            hosts.parallelStream().forEach(host ->
                getHostMetrics(host, clusterMetricsMap)
            );

    // Submitting the job to a private pool keeps the parallel stream off the common pool.
    ForkJoinPool threadPool = new ForkJoinPool(10);
    threadPool.submit(retrieveMetricsJob);
    threadPool.shutdown();

    try {
        if (!threadPool.awaitTermination(1, TimeUnit.MINUTES)) {
            // Deadline reached: stop the remaining requests instead of leaking running workers.
            log.log(Level.WARNING, "Metric retrieval did not finish within 1 minute, cancelling remaining requests");
            threadPool.shutdownNow();
        }
    } catch (InterruptedException e) {
        threadPool.shutdownNow();
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }

    log.log(Level.FINE, () ->
            String.format("Metric retrieval for %d nodes took %d milliseconds", hosts.size(), System.currentTimeMillis() - startTime)
    );

    return clusterMetricsMap;
}
 
/**
 * Pushes the values 1, 2 and 3 through {@code pushCollection} twice on the common
 * fork/join pool, logging every emitted item, and then runs the shutdown sequence.
 */
@Test
void pool_pushCollection() {
    List<Integer> numbers = new ArrayList<>();
    for (int value = 1; value <= 3; value++) {
        numbers.add(value);
    }

    ForkJoinPool sharedPool = ForkJoinPool.commonPool();
    try {
        // Two independent subscriptions over the same source collection.
        for (int subscription = 0; subscription < 2; subscription++) {
            pushCollection(numbers, sharedPool).subscribe(item -> log(item));
        }
    } finally {
        final int waitSeconds = 1;
        try {
            sharedPool.shutdown();
            System.out.println("………………等待 " + waitSeconds + " 秒后结束服务……………… ");
            sharedPool.awaitTermination(waitSeconds, TimeUnit.SECONDS);
        } catch (Exception failure) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + failure.getClass().getName());
        } finally {
            // Always attempt the forced shutdown and report what was left in the queue.
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> pending = sharedPool.shutdownNow();
            System.out.println("还剩 " + pending.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
源代码3 项目: mobi   文件: DefaultVirtualFilesystemTest.java
/**
 * Processes all files of the test zip archive asynchronously on a two-thread pool and
 * verifies that exactly two files were visited and that no issues were recorded.
 */
@Test
public void testProcessAllFiles() {
    ForkJoinPool pool = new ForkJoinPool(2);
    try {
        // Both collections are handed to work running on the pool, so they must tolerate
        // concurrent writers; a plain HashSet can lose entries or corrupt itself.
        Set<Throwable> issues = java.util.Collections.synchronizedSet(new HashSet<Throwable>());
        VirtualFile f = fs.resolveVirtualFile("zip://" + new File("src/test/resources/test.zip").getAbsolutePath());
        Set<VirtualFile> files = java.util.Collections.synchronizedSet(new HashSet<VirtualFile>());
        VirtualFileUtilities.asynchronouslyProcessAllFiles(pool, vf -> {
            assertNotNull(vf);
            files.add(vf);
        }, f, issues, true);
        pool.shutdown();
        // Fail explicitly on a timeout instead of asserting on a half-finished run.
        assertTrue(pool.awaitTermination(3, TimeUnit.SECONDS));
        assertTrue(pool.isShutdown());
        assertTrue(pool.isQuiescent());
        assertEquals(2, files.size());
        assertTrue(issues.isEmpty());
    } catch (Exception e) {
        fail(e.getMessage());
    } finally {
        // Make sure no worker threads outlive the test, whatever the outcome.
        pool.shutdownNow();
    }
}
 
源代码4 项目: training   文件: CpuContention.java
/**
 * Counts the primes in [40000, 50000) with a parallel stream whose terminal operation
 * is started from a dedicated six-thread pool, then prints the timing and the count.
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
	ThreadUtils.timerStart();

	// The pipeline is only described here; nothing is evaluated until count() is called below.
	// Alternative range: IntStream.range(10010000, 10020000)
	IntStream candidates = IntStream.range(40000, 50000).parallel();
	Stream<BigDecimal> primes = candidates
			.mapToObj(number -> new BigDecimal(number))
			.filter(CpuContention::isPrime);

	ForkJoinPool workers = new ForkJoinPool(6);
	// Trigger the terminal operation from inside the custom pool and wait for its result.
	long primeCount = workers.submit(() -> primes.count()).get();

	workers.shutdownNow();
	workers.awaitTermination(1, TimeUnit.MINUTES);

	ThreadUtils.timerEndPrint();
	System.out.println("Found: " + primeCount);
}
 
源代码5 项目: FastAsyncWorldedit   文件: MainUtil.java
/**
 * Deletes the files visited by {@code iterateFiles} for {@code directory} whose
 * last-modified time lies more than {@code timeDiff} milliseconds before now.
 *
 * <p>The deletions are submitted to a fork/join pool and this method waits for all of
 * them to finish before returning.
 *
 * @param directory  the directory passed to {@code iterateFiles}
 * @param timeDiff   minimum age in milliseconds (exclusive) for a file to be deleted
 * @param printDebug whether to send a {@code FILE_DELETED} message for every scheduled file
 */
public static void deleteOlder(File directory, final long timeDiff, boolean printDebug) {
    final long now = System.currentTimeMillis();
    ForkJoinPool pool = new ForkJoinPool();
    iterateFiles(directory, new Consumer<File>() {
        @Override
        public void accept(File file) {
            long age = now - file.lastModified();
            if (age > timeDiff) {
                // The delete itself runs on the pool; the message is sent when it is scheduled.
                pool.submit(() -> file.delete());
                if (printDebug) BBC.FILE_DELETED.send(null, file);
            }
        }
    });
    pool.shutdown();
    try {
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of silently consuming it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}
 
源代码6 项目: fess   文件: CrawlingInfoHelperTest.java
/**
 * Checks that {@code generateId} yields the same expected id for a fixed data map, first
 * on the calling thread and then in 1000 tasks executed on a ten-thread pool.
 */
public void test_generateId_multithread() throws Exception {
    final String expectedId =
            "f8240bbae62b99960056c3a382844836c547c2ec73e019491bb7bbb02d92d98e876c8204b67a59ca8123b82d20986516b7d451f68dd634b39004c0d36c0eeca4";

    final Map<String, Object> dataMap = new HashMap<String, Object>();
    dataMap.put("url", "http://example.com/");
    // "100", "99", ..., "1"
    final List<String> values = new ArrayList<>();
    int remaining = 100;
    while (remaining > 0) {
        values.add(String.valueOf(remaining));
        remaining--;
    }
    dataMap.put("role", values);
    dataMap.put("virtual_host", values);

    // Single-threaded baseline.
    assertEquals(expectedId, crawlingInfoHelper.generateId(dataMap));

    final AtomicInteger completed = new AtomicInteger(0);
    final Runnable check = () -> {
        assertEquals(expectedId, crawlingInfoHelper.generateId(dataMap));
        // Only reached when the assertion above passed.
        completed.incrementAndGet();
    };

    final ForkJoinPool workers = new ForkJoinPool(10);
    int submitted = 0;
    while (submitted < 1000) {
        workers.execute(check);
        submitted++;
    }
    workers.shutdown();
    workers.awaitTermination(10, TimeUnit.SECONDS);

    assertEquals(1000, completed.get());
}
 
源代码7 项目: JavaTutorial   文件: HelloForkJoin.java
/**
 * Sums a run of consecutive numbers in parallel using fork/join and prints the
 * result together with the elapsed time.
 *
 * @param count how many numbers to sum (the numbers start at 1)
 * @throws InterruptedException if interrupted while waiting for the pool to terminate
 */
private static void parallelCompute(int count)
        throws InterruptedException {
    // Fill the input with 1, 2, ..., count.
    final int[] values = new int[count];
    for (int index = 0; index < values.length; index++) {
        values[index] = index + 1;
    }

    final ForkJoinPool workers = new ForkJoinPool(4);
    final HelloForkJoin sumTask = new HelloForkJoin(values);

    final long startedAt = System.currentTimeMillis();
    workers.submit(sumTask);
    workers.shutdown();
    // Wait up to ten seconds for the submitted task to run to completion.
    workers.awaitTermination(10, TimeUnit.SECONDS);

    final String report =
            String.format("并行计算结果:%d,耗时:%d毫秒", sumTask._result, System.currentTimeMillis() - startedAt);
    System.out.println(report);
}
 
/**
 * Publishes the values 1 and 2 through a connectable observable built from
 * {@code pushCollectionDANGER} on the common fork/join pool, attaches two subscribers,
 * connects, and finally runs the pool shutdown sequence.
 */
@Test
void pool_publish_pushCollection() {
    // Source values: 1 and 2.
    List<Integer> integers = new ArrayList<>();
    for (int i=1;i<3;i++){
        integers.add(i);
    }
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        // publish() result is cast so that connect() is available on it.
        ObservablePublish<Integer> integerObservable = (ObservablePublish<Integer>) pushCollectionDANGER(integers, forkJoinPool).publish();
        // Both subscribers are registered before connect() is called below.
        integerObservable.subscribe(x -> {
            log("一郎神: " + x);
            sleep(2,TimeUnit.SECONDS);
        },Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        integerObservable.subscribe(x -> {
            log("二郎神: " + x);
            sleep(2,TimeUnit.SECONDS);
        },Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        integerObservable.connect();
        sleep(2,TimeUnit.SECONDS);
        // As mentioned when we walked through the API: this overload receives a Disposable
        // object and implements the action that consumes it (here: dispose it immediately).
        integerObservable.connect(ps -> ps.dispose());
    } finally {
        try {
            // NOTE(review): per the JDK documentation shutdown() does not change the execution
            // state of the common pool, so for commonPool() this is effectively only a bounded wait.
            forkJoinPool.shutdown();
            int shutdownDelaySec = 15;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            // Always attempt the forced shutdown and report how many tasks it returned.
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
/**
 * Shares the values 1..9 from {@code pushCollectionDANGER} between two subscribers via
 * {@code publish().refCount()}, with the second subscriber joining one second after the
 * first, and then runs the pool shutdown sequence.
 */
@Test
void infinite_refCount_publish_test() {
    // Source values: 1 through 9.
    List<Integer> integers = new ArrayList<>();
    for (int i=1;i<10;i++){
        integers.add(i);
    }
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        Observable<Integer> integerObservable =  pushCollectionDANGER(integers, forkJoinPool).publish().refCount();
        // First subscriber: logs each value and then sleeps for two seconds.
        integerObservable.subscribe(x -> {
            log("一郎神: " + x);
            sleep(2,TimeUnit.SECONDS);
        },Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        // Delay the second subscription so it joins while the first is already active.
        sleep(1,TimeUnit.SECONDS);
        // Second subscriber: logs each value and then sleeps for one second.
        integerObservable.subscribe(x -> {
            log("二郎神: " + x);
            sleep(1,TimeUnit.SECONDS);
        },Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        // Keep the test thread alive while the emissions are processed.
        sleep(20,TimeUnit.SECONDS);
    } finally {
        try {
            // NOTE(review): per the JDK documentation shutdown() does not change the execution
            // state of the common pool, so for commonPool() this is effectively only a bounded wait.
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            // Always attempt the forced shutdown and report how many tasks it returned.
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
源代码10 项目: java-async-util   文件: AsyncIteratorTest.java
/**
 * Maps an integer iterator of {@code count} elements through {@code thenComposeAhead},
 * where every element is produced by a sleeping task on a pool of {@code ahead} threads,
 * and checks that the collected result is {@code 0..count-1} in order.
 *
 * @param count number of elements produced by the source iterator
 * @param ahead parallelism of the pool and the ahead-argument of {@code thenComposeAhead}
 * @param sleepMillis supplies the sleep duration, in milliseconds, for each element
 * @param finishInMillis time limit, in milliseconds, passed to {@code TestUtil.join}
 * @throws InterruptedException if interrupted while waiting for the pool to terminate
 * @throws TimeoutException declared for the bounded join of the collected result
 */
private void testThenComposeAhead(
    final int count, final int ahead, final Supplier<Long> sleepMillis, final long finishInMillis)
    throws InterruptedException, TimeoutException {
  final ForkJoinPool fjp = new ForkJoinPool(ahead);
  try {
    final AsyncIterator<Integer> it = intIterator(count);

    final AsyncIterator<Integer> mapped =
        it.thenComposeAhead(
            i -> {
              return CompletableFuture.supplyAsync(
                  () -> {
                    try {
                      Thread.sleep(sleepMillis.get());
                    } catch (final InterruptedException e) {
                      // Cut the sleep short but keep the interrupt observable on this thread.
                      Thread.currentThread().interrupt();
                    }
                    return i;
                  },
                  fjp);
            },
            ahead);

    final List<Integer> lis =
        TestUtil.join(mapped.collect(Collectors.toList()), finishInMillis, TimeUnit.MILLISECONDS);
    Assert.assertEquals(IntStream.range(0, count).boxed().collect(Collectors.toList()), lis);
  } finally {
    // Shut the pool down even when the join times out or the assertion fails,
    // so that a failing run does not leak worker threads.
    fjp.shutdown();
    fjp.awaitTermination(1, TimeUnit.SECONDS);
  }
}
 
源代码11 项目: training   文件: ParallelIOIntensive.java
/**
 * Runs {@code r} on a dedicated ten-thread fork/join pool, additionally prints the
 * single-element stream {@code "a"} from that pool, and waits up to ten seconds for
 * the pool to terminate.
 *
 * @param r the work to run on the custom pool
 * @throws InterruptedException if interrupted while waiting for the pool to terminate
 */
public static void executeInCustomFJP(Runnable r) throws InterruptedException {
	final ForkJoinPool customPool = new ForkJoinPool(10);

	// The caller's work goes first.
	customPool.submit(r);

	// A second, independent job that consumes a tiny stream on the same pool.
	final Stream<Object> letters = Stream.of("a");
	customPool.submit(() -> {
		letters.forEach(item -> System.out.println(item));
	});

	customPool.shutdown();
	customPool.awaitTermination(10, TimeUnit.SECONDS);
}
 
源代码12 项目: FastAsyncWorldedit   文件: MCAQueue.java
/**
 * Applies {@code filter} to the save folder by letting {@code traverser} walk it with the
 * task produced by {@code filterFunction}, then waits for all work on the pool to finish.
 *
 * @param filter    the filter to run; it is given the pool via {@code withPool}
 * @param traverser walks the save folder and feeds each visited path to the filter task
 * @return the same {@code filter} instance that was passed in
 */
private <G, T extends MCAFilter<G>> T filterWorld(final T filter, RunnableVal2<Path, RunnableVal2<Path, BasicFileAttributes>> traverser) {
    File folder = getSaveFolder();
    final ForkJoinPool pool = new ForkJoinPool();
    filter.withPool(pool, this);
    RunnableVal2<Path, BasicFileAttributes> task = filterFunction(filter, pool);
    traverser.run(folder.toPath(), task);
    pool.shutdown();
    try {
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of silently consuming it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Throwable e) {
        // Best effort, as before: any other failure while waiting is reported, not propagated.
        e.printStackTrace();
    }
    return filter;
}
 
源代码13 项目: glowroot   文件: ForkJoinPoolIT.java
/**
 * Executes a {@code SimpleRunnable} on a fresh fork/join pool, waits on the runnable's
 * latch, and then shuts the pool down, allowing up to ten seconds for termination.
 */
@Override
public void transactionMarker() throws Exception {
    final ForkJoinPool executor = new ForkJoinPool();
    final SimpleRunnable work = new SimpleRunnable();

    executor.execute(work);
    // Block on the runnable's latch before asking the pool to stop.
    work.latch.await();

    executor.shutdown();
    executor.awaitTermination(10, SECONDS);
}
 
源代码14 项目: glowroot   文件: ForkJoinPoolIT.java
/**
 * Executes a {@code SimpleTask} on a fresh fork/join pool, waits on the task's latch,
 * and then shuts the pool down, allowing up to ten seconds for termination.
 */
@Override
public void transactionMarker() throws Exception {
    final ForkJoinPool executor = new ForkJoinPool();
    final SimpleTask work = new SimpleTask();

    executor.execute(work);
    // Block on the task's latch before asking the pool to stop.
    work.latch.await();

    executor.shutdown();
    executor.awaitTermination(10, SECONDS);
}
 
源代码15 项目: gp2srv   文件: ImageUtil.java
/**
 * Computes a brightness value for {@code image} by splitting its raw pixel bytes into one
 * chunk per CPU core, running a {@code BrightnessCalcTask} on each chunk in a fork/join
 * pool, and averaging the per-chunk results.
 *
 * @param image the image to analyse; its type must be one of the RGB/BGR/ARGB/ABGR types
 *              checked below
 * @return the mean of the per-chunk task results
 * @throws RuntimeException if the image type is not one of the accepted types
 */
public static double calculateAverageBrightness(BufferedImage image) {
	double result = 0;

	int imageType = image.getType();
	if (imageType != BufferedImage.TYPE_INT_ARGB && imageType != BufferedImage.TYPE_INT_RGB && imageType != BufferedImage.TYPE_3BYTE_BGR
			&& imageType != BufferedImage.TYPE_4BYTE_ABGR && imageType != BufferedImage.TYPE_4BYTE_ABGR_PRE && imageType != BufferedImage.TYPE_INT_ARGB_PRE
			&& imageType != BufferedImage.TYPE_INT_BGR) {
		throw new RuntimeException("Unsupported image type: " + image.getType());
	}
	boolean hasAlpha = image.getAlphaRaster() != null;
	int pixelSize = hasAlpha ? 4 : 3;
	// NOTE(review): the TYPE_INT_* types accepted above are normally backed by a DataBufferInt,
	// in which case this cast would throw ClassCastException — confirm which types really occur.
	byte[] pixels = ((DataBufferByte) image.getRaster().getDataBuffer()).getData();

	int cpuCores = Runtime.getRuntime().availableProcessors();
	final ForkJoinPool pool = new ForkJoinPool(cpuCores);

	BrightnessCalcTask[] tasks = new BrightnessCalcTask[cpuCores];
	int subArraySize = (int) Math.ceil(((double) pixels.length) / cpuCores);
	// Round the chunk size up to a whole number of pixels so no pixel straddles two chunks.
	if (subArraySize % pixelSize != 0) {
		subArraySize += pixelSize - subArraySize % pixelSize;
	}
	for (int i = 0; i < cpuCores; i++) {
		tasks[i] = new BrightnessCalcTask(pixels, subArraySize * i, Math.min(subArraySize * (i + 1), pixels.length), pixelSize);
		pool.submit(tasks[i]);
	}
	pool.shutdown();

	// Keep waiting until every chunk is done, but remember an interrupt instead of losing it.
	boolean interrupted = false;
	while (!pool.isTerminated()) {
		try {
			pool.awaitTermination(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			interrupted = true;
		}
	}
	if (interrupted) {
		// Restore the flag only after the loop; restoring it inside would make every
		// further awaitTermination call throw immediately.
		Thread.currentThread().interrupt();
	}

	// NOTE(review): every chunk is weighted equally here even though the last chunk(s) can be
	// shorter than the others — confirm this is the intended average.
	for (BrightnessCalcTask task : tasks) {
		result += task.getRawResult();
	}
	result = result / tasks.length;
	return result;
}
 
源代码16 项目: algorithms   文件: ForkJoinTest.java
/**
 * Runs {@code calcSumOfTen} on a fork/join pool and checks that it yields 55.
 */
@Test
public void test() throws Exception {
    ForkJoinPool pool = new ForkJoinPool(PARALLELISM);
    try {
        final int[] result = {0};
        // Wait on the submitted task itself. Calling awaitTermination on a pool that has not
        // been shut down always blocks for the full timeout and does not guarantee that the
        // task has finished; get() returns as soon as the task is done, makes its write to
        // result visible here, and surfaces any exception thrown by the task.
        pool.submit(() -> {
            result[0] = calcSumOfTen();
        }).get(2, TimeUnit.SECONDS);
        assertThat(result[0]).isEqualTo(55);
    } finally {
        // Release the worker threads even when the assertion fails.
        pool.shutdown();
    }
}
 
/**
 * Feeds the values 1..9 into a {@code PublishSubject} from a task on the common fork/join
 * pool, observes them through {@code replay()} with three subscribers, disposes the first
 * subscriber part-way through, completes the subject from the test thread, and finally
 * runs the pool shutdown sequence.
 */
@Test
void replay_PublishSubject_test() {
    PublishSubject<Object> publishSubject = PublishSubject.create();
    ConnectableObservable<Object> replay = publishSubject.replay();
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    // Source values: 1 through 9.
    List<Integer> integers = new ArrayList<>();
    for (int i=1;i<10;i++){
        integers.add(i);
    }
    // All three subscribers are registered before replay.connect() is called below.
    Disposable subscribe1 = replay.subscribe(x -> {
        log("一郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

    Disposable subscribe2 = replay.subscribe(x -> {
        log("二郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    Disposable subscribe3 = replay.subscribe(x -> {
        log("三郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    // Counts down once per emitted value; the emitter completes the subject when it hits zero.
    AtomicInteger atomicInteger = new AtomicInteger(integers.size());
    try {
        // A single pool task emits the values one after another, calling sleep(1, SECONDS)
        // before each emission.
        forkJoinPool.submit(() -> {
            integers.forEach(id -> {
                sleep(1,TimeUnit.SECONDS);
                publishSubject.onNext(id);
                if (atomicInteger.decrementAndGet() == 0) {
                    publishSubject.onComplete();
                }
            });
        });
       /* integers.forEach(id -> forkJoinPool.submit(() -> {
            sleep(3,TimeUnit.SECONDS);
            publishSubject.onNext(id);
            if (atomicInteger.decrementAndGet() == 0) {
                publishSubject.onComplete();
            }
        }));*/
        replay.connect();
        sleep(2,TimeUnit.SECONDS);
        // Drop the first subscriber while the other two stay subscribed.
        subscribe1.dispose();
        sleep(1,TimeUnit.SECONDS);
        //replay.connect(consumer -> consumer.dispose());
        // Complete the subject from the test thread, independently of the emitting task.
        publishSubject.onComplete();
    } finally  {
        try {
            // NOTE(review): per the JDK documentation shutdown() does not change the execution
            // state of the common pool, so for commonPool() this is effectively only a bounded wait.
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            // Always attempt the forced shutdown and report how many tasks it returned.
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
源代码18 项目: buck   文件: IJProjectCleaner.java
/**
 * Runs the project cleaning tasks on a dedicated fork/join pool.
 *
 * <p>Depending on the flags, candidate finders are started for the project root and the
 * IDEA config directory (both excluding the buck-out and dir-cache directories) and for
 * the libraries XML directory. Nothing is done when both flags are {@code false}.
 *
 * @param buckConfig configuration used to look up the directory cache entries to exclude
 * @param ideaConfigDir IDEA configuration directory scanned with the IML filename filter
 * @param librariesXmlBase directory scanned with the XML filename filter
 * @param runPostGenerationCleaner whether to scan the project root and IDEA config directory
 * @param removeOldLibraries together with {@code runPostGenerationCleaner}, decides whether
 *     anything runs at all
 */
public void clean(
    BuckConfig buckConfig,
    Path ideaConfigDir,
    Path librariesXmlBase,
    boolean runPostGenerationCleaner,
    boolean removeOldLibraries) {
  if (!runPostGenerationCleaner && !removeOldLibraries) {
    return;
  }

  // Directories that the exclusion-aware finders must not descend into.
  Set<File> buckDirectories = new HashSet<>();
  buckDirectories.add(
      convertPathToFile(
          projectFilesystem.resolve(projectFilesystem.getBuckPaths().getBuckOut())));

  ArtifactCacheBuckConfig cacheBuckConfig = new ArtifactCacheBuckConfig(buckConfig);
  for (DirCacheEntry entry : cacheBuckConfig.getCacheEntries().getDirCacheEntries()) {
    buckDirectories.add(convertPathToFile(entry.getCacheDir()));
  }

  ForkJoinPool cleanExecutor = new ForkJoinPool(getParallelismLimit());
  try {
    // invoke() blocks until the root action, and everything it invoked, has completed.
    cleanExecutor.invoke(
        new RecursiveAction() {
          @Override
          protected void compute() {
            List<RecursiveAction> topLevelTasks = new ArrayList<>(2);
            if (runPostGenerationCleaner) {
              topLevelTasks.add(
                  new CandidateFinderWithExclusions(
                      convertPathToFile(projectFilesystem.resolve("")),
                      IML_FILENAME_FILTER,
                      buckDirectories));
              topLevelTasks.add(
                  new CandidateFinderWithExclusions(
                      ideaConfigDir.toFile(), IML_FILENAME_FILTER, buckDirectories));
            }
            topLevelTasks.add(
                new CandidateFinder(convertPathToFile(librariesXmlBase), XML_FILENAME_FILTER));
            invokeAll(topLevelTasks);
          }
        });
  } finally {
    cleanExecutor.shutdown();
    try {
      // A timeout is signalled by the return value, not by an exception.
      if (!cleanExecutor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIME_UNIT)) {
        Logger.get(IJProjectCleaner.class).warn("Timeout during executor shutdown.");
      }
    } catch (InterruptedException e) {
      // Keep the interrupt visible to the caller instead of silently consuming it.
      Thread.currentThread().interrupt();
      Logger.get(IJProjectCleaner.class).warn("Interrupted during executor shutdown.", e);
    }
  }
}