下面列出了 java.util.stream.Collector#accumulator() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Folds the results of a sequence of stages into one stage using the given collector.
 *
 * <p>The fold starts from an already completed stage holding a fresh container from the
 * collector's supplier. Each stage taken from the iterator is combined with the running
 * stage, so its value is accumulated only once every earlier step has completed.
 *
 * @param it the stages whose results are accumulated, in iteration order
 * @param collector the collector supplying the container, accumulator and finisher
 * @return a stage completing with the finished result of the collector
 */
@SuppressWarnings("unchecked")
private static <T, A, R> CompletionStage<R> collectImpl(
        final Iterator<? extends CompletionStage<T>> it,
        final Collector<? super T, A, R> collector) {
    CompletionStage<A> pending = StageSupport.completedStage(collector.supplier().get());
    final BiConsumer<A, ? super T> accumulate = collector.accumulator();
    while (it.hasNext()) {
        // Attach the next fold step to the running stage; it fires only when both the
        // running stage and the next element's stage are done.
        final CompletionStage<T> next = it.next();
        pending = pending.thenCombine(next, (container, element) -> {
            accumulate.accept(container, element);
            return container;
        });
    }
    final boolean identityFinish =
            collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH);
    if (identityFinish) {
        // With IDENTITY_FINISH the container already is the result, so the cast is safe.
        return (CompletionStage<R>) pending;
    }
    return pending.thenApply(collector.finisher());
}
/**
 * Queries a row stream for the statement and folds every row with the given collector.
 *
 * <p>The accumulation container is created up front. Each row delivered by the stream is
 * accumulated into it, and the collector's finisher is applied when the stream signals its
 * end. A failure reported by the stream fails the returned future.
 *
 * @param statement the statement passed to {@code queryStream}
 * @param collector the collector used to accumulate rows and produce the result
 * @return a future completed with the finished result
 */
private <C, R> Future<R> executeAndCollect(Statement statement, Collector<Row, C, R> collector) {
    C accumulated = collector.supplier().get();
    BiConsumer<C, Row> rowAccumulator = collector.accumulator();
    Function<C, R> toResult = collector.finisher();
    return queryStream(statement).flatMap(rows -> {
        Promise<R> promise = Promise.promise();
        // Register the end, row and failure handlers, in that order.
        rows.endHandler(end -> promise.complete(toResult.apply(accumulated)));
        rows.handler(row -> rowAccumulator.accept(accumulated, row));
        rows.exceptionHandler(promise::fail);
        return promise.future();
    });
}
/**
 * Reduces each series of adjacent elements with the supplied {@link Collector},
 * producing one result per series.
 *
 * <p>
 * This is a <a href="package-summary.html#StreamOps">quasi-intermediate</a>
 * partial reduction operation.
 *
 * @param <R> the type of the elements in the resulting stream
 * @param <A> the intermediate accumulation type of the {@code Collector}
 * @param collapsible a non-interfering, stateless predicate applied to a
 *        pair of adjacent elements of the input stream; it returns true
 *        when the two elements should be collected together.
 * @param collector a {@code Collector} used to combine the adjacent
 *        elements.
 * @return the new stream
 * @since 0.3.6
 */
public <R, A> StreamEx<R> collapse(BiPredicate<? super T, ? super T> collapsible,
        Collector<? super T, A, R> collector) {
    Supplier<A> containerFactory = collector.supplier();
    BiConsumer<A, ? super T> adder = collector.accumulator();
    StreamEx<A> containers = collapseInternal(collapsible, first -> {
        // A new series starts: create a container and put its first element in.
        A container = containerFactory.get();
        adder.accept(container, first);
        return container;
    }, (container, next) -> {
        adder.accept(container, next);
        return container;
    }, collector.combiner());
    boolean identityFinish = collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH);
    if (!identityFinish) {
        return containers.map(collector.finisher());
    }
    // With IDENTITY_FINISH the containers already are the results.
    @SuppressWarnings("unchecked")
    StreamEx<R> result = (StreamEx<R>) containers;
    return result;
}
/**
 * Creates the receiver, unpacks the collector into its state, accumulator and finisher,
 * and hands a lazily loading batch iterator to the row consumer.
 *
 * @param collector provides the initial state, the accumulator and the finisher
 * @param rowConsumer receives the lazy batch iterator (with a {@code null} failure)
 * @param executor stored for later use by this receiver
 * @param streamers stored for later use by this receiver
 * @param upstreamsCount initial value of the remaining-upstreams counter
 */
public IncrementalPageBucketReceiver(Collector<Row, T, Iterable<Row>> collector,
                                     RowConsumer rowConsumer,
                                     Executor executor,
                                     Streamer<?>[] streamers,
                                     int upstreamsCount) {
    // Collaborators supplied by the caller.
    this.executor = executor;
    this.streamers = streamers;
    this.remainingUpstreams = new AtomicInteger(upstreamsCount);

    // Pieces of the collector used while pages arrive.
    this.state = collector.supplier().get();
    this.accumulator = collector.accumulator();
    this.finisher = collector.finisher();

    // The iterator's content is supplied by processingFuture; the two callbacks are no-ops.
    this.lazyBatchIterator = CollectingBatchIterator.newInstance(
        () -> {},
        ignored -> {},
        () -> processingFuture,
        true);
    rowConsumer.accept(this.lazyBatchIterator, null);
}
/**
 * Reduces all values of the backing map with the given collector.
 *
 * <p>The container is created before the read lock is taken; accumulation and finishing
 * both happen while the read lock is held, and the lock is always released afterwards.
 *
 * @param collector the collector describing the reduction
 * @return the finished result of the reduction
 */
@Nonnull
@Override
public <A, R> R collect(@Nonnull final Collector<? super T, A, R> collector) {
    final A container = collector.supplier().get();
    final BiConsumer<A, ? super T> adder = collector.accumulator();
    lock.readLock().lock();
    try {
        for (final T value : map.values()) {
            adder.accept(container, value);
        }
        return collector.finisher().apply(container);
    } finally {
        lock.readLock().unlock();
    }
}
/**
 * Reduces every element visited by {@code forEach} with the given collector.
 *
 * @param collector the collector describing the reduction
 * @return the finished result of the reduction
 */
@Nonnull
@Override
public <A, R> R collect(@Nonnull final Collector<? super T, A, R> collector) {
    final A container = collector.supplier().get();
    final BiConsumer<A, ? super T> adder = collector.accumulator();
    this.forEach(element -> {
        adder.accept(container, element);
    });
    return collector.finisher().apply(container);
}
/**
 * Performs a mutable reduction of the items flowing through this chain using the given
 * collector.
 *
 * <p>A consuming step that feeds each item to the collector's accumulator is appended to
 * the chain, the chain is played, and the collector's finisher is then applied to the
 * accumulated container.
 *
 * @param collector the collector describing the reduction
 * @param <R> the result type of the reduction
 * @param <A> the intermediate accumulation type of the collector
 * @return the finished result of the reduction
 */
@SuppressWarnings("unchecked")
public <R, A> R collect(Collector<? super T, A, R> collector) {
    A container = collector.supplier().get();
    // Unchecked widening of the element type so that whatever item type the consuming
    // function delivers can be passed through, without resorting to a raw BiConsumer.
    BiConsumer<A, Object> accumulator = (BiConsumer<A, Object>) collector.accumulator();
    chain(new StreamRowMap(new ConsumingFunction(item -> accumulator.accept(container, item))));
    play();
    // Apply the finisher rather than casting the container: only IDENTITY_FINISH
    // collectors use the container itself as the result (joining(), counting() etc. do not).
    return collector.finisher().apply(container);
}
/**
 * Transforms every production and reduces the transformed values with the given collector.
 *
 * @param transformation the transformation handed to each production
 * @param param the parameter handed to each production together with the transformation
 * @param collector the collector that accumulates the transformed values
 * @return the finished result of the collector
 * @throws EX if transforming a production fails
 */
@Override
public <P, A, R, T, EX extends Exception> T transform(
        ProductionTransformation<P, R, EX> transformation, P param, Collector<R, A, T> collector ) throws EX
{
    BiConsumer<A, R> addTransformed = collector.accumulator();
    A container = collector.supplier().get();
    for ( ProductionNode node : productions.values() )
    {
        R transformed = node.transform( transformation, param );
        addTransformed.accept( container, transformed );
    }
    return collector.finisher().apply( container );
}
/**
 * Checks the attributes reported by {@code scan} on a stream-collector subscriber before
 * and after it receives an error, and before and after it is cancelled.
 */
@Test
public void scanStreamCollectorSubscriber() {
    CoreSubscriber<List<String>> downstream = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
    Collector<String, ?, List<String>> listCollector = Collectors.toList();
    @SuppressWarnings("unchecked")
    BiConsumer<Integer, String> accumulator = (BiConsumer<Integer, String>) listCollector.accumulator();
    @SuppressWarnings("unchecked")
    Function<Integer, List<String>> finisher = (Function<Integer, List<String>>) listCollector.finisher();
    MonoStreamCollector.StreamCollectorSubscriber<String, Integer, List<String>> subscriber =
            new MonoStreamCollector.StreamCollectorSubscriber<>(downstream, 1, accumulator, finisher);
    Subscription upstream = Operators.emptySubscription();
    subscriber.onSubscribe(upstream);

    // state right after subscription
    assertThat(subscriber.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE);
    assertThat(subscriber.scan(Scannable.Attr.PARENT)).isSameAs(upstream);
    assertThat(subscriber.scan(Scannable.Attr.ACTUAL)).isSameAs(downstream);

    // an error terminates the subscriber
    assertThat(subscriber.scan(Scannable.Attr.TERMINATED)).isFalse();
    subscriber.onError(new IllegalStateException("boom"));
    assertThat(subscriber.scan(Scannable.Attr.TERMINATED)).isTrue();

    // cancellation is tracked separately from termination
    assertThat(subscriber.scan(Scannable.Attr.CANCELLED)).isFalse();
    subscriber.cancel();
    assertThat(subscriber.scan(Scannable.Attr.CANCELLED)).isTrue();
}
/**
 * {@inheritDoc}
 *
 * <p>
 * When a special <a
 * href="package-summary.html#ShortCircuitReduction">short-circuiting
 * collector</a> is passed, this operation is short-circuiting too.
 */
@Override
public <R, A> R collect(Collector<? super T, A, R> collector) {
    Predicate<A> done = finished(collector);
    if (done == null) {
        // No "finished" predicate is available for this collector: plain collection.
        return rawCollect(collector);
    }
    BiConsumer<A, ? super T> accumulator = collector.accumulator();
    BinaryOperator<A> combiner = collector.combiner();
    Spliterator<T> source = spliterator();
    if (!isParallel()) {
        A container = collector.supplier().get();
        if (!done.test(container)) {
            try {
                // forEachRemaining can be much faster and take much less
                // memory than tryAdvance for certain spliterators; the
                // exception is the only way to leave it early.
                source.forEachRemaining(element -> {
                    accumulator.accept(container, element);
                    if (done.test(container)) {
                        throw new CancelException();
                    }
                });
            } catch (CancelException ignored) {
                // ignore: thrown above solely to stop the traversal
            }
        }
        return collector.finisher().apply(container);
    }
    Spliterator<A> cancellable;
    if (source.hasCharacteristics(Spliterator.ORDERED)
        && !collector.characteristics().contains(Characteristics.UNORDERED)) {
        cancellable = new OrderedCancellableSpliterator<>(source, collector.supplier(), accumulator, combiner,
                done);
    } else {
        cancellable = new UnorderedCancellableSpliterator<>(source, collector.supplier(), accumulator, combiner,
                done);
    }
    return collector.finisher().apply(
        new StreamEx<>(StreamSupport.stream(cancellable, true), context).findFirst().get());
}
/**
 * Adapts a {@code Collector} accepting elements of type {@code U} to one
 * accepting elements of type {@code T}: every input element is passed to a
 * flat mapping function, and the elements of the returned {@link Stream
 * stream} (zero or more) are accumulated downstream. Each mapped stream is
 * {@link java.util.stream.BaseStream#close() closed} after its contents
 * have been placed downstream. (A {@code null} mapped stream is treated as
 * an empty stream.)
 *
 * <p>
 * This method is similar to the {@code Collectors.flatMapping} method which
 * appears in JDK 9. However, when the downstream collector is
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting</a>,
 * this method also returns a short-circuiting collector.
 *
 * @param <T> the type of the input elements
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements, which
 *        returns a stream of results
 * @param downstream a collector which will receive the elements of the
 *        stream returned by mapper
 * @return a collector which applies the mapping function to the input
 *         elements and provides the flat mapped results to the downstream
 *         collector
 * @throws NullPointerException if mapper is null, or downstream is null.
 * @since 0.4.1
 */
public static <T, U, A, R> Collector<T, ?, R> flatMapping(Function<? super T, ? extends Stream<? extends U>> mapper,
        Collector<? super U, A, R> downstream) {
    Objects.requireNonNull(mapper);
    BiConsumer<A, ? super U> sink = downstream.accumulator();
    Predicate<A> done = finished(downstream);
    if (done == null) {
        // Downstream cannot finish early: feed it every mapped element.
        return Collector.of(downstream.supplier(), (container, element) -> {
            try (Stream<? extends U> mapped = mapper.apply(element)) {
                if (mapped != null) {
                    mapped.spliterator().forEachRemaining(value -> sink.accept(container, value));
                }
            }
        }, downstream.combiner(), downstream.finisher(), downstream.characteristics().toArray(new Characteristics[0]));
    }
    return new CancellableCollectorImpl<>(downstream.supplier(), (container, element) -> {
        if (!done.test(container)) {
            try (Stream<? extends U> mapped = mapper.apply(element)) {
                if (mapped != null) {
                    mapped.spliterator().forEachRemaining(value -> {
                        sink.accept(container, value);
                        if (done.test(container)) {
                            throw new CancelException();
                        }
                    });
                }
            } catch (CancelException ignored) {
                // ignore: thrown above solely to stop traversing the mapped stream
            }
        }
    }, downstream.combiner(), downstream.finisher(), done, downstream.characteristics());
}
/**
 * Merges each series of adjacent stream entries having equal keys into a
 * single entry, combining their values with the provided {@code Collector}.
 *
 * <p>
 * This is a <a href="package-summary.html#StreamOps">quasi-intermediate</a>
 * partial reduction operation.
 *
 * <p>
 * The key of the resulting entry is the key of the first merged entry.
 *
 * @param <R> the type of the values in the resulting stream
 * @param <A> the intermediate accumulation type of the {@code Collector}
 * @param collector a {@code Collector} which is used to combine the values
 *        of the adjacent entries with the equal keys.
 * @return a new {@code EntryStream} whose keys are the keys of the original
 *         stream and whose values are the values of the adjacent entries
 *         with the same keys, combined using the provided collector.
 * @see StreamEx#collapse(BiPredicate, Collector)
 * @since 0.5.5
 */
public <A, R> EntryStream<K, R> collapseKeys(Collector<? super V, A, R> collector) {
    Supplier<A> containerFactory = collector.supplier();
    BiConsumer<A, ? super V> adder = collector.accumulator();
    BinaryOperator<A> merger = collector.combiner();
    Function<A, R> finisher = collector.finisher();
    return new StreamEx<>(new CollapseSpliterator<>(equalKeys(), first -> {
        // First entry of a series: remember its key next to a fresh container.
        A container = containerFactory.get();
        adder.accept(container, first.getValue());
        return new PairBox<>(first.getKey(), container);
    }, (group, next) -> {
        adder.accept(group.b, next.getValue());
        return group;
    }, (left, right) -> {
        left.b = merger.apply(left.b, right.b);
        return left;
    }, spliterator()), context).mapToEntry(group -> group.a, group -> finisher.apply(group.b));
}
/**
 * Creates a listener from a {@link Collector} by passing its supplier, accumulator and
 * finisher, together with {@code numResponses} and {@code listener}, to the five-argument
 * constructor of this class.
 *
 * @param numResponses forwarded unchanged to the delegate constructor; presumably the
 *                     number of responses to wait for (TODO confirm against that constructor)
 * @param collector    source of the supplier, accumulator and finisher
 * @param listener     forwarded unchanged to the delegate constructor
 */
public MultiActionListener(int numResponses, Collector<I, S, R> collector, ActionListener<? super R> listener) {
this(numResponses, collector.supplier(), collector.accumulator(), collector.finisher(), listener);
}
/**
 * Runs a mutable reduction over the elements of this iterator with the given collector
 * and returns a CompletionStage of the result.
 *
 * <p>
 * This is a <i>terminal method</i>.
 *
 * @param collector a {@link Collector} which will sequentially collect the contents of this
 *        iterator into an {@code R}
 * @param <A> The intermediate type of the accumulated object
 * @param <R> The final type of the accumulated object
 * @return a {@link CompletionStage} which will complete with the collected value
 * @see Stream#collect(Collector)
 */
default <R, A> CompletionStage<R> collect(final Collector<? super T, A, R> collector) {
    final A accumulated = collector.supplier().get();
    final BiConsumer<A, ? super T> adder = collector.accumulator();
    return forEach(element -> {
        adder.accept(accumulated, element);
    }).thenApply(ignored -> {
        // the traversal result carries no value; only its completion matters
        return AsyncIterators.finishContainer(accumulated, collector);
    });
}
/**
 * Adapts a {@link Collector} accepting elements of type {@code U} to a
 * {@code LongCollector}: each {@code long} input is converted with the
 * mapping function and the mapped object is accumulated downstream.
 *
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 */
static <U, A, R> LongCollector<?, R> mappingToObj(LongFunction<U> mapper, Collector<U, A, R> downstream) {
    BiConsumer<A, U> sink = downstream.accumulator();
    if (!(downstream instanceof MergingCollector)) {
        // No merger available: accumulate into a box wrapping the downstream container.
        return Box.partialCollector(downstream).asLong((box, value) -> sink.accept(box.a, mapper.apply(value)));
    }
    MergingCollector<U, A, R> merging = (MergingCollector<U, A, R>) downstream;
    return new LongCollectorImpl<>(downstream.supplier(),
            (container, value) -> sink.accept(container, mapper.apply(value)),
            merging.merger(), downstream.finisher(), downstream.characteristics());
}
/**
 * Adapts a {@link Collector} accepting elements of type {@code U} to an
 * {@code IntCollector}: each {@code int} input is converted with the
 * mapping function and the mapped object is accumulated downstream.
 *
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 */
static <U, A, R> IntCollector<?, R> mappingToObj(IntFunction<U> mapper, Collector<U, A, R> downstream) {
    BiConsumer<A, U> sink = downstream.accumulator();
    if (!(downstream instanceof MergingCollector)) {
        // No merger available: accumulate into a box wrapping the downstream container.
        return Box.partialCollector(downstream).asInt((box, value) -> sink.accept(box.a, mapper.apply(value)));
    }
    MergingCollector<U, A, R> merging = (MergingCollector<U, A, R>) downstream;
    return new IntCollectorImpl<>(downstream.supplier(),
            (container, value) -> sink.accept(container, mapper.apply(value)),
            merging.merger(), downstream.finisher(), downstream.characteristics());
}
/**
 * Returns a {@code Collector} which splits the input elements into two
 * partitions according to a {@code Predicate}, reduces the values of each
 * partition with another {@code Collector}, and organizes the two results
 * into a {@code Map<Boolean, D>}.
 *
 * <p>
 * Unlike {@link Collectors#partitioningBy(Predicate, Collector)} this
 * method returns a
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting
 * collector</a> if the downstream collector is short-circuiting; the
 * returned collector is finished once both partitions are finished.
 *
 * @param <T> the type of the input elements
 * @param <A> the intermediate accumulation type of the downstream collector
 * @param <D> the result type of the downstream reduction
 * @param predicate a predicate used for classifying input elements
 * @param downstream a {@code Collector} implementing the downstream
 *        reduction
 * @return a {@code Collector} implementing the cascaded partitioning
 *         operation
 * @throws NullPointerException if predicate is null, or downstream is null.
 * @since 0.4.0
 * @see Collectors#partitioningBy(Predicate, Collector)
 */
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
        Collector<? super T, A, D> downstream) {
    Objects.requireNonNull(predicate);
    Predicate<A> done = finished(downstream);
    if (done == null) {
        // Downstream is not short-circuiting: the JDK collector is sufficient.
        return Collectors.partitioningBy(predicate, downstream);
    }
    BiConsumer<A, ? super T> sink = downstream.accumulator();
    return BooleanMap.partialCollector(downstream).asCancellable((map, element) -> {
        A partition = predicate.test(element) ? map.trueValue : map.falseValue;
        sink.accept(partition, element);
    }, map -> done.test(map.trueValue) && done.test(map.falseValue));
}
/**
 * Adapts a {@code Collector} accepting elements of type {@code U} to one
 * accepting elements of type {@code T}: each input element is converted
 * with the mapping function before it is accumulated downstream.
 *
 * <p>
 * Unlike {@link Collectors#mapping(Function, Collector)} this method
 * returns a
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting
 * collector</a> if the downstream collector is short-circuiting. In that
 * case the mapper is no longer applied once the downstream container is
 * finished.
 *
 * @param <T> the type of the input elements
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 * @throws NullPointerException if mapper is null, or downstream is null.
 * @see Collectors#mapping(Function, Collector)
 * @since 0.4.0
 */
public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
        Collector<? super U, A, R> downstream) {
    Objects.requireNonNull(mapper);
    Predicate<A> done = finished(downstream);
    if (done == null) {
        // Downstream is not short-circuiting: the JDK collector is sufficient.
        return Collectors.mapping(mapper, downstream);
    }
    BiConsumer<A, ? super U> sink = downstream.accumulator();
    return new CancellableCollectorImpl<>(downstream.supplier(), (container, element) -> {
        if (done.test(container)) {
            return;
        }
        sink.accept(container, mapper.apply(element));
    }, downstream.combiner(), downstream.finisher(), done, downstream.characteristics());
}
/**
 * Returns a {@code Collector} which performs the downstream reduction only
 * if all elements satisfy the {@code Predicate}. The result is described as
 * an {@code Optional<R>}.
 *
 * <p>
 * The resulting collector returns an empty optional if at least one input
 * element does not satisfy the predicate. Otherwise it returns an optional
 * which contains the result of the downstream collector.
 *
 * <p>
 * This method returns a
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting
 * collector</a>: it may not process all the elements if some of the items
 * don't satisfy the predicate or if the downstream collector is a
 * short-circuiting collector.
 *
 * <p>
 * It's guaranteed that the downstream collector is not called for elements
 * which don't satisfy the predicate.
 *
 * @param <T> the type of input elements
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of the downstream collector
 * @param predicate a non-interfering, stateless predicate which checks
 *        whether the collector should proceed with an element
 * @param downstream a {@code Collector} implementing the downstream
 *        reduction
 * @return a {@code Collector} which performs the downstream reduction if
 *         all elements satisfy the predicate
 * @throws NullPointerException if predicate is null, or downstream is null.
 * @see Stream#allMatch(Predicate)
 * @see AbstractStreamEx#dropWhile(Predicate)
 * @see AbstractStreamEx#takeWhile(Predicate)
 * @since 0.6.3
 */
public static <T, A, R> Collector<T, ?, Optional<R>> ifAllMatch(Predicate<T> predicate,
        Collector<T, A, R> downstream) {
    Objects.requireNonNull(predicate);
    Predicate<A> downstreamDone = finished(downstream);
    Supplier<A> containerFactory = downstream.supplier();
    BiConsumer<A, T> sink = downstream.accumulator();
    BinaryOperator<A> merger = downstream.combiner();
    // The state pairs the downstream container (a) with an "all matched so far" flag (b).
    return new CancellableCollectorImpl<>(
        () -> new PairBox<>(containerFactory.get(), Boolean.TRUE),
        (state, element) -> {
            if (!state.b || !predicate.test(element)) {
                state.b = Boolean.FALSE;
            } else {
                sink.accept(state.a, element);
            }
        },
        (left, right) -> {
            if (!left.b || !right.b) {
                left.b = Boolean.FALSE;
            } else {
                left.a = merger.apply(left.a, right.a);
            }
            return left;
        },
        state -> state.b ? Optional.of(downstream.finisher().apply(state.a)) : Optional.empty(),
        // Finished as soon as a mismatch was seen, or when downstream itself is finished.
        downstreamDone == null ? state -> !state.b : state -> !state.b || downstreamDone.test(state.a),
        downstream.characteristics().contains(Characteristics.UNORDERED) ? UNORDERED_CHARACTERISTICS
                : NO_CHARACTERISTICS);
}
/**
 * Adapts a {@link Collector} accepting elements of type {@code U} to a
 * {@code DoubleCollector}: each {@code double} input is converted with the
 * mapping function and the mapped object is accumulated downstream.
 *
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 */
static <U, A, R> DoubleCollector<?, R> mappingToObj(DoubleFunction<U> mapper, Collector<U, A, R> downstream) {
    BiConsumer<A, U> sink = downstream.accumulator();
    if (!(downstream instanceof MergingCollector)) {
        // No merger available: accumulate into a box wrapping the downstream container.
        return Box.partialCollector(downstream).asDouble((box, value) -> sink.accept(box.a, mapper.apply(value)));
    }
    MergingCollector<U, A, R> merging = (MergingCollector<U, A, R>) downstream;
    return new DoubleCollectorImpl<>(downstream.supplier(),
            (container, value) -> sink.accept(container, mapper.apply(value)),
            merging.merger(), downstream.finisher(), downstream.characteristics());
}