下面列出了怎么用java.util.concurrent.ForkJoinPool的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Call the metrics API on each host and aggregate the metrics
* into a single value, grouped by cluster.
*/
public Map<ClusterInfo, MetricsAggregator> requestMetricsGroupedByCluster(Collection<URI> hosts) {
Map<ClusterInfo, MetricsAggregator> clusterMetricsMap = new ConcurrentHashMap<>();
long startTime = System.currentTimeMillis();
Runnable retrieveMetricsJob = () ->
hosts.parallelStream().forEach(host ->
getHostMetrics(host, clusterMetricsMap)
);
ForkJoinPool threadPool = new ForkJoinPool(10);
threadPool.submit(retrieveMetricsJob);
threadPool.shutdown();
try {
threadPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.log(Level.FINE, () ->
String.format("Metric retrieval for %d nodes took %d milliseconds", hosts.size(), System.currentTimeMillis() - startTime)
);
return clusterMetricsMap;
}
@Override
public void test_result() throws Exception {
super.test_result();
IRunnableFuture2<Result<Object, RuntimeException>> future = IRunnableFuture2.toResultFuture(() -> {
throw new RuntimeException("xxx2");
});
submitToExecutor(future, ForkJoinPool.commonPool());
Result<Object, RuntimeException> result = future.awaitResult();
verifyThrows(() -> {
result.get();
}, RuntimeException.class, "xxx2");
}
@Test
public void testWritesCorrectNumber() {
ExecutorService executor = ForkJoinPool.commonPool();
AtomicLong numSent = new AtomicLong(0);
long expectedSent = 100;
SendToKafka sender = new SendToKafka(null, expectedSent, 10, () -> "msg", executor, numSent, ThreadLocal.withInitial(() -> null) ) {
@Override
protected Future<?> sendToKafka(KafkaProducer producer, String kafkaTopic, String message) {
assertEquals(message, "msg");
return ForkJoinPool.commonPool().submit(() -> {
numSent.incrementAndGet();
});
}
};
sender.run();
assertEquals(numSent.get(), expectedSent);
}
@Test
public void canWriteLogs() {
final ListAppender appender = logs.getListAppender("STDOUT");
// emit warning with exception from an async CompletableFuture
CompletableFuture.runAsync(() -> LoggerFactory.getLogger(LoggingTest.class).warn("msg",
new UncheckedIOException(new IOException("test"))), ForkJoinPool.commonPool()).join();
assertThat(appender.getMessages(), containsInAnyOrder(allOf(containsString("LoggingTest"),
containsString("\"level\":\"WARN\""), containsString("\"message\":\"msg\""), containsString(
"\"extendedStackTrace\":\"java.io.UncheckedIOException: java.io.IOException: test"))));
// emit error without exception from an async CompletableFuture
appender.clear();
LoggerFactory.getLogger(LoggingTest.class).error("test message");
assertThat(appender.getMessages(),
containsInAnyOrder(allOf(containsString("LoggingTest"),
containsString("\"level\":\"ERROR\""), containsString("\"message\":\"test message\""),
not(containsString("extendedStackTrace")))));
}
@Test
public void testActivate() {
Map<String, Object> props = new HashMap<>();
props.put("poolSize", 0);
manager.modified(props);
ForkJoinPool pool = Whitebox.getInternalState(manager, "threadPool");
assertTrue(pool.getParallelism() > 0);
props.put("poolSize", -1);
manager.modified(props);
pool = Whitebox.getInternalState(manager, "threadPool");
assertEquals(pool.getParallelism(), 1);
props.put("poolSize", 1);
manager.modified(props);
pool = Whitebox.getInternalState(manager, "threadPool");
assertEquals(pool.getParallelism(), 1);
props.put("poolSize", 2);
manager.modified(props);
pool = Whitebox.getInternalState(manager, "threadPool");
assertEquals(pool.getParallelism(), 2);
}
public static void main(String[] args) {
Path path = Paths.get(args[0]);
// Create pool of 100 threads to compute results
ForkJoinPool fjp = new ForkJoinPool(100);
try {
// Obtain list of lines
List<CompletableFuture<String>> list =
Files.lines(path)
// Map lines into a future task
.map(line -> CompletableFuture.supplyAsync(
() -> addressName(line), fjp))
// Collect future tasks into a list
.collect(Collectors.toList());
// Wait for tasks to complete, and print the result
list.stream().map(CompletableFuture::join)
.forEach(System.out::println);
} catch (IOException e) {
System.err.println("Failed: " + e);
}
}
@Override
public AbstractFeature build() throws JATEException {
ChiSquareFrequentTerms feature = new ChiSquareFrequentTerms();
int cores = properties.getMaxCPUCores();
cores = cores == 0 ? 1 : cores;
int maxPerThread = getMaxPerThread(cores);
LOG.info("Beginning building features (ChiSquare frequent terms). Total terms=" + allFrequentTerms.size() + ", cpu cores=" +
cores + ", max per core=" + maxPerThread);
ChiSquareFrequentTermsFBWorker worker = new
ChiSquareFrequentTermsFBWorker(allFrequentTerms, maxPerThread, ctx2TTF, term2Ctx,
feature, ttfInCorpus);
ForkJoinPool forkJoinPool = new ForkJoinPool(cores);
int total = forkJoinPool.invoke(worker);
StringBuilder sb = new StringBuilder("Complete building features. Total processed terms = " + total);
LOG.info(sb.toString());
return feature;
}
@Override
public void optimize() {
final ForkJoinPool pool = TaskManager.IMP.getPublicForkJoinPool();
map.forEachChunk(new RunnableVal<FaweChunk>() {
@Override
public void run(final FaweChunk chunk) {
pool.submit(new Runnable() {
@Override
public void run() {
chunk.optimize();
}
});
}
});
pool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
private static void demo4_Flow_submissionPublisher() {
System.out.println();
ExecutorService execService = ForkJoinPool.commonPool();//Executors.newFixedThreadPool(3);
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){//execService, 1)){
demoSubscribe(publisher, execService, "One");
demoSubscribe(publisher, execService, "Two");
demoSubscribe(publisher, execService, "Three");
IntStream.range(1, 5).forEach(publisher::submit);
} finally {
try {
execService.shutdown();
int shutdownDelaySec = 1;
System.out.println("Waiting for " + shutdownDelaySec + " sec before shutting down service...");
execService.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
} catch (Exception ex) {
System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
} finally {
System.out.println("Calling execService.shutdownNow()...");
List<Runnable> l = execService.shutdownNow();
System.out.println(l.size() + " tasks were waiting to be executed. Service stopped.");
}
}
}
@Test
public void testInvalidPlatformName() {
Map<String, String> skipperDeployerProperties = new HashMap<>();
skipperDeployerProperties.put(SkipperStream.SKIPPER_PACKAGE_NAME, "package1");
skipperDeployerProperties.put(SkipperStream.SKIPPER_PACKAGE_VERSION, "1.0.1");
skipperDeployerProperties.put(SkipperStream.SKIPPER_PLATFORM_NAME, "badPlatform");
skipperDeployerProperties.put(SkipperStream.SKIPPER_REPO_NAME, "mylocal-repo1");
StreamDeploymentRequest streamDeploymentRequest = new StreamDeploymentRequest("test1", "time | log",
new ArrayList<>(),
skipperDeployerProperties);
SkipperClient skipperClient = MockUtils.createSkipperClientMock();
SkipperStreamDeployer skipperStreamDeployer = new SkipperStreamDeployer(skipperClient,
mock(StreamDefinitionRepository.class), mock(AppRegistryService.class), mock(ForkJoinPool.class), new DefaultStreamDefinitionService());
try {
skipperStreamDeployer.deployStream(streamDeploymentRequest);
fail();
}
catch (IllegalArgumentException expected) {
assertThat(expected).hasMessage("No platform named 'badPlatform'");
}
}
@Override
public AbstractFeature build() throws JATEException {
Containment feature = new Containment();
//start workers
int cores = properties.getMaxCPUCores();
cores = cores == 0 ? 1 : cores;
int maxPerThread = getMaxPerThread(cores);
StringBuilder sb = new StringBuilder("Building features using cpu cores=");
sb.append(cores).append(", total terms=").append(uniqueCandidateTerms.size()).append(", max per worker=")
.append(maxPerThread);
LOG.info(sb.toString());
ContainmentFBWorker worker = new
ContainmentFBWorker(new ArrayList<>(uniqueCandidateTerms), maxPerThread,
feature,
termComponentIndex);
ForkJoinPool forkJoinPool = new ForkJoinPool(cores);
int[] total = forkJoinPool.invoke(worker);
sb = new StringBuilder("Complete building features. Total=");
sb.append(total[1]).append(" success=").append(total[0]);
LOG.info(sb.toString());
return feature;
}
/**
* Returns a result that, after the given blocking function executes asynchronously on
* {@link ForkJoinPool#commonPool()} and returns a result, completes when the returned result completes, with the same
* value or exception.
*
* @param fn The function returning a result.
* @param <T> The type of the returned result's value.
* @return A new result.
*/
static <T> AsyncResult<T> executeBlocking(Supplier<T> fn) {
requireNonNull(fn);
CompletableAsyncResult<T> asyncResult = AsyncResult.incomplete();
ForkJoinPool.commonPool().execute(() -> {
try {
asyncResult.complete(fn.get());
} catch (Throwable ex) {
asyncResult.completeExceptionally(ex);
}
});
return asyncResult;
}
/**
* create a Session and send a SQL to the database.
*/
@Test
public void sqlOperation() {
DataSource ds = ConnectUtil.openDb(postgres);
Session session = ds.getSession(t -> fail("ERROR: " + t.getMessage()));
try (session) {
assertNotNull(session);
session.operation(TRIVIAL).submit();
}
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.MINUTES);
}
public static void main(String[] args) {
System.out.println("Parallelism: " + ForkJoinPool.getCommonPoolParallelism());
testForEach();
testSearch();
testReduce();
}
/**
* pollSubmission returns unexecuted submitted task, if present
*/
public void testPollSubmission() {
final CountDownLatch done = new CountDownLatch(1);
final ForkJoinTask a = ForkJoinTask.adapt(awaiter(done));
final ForkJoinTask b = ForkJoinTask.adapt(awaiter(done));
final ForkJoinTask c = ForkJoinTask.adapt(awaiter(done));
final ForkJoinPool p = singletonPool();
try (PoolCleaner cleaner = cleaner(p, done)) {
Thread external = new Thread(new CheckedRunnable() {
public void realRun() {
p.execute(a);
p.execute(b);
p.execute(c);
}});
RecursiveAction s = new CheckedRecursiveAction() {
protected void realCompute() {
external.start();
try {
external.join();
} catch (Exception ex) {
threadUnexpectedException(ex);
}
assertTrue(p.hasQueuedSubmissions());
assertTrue(Thread.currentThread() instanceof ForkJoinWorkerThread);
ForkJoinTask r = ForkJoinTask.pollSubmission();
assertTrue(r == a || r == b || r == c);
assertFalse(r.isDone());
}};
p.invoke(s);
}
}
/**
* Common pool cannot be shut down
*/
public void testCommonPoolShutDown() {
assertFalse(ForkJoinPool.commonPool().isShutdown());
assertFalse(ForkJoinPool.commonPool().isTerminating());
assertFalse(ForkJoinPool.commonPool().isTerminated());
ForkJoinPool.commonPool().shutdown();
assertFalse(ForkJoinPool.commonPool().isShutdown());
assertFalse(ForkJoinPool.commonPool().isTerminating());
assertFalse(ForkJoinPool.commonPool().isTerminated());
ForkJoinPool.commonPool().shutdownNow();
assertFalse(ForkJoinPool.commonPool().isShutdown());
assertFalse(ForkJoinPool.commonPool().isTerminating());
assertFalse(ForkJoinPool.commonPool().isTerminated());
}
/** Root task constructor */
public IntCumulateTask(IntCumulateTask parent,
IntBinaryOperator function,
int[] array, int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.lo = this.origin = lo; this.hi = this.fence = hi;
int p;
this.threshold =
(p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
<= MIN_PARTITION ? MIN_PARTITION : p;
}
public static void main(String[] args) {
AppleTree[] appleTrees = AppleTree.newTreeGarden(12);
ForkJoinPool pool = ForkJoinPool.commonPool();
PickFruitTask task = new PickFruitTask(appleTrees, 0, appleTrees.length - 1);
int result = pool.invoke(task);
System.out.println();
System.out.println("Total apples picked: " + result);
}
/** Root task constructor */
public IntCumulateTask(IntCumulateTask parent,
IntBinaryOperator function,
int[] array, int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.lo = this.origin = lo; this.hi = this.fence = hi;
int p;
this.threshold =
(p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
<= MIN_PARTITION ? MIN_PARTITION : p;
}
private static void testContextClassLoaderIsNull() throws Exception {
ForkJoinPool pool = new ForkJoinPool(4);
pool.submit(() ->
IntStream.range(0, 20).parallel().forEach(item -> {
if (Thread.currentThread().getName().equals("main")) {
return;
}
// in web environment, this could be null, here we just mock a null class loader.
Thread.currentThread().setContextClassLoader(null);
TestMgr.check(null, test.postTestStatic(2));
})).get();
}
public static void main(String[] args) {
PrintTask task = new PrintTask(0, 25);
// 分配四个线程给它
ForkJoinPool pool = new ForkJoinPool(4);
pool.execute(task);
pool.shutdown();
}
@Test
public void callCommonPool_whenExistsAndExpectedType_thenCorrect() {
ForkJoinPool commonPool = ForkJoinPool.commonPool();
ForkJoinPool commonPoolTwo = ForkJoinPool.commonPool();
assertNotNull(commonPool);
assertEquals(commonPool, commonPoolTwo);
}
/**
* timed invokeAny(empty collection) throws IllegalArgumentException
*/
public void testTimedInvokeAny2() throws Throwable {
ExecutorService e = new ForkJoinPool(1);
try (PoolCleaner cleaner = cleaner(e)) {
try {
e.invokeAny(new ArrayList<Callable<String>>(),
MEDIUM_DELAY_MS, MILLISECONDS);
shouldThrow();
} catch (IllegalArgumentException success) {}
}
}
/**
* Builds a base configuration
*
* @param applicationID The Algolia Application ID
* @param apiKey The API Key: could be Admin API Key or Search API Key
* @throws NullPointerException If the ApplicationID or the APIKey or the hosts are null
* @throws IllegalArgumentException If the ApplicationID or the APIKey are empty
*/
public Builder(
@Nonnull String applicationID,
@Nonnull String apiKey,
@Nonnull List<StatefulHost> defaultHosts,
@Nonnull CompressionType compressionType) {
this.applicationID = applicationID;
this.apiKey = apiKey;
this.useSystemProxy = false;
this.batchSize = 1000;
this.hosts = defaultHosts;
this.connectTimeOut = Defaults.CONNECT_TIMEOUT_MS;
this.compressionType = compressionType;
this.defaultHeaders = new HashMap<>();
this.defaultHeaders.put(Defaults.ALGOLIA_APPLICATION_HEADER, applicationID);
this.defaultHeaders.put(Defaults.ALGOLIA_KEY_HEADER, apiKey);
String clientVersion = this.getClass().getPackage().getImplementationVersion();
this.defaultHeaders.put(
Defaults.USER_AGENT_HEADER,
String.format("Algolia for Java (%s); JVM (%s)", clientVersion, JAVA_VERSION));
this.defaultHeaders.put(Defaults.ACCEPT_HEADER, Defaults.APPLICATION_JSON);
this.defaultHeaders.put(Defaults.ACCEPT_ENCODING_HEADER, Defaults.CONTENT_ENCODING_GZIP);
this.executor = ForkJoinPool.commonPool();
}
/** Root task constructor */
public CumulateTask(CumulateTask<T> parent,
BinaryOperator<T> function,
T[] array, int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.lo = this.origin = lo; this.hi = this.fence = hi;
int p;
this.threshold =
(p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
<= MIN_PARTITION ? MIN_PARTITION : p;
}
/** Root task constructor */
public LongCumulateTask(LongCumulateTask parent,
LongBinaryOperator function,
long[] array, int lo, int hi) {
super(parent);
this.function = function; this.array = array;
this.lo = this.origin = lo; this.hi = this.fence = hi;
int p;
this.threshold =
(p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
<= MIN_PARTITION ? MIN_PARTITION : p;
}
/**
* Null-checks user executor argument, and translates uses of
* commonPool to asyncPool in case parallelism disabled.
*/
static Executor screenExecutor(Executor e) {
if (!useCommonPool && e == ForkJoinPool.commonPool())
return asyncPool;
if (e == null) throw new NullPointerException();
return e;
}
public void testInvoke(ForkJoinPool pool) {
RecursiveAction a = new CheckedRecursiveAction() {
protected void realCompute() {
AsyncFib f = new AsyncFib(8);
assertNull(f.invoke());
f.checkCompletedNormally();
}};
testInvokeOnPool(pool, a);
}
@Test
public void classicEmptyBackpressured() throws Exception {
AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
Flux.<Integer>empty().subscribeOn(Schedulers.fromExecutorService(ForkJoinPool.commonPool())).subscribe(ts);
ts.await(Duration.ofSeconds(5));
ts.assertNoValues()
.assertNoError()
.assertComplete();
}
/**
* A task submitted after shutdown is rejected
*/
public void testSubmitAfterShutdown() {
ForkJoinPool p = new ForkJoinPool(1);
try (PoolCleaner cleaner = cleaner(p)) {
p.shutdown();
assertTrue(p.isShutdown());
try {
ForkJoinTask<Integer> f = p.submit(new FibTask(8));
shouldThrow();
} catch (RejectedExecutionException success) {}
}
}