下面列出了java.util.stream.Collector#finisher ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private <C, R> Future<R> executeAndCollect(Statement statement, Collector<Row, C, R> collector) {
C container = collector.supplier().get();
BiConsumer<C, Row> accumulator = collector.accumulator();
Function<C, R> finisher = collector.finisher();
return queryStream(statement)
.flatMap(cassandraRowStream -> {
Promise<R> resultPromise = Promise.promise();
cassandraRowStream.endHandler(end -> {
R result = finisher.apply(container);
resultPromise.complete(result);
});
cassandraRowStream.handler(row -> {
accumulator.accept(container, row);
});
cassandraRowStream.exceptionHandler(resultPromise::fail);
return resultPromise.future();
});
}
@SuppressWarnings("unchecked")
static <K, D, A, M extends Map<K, D>> PartialCollector<Map<K, A>, M> grouping(Supplier<M> mapFactory,
Collector<?, A, D> downstream) {
BinaryOperator<A> downstreamMerger = downstream.combiner();
BiConsumer<Map<K, A>, Map<K, A>> merger = (map1, map2) -> {
for (Map.Entry<K, A> e : map2.entrySet())
map1.merge(e.getKey(), e.getValue(), downstreamMerger);
};
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
return (PartialCollector<Map<K, A>, M>) new PartialCollector<>((Supplier<Map<K, A>>) mapFactory,
merger, Function.identity(), ID_CHARACTERISTICS);
}
Function<A, D> downstreamFinisher = downstream.finisher();
return new PartialCollector<>((Supplier<Map<K, A>>) mapFactory, merger, map -> {
map.replaceAll((k, v) -> ((Function<A, A>) downstreamFinisher).apply(v));
return (M) map;
}, NO_CHARACTERISTICS);
}
public IncrementalPageBucketReceiver(Collector<Row, T, Iterable<Row>> collector,
RowConsumer rowConsumer,
Executor executor,
Streamer<?>[] streamers,
int upstreamsCount) {
this.state = collector.supplier().get();
this.accumulator = collector.accumulator();
this.finisher = collector.finisher();
this.executor = executor;
this.streamers = streamers;
this.remainingUpstreams = new AtomicInteger(upstreamsCount);
lazyBatchIterator = CollectingBatchIterator.newInstance(
() -> {},
t -> {},
() -> processingFuture,
true);
rowConsumer.accept(lazyBatchIterator, null);
}
@Test
public void scanStreamCollectorSubscriber() {
CoreSubscriber<List<String>>
actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
Collector<String, ?, List<String>> collector = Collectors.toList();
@SuppressWarnings("unchecked")
BiConsumer<Integer, String> accumulator = (BiConsumer<Integer, String>) collector.accumulator();
@SuppressWarnings("unchecked")
Function<Integer, List<String>> finisher = (Function<Integer, List<String>>) collector.finisher();
MonoStreamCollector.StreamCollectorSubscriber<String, Integer, List<String>> test = new MonoStreamCollector.StreamCollectorSubscriber<>(
actual, 1, accumulator, finisher);
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);
assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE);
assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
test.onError(new IllegalStateException("boom"));
assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();
assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
test.cancel();
assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}
static <A, R> PartialCollector<Box<A>, R> partialCollector(Collector<?, A, R> c) {
Supplier<A> supplier = c.supplier();
BinaryOperator<A> combiner = c.combiner();
Function<A, R> finisher = c.finisher();
return new PartialCollector<>(() -> new Box<>(supplier.get()), (box1, box2) -> box1.a = combiner.apply(
box1.a, box2.a), box -> finisher.apply(box.a), NO_CHARACTERISTICS);
}
/**
* Adapts a {@code Collector} accepting elements of type {@code U} to one
* accepting elements of type {@code T} by applying a flat mapping function
* to each input element before accumulation. The flat mapping function maps
* an input element to a {@link Stream stream} covering zero or more output
* elements that are then accumulated downstream. Each mapped stream is
* {@link java.util.stream.BaseStream#close() closed} after its contents
* have been placed downstream. (If a mapped stream is {@code null} an empty
* stream is used, instead.)
*
* <p>
* This method is similar to {@code Collectors.flatMapping} method which
* appears in JDK 9. However when downstream collector is
* <a href="package-summary.html#ShortCircuitReduction">short-circuiting</a>
* , this method will also return a short-circuiting collector.
*
* @param <T> the type of the input elements
* @param <U> type of elements accepted by downstream collector
* @param <A> intermediate accumulation type of the downstream collector
* @param <R> result type of collector
* @param mapper a function to be applied to the input elements, which
* returns a stream of results
* @param downstream a collector which will receive the elements of the
* stream returned by mapper
* @return a collector which applies the mapping function to the input
* elements and provides the flat mapped results to the downstream
* collector
* @throws NullPointerException if mapper is null, or downstream is null.
* @since 0.4.1
*/
public static <T, U, A, R> Collector<T, ?, R> flatMapping(Function<? super T, ? extends Stream<? extends U>> mapper,
Collector<? super U, A, R> downstream) {
Objects.requireNonNull(mapper);
BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
Predicate<A> finished = finished(downstream);
if (finished != null) {
return new CancellableCollectorImpl<>(downstream.supplier(), (acc, t) -> {
if (finished.test(acc))
return;
try (Stream<? extends U> stream = mapper.apply(t)) {
if (stream != null) {
stream.spliterator().forEachRemaining(u -> {
downstreamAccumulator.accept(acc, u);
if (finished.test(acc))
throw new CancelException();
});
}
} catch (CancelException ex) {
// ignore
}
}, downstream.combiner(), downstream.finisher(), finished, downstream.characteristics());
}
return Collector.of(downstream.supplier(), (acc, t) -> {
try (Stream<? extends U> stream = mapper.apply(t)) {
if (stream != null) {
stream.spliterator().forEachRemaining(u -> downstreamAccumulator.accept(acc, u));
}
}
}, downstream.combiner(), downstream.finisher(), downstream.characteristics().toArray(new Characteristics[0]));
}
/**
* Merge series of adjacent stream entries with equal keys combining the
* corresponding values using the provided {@code Collector}.
*
* <p>
* This is a <a href="package-summary.html#StreamOps">quasi-intermediate</a>
* partial reduction operation.
*
* <p>
* The key of the resulting entry is the key of the first merged entry.
*
* @param <R> the type of the values in the resulting stream
* @param <A> the intermediate accumulation type of the {@code Collector}
* @param collector a {@code Collector} which is used to combine the values
* of the adjacent entries with the equal keys.
* @return a new {@code EntryStream} which keys are the keys of the original
* stream and the values are values of the adjacent entries with the
* same keys, combined using the provided collector.
* @see StreamEx#collapse(BiPredicate, Collector)
* @since 0.5.5
*/
public <A, R> EntryStream<K, R> collapseKeys(Collector<? super V, A, R> collector) {
Supplier<A> supplier = collector.supplier();
BiConsumer<A, ? super V> accumulator = collector.accumulator();
BinaryOperator<A> combiner = collector.combiner();
Function<A, R> finisher = collector.finisher();
return new StreamEx<>(new CollapseSpliterator<>(equalKeys(), e -> {
A a = supplier.get();
accumulator.accept(a, e.getValue());
return new PairBox<>(e.getKey(), a);
}, (pb, e) -> {
accumulator.accept(pb.b, e.getValue());
return pb;
}, (pb1, pb2) -> {
pb1.b = combiner.apply(pb1.b, pb2.b);
return pb1;
}, spliterator()), context).mapToEntry(pb -> pb.a, pb -> finisher.apply(pb.b));
}
public MultiActionListener(int numResponses, Collector<I, S, R> collector, ActionListener<? super R> listener) {
this(numResponses, collector.supplier(), collector.accumulator(), collector.finisher(), listener);
}
/**
* Adapts a {@link Collector} accepting elements of type {@code U} to a
* {@code LongCollector} by applying a mapping function to each input
* element before accumulation.
*
* @param <U> type of elements accepted by downstream collector
* @param <A> intermediate accumulation type of the downstream collector
* @param <R> result type of collector
* @param mapper a function to be applied to the input elements
* @param downstream a collector which will accept mapped values
* @return a collector which applies the mapping function to the input
* elements and provides the mapped results to the downstream
* collector
*/
static <U, A, R> LongCollector<?, R> mappingToObj(LongFunction<U> mapper, Collector<U, A, R> downstream) {
BiConsumer<A, U> accumulator = downstream.accumulator();
if (downstream instanceof MergingCollector) {
return new LongCollectorImpl<>(downstream.supplier(), (acc, i) -> accumulator.accept(acc, mapper.apply(i)),
((MergingCollector<U, A, R>) downstream).merger(), downstream.finisher(), downstream
.characteristics());
}
return Box.partialCollector(downstream).asLong((box, i) -> accumulator.accept(box.a, mapper.apply(i)));
}
/**
* Adapts a {@link Collector} accepting elements of type {@code U} to an
* {@code IntCollector} by applying a mapping function to each input element
* before accumulation.
*
* @param <U> type of elements accepted by downstream collector
* @param <A> intermediate accumulation type of the downstream collector
* @param <R> result type of collector
* @param mapper a function to be applied to the input elements
* @param downstream a collector which will accept mapped values
* @return a collector which applies the mapping function to the input
* elements and provides the mapped results to the downstream
* collector
*/
static <U, A, R> IntCollector<?, R> mappingToObj(IntFunction<U> mapper, Collector<U, A, R> downstream) {
BiConsumer<A, U> accumulator = downstream.accumulator();
if (downstream instanceof MergingCollector) {
return new IntCollectorImpl<>(downstream.supplier(), (acc, i) -> accumulator.accept(acc, mapper.apply(i)),
((MergingCollector<U, A, R>) downstream).merger(), downstream.finisher(), downstream
.characteristics());
}
return Box.partialCollector(downstream).asInt((box, i) -> accumulator.accept(box.a, mapper.apply(i)));
}
/**
* Adapts a {@code Collector} accepting elements of type {@code U} to one
* accepting elements of type {@code T} by applying a mapping function to
* each input element before accumulation.
*
* <p>
* Unlike {@link Collectors#mapping(Function, Collector)} this method
* returns a
* <a href="package-summary.html#ShortCircuitReduction">short-circuiting
* collector</a> if the downstream collector is short-circuiting.
*
* @param <T> the type of the input elements
* @param <U> type of elements accepted by downstream collector
* @param <A> intermediate accumulation type of the downstream collector
* @param <R> result type of collector
* @param mapper a function to be applied to the input elements
* @param downstream a collector which will accept mapped values
* @return a collector which applies the mapping function to the input
* elements and provides the mapped results to the downstream
* collector
* @throws NullPointerException if mapper is null, or downstream is null.
* @see Collectors#mapping(Function, Collector)
* @since 0.4.0
*/
public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
Collector<? super U, A, R> downstream) {
Objects.requireNonNull(mapper);
Predicate<A> finished = finished(downstream);
if (finished != null) {
BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
return new CancellableCollectorImpl<>(downstream.supplier(), (acc, t) -> {
if (!finished.test(acc))
downstreamAccumulator.accept(acc, mapper.apply(t));
}, downstream.combiner(), downstream.finisher(), finished, downstream.characteristics());
}
return Collectors.mapping(mapper, downstream);
}
/**
* Adapts a {@link Collector} accepting elements of type {@code U} to a
* {@code DoubleCollector} by applying a mapping function to each input
* element before accumulation.
*
* @param <U> type of elements accepted by downstream collector
* @param <A> intermediate accumulation type of the downstream collector
* @param <R> result type of collector
* @param mapper a function to be applied to the input elements
* @param downstream a collector which will accept mapped values
* @return a collector which applies the mapping function to the input
* elements and provides the mapped results to the downstream
* collector
*/
static <U, A, R> DoubleCollector<?, R> mappingToObj(DoubleFunction<U> mapper, Collector<U, A, R> downstream) {
BiConsumer<A, U> accumulator = downstream.accumulator();
if (downstream instanceof MergingCollector) {
return new DoubleCollectorImpl<>(downstream.supplier(), (acc, i) -> accumulator
.accept(acc, mapper.apply(i)), ((MergingCollector<U, A, R>) downstream).merger(), downstream
.finisher(), downstream.characteristics());
}
return Box.partialCollector(downstream).asDouble((box, i) -> accumulator.accept(box.a, mapper.apply(i)));
}