下面列出了 java.util.concurrent.ForkJoinPool#execute() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Handles an inbound message inside a "Server" tracing span.
 *
 * <p>Extracts a span context from the message headers, starts a "Server" span as its child,
 * calls {@code delay(500)}, submits five {@code component} calls to a private
 * {@link ForkJoinPool}, waits up to five seconds for the pool to become quiescent, and then
 * finishes and closes the span.
 *
 * @param message inbound message whose headers are read for the propagated span context
 */
public void handle(Message message) {
    SpanContext spanCtx = getTracer().extract(Format.Builtin.TEXT_MAP,
            new TextMapExtractAdapter(message.getHeaders()));

    // Top level, so create Tracer and root span
    Span serverSpan = getTracer().buildSpan("Server")
            .asChildOf(spanCtx)
            .withTag(Constants.ZIPKIN_BIN_ANNOTATION_HTTP_URL, "http://localhost:8080/inbound?orderId=123&verbose=true")
            .withTag("orderId", "1243343456455")
            .start();

    delay(500);

    ForkJoinPool pool = new ForkJoinPool();
    try {
        for (int i = 0; i < 5; i++) {
            int pos = i; // effectively-final copy for capture by the lambda
            pool.execute(() -> component(serverSpan, pos));
        }
        pool.awaitQuiescence(5, TimeUnit.SECONDS);
    } finally {
        // A fresh pool is created for every message, so release it here. shutdown() (not
        // shutdownNow()) still lets already-submitted work finish if the wait above timed out.
        pool.shutdown();
    }

    serverSpan.finish();
    serverSpan.close();
}
/**
 * Creates an executor backed by its own {@link ForkJoinPool}.
 *
 * <p>The pool is sized by {@code coreWorkers(target)}, uses a
 * {@code DefaultForkJoinWorkerThreadFactory} built from {@code name}, a
 * {@code DefaultUncaughtExceptionHandler}, and is constructed with async mode enabled.
 *
 * @param target passed to {@code coreWorkers} to determine the pool parallelism
 * @param name   passed to the worker thread factory
 * @return an executor whose {@code execute} forwards to the pool and whose {@code shutdown}
 *         logs a warning and calls {@link ForkJoinPool#shutdownNow()}
 */
@Override
public CloseableExecutor newExecutor(Target target, String name) {
    final int parallelism = coreWorkers(target);
    final DefaultForkJoinWorkerThreadFactory threadFactory =
            new DefaultForkJoinWorkerThreadFactory(name);
    final DefaultUncaughtExceptionHandler exceptionHandler =
            new DefaultUncaughtExceptionHandler();
    final ForkJoinPool forkJoinPool =
            new ForkJoinPool(parallelism, threadFactory, exceptionHandler, true);

    return new CloseableExecutor() {

        @Override
        public void execute(Runnable task) {
            forkJoinPool.execute(task);
        }

        @Override
        public void shutdown() {
            logger.warn("ForkJoinPoolExecutorFactory#{} shutdown.", forkJoinPool);
            forkJoinPool.shutdownNow();
        }
    };
}
/**
 * An uncaught-exception handler passed to the pool constructor is reported by
 * getUncaughtExceptionHandler and is invoked for a failing worker.
 *
 * Additionally tests: Overriding ForkJoinWorkerThread.onStart
 * performs its defined action
 */
public void testSetUncaughtExceptionHandler() throws InterruptedException {
    final CountDownLatch handlerCalled = new CountDownLatch(1);
    final Thread.UncaughtExceptionHandler handler = (thread, error) -> {
        threadAssertTrue(error instanceof MyError);
        threadAssertTrue(thread instanceof FailingFJWSubclass);
        handlerCalled.countDown();
    };
    final ForkJoinPool pool =
            new ForkJoinPool(1, new FailingThreadFactory(), handler, false);
    try (PoolCleaner cleaner = cleaner(pool)) {
        assertSame(handler, pool.getUncaughtExceptionHandler());
        try {
            pool.execute(new FibTask(8));
            // Block until the handler has observed the failure.
            await(handlerCalled);
        } finally {
            // failure might have prevented processing task
            pool.shutdownNow();
        }
    }
}
/**
 * An uncaught-exception handler passed to the pool constructor is reported by
 * getUncaughtExceptionHandler and is invoked for a failing worker.
 *
 * Additionally tests: Overriding ForkJoinWorkerThread.onStart
 * performs its defined action
 */
