下面列出了java.util.Collection#parallelStream ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
/**
* Optimized filter method for filtering down a set of INodes to a smaller subset but returns a
* Stream for further processing.
*
* @param inodes the main inode set to work on
* @param filters set of filters to use
* @param filterOps matching length set of filter operands and operators
* @return the stream of filtered inodes
*/
@Override // QueryEngine
public Stream<INode> combinedFilterToStream(
Collection<INode> inodes, String[] filters, String[] filterOps) {
if (filters == null || filterOps == null || filters.length == 0 || filterOps.length == 0) {
return inodes.parallelStream();
}
long start = System.currentTimeMillis();
try {
return produceFilteredStream(inodes, filters, filterOps);
} finally {
long end = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Settings filters: {} with filterOps: {} took: {} ms.",
Arrays.asList(filters),
Arrays.asList(filterOps),
(end - start));
}
}
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
public ConcatTest(String scenario, Collection<Integer> c1, Collection<Integer> c2, Collection<Integer> expected) {
this.scenario = scenario;
this.c1 = c1;
this.c2 = c2;
this.expected = expected;
// verify prerequisite
Stream<Integer> s1s = c1.stream();
Stream<Integer> s2s = c2.stream();
Stream<Integer> s1p = c1.parallelStream();
Stream<Integer> s2p = c2.parallelStream();
assertTrue(s1p.isParallel());
assertTrue(s2p.isParallel());
assertFalse(s1s.isParallel());
assertFalse(s2s.isParallel());
assertTrue(s1s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s1p.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2s.spliterator().hasCharacteristics(Spliterator.ORDERED));
assertTrue(s2p.spliterator().hasCharacteristics(Spliterator.ORDERED));
}
/**
* Utility method for setting up filtering against NameNode.
*
* @param nameNodeLoader the NameNodeLoader
* @param set whether set of files or dirs
* @param filters the filter types
* @param filterOps the filter operations
* @return the inodes collection that passed the filter
*/
public static Stream<INode> setFilters(
NameNodeLoader nameNodeLoader, String set, String[] filters, String[] filterOps) {
Collection<INode> inodes = nameNodeLoader.getINodeSet(set);
if (filters == null || filters.length == 0 || filterOps == null || filterOps.length == 0) {
return inodes.parallelStream();
}
return nameNodeLoader.getQueryEngine().combinedFilterToStream(inodes, filters, filterOps);
}
/**
* Perform the find operation on a /filter endpoint call.
*
* @param inodes set of inodes to work on
* @param find the find operation to perform
* @return the result of the find operation
*/
@Override // QueryEngine
public Collection<INode> findFilter(Collection<INode> inodes, String find) {
if (find == null || find.isEmpty()) {
return inodes;
}
String[] findOps = find.split(":");
Function<INode, Long> findToLong = getFilterFunctionToLongForINode(findOps[1]);
long start = System.currentTimeMillis();
Optional<INode> optional;
try {
Stream<INode> stream = inodes.parallelStream();
switch (findOps[0]) {
case "max":
optional = stream.max(Comparator.comparingLong(findToLong::apply));
break;
case "min":
optional = stream.min(Comparator.comparingLong(findToLong::apply));
break;
default:
throw new IllegalArgumentException("Unknown find query type: " + findOps[0]);
}
} finally {
long end = System.currentTimeMillis();
LOG.info("Performing find: {} took: {} ms.", Arrays.asList(findOps), (end - start));
}
return optional.<Collection<INode>>map(Collections::singleton).orElseGet(Collections::emptySet);
}
private static <T extends QualifiedContent> Stream<T> streamOf(
Collection<TransformInput> inputs,
Function<TransformInput, Collection<T>> mapping) {
Collection<T> list = inputs.stream()
.map(mapping)
.flatMap(Collection::stream)
.collect(Collectors.toList());
if (list.size() >= Runtime.getRuntime().availableProcessors())
return list.parallelStream();
else
return list.stream();
}
private void buildASTs() {
Boolean bpe = configuration.getOptimizationOptions().get(CompilerConfiguration.PARALLEL_PARSE);
boolean parallelParseEnabled = null != bpe && bpe;
Collection<SourceUnit> sourceUnits = sources.values();
Stream<SourceUnit> sourceUnitStream =
(!parallelParseEnabled || sourceUnits.size() < 2)
? sourceUnits.stream() // no need to build AST with parallel stream when we just have one/no source unit
: sourceUnits.parallelStream();
// DON'T replace `collect(Collectors.counting())` with `count()` here, otherwise peek will NOT be triggered
sourceUnitStream.peek(SourceUnit::buildAST).collect(Collectors.counting());
}
protected final Stream<S> allServicesForStoreStream() {
Collection<S> services = allServicesForStore();
// call multiple services in parallel if requested and there are more than one
return parallel && services.size() > 1 ? services.parallelStream() : services.stream();
}
protected final Stream<S> allServicesStream() {
Collection<S> services = allServices();
// call multiple services in parallel if requested and there are more than one
return parallel && services.size() > 1 ? services.parallelStream() : services.stream();
}