下面列出了java.util.concurrent.ForkJoinPool#invoke ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
static void test(ForkJoinPool g, int i) throws Exception {
boardSize = i;
int ps = g.getParallelism();
long start = System.nanoTime();
NQueensCS task = new NQueensCS(new int[0]);
g.invoke(task);
int solutions = task.solutions;
long time = System.nanoTime() - start;
double secs = (double) time / NPS;
if (solutions != expectedSolutions[i])
throw new Error();
System.out.printf("NQueensCS %3d", i);
System.out.printf(" Time: %7.3f", secs);
long sc = g.getStealCount();
long ns = sc - lastStealCount;
lastStealCount = sc;
System.out.printf(" Steals/t: %5d", ns/ps);
System.out.println();
}
public static void main(String[] args) {
System.setProperty("java.util.logging.SimpleFormatter.format",
"[%1$tT] [%4$-7s] %5$s %n");
int noOfProcessors = Runtime.getRuntime().availableProcessors();
logger.info(() -> "Available processors: " + noOfProcessors);
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int initialPoolSize = forkJoinPool.getPoolSize();
int commonPoolParallelism = ForkJoinPool.getCommonPoolParallelism();
logger.info(() -> "Common Pool parallelism :" + commonPoolParallelism);
logger.info(() -> "Common Pool size before:" + initialPoolSize);
FibonacciRecursiveAction fibonacciRecursiveAction = new FibonacciRecursiveAction(12);
forkJoinPool.invoke(fibonacciRecursiveAction);
logger.info(() -> "Fibonacci: " + fibonacciRecursiveAction.fibonacciNumber());
int afterPoolSize = forkJoinPool.getPoolSize();
logger.info(() -> "Common Pool size after :" + afterPoolSize);
}
static void test(ForkJoinPool g, int i) throws Exception {
boardSize = i;
int ps = g.getParallelism();
long start = System.nanoTime();
NQueensCS task = new NQueensCS(new int[0]);
g.invoke(task);
int solutions = task.solutions;
long time = System.nanoTime() - start;
double secs = (double) time / NPS;
if (solutions != expectedSolutions[i])
throw new Error();
System.out.printf("NQueensCS %3d", i);
System.out.printf(" Time: %7.3f", secs);
long sc = g.getStealCount();
long ns = sc - lastStealCount;
lastStealCount = sc;
System.out.printf(" Steals/t: %5d", ns/ps);
System.out.println();
}
static void test(ForkJoinPool g, int i) throws Exception {
boardSize = i;
int ps = g.getParallelism();
long start = System.nanoTime();
NQueensCS task = new NQueensCS(new int[0]);
g.invoke(task);
int solutions = task.solutions;
long time = System.nanoTime() - start;
double secs = (double) time / NPS;
if (solutions != expectedSolutions[i])
throw new Error();
System.out.printf("NQueensCS %3d", i);
System.out.printf(" Time: %7.3f", secs);
long sc = g.getStealCount();
long ns = sc - lastStealCount;
lastStealCount = sc;
System.out.printf(" Steals/t: %5d", ns/ps);
System.out.println();
}
@Override
public AbstractFeature build() throws JATEException {
Containment feature = new Containment();
//start workers
int cores = properties.getMaxCPUCores();
cores = cores == 0 ? 1 : cores;
int maxPerThread = getMaxPerThread(cores);
StringBuilder sb = new StringBuilder("Building features using cpu cores=");
sb.append(cores).append(", total terms=").append(uniqueCandidateTerms.size()).append(", max per worker=")
.append(maxPerThread);
LOG.info(sb.toString());
ContainmentFBWorker worker = new
ContainmentFBWorker(new ArrayList<>(uniqueCandidateTerms), maxPerThread,
feature,
termComponentIndex);
ForkJoinPool forkJoinPool = new ForkJoinPool(cores);
int[] total = forkJoinPool.invoke(worker);
sb = new StringBuilder("Complete building features. Total=");
sb.append(total[1]).append(" success=").append(total[0]);
LOG.info(sb.toString());
return feature;
}
public static void main(String[] args) throws InterruptedException {
AppleTree[] appleTrees = AppleTree.newTreeGarden(12);
PickFruitAction task = new PickFruitAction(appleTrees, 0, appleTrees.length - 1);
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.invoke(task);
// try this: pool.execute(task);
// try this: pool.execute(task); task.join();
// try this: pool.execute(task); pool.awaitTermination(10, TimeUnit.SECONDS);
System.out.println();
System.out.println("Done!");
}
public static void asyncPartitionedUpdate(BufferedReader bufferedReader, GraphDatabaseService graphDb, ProcessorMessage processorMessage) throws IOException {
Integer reportBlockSize = 10000;
Stream<String> iterator = bufferedReader.lines();
List<Spliterator<String>> spliteratorList = new ArrayList<>();
boolean hasSpliterator = true;
Spliterator<String> nodeSpliterator = iterator.spliterator();
while (hasSpliterator) {
Spliterator<String> localSpliterator = nodeSpliterator.trySplit();
hasSpliterator = localSpliterator != null;
if (hasSpliterator)
spliteratorList.add(localSpliterator);
}
counter = 0;
if (spliteratorList.size() > 4) {
// Fork join
ParallelBatchTransaction parallelBatchTransaction =
new ParallelBatchTransaction(spliteratorList.toArray(new Spliterator[spliteratorList.size()]),
0, spliteratorList.size(), graphDb, reportBlockSize, spliteratorList.size(), processorMessage);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(parallelBatchTransaction);
} else {
// Sequential
Transaction tx = graphDb.beginTx();
Node partitionNode = graphDb.getNodeById(processorMessage.getPartitionDescription().getPartitionId());
spliteratorList.forEach(sl -> sl.forEachRemaining(n -> updatePartitionBlockForRow(n, graphDb, reportBlockSize, processorMessage, partitionNode)));
tx.success();
tx.close();
}
System.out.println("Job completed");
}
private <T> T testInvokeOnPool(ForkJoinPool pool, RecursiveTask<T> a) {
try (PoolCleaner cleaner = cleaner(pool)) {
checkNotDone(a);
T result = pool.invoke(a);
checkCompletedNormally(a, result);
return result;
}
}
/**
* After invoking a single task, isQuiescent eventually becomes
* true, at which time queues are empty, threads are not active,
* the task has completed successfully, and construction
* parameters continue to hold
*/
public void testIsQuiescent() throws Exception {
ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
assertTrue(p.isQuiescent());
long startTime = System.nanoTime();
FibTask f = new FibTask(20);
p.invoke(f);
assertSame(ForkJoinPool.defaultForkJoinWorkerThreadFactory,
p.getFactory());
while (! p.isQuiescent()) {
if (millisElapsedSince(startTime) > LONG_DELAY_MS)
throw new AssertionFailedError("timed out");
assertFalse(p.getAsyncMode());
assertFalse(p.isShutdown());
assertFalse(p.isTerminating());
assertFalse(p.isTerminated());
Thread.yield();
}
assertTrue(p.isQuiescent());
assertFalse(p.getAsyncMode());
assertEquals(0, p.getQueuedTaskCount());
assertEquals(0, p.getQueuedSubmissionCount());
assertFalse(p.hasQueuedSubmissions());
while (p.getActiveThreadCount() != 0
&& millisElapsedSince(startTime) < LONG_DELAY_MS)
Thread.yield();
assertFalse(p.isShutdown());
assertFalse(p.isTerminating());
assertFalse(p.isTerminated());
assertTrue(f.isDone());
assertEquals(6765, (int) f.get());
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
}
}
public static BufferedImage blur(BufferedImage srcImage) {
int w = srcImage.getWidth();
int h = srcImage.getHeight();
int[] src = srcImage.getRGB(0, 0, w, h, null, 0, w);
int[] dst = new int[src.length];
System.out.println("Array size is " + src.length);
System.out.println("Threshold is " + sThreshold);
int processors = Runtime.getRuntime().availableProcessors();
System.out.println(Integer.toString(processors) + " processor"
+ (processors != 1 ? "s are " : " is ")
+ "available");
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
ForkJoinPool pool = new ForkJoinPool();
long startTime = System.currentTimeMillis();
pool.invoke(fb);
long endTime = System.currentTimeMillis();
System.out.println("Image blur took " + (endTime - startTime) +
" milliseconds.");
BufferedImage dstImage =
new BufferedImage(w, h, BufferedImage.TYPE_INT_ARGB);
dstImage.setRGB(0, 0, w, h, dst, 0, w);
return dstImage;
}
public void clean(
BuckConfig buckConfig,
Path ideaConfigDir,
Path librariesXmlBase,
boolean runPostGenerationCleaner,
boolean removeOldLibraries) {
if (!runPostGenerationCleaner && !removeOldLibraries) {
return;
}
Set<File> buckDirectories = new HashSet<>();
buckDirectories.add(
convertPathToFile(
projectFilesystem.resolve(projectFilesystem.getBuckPaths().getBuckOut())));
ArtifactCacheBuckConfig cacheBuckConfig = new ArtifactCacheBuckConfig(buckConfig);
for (DirCacheEntry entry : cacheBuckConfig.getCacheEntries().getDirCacheEntries()) {
buckDirectories.add(convertPathToFile(entry.getCacheDir()));
}
ForkJoinPool cleanExecutor = new ForkJoinPool(getParallelismLimit());
try {
cleanExecutor.invoke(
new RecursiveAction() {
@Override
protected void compute() {
List<RecursiveAction> topLevelTasks = new ArrayList<>(2);
if (runPostGenerationCleaner) {
topLevelTasks.add(
new CandidateFinderWithExclusions(
convertPathToFile(projectFilesystem.resolve("")),
IML_FILENAME_FILTER,
buckDirectories));
topLevelTasks.add(
new CandidateFinderWithExclusions(
ideaConfigDir.toFile(), IML_FILENAME_FILTER, buckDirectories));
}
topLevelTasks.add(
new CandidateFinder(convertPathToFile(librariesXmlBase), XML_FILENAME_FILTER));
invokeAll(topLevelTasks);
}
});
} finally {
cleanExecutor.shutdown();
try {
cleanExecutor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIME_UNIT);
} catch (InterruptedException e) {
Logger.get(IJProjectCleaner.class).warn("Timeout during executor shutdown.", e);
}
}
}
static double computeArea(ForkJoinPool pool, double l, double r) {
SQuad q = new SQuad(l, r, 0);
pool.invoke(q);
return q.area;
}
private void computeParallel() {
ForkJoinPool pool = new ForkJoinPool(Parallelism.instance().getLevel());
pool.invoke(new ParallelComputation(0, variables.size()));
}
static double computeArea(ForkJoinPool pool, double l, double r) {
FQuad q = new FQuad(l, r, 0);
pool.invoke(q);
return q.area;
}
static double computeArea(ForkJoinPool pool, double l, double r) {
DQuad q = new DQuad(l, r, 0);
pool.invoke(q);
return q.area;
}
static double computeArea(ForkJoinPool pool, double l, double r) {
DQuad q = new DQuad(l, r, 0);
pool.invoke(q);
return q.area;
}
static double computeArea(ForkJoinPool pool, double l, double r) {
FQuad q = new FQuad(l, r, 0);
pool.invoke(q);
return q.area;
}
static double computeArea(ForkJoinPool pool, double l, double r) {
SQuad q = new SQuad(l, r, 0);
pool.invoke(q);
return q.area;
}
static double computeArea(ForkJoinPool pool, double l, double r) {
FQuad q = new FQuad(l, r, 0);
pool.invoke(q);
return q.area;
}
/**
* Lists file statuses recursively based on given file system objects {@link Scope}.
* Uses {@link ForkJoinPool} executor service and {@link RecursiveListing} task
* to parallel and speed up listing.
*
* @param fs file system
* @param path path to file or directory
* @param scope file system objects scope
* @param suppressExceptions indicates if exceptions should be ignored
* @param filter filter to be applied
* @return list of file statuses
*/
private static List<FileStatus> listRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
ForkJoinPool pool = new ForkJoinPool();
try {
RecursiveListing task = new RecursiveListing(fs, path, scope, suppressExceptions, filter);
return pool.invoke(task);
} finally {
pool.shutdown();
}
}