下面列出了 java.util.concurrent.ForkJoinWorkerThread 类的 API 用法示例代码;您也可以点击链接到 GitHub 查看完整源代码。
/**
 * Creates the executor used for asynchronous work: an async-mode {@link ForkJoinPool} with
 * {@code asyncThreads} parallelism whose workers are daemon threads named
 * {@code server-worker-<n>}.
 *
 * <p>The pool's uncaught-exception handler is {@code UncaughtExceptionHandlers.systemExit()}.
 *
 * @param asyncThreads the parallelism level of the returned pool
 * @return a new pool; the caller is responsible for shutting it down
 */
ExecutorService getExecutor(int asyncThreads) {
  // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be
  // put. Move it somewhere else, or remove it if no longer necessary.
  // See: https://github.com/grpc/grpc-java/issues/2119
  return new ForkJoinPool(asyncThreads,
      new ForkJoinWorkerThreadFactory() {
        // Monotonic counter that gives every worker a unique name suffix.
        final AtomicInteger num = new AtomicInteger();

        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
          ForkJoinWorkerThread thread =
              ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
          thread.setDaemon(true);
          // A single separator: the previous "server-worker-" + "-" produced "server-worker--N".
          thread.setName("server-worker-" + num.getAndIncrement());
          return thread;
        }
      }, UncaughtExceptionHandlers.systemExit(), true /* async */);
}
/**
 * Creates the executor used for asynchronous work: an async-mode {@link ForkJoinPool} with
 * {@code asyncThreads} parallelism whose workers are daemon threads named
 * {@code server-worker-<n>}.
 *
 * <p>The pool's uncaught-exception handler is {@code UncaughtExceptionHandlers.systemExit()}.
 *
 * @param asyncThreads the parallelism level of the returned pool
 * @return a new pool; the caller is responsible for shutting it down
 */
ExecutorService getExecutor(int asyncThreads) {
  // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be
  // put. Move it somewhere else, or remove it if no longer necessary.
  // See: https://github.com/grpc/grpc-java/issues/2119
  return new ForkJoinPool(asyncThreads,
      new ForkJoinWorkerThreadFactory() {
        // Monotonic counter that gives every worker a unique name suffix.
        final AtomicInteger num = new AtomicInteger();

        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
          ForkJoinWorkerThread thread =
              ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
          thread.setDaemon(true);
          // A single separator: the previous "server-worker-" + "-" produced "server-worker--N".
          thread.setName("server-worker-" + num.getAndIncrement());
          return thread;
        }
      }, UncaughtExceptionHandlers.systemExit(), true /* async */);
}
/**
 * Construct a ForkJoinPool with a stricter thread limit.
 *
 * <p>ForkJoinPool by default will create a new thread to handle pending work whenever an existing
 * thread becomes blocked on a task and cannot work steal. In cases when many tasks would block on
 * a slow running dependency, it can trigger thread creation for all those tasks.
 *
 * <p>The returned pool's thread factory refuses (returns {@code null}) to create a worker once
 * {@code parallelism + spares} workers are running. Workers are counted from {@code onStart} to
 * {@code onTermination}, so a thread that has been created but has not started yet is not
 * counted; the limit is therefore best-effort under concurrent thread creation.
 *
 * <p>Note that limiting the maximum threads will impact the ability for ManagedBlockers to cause
 * the pool to create new worker threads, leading to potential deadlock if many ManagedBlockers
 * are used.
 *
 * @param parallelism the parallelism level of the pool
 * @param spares how many workers beyond {@code parallelism} may run at the same time
 * @return a new non-async pool with no uncaught-exception handler
 */
