下面列出了java.util.concurrent.ForkJoinPool#submit ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Call the metrics API on each host and aggregate the metrics
* into a single value, grouped by cluster.
*/
public Map<ClusterInfo, MetricsAggregator> requestMetricsGroupedByCluster(Collection<URI> hosts) {
Map<ClusterInfo, MetricsAggregator> clusterMetricsMap = new ConcurrentHashMap<>();
long startTime = System.currentTimeMillis();
Runnable retrieveMetricsJob = () ->
hosts.parallelStream().forEach(host ->
getHostMetrics(host, clusterMetricsMap)
);
ForkJoinPool threadPool = new ForkJoinPool(10);
threadPool.submit(retrieveMetricsJob);
threadPool.shutdown();
try {
threadPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.log(Level.FINE, () ->
String.format("Metric retrieval for %d nodes took %d milliseconds", hosts.size(), System.currentTimeMillis() - startTime)
);
return clusterMetricsMap;
}
@Test
public void test() {
ForkJoinPool pool = new ForkJoinPool(2);
String homePath = System.getProperty("user.home");
FileCountTask task = new FileCountTask(homePath);
ForkJoinTask<Integer> result = pool.submit(task);
try {
Integer count = result.get();
System.out.println("file count = " + count);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
pool.shutdown();
while (!pool.isTerminated()) {
}
System.out.println("All thread finish...");
}
/**
* 使用fork/join并行计算数字累加。
*
* @param count 数字个数(从1开始)
* @throws InterruptedException
*/
private static void parallelCompute(int count)
throws InterruptedException {
int[] numbers = new int[count];
for (int i = 0; i < count; i++) {
numbers[i] = i+1;
}
ForkJoinPool pool = new ForkJoinPool(4);
HelloForkJoin task = new HelloForkJoin(numbers);
long startTime = System.currentTimeMillis();
pool.submit(task);
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
System.out.println( String.format("并行计算结果:%d,耗时:%d毫秒", task._result, System.currentTimeMillis()-startTime) );
}
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
int count = 0;
for (int i = 1; i < 10; i++) {
count = count + i;
Thread.sleep(1000);
}
System.out.println(count);
long endTime = System.currentTimeMillis(); // 获取结束时间
System.out.println("程序运行时间: " + (startTime - endTime) + "ms");
long startTime1 = System.currentTimeMillis();
CountTask countTask = new CountTask(1, 10);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future<Integer> futureTask = forkJoinPool.submit(countTask);
try {
System.out.println(futureTask.get());
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
long endTime1 = System.currentTimeMillis(); // 获取结束时间
System.out.println("程序运行时间: " + (startTime1 - endTime1) + "ms");
}
/**
* getPoolSize returns number of started workers.
*/
public void testGetPoolSize() {
final CountDownLatch taskStarted = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(1);
final ForkJoinPool p = new ForkJoinPool(1);
try (PoolCleaner cleaner = cleaner(p)) {
assertEquals(0, p.getActiveThreadCount());
final Runnable task = new CheckedRunnable() {
public void realRun() throws InterruptedException {
taskStarted.countDown();
assertEquals(1, p.getPoolSize());
assertEquals(1, p.getActiveThreadCount());
done.await();
}};
Future<?> future = p.submit(task);
await(taskStarted);
assertEquals(1, p.getPoolSize());
assertEquals(1, p.getActiveThreadCount());
done.countDown();
}
assertEquals(0, p.getPoolSize());
assertEquals(0, p.getActiveThreadCount());
}
@Override
public int rtcStats(List<String> userLogs) {
System.out.println(ConfigProperty.getCurDateTime()+" 计算之初 分配的用户信息个数:"+userLogs.size());
ForkJoinPool fjPool = new ForkJoinPool();
Future<Integer> fjTask = fjPool.submit(new StatsTask(userLogs));
int rtNum = -1;
try {
rtNum = fjTask.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
fjPool.shutdown();
fjPool = null;
userLogs.clear();
userLogs = null;
System.out.println(ConfigProperty.getCurDateTime()+ " 计算结束 有效用户信息个数:"+rtNum);
return rtNum;
}
public static void executeInCustomFJP(Runnable r) throws InterruptedException {
ForkJoinPool pool = new ForkJoinPool(10);
pool.submit(r);
Stream<Object> ss = Stream.of("a");
pool.submit(() -> ss.forEach(System.out::println));
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
}
/**
* A task submitted after shutdown is rejected
*/
public void testSubmitAfterShutdown() {
ForkJoinPool p = new ForkJoinPool(1);
try (PoolCleaner cleaner = cleaner(p)) {
p.shutdown();
assertTrue(p.isShutdown());
try {
ForkJoinTask<Integer> f = p.submit(new FibTask(8));
shouldThrow();
} catch (RejectedExecutionException success) {}
}
}
/**
* Completed submit(ForkJoinTask) returns result
*/
public void testSubmitForkJoinTask() throws Throwable {
ForkJoinPool p = new ForkJoinPool(1);
try (PoolCleaner cleaner = cleaner(p)) {
ForkJoinTask<Integer> f = p.submit(new FibTask(8));
assertEquals(21, (int) f.get());
}
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
@Test
public void test() throws Exception {
ForkJoinPool pool = new ForkJoinPool(PARALLELISM);
final int[] result = {0};
pool.submit(() -> {
result[0] = calcSumOfTen();});
pool.awaitTermination(2, TimeUnit.SECONDS);
assertThat(result[0]).isEqualTo(55);
pool.shutdown();
}
public static void main(String[] args) throws Exception {
if(args.length == 0) {
logger.error("please set config path");
System.exit(-1);
}
if(args.length > 1) {
TestSize.setClientSize(Integer.valueOf(args[1]));
}
TestConfig properties = new TestConfig(args[0]);
ForkJoinPool forkJoinPool = new ForkJoinPool(TestSize.getClientSize());
ForkJoinTask<?>[] list = new ForkJoinTask[TestSize.getClientSize()];
for (int i = 0; i < TestSize.getClientSize(); i++) {
int index = i;
ForkJoinTask<?> fork = forkJoinPool.submit(new Thread(new Runnable() {
@Override
public void run() {
MqttMutiClientSubTest handler = new MqttMutiClientSubTest();
String clientId = "client"+index;
String clientName = clientId+"Sub";
Topic[] topics = new Topic[] { new Topic(clientId+topic0, QoS.AT_MOST_ONCE),
new Topic(clientId+topic1, QoS.AT_LEAST_ONCE),
new Topic(clientId+topic2, QoS.EXACTLY_ONCE) };
handler.init(properties, topics, clientName, false);
logger.info(clientName+" testConn inited");
}
}));
list[i] = fork;
}
for (int i = 0; i < TestSize.getClientSize(); i++) {
list[i].join();
}
Thread.sleep(Integer.MAX_VALUE);
}
public static double calculateAverageBrightness(BufferedImage image) {
double result = 0;
int imageType = image.getType();
if (imageType != BufferedImage.TYPE_INT_ARGB && imageType != BufferedImage.TYPE_INT_RGB && imageType != BufferedImage.TYPE_3BYTE_BGR
&& imageType != BufferedImage.TYPE_4BYTE_ABGR && imageType != BufferedImage.TYPE_4BYTE_ABGR_PRE && imageType != BufferedImage.TYPE_INT_ARGB_PRE
&& imageType != BufferedImage.TYPE_INT_BGR) {
throw new RuntimeException("Unsupported image type: " + image.getType());
}
boolean hasAlpha = image.getAlphaRaster() != null;
int pixelSize = hasAlpha ? 4 : 3;
byte[] pixels = ((DataBufferByte) image.getRaster().getDataBuffer()).getData();
int cpuCores = Runtime.getRuntime().availableProcessors();
final ForkJoinPool pool = new ForkJoinPool(cpuCores);
BrightnessCalcTask[] tasks = new BrightnessCalcTask[cpuCores];
int subArraySize = (int) Math.ceil(((double) pixels.length) / cpuCores);
if (subArraySize % pixelSize != 0) {
subArraySize += pixelSize - subArraySize % pixelSize;
}
for (int i = 0; i < cpuCores; i++) {
tasks[i] = new BrightnessCalcTask(pixels, subArraySize * i, Math.min(subArraySize * (i + 1), pixels.length), pixelSize);
pool.submit(tasks[i]);
}
pool.shutdown();
while (!pool.isTerminated()) {
try {
pool.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
for (BrightnessCalcTask task : tasks) {
result += task.getRawResult();
}
result = result / tasks.length;
return result;
}
public static void createClientTest(TestConfig properties,String clientId) throws Exception {
MqttMutiClientSendTest handler = new MqttMutiClientSendTest();
String clientName = clientId+"Send";
handler.init(properties, null, clientName, false);
logger.info(clientName+" testConn inited");
ForkJoinPool forkJoinPool = new ForkJoinPool(TestSize.getThreadSize());
ForkJoinTask<?>[] list = new ForkJoinTask[TestSize.getThreadSize()];
for (int i = 0; i < TestSize.getThreadSize(); i++) {
ForkJoinTask<?> fork = forkJoinPool.submit(new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < TestSize.getMsgNums(); i++) {
try {
Thread.sleep(TestSize.getSleepTimes());
} catch (InterruptedException e) {
e.printStackTrace();
}
handler.send(clientId+"QOS" + (i % 3), (clientName + "::" + i).getBytes(),
QoS.values()[(i % 3)], false);
}
}
}));
list[i] = fork;
}
long start = System.currentTimeMillis();
for (int i = 0; i < TestSize.getThreadSize(); i++) {
list[i].join();
}
logger.info(clientName + " total time "+(System.currentTimeMillis() - start));
}
public static void main(String[] args) {
ForkJoinPool.commonPool().getParallelism();
ForkJoinPool pool = new ForkJoinPool();
new Thread() {
{
this.setName("printHelloWorld");
this.start();
}
@Override
public void run() {
pool.submit(() -> System.out.println("Hello,world first"));
ForkJoinTask<String> task = pool.submit(() -> "Hello,world second");
// 输出 Hello,world(永远不会输出,也不会报异常, 所以这是个bug)
try {
System.out.println(task.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
};
/*try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
new Thread() {
{
this.setName("shutdownPool");
this.start();
}
@Override
public void run() {
pool.shutdown();
}
};
}
/**
* A task submitted after shutdown is rejected
*/
public void testSubmitAfterShutdown() {
ForkJoinPool p = new ForkJoinPool(1);
try (PoolCleaner cleaner = cleaner(p)) {
p.shutdown();
assertTrue(p.isShutdown());
try {
ForkJoinTask<Integer> f = p.submit(new FibTask(8));
shouldThrow();
} catch (RejectedExecutionException success) {}
}
}
@Override
public void transactionMarker() throws Exception {
ForkJoinPool pool = new ForkJoinPool();
Future<Integer> future = pool.submit(new SimpleTask());
future.get();
}
@Override
public void transactionMarker() throws Exception {
ForkJoinPool pool = new ForkJoinPool();
Future<?> future = pool.submit(new SimpleRunnable());
future.get();
}
@Override
public void transactionMarker() throws Exception {
ForkJoinPool pool = new ForkJoinPool();
Future<Integer> future = pool.submit(ForkJoinTask.adapt(new SimpleRunnable(), 5));
future.get();
}
public static void main(String[] args) throws Exception {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future<Long> result = forkJoinPool.submit(new Sum(1, 10000));
System.out.println("Sum:" + result.get());//Sum:50005000
}