下面列出了 java.util.List#parallelStream() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Appends a shuffle step to the pipeline. Every element produced so far is
 * gathered into a list, the list order is randomized with
 * {@link Collections#shuffle(List)}, and a new stream is opened over the
 * shuffled list.
 *
 * Greedy: the current stream is consumed as soon as this method is called.
 *
 * No Effect on Path
 *
 * Stream -> Shuffled Stream
 *
 * @return this engine, so that further steps can be chained
 */
public ExternalTraversalEngine shuffle() {
    // Element-class check, delegated to the engine's helper with List.class
    checkInvalidInputElementClass(List.class);

    // Drain the pipeline, randomize the order, then stream the result again
    final List shuffled = (List) stream.collect(Collectors.toList());
    Collections.shuffle(shuffled);
    stream = isParallel ? shuffled.parallelStream() : shuffled.stream();

    // Remember the step; shuffle takes no arguments
    final Class[] noParameterTypes = new Class[0];
    stepList.add(new Step(this.getClass().getName(), "shuffle", noParameterTypes));
    return this;
}
/**
 * Adds a shuffle step to the traversal engine. The current stream is
 * collected into a list, reordered randomly through
 * {@link Collections#shuffle(List)}, and replaced by a stream over that list.
 *
 * Greedy: collecting happens immediately, not at the terminal operation.
 *
 * No Effect on Path
 *
 * Stream -> Shuffled Stream
 *
 * @return this engine, so that further steps can be chained
 */
public CachedTraversalEngine shuffle() {
    // Element-class check, delegated to the engine's helper with List.class
    checkInvalidInputElementClass(List.class);

    // Stream update: materialize, shuffle in place, reopen
    final List randomized = (List) stream.collect(Collectors.toList());
    Collections.shuffle(randomized);
    stream = isParallel ? randomized.parallelStream() : randomized.stream();

    // Step update: shuffle is recorded with an empty parameter-type array
    final Class[] noParameterTypes = new Class[0];
    stepList.add(new Step(this.getClass().getName(), "shuffle", noParameterTypes));
    return this;
}
/**
 * Appends a shuffle step to the pipeline. All elements ahead of this step are
 * aggregated into a list, shuffled by {@link Collections#shuffle(List)}, and
 * emitted again as a stream over the shuffled list.
 *
 * Greedy: the aggregation runs at call time.
 *
 * No Effect on Path
 *
 * Stream -> Shuffled Stream
 *
 * @return this engine, so that further steps can be chained
 */
public NaiveTraversalEngine shuffle() {
    // Element-class check, delegated to the engine's helper with List.class
    checkInvalidInputElementClass(List.class);

    // Pipeline update: collect everything, randomize, stream again
    final List reordered = (List) stream.collect(Collectors.toList());
    Collections.shuffle(reordered);
    stream = isParallel ? reordered.parallelStream() : reordered.stream();

    // Step update: no arguments to record for shuffle
    final Class[] noParameterTypes = new Class[0];
    stepList.add(new Step(this.getClass().getName(), "shuffle", noParameterTypes));
    return this;
}
/**
 * Checks that the stream obtained from {@code List.parallelStream()} reports
 * itself as parallel, using an empty {@code ArrayList} as the source.
 */
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream() {
    final List<Long> emptySource = new ArrayList<>();
    final Stream<Long> streamUnderTest = emptySource.parallelStream();

    final boolean reportedParallel = streamUnderTest.isParallel();

    assertTrue(reportedParallel);
}
/**
 * Adds a random filter step to the pipeline: each element is kept or dropped
 * by an independent biased coin toss.
 *
 * An element passes when {@code bias} is greater than a fresh uniform random
 * number in [0, 1), so a higher bias filters out fewer elements.
 *
 * Path Enabled: Greedy - the filtered stream is collected immediately and the
 * path map keeps only the keys that are among the surviving elements.
 *
 * Path Disabled: Lazy - only a filter stage is appended to the stream.
 *
 * Pipeline: Stream -> Filtered Stream
 *
 * @param bias
 *            the pass threshold compared against each random draw
 * @return the extended Pipeline
 */
public TraversalEngine random(final Double bias) {
    // Pipeline Update
    // ThreadLocalRandom is used (fully qualified, so no extra import is needed)
    // instead of allocating and seeding a new Random for every element; current()
    // is called inside the lambda so each worker thread uses its own generator.
    if (isPathEnabled) {
        List intermediate = (List) stream
                .filter(e -> bias > java.util.concurrent.ThreadLocalRandom.current().nextDouble())
                .collect(Collectors.toList());
        // Update Path: drop every key that is not among the surviving elements
        currentPath.keySet().retainAll(intermediate);
        // Make stream again
        if (isParallel) {
            stream = intermediate.parallelStream();
        } else {
            stream = intermediate.stream();
        }
    } else {
        stream = stream.filter(e -> bias > java.util.concurrent.ThreadLocalRandom.current().nextDouble());
    }
    // Step Update: record the step name, its parameter type and the bias used
    final Class[] args = { Double.class };
    final Step step = new Step(this.getClass().getName(), "random", args, bias);
    stepList.add(step);
    return this;
}
/**
 * Adds an exclusion filter to the pipeline: an element is emitted only when
 * the given collection does not contain it.
 *
 * Path Enabled: Greedy - the filtered stream is collected immediately and the
 * path map keeps only the keys that are among the surviving elements.
 *
 * Path Disabled: Lazy - only a filter stage is appended to the stream.
 *
 * Pipeline: Stream -> Filtered Stream
 *
 * @param collection
 *            the elements to exclude from the stream
 * @return the extended Pipeline
 */
public TraversalEngine except(final Collection collection) {
    if (!isPathEnabled) {
        // Lazy: nothing is evaluated until the stream is consumed
        stream = stream.filter(candidate -> !collection.contains(candidate));
    } else {
        // Greedy: materialize the survivors so the path map can be pruned
        final List survivors = (List) stream.filter(candidate -> !collection.contains(candidate))
                .collect(Collectors.toList());
        // Keep only the path-map keys that are among the survivors
        currentPath.keySet().retainAll(survivors);
        // Reopen the stream over the survivors
        stream = isParallel ? survivors.parallelStream() : survivors.stream();
    }

    // Record the step together with its single argument
    final Class[] parameterTypes = { Collection.class };
    stepList.add(new Step(this.getClass().getName(), "except", parameterTypes, collection));
    return this;
}
/**
 * Adds an exclusion filter to the pipeline: an element is emitted only when
 * it is not contained in the provided collection.
 *
 * Path Enabled: Greedy - the filtered stream is collected immediately and the
 * path map keeps only the keys that are among the surviving elements.
 *
 * Path Disabled: Lazy - only a filter stage is appended to the stream.
 *
 * Pipeline: Stream -> Filtered Stream
 *
 * @param collection
 *            the elements to exclude from the stream
 * @return the extended Pipeline
 */
public ExternalTraversalEngine except(final Collection collection) {
    if (!isPathEnabled) {
        // Lazy branch: just chain the filter
        stream = stream.filter(item -> !collection.contains(item));
    } else {
        // Greedy branch: evaluate the filter now
        final List remaining = (List) stream.filter(item -> !collection.contains(item))
                .collect(Collectors.toList());
        // Path update: keys that are not among the remaining elements are removed
        currentPath.keySet().retainAll(remaining);
        // Continue the pipeline from the collected elements
        stream = isParallel ? remaining.parallelStream() : remaining.stream();
    }

    // Step update: "except" takes one Collection argument
    final Class[] parameterTypes = { Collection.class };
    stepList.add(new Step(this.getClass().getName(), "except", parameterTypes, collection));
    return this;
}
/**
 * Adds a function-based filter to the pipeline. The filter criterion is
 * supplied by {@code filterFunction}: an element is emitted when
 * {@code filterFunction.compute(element)} yields {@code true}.
 *
 * The predicate is applied through the stream's own filter operation.
 *
 * Path Enabled: Greedy - the filtered stream is collected immediately. The
 * path map is not modified by this step.
 *
 * Path Disabled: Lazy - only a filter stage is appended to the stream.
 *
 * Pipeline: Stream -> Filtered Stream
 *
 * @param filterFunction
 *            the function deciding whether an element is kept; its result is
 *            cast to boolean
 * @return the extended Pipeline
 */
public TraversalEngine filter(final PipeFunction filterFunction) {
    if (!isPathEnabled) {
        // Lazy: chain the predicate without evaluating it yet
        stream = stream.filter(candidate -> (boolean) filterFunction.compute(candidate));
    } else {
        // Greedy: evaluate the predicate for every element now
        final List accepted = (List) stream.filter(candidate -> (boolean) filterFunction.compute(candidate))
                .collect(Collectors.toList());
        // Path pruning is disabled for this step (call kept for reference):
        // currentPath.keySet().retainAll(accepted);
        // Reopen the stream over the accepted elements
        stream = isParallel ? accepted.parallelStream() : accepted.stream();
    }

    // Record the step together with the filter function
    final Class[] parameterTypes = { PipeFunction.class };
    stepList.add(new Step(this.getClass().getName(), "filter", parameterTypes, filterFunction));
    return this;
}
/**
 * Adds a RangeFilterPipe to the end of the pipeline. The stream is truncated
 * with {@code limit(maxSize)}, so at most {@code maxSize} elements are
 * emitted.
 *
 * Path Enabled: Greedy - the truncated stream is collected immediately and
 * the path map keeps only the keys that are among the retained elements.
 *
 * Path Disabled: Lazy - only a limit stage is appended to the stream.
 *
 * Pipeline: Stream -> Filtered Stream
 *
 * @param maxSize
 *            the maximum number of elements to let through
 * @return the extended Pipeline
 */
public NaiveTraversalEngine range(final int maxSize) {
    if (!isPathEnabled) {
        // Lazy: the limit is applied when the stream is consumed
        stream = stream.limit(maxSize);
    } else {
        // Greedy: cut the stream now and remember what was kept
        final List kept = (List) stream.limit(maxSize).collect(Collectors.toList());
        // Keep only the path-map keys that are among the kept elements
        currentPath.keySet().retainAll(kept);
        // Reopen the stream over the kept elements
        stream = isParallel ? kept.parallelStream() : kept.stream();
    }

    // Record the step; the parameter type is the primitive int
    final Class[] parameterTypes = { int.class };
    stepList.add(new Step(this.getClass().getName(), "range", parameterTypes, maxSize));
    return this;
}
/**
* Add a DuplicateFilterPipe to the end of the Pipeline. Only distinct objects
* are emitted.
*
* Path Enabled: Greedy - the distinct elements are collected immediately and
* the path map is rebuilt so that every key keeps at most one of its paths.
*
* Path Disabled: Lazy - only a distinct() stage is appended to the stream.
*
* Pipeline: Stream -> Deduplicated Stream
*
* Path: Map<DeduplicationHolder, Deduplicated Set<Object>>
*
* @return the extended Pipeline
*/
public ExternalTraversalEngine dedup() {
// Pipeline Update
if (isPathEnabled) {
// Evaluate distinct() now so the result can be streamed again below
List intermediate = (List) stream.distinct().collect(Collectors.toList());
// Update Path ( Only one path retain per last key )
currentPath = currentPath.entrySet().parallelStream().map(e -> {
Entry entry = (Entry) e;
// limit(1) keeps a single element of the value set; no ordering is imposed
// here, so which element is kept is not specified by this code. The result is
// copied into a fresh HashSet.
Set set = new HashSet((Set) ((Set) e.getValue()).parallelStream().limit(1).collect(Collectors.toSet()));
// Pair the unchanged key with the reduced set
return new AbstractMap.SimpleImmutableEntry(entry.getKey(), set);
// Reassemble the entries into a new map that replaces currentPath
}).collect(Collectors.toMap(entry -> ((Entry) entry).getKey(), entry -> ((Entry) entry).getValue()));
// Make stream again
if (isParallel)
stream = intermediate.parallelStream();
else
stream = intermediate.stream();
} else {
stream = stream.distinct();
}
// Step Update: dedup takes no arguments, so the parameter-type array is empty
final Class[] args = {};
final Step step = new Step(this.getClass().getName(), "dedup", args);
stepList.add(step);
return this;
}
/**
 * Adds a retain filter to the pipeline: an element is emitted only when the
 * given collection contains it.
 *
 * Path Enabled: Greedy - the filtered stream is collected immediately and the
 * path map keeps only the keys that are among the surviving elements.
 *
 * Path Disabled: Lazy - only a filter stage is appended to the stream.
 *
 * Pipeline: Stream -> Filtered Stream
 *
 * @param collection
 *            the elements allowed to stay in the stream
 * @return the extended Pipeline
 */
public TraversalEngine retain(final Collection collection) {
    if (!isPathEnabled) {
        // Lazy: nothing is evaluated until the stream is consumed
        stream = stream.filter(candidate -> collection.contains(candidate));
    } else {
        // Greedy: materialize the survivors so the path map can be pruned
        final List survivors = (List) stream.filter(candidate -> collection.contains(candidate))
                .collect(Collectors.toList());
        // Keep only the path-map keys that are among the survivors
        currentPath.keySet().retainAll(survivors);
        // Reopen the stream over the survivors
        stream = isParallel ? survivors.parallelStream() : survivors.stream();
    }

    // Record the step together with its single argument
    final Class[] parameterTypes = { Collection.class };
    stepList.add(new Step(this.getClass().getName(), "retain", parameterTypes, collection));
    return this;
}
/**
* Add a DuplicateFilterPipe to the end of the Pipeline. Only distinct objects
* are emitted.
*
* Path Enabled: Greedy - the distinct elements are collected immediately and
* the path map is rebuilt so that every key keeps at most one of its paths.
*
* Path Disabled: Lazy - only a distinct() stage is appended to the stream.
*
* Pipeline: Stream -> Deduplicated Stream
*
* Path: Map<DeduplicationHolder, Deduplicated Set<Object>>
*
* @return the extended Pipeline
*/
public NaiveTraversalEngine dedup() {
// Pipeline Update
if (isPathEnabled) {
// Evaluate distinct() now so the result can be streamed again below
List intermediate = (List) stream.distinct().collect(Collectors.toList());
// Update Path ( Only one path retain per last key )
currentPath = currentPath.entrySet().parallelStream().map(e -> {
Entry entry = (Entry) e;
// limit(1) keeps a single element of the value set; no ordering is imposed
// here, so which element is kept is not specified by this code. The result is
// copied into a fresh HashSet.
Set set = new HashSet((Set) ((Set) e.getValue()).parallelStream().limit(1).collect(Collectors.toSet()));
// Pair the unchanged key with the reduced set
return new AbstractMap.SimpleImmutableEntry(entry.getKey(), set);
// Reassemble the entries into a new map that replaces currentPath
}).collect(Collectors.toMap(entry -> ((Entry) entry).getKey(), entry -> ((Entry) entry).getValue()));
// Make stream again
if (isParallel)
stream = intermediate.parallelStream();
else
stream = intermediate.stream();
} else {
stream = stream.distinct();
}
// Step Update: dedup takes no arguments, so the parameter-type array is empty
final Class[] args = {};
final Step step = new Step(this.getClass().getName(), "dedup", args);
stepList.add(step);
return this;
}
/**
 * Adds an exclusion filter to the pipeline: an element is emitted only when
 * the provided collection does not contain it.
 *
 * Path Enabled: Greedy - the filtered stream is collected immediately and the
 * path map keeps only the keys that are among the surviving elements.
 *
 * Path Disabled: Lazy - only a filter stage is appended to the stream.
 *
 * Pipeline: Stream -> Filtered Stream
 *
 * @param collection
 *            the elements to exclude from the stream
 * @return the extended Pipeline
 */
public NaiveTraversalEngine except(final Collection collection) {
    if (!isPathEnabled) {
        // Lazy branch: just chain the filter
        stream = stream.filter(element -> !collection.contains(element));
    } else {
        // Greedy branch: evaluate the filter now
        final List notExcluded = (List) stream.filter(element -> !collection.contains(element))
                .collect(Collectors.toList());
        // Path update: keys that are not among the collected elements are removed
        currentPath.keySet().retainAll(notExcluded);
        // Continue the pipeline from the collected elements
        stream = isParallel ? notExcluded.parallelStream() : notExcluded.stream();
    }

    // Step update: "except" takes one Collection argument
    final Class[] parameterTypes = { Collection.class };
    stepList.add(new Step(this.getClass().getName(), "except", parameterTypes, collection));
    return this;
}
/**
 * Adds a retain filter to the pipeline: an element is emitted only when it is
 * contained in the provided collection.
 *
 * Path Enabled: Greedy - the filtered stream is collected immediately and the
 * path map keeps only the keys that are among the surviving elements.
 *
 * Path Disabled: Lazy - only a filter stage is appended to the stream.
 *
 * Pipeline: Stream -> Filtered Stream
 *
 * @param collection
 *            the elements allowed to stay in the stream
 * @return the extended Pipeline
 */
public NaiveTraversalEngine retain(final Collection collection) {
    if (!isPathEnabled) {
        // Lazy branch: just chain the filter
        stream = stream.filter(element -> collection.contains(element));
    } else {
        // Greedy branch: evaluate the filter now
        final List retained = (List) stream.filter(element -> collection.contains(element))
                .collect(Collectors.toList());
        // Path update: keys that are not among the retained elements are removed
        currentPath.keySet().retainAll(retained);
        // Continue the pipeline from the retained elements
        stream = isParallel ? retained.parallelStream() : retained.stream();
    }

    // Step update: "retain" takes one Collection argument
    final Class[] parameterTypes = { Collection.class };
    stepList.add(new Step(this.getClass().getName(), "retain", parameterTypes, collection));
    return this;
}
/**
 * Adds a random filter step to the pipeline: each element is kept or dropped
 * by an independent biased coin toss.
 *
 * An element passes when {@code bias} is greater than a fresh uniform random
 * number in [0, 1), so a higher bias filters out fewer elements.
 *
 * Path Enabled: Greedy - the filtered stream is collected immediately and the
 * path map keeps only the keys that are among the surviving elements.
 *
 * Path Disabled: Lazy - only a filter stage is appended to the stream.
 *
 * Pipeline: Stream -> Filtered Stream
 *
 * @param bias
 *            the pass threshold compared against each random draw
 * @return the extended Pipeline
 */
public NaiveTraversalEngine random(final Double bias) {
    // Pipeline Update
    // ThreadLocalRandom is used (fully qualified, so no extra import is needed)
    // instead of allocating and seeding a new Random for every element; current()
    // is called inside the lambda so each worker thread uses its own generator.
    if (isPathEnabled) {
        List intermediate = (List) stream
                .filter(e -> bias > java.util.concurrent.ThreadLocalRandom.current().nextDouble())
                .collect(Collectors.toList());
        // Update Path: drop every key that is not among the surviving elements
        currentPath.keySet().retainAll(intermediate);
        // Make stream again
        if (isParallel) {
            stream = intermediate.parallelStream();
        } else {
            stream = intermediate.stream();
        }
    } else {
        stream = stream.filter(e -> bias > java.util.concurrent.ThreadLocalRandom.current().nextDouble());
    }
    // Step Update: record the step name, its parameter type and the bias used
    final Class[] args = { Double.class };
    final Step step = new Step(this.getClass().getName(), "random", args, bias);
    stepList.add(step);
    return this;
}
/**
* Add a CyclicPathFilterPipe to the end of the Pipeline. If the object's path
* is repeating (looping), then the object is filtered. Thus, what is emitted
* are those objects whose history is composed of unique objects.
*
* Path Enabled: Greedy - the path map is rebuilt with only the simple paths,
* and the stream is collected after removing the elements whose key lost all
* of its paths.
*
* Path Disabled: Throw UnsupportedOperationException
*
* @return the extended Pipeline
* @throws UnsupportedOperationException
*             if path tracking is not enabled
*/
public ExternalTraversalEngine simplePath() {
// Pipeline Update
if (isPathEnabled) {
// Update Path ( keep only the simple paths of each last key )
// keySet() is taken before currentPath is reassigned, so it keeps referring to
// the keys of the previous map (assuming the usual java.util.Map view contract)
Set prevLastPathElementSet = currentPath.keySet();
currentPath = currentPath.entrySet().parallelStream().map(e -> {
Entry entry = (Entry) e;
Set pathSet = (Set) e.getValue();
Set simplePathSet = (Set) pathSet.parallelStream().filter(p -> {
List path = (List) p;
// A path is simple when copying it into a set loses no element,
// i.e. it contains no duplicates
Set redTestSet = new HashSet(path);
if (path.size() != redTestSet.size())
return false;
else
return true;
}).collect(Collectors.toSet());
// A key without any simple path is marked with null and dropped by the
// filter that follows
if (simplePathSet.size() == 0)
return null;
else
return new AbstractMap.SimpleImmutableEntry(entry.getKey(), simplePathSet);
}).filter(p -> p != null)
.collect(Collectors.toMap(entry -> ((Entry) entry).getKey(), entry -> ((Entry) entry).getValue()));
// Non simple paths: after removing the surviving keys, the previous key set
// holds only the keys that lost all of their paths. NOTE(review): because the
// set is a view, this also removes those entries from the previous map object.
prevLastPathElementSet.removeAll(currentPath.keySet());
// Filter non simple paths: drop stream elements contained in that key set
List intermediate = (List) stream.filter(e -> !prevLastPathElementSet.contains(e))
.collect(Collectors.toList());
// Make stream again
if (isParallel)
stream = intermediate.parallelStream();
else
stream = intermediate.stream();
} else
throw new UnsupportedOperationException();
// Step Update: simplePath takes no arguments, so the parameter-type array is empty
final Class[] args = {};
final Step step = new Step(this.getClass().getName(), "simplePath", args);
stepList.add(step);
return this;
}
/**
* Add a CyclicPathFilterPipe to the end of the Pipeline. If the object's path
* is repeating (looping), then the object is filtered. Thus, what is emitted
* are those objects whose history is composed of unique objects.
*
* Path Enabled: Greedy - the path map is rebuilt with only the simple paths,
* and the stream is collected after removing the elements whose key lost all
* of its paths.
*
* Path Disabled: Throw UnsupportedOperationException
*
* @return the extended Pipeline
* @throws UnsupportedOperationException
*             if path tracking is not enabled
*/
public TraversalEngine simplePath() {
// Pipeline Update
if (isPathEnabled) {
// Update Path ( keep only the simple paths of each last key )
// keySet() is taken before currentPath is reassigned, so it keeps referring to
// the keys of the previous map (assuming the usual java.util.Map view contract)
Set prevLastPathElementSet = currentPath.keySet();
currentPath = currentPath.entrySet().parallelStream().map(e -> {
Entry entry = (Entry) e;
Set pathSet = (Set) e.getValue();
Set simplePathSet = (Set) pathSet.parallelStream().filter(p -> {
List path = (List) p;
// A path is simple when copying it into a set loses no element,
// i.e. it contains no duplicates
Set redTestSet = new HashSet(path);
if (path.size() != redTestSet.size())
return false;
else
return true;
}).collect(Collectors.toSet());
// A key without any simple path is marked with null and dropped by the
// filter that follows
if (simplePathSet.size() == 0)
return null;
else
return new AbstractMap.SimpleImmutableEntry(entry.getKey(), simplePathSet);
}).filter(p -> p != null)
.collect(Collectors.toMap(entry -> ((Entry) entry).getKey(), entry -> ((Entry) entry).getValue()));
// Non simple paths: after removing the surviving keys, the previous key set
// holds only the keys that lost all of their paths. NOTE(review): because the
// set is a view, this also removes those entries from the previous map object.
prevLastPathElementSet.removeAll(currentPath.keySet());
// Filter non simple paths: drop stream elements contained in that key set
List intermediate = (List) stream.filter(e -> !prevLastPathElementSet.contains(e))
.collect(Collectors.toList());
// Make stream again
if (isParallel)
stream = intermediate.parallelStream();
else
stream = intermediate.stream();
} else
throw new UnsupportedOperationException();
// Step Update: simplePath takes no arguments, so the parameter-type array is empty
final Class[] args = {};
final Step step = new Step(this.getClass().getName(), "simplePath", args);
stepList.add(step);
return this;
}
/**
 * Adds a scatter step to the pipeline: every element of the stream is treated
 * as a {@code List} and unrolled, so the lists' members are emitted one at a
 * time.
 *
 * When the current element class is not {@code List}, the call does nothing:
 * the engine is returned unchanged and no step is recorded.
 *
 * Path Enabled: Greedy - the flattened stream is collected immediately. The
 * path map is not modified by this step.
 *
 * Path Disabled: Lazy - only a flatMap stage is appended to the stream.
 *
 * Pipeline: Stream of List -> Stream of the lists' elements
 *
 * After the step is recorded, the element class becomes the former list
 * element class and the list element class is reset to null.
 *
 * @return the extended Pipeline
 */
public ExternalTraversalEngine scatter() {
    // Only a stream of lists can be scattered
    if (elementClass != List.class) {
        return this;
    }
    // Input element class check is disabled (call kept for reference):
    // checkInputElementClass(List.class);

    if (!isPathEnabled) {
        // Lazy: the parallel flag is read when each list is unrolled
        stream = stream.flatMap(
                nested -> isParallel ? ((List) nested).parallelStream() : ((List) nested).stream());
    } else {
        // Greedy: unroll every list now
        final List flattened = (List) stream.flatMap(nested -> ((List) nested).parallelStream())
                .collect(Collectors.toList());
        // Path pruning is disabled for this step (call kept for reference):
        // currentPath.keySet().retainAll(flattened);
        // Reopen the stream over the flattened elements
        stream = isParallel ? flattened.parallelStream() : flattened.stream();
    }

    // Record the step; scatter takes no arguments
    final Class[] noParameterTypes = new Class[0];
    stepList.add(new Step(this.getClass().getName(), "scatter", noParameterTypes));

    // The stream now carries what used to be the lists' elements
    elementClass = listElementClass;
    listElementClass = null;
    return this;
}
/**
* Add simplePath step to the traversal engine. Paths that contain a repeated
* element are discarded, and elements whose key is left without any path are
* removed from the stream.
*
* Path Enabled: Greedy - the path map is rebuilt with only the simple paths,
* and the stream is collected after removing the elements whose key lost all
* of its paths.
*
* Path Disabled: Throw UnsupportedOperationException
*
* @return the extended Stream
* @throws UnsupportedOperationException
*             if path tracking is not enabled
*/
public CachedTraversalEngine simplePath() {
// Stream Update
if (isPathEnabled) {
// Update Path ( keep only the simple paths of each last key )
// keySet() is taken before currentPath is reassigned, so it keeps referring to
// the keys of the previous map (assuming the usual java.util.Map view contract)
Set prevLastPathElementSet = currentPath.keySet();
currentPath = currentPath.entrySet().parallelStream().map(e -> {
Entry entry = (Entry) e;
Set pathSet = (Set) e.getValue();
Set simplePathSet = (Set) pathSet.parallelStream().filter(p -> {
List path = (List) p;
// A path is simple when copying it into a set loses no element,
// i.e. it contains no duplicates
Set redTestSet = new HashSet(path);
if (path.size() != redTestSet.size())
return false;
else
return true;
}).collect(Collectors.toSet());
// A key without any simple path is marked with null and dropped by the
// filter that follows
if (simplePathSet.size() == 0)
return null;
else
return new AbstractMap.SimpleImmutableEntry(entry.getKey(), simplePathSet);
}).filter(p -> p != null)
.collect(Collectors.toMap(entry -> ((Entry) entry).getKey(), entry -> ((Entry) entry).getValue()));
// Non simple paths: after removing the surviving keys, the previous key set
// holds only the keys that lost all of their paths. NOTE(review): because the
// set is a view, this also removes those entries from the previous map object.
prevLastPathElementSet.removeAll(currentPath.keySet());
// Filter non simple paths: drop stream elements contained in that key set
List intermediate = (List) stream.filter(e -> !prevLastPathElementSet.contains(e))
.collect(Collectors.toList());
// Make stream again
if (isParallel)
stream = intermediate.parallelStream();
else
stream = intermediate.stream();
} else
throw new UnsupportedOperationException();
// Step Update: simplePath takes no arguments, so the parameter-type array is empty
final Class[] args = {};
final Step step = new Step(this.getClass().getName(), "simplePath", args);
stepList.add(step);
return this;
}
/**
 * Opens a parallel stream over the given list.
 *
 * @param <T>
 *            the element type of the list
 * @param list
 *            the list to stream; must not be null
 * @return the stream returned by {@code list.parallelStream()}
 * @throws NullPointerException
 *             if {@code list} is null
 */
public static <T> Stream<T> fromListParallel(final List<T> list) {
    final Stream<T> parallelView = list.parallelStream();
    return parallelView;
}