java.util.concurrent.ForkJoinPool#shutdownNow() 源码实例 Demo

下面列出了 java.util.concurrent.ForkJoinPool#shutdownNow() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

/**
 * Subscribes twice to {@code pushCollection(integers, forkJoinPool)} for the list
 * {@code [1, 2, 3]}, handing every received value to {@code log}, and then runs
 * the pool shutdown sequence.
 *
 * <p>NOTE(review): the pool is {@link ForkJoinPool#commonPool()}. According to the
 * JDK documentation, {@code shutdown()} and {@code shutdownNow()} have no effect on
 * the common pool, {@code awaitTermination} on it behaves like
 * {@code awaitQuiescence} and always returns {@code false}, and
 * {@code ForkJoinPool.shutdownNow()} always returns an empty list. The messages
 * printed below therefore do not reflect an actual pool shutdown.
 */
@Test
void pool_pushCollection() {
    List<Integer> integers = new ArrayList<>();
    integers.add(1);
    integers.add(2);
    integers.add(3);
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        // Two independent subscriptions over the same source list.
        pushCollection(integers, forkJoinPool).subscribe(x -> log(x));
        pushCollection(integers, forkJoinPool).subscribe(x -> log(x));
    } finally {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 1;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            // Gives the submitted work up to shutdownDelaySec seconds to finish.
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for the caller.
                Thread.currentThread().interrupt();
            }
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
源代码2 项目: openjdk-jdk9   文件: ForkJoinPoolTest.java
/**
 * The uncaught-exception handler given to the constructor is the one the
 * pool reports, and it is invoked with a MyError raised on a
 * FailingFJWSubclass thread.
 *
 * Additionally tests: Overriding ForkJoinWorkerThread.onStart
 * performs its defined action
 */
public void testSetUncaughtExceptionHandler() throws InterruptedException {
    final CountDownLatch handlerCalled = new CountDownLatch(1);
    final Thread.UncaughtExceptionHandler handler = (thread, thrown) -> {
        threadAssertTrue(thrown instanceof MyError);
        threadAssertTrue(thread instanceof FailingFJWSubclass);
        handlerCalled.countDown();
    };
    final ForkJoinPool pool =
        new ForkJoinPool(1, new FailingThreadFactory(), handler, false);
    try (PoolCleaner poolCleaner = cleaner(pool)) {
        assertSame(handler, pool.getUncaughtExceptionHandler());
        try {
            pool.execute(new FibTask(8));
            // Blocks until the handler above has run.
            await(handlerCalled);
        } finally {
            // A failure above might have prevented the task from being processed.
            pool.shutdownNow();
        }
    }
}
 
源代码3 项目: Jupiter   文件: ForkJoinPoolExecutorFactory.java
/**
 * Creates a {@link CloseableExecutor} that delegates to a newly created
 * {@link ForkJoinPool}.
 *
 * <p>The pool's parallelism comes from {@code coreWorkers(target)}, its worker
 * threads from a {@code DefaultForkJoinWorkerThreadFactory} built with
 * {@code name}, and it is constructed with {@code asyncMode = true}.
 *
 * @param target passed to {@code coreWorkers} to size the pool
 * @param name   passed to the worker thread factory
 * @return an executor whose {@code execute} forwards to the pool and whose
 *         {@code shutdown} logs a warning and calls {@code shutdownNow()} on it
 */
@Override
public CloseableExecutor newExecutor(Target target, String name) {
    final int parallelism = coreWorkers(target);
    final ForkJoinPool pool = new ForkJoinPool(
            parallelism,
            new DefaultForkJoinWorkerThreadFactory(name),
            new DefaultUncaughtExceptionHandler(),
            true); // asyncMode

    return new CloseableExecutor() {

        @Override
        public void shutdown() {
            logger.warn("ForkJoinPoolExecutorFactory#{} shutdown.", pool);
            pool.shutdownNow();
        }

        @Override
        public void execute(Runnable task) {
            pool.execute(task);
        }
    };
}
 