public void testSetUncaughtExceptionHandler() throws InterruptedException {
    final CountDownLatch handlerCalled = new CountDownLatch(1);
    final Thread.UncaughtExceptionHandler handler = (thread, error) -> {
        threadAssertTrue(error instanceof MyError);
        threadAssertTrue(thread instanceof FailingFJWSubclass);
        handlerCalled.countDown();
    };
    final ForkJoinPool pool =
            new ForkJoinPool(1, new FailingThreadFactory(), handler, false);
    try (PoolCleaner cleaner = cleaner(pool)) {
        assertSame(handler, pool.getUncaughtExceptionHandler());
        try {
            pool.execute(new FibTask(8));
            // Block until the handler has observed the failure.
            await(handlerCalled);
        } finally {
            // failure might have prevented processing task
            pool.shutdownNow();
        }
    }
}
/**
 * Checks that {@code crawlingInfoHelper.generateId} yields the same expected id for a fixed
 * data map, first on the calling thread and then from 1000 tasks run on a ten-thread
 * {@link ForkJoinPool}.
 */
public void test_generateId_multithread() throws Exception {
    // "100", "99", ..., "1" — shared by the "role" and "virtual_host" entries.
    final List<String> descendingNumbers = new ArrayList<>();
    for (int n = 100; n > 0; n--) {
        descendingNumbers.add(String.valueOf(n));
    }

    final Map<String, Object> dataMap = new HashMap<String, Object>();
    dataMap.put("url", "http://example.com/");
    dataMap.put("role", descendingNumbers);
    dataMap.put("virtual_host", descendingNumbers);

    final String expectedId =
            "f8240bbae62b99960056c3a382844836c547c2ec73e019491bb7bbb02d92d98e876c8204b67a59ca8123b82d20986516b7d451f68dd634b39004c0d36c0eeca4";
    assertEquals(expectedId, crawlingInfoHelper.generateId(dataMap));

    final AtomicInteger completed = new AtomicInteger(0);
    final Runnable verifyOnce = () -> {
        assertEquals(expectedId, crawlingInfoHelper.generateId(dataMap));
        // Only counted when the assertion above passed.
        completed.incrementAndGet();
    };

    final ForkJoinPool workers = new ForkJoinPool(10);
    for (int submitted = 0; submitted < 1000; submitted++) {
        workers.execute(verifyOnce);
    }
    workers.shutdown();
    workers.awaitTermination(10, TimeUnit.SECONDS);

    assertEquals(1000, completed.get());
}
/**
 * Sends 50 greetings ("user0" .. "user49") through a {@code RetryingHelloWorldClient}
 * connected to localhost:50051, using a {@link ForkJoinPool} to issue them, then prints the
 * client's summary and shuts everything down.
 *
 * <p>Retries are enabled unless the environment variable named by
 * {@code ENV_DISABLE_RETRYING} parses as {@code true}.
 */
public static void main(String[] args) throws Exception {
    final boolean retriesDisabled = Boolean.parseBoolean(System.getenv(ENV_DISABLE_RETRYING));
    final RetryingHelloWorldClient client =
            new RetryingHelloWorldClient("localhost", 50051, !retriesDisabled);

    final ForkJoinPool executor = new ForkJoinPool();
    for (int n = 0; n < 50; n++) {
        final String userId = "user" + n;
        executor.execute(() -> client.greet(userId));
    }

    // Wait (up to 100 seconds) for the submitted greetings before tearing down.
    executor.awaitQuiescence(100, TimeUnit.SECONDS);
    executor.shutdown();

    client.printSummary();
    client.shutdown();
}
/**
 * Submits a trivial task to a single-threaded {@link ForkJoinPool} 100000 times, busy-waiting
 * after each submission until the task has run.
 *
 * @throws RuntimeException if any submitted task has not run within ten seconds
 */