public static ForkJoinPool forkJoinPoolWithThreadLimit(int parallelism, int spares) {
  final int maxThreads = parallelism + spares;
  final AtomicInteger activeThreads = new AtomicInteger(0);
  return new ForkJoinPool(
      parallelism,
      pool -> {
        // ">=" rather than ">": once maxThreads workers are running, one more would exceed it.
        if (activeThreads.get() >= maxThreads) {
          return null;
        }
        return new ForkJoinWorkerThread(pool) {
          @Override
          protected void onStart() {
            super.onStart();
            activeThreads.incrementAndGet();
          }

          @Override
          protected void onTermination(Throwable exception) {
            activeThreads.decrementAndGet();
            super.onTermination(exception);
          }
        };
      },
      /* handler */ null,
      /* asyncMode */ false);
}
/**
 * Creates a worker through the default fork/join thread factory, then lowers it to minimum
 * priority and names it {@code MediaLibraryScanner-<poolIndex>}.
 *
 * @param pool the pool the new worker belongs to
 * @return the configured worker thread
 */
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    final ForkJoinWorkerThread thread =
            ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
    final String threadName = "MediaLibraryScanner-" + thread.getPoolIndex();
    thread.setName(threadName);
    thread.setPriority(Thread.MIN_PRIORITY);
    return thread;
}
/**
 * Returns the default number of leaf tasks to aim for when decomposing work in parallel.
 *
 * <p>When called from a fork/join worker thread the result is four times the parallelism of
 * that worker's pool; this over-partitioning lets other workers help out when leaf tasks are
 * uneven or some processors are busy. From any other thread the result is {@code LEAF_TARGET}.
 *
 * @return the target number of leaf tasks
 */
public static int getLeafTarget() {
    final Thread current = Thread.currentThread();
    if (!(current instanceof ForkJoinWorkerThread)) {
        return LEAF_TARGET;
    }
    final int parallelism = ((ForkJoinWorkerThread) current).getPool().getParallelism();
    return parallelism << 2;
}
/**
 * Build all the cuboids and wait for all the tasks finished.
 *
 * <p>The base cuboid is built first on the calling thread via {@code buildBaseCuboid}. A
 * dedicated async-mode {@link ForkJoinPool} with {@code taskThreadCount} parallelism then runs
 * {@code startBuildFromBaseCuboid()}, and this method blocks until that root task completes.
 * The pool is shut down before returning, whether or not the build succeeded.
 *
 * @param input the record source handed to {@code buildBaseCuboid}
 * @param listener the result listener handed to {@code buildBaseCuboid}
 * @return the results held by {@code resultCollector} once the root task has completed
 * @throws IOException declared for callers; presumably raised while building the base cuboid
 *         (not visible from this method alone)
 */
private <T> NavigableMap<Long, CuboidResult> buildAndCollect(final RecordConsumeBlockingQueueController<T> input,
        final ICuboidResultListener listener) throws IOException {

    long startTime = System.currentTimeMillis();
    logger.info("In Mem Cube Build2 start, {}", cubeDesc.getName());

    // build base cuboid
    buildBaseCuboid(input, listener);

    // Name the workers so they are recognizable in thread dumps.
    ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("inmem-cubing-cuboid-worker-" + worker.getPoolIndex());
            return worker;
        }
    };
    ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
    try {
        ForkJoinTask<?> rootTask = builderPool.submit(new Runnable() {
            @Override
            public void run() {
                startBuildFromBaseCuboid();
            }
        });
        // join() rethrows any failure of the root task on this thread.
        rootTask.join();
    } finally {
        // Orderly shutdown: work that is already queued may still finish, but the pool and its
        // worker threads are released instead of being leaked on every invocation.
        builderPool.shutdown();
    }

    long endTime = System.currentTimeMillis();
    logger.info("In Mem Cube Build2 end, {}, takes {} ms", cubeDesc.getName(), (endTime - startTime));
    logger.info("total CuboidResult count: {}", resultCollector.getAllResult().size());
    return resultCollector.getAllResult();
}
/**
 * Computes the default leaf-task target for parallel decomposition.
 *
 * <p>Inside a fork/join worker thread this is the owning pool's parallelism multiplied by four,
 * so that work is over-partitioned and idle workers can pick up the slack when leaf tasks are
 * uneven. Outside of a fork/join worker thread it is {@code LEAF_TARGET}.
 *
 * @return the target number of leaf tasks
 */
