java.util.List#parallelStream() 源码实例 Demo

下面列出了 java.util.List#parallelStream() 的实例代码，你可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: epcis   文件: ExternalTraversalEngine.java
/**
 * Randomizes the order of the elements flowing through the pipeline.
 *
 * The step is greedy: the current stream is fully collected into a list, the
 * list is shuffled in place with Collections.shuffle(), and a new stream is
 * opened over the shuffled list. The new stream is parallel when isParallel
 * is set and sequential otherwise. The path is not touched.
 *
 * A "shuffle" Step without argument types is appended to stepList.
 *
 * @return the extended Pipeline
 */
public ExternalTraversalEngine shuffle() {
	// NOTE(review): presumably rejects pipelines whose current element class
	// is List - confirm against checkInvalidInputElementClass
	checkInvalidInputElementClass(List.class);

	// Drain the stream, randomize the order, then re-open a stream over the result
	final List shuffled = (List) stream.collect(Collectors.toList());
	Collections.shuffle(shuffled);
	stream = isParallel ? shuffled.parallelStream() : shuffled.stream();

	// Record the step
	stepList.add(new Step(getClass().getName(), "shuffle", new Class[0]));
	return this;
}
 
源代码2 项目: epcis   文件: CachedTraversalEngine.java
/**
 * Adds a shuffle step to the traversal engine.
 *
 * The step is greedy: the current stream is fully collected into a list, the
 * list is shuffled in place with Collections.shuffle(), and a new stream is
 * opened over the shuffled list. The new stream is parallel when isParallel
 * is set and sequential otherwise. The path is not touched.
 *
 * A "shuffle" Step without argument types is appended to stepList.
 *
 * @return the extended Pipeline
 */
public CachedTraversalEngine shuffle() {
	// NOTE(review): presumably rejects pipelines whose current element class
	// is List - confirm against checkInvalidInputElementClass
	checkInvalidInputElementClass(List.class);

	// Materialize, randomize, and stream again
	final List randomized = (List) stream.collect(Collectors.toList());
	Collections.shuffle(randomized);
	stream = isParallel ? randomized.parallelStream() : randomized.stream();

	// Record the step
	stepList.add(new Step(getClass().getName(), "shuffle", new Class[0]));
	return this;
}
 
源代码3 项目: epcis   文件: NaiveTraversalEngine.java
/**
 * Randomizes the order of the elements flowing through the pipeline.
 *
 * The step is greedy: the current stream is fully collected into a list, the
 * list is shuffled in place with Collections.shuffle(), and a new stream is
 * opened over the shuffled list. The new stream is parallel when isParallel
 * is set and sequential otherwise. The path is not touched.
 *
 * A "shuffle" Step without argument types is appended to stepList.
 *
 * @return the extended Pipeline
 */
public NaiveTraversalEngine shuffle() {
	// NOTE(review): presumably rejects pipelines whose current element class
	// is List - confirm against checkInvalidInputElementClass
	checkInvalidInputElementClass(List.class);

	// Collect everything seen so far and put it into random order
	final List reordered = (List) stream.collect(Collectors.toList());
	Collections.shuffle(reordered);

	// Continue the pipeline from the shuffled list
	stream = isParallel ? reordered.parallelStream() : reordered.stream();

	// Record the step
	stepList.add(new Step(getClass().getName(), "shuffle", new Class[0]));
	return this;
}
 
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream() {
    // An empty list is enough: only the execution mode of the stream is inspected
    final List<Long> emptyNumbers = new ArrayList<>();

    // parallelStream() on the list must report itself as parallel
    final Stream<Long> streamUnderTest = emptyNumbers.parallelStream();
    final boolean reportedParallel = streamUnderTest.isParallel();

    assertTrue(reportedParallel);
}
 
源代码5 项目: epcis   文件: TraversalEngine.java
/**
 * Randomly drops elements from the pipeline using a biased coin toss.
 *
 * For every element a uniform random number in [0, 1) is drawn and the
 * element is kept only when {@code bias} is greater than that number, so a
 * higher bias keeps more elements.
 *
 * When path tracking is enabled the step is greedy: the filtered stream is
 * collected into a list, every entry of currentPath whose key is not in that
 * list is removed, and a new stream (parallel if isParallel is set) is opened
 * over the list. When path tracking is disabled the filter is only appended
 * to the stream and stays lazy.
 *
 * A "random" Step carrying the bias is appended to stepList.
 *
 * @param bias
 *            threshold compared against each random draw; it is unboxed for
 *            every tested element, so it must not be null
 * @return the extended Pipeline
 */
public TraversalEngine random(final Double bias) {
	// ThreadLocalRandom replaces the per-element "new Random()": no allocation
	// or re-seeding for every element, and no shared generator state between
	// the worker threads of a parallel stream
	stream = stream.filter(e -> bias > java.util.concurrent.ThreadLocalRandom.current().nextDouble());

	if (isPathEnabled) {
		// Greedy mode: materialize the survivors so the path can be pruned
		final List intermediate = (List) stream.collect(Collectors.toList());

		// Drop every path whose last element did not survive the coin toss
		currentPath.keySet().retainAll(intermediate);

		// Continue the pipeline from the materialized survivors
		stream = isParallel ? intermediate.parallelStream() : intermediate.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "random", new Class[] { Double.class }, bias));
	return this;
}
 
源代码6 项目: epcis   文件: TraversalEngine.java
/**
 * Removes from the pipeline every element that is contained in the given
 * collection.
 *
 * When path tracking is enabled the step is greedy: the filtered stream is
 * collected into a list, every entry of currentPath whose key is not in that
 * list is removed, and a new stream (parallel if isParallel is set) is opened
 * over the list. When path tracking is disabled the filter is only appended
 * to the stream and stays lazy.
 *
 * An "except" Step carrying the collection is appended to stepList.
 *
 * @param collection
 *            the elements to exclude from the stream
 * @return the extended Pipeline
 */
public TraversalEngine except(final Collection collection) {
	// Both modes start with the same exclusion filter
	stream = stream.filter(element -> !collection.contains(element));

	if (isPathEnabled) {
		// Greedy mode: materialize the survivors so the path can be pruned
		final List survivors = (List) stream.collect(Collectors.toList());

		// Drop every path whose last element was excluded
		currentPath.keySet().retainAll(survivors);

		// Continue the pipeline from the materialized survivors
		stream = isParallel ? survivors.parallelStream() : survivors.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "except", new Class[] { Collection.class }, collection));
	return this;
}
 
源代码7 项目: epcis   文件: ExternalTraversalEngine.java
/**
 * Removes from the pipeline every element that is contained in the given
 * collection.
 *
 * When path tracking is enabled the step is greedy: the filtered stream is
 * collected into a list, every entry of currentPath whose key is not in that
 * list is removed, and a new stream (parallel if isParallel is set) is opened
 * over the list. When path tracking is disabled the filter is only appended
 * to the stream and stays lazy.
 *
 * An "except" Step carrying the collection is appended to stepList.
 *
 * @param collection
 *            the elements to exclude from the stream
 * @return the extended Pipeline
 */
public ExternalTraversalEngine except(final Collection collection) {
	// Both modes start with the same exclusion filter
	stream = stream.filter(candidate -> !collection.contains(candidate));

	if (isPathEnabled) {
		// Greedy mode: materialize what is left so the path can be pruned
		final List remaining = (List) stream.collect(Collectors.toList());

		// Drop every path whose last element was excluded
		currentPath.keySet().retainAll(remaining);

		// Continue the pipeline from the materialized list
		stream = isParallel ? remaining.parallelStream() : remaining.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "except", new Class[] { Collection.class }, collection));
	return this;
}
 
源代码8 项目: epcis   文件: TraversalEngine.java
/**
 * Keeps only the elements accepted by an arbitrary filter function.
 *
 * The result of {@code filterFunction.compute(element)} is cast to boolean;
 * the element stays in the stream when that value is true.
 *
 * When path tracking is enabled the step is greedy: the filtered stream is
 * collected into a list and a new stream (parallel if isParallel is set) is
 * opened over it. When path tracking is disabled the filter is only appended
 * to the stream and stays lazy.
 *
 * A "filter" Step carrying the function is appended to stepList.
 *
 * @param filterFunction
 *            the function deciding which elements pass
 * @return the extended Pipeline
 */
public TraversalEngine filter(final PipeFunction filterFunction) {
	// Both modes start with the same user supplied predicate
	stream = stream.filter(element -> (boolean) filterFunction.compute(element));

	if (isPathEnabled) {
		// Greedy mode: materialize the accepted elements
		final List accepted = (List) stream.collect(Collectors.toList());

		// NOTE(review): unlike the sibling filter steps, currentPath is not
		// pruned here (the retainAll call is disabled in the original) -
		// confirm whether that is intentional

		// Continue the pipeline from the materialized list
		stream = isParallel ? accepted.parallelStream() : accepted.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "filter", new Class[] { PipeFunction.class }, filterFunction));
	return this;
}
 
源代码9 项目: epcis   文件: NaiveTraversalEngine.java
/**
 * Truncates the pipeline to at most {@code maxSize} elements using
 * Stream.limit().
 *
 * When path tracking is enabled the step is greedy: the truncated stream is
 * collected into a list, every entry of currentPath whose key is not in that
 * list is removed, and a new stream (parallel if isParallel is set) is opened
 * over the list. When path tracking is disabled the limit is only appended to
 * the stream and stays lazy.
 *
 * A "range" Step carrying maxSize is appended to stepList.
 *
 * @param maxSize
 *            the maximum number of elements to let through
 * @return the extended Pipeline
 */
public NaiveTraversalEngine range(final int maxSize) {
	// Both modes start with the same truncation
	stream = stream.limit(maxSize);

	if (isPathEnabled) {
		// Greedy mode: materialize the first elements so the path can be pruned
		final List head = (List) stream.collect(Collectors.toList());

		// Drop every path whose last element fell outside the limit
		currentPath.keySet().retainAll(head);

		// Continue the pipeline from the materialized list
		stream = isParallel ? head.parallelStream() : head.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "range", new Class[] { Integer.TYPE }, maxSize));
	return this;
}
 
源代码10 项目: epcis   文件: ExternalTraversalEngine.java
/**
 * Removes duplicate elements from the pipeline using Stream.distinct().
 *
 * When path tracking is enabled the step is greedy: the distinct elements are
 * collected into a list, currentPath is rebuilt so that every key keeps a set
 * holding at most one of its former paths, and a new stream (parallel if
 * isParallel is set) is opened over the list. When path tracking is disabled
 * distinct() is only appended to the stream and stays lazy.
 *
 * A "dedup" Step without argument types is appended to stepList.
 *
 * @return the extended Pipeline
 */
public ExternalTraversalEngine dedup() {
	// Both modes start with the same de-duplication
	stream = stream.distinct();

	if (isPathEnabled) {
		// Greedy mode: materialize the distinct elements
		final List uniqueElements = (List) stream.collect(Collectors.toList());

		// Rebuild the path map with a single path per key; which path is kept
		// is whatever limit(1) yields from the set's stream
		currentPath = currentPath.entrySet().parallelStream().map(pathEntry -> {
			final Set allPaths = (Set) pathEntry.getValue();
			final Set singlePath = new HashSet((Set) allPaths.parallelStream().limit(1).collect(Collectors.toSet()));
			return new AbstractMap.SimpleImmutableEntry(((Entry) pathEntry).getKey(), singlePath);
		}).collect(Collectors.toMap(rebuilt -> ((Entry) rebuilt).getKey(), rebuilt -> ((Entry) rebuilt).getValue()));

		// Continue the pipeline from the materialized list
		stream = isParallel ? uniqueElements.parallelStream() : uniqueElements.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "dedup", new Class[0]));
	return this;
}
 
源代码11 项目: epcis   文件: TraversalEngine.java
/**
 * Keeps in the pipeline only the elements that are contained in the given
 * collection.
 *
 * When path tracking is enabled the step is greedy: the filtered stream is
 * collected into a list, every entry of currentPath whose key is not in that
 * list is removed, and a new stream (parallel if isParallel is set) is opened
 * over the list. When path tracking is disabled the filter is only appended
 * to the stream and stays lazy.
 *
 * A "retain" Step carrying the collection is appended to stepList.
 *
 * @param collection
 *            the elements allowed to stay in the stream
 * @return the extended Pipeline
 */
public TraversalEngine retain(final Collection collection) {
	// Both modes start with the same membership filter
	stream = stream.filter(element -> collection.contains(element));

	if (isPathEnabled) {
		// Greedy mode: materialize the matches so the path can be pruned
		final List matches = (List) stream.collect(Collectors.toList());

		// Drop every path whose last element was not retained
		currentPath.keySet().retainAll(matches);

		// Continue the pipeline from the materialized list
		stream = isParallel ? matches.parallelStream() : matches.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "retain", new Class[] { Collection.class }, collection));
	return this;
}
 
源代码12 项目: epcis   文件: NaiveTraversalEngine.java
/**
 * Removes duplicate elements from the pipeline using Stream.distinct().
 *
 * When path tracking is enabled the step is greedy: the distinct elements are
 * collected into a list, currentPath is rebuilt so that every key keeps a set
 * holding at most one of its former paths, and a new stream (parallel if
 * isParallel is set) is opened over the list. When path tracking is disabled
 * distinct() is only appended to the stream and stays lazy.
 *
 * A "dedup" Step without argument types is appended to stepList.
 *
 * @return the extended Pipeline
 */
public NaiveTraversalEngine dedup() {
	// Both modes start with the same de-duplication
	stream = stream.distinct();

	if (isPathEnabled) {
		// Greedy mode: materialize the distinct elements
		final List distinctElements = (List) stream.collect(Collectors.toList());

		// Rebuild the path map with a single path per key; which path is kept
		// is whatever limit(1) yields from the set's stream
		currentPath = currentPath.entrySet().parallelStream().map(pathEntry -> {
			final Set knownPaths = (Set) pathEntry.getValue();
			final Set keptPath = new HashSet((Set) knownPaths.parallelStream().limit(1).collect(Collectors.toSet()));
			return new AbstractMap.SimpleImmutableEntry(((Entry) pathEntry).getKey(), keptPath);
		}).collect(Collectors.toMap(rebuilt -> ((Entry) rebuilt).getKey(), rebuilt -> ((Entry) rebuilt).getValue()));

		// Continue the pipeline from the materialized list
		stream = isParallel ? distinctElements.parallelStream() : distinctElements.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "dedup", new Class[0]));
	return this;
}
 
源代码13 项目: epcis   文件: NaiveTraversalEngine.java
/**
 * Removes from the pipeline every element that is contained in the given
 * collection.
 *
 * When path tracking is enabled the step is greedy: the filtered stream is
 * collected into a list, every entry of currentPath whose key is not in that
 * list is removed, and a new stream (parallel if isParallel is set) is opened
 * over the list. When path tracking is disabled the filter is only appended
 * to the stream and stays lazy.
 *
 * An "except" Step carrying the collection is appended to stepList.
 *
 * @param collection
 *            the elements to exclude from the stream
 * @return the extended Pipeline
 */
public NaiveTraversalEngine except(final Collection collection) {
	// Both modes start with the same exclusion filter
	stream = stream.filter(item -> !collection.contains(item));

	if (isPathEnabled) {
		// Greedy mode: materialize what passed so the path can be pruned
		final List passed = (List) stream.collect(Collectors.toList());

		// Drop every path whose last element was excluded
		currentPath.keySet().retainAll(passed);

		// Continue the pipeline from the materialized list
		stream = isParallel ? passed.parallelStream() : passed.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "except", new Class[] { Collection.class }, collection));
	return this;
}
 
源代码14 项目: epcis   文件: NaiveTraversalEngine.java
/**
 * Keeps in the pipeline only the elements that are contained in the given
 * collection.
 *
 * When path tracking is enabled the step is greedy: the filtered stream is
 * collected into a list, every entry of currentPath whose key is not in that
 * list is removed, and a new stream (parallel if isParallel is set) is opened
 * over the list. When path tracking is disabled the filter is only appended
 * to the stream and stays lazy.
 *
 * A "retain" Step carrying the collection is appended to stepList.
 *
 * @param collection
 *            the elements allowed to stay in the stream
 * @return the extended Pipeline
 */
public NaiveTraversalEngine retain(final Collection collection) {
	// Both modes start with the same membership filter
	stream = stream.filter(item -> collection.contains(item));

	if (isPathEnabled) {
		// Greedy mode: materialize the retained elements so the path can be pruned
		final List retained = (List) stream.collect(Collectors.toList());

		// Drop every path whose last element was not retained
		currentPath.keySet().retainAll(retained);

		// Continue the pipeline from the materialized list
		stream = isParallel ? retained.parallelStream() : retained.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "retain", new Class[] { Collection.class }, collection));
	return this;
}
 
源代码15 项目: epcis   文件: NaiveTraversalEngine.java
/**
 * Randomly drops elements from the pipeline using a biased coin toss.
 *
 * For every element a uniform random number in [0, 1) is drawn and the
 * element is kept only when {@code bias} is greater than that number, so a
 * higher bias keeps more elements.
 *
 * When path tracking is enabled the step is greedy: the filtered stream is
 * collected into a list, every entry of currentPath whose key is not in that
 * list is removed, and a new stream (parallel if isParallel is set) is opened
 * over the list. When path tracking is disabled the filter is only appended
 * to the stream and stays lazy.
 *
 * A "random" Step carrying the bias is appended to stepList.
 *
 * @param bias
 *            threshold compared against each random draw; it is unboxed for
 *            every tested element, so it must not be null
 * @return the extended Pipeline
 */
public NaiveTraversalEngine random(final Double bias) {
	// ThreadLocalRandom replaces the per-element "new Random()": no allocation
	// or re-seeding for every element, and no shared generator state between
	// the worker threads of a parallel stream
	stream = stream.filter(e -> bias > java.util.concurrent.ThreadLocalRandom.current().nextDouble());

	if (isPathEnabled) {
		// Greedy mode: materialize the survivors so the path can be pruned
		final List intermediate = (List) stream.collect(Collectors.toList());

		// Drop every path whose last element did not survive the coin toss
		currentPath.keySet().retainAll(intermediate);

		// Continue the pipeline from the materialized survivors
		stream = isParallel ? intermediate.parallelStream() : intermediate.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "random", new Class[] { Double.class }, bias));
	return this;
}
 
源代码16 项目: epcis   文件: ExternalTraversalEngine.java
/**
 * Filters out elements that were only reached through cyclic paths.
 *
 * A path counts as simple when it contains no repeated element. currentPath
 * is rebuilt so that each key keeps only its simple paths, and keys left
 * without any path are dropped. Elements equal to a dropped key are then
 * removed from the stream, which is collected greedily and re-opened
 * (parallel if isParallel is set).
 *
 * A "simplePath" Step without argument types is appended to stepList.
 *
 * @return the extended Pipeline
 * @throws UnsupportedOperationException
 *             if path tracking is disabled
 */
public ExternalTraversalEngine simplePath() {
	// Without recorded paths there is nothing to test for cycles
	if (!isPathEnabled) {
		throw new UnsupportedOperationException();
	}

	// Key set of the map that is about to be replaced; it is pruned below
	// until only the keys that lost all of their paths remain.
	// NOTE(review): if keySet() is a live view (the java.util.Map contract),
	// the removeAll below also mutates the previous map - confirm nothing
	// else still references it
	final Set rejectedKeys = currentPath.keySet();

	// Rebuild the path map, keeping only paths without repeated elements
	currentPath = currentPath.entrySet().parallelStream().map(pathEntry -> {
		final Set candidatePaths = (Set) pathEntry.getValue();
		final Set acyclicPaths = (Set) candidatePaths.parallelStream().filter(candidate -> {
			final List path = (List) candidate;
			// A repeated element makes the de-duplicated copy smaller
			return new HashSet(path).size() == path.size();
		}).collect(Collectors.toSet());
		if (!acyclicPaths.isEmpty()) {
			return new AbstractMap.SimpleImmutableEntry(((Entry) pathEntry).getKey(), acyclicPaths);
		}
		// Marker for "no simple path left"; removed by the filter below
		return null;
	}).filter(rebuilt -> rebuilt != null).collect(
			Collectors.toMap(rebuilt -> ((Entry) rebuilt).getKey(), rebuilt -> ((Entry) rebuilt).getValue()));

	// Keys that still have a simple path are not rejected
	rejectedKeys.removeAll(currentPath.keySet());

	// Remove the rejected elements from the stream and re-open it
	final List survivors = (List) stream.filter(element -> !rejectedKeys.contains(element))
			.collect(Collectors.toList());
	stream = isParallel ? survivors.parallelStream() : survivors.stream();

	// Record the step
	stepList.add(new Step(getClass().getName(), "simplePath", new Class[0]));
	return this;
}
 
源代码17 项目: epcis   文件: TraversalEngine.java
/**
 * Filters out elements that were only reached through cyclic paths.
 *
 * A path counts as simple when it contains no repeated element. currentPath
 * is rebuilt so that each key keeps only its simple paths, and keys left
 * without any path are dropped. Elements equal to a dropped key are then
 * removed from the stream, which is collected greedily and re-opened
 * (parallel if isParallel is set).
 *
 * A "simplePath" Step without argument types is appended to stepList.
 *
 * @return the extended Pipeline
 * @throws UnsupportedOperationException
 *             if path tracking is disabled
 */
public TraversalEngine simplePath() {
	// Without recorded paths there is nothing to test for cycles
	if (!isPathEnabled) {
		throw new UnsupportedOperationException();
	}

	// Key set of the map that is about to be replaced; it is pruned below
	// until only the keys that lost all of their paths remain.
	// NOTE(review): if keySet() is a live view (the java.util.Map contract),
	// the removeAll below also mutates the previous map - confirm nothing
	// else still references it
	final Set rejectedKeys = currentPath.keySet();

	// Rebuild the path map, keeping only paths without repeated elements
	currentPath = currentPath.entrySet().parallelStream().map(pathEntry -> {
		final Set candidatePaths = (Set) pathEntry.getValue();
		final Set acyclicPaths = (Set) candidatePaths.parallelStream().filter(candidate -> {
			final List path = (List) candidate;
			// A repeated element makes the de-duplicated copy smaller
			return new HashSet(path).size() == path.size();
		}).collect(Collectors.toSet());
		if (!acyclicPaths.isEmpty()) {
			return new AbstractMap.SimpleImmutableEntry(((Entry) pathEntry).getKey(), acyclicPaths);
		}
		// Marker for "no simple path left"; removed by the filter below
		return null;
	}).filter(rebuilt -> rebuilt != null).collect(
			Collectors.toMap(rebuilt -> ((Entry) rebuilt).getKey(), rebuilt -> ((Entry) rebuilt).getValue()));

	// Keys that still have a simple path are not rejected
	rejectedKeys.removeAll(currentPath.keySet());

	// Remove the rejected elements from the stream and re-open it
	final List survivors = (List) stream.filter(element -> !rejectedKeys.contains(element))
			.collect(Collectors.toList());
	stream = isParallel ? survivors.parallelStream() : survivors.stream();

	// Record the step
	stepList.add(new Step(getClass().getName(), "simplePath", new Class[0]));
	return this;
}
 
源代码18 项目: epcis   文件: ExternalTraversalEngine.java
/**
 * Unrolls list elements so that their contents are emitted one at a time.
 *
 * The step is a no-op (nothing is recorded and {@code this} is returned
 * immediately) unless elementClass is List. Otherwise every stream element is
 * cast to List and flat-mapped into its items; the inner stream is parallel
 * when isParallel is set and sequential otherwise, in both path modes.
 *
 * When path tracking is enabled the step is greedy: the flattened stream is
 * collected into a list and a new stream (parallel if isParallel is set) is
 * opened over it; currentPath is left untouched. When path tracking is
 * disabled the flatMap is only appended to the stream and stays lazy.
 *
 * A "scatter" Step without argument types is appended to stepList, then
 * elementClass takes the value of listElementClass and listElementClass is
 * reset to null.
 *
 * @return the extended Pipeline
 */
public ExternalTraversalEngine scatter() {
	// Nothing to unroll unless the current elements are lists
	if (elementClass != List.class) {
		return this;
	}

	// Flatten each list. Previously the path-enabled branch always used
	// parallelStream() for the inner lists while the lazy branch honored
	// isParallel; both modes now share the same mapping function
	stream = stream.flatMap(e -> {
		if (isParallel)
			return ((List) e).parallelStream();
		else
			return ((List) e).stream();
	});

	if (isPathEnabled) {
		// Greedy mode: materialize the flattened elements
		final List intermediate = (List) stream.collect(Collectors.toList());

		// NOTE(review): currentPath is not pruned here (the retainAll call is
		// disabled in the original) - confirm whether that is intentional

		// Continue the pipeline from the materialized list
		stream = isParallel ? intermediate.parallelStream() : intermediate.stream();
	}

	// Record the step
	stepList.add(new Step(getClass().getName(), "scatter", new Class[0]));

	// The pipeline now carries the former list items
	elementClass = listElementClass;
	listElementClass = null;
	return this;
}
 
源代码19 项目: epcis   文件: CachedTraversalEngine.java
/**
 * Adds a simplePath step to the traversal engine, filtering out elements that
 * were only reached through cyclic paths.
 *
 * A path counts as simple when it contains no repeated element. currentPath
 * is rebuilt so that each key keeps only its simple paths, and keys left
 * without any path are dropped. Elements equal to a dropped key are then
 * removed from the stream, which is collected greedily and re-opened
 * (parallel if isParallel is set).
 *
 * A "simplePath" Step without argument types is appended to stepList.
 *
 * @return the extended Stream
 * @throws UnsupportedOperationException
 *             if path tracking is disabled
 */
public CachedTraversalEngine simplePath() {
	// Without recorded paths there is nothing to test for cycles
	if (!isPathEnabled) {
		throw new UnsupportedOperationException();
	}

	// Key set of the map that is about to be replaced; it is pruned below
	// until only the keys that lost all of their paths remain.
	// NOTE(review): if keySet() is a live view (the java.util.Map contract),
	// the removeAll below also mutates the previous map - confirm nothing
	// else still references it
	final Set rejectedKeys = currentPath.keySet();

	// Rebuild the path map, keeping only paths without repeated elements
	currentPath = currentPath.entrySet().parallelStream().map(pathEntry -> {
		final Set candidatePaths = (Set) pathEntry.getValue();
		final Set acyclicPaths = (Set) candidatePaths.parallelStream().filter(candidate -> {
			final List path = (List) candidate;
			// A repeated element makes the de-duplicated copy smaller
			return new HashSet(path).size() == path.size();
		}).collect(Collectors.toSet());
		if (!acyclicPaths.isEmpty()) {
			return new AbstractMap.SimpleImmutableEntry(((Entry) pathEntry).getKey(), acyclicPaths);
		}
		// Marker for "no simple path left"; removed by the filter below
		return null;
	}).filter(rebuilt -> rebuilt != null).collect(
			Collectors.toMap(rebuilt -> ((Entry) rebuilt).getKey(), rebuilt -> ((Entry) rebuilt).getValue()));

	// Keys that still have a simple path are not rejected
	rejectedKeys.removeAll(currentPath.keySet());

	// Remove the rejected elements from the stream and re-open it
	final List survivors = (List) stream.filter(element -> !rejectedKeys.contains(element))
			.collect(Collectors.toList());
	stream = isParallel ? survivors.parallelStream() : survivors.stream();

	// Record the step
	stepList.add(new Step(getClass().getName(), "simplePath", new Class[0]));
	return this;
}
 
源代码20 项目: hortonmachine   文件: StreamUtils.java
/**
 * Opens a parallel stream over the given list.
 *
 * @param list
 *            the list to stream; must not be null
 * @return the stream returned by {@code list.parallelStream()}
 */
public static <T> Stream<T> fromListParallel( List<T> list ) {
    // Delegate to the list so that any list-specific stream implementation is used
    final Stream<T> parallelView = list.parallelStream();
    return parallelView;
}