源代码4 项目: training   文件: CpuContention.java
/**
 * Counts the integers in {@code [40000, 50000)} accepted by
 * {@code CpuContention.isPrime}, evaluating a parallel stream from a task
 * submitted to a dedicated {@link ForkJoinPool} with parallelism 6, and prints
 * the count. Timing is delegated to {@code ThreadUtils.timerStart()} and
 * {@code ThreadUtils.timerEndPrint()}.
 *
 * @param args ignored
 * @throws InterruptedException if the wait for the result or for pool
 *         termination is interrupted
 * @throws ExecutionException if the counting task fails
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    ThreadUtils.timerStart();
    // Alternative range kept from the original source: IntStream.range(10010000, 10020000).
    Stream<BigDecimal> primeStream = IntStream.range(40000, 50000)
            .parallel()
            .mapToObj(i -> new BigDecimal(i))
            .filter(CpuContention::isPrime);

    ForkJoinPool pool = new ForkJoinPool(6);
    long n;
    try {
        // The terminal operation is invoked from inside the pool's task.
        n = pool.submit(() -> primeStream.count()).get();
    } finally {
        // Release the pool's threads even when the task fails or the wait is interrupted.
        pool.shutdownNow();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
    ThreadUtils.timerEndPrint();
    System.out.println("Found: " + n);
}
 
源代码5 项目: j2objc   文件: ForkJoinPoolTest.java
/**
 * The handler supplied at construction is returned by
 * getUncaughtExceptionHandler, and the test waits until that handler
 * has been invoked.
 *
 * Additionally tests: Overriding ForkJoinWorkerThread.onStart
 * performs its defined action
 */
public void testSetUncaughtExceptionHandler() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final Thread.UncaughtExceptionHandler handler =
        new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread failedThread, Throwable cause) {
                threadAssertTrue(cause instanceof MyError);
                threadAssertTrue(failedThread instanceof FailingFJWSubclass);
                done.countDown();
            }
        };
    final ForkJoinPool pool = new ForkJoinPool(
        1, new FailingThreadFactory(), handler, false);
    try (PoolCleaner poolCleaner = cleaner(pool)) {
        assertSame(handler, pool.getUncaughtExceptionHandler());
        try {
            pool.execute(new FibTask(8));
            await(done);
        } finally {
            // The task may never have been processed if something failed above.
            pool.shutdownNow();
        }
    }
}
 
/**
 * Publishes {@code pushCollectionDANGER(integers, forkJoinPool)} for the list
 * {@code [1, 2]} as a connectable observable, attaches two subscribers that each
 * log the value and then sleep two seconds, connects, and two seconds later
 * connects again with a callback that disposes the connection.
 *
 * <p>NOTE(review): the pool is {@link ForkJoinPool#commonPool()}. According to the
 * JDK documentation, {@code shutdown()} and {@code shutdownNow()} have no effect on
 * the common pool, {@code awaitTermination} on it behaves like
 * {@code awaitQuiescence} and always returns {@code false}, and
 * {@code ForkJoinPool.shutdownNow()} always returns an empty list.
 */
