下面列出了怎么用java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory的API类实例代码及写法,或者点击链接到github查看源代码。
ExecutorService getExecutor(int asyncThreads) {
// TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be
// put. Move it somewhere else, or remove it if no longer necessary.
// See: https://github.com/grpc/grpc-java/issues/2119
return new ForkJoinPool(asyncThreads,
new ForkJoinWorkerThreadFactory() {
final AtomicInteger num = new AtomicInteger();
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setDaemon(true);
thread.setName("server-worker-" + "-" + num.getAndIncrement());
return thread;
}
}, UncaughtExceptionHandlers.systemExit(), true /* async */);
}
ExecutorService getExecutor(int asyncThreads) {
// TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be
// put. Move it somewhere else, or remove it if no longer necessary.
// See: https://github.com/grpc/grpc-java/issues/2119
return new ForkJoinPool(asyncThreads,
new ForkJoinWorkerThreadFactory() {
final AtomicInteger num = new AtomicInteger();
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setDaemon(true);
thread.setName("server-worker-" + "-" + num.getAndIncrement());
return thread;
}
}, UncaughtExceptionHandlers.systemExit(), true /* async */);
}
/**
* Build all the cuboids and wait for all the tasks finished.
*
* @param input
* @param listener
* @return
* @throws IOException
*/
private <T> NavigableMap<Long, CuboidResult> buildAndCollect(final RecordConsumeBlockingQueueController<T> input,
final ICuboidResultListener listener) throws IOException {
long startTime = System.currentTimeMillis();
logger.info("In Mem Cube Build2 start, {}", cubeDesc.getName());
// build base cuboid
buildBaseCuboid(input, listener);
ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("inmem-cubing-cuboid-worker-" + worker.getPoolIndex());
return worker;
}
};
ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
ForkJoinTask rootTask = builderPool.submit(new Runnable() {
@Override
public void run() {
startBuildFromBaseCuboid();
}
});
rootTask.join();
long endTime = System.currentTimeMillis();
logger.info("In Mem Cube Build2 end, {}, takes {} ms", cubeDesc.getName(), (endTime - startTime));
logger.info("total CuboidResult count: {}", resultCollector.getAllResult().size());
return resultCollector.getAllResult();
}
/**
* Build all the cuboids and wait for all the tasks finished.
*
* @param input
* @param listener
* @return
* @throws IOException
*/
private <T> NavigableMap<Long, CuboidResult> buildAndCollect(final RecordConsumeBlockingQueueController<T> input,
final ICuboidResultListener listener) throws IOException {
long startTime = System.currentTimeMillis();
logger.info("In Mem Cube Build2 start, {}", cubeDesc.getName());
// build base cuboid
buildBaseCuboid(input, listener);
ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("inmem-cubing-cuboid-worker-" + worker.getPoolIndex());
return worker;
}
};
ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
ForkJoinTask rootTask = builderPool.submit(new Runnable() {
@Override
public void run() {
startBuildFromBaseCuboid();
}
});
rootTask.join();
long endTime = System.currentTimeMillis();
logger.info("In Mem Cube Build2 end, {}, takes {} ms", cubeDesc.getName(), (endTime - startTime));
logger.info("total CuboidResult count: {}", resultCollector.getAllResult().size());
return resultCollector.getAllResult();
}
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
throws IOException {
final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
.getQueueController(inputConverterUnit, input);
final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();
ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
return worker;
}
};
ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);
Stopwatch sw = new Stopwatch();
sw.start();
logger.info("Dogged Cube Build2 start");
try {
BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
builderPool.execute(task);
do {
builderList.add(task.getInternalBuilder());
//Exception will be thrown here if cube building failure
task.join();
task = task.nextTask();
} while (task != null);
logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
for (final InMemCubeBuilder2 builder : builderList) {
builderPool.submit(new Runnable() {
@Override
public void run() {
builder.startBuildFromBaseCuboid();
}
});
}
resultWatcher.start();
logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsedMillis() + " ms");
} catch (Throwable e) {
logger.error("Dogged Cube Build2 error", e);
if (e instanceof Error)
throw (Error) e;
else if (e instanceof RuntimeException)
throw (RuntimeException) e;
else
throw new IOException(e);
} finally {
output.close();
closeGirdTables(builderList);
sw.stop();
builderPool.shutdownNow();
logger.info("Dogged Cube Build2 end, totally took " + sw.elapsedMillis() + " ms");
logger.info("Dogged Cube Build2 return");
}
}
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
throws IOException {
final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
.getQueueController(inputConverterUnit, input);
final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();
ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
return worker;
}
};
ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);
Stopwatch sw = Stopwatch.createUnstarted();
sw.start();
logger.info("Dogged Cube Build2 start");
try {
BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
builderPool.execute(task);
do {
builderList.add(task.getInternalBuilder());
//Exception will be thrown here if cube building failure
task.join();
task = task.nextTask();
} while (task != null);
logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
for (final InMemCubeBuilder2 builder : builderList) {
builderPool.submit(new Runnable() {
@Override
public void run() {
builder.startBuildFromBaseCuboid();
}
});
}
resultWatcher.start();
logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsed(MILLISECONDS) + " ms");
} catch (Throwable e) {
logger.error("Dogged Cube Build2 error", e);
if (e instanceof Error)
throw (Error) e;
else if (e instanceof RuntimeException)
throw (RuntimeException) e;
else
throw new IOException(e);
} finally {
output.close();
closeGirdTables(builderList);
sw.stop();
builderPool.shutdownNow();
logger.info("Dogged Cube Build2 end, totally took " + sw.elapsed(MILLISECONDS) + " ms");
logger.info("Dogged Cube Build2 return");
}
}