public static int getLeafTarget() {
    final Thread caller = Thread.currentThread();
    return (caller instanceof ForkJoinWorkerThread)
            ? ((ForkJoinWorkerThread) caller).getPool().getParallelism() << 2
            : LEAF_TARGET;
}
/**
 * Creates an async-mode {@link ForkJoinPool} with one worker per available processor.
 *
 * <p>Workers are daemon threads, run at the priority halfway between {@code MIN_PRIORITY} and
 * {@code NORM_PRIORITY}, and are named {@code async-packet-provider-thread-<n>} with {@code n}
 * starting at 1. No uncaught-exception handler is installed.
 *
 * @return the newly created pool
 */
private static ForkJoinPool createPool() {
    final AtomicInteger sequence = new AtomicInteger();
    final int parallelism = Runtime.getRuntime().availableProcessors();
    final ForkJoinPool.ForkJoinWorkerThreadFactory workerFactory = owner -> {
        final ForkJoinWorkerThread worker = new ForkJoinWorkerThread(owner) {};
        final int reducedPriority = (Thread.NORM_PRIORITY + Thread.MIN_PRIORITY) / 2;
        worker.setDaemon(true);
        worker.setPriority(reducedPriority);
        worker.setName("async-packet-provider-thread-" + sequence.incrementAndGet());
        return worker;
    };
    return new ForkJoinPool(parallelism, workerFactory, null, true);
}
/**
 * Returns the default number of leaf tasks to aim for when decomposing work in parallel.
 *
 * <p>When called from a fork/join worker thread the result is four times the parallelism of
 * that worker's pool; this over-partitioning lets other workers help out when leaf tasks are
 * uneven or some processors are busy. From any other thread the result is {@code LEAF_TARGET}.
 *
 * @return the target number of leaf tasks
 */
public static int getLeafTarget() {
    final Thread current = Thread.currentThread();
    if (!(current instanceof ForkJoinWorkerThread)) {
        return LEAF_TARGET;
    }
    final int parallelism = ((ForkJoinWorkerThread) current).getPool().getParallelism();
    return parallelism << 2;
}
/**
 * Creates a worker through the default fork/join thread factory and names it
 * {@code Bot-<poolIndex>}.
 *
 * @param pool the pool the new worker belongs to
 * @return the renamed worker thread
 */
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    final ForkJoinWorkerThread botThread =
            ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
    final String botName = "Bot-" + botThread.getPoolIndex();
    botThread.setName(botName);
    return botThread;
}
/**
 * Computes the default leaf-task target for parallel decomposition.
 *
 * <p>Inside a fork/join worker thread this is the owning pool's parallelism multiplied by four,
 * so that work is over-partitioned and idle workers can pick up the slack when leaf tasks are
 * uneven. Outside of a fork/join worker thread it is {@code LEAF_TARGET}.
 *
 * @return the target number of leaf tasks
 */
public static int getLeafTarget() {
    final Thread caller = Thread.currentThread();
    return (caller instanceof ForkJoinWorkerThread)
            ? ((ForkJoinWorkerThread) caller).getPool().getParallelism() << 2
            : LEAF_TARGET;
}
/**
 * Creates the task runner backed by a non-async {@link ForkJoinPool}.
 *
 * <p>The pool's parallelism is eight times the number of available processors, but never less
 * than 4. Every worker uses the context class loader of the thread that constructs this
 * object. Exceptions that escape a worker are recorded in {@code errors}.
 */
public Tasks() {
    final ClassLoader loader = Thread.currentThread().getContextClassLoader();
    final int parallelism = Math.max(4, Runtime.getRuntime().availableProcessors() * 8);
    executorService = new ForkJoinPool(
            parallelism,
            p -> new ForkJoinWorkerThread(p) {
                {
                    // Propagate the creator's class loader to code running on this worker.
                    setContextClassLoader(loader);
                }
            },
            // The third ForkJoinPool argument is a Thread.UncaughtExceptionHandler, not a
            // rejection handler: it receives the failing thread and the Throwable. Keep the
            // Throwable as the cause instead of discarding it.
            (worker, failure) -> errors.add(
                    new IllegalStateException("Uncaught exception in " + worker, failure)),
            false);
}
/**
 * pollSubmission, called from inside a pool worker, hands back one of the tasks that were
 * submitted externally and have not run yet.
 */