public static void main(String[] args) throws Throwable {
    final ForkJoinPool pool = new ForkJoinPool(1);
    final AtomicBoolean pending = new AtomicBoolean();
    final Runnable clearPending = () -> pending.set(false);
    final long timeoutNanos = TimeUnit.SECONDS.toNanos(10);

    for (int round = 0; round < 100000; round++) {
        pending.set(true);
        pool.execute(clearPending);

        // Spin until the worker clears the flag, giving up after the timeout.
        final long submittedAt = System.nanoTime();
        while (pending.get()) {
            final long waitedNanos = System.nanoTime() - submittedAt;
            if (waitedNanos >= timeoutNanos) {
                throw new RuntimeException("Submitted task failed to execute");
            }
        }
    }
}
/**
 * Replaces {@code pool} with an async-mode pool of the same parallelism, factory and
 * uncaught-exception handler, submits 100 {@code ForkingTask(-2)} instances, waits two
 * seconds, and checks {@code perThread} against twice the pool parallelism.
 */
@Test
public void testLocalQueueFifo() throws InterruptedException {
    pool = new ForkJoinPool(
            pool.getParallelism(),
            pool.getFactory(),
            pool.getUncaughtExceptionHandler(),
            true);

    int remaining = 100;
    while (remaining-- > 0) {
        pool.execute(new ForkingTask(-2));
    }

    // Give the pool time to get plenty of work done - whatever remains after this interval is assumed never to happen
    Thread.sleep(2000);

    assertEquals(2 * pool.getParallelism(), perThread.intValue());
}
/**
 * Runs one {@code SimpleTask} on a fresh {@link ForkJoinPool}, waits on the task's latch,
 * then shuts the pool down and waits up to ten seconds for it to terminate.
 */
@Override
public void transactionMarker() throws Exception {
    final ForkJoinPool forkJoinPool = new ForkJoinPool();
    final SimpleTask task = new SimpleTask();

    forkJoinPool.execute(task);
    task.latch.await();

    forkJoinPool.shutdown();
    forkJoinPool.awaitTermination(10, SECONDS);
}
/**
 * Submits a {@code ForkJoinRecursiveTask} over the integers 1..9 to a two-thread
 * {@link ForkJoinPool} while {@code parent} is the active span, then verifies the joined
 * result is 450.
 *
 * @param parent span activated around the submission
 * @throws AssertionError if the task result is not 450
 */
private static void testForkJoinPool(final Span parent) {
    final ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    try {
        final ForkJoinRecursiveTask forkJoinRecursiveTask =
                new ForkJoinRecursiveTask(IntStream.range(1, 10).toArray());

        // The span is only active for the submission itself, not for the join below.
        try (final Scope scope = GlobalTracer.get().activateSpan(parent)) {
            forkJoinPool.execute(forkJoinRecursiveTask);
        }

        final int result = forkJoinRecursiveTask.join();
        if (result != 450) {
            throw new AssertionError("ERROR: wrong fork join result: " + result);
        }
    } finally {
        // Release the pool on both the success and the failure path.
        forkJoinPool.shutdown();
    }
}
/**
 * Runs a {@code PrintTask(0, 25)} on a four-thread {@link ForkJoinPool} and waits for the
 * pool to terminate before returning.
 */
public static void main(String[] args) {
    PrintTask task = new PrintTask(0, 25);
    // Give the pool four worker threads.
    ForkJoinPool pool = new ForkJoinPool(4);
    pool.execute(task);
    pool.shutdown();
    try {
        // ForkJoinPool workers are daemon threads: without this wait the JVM may exit
        // before the submitted task has run.
        pool.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever called main.
        Thread.currentThread().interrupt();
    }
}
/**
 * Demo: hands the task from {@code createTask()} to the common pool via {@code execute},
 * obtains its value with {@code join}, and prints it after a blank line.
 */
private static void demo2_ForkJoin_execute_join() {
    System.out.println();

    AverageSpeed task = createTask();
    ForkJoinPool.commonPool().execute(task);

    // join() blocks until the task has completed.
    double joined = task.join();
    System.out.println("result = " + joined);
}
/**
 * Submits a trivial task to a single-threaded {@link ForkJoinPool} 100000 times, busy-waiting
 * after each submission until the task has run.
 *
 * @throws RuntimeException if any submitted task has not run within ten seconds
 */
public static void main(String[] args) throws Throwable {
    final ForkJoinPool pool = new ForkJoinPool(1);
    final AtomicBoolean pending = new AtomicBoolean();
    final Runnable clearPending = () -> pending.set(false);
    final long timeoutNanos = TimeUnit.SECONDS.toNanos(10);

    for (int round = 0; round < 100000; round++) {
        pending.set(true);
        pool.execute(clearPending);

        // Spin until the worker clears the flag, giving up after the timeout.
        final long submittedAt = System.nanoTime();
        while (pending.get()) {
            final long waitedNanos = System.nanoTime() - submittedAt;
            if (waitedNanos >= timeoutNanos) {
                throw new RuntimeException("Submitted task failed to execute");
            }
        }
    }
}
/**
 * A LockingFibTask(20) that uses a ManagedLocker over a shared ReentrantLock
 * completes on a four-thread pool with the value 6765.
 */
public void testBlockingForkJoinTask() throws Throwable {
    final ForkJoinPool pool = new ForkJoinPool(4);
    try {
        final ReentrantLock sharedLock = new ReentrantLock();
        final ManagedLocker blocker = new ManagedLocker(sharedLock);
        final ForkJoinTask<Integer> fibTask = new LockingFibTask(20, blocker, sharedLock);

        pool.execute(fibTask);

        final int computed = fibTask.get();
        assertEquals(6765, computed);
    } finally {
        // don't wait out shutdown
        pool.shutdownNow();
    }
}
/**
 * Passes {@code customRecursiveTask} to the common pool first through {@code execute} and
 * then through {@code submit}, joining it and asserting it is done after each hand-off.
 */
@Test
public void executeRecursiveTask_whenExecuted_thenCorrect() {
    ForkJoinPool commonPool = ForkJoinPool.commonPool();

    // First hand-off: execute + join.
    commonPool.execute(customRecursiveTask);
    int firstResult = customRecursiveTask.join();
    assertTrue(customRecursiveTask.isDone());

    // Second hand-off of the same task instance: submit + join.
    commonPool.submit(customRecursiveTask);
    int secondResult = customRecursiveTask.join();
    assertTrue(customRecursiveTask.isDone());
}
/**
 * Submits a trivial task to a single-threaded {@link ForkJoinPool} 100000 times, busy-waiting
 * after each submission until the task has run.
 *
 * @throws RuntimeException if any submitted task has not run within ten seconds
 */
public static void main(String[] args) throws Throwable {
    final ForkJoinPool pool = new ForkJoinPool(1);
    final AtomicBoolean pending = new AtomicBoolean();
    final Runnable clearPending = () -> pending.set(false);
    final long timeoutNanos = TimeUnit.SECONDS.toNanos(10);

    for (int round = 0; round < 100000; round++) {
        pending.set(true);
        pool.execute(clearPending);

        // Spin until the worker clears the flag, giving up after the timeout.
        final long submittedAt = System.nanoTime();
        while (pending.get()) {
            final long waitedNanos = System.nanoTime() - submittedAt;
            if (waitedNanos >= timeoutNanos) {
                throw new RuntimeException("Submitted task failed to execute");
            }
        }
    }
}
/**
 * Builds cuboids from the records in {@code input} using a dedicated ForkJoinPool.
 *
 * Steps visible here: wrap the queue in a RecordConsumeBlockingQueueController, run a chain of
 * BaseCuboidTask instances (each joined before moving to {@code nextTask()}), then submit
 * {@code startBuildFromBaseCuboid()} for every collected builder and start the
 * CuboidResultWatcher. The output is closed, the builders' grid tables are closed and the pool
 * is shut down in the finally block.
 *
 * @param input              queue of input records
 * @param inputConverterUnit converter handed to the queue controller
 * @param output             writer passed to the result watcher; closed before this method returns
 * @throws IOException wraps any failure that is neither an Error nor a RuntimeException
 */
public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
throws IOException {
final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController
.getQueueController(inputConverterUnit, input);
// One builder is appended per base-cuboid task in the loop below.
final List<InMemCubeBuilder2> builderList = new CopyOnWriteArrayList<>();
// Delegates to the default factory and only renames the worker thread.
ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("dogged-cubing-cuboid-worker-" + worker.getPoolIndex());
return worker;
}
};
// No uncaught-exception handler (null); last argument enables async mode.
ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
CuboidResultWatcher resultWatcher = new CuboidResultWatcher(builderList, output);
Stopwatch sw = new Stopwatch();
sw.start();
logger.info("Dogged Cube Build2 start");
try {
BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 1, resultWatcher);
builderPool.execute(task);
// Follow the task chain: record each task's builder, wait for it, then move to its successor.
do {
builderList.add(task.getInternalBuilder());
//Exception will be thrown here if cube building failure
task.join();
task = task.nextTask();
} while (task != null);
logger.info("Has finished feeding data, and base cuboid built, start to build child cuboids");
for (final InMemCubeBuilder2 builder : builderList) {
builderPool.submit(new Runnable() {
@Override
public void run() {
builder.startBuildFromBaseCuboid();
}
});
}
// NOTE(review): presumably blocks until the submitted child-cuboid builds are consumed,
// since the pool is shut down with shutdownNow() in the finally block — confirm.
resultWatcher.start();
logger.info("Dogged Cube Build2 splits complete, took " + sw.elapsedMillis() + " ms");
} catch (Throwable e) {
logger.error("Dogged Cube Build2 error", e);
// Rethrow unchecked failures unchanged; wrap everything else in IOException.
if (e instanceof Error)
throw (Error) e;
else if (e instanceof RuntimeException)
throw (RuntimeException) e;
else
throw new IOException(e);
} finally {
// NOTE(review): if output.close() or closeGirdTables throws, the remaining cleanup
// (including builderPool.shutdownNow()) is skipped — confirm whether that is acceptable.
output.close();
closeGirdTables(builderList);
sw.stop();
builderPool.shutdownNow();
logger.info("Dogged Cube Build2 end, totally took " + sw.elapsedMillis() + " ms");
logger.info("Dogged Cube Build2 return");
}
}
/**
 * awaitQuiescence by a worker is equivalent in effect to
 * ForkJoinTask.helpQuiesce()
 *
 * The action forks a FibAction(8), calls awaitQuiescence from inside the pool, and
 * checks pool state; the test thread then spins until the action is done and the
 * pool reports quiescence, asserting the pool was never shut down along the way.
 */
public void testAwaitQuiescence1() throws Exception {
final ForkJoinPool p = new ForkJoinPool();
try (PoolCleaner cleaner = cleaner(p)) {
final long startTime = System.nanoTime();
// Nothing has been submitted yet.
assertTrue(p.isQuiescent());
ForkJoinTask a = new CheckedRecursiveAction() {
protected void realCompute() {
FibAction f = new FibAction(8);
assertSame(f, f.fork());
// Confirms this code is running on a worker of p.
assertSame(p, ForkJoinTask.getPool());
boolean quiescent = p.awaitQuiescence(LONG_DELAY_MS, MILLISECONDS);
assertTrue(quiescent);
// The pool itself is asserted non-quiescent here, while this action is still executing.
assertFalse(p.isQuiescent());
// Spin until the forked action completes, checking the pool stays live.
while (!f.isDone()) {
assertFalse(p.getAsyncMode());
assertFalse(p.isShutdown());
assertFalse(p.isTerminating());
assertFalse(p.isTerminated());
Thread.yield();
}
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
assertFalse(p.isQuiescent());
assertEquals(0, ForkJoinTask.getQueuedTaskCount());
// Expected result of FibAction(8).
assertEquals(21, f.result);
}};
p.execute(a);
// From the submitting thread: wait for the action to finish and the pool to go quiescent.
while (!a.isDone() || !p.isQuiescent()) {
assertFalse(p.getAsyncMode());
assertFalse(p.isShutdown());
assertFalse(p.isTerminating());
assertFalse(p.isTerminated());
Thread.yield();
}
assertEquals(0, p.getQueuedTaskCount());
assertFalse(p.getAsyncMode());
assertEquals(0, p.getQueuedSubmissionCount());
assertFalse(p.hasQueuedSubmissions());
// Allow workers to go inactive, bounded by LONG_DELAY_MS since the test started.
while (p.getActiveThreadCount() != 0
&& millisElapsedSince(startTime) < LONG_DELAY_MS)
Thread.yield();
assertFalse(p.isShutdown());
assertFalse(p.isTerminating());
assertFalse(p.isTerminated());
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
}
}
/**
 * awaitQuiescence returns when pool isQuiescent() or the indicated
 * timeout elapsed
 *
 * Here awaitQuiescence is called from the submitting (non-worker) thread after an
 * action that forks a FibAction(8) has been handed to the pool.
 */
public void testAwaitQuiescence2() throws Exception {
/**
 * """It is possible to disable or limit the use of threads in the
 * common pool by setting the parallelism property to zero. However
 * doing so may cause unjoined tasks to never be executed."""
 */
// Skip the test entirely when the common-pool parallelism property is "0".
if ("0".equals(System.getProperty(
"java.util.concurrent.ForkJoinPool.common.parallelism")))
return;
final ForkJoinPool p = new ForkJoinPool();
try (PoolCleaner cleaner = cleaner(p)) {
// Nothing has been submitted yet.
assertTrue(p.isQuiescent());
final long startTime = System.nanoTime();
ForkJoinTask a = new CheckedRecursiveAction() {
protected void realCompute() {
FibAction f = new FibAction(8);
assertSame(f, f.fork());
// The forked action is never joined; spin until it is done or LONG_DELAY_MS has elapsed.
while (!f.isDone()
&& millisElapsedSince(startTime) < LONG_DELAY_MS) {
assertFalse(p.getAsyncMode());
assertFalse(p.isShutdown());
assertFalse(p.isTerminating());
assertFalse(p.isTerminated());
Thread.yield();
}
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
assertEquals(0, ForkJoinTask.getQueuedTaskCount());
// Expected result of FibAction(8).
assertEquals(21, f.result);
}};
p.execute(a);
// The call under test: must report quiescence within LONG_DELAY_MS.
assertTrue(p.awaitQuiescence(LONG_DELAY_MS, MILLISECONDS));
assertTrue(p.isQuiescent());
assertTrue(a.isDone());
assertEquals(0, p.getQueuedTaskCount());
assertFalse(p.getAsyncMode());
assertEquals(0, p.getQueuedSubmissionCount());
assertFalse(p.hasQueuedSubmissions());
// Allow workers to go inactive, bounded by LONG_DELAY_MS since the test started.
while (p.getActiveThreadCount() != 0
&& millisElapsedSince(startTime) < LONG_DELAY_MS)
Thread.yield();
assertFalse(p.isShutdown());
assertFalse(p.isTerminating());
assertFalse(p.isTerminated());
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
}
}
/**
 * Runs a {@code FileSizeTask} rooted at the user's home directory on a {@link ForkJoinPool}
 * with one worker per available processor, and waits for the pool to terminate before
 * returning.
 */
public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    String homePath = System.getProperty("user.home");
    TreeNode root = new TreeNode(homePath);
    pool.execute(new FileSizeTask(root));

    // Orderly shutdown: already-submitted work still runs, and the pool is released afterwards.
    pool.shutdown();
    try {
        // ForkJoinPool workers are daemon threads: without this wait the JVM may exit
        // before the submitted task has run.
        pool.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever called main.
        Thread.currentThread().interrupt();
    }
}