@Test
void pool_publish_pushCollection() {
    List<Integer> integers = new ArrayList<>();
    for (int i = 1; i < 3; i++) {
        integers.add(i);
    }
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        ObservablePublish<Integer> integerObservable = (ObservablePublish<Integer>) pushCollectionDANGER(integers, forkJoinPool).publish();
        integerObservable.subscribe(x -> {
            log("一郎神: " + x);
            sleep(2, TimeUnit.SECONDS);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        integerObservable.subscribe(x -> {
            log("二郎神: " + x);
            sleep(2, TimeUnit.SECONDS);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        integerObservable.connect();
        sleep(2, TimeUnit.SECONDS);
        // As mentioned when walking through the API: this overload receives the
        // connection's Disposable and performs the given action on it (here: dispose).
        integerObservable.connect(ps -> ps.dispose());
    } finally {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 15;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for the caller.
                Thread.currentThread().interrupt();
            }
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
/**
 * Shares {@code pushCollectionDANGER(integers, forkJoinPool)} for the list
 * {@code [1..9]} through {@code publish().refCount()}. The first subscriber logs
 * each value and sleeps two seconds; a second subscriber is attached one second
 * later and sleeps one second per value. The test thread then sleeps twenty
 * seconds before running the pool shutdown sequence.
 *
 * <p>NOTE(review): the pool is {@link ForkJoinPool#commonPool()}. According to the
 * JDK documentation, {@code shutdown()} and {@code shutdownNow()} have no effect on
 * the common pool, {@code awaitTermination} on it behaves like
 * {@code awaitQuiescence} and always returns {@code false}, and
 * {@code ForkJoinPool.shutdownNow()} always returns an empty list.
 */
@Test
void infinite_refCount_publish_test() {
    List<Integer> integers = new ArrayList<>();
    for (int i = 1; i < 10; i++) {
        integers.add(i);
    }
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        Observable<Integer> integerObservable = pushCollectionDANGER(integers, forkJoinPool).publish().refCount();
        integerObservable.subscribe(x -> {
            log("一郎神: " + x);
            sleep(2, TimeUnit.SECONDS);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        // Attach the second subscriber one second after the first.
        sleep(1, TimeUnit.SECONDS);
        integerObservable.subscribe(x -> {
            log("二郎神: " + x);
            sleep(1, TimeUnit.SECONDS);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        sleep(20, TimeUnit.SECONDS);
    } finally {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for the caller.
                Thread.currentThread().interrupt();
            }
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
源代码8 项目: openjdk-jdk9   文件: ForkJoinPoolTest.java
/**
 * A pool running a task that blocks through a ManagedBlocker still
 * completes it with the expected result.
 */
public void testBlockingForkJoinTask() throws Throwable {
    final ForkJoinPool pool = new ForkJoinPool(4);
    try {
        final ReentrantLock sharedLock = new ReentrantLock();
        final ManagedLocker blocker = new ManagedLocker(sharedLock);
        final ForkJoinTask<Integer> fibTask =
            new LockingFibTask(20, blocker, sharedLock);
        pool.execute(fibTask);
        final int result = fibTask.get();
        assertEquals(6765, result);
    } finally {
        // Shut down immediately instead of waiting for an orderly shutdown.
        pool.shutdownNow();
    }
}
 
源代码9 项目: j2objc   文件: ForkJoinPoolTest.java
/**
 * Executes a LockingFibTask built around a ManagedLocker on a pool of
 * parallelism 4 and checks that it yields 6765.
 */
public void testBlockingForkJoinTask() throws Throwable {
    ForkJoinPool fjPool = new ForkJoinPool(4);
    try {
        ReentrantLock mutex = new ReentrantLock();
        ForkJoinTask<Integer> task =
            new LockingFibTask(20, new ManagedLocker(mutex), mutex);
        fjPool.execute(task);
        int computed = task.get();
        assertEquals(6765, computed);
    } finally {
        // No orderly shutdown here: stop the pool right away.
        fjPool.shutdownNow();
    }
}
 
源代码10 项目: kylin-on-parquet-v2   文件: DoggedCubeBuilder2.java
/**
 * Runs the cube build on a dedicated {@link ForkJoinPool}.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Executes a chain of {@code BaseCuboidTask}s: each task's internal builder
 *       is added to {@code builderList}, the task is joined, and the chain
 *       continues with {@code nextTask()} until that returns {@code null}.</li>
 *   <li>Submits {@code startBuildFromBaseCuboid()} for every collected builder.</li>
 *   <li>Starts the {@code CuboidResultWatcher}.</li>
 * </ol>
 * Afterwards the writer and the grid tables are closed and the pool is shut down,
 * whether or not the build succeeded.
 *
 * @param input              queue the records are consumed from
 * @param inputConverterUnit converter handed to the queue controller
 * @param output             writer passed to the result watcher; closed before returning
 * @throws IOException if closing fails, or wrapping any failure that is neither an
 *         {@code Error} nor a {@code RuntimeException}
 */
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
        throws IOException {
    final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
            .getQueueController(inputConverterUnit, input);

    final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();

    // Default worker threads, renamed so they are recognizable in thread dumps.
    ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
            return worker;
        }
    };

    // No uncaught-exception handler; asyncMode = true.
    ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
    CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);

    Stopwatch sw = new Stopwatch();
    sw.start();
    logger.info("Dogged Cube Build2 start");
    try {
        BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
        builderPool.execute(task);
        do {
            builderList.add(task.getInternalBuilder());
            //Exception will be thrown here if cube building failure
            task.join();
            task = task.nextTask();
        } while (task != null);

        logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
        for (final InMemCubeBuilder2 builder : builderList) {
            builderPool.submit(new Runnable() {
                @Override
                public void run() {
                    builder.startBuildFromBaseCuboid();
                }
            });
        }
        resultWatcher.start();
        logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsedMillis() + " ms");
    } catch (Throwable e) {
        logger.error("Dogged Cube Build2 error", e);
        if (e instanceof Error) {
            throw (Error) e;
        } else if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        } else {
            throw new IOException(e);
        }
    } finally {
        try {
            output.close();
            closeGirdTables(builderList);
        } finally {
            // Runs even when closing above throws, so the pool's worker
            // threads are never left behind.
            sw.stop();
            builderPool.shutdownNow();
            logger.info("Dogged Cube Build2 end, totally took " + sw.elapsedMillis() + " ms");
            logger.info("Dogged Cube Build2 return");
        }
    }
}
 
/**
 * Attaches three subscribers to {@code publishSubject.replay()}, emits the values
 * {@code 1..9} into the subject from a pool task (one per second), connects,
 * disposes the first subscription after two seconds and completes the subject one
 * second later from the test thread.
 *
 * <p>NOTE(review): {@code publishSubject.onComplete()} is called from the test
 * thread while the pool task may still be calling {@code onNext}; a plain
 * {@code PublishSubject} does not serialize such calls. Confirm this overlap is
 * intended for the demonstration.
 *
 * <p>NOTE(review): the pool is {@link ForkJoinPool#commonPool()}. According to the
 * JDK documentation, {@code shutdown()} and {@code shutdownNow()} have no effect on
 * the common pool, {@code awaitTermination} on it behaves like
 * {@code awaitQuiescence} and always returns {@code false}, and
 * {@code ForkJoinPool.shutdownNow()} always returns an empty list.
 */
@Test
void replay_PublishSubject_test() {
    PublishSubject<Object> publishSubject = PublishSubject.create();
    ConnectableObservable<Object> replay = publishSubject.replay();
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    List<Integer> integers = new ArrayList<>();
    for (int i = 1; i < 10; i++) {
        integers.add(i);
    }
    // Only the first subscription is disposed later, so only its handle is kept.
    Disposable subscribe1 = replay.subscribe(x -> {
        log("一郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

    replay.subscribe(x -> {
        log("二郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    replay.subscribe(x -> {
        log("三郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    AtomicInteger atomicInteger = new AtomicInteger(integers.size());
    try {
        // One task emits every value in order, completing after the last one.
        forkJoinPool.submit(() -> {
            integers.forEach(id -> {
                sleep(1, TimeUnit.SECONDS);
                publishSubject.onNext(id);
                if (atomicInteger.decrementAndGet() == 0) {
                    publishSubject.onComplete();
                }
            });
        });
        // Alternative kept from the original: one pool task per value.
       /* integers.forEach(id -> forkJoinPool.submit(() -> {
            sleep(3,TimeUnit.SECONDS);
            publishSubject.onNext(id);
            if (atomicInteger.decrementAndGet() == 0) {
                publishSubject.onComplete();
            }
        }));*/
        replay.connect();
        sleep(2, TimeUnit.SECONDS);
        subscribe1.dispose();
        sleep(1, TimeUnit.SECONDS);
        //replay.connect(consumer -> consumer.dispose());
        publishSubject.onComplete();
    } finally {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for the caller.
                Thread.currentThread().interrupt();
            }
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
源代码12 项目: kylin   文件: DoggedCubeBuilder2.java
/**
 * Runs the cube build on a dedicated {@link ForkJoinPool}.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Executes a chain of {@code BaseCuboidTask}s: each task's internal builder
 *       is added to {@code builderList}, the task is joined, and the chain
 *       continues with {@code nextTask()} until that returns {@code null}.</li>
 *   <li>Submits {@code startBuildFromBaseCuboid()} for every collected builder.</li>
 *   <li>Starts the {@code CuboidResultWatcher}.</li>
 * </ol>
 * Afterwards the writer and the grid tables are closed and the pool is shut down,
 * whether or not the build succeeded.
 *
 * @param input              queue the records are consumed from
 * @param inputConverterUnit converter handed to the queue controller
 * @param output             writer passed to the result watcher; closed before returning
 * @throws IOException if closing fails, or wrapping any failure that is neither an
 *         {@code Error} nor a {@code RuntimeException}
 */
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
        throws IOException {
    final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
            .getQueueController(inputConverterUnit, input);

    final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();

    // Default worker threads, renamed so they are recognizable in thread dumps.
    ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
            return worker;
        }
    };

    // No uncaught-exception handler; asyncMode = true.
    ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
    CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);

    Stopwatch sw = Stopwatch.createUnstarted();
    sw.start();
    logger.info("Dogged Cube Build2 start");
    try {
        BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
        builderPool.execute(task);
        do {
            builderList.add(task.getInternalBuilder());
            //Exception will be thrown here if cube building failure
            task.join();
            task = task.nextTask();
        } while (task != null);

        logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
        for (final InMemCubeBuilder2 builder : builderList) {
            builderPool.submit(new Runnable() {
                @Override
                public void run() {
                    builder.startBuildFromBaseCuboid();
                }
            });
        }
        resultWatcher.start();
        logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsed(MILLISECONDS) + " ms");
    } catch (Throwable e) {
        logger.error("Dogged Cube Build2 error", e);
        if (e instanceof Error) {
            throw (Error) e;
        } else if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        } else {
            throw new IOException(e);
        }
    } finally {
        try {
            output.close();
            closeGirdTables(builderList);
        } finally {
            // Runs even when closing above throws, so the pool's worker
            // threads are never left behind.
            sw.stop();
            builderPool.shutdownNow();
            logger.info("Dogged Cube Build2 end, totally took " + sw.elapsed(MILLISECONDS) + " ms");
            logger.info("Dogged Cube Build2 return");
        }
    }
}