下面列出了 java.util.concurrent.ForkJoinPool#shutdownNow() 的实例代码，可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Subscribes twice to the result of {@code pushCollection(integers, forkJoinPool)},
 * logging every emitted item, and then runs the shutdown sequence on the pool.
 *
 * <p>The pool used here is {@link ForkJoinPool#commonPool()}. Per the JDK
 * documentation, {@code shutdown()} and {@code shutdownNow()} do not change the
 * execution state of the common pool, and {@code awaitTermination} on it behaves
 * like {@code awaitQuiescence} (always returning {@code false}). The sequence
 * below therefore effectively just waits up to one second for submitted work to
 * go quiet; the printed "service closed" message does not mean the pool stopped.
 */
@Test
void pool_pushCollection() {
    List<Integer> integers = new ArrayList<>();
    integers.add(1);
    integers.add(2);
    integers.add(3);
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        pushCollection(integers, forkJoinPool).subscribe(x -> log(x));
        pushCollection(integers, forkJoinPool).subscribe(x -> log(x));
    } finally {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 1;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag so code
                // further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            // On the common pool this returns an empty list and stops nothing.
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
/**
 * The uncaught-exception handler passed to the ForkJoinPool constructor is
 * the one the pool reports and the one invoked for an uncaught exception.
 *
 * Additionally tests: Overriding ForkJoinWorkerThread.onStart
 * performs its defined action
 */
public void testSetUncaughtExceptionHandler() throws InterruptedException {
    final CountDownLatch handlerCalled = new CountDownLatch(1);
    // The handler checks both the failure type and the failing thread type
    // before releasing the latch the test body waits on.
    final Thread.UncaughtExceptionHandler handler = (thread, failure) -> {
        threadAssertTrue(failure instanceof MyError);
        threadAssertTrue(thread instanceof FailingFJWSubclass);
        handlerCalled.countDown();
    };
    ForkJoinPool pool =
        new ForkJoinPool(1, new FailingThreadFactory(), handler, false);
    try (PoolCleaner cleaner = cleaner(pool)) {
        assertSame(handler, pool.getUncaughtExceptionHandler());
        try {
            pool.execute(new FibTask(8));
            await(handlerCalled);
        } finally {
            // A failure might have prevented the task from being processed,
            // so stop the pool without waiting for it.
            pool.shutdownNow();
        }
    }
}
/**
 * Creates a {@link CloseableExecutor} backed by a newly constructed
 * {@link ForkJoinPool}.
 *
 * <p>The pool's parallelism comes from {@code coreWorkers(target)}, its worker
 * threads from a {@code DefaultForkJoinWorkerThreadFactory} built with
 * {@code name}, and it is created with {@code asyncMode = true}.
 *
 * @param target passed to {@code coreWorkers} to determine the parallelism
 * @param name   passed to the worker thread factory
 * @return an executor whose {@code shutdown()} logs a warning and then calls
 *         {@link ForkJoinPool#shutdownNow()} on the backing pool
 */
@Override
public CloseableExecutor newExecutor(Target target, String name) {
    final int parallelism = coreWorkers(target);
    final ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory =
            new DefaultForkJoinWorkerThreadFactory(name);
    final Thread.UncaughtExceptionHandler exceptionHandler =
            new DefaultUncaughtExceptionHandler();
    final ForkJoinPool pool =
            new ForkJoinPool(parallelism, threadFactory, exceptionHandler, true);

    return new CloseableExecutor() {
        @Override
        public void execute(Runnable task) {
            pool.execute(task);
        }

        @Override
        public void shutdown() {
            logger.warn("ForkJoinPoolExecutorFactory#{} shutdown.", pool);
            pool.shutdownNow();
        }
    };
}
/**
 * Counts the values in [40000, 50000) accepted by {@code CpuContention::isPrime}
 * using a parallel stream whose terminal operation is submitted to a dedicated
 * six-thread {@link ForkJoinPool}, and prints the count together with the
 * elapsed time reported by {@code ThreadUtils}.
 *
 * <p>NOTE(review): submitting the terminal operation to {@code pool} is
 * presumably meant to make the parallel stream run on that pool instead of the
 * common pool; that relies on implementation behavior rather than a documented
 * guarantee — confirm before depending on it.
 *
 * @param args unused
 * @throws InterruptedException if interrupted while waiting for the result or
 *                              for pool termination
 * @throws ExecutionException   if the counting task fails
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    ThreadUtils.timerStart();
    Stream<BigDecimal> primeStream = IntStream.range(40000, 50000)
            .parallel()
            .mapToObj(i -> new BigDecimal(i))
            .filter(CpuContention::isPrime);

    ForkJoinPool pool = new ForkJoinPool(6);
    final long n;
    try {
        n = pool.submit(() -> primeStream.count()).get();
    } finally {
        // Always stop the worker threads, even when the task fails or the
        // wait is interrupted; otherwise the pool would be leaked.
        pool.shutdownNow();
    }
    pool.awaitTermination(1, TimeUnit.MINUTES);
    ThreadUtils.timerEndPrint();
    System.out.println("Found: " + n);
}
/**
 * The uncaught-exception handler passed to the ForkJoinPool constructor is
 * the one the pool reports and the one invoked for an uncaught exception.
 *
 * Additionally tests: Overriding ForkJoinWorkerThread.onStart
 * performs its defined action
 */
public void testSetUncaughtExceptionHandler() throws InterruptedException {
    final CountDownLatch handlerCalled = new CountDownLatch(1);
    // The handler checks both the failure type and the failing thread type
    // before releasing the latch the test body waits on.
    final Thread.UncaughtExceptionHandler handler = (thread, failure) -> {
        threadAssertTrue(failure instanceof MyError);
        threadAssertTrue(thread instanceof FailingFJWSubclass);
        handlerCalled.countDown();
    };
    ForkJoinPool pool =
        new ForkJoinPool(1, new FailingThreadFactory(), handler, false);
    try (PoolCleaner cleaner = cleaner(pool)) {
        assertSame(handler, pool.getUncaughtExceptionHandler());
        try {
            pool.execute(new FibTask(8));
            await(handlerCalled);
        } finally {
            // A failure might have prevented the task from being processed,
            // so stop the pool without waiting for it.
            pool.shutdownNow();
        }
    }
}
/**
 * Publishes the result of {@code pushCollectionDANGER(integers, forkJoinPool)}
 * as a connectable observable, attaches two subscribers that each log the item
 * and sleep two seconds, connects, and finally disposes the connection.
 *
 * <p>The pool used here is {@link ForkJoinPool#commonPool()}. Per the JDK
 * documentation, {@code shutdown()} and {@code shutdownNow()} do not change the
 * execution state of the common pool, and {@code awaitTermination} on it behaves
 * like {@code awaitQuiescence}; the shutdown sequence below therefore only
 * waits (up to 15 seconds) for submitted work to go quiet.
 */
@Test
void pool_publish_pushCollection() {
    List<Integer> integers = new ArrayList<>();
    for (int i = 1; i < 3; i++) {
        integers.add(i);
    }
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        ObservablePublish<Integer> integerObservable = (ObservablePublish<Integer>) pushCollectionDANGER(integers, forkJoinPool).publish();
        integerObservable.subscribe(x -> {
            log("一郎神: " + x);
            sleep(2, TimeUnit.SECONDS);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        integerObservable.subscribe(x -> {
            log("二郎神: " + x);
            sleep(2, TimeUnit.SECONDS);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        integerObservable.connect();
        sleep(2, TimeUnit.SECONDS);
        // As mentioned when walking through the API: this overload hands us
        // the connection's Disposable so we can act on it (here: dispose it).
        integerObservable.connect(ps -> ps.dispose());
    } finally {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 15;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag so code
                // further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            // On the common pool this returns an empty list and stops nothing.
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
/**
 * Shares the result of {@code pushCollectionDANGER(integers, forkJoinPool)}
 * through {@code publish().refCount()} and attaches two subscribers one second
 * apart; each logs the item and then sleeps (two seconds and one second
 * respectively). The test thread then sleeps 20 seconds before shutting down.
 *
 * <p>The pool used here is {@link ForkJoinPool#commonPool()}. Per the JDK
 * documentation, {@code shutdown()} and {@code shutdownNow()} do not change the
 * execution state of the common pool, and {@code awaitTermination} on it behaves
 * like {@code awaitQuiescence}; the shutdown sequence below therefore only
 * waits (up to 2 seconds) for submitted work to go quiet.
 */
@Test
void infinite_refCount_publish_test() {
    List<Integer> integers = new ArrayList<>();
    for (int i = 1; i < 10; i++) {
        integers.add(i);
    }
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    try {
        Observable<Integer> integerObservable = pushCollectionDANGER(integers, forkJoinPool).publish().refCount();
        integerObservable.subscribe(x -> {
            log("一郎神: " + x);
            sleep(2, TimeUnit.SECONDS);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        sleep(1, TimeUnit.SECONDS);
        integerObservable.subscribe(x -> {
            log("二郎神: " + x);
            sleep(1, TimeUnit.SECONDS);
        }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
        sleep(20, TimeUnit.SECONDS);
    } finally {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag so code
                // further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            // On the common pool this returns an empty list and stops nothing.
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
/**
 * Pool maintains parallelism when using ManagedBlocker: a LockingFibTask(20)
 * run on a four-thread pool still yields fib(20) = 6765.
 */
public void testBlockingForkJoinTask() throws Throwable {
    ForkJoinPool pool = new ForkJoinPool(4);
    try {
        ReentrantLock sharedLock = new ReentrantLock();
        ManagedLocker blocker = new ManagedLocker(sharedLock);
        ForkJoinTask<Integer> fibTask = new LockingFibTask(20, blocker, sharedLock);
        pool.execute(fibTask);
        int result = fibTask.get();
        assertEquals(6765, result);
    } finally {
        // Stop immediately instead of waiting out an orderly shutdown.
        pool.shutdownNow();
    }
}
/**
 * Pool maintains parallelism when using ManagedBlocker: a LockingFibTask(20)
 * run on a four-thread pool still yields fib(20) = 6765.
 */
public void testBlockingForkJoinTask() throws Throwable {
    ForkJoinPool pool = new ForkJoinPool(4);
    try {
        ReentrantLock sharedLock = new ReentrantLock();
        ManagedLocker blocker = new ManagedLocker(sharedLock);
        ForkJoinTask<Integer> fibTask = new LockingFibTask(20, blocker, sharedLock);
        pool.execute(fibTask);
        int result = fibTask.get();
        assertEquals(6765, result);
    } finally {
        // Stop immediately instead of waiting out an orderly shutdown.
        pool.shutdownNow();
    }
}
/**
 * Runs the cube build on a dedicated {@link ForkJoinPool}.
 *
 * <p>A chain of {@code BaseCuboidTask}s is executed first: each task's internal
 * builder is recorded and the task is joined before moving on to
 * {@code task.nextTask()}, until there is no next task. Then one pool task per
 * recorded builder is submitted to call {@code startBuildFromBaseCuboid()}, and
 * the {@code CuboidResultWatcher} is started.
 *
 * <p>Cleanup always runs, in this order: close {@code output}, close the grid
 * tables of all builders, stop the stopwatch and shut the pool down. Each step
 * is guarded so that a failure in an earlier step cannot skip a later one.
 *
 * @param input              queue of input records, wrapped in a queue controller
 * @param inputConverterUnit converter handed to the queue controller
 * @param output             writer handed to the result watcher; closed before returning
 * @throws IOException if the build fails with a checked exception (wrapped as
 *                     the cause) or if closing {@code output} fails
 */
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
        throws IOException {
    final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
            .getQueueController(inputConverterUnit, input);

    final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();

    // Same threads as the default factory, but with a recognizable name.
    ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
            return worker;
        }
    };

    ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
    CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);

    Stopwatch sw = new Stopwatch();
    sw.start();
    logger.info("Dogged Cube Build2 start");
    try {
        BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
        builderPool.execute(task);
        do {
            builderList.add(task.getInternalBuilder());
            // join() rethrows here if building this base cuboid failed
            task.join();
            task = task.nextTask();
        } while (task != null);

        logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
        for (final InMemCubeBuilder2 builder : builderList) {
            builderPool.submit(new Runnable() {
                @Override
                public void run() {
                    builder.startBuildFromBaseCuboid();
                }
            });
        }
        resultWatcher.start();
        logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsedMillis() + " ms");
    } catch (Throwable e) {
        logger.error("Dogged Cube Build2 error", e);
        // Rethrow unchecked failures untouched; wrap everything else.
        if (e instanceof Error)
            throw (Error) e;
        else if (e instanceof RuntimeException)
            throw (RuntimeException) e;
        else
            throw new IOException(e);
    } finally {
        // Nested try/finally: a failing close must not prevent the remaining
        // cleanup, in particular shutting down the pool's worker threads.
        try {
            output.close();
        } finally {
            try {
                closeGirdTables(builderList);
            } finally {
                sw.stop();
                builderPool.shutdownNow();
                logger.info("Dogged Cube Build2 end, totally took " + sw.elapsedMillis() + " ms");
                logger.info("Dogged Cube Build2 return");
            }
        }
    }
}
/**
 * Replays a {@code PublishSubject} to three subscribers. A single task on the
 * pool emits the integers 1..9 at one-second intervals and completes the
 * subject after the last one; meanwhile the test thread connects the replay,
 * disposes the first subscriber after two seconds and completes the subject
 * itself one second later.
 *
 * <p>The pool used here is {@link ForkJoinPool#commonPool()}. Per the JDK
 * documentation, {@code shutdown()} and {@code shutdownNow()} do not change the
 * execution state of the common pool, and {@code awaitTermination} on it behaves
 * like {@code awaitQuiescence}; the shutdown sequence below therefore only
 * waits (up to 2 seconds) for submitted work to go quiet.
 */
@Test
void replay_PublishSubject_test() {
    PublishSubject<Object> publishSubject = PublishSubject.create();
    ConnectableObservable<Object> replay = publishSubject.replay();
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    List<Integer> integers = new ArrayList<>();
    for (int i = 1; i < 10; i++) {
        integers.add(i);
    }
    // Only the first subscription is disposed later, so only its Disposable is kept.
    Disposable subscribe1 = replay.subscribe(x -> {
        log("一郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

    replay.subscribe(x -> {
        log("二郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

    replay.subscribe(x -> {
        log("三郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

    AtomicInteger atomicInteger = new AtomicInteger(integers.size());
    try {
        // One pool task walks the whole list. (A disabled variant in the
        // original submitted one pool task per element instead.)
        forkJoinPool.submit(() -> {
            integers.forEach(id -> {
                sleep(1, TimeUnit.SECONDS);
                publishSubject.onNext(id);
                if (atomicInteger.decrementAndGet() == 0) {
                    publishSubject.onComplete();
                }
            });
        });
        replay.connect();
        sleep(2, TimeUnit.SECONDS);
        subscribe1.dispose();
        sleep(1, TimeUnit.SECONDS);
        // Complete the subject from the test thread rather than disposing the connection.
        publishSubject.onComplete();
    } finally {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag so code
                // further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            // On the common pool this returns an empty list and stops nothing.
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
/**
 * Runs the cube build on a dedicated {@link ForkJoinPool}.
 *
 * <p>A chain of {@code BaseCuboidTask}s is executed first: each task's internal
 * builder is recorded and the task is joined before moving on to
 * {@code task.nextTask()}, until there is no next task. Then one pool task per
 * recorded builder is submitted to call {@code startBuildFromBaseCuboid()}, and
 * the {@code CuboidResultWatcher} is started.
 *
 * <p>Cleanup always runs, in this order: close {@code output}, close the grid
 * tables of all builders, stop the stopwatch and shut the pool down. Each step
 * is guarded so that a failure in an earlier step cannot skip a later one.
 *
 * @param input              queue of input records, wrapped in a queue controller
 * @param inputConverterUnit converter handed to the queue controller
 * @param output             writer handed to the result watcher; closed before returning
 * @throws IOException if the build fails with a checked exception (wrapped as
 *                     the cause) or if closing {@code output} fails
 */
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
        throws IOException {
    final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
            .getQueueController(inputConverterUnit, input);

    final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();

    // Same threads as the default factory, but with a recognizable name.
    ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
            return worker;
        }
    };

    ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
    CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);

    Stopwatch sw = Stopwatch.createUnstarted();
    sw.start();
    logger.info("Dogged Cube Build2 start");
    try {
        BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
        builderPool.execute(task);
        do {
            builderList.add(task.getInternalBuilder());
            // join() rethrows here if building this base cuboid failed
            task.join();
            task = task.nextTask();
        } while (task != null);

        logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
        for (final InMemCubeBuilder2 builder : builderList) {
            builderPool.submit(new Runnable() {
                @Override
                public void run() {
                    builder.startBuildFromBaseCuboid();
                }
            });
        }
        resultWatcher.start();
        logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsed(MILLISECONDS) + " ms");
    } catch (Throwable e) {
        logger.error("Dogged Cube Build2 error", e);
        // Rethrow unchecked failures untouched; wrap everything else.
        if (e instanceof Error)
            throw (Error) e;
        else if (e instanceof RuntimeException)
            throw (RuntimeException) e;
        else
            throw new IOException(e);
    } finally {
        // Nested try/finally: a failing close must not prevent the remaining
        // cleanup, in particular shutting down the pool's worker threads.
        try {
            output.close();
        } finally {
            try {
                closeGirdTables(builderList);
            } finally {
                sw.stop();
                builderPool.shutdownNow();
                logger.info("Dogged Cube Build2 end, totally took " + sw.elapsed(MILLISECONDS) + " ms");
                logger.info("Dogged Cube Build2 return");
            }
        }
    }
}