下面列出了java.util.concurrent.ForkJoinTask#get ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
void checkNotDone(ForkJoinTask a) {
assertFalse(a.isDone());
assertFalse(a.isCompletedNormally());
assertFalse(a.isCompletedAbnormally());
assertFalse(a.isCancelled());
assertNull(a.getException());
assertNull(a.getRawResult());
if (a instanceof BinaryAsyncAction)
assertTrue(((BinaryAsyncAction)a).getForkJoinTaskTag() == INITIAL_STATE);
try {
a.get(0L, SECONDS);
shouldThrow();
} catch (TimeoutException success) {
} catch (Throwable fail) { threadUnexpectedException(fail); }
}
@Test
public void test() {
ForkJoinPool pool = new ForkJoinPool(2);
String homePath = System.getProperty("user.home");
FileCountTask task = new FileCountTask(homePath);
ForkJoinTask<Integer> result = pool.submit(task);
try {
Integer count = result.get();
System.out.println("file count = " + count);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
pool.shutdown();
while (!pool.isTerminated()) {
}
System.out.println("All thread finish...");
}
/**
* mainly use for {@link Stream#parallel()} with specific thread pool
* see https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream
*/
public static <R, X extends Throwable> R supplyParallel(ForkJoinPool pool,
ThrowableSupplier<R, X> func) throws X {
checkNotNull(pool);
Throwable[] throwable = { null };
ForkJoinTask<R> task = pool.submit(() -> {
try {
return func.get();
} catch (Throwable e) {
throwable[0] = e;
return null;
}
});
R r;
try {
r = task.get();
} catch (ExecutionException | InterruptedException impossible) {
throw new AssertionError(impossible);
}
if (throwable[0] != null) {
//noinspection unchecked
throw (X) throwable[0];
}
return r;
}
void checkNotDone(ForkJoinTask a) {
assertFalse(a.isDone());
assertFalse(a.isCompletedNormally());
assertFalse(a.isCompletedAbnormally());
assertFalse(a.isCancelled());
assertNull(a.getException());
assertNull(a.getRawResult());
if (a instanceof BinaryAsyncAction)
assertTrue(((BinaryAsyncAction)a).getForkJoinTaskTag() == INITIAL_STATE);
try {
a.get(0L, SECONDS);
shouldThrow();
} catch (TimeoutException success) {
} catch (Throwable fail) { threadUnexpectedException(fail); }
}
static int countSplits(ForkJoinPool fjp) throws Exception {
// The number of splits will be equivalent to the number of leaf nodes
// and will be a power of 2
ForkJoinTask<Integer> fInteger = fjp.submit(() -> {
Spliterator<Integer> s = IntStream.range(0, 1024).boxed().parallel().spliterator();
SplitCountingSpliterator<Integer> cs = new SplitCountingSpliterator<>(s);
StreamSupport.stream(cs, true).forEach(e -> {});
return cs.splits();
});
return fInteger.get();
}
static int countSplits(ForkJoinPool fjp) throws Exception {
// The number of splits will be equivalent to the number of leaf nodes
// and will be a power of 2
ForkJoinTask<Integer> fInteger = fjp.submit(() -> {
Spliterator<Integer> s = IntStream.range(0, 1024).boxed().parallel().spliterator();
SplitCountingSpliterator<Integer> cs = new SplitCountingSpliterator<>(s);
StreamSupport.stream(cs, true).forEach(e -> {});
return cs.splits();
});
return fInteger.get();
}
static int countSplits(ForkJoinPool fjp) throws Exception {
// The number of splits will be equivalent to the number of leaf nodes
// and will be a power of 2
ForkJoinTask<Integer> fInteger = fjp.submit(() -> {
Spliterator<Integer> s = IntStream.range(0, 1024).boxed().parallel().spliterator();
SplitCountingSpliterator<Integer> cs = new SplitCountingSpliterator<>(s);
StreamSupport.stream(cs, true).forEach(e -> {});
return cs.splits();
});
return fInteger.get();
}
void checkNotDone(ForkJoinTask a) {
assertFalse(a.isDone());
assertFalse(a.isCompletedNormally());
assertFalse(a.isCompletedAbnormally());
assertFalse(a.isCancelled());
assertNull(a.getException());
assertNull(a.getRawResult());
try {
a.get(0L, SECONDS);
shouldThrow();
} catch (TimeoutException success) {
} catch (Throwable fail) { threadUnexpectedException(fail); }
}
static int countSplits(ForkJoinPool fjp) throws Exception {
// The number of splits will be equivalent to the number of leaf nodes
// and will be a power of 2
ForkJoinTask<Integer> fInteger = fjp.submit(() -> {
Spliterator<Integer> s = IntStream.range(0, 1024).boxed().parallel().spliterator();
SplitCountingSpliterator<Integer> cs = new SplitCountingSpliterator<>(s);
StreamSupport.stream(cs, true).forEach(e -> {});
return cs.splits();
});
return fInteger.get();
}
void checkNotDone(ForkJoinTask a) {
assertFalse(a.isDone());
assertFalse(a.isCompletedNormally());
assertFalse(a.isCompletedAbnormally());
assertFalse(a.isCancelled());
assertNull(a.getException());
assertNull(a.getRawResult());
try {
a.get(0L, SECONDS);
shouldThrow();
} catch (TimeoutException success) {
} catch (Throwable fail) { threadUnexpectedException(fail); }
}
public static void main(String[] args) throws Exception
{
// Configure logging
LogManager.getLogManager().readConfiguration(PVASettings.class.getResourceAsStream("/pva_logging.properties"));
final Logger root = Logger.getLogger("");
root.setLevel(Level.WARNING);
for (Handler handler : root.getHandlers())
handler.setLevel(root.getLevel());
// Start PVA servers
final ForkJoinTask<?> server1 = ForkJoinPool.commonPool().submit(() -> serve("demo1"));
final ForkJoinTask<?> server2 = ForkJoinPool.commonPool().submit(() -> serve("demo2"));
final ForkJoinTask<?> server3 = ForkJoinPool.commonPool().submit(() -> serve("demo3"));
// PVA Client
System.out.println("Writing value 5 .. -1 to PVs...");
final PVAClient pva = new PVAClient();
final PVAChannel ch1 = pva.getChannel("demo1");
final PVAChannel ch2 = pva.getChannel("demo2");
final PVAChannel ch3 = pva.getChannel("demo3");
CompletableFuture.allOf(ch1.connect(), ch2.connect(), ch3.connect()).get();
for (double v=5.0; v>=-1.0; --v)
{
TimeUnit.MILLISECONDS.sleep(100);
ch1.write("", v);
TimeUnit.MILLISECONDS.sleep(100);
ch2.write("", v);
TimeUnit.MILLISECONDS.sleep(100);
ch3.write("", v);
}
System.out.println("Closing PVs");
ch3.close();
ch2.close();
ch1.close();
System.out.println("Waiting for servers to exit");
server3.get(2, TimeUnit.SECONDS);
server2.get(2, TimeUnit.SECONDS);
server1.get(2, TimeUnit.SECONDS);
System.out.println("Done.");
}
void getVolumeMap(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap)
throws IOException {
// Recover lazy persist replicas, they will be added to the volumeMap
// when we scan the finalized directory.
if (lazypersistDir.exists()) {
int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
FsDatasetImpl.LOG.info(
"Recovered " + numRecovered + " replicas from " + lazypersistDir);
}
boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
if (!success) {
List<IOException> exceptions = Collections
.synchronizedList(new ArrayList<IOException>());
Queue<RecursiveAction> subTaskQueue =
new ConcurrentLinkedQueue<RecursiveAction>();
// add finalized replicas
AddReplicaProcessor task = new AddReplicaProcessor(volumeMap,
finalizedDir, lazyWriteReplicaMap, true, exceptions, subTaskQueue);
ForkJoinTask<Void> finalizedTask = addReplicaThreadPool.submit(task);
// add rbw replicas
task = new AddReplicaProcessor(volumeMap, rbwDir, lazyWriteReplicaMap,
false, exceptions, subTaskQueue);
ForkJoinTask<Void> rbwTask = addReplicaThreadPool.submit(task);
try {
finalizedTask.get();
rbwTask.get();
} catch (InterruptedException | ExecutionException e) {
exceptions.add(new IOException(
"Failed to start sub tasks to add replica in replica map :"
+ e.getMessage()));
}
//wait for all the tasks to finish.
waitForSubTaskToFinish(subTaskQueue, exceptions);
}
}