public void testPollSubmission() {
    final CountDownLatch done = new CountDownLatch(1);
    final ForkJoinTask first = ForkJoinTask.adapt(awaiter(done));
    final ForkJoinTask second = ForkJoinTask.adapt(awaiter(done));
    final ForkJoinTask third = ForkJoinTask.adapt(awaiter(done));
    final ForkJoinPool pool = singletonPool();
    try (PoolCleaner poolCleaner = cleaner(pool, done)) {
        // Submits the three tasks from a thread outside the pool.
        final Thread submitter = new Thread(new CheckedRunnable() {
            public void realRun() {
                for (ForkJoinTask pending : new ForkJoinTask[] { first, second, third }) {
                    pool.execute(pending);
                }
            }});
        final RecursiveAction poller = new CheckedRecursiveAction() {
            protected void realCompute() {
                // Run the external submitter to completion while this action occupies a worker.
                submitter.start();
                try {
                    submitter.join();
                } catch (Exception ex) {
                    threadUnexpectedException(ex);
                }
                assertTrue(pool.hasQueuedSubmissions());
                assertTrue(Thread.currentThread() instanceof ForkJoinWorkerThread);
                final ForkJoinTask polled = ForkJoinTask.pollSubmission();
                assertTrue(polled == first || polled == second || polled == third);
                assertFalse(polled.isDone());
            }};
        pool.invoke(poller);
    }
}
/**
 * A worker thread running inside a pool reports that same pool from getPool.
 */
public void testWorkerGetPool() {
    final ForkJoinPool pool = mainPool();
    final RecursiveAction check = new CheckedRecursiveAction() {
        protected void realCompute() {
            final Thread current = Thread.currentThread();
            final ForkJoinWorkerThread worker = (ForkJoinWorkerThread) current;
            assertSame(pool, worker.getPool());
        }};
    testInvokeOnPool(pool, check);
}
/**
 * getPoolIndex of a worker thread running inside a pool is non-negative.
 */
public void testWorkerGetPoolIndex() {
    final ForkJoinPool pool = mainPool();
    final RecursiveAction check = new CheckedRecursiveAction() {
        protected void realCompute() {
            final ForkJoinWorkerThread worker = (ForkJoinWorkerThread) Thread.currentThread();
            final int index = worker.getPoolIndex();
            assertTrue(index >= 0);
            // The upper bound (index < pool.getPoolSize()) is deliberately not asserted:
            // the pool size can shrink after an index has been assigned.
        }};
    testInvokeOnPool(pool, check);
}
/**
 * Returns the default number of leaf tasks to aim for when decomposing work in parallel.
 *
 * <p>When called from a fork/join worker thread the result is four times the parallelism of
 * that worker's pool; this over-partitioning lets other workers help out when leaf tasks are
 * uneven or some processors are busy. From any other thread the result is {@code LEAF_TARGET}.
 *
 * @return the target number of leaf tasks
 */
public static int getLeafTarget() {
    final Thread current = Thread.currentThread();
    if (!(current instanceof ForkJoinWorkerThread)) {
        return LEAF_TARGET;
    }
    final int parallelism = ((ForkJoinWorkerThread) current).getPool().getParallelism();
    return parallelism << 2;
}
/**
 * A thread produced by a factory constructed with the prefix "prefix-test" carries that prefix
 * in its name.
 */
@Test
public void prefixIsAddedToThread() {
    nexusForkJoinWorkerThreadFactory = new NexusForkJoinWorkerThreadFactory("prefix-test");
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    try {
        ForkJoinWorkerThread thread = nexusForkJoinWorkerThreadFactory.newThread(forkJoinPool);
        assertThat(thread.getName(), CoreMatchers.containsString("prefix-test"));
    } finally {
        // Release the pool created for this test so it does not outlive the test method.
        forkJoinPool.shutdownNow();
    }
}
/**
 * Creates an {@code InternalForkJoinWorkerThread} for {@code pool} and names it
 * {@code <namePrefix>-<n>}, where {@code n} is taken from the {@code idx} counter.
 *
 * @param pool the pool the new worker belongs to
 * @return the named worker thread
 */
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    // Note: The ForkJoinPool will create these threads as daemon threads.
    final ForkJoinWorkerThread worker = new InternalForkJoinWorkerThread(pool);
    final String workerName = namePrefix + '-' + idx.getAndIncrement();
    worker.setName(workerName);
    return worker;
}
/**
 * Verifies that the calling thread is a fork/join worker that belongs to {@code pool}.
 *
 * @param element the element being processed; only used in the failure message
 * @throws IllegalStateException if the caller is not a fork/join worker thread, or is a worker
 *         of a different pool
 */
private void checkThread(Object element) {
    final Thread current = Thread.currentThread();
    if (current instanceof ForkJoinWorkerThread) {
        final ForkJoinWorkerThread worker = (ForkJoinWorkerThread) current;
        if (worker.getPool() == pool) {
            return;
        }
        throw new IllegalStateException("FJP is incorrect (element: " + element + ")");
    }
    throw new IllegalStateException("Not inside FJP (element: " + element + ")");
}
/**
 * Build all the cuboids and wait for all the tasks finished.
 *
 * <p>The base cuboid is built first on the calling thread via {@code buildBaseCuboid}. A
 * dedicated async-mode {@link ForkJoinPool} with {@code taskThreadCount} parallelism then runs
 * {@code startBuildFromBaseCuboid()}, and this method blocks until that root task completes.
 * The pool is shut down before returning, whether or not the build succeeded.
 *
 * @param input the record source handed to {@code buildBaseCuboid}
 * @param listener the result listener handed to {@code buildBaseCuboid}
 * @return the results held by {@code resultCollector} once the root task has completed
 * @throws IOException declared for callers; presumably raised while building the base cuboid
 *         (not visible from this method alone)
 */
private <T> NavigableMap<Long, CuboidResult> buildAndCollect(final RecordConsumeBlockingQueueController<T> input,
        final ICuboidResultListener listener) throws IOException {

    long startTime = System.currentTimeMillis();
    logger.info("In Mem Cube Build2 start, {}", cubeDesc.getName());

    // build base cuboid
    buildBaseCuboid(input, listener);

    // Name the workers so they are recognizable in thread dumps.
    ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("inmem-cubing-cuboid-worker-" + worker.getPoolIndex());
            return worker;
        }
    };
    ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
    try {
        ForkJoinTask<?> rootTask = builderPool.submit(new Runnable() {
            @Override
            public void run() {
                startBuildFromBaseCuboid();
            }
        });
        // join() rethrows any failure of the root task on this thread.
        rootTask.join();
    } finally {
        // Orderly shutdown: work that is already queued may still finish, but the pool and its
        // worker threads are released instead of being leaked on every invocation.
        builderPool.shutdown();
    }

    long endTime = System.currentTimeMillis();
    logger.info("In Mem Cube Build2 end, {}, takes {} ms", cubeDesc.getName(), (endTime - startTime));
    logger.info("total CuboidResult count: {}", resultCollector.getAllResult().size());
    return resultCollector.getAllResult();
}
/**
 * pollSubmission, called from inside a pool worker, hands back one of the tasks that were
 * submitted externally and have not run yet.
 */
public void testPollSubmission() {
    final CountDownLatch done = new CountDownLatch(1);
    final ForkJoinTask first = ForkJoinTask.adapt(awaiter(done));
    final ForkJoinTask second = ForkJoinTask.adapt(awaiter(done));
    final ForkJoinTask third = ForkJoinTask.adapt(awaiter(done));
    final ForkJoinPool pool = singletonPool();
    try (PoolCleaner poolCleaner = cleaner(pool, done)) {
        // Submits the three tasks from a thread outside the pool.
        final Thread submitter = new Thread(new CheckedRunnable() {
            public void realRun() {
                for (ForkJoinTask pending : new ForkJoinTask[] { first, second, third }) {
                    pool.execute(pending);
                }
            }});
        final RecursiveAction poller = new CheckedRecursiveAction() {
            protected void realCompute() {
                // Run the external submitter to completion while this action occupies a worker.
                submitter.start();
                try {
                    submitter.join();
                } catch (Exception ex) {
                    threadUnexpectedException(ex);
                }
                assertTrue(pool.hasQueuedSubmissions());
                assertTrue(Thread.currentThread() instanceof ForkJoinWorkerThread);
                final ForkJoinTask polled = ForkJoinTask.pollSubmission();
                assertTrue(polled == first || polled == second || polled == third);
                assertFalse(polled.isDone());
            }};
        pool.invoke(poller);
    }
}
/**
 * A worker thread running inside a pool reports that same pool from getPool.
 */
public void testWorkerGetPool() {
    final ForkJoinPool pool = mainPool();
    final RecursiveAction check = new CheckedRecursiveAction() {
        protected void realCompute() {
            final Thread current = Thread.currentThread();
            final ForkJoinWorkerThread worker = (ForkJoinWorkerThread) current;
            assertSame(pool, worker.getPool());
        }};
    testInvokeOnPool(pool, check);
}
/**
 * getPoolIndex of a worker thread running inside a pool is non-negative.
 */
public void testWorkerGetPoolIndex() {
    final ForkJoinPool pool = mainPool();
    final RecursiveAction check = new CheckedRecursiveAction() {
        protected void realCompute() {
            final ForkJoinWorkerThread worker = (ForkJoinWorkerThread) Thread.currentThread();
            final int index = worker.getPoolIndex();
            assertTrue(index >= 0);
            // The upper bound (index < pool.getPoolSize()) is deliberately not asserted:
            // the pool size can shrink after an index has been assigned.
        }};
    testInvokeOnPool(pool, check);
}
/**
 * Schedules {@code code} for execution and returns a future for its result.
 *
 * <p>When the caller is already a fork/join worker thread, the callable is adapted to a
 * {@link ForkJoinTask} and forked directly; otherwise it is submitted through {@code ec}.
 *
 * @param code the computation to run
 * @return a future wrapping the scheduled task
 */
@Override
public <T> ABenchmarkFuture<T> submit(Callable<T> code) {
    final boolean calledFromWorker = Thread.currentThread() instanceof ForkJoinWorkerThread;
    if (!calledFromWorker) {
        return new WrappingAFuture<>(ec.submit(code));
    }
    final ForkJoinTask<T> forked = ForkJoinTask.adapt(code);
    forked.fork();
    return new WrappingAFuture<>(forked);
}
/**
 * Creates a worker named {@code JobScheduler FJ pool <n>/<PARALLELISM>} that runs one step
 * below normal priority.
 *
 * <p>The number {@code n} is obtained from {@code setNextBit()} before the thread is created
 * and handed to {@code clearBit(n)} when the thread terminates.
 *
 * @param pool the pool the new worker belongs to
 * @return the configured worker thread
 */
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    final int slot = setNextBit();
    final String workerName = "JobScheduler FJ pool " + slot + "/" + PARALLELISM;
    final ForkJoinWorkerThread worker = new ForkJoinWorkerThread(pool) {
        @Override
        protected void onTermination(Throwable exception) {
            // Give the slot back before the regular termination handling runs.
            clearBit(slot);
            super.onTermination(exception);
        }
    };
    worker.setName(workerName);
    worker.setPriority(Thread.NORM_PRIORITY - 1);
    return worker;
}
/**
 * Creates a {@code SafeForkJoinWorkerThread} bound to {@code pool}.
 *
 * @param pool the pool the new worker belongs to
 * @return the new worker thread
 */
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    final ForkJoinWorkerThread worker = new SafeForkJoinWorkerThread(pool);
    return worker;
}
/**
 * Creates a {@code ContextAwareThread} bound to {@code pool}, passing it this factory's
 * {@code runContextData}.
 *
 * @param pool the pool the new worker belongs to
 * @return the new worker thread
 */
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    final ForkJoinWorkerThread worker = new ContextAwareThread(pool, runContextData);
    return worker;
}
/**
 * Creates a {@code TheaterWorker} for {@code pool}, passing it this factory's {@code theater}
 * and the next value obtained from {@code WORKER_COUNT}.
 *
 * @param pool the pool the new worker belongs to
 * @return the new worker thread
 */
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    final ForkJoinWorkerThread worker =
            new TheaterWorker(pool, this.theater, WORKER_COUNT.getAndIncrement(this));
    return worker;
}
/**
 * Never creates a worker: always returns {@code null}, ignoring {@code pool}.
 *
 * <p>NOTE(review): presumably a deliberate stub for a pool that must not get any workers.
 * The {@code ForkJoinWorkerThreadFactory} contract warns that returning {@code null} may result
 * in tasks never being executed, so confirm this against the caller.
 *
 * @param pool the pool requesting a worker (unused)
 * @return always {@code null}
 */
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return null;
}
/**
 * Builds the cube from {@code input} and writes the results to {@code output}.
 *
 * <p>Base cuboid tasks are chained: the first one is submitted to a dedicated async-mode
 * {@link ForkJoinPool}, and each following one is obtained from {@code nextTask()} after the
 * previous task has been joined. Once the chain ends, every collected builder is asked to build
 * its child cuboids on the pool, and the result watcher is started.
 *
 * <p>Cleanup always runs and always attempts every step: {@code output} is closed, the builders
 * are handed to {@code closeGirdTables}, and the pool is shut down, even if an earlier step
 * throws.
 *
 * @param input the queue of incoming records
 * @param inputConverterUnit the converter used to create the queue controller for {@code input}
 * @param output the writer that receives the results; closed before this method returns
 * @throws IOException if the build fails with a checked exception (wrapped as the cause), or if
 *         cleanup raises one
 */
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
        throws IOException {
    final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
            .getQueueController(inputConverterUnit, input);

    final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();

    // Name the workers so they are recognizable in thread dumps.
    ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
            return worker;
        }
    };

    ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
    CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);

    Stopwatch sw = new Stopwatch();
    sw.start();
    logger.info("Dogged Cube Build2 start");
    try {
        BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
        builderPool.execute(task);
        do {
            builderList.add(task.getInternalBuilder());
            //Exception will be thrown here if cube building failure
            task.join();
            task = task.nextTask();
        } while (task != null);

        logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
        for (final InMemCubeBuilder2 builder : builderList) {
            builderPool.submit(new Runnable() {
                @Override
                public void run() {
                    builder.startBuildFromBaseCuboid();
                }
            });
        }
        resultWatcher.start();
        logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsedMillis() + " ms");
    } catch (Throwable e) {
        logger.error("Dogged Cube Build2 error", e);
        // Rethrow unchecked failures as they are; wrap everything else to match the signature.
        if (e instanceof Error)
            throw (Error) e;
        else if (e instanceof RuntimeException)
            throw (RuntimeException) e;
        else
            throw new IOException(e);
    } finally {
        // Nested try/finally: a failure while closing the writer or the grid tables must not
        // prevent the worker pool from being shut down.
        try {
            output.close();
        } finally {
            try {
                closeGirdTables(builderList);
            } finally {
                sw.stop();
                builderPool.shutdownNow();
                logger.info("Dogged Cube Build2 end, totally took " + sw.elapsedMillis() + " ms");
                logger.info("Dogged Cube Build2 return");
            }
        }
    }
}