下面列出了 java.util.concurrent.ForkJoinPool#shutdown() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Benchmark driver. For every repetition a fresh pool is created,
 * {@code test(pool, size)} is called for each board size in the requested
 * range, the pool is printed, and the pool is shut down.
 *
 * Usage: NQueensCS [minBoardSize=N] [maxBoardSize=N] [procs=N] [reps=N]
 */
public static void main(String[] args) throws Exception {
    // Small boards are hard to measure well; large boards take too long.
    final int minBoardSize = intArg(args, "minBoardSize", 8);
    final int maxBoardSize = intArg(args, "maxBoardSize", 15);
    final int procs = intArg(args, "procs", 0);

    int remaining = intArg(args, "reps", 10);
    while (remaining > 0) {
        // procs == 0 means "use the no-argument constructor".
        final ForkJoinPool pool;
        if (procs == 0) {
            pool = new ForkJoinPool();
        } else {
            pool = new ForkJoinPool(procs);
        }
        lastStealCount = pool.getStealCount();
        for (int size = minBoardSize; size <= maxBoardSize; size++) {
            test(pool, size);
        }
        System.out.println(pool);
        pool.shutdown();
        remaining--;
    }
}
/**
 * Entry point of the benchmark: repeats the full board-size sweep
 * {@code reps} times, each time on a newly constructed pool that is shut
 * down once the sweep has finished.
 *
 * Usage: NQueensCS [minBoardSize=N] [maxBoardSize=N] [procs=N] [reps=N]
 */
public static void main(String[] args) throws Exception {
    // Too-small boards are hard to time; too-large boards run too long.
    final int minBoardSize = intArg(args, "minBoardSize", 8);
    final int maxBoardSize = intArg(args, "maxBoardSize", 15);
    final int procs = intArg(args, "procs", 0);
    final boolean defaultParallelism = (procs == 0);

    for (int round = intArg(args, "reps", 10); round >= 1; round--) {
        ForkJoinPool fjp;
        if (defaultParallelism) {
            fjp = new ForkJoinPool();
        } else {
            fjp = new ForkJoinPool(procs);
        }
        lastStealCount = fjp.getStealCount();
        int board = minBoardSize;
        while (board <= maxBoardSize) {
            test(fjp, board);
            board++;
        }
        System.out.println(fjp);
        fjp.shutdown();
    }
}
/**
 * Runs the benchmark sweep. Each repetition uses its own pool: the pool's
 * steal count is recorded in {@code lastStealCount}, {@code test} is invoked
 * for every board size from {@code minBoardSize} to {@code maxBoardSize},
 * and the pool is printed and shut down.
 *
 * Usage: NQueensCS [minBoardSize=N] [maxBoardSize=N] [procs=N] [reps=N]
 */
public static void main(String[] args) throws Exception {
    // Boards that are too small are hard to measure well;
    // boards that are too large take too long to run.
    final int minBoardSize = intArg(args, "minBoardSize", 8);
    final int maxBoardSize = intArg(args, "maxBoardSize", 15);
    final int procs = intArg(args, "procs", 0);

    int repsLeft = intArg(args, "reps", 10);
    while (repsLeft > 0) {
        final ForkJoinPool workers;
        if (procs != 0) {
            workers = new ForkJoinPool(procs);
        } else {
            workers = new ForkJoinPool();
        }
        lastStealCount = workers.getStealCount();
        for (int n = minBoardSize; n <= maxBoardSize; ++n) {
            test(workers, n);
        }
        System.out.println(workers);
        workers.shutdown();
        --repsLeft;
    }
}
/**
 * Uses fork/join to accumulate a sequence of numbers in parallel and prints
 * the result together with the elapsed time.
 *
 * <p>The input array holds the values {@code 1..count}. The task is given at
 * most 10 seconds to finish; if it does not, the pool is cancelled and a
 * warning is written to {@code System.err} instead of printing an
 * incomplete result.
 *
 * @param count how many numbers to accumulate (values start at 1)
 * @throws InterruptedException if the calling thread is interrupted while
 *         waiting for the pool to terminate
 */
private static void parallelCompute(int count)
        throws InterruptedException {
    int[] numbers = new int[count];
    for (int i = 0; i < count; i++) {
        numbers[i] = i + 1;
    }
    ForkJoinPool pool = new ForkJoinPool(4);
    HelloForkJoin task = new HelloForkJoin(numbers);
    long startTime = System.currentTimeMillis();
    pool.submit(task);
    pool.shutdown();
    // awaitTermination returns false on timeout; in that case task._result
    // may still be changing, so it must not be reported as the answer.
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        pool.shutdownNow();
        System.err.println("parallelCompute: task did not finish within 10 seconds; no result available");
        return;
    }
    System.out.println( String.format("并行计算结果:%d,耗时:%d毫秒", task._result, System.currentTimeMillis()-startTime) );
}
/**
 * awaitTermination on a pool that has not been shut down times out and
 * returns false; after shutdown it returns true and the pool is terminated.
 */
public void testAwaitTermination_timesOut() throws InterruptedException {
    ForkJoinPool pool = new ForkJoinPool(1);
    try (PoolCleaner cleaner = cleaner(pool)) {
        assertFalse(pool.isTerminated());

        // Non-positive timeouts must return false, in either unit.
        for (long nonPositive : new long[] { Long.MIN_VALUE, -1L, 0L }) {
            assertFalse(pool.awaitTermination(nonPositive, NANOSECONDS));
            assertFalse(pool.awaitTermination(nonPositive, MILLISECONDS));
        }

        // A sub-millisecond wait must last at least the requested time.
        final long nanosToWait = 999999L;
        long began = System.nanoTime();
        assertFalse(pool.awaitTermination(nanosToWait, NANOSECONDS));
        assertTrue(System.nanoTime() - began >= nanosToWait);
        assertFalse(pool.isTerminated());

        // Same check with a millisecond-granularity timeout.
        began = System.nanoTime();
        final long millisToWait = timeoutMillis();
        assertFalse(pool.awaitTermination(millisToWait, MILLISECONDS));
        assertTrue(millisElapsedSince(began) >= millisToWait);
        assertFalse(pool.isTerminated());

        // Once shut down, the idle pool terminates within the long delay.
        pool.shutdown();
        assertTrue(pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
        assertTrue(pool.isTerminated());
    }
}
/**
 * Verifies that {@code crawlingInfoHelper.generateId} returns the expected
 * id for a fixed data map, first on the calling thread and then from 1000
 * tasks executed on a 10-thread pool. The counter confirms that every task
 * reached the end of its body.
 */
public void test_generateId_multithread() throws Exception {
    // "100", "99", ..., "1" -- shared by the role and virtual_host entries.
    final List<String> values = new ArrayList<>();
    for (int n = 100; n >= 1; n--) {
        values.add(String.valueOf(n));
    }

    final Map<String, Object> dataMap = new HashMap<String, Object>();
    dataMap.put("url", "http://example.com/");
    dataMap.put("role", values);
    dataMap.put("virtual_host", values);

    final String expected =
        "f8240bbae62b99960056c3a382844836c547c2ec73e019491bb7bbb02d92d98e876c8204b67a59ca8123b82d20986516b7d451f68dd634b39004c0d36c0eeca4";
    assertEquals(expected, crawlingInfoHelper.generateId(dataMap));

    final AtomicInteger completed = new AtomicInteger(0);
    final Runnable check = () -> {
        assertEquals(expected, crawlingInfoHelper.generateId(dataMap));
        completed.incrementAndGet();
    };

    final ForkJoinPool workers = new ForkJoinPool(10);
    int submitted = 0;
    while (submitted < 1000) {
        workers.execute(check);
        submitted++;
    }
    workers.shutdown();
    workers.awaitTermination(10, TimeUnit.SECONDS);

    assertEquals(1000, completed.get());
}
/**
 * Runs one {@code PointerRecursiveTask} per depth level on a dedicated pool
 * and merges the per-depth result lists into a single set.
 *
 * <p>The depth loop stops early when {@code isCancelled()} becomes true. In
 * that case the slots for the depths that never ran are still {@code null};
 * they are skipped when merging so that a cancelled run returns the results
 * gathered so far instead of failing with a NullPointerException.
 *
 * @return the union of the results collected for every depth that ran
 * @throws Exception declared by the overridden method; anything thrown by
 *         {@code openDump} or by the pool propagates to the caller
 */
@Override
protected Set<PointerSearchResult> call() throws Exception {
    ForkJoinPool pool = new ForkJoinPool(threadCount);
    try {
        MemoryDump dump = openDump(dumpPath);
        List<PointerSearchResult>[] results = new List[maxDepth];
        total = dump.getSize() * maxDepth;
        for (depth = 0; depth < maxDepth && !isCancelled(); depth++) {
            results[depth] = new ArrayList<>();
            pool.invoke(new PointerRecursiveTask(this, dump, dump.getIndices(), results, depth));
        }
        Set<PointerSearchResult> res = new HashSet<>();
        for (List<PointerSearchResult> lst : results) {
            // Slots stay null for depths skipped because of cancellation.
            if (lst != null) {
                res.addAll(lst);
            }
        }
        return res;
    } finally {
        // Always release the worker threads, even on failure.
        pool.shutdown();
    }
}
/**
 * Sends 50 {@code greet} calls (users "user0" .. "user49") through a
 * fork/join pool, waits up to 100 seconds for the pool to go quiescent,
 * then prints the client's summary and shuts everything down.
 *
 * Retries are enabled unless the environment variable named by
 * {@code ENV_DISABLE_RETRYING} parses as {@code true}.
 */
public static void main(String[] args) throws Exception {
    final boolean retriesDisabled = Boolean.parseBoolean(System.getenv(ENV_DISABLE_RETRYING));
    final RetryingHelloWorldClient client =
        new RetryingHelloWorldClient("localhost", 50051, !retriesDisabled);

    final ForkJoinPool workers = new ForkJoinPool();
    int n = 0;
    while (n < 50) {
        final String userId = "user" + n;
        final Runnable greeting = new Runnable() {
            @Override
            public void run() {
                client.greet(userId);
            }
        };
        workers.execute(greeting);
        n++;
    }

    workers.awaitQuiescence(100, TimeUnit.SECONDS);
    workers.shutdown();

    client.printSummary();
    client.shutdown();
}
/**
 * Benchmark driver: computes the area {@code reps} times on one shared pool
 * using the selected fork policy and, after each repetition, prints the
 * "Calls/sec", "Time" and "Area" figures. The pool is printed and shut down
 * at the end.
 *
 * Usage: Integrate [procs=N] [reps=N] forkPolicy=serial|dynamic|fork
 */
public static void main(String[] args) throws Exception {
    final int procs =
        intArg(args, "procs", Runtime.getRuntime().availableProcessors());
    final int forkPolicy = policyArg(args, "forkPolicy", DYNAMIC);
    final ForkJoinPool pool = new ForkJoinPool(procs);
    System.out.println("Integrating from " + start + " to " + end +
                       " forkPolicy = " + forkPolicy);

    long previous = System.nanoTime();
    int remaining = intArg(args, "reps", 10);
    while (remaining > 0) {
        // Any policy other than SERIAL or FORK falls back to DQuad.
        final double area;
        if (forkPolicy == SERIAL) {
            area = SQuad.computeArea(pool, start, end);
        } else if (forkPolicy == FORK) {
            area = FQuad.computeArea(pool, start, end);
        } else {
            area = DQuad.computeArea(pool, start, end);
        }

        // Time elapsed since the previous repetition ended, divided by NPS.
        final long now = System.nanoTime();
        final double elapsed = (double) (now - previous) / NPS;
        previous = now;

        System.out.printf("Calls/sec: %12d", (long) (calls / elapsed));
        System.out.printf(" Time: %7.3f", elapsed);
        System.out.printf(" Area: %12.1f", area);
        System.out.println();
        remaining--;
    }

    System.out.println(pool);
    pool.shutdown();
}
/**
 * Applies {@code filter} to the save folder by handing a per-file task to
 * {@code traverser}, then blocks until every job submitted to the dedicated
 * pool has finished.
 *
 * <p>If the wait is interrupted, the stack trace is printed as before and
 * the thread's interrupt flag is restored so callers can still observe the
 * interruption.
 *
 * @param filter    the filter to run; it is bound to the pool via {@code withPool}
 * @param traverser walks the save folder and invokes the supplied task
 * @return the same {@code filter} instance that was passed in
 */
private <G, T extends MCAFilter<G>> T filterWorld(final T filter, RunnableVal2<Path, RunnableVal2<Path, BasicFileAttributes>> traverser) {
    File folder = getSaveFolder();
    final ForkJoinPool pool = new ForkJoinPool();
    filter.withPool(pool, this);
    RunnableVal2<Path, BasicFileAttributes> task = filterFunction(filter, pool);
    traverser.run(folder.toPath(), task);
    pool.shutdown();
    try {
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    } catch (Throwable e) {
        // awaitTermination clears the interrupt status when it throws;
        // put it back instead of silently discarding the interruption.
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
    }
    return filter;
}
/**
 * Demonstrates publishing a collection through {@code pushCollectionDANGER}
 * on the common fork/join pool to two subscribers, then goes through the
 * shutdown / awaitTermination / shutdownNow sequence on that pool.
 *
 * NOTE(review): per the ForkJoinPool documentation the common pool's run
 * state is unaffected by shutdown() and shutdownNow(), so the shutdown
 * sequence below is illustrative only -- confirm this is intended.
 */
@Test
void pool_publish_pushCollection() {
    final List<Integer> source = new ArrayList<>();
    for (int value = 1; value <= 2; value++) {
        source.add(value);
    }
    final ForkJoinPool commonPool = ForkJoinPool.commonPool();
    try {
        final ObservablePublish<Integer> published =
            (ObservablePublish<Integer>) pushCollectionDANGER(source, commonPool).publish();

        published.subscribe(
            item -> {
                log("一郎神: " + item);
                sleep(2,TimeUnit.SECONDS);
            },
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"));

        published.subscribe(
            item -> {
                log("二郎神: " + item);
                sleep(2,TimeUnit.SECONDS);
            },
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"));

        published.connect();
        sleep(2,TimeUnit.SECONDS);
        // As mentioned when walking through the API: this overload receives
        // a Disposable and defines what to do with it (here: dispose it).
        published.connect(connection -> connection.dispose());
    } finally {
        final int shutdownDelaySec = 15;
        try {
            commonPool.shutdown();
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            commonPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            final List<Runnable> pending = commonPool.shutdownNow();
            System.out.println("还剩 " + pending.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
/**
 * Entry point of the integration benchmark. A single pool sized by
 * {@code procs} is reused for all repetitions; each repetition dispatches
 * to SQuad, FQuad or DQuad according to {@code forkPolicy} and reports
 * one line of statistics.
 *
 * Usage: Integrate [procs=N] [reps=N] forkPolicy=serial|dynamic|fork
 */
public static void main(String[] args) throws Exception {
    final int availableCpus = Runtime.getRuntime().availableProcessors();
    final int procs = intArg(args, "procs", availableCpus);
    final int forkPolicy = policyArg(args, "forkPolicy", DYNAMIC);
    final ForkJoinPool fjp = new ForkJoinPool(procs);
    System.out.println("Integrating from " + start + " to " + end +
                       " forkPolicy = " + forkPolicy);

    long lastStamp = System.nanoTime();
    for (int round = intArg(args, "reps", 10); round >= 1; round--) {
        double result;
        if (forkPolicy == SERIAL) {
            result = SQuad.computeArea(fjp, start, end);
        } else if (forkPolicy == FORK) {
            result = FQuad.computeArea(fjp, start, end);
        } else {
            // Every other policy value is handled by DQuad.
            result = DQuad.computeArea(fjp, start, end);
        }

        final long stamp = System.nanoTime();
        final long deltaNanos = stamp - lastStamp;
        final double secs = (double) deltaNanos / NPS;
        lastStamp = stamp;

        System.out.printf("Calls/sec: %12d", (long) (calls / secs));
        System.out.printf(" Time: %7.3f", secs);
        System.out.printf(" Area: %12.1f", result);
        System.out.println();
    }

    System.out.println(fjp);
    fjp.shutdown();
}
/**
 * Computes a brightness figure for {@code image} by splitting its raw pixel
 * bytes into one chunk per CPU core, running a {@code BrightnessCalcTask}
 * on each chunk in a dedicated pool, and averaging the per-task results.
 *
 * <p>The calling thread blocks until all tasks have finished. If it is
 * interrupted while waiting, it keeps waiting (so the result is complete)
 * and restores its interrupt flag before returning.
 *
 * @param image the image to analyse
 * @return the mean of the values returned by the per-chunk tasks
 * @throws RuntimeException if the image type is not one of the accepted
 *         BufferedImage types
 */
public static double calculateAverageBrightness(BufferedImage image) {
    double result = 0;
    int imageType = image.getType();
    if (imageType != BufferedImage.TYPE_INT_ARGB && imageType != BufferedImage.TYPE_INT_RGB && imageType != BufferedImage.TYPE_3BYTE_BGR
            && imageType != BufferedImage.TYPE_4BYTE_ABGR && imageType != BufferedImage.TYPE_4BYTE_ABGR_PRE && imageType != BufferedImage.TYPE_INT_ARGB_PRE
            && imageType != BufferedImage.TYPE_INT_BGR) {
        throw new RuntimeException("Unsupported image type: " + image.getType());
    }
    boolean hasAlpha = image.getAlphaRaster() != null;
    int pixelSize = hasAlpha ? 4 : 3;
    // NOTE(review): this cast assumes a byte-backed raster. The TYPE_INT_*
    // types accepted above are normally backed by DataBufferInt and would
    // fail here with ClassCastException -- confirm against callers.
    byte[] pixels = ((DataBufferByte) image.getRaster().getDataBuffer()).getData();
    int cpuCores = Runtime.getRuntime().availableProcessors();
    final ForkJoinPool pool = new ForkJoinPool(cpuCores);
    BrightnessCalcTask[] tasks = new BrightnessCalcTask[cpuCores];
    int subArraySize = (int) Math.ceil(((double) pixels.length) / cpuCores);
    // Round the chunk size up to a whole number of pixels so that no pixel
    // is split across two tasks.
    if (subArraySize % pixelSize != 0) {
        subArraySize += pixelSize - subArraySize % pixelSize;
    }
    for (int i = 0; i < cpuCores; i++) {
        tasks[i] = new BrightnessCalcTask(pixels, subArraySize * i, Math.min(subArraySize * (i + 1), pixels.length), pixelSize);
        pool.submit(tasks[i]);
    }
    pool.shutdown();
    // Keep waiting until every task is done, but remember an interruption
    // so it can be re-asserted instead of being silently dropped.
    boolean interrupted = false;
    while (!pool.isTerminated()) {
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
    for (BrightnessCalcTask task : tasks) {
        result += task.getRawResult();
    }
    result = result / tasks.length;
    return result;
}
/**
 * Submits {@code r} and a small stream-printing job to a private 10-thread
 * fork/join pool, then shuts the pool down and waits up to 10 seconds for
 * both jobs to finish.
 *
 * @param r the job to run inside the custom pool
 * @throws InterruptedException if interrupted while waiting for termination
 */
public static void executeInCustomFJP(Runnable r) throws InterruptedException {
    final ForkJoinPool customPool = new ForkJoinPool(10);
    customPool.submit(r);

    // Second job: print the single element "a" from inside the pool.
    final Stream<Object> letters = Stream.of("a");
    final Runnable printer = () -> letters.forEach(System.out::println);
    customPool.submit(printer);

    customPool.shutdown();
    customPool.awaitTermination(10, TimeUnit.SECONDS);
}
/**
 * Runs the integration benchmark. Reads {@code procs}, {@code forkPolicy}
 * and {@code reps} from the arguments, performs {@code reps} area
 * computations on one pool, and prints a statistics line after each one
 * followed by the pool itself at the end.
 *
 * Usage: Integrate [procs=N] [reps=N] forkPolicy=serial|dynamic|fork
 */
public static void main(String[] args) throws Exception {
    final int procs = intArg(
        args, "procs", Runtime.getRuntime().availableProcessors());
    final int forkPolicy = policyArg(args, "forkPolicy", DYNAMIC);
    final ForkJoinPool workers = new ForkJoinPool(procs);
    System.out.println("Integrating from " + start + " to " + end +
                       " forkPolicy = " + forkPolicy);

    long mark = System.nanoTime();
    int repsLeft = intArg(args, "reps", 10);
    while (repsLeft > 0) {
        final double computed;
        if (forkPolicy == SERIAL) {
            computed = SQuad.computeArea(workers, start, end);
        } else if (forkPolicy == FORK) {
            computed = FQuad.computeArea(workers, start, end);
        } else {
            computed = DQuad.computeArea(workers, start, end);
        }

        // Measure from the end of the previous repetition to now.
        final long current = System.nanoTime();
        final double duration = (double) (current - mark) / NPS;
        mark = current;

        System.out.printf("Calls/sec: %12d", (long) (calls / duration));
        System.out.printf(" Time: %7.3f", duration);
        System.out.printf(" Area: %12.1f", computed);
        System.out.println();
        --repsLeft;
    }

    System.out.println(workers);
    workers.shutdown();
}
/**
 * Submitting a task to a pool that has already been shut down is rejected
 * with RejectedExecutionException.
 */
public void testSubmitAfterShutdown() {
    ForkJoinPool pool = new ForkJoinPool(1);
    try (PoolCleaner cleaner = cleaner(pool)) {
        pool.shutdown();
        assertTrue(pool.isShutdown());
        try {
            pool.submit(new FibTask(8));
            shouldThrow();
        } catch (RejectedExecutionException expected) {
            // Expected: a shut-down pool accepts no new work.
        }
    }
}
/**
 * Once shutdown() has been called, submit() must throw
 * RejectedExecutionException rather than accept the task.
 */
public void testSubmitAfterShutdown() {
    final ForkJoinPool closedPool = new ForkJoinPool(1);
    try (PoolCleaner cleaner = cleaner(closedPool)) {
        closedPool.shutdown();
        assertTrue(closedPool.isShutdown());
        final FibTask lateTask = new FibTask(8);
        try {
            closedPool.submit(lateTask);
            shouldThrow();
        } catch (RejectedExecutionException success) {
            // Rejection is the behavior under test.
        }
    }
}
/**
 * Runs a {@code PrimeNumbers} task over the range 1..10000 with the given
 * granularity on {@code forkJoinPool}, shuts the pool down, and appends the
 * pool's steal count to {@code info}.
 *
 * @param info         buffer that receives the "Granularity / Steals" line
 * @param granularity  granularity value passed to the task and reported
 * @param forkJoinPool pool to run on; it is shut down by this method
 */
private void stealCountInfo(StringBuilder info, int granularity, ForkJoinPool forkJoinPool) {
    forkJoinPool.invoke(new PrimeNumbers(1, 10000, granularity, new AtomicInteger(0)));
    forkJoinPool.shutdown();

    final long stolen = forkJoinPool.getStealCount();
    info.append("\nGranularity: [")
        .append(granularity)
        .append("], Steals: [")
        .append(stolen)
        .append("]");
}
/**
 * Collects the checkpoint pages of every region into per-region arrays and,
 * when {@link DataStorageConfiguration#getCheckpointWriteOrder()} is
 * {@link CheckpointWriteOrder#SEQUENTIAL}, sorts each array by group id and
 * effective page id. Arrays whose length reaches {@code parallelSortThreshold}
 * are sorted via {@code parallelSortInIsolatedPool}; smaller ones are sorted
 * on the calling thread.
 *
 * <p>The isolated pool, if one was created, is shut down in a
 * {@code finally} block so its threads are released even if sorting fails.
 *
 * @param cpPages Checkpoint pages with overall count and user pages info.
 * @return Queue over the per-region page arrays.
 * @throws IgniteCheckedException Declared by this method; presumably raised by
 *      {@code parallelSortInIsolatedPool} -- confirm.
 */
private GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> splitAndSortCpPagesIfNeeded(
    CheckpointPagesInfoHolder cpPages
) throws IgniteCheckedException {
    Set<T2<PageMemoryEx, FullPageId[]>> cpPagesPerRegion = new HashSet<>();

    int realPagesArrSize = 0;
    int totalPagesCnt = cpPages.pagesNum();

    for (Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>> regPages : cpPages.cpPages()) {
        FullPageId[] pages = new FullPageId[regPages.getValue().size()];

        int pagePos = 0;

        for (int i = 0; i < regPages.getValue().collectionsSize(); i++) {
            for (FullPageId page : regPages.getValue().innerCollection(i)) {
                // Seeing more pages than the holder announced is a bug.
                if (realPagesArrSize++ == totalPagesCnt)
                    throw new AssertionError("Incorrect estimated dirty pages number: " + totalPagesCnt);

                pages[pagePos++] = page;
            }
        }

        // Some pages may have been already replaced.
        if (pagePos != pages.length)
            cpPagesPerRegion.add(new T2<>(regPages.getKey(), Arrays.copyOf(pages, pagePos)));
        else
            cpPagesPerRegion.add(new T2<>(regPages.getKey(), pages));
    }

    if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
        Comparator<FullPageId> cmp = Comparator.comparingInt(FullPageId::groupId)
            .thenComparingLong(FullPageId::effectivePageId);

        // Created on first use by parallelSortInIsolatedPool and then reused.
        ForkJoinPool pool = null;

        try {
            for (T2<PageMemoryEx, FullPageId[]> pagesPerReg : cpPagesPerRegion) {
                if (pagesPerReg.getValue().length >= parallelSortThreshold)
                    pool = parallelSortInIsolatedPool(pagesPerReg.get2(), cmp, pool);
                else
                    Arrays.sort(pagesPerReg.get2(), cmp);
            }
        }
        finally {
            if (pool != null)
                pool.shutdown();
        }
    }

    return new GridConcurrentMultiPairQueue<>(cpPagesPerRegion);
}
/**
 * Recursively lists file statuses under {@code path} for the requested
 * {@link Scope}. The work is done by a {@link RecursiveListing} task run on
 * a dedicated {@link ForkJoinPool}, which is shut down before returning
 * whether or not the listing succeeds.
 *
 * @param fs file system
 * @param path path to file or directory
 * @param scope file system objects scope
 * @param suppressExceptions indicates if exceptions should be ignored
 * @param filter filter to be applied
 * @return list of file statuses
 */
private static List<FileStatus> listRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
    final ForkJoinPool listingPool = new ForkJoinPool();
    try {
        return listingPool.invoke(
            new RecursiveListing(fs, path, scope, suppressExceptions, filter));
    } finally {
        listingPool.shutdown();
    }
}