java.util.concurrent.ForkJoinPool#submit ( )源码实例Demo

下面列出了java.util.concurrent.ForkJoinPool#submit ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: vespa   文件: ClusterMetricsRetriever.java
/**
 * Call the metrics API on each host and aggregate the metrics
 * into a single value, grouped by cluster.
 */
public Map<ClusterInfo, MetricsAggregator> requestMetricsGroupedByCluster(Collection<URI> hosts) {
    Map<ClusterInfo, MetricsAggregator> clusterMetricsMap = new ConcurrentHashMap<>();

    long startTime = System.currentTimeMillis();
    Runnable retrieveMetricsJob = () ->
            hosts.parallelStream().forEach(host ->
                getHostMetrics(host, clusterMetricsMap)
            );

    ForkJoinPool threadPool = new ForkJoinPool(10);
    threadPool.submit(retrieveMetricsJob);
    threadPool.shutdown();

    try {
        threadPool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

    log.log(Level.FINE, () ->
            String.format("Metric retrieval for %d nodes took %d milliseconds", hosts.size(), System.currentTimeMillis() - startTime)
    );

    return clusterMetricsMap;
}
 
源代码2 项目: jdk-source-analysis   文件: ForkJoinPoolTest.java
@Test
public void test() {
    ForkJoinPool pool = new ForkJoinPool(2);
    String homePath = System.getProperty("user.home");
    FileCountTask task = new FileCountTask(homePath);
    ForkJoinTask<Integer> result = pool.submit(task);
    try {
        Integer count = result.get();
        System.out.println("file count = " + count);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    pool.shutdown();
    while (!pool.isTerminated()) {
    }
    System.out.println("All thread finish...");
}
 
源代码3 项目: JavaTutorial   文件: HelloForkJoin.java
/**
 * 使用fork/join并行计算数字累加。
 * 
 * @param count 数字个数(从1开始)
 * @throws InterruptedException
 */
private static void parallelCompute(int count)
        throws InterruptedException {
    int[] numbers = new int[count];
    for (int i = 0; i < count; i++) {
        numbers[i] = i+1;
    }
    ForkJoinPool pool = new ForkJoinPool(4);
    HelloForkJoin task = new HelloForkJoin(numbers);
    long startTime = System.currentTimeMillis();
    pool.submit(task);
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    
    System.out.println( String.format("并行计算结果:%d,耗时:%d毫秒", task._result, System.currentTimeMillis()-startTime) );
}
 
源代码4 项目: JavaCommon   文件: ForkJoinTaskDemo.java
public static void main(String[] args) throws InterruptedException {
	long startTime = System.currentTimeMillis();
	int count = 0;
	for (int i = 1; i < 10; i++) {
		count = count + i;
		Thread.sleep(1000);
	}
	System.out.println(count);
	long endTime = System.currentTimeMillis(); // 获取结束时间
	System.out.println("程序运行时间: " + (startTime - endTime) + "ms");

	long startTime1 = System.currentTimeMillis();
	CountTask countTask = new CountTask(1, 10);
	ForkJoinPool forkJoinPool = new ForkJoinPool();
	Future<Integer> futureTask = forkJoinPool.submit(countTask);
	try {
		System.out.println(futureTask.get());
	} catch (ExecutionException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
	long endTime1 = System.currentTimeMillis(); // 获取结束时间
	System.out.println("程序运行时间: " + (startTime1 - endTime1) + "ms");

}
 
源代码5 项目: openjdk-jdk9   文件: ForkJoinPoolTest.java
/**
 * getPoolSize returns number of started workers.
 */
public void testGetPoolSize() {
    final CountDownLatch taskStarted = new CountDownLatch(1);
    final CountDownLatch done = new CountDownLatch(1);
    final ForkJoinPool p = new ForkJoinPool(1);
    try (PoolCleaner cleaner = cleaner(p)) {
        assertEquals(0, p.getActiveThreadCount());
        final Runnable task = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                taskStarted.countDown();
                assertEquals(1, p.getPoolSize());
                assertEquals(1, p.getActiveThreadCount());
                done.await();
            }};
        Future<?> future = p.submit(task);
        await(taskStarted);
        assertEquals(1, p.getPoolSize());
        assertEquals(1, p.getActiveThreadCount());
        done.countDown();
    }
    assertEquals(0, p.getPoolSize());
    assertEquals(0, p.getActiveThreadCount());
}
 
源代码6 项目: light_drtc   文件: JobService.java
@Override
public int rtcStats(List<String> userLogs) {
	System.out.println(ConfigProperty.getCurDateTime()+" 计算之初 分配的用户信息个数:"+userLogs.size());
	ForkJoinPool fjPool = new ForkJoinPool(); 
	Future<Integer> fjTask = fjPool.submit(new StatsTask(userLogs));
	int rtNum = -1;
	try {
		rtNum = fjTask.get();
	} catch (InterruptedException | ExecutionException e) {
		e.printStackTrace();
	}
	fjPool.shutdown();
	fjPool = null;
	userLogs.clear();
	userLogs = null;
	System.out.println(ConfigProperty.getCurDateTime()+ " 计算结束 有效用户信息个数:"+rtNum);
	
	return rtNum;
}
 
源代码7 项目: training   文件: ParallelIOIntensive.java
public static void executeInCustomFJP(Runnable r) throws InterruptedException {
	ForkJoinPool pool = new ForkJoinPool(10);
	pool.submit(r);
	Stream<Object> ss = Stream.of("a");
	
	pool.submit(() -> ss.forEach(System.out::println));
	
	pool.shutdown();
	pool.awaitTermination(10, TimeUnit.SECONDS);
	
}
 
源代码8 项目: j2objc   文件: ForkJoinPoolTest.java
/**
 * A task submitted after shutdown is rejected
 */
public void testSubmitAfterShutdown() {
    ForkJoinPool p = new ForkJoinPool(1);
    try (PoolCleaner cleaner = cleaner(p)) {
        p.shutdown();
        assertTrue(p.isShutdown());
        try {
            ForkJoinTask<Integer> f = p.submit(new FibTask(8));
            shouldThrow();
        } catch (RejectedExecutionException success) {}
    }
}
 
源代码9 项目: j2objc   文件: ForkJoinPoolTest.java
/**
 * Completed submit(ForkJoinTask) returns result
 */
public void testSubmitForkJoinTask() throws Throwable {
    ForkJoinPool p = new ForkJoinPool(1);
    try (PoolCleaner cleaner = cleaner(p)) {
        ForkJoinTask<Integer> f = p.submit(new FibTask(8));
        assertEquals(21, (int) f.get());
    }
}
 
源代码10 项目: Project   文件: ForkJoinTaskExample.java
public static void main(String[] args) {
    ForkJoinPool forkjoinPool = new ForkJoinPool();

    //生成一个计算任务,计算1+2+3+4
    ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

    //执行一个任务
    Future<Integer> result = forkjoinPool.submit(task);

    try {
        log.info("result:{}", result.get());
    } catch (Exception e) {
        log.error("exception", e);
    }
}
 
源代码11 项目: algorithms   文件: ForkJoinTest.java
@Test
public void test() throws Exception {
    ForkJoinPool pool = new ForkJoinPool(PARALLELISM);

    final int[] result = {0};
    pool.submit(() -> {
        result[0] = calcSumOfTen();});
    pool.awaitTermination(2, TimeUnit.SECONDS);
    assertThat(result[0]).isEqualTo(55);
    pool.shutdown();
}
 
源代码12 项目: iot-mqtt   文件: MqttMutiClientSubTest.java
public static void main(String[] args) throws Exception {
	if(args.length == 0) {
		logger.error("please set config path");
		System.exit(-1);
	}
	if(args.length > 1) {
		TestSize.setClientSize(Integer.valueOf(args[1]));
	}
	
	TestConfig properties = new TestConfig(args[0]);
	
	ForkJoinPool forkJoinPool = new ForkJoinPool(TestSize.getClientSize());
	ForkJoinTask<?>[] list = new ForkJoinTask[TestSize.getClientSize()];
	for (int i = 0; i < TestSize.getClientSize(); i++) {
		int index = i;
		ForkJoinTask<?> fork = forkJoinPool.submit(new Thread(new Runnable() {
			@Override
			public void run() {
				MqttMutiClientSubTest handler = new MqttMutiClientSubTest();
				String clientId = "client"+index;
				String clientName = clientId+"Sub";
				Topic[] topics = new Topic[] { new Topic(clientId+topic0, QoS.AT_MOST_ONCE), 
						new Topic(clientId+topic1, QoS.AT_LEAST_ONCE),
						new Topic(clientId+topic2, QoS.EXACTLY_ONCE) };
				handler.init(properties, topics, clientName, false);
				logger.info(clientName+" testConn inited");
			}
		}));
		list[i] = fork;
	}
	for (int i = 0; i < TestSize.getClientSize(); i++) {
		list[i].join();
	}
	Thread.sleep(Integer.MAX_VALUE);
}
 
源代码13 项目: gp2srv   文件: ImageUtil.java
public static double calculateAverageBrightness(BufferedImage image) {
	double result = 0;

	int imageType = image.getType();
	if (imageType != BufferedImage.TYPE_INT_ARGB && imageType != BufferedImage.TYPE_INT_RGB && imageType != BufferedImage.TYPE_3BYTE_BGR
			&& imageType != BufferedImage.TYPE_4BYTE_ABGR && imageType != BufferedImage.TYPE_4BYTE_ABGR_PRE && imageType != BufferedImage.TYPE_INT_ARGB_PRE
			&& imageType != BufferedImage.TYPE_INT_BGR) {
		throw new RuntimeException("Unsupported image type: " + image.getType());
	}
	boolean hasAlpha = image.getAlphaRaster() != null;
	int pixelSize = hasAlpha ? 4 : 3;
	byte[] pixels = ((DataBufferByte) image.getRaster().getDataBuffer()).getData();

	int cpuCores = Runtime.getRuntime().availableProcessors();
	final ForkJoinPool pool = new ForkJoinPool(cpuCores);

	BrightnessCalcTask[] tasks = new BrightnessCalcTask[cpuCores];
	int subArraySize = (int) Math.ceil(((double) pixels.length) / cpuCores);
	if (subArraySize % pixelSize != 0) {
		subArraySize += pixelSize - subArraySize % pixelSize;
	}
	for (int i = 0; i < cpuCores; i++) {
		tasks[i] = new BrightnessCalcTask(pixels, subArraySize * i, Math.min(subArraySize * (i + 1), pixels.length), pixelSize);
		pool.submit(tasks[i]);
	}
	pool.shutdown();
	while (!pool.isTerminated()) {
		try {
			pool.awaitTermination(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
		}
	}

	for (BrightnessCalcTask task : tasks) {
		result += task.getRawResult();
	}
	result = result / tasks.length;
	return result;
}
 
源代码14 项目: iot-mqtt   文件: MqttMutiClientSendTest.java
public static void createClientTest(TestConfig properties,String clientId) throws Exception {
	MqttMutiClientSendTest handler = new MqttMutiClientSendTest();
	String clientName = clientId+"Send";
	handler.init(properties, null, clientName, false);
	logger.info(clientName+" testConn inited");
	ForkJoinPool forkJoinPool = new ForkJoinPool(TestSize.getThreadSize());
	ForkJoinTask<?>[] list = new ForkJoinTask[TestSize.getThreadSize()];
	for (int i = 0; i < TestSize.getThreadSize(); i++) {
		ForkJoinTask<?> fork = forkJoinPool.submit(new Thread(new Runnable() {
			@Override
			public void run() {
				for (int i = 0; i < TestSize.getMsgNums(); i++) {
					try {
						Thread.sleep(TestSize.getSleepTimes());
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					handler.send(clientId+"QOS" + (i % 3), (clientName + "::" + i).getBytes(),
							QoS.values()[(i % 3)], false);
				}
			}
		}));
		list[i] = fork;
	}
	long start = System.currentTimeMillis();
	for (int i = 0; i < TestSize.getThreadSize(); i++) {
		list[i].join();
	}
	logger.info(clientName + " total time "+(System.currentTimeMillis() - start));
}
 
源代码15 项目: yuzhouwan   文件: ForkJoinExample.java
public static void main(String[] args) {

        ForkJoinPool.commonPool().getParallelism();
        ForkJoinPool pool = new ForkJoinPool();
        new Thread() {
            {
                this.setName("printHelloWorld");
                this.start();
            }

            @Override
            public void run() {
                pool.submit(() -> System.out.println("Hello,world first"));

                ForkJoinTask<String> task = pool.submit(() -> "Hello,world second");
                // 输出 Hello,world(永远不会输出,也不会报异常, 所以这是个bug)
                try {
                    System.out.println(task.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
        };
        /*try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
        new Thread() {
            {
                this.setName("shutdownPool");
                this.start();
            }

            @Override
            public void run() {
                pool.shutdown();
            }
        };
    }
 
源代码16 项目: openjdk-jdk9   文件: ForkJoinPoolTest.java
/**
 * A task submitted after shutdown is rejected
 */
public void testSubmitAfterShutdown() {
    ForkJoinPool p = new ForkJoinPool(1);
    try (PoolCleaner cleaner = cleaner(p)) {
        p.shutdown();
        assertTrue(p.isShutdown());
        try {
            ForkJoinTask<Integer> f = p.submit(new FibTask(8));
            shouldThrow();
        } catch (RejectedExecutionException success) {}
    }
}
 
源代码17 项目: glowroot   文件: ForkJoinPoolIT.java
@Override
public void transactionMarker() throws Exception {
    ForkJoinPool pool = new ForkJoinPool();
    Future<Integer> future = pool.submit(new SimpleTask());
    future.get();
}
 
源代码18 项目: glowroot   文件: ForkJoinPoolIT.java
@Override
public void transactionMarker() throws Exception {
    ForkJoinPool pool = new ForkJoinPool();
    Future<?> future = pool.submit(new SimpleRunnable());
    future.get();
}
 
源代码19 项目: glowroot   文件: ForkJoinPoolIT.java
@Override
public void transactionMarker() throws Exception {
    ForkJoinPool pool = new ForkJoinPool();
    Future<Integer> future = pool.submit(ForkJoinTask.adapt(new SimpleRunnable(), 5));
    future.get();
}
 
源代码20 项目: java-codes   文件: ForkJoinTest.java
public static void main(String[] args) throws Exception {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Future<Long> result = forkJoinPool.submit(new Sum(1, 10000));
    System.out.println("Sum:" + result.get());//Sum:50005000
}