下面列出了java.util.stream.Stream#parallel ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
/**
*
* @param regTreeConfig
* @param probs
* @return best valid splitResult, possibly nothing
*/
static Optional<SplitResult> split(RegTreeConfig regTreeConfig,
DataSet dataSet,
double[] labels,
double[] probs,
int[] monotonicity){
GlobalStats globalStats = new GlobalStats(labels,probs);
if (logger.isDebugEnabled()){
logger.debug("global statistics = "+globalStats);
}
List<Integer> featureIndices = IntStream.range(0, dataSet.getNumFeatures()).boxed().collect(Collectors.toList());
Stream<Integer> stream = featureIndices.stream();
if (regTreeConfig.isParallel()){
stream = stream.parallel();
}
// the list might be empty
return stream.map(featureIndex -> split(regTreeConfig, dataSet, labels, probs, featureIndex, globalStats, monotonicity))
.filter(Optional::isPresent)
.map(Optional::get)
.max(Comparator.comparing(SplitResult::getReduction));
}
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
private List<AuthorizationSettingEntity> getUserSetting(String userId) {
Map<String, List<SettingInfo>> settingInfo =
authorizationSettingTypeSuppliers.stream()
.map(supplier -> supplier.get(userId))
.flatMap(Set::stream)
.collect(Collectors.groupingBy(SettingInfo::getType));
Stream<Map.Entry<String, List<SettingInfo>>> settingInfoStream = settingInfo.entrySet().stream();
//大于1 使用并行处理
if (settingInfo.size() > 1) {
settingInfoStream = settingInfoStream.parallel();
}
return settingInfoStream
.map(entry ->
createQuery()
// where type = ? and setting_for in (?,?,?....)
.where(type, entry.getKey())
.and()
.in(settingFor, entry.getValue().stream().map(SettingInfo::getSettingFor).collect(Collectors.toList()))
.listNoPaging())
.flatMap(List::stream)
.collect(Collectors.toList());
}
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
public Stream<T> get() {
Stream<T> res = base.get();
switch (mode) {
case NORMAL:
case SPLITERATOR:
return res.sequential();
case PARALLEL:
return res.parallel();
case APPEND:
// using Stream.empty() or Arrays.asList() here is optimized out
// in append/prepend which is undesired
return StreamEx.of(res.parallel()).append(new ConcurrentLinkedQueue<>());
case PREPEND:
return StreamEx.of(res.parallel()).prepend(new ConcurrentLinkedQueue<>());
case RANDOM:
return StreamEx.of(new EmptyingSpliterator<>(res.parallel().spliterator())).parallel();
default:
throw new InternalError("Unsupported mode: " + mode);
}
}
public void testParSeq() {
Stream<Integer> s = Arrays.asList(1, 2, 3, 4).stream().parallel();
assertTrue(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.sequential();
assertFalse(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
s = s.parallel();
assertTrue(s.isParallel());
}
public void execute(List<Runnable> runnables) throws InterruptedException {
if (runnables.isEmpty()) {
return;
}
Stream<Runnable> stream = runnables.stream();
if (threadCount > 1) {
stream = stream.parallel();
}
stream.forEach((Runnable runnable) -> {
try {
if (!hasException) {
info("excute " +
name +
" task at " +
index.incrementAndGet() +
"/" +
runnables.size());
runnable.run();
}
} catch (Throwable gradleException) {
hasException = true;
exception = gradleException;
}
});
if (hasException) {
throw new GradleException(exception.getMessage(), exception);
}
}
/**
* parallel
*/
private static void setLeavesOutputs(RegTreeConfig regTreeConfig, List<Node> leaves, LeafOutputCalculator calculator, double[] labels){
Stream<Node> stream = leaves.stream();
if (regTreeConfig.isParallel()){
stream = stream.parallel();
}
stream.forEach(leaf -> setLeafOutput(leaf, calculator, labels));
}
/**
* parallel
*/
private static void setLeavesOutputs(RegTreeConfig regTreeConfig, List<Node> leaves, LeafOutputCalculator calculator, double[] labels){
Stream<Node> stream = leaves.stream();
if (regTreeConfig.isParallel()){
stream = stream.parallel();
}
stream.forEach(leaf -> setLeafOutput(leaf, calculator, labels));
}
public static <T> Stream<T> streamP(Stream<T> source, boolean parallel)
{
if(parallel)
return source.parallel();
else
return source;
}
/**
* Optimized version for verification of several BLS signatures in a single batch. See
* https://ethresear.ch/t/fast-verification-of-multiple-bls-signatures/5407 for background
*
* <p>Parameters for verification are supplied with 3 lists which should have the same size. Each
* set consists of a message, signature (aggregate or not), and a list of signers' public keys
* (several or just a single one). See {@link #fastAggregateVerify(List, Bytes, BLSSignature)} for
* reference.
*
* <p>The standard says to return INVALID, that is, false, if the list of public keys is empty.
*
* @param doublePairing if true uses the optimized version of ate pairing (ate2) which processes a
* pair of signatures a bit faster than with 2 separate regular ate calls Note that this
* option may not be optimal when a number of signatures is relatively small and the
* [parallel] option is [true]
* @param parallel Uses the default {@link java.util.concurrent.ForkJoinPool} to parallelize the
* work
* @return True if the verification is successful, false otherwise
*/
public static boolean batchVerify(
List<List<BLSPublicKey>> publicKeys,
List<Bytes> messages,
List<BLSSignature> signatures,
boolean doublePairing,
boolean parallel) {
Preconditions.checkArgument(
publicKeys.size() == messages.size() && publicKeys.size() == signatures.size(),
"Different collection sizes");
int count = publicKeys.size();
if (count == 0) return false;
if (doublePairing) {
Stream<List<Integer>> pairsStream =
Lists.partition(IntStream.range(0, count).boxed().collect(Collectors.toList()), 2)
.stream();
if (parallel) {
pairsStream = pairsStream.parallel();
}
return completeBatchVerify(
pairsStream
.map(
idx ->
idx.size() == 1
? prepareBatchVerify(
idx.get(0),
publicKeys.get(idx.get(0)),
messages.get(idx.get(0)),
signatures.get(idx.get(0)))
: prepareBatchVerify2(
idx.get(0),
publicKeys.get(idx.get(0)),
messages.get(idx.get(0)),
signatures.get(idx.get(0)),
publicKeys.get(idx.get(1)),
messages.get(idx.get(1)),
signatures.get(idx.get(1))))
.collect(Collectors.toList()));
} else {
Stream<Integer> indexStream = IntStream.range(0, count).boxed();
if (parallel) {
indexStream = indexStream.parallel();
}
return completeBatchVerify(
indexStream
.map(
idx ->
prepareBatchVerify(
idx, publicKeys.get(idx), messages.get(idx), signatures.get(idx)))
.collect(Collectors.toList()));
}
}
@Test
public void parallelStreamClose(){
int cores = Runtime.getRuntime()
.availableProcessors();
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(cores*2));
for(int k=0; k < 100;k++) {
System.gc();
System.out.println(k);
Queue<Integer> queue = QueueFactories.<Integer>boundedQueue(5).build();
for(int z=0;z<10;z++){
queue.add(z);
}
new Thread(() -> {
while(!queue.isOpen()){
System.out.println("Queue isn't open yet!");
}
System.err.println("Closing " + queue.close());
for(int i=0;i<100;i++){
try {
Thread.sleep(100);
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
queue.disconnectStreams(1000);
}
}).start();
Stream<Integer> stream = queue.jdkStream(100);
stream = stream.parallel();
stream.forEach(e ->
{
System.out.println(e);
});
System.out.println("done " + k);
}
}
public Map<AggregationKey, AggregationGroup> groupBy(Scanner scan, int[] groupedFieldsIndexes, String[] aggCallNames, String[] columnNames, List<List<Integer>> args, boolean concurrent) {
EnumSet<Collector.Characteristics> characteristics = getCharacteristics(concurrent);
Stream<DataAccessor> stream = scan.stream();
stream = concurrent ? stream.parallel() : stream;
return stream.collect(new Collector<DataAccessor, Map<AggregationKey, AggregationGroup>, Map<AggregationKey, AggregationGroup>>() {
@Override
public Supplier<Map<AggregationKey, AggregationGroup>> supplier() {
return () -> concurrent ? new ConcurrentHashMap<>() : new HashMap<>();
}
@Override
public BiConsumer<Map<AggregationKey, AggregationGroup>, DataAccessor> accumulator() {
return new BiConsumer<Map<AggregationKey, AggregationGroup>, DataAccessor>() {
private AggregationGroup createGroup(AggregationKey aggregationKey) {
return AggregationGroup.of(aggCallNames, columnNames, args);
}
@Override
public void accept(Map<AggregationKey, AggregationGroup> aggregationKeyAggregationGroupMap,
DataAccessor dataAccessor) {
AggregationKey key = AggregationKey.of(dataAccessor, groupedFieldsIndexes);
AggregationGroup aggregationGroup = aggregationKeyAggregationGroupMap.computeIfAbsent(key, this::createGroup);
for (final AggregationCallExp cc : aggregationGroup.getColumns()) {
if (concurrent) {
synchronized (cc) {
cc.accept(dataAccessor);
}
} else {
cc.accept(dataAccessor);
}
}
}
};
}
@Override
public BinaryOperator<Map<AggregationKey, AggregationGroup>> combiner() {
return (m1, m2) -> {
for (Map.Entry<AggregationKey, AggregationGroup> e : m2.entrySet())
m1.merge(e.getKey(), e.getValue(), AggregationGroup::merge);
return m1;
};
}
@Override
public Function<Map<AggregationKey, AggregationGroup>, Map<AggregationKey, AggregationGroup>> finisher() {
return aggregationKeyAggregationGroupMap -> aggregationKeyAggregationGroupMap;
}
@Override
public Set<Characteristics> characteristics() {
return characteristics;
}
});
}