java.util.stream.Collector#finisher ( )源码实例Demo

下面列出了java.util.stream.Collector#finisher ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

private <C, R> Future<R> executeAndCollect(Statement statement, Collector<Row, C, R> collector) {
  C container = collector.supplier().get();
  BiConsumer<C, Row> accumulator = collector.accumulator();
  Function<C, R> finisher = collector.finisher();
  return queryStream(statement)
    .flatMap(cassandraRowStream -> {
      Promise<R> resultPromise = Promise.promise();
      cassandraRowStream.endHandler(end -> {
        R result = finisher.apply(container);
        resultPromise.complete(result);
      });
      cassandraRowStream.handler(row -> {
        accumulator.accept(container, row);
      });
      cassandraRowStream.exceptionHandler(resultPromise::fail);
      return resultPromise.future();
    });
}
 
源代码2 项目: streamex   文件: Internals.java
@SuppressWarnings("unchecked")
static <K, D, A, M extends Map<K, D>> PartialCollector<Map<K, A>, M> grouping(Supplier<M> mapFactory,
        Collector<?, A, D> downstream) {
    BinaryOperator<A> downstreamMerger = downstream.combiner();
    BiConsumer<Map<K, A>, Map<K, A>> merger = (map1, map2) -> {
        for (Map.Entry<K, A> e : map2.entrySet())
            map1.merge(e.getKey(), e.getValue(), downstreamMerger);
    };

    if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
        return (PartialCollector<Map<K, A>, M>) new PartialCollector<>((Supplier<Map<K, A>>) mapFactory,
                merger, Function.identity(), ID_CHARACTERISTICS);
    }
    Function<A, D> downstreamFinisher = downstream.finisher();
    return new PartialCollector<>((Supplier<Map<K, A>>) mapFactory, merger, map -> {
        map.replaceAll((k, v) -> ((Function<A, A>) downstreamFinisher).apply(v));
        return (M) map;
    }, NO_CHARACTERISTICS);
}
 
源代码3 项目: crate   文件: IncrementalPageBucketReceiver.java
public IncrementalPageBucketReceiver(Collector<Row, T, Iterable<Row>> collector,
                                     RowConsumer rowConsumer,
                                     Executor executor,
                                     Streamer<?>[] streamers,
                                     int upstreamsCount) {
    this.state = collector.supplier().get();
    this.accumulator = collector.accumulator();
    this.finisher = collector.finisher();
    this.executor = executor;
    this.streamers = streamers;
    this.remainingUpstreams = new AtomicInteger(upstreamsCount);
    lazyBatchIterator = CollectingBatchIterator.newInstance(
        () -> {},
        t -> {},
        () -> processingFuture,
        true);
    rowConsumer.accept(lazyBatchIterator, null);
}
 
源代码4 项目: reactor-core   文件: MonoStreamCollectorTest.java
@Test
public void scanStreamCollectorSubscriber() {
	CoreSubscriber<List<String>>
			actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
	Collector<String, ?, List<String>> collector = Collectors.toList();
	@SuppressWarnings("unchecked")
	BiConsumer<Integer, String> accumulator = (BiConsumer<Integer, String>) collector.accumulator();
	@SuppressWarnings("unchecked")
	Function<Integer, List<String>> finisher = (Function<Integer, List<String>>) collector.finisher();

	MonoStreamCollector.StreamCollectorSubscriber<String, Integer, List<String>> test = new MonoStreamCollector.StreamCollectorSubscriber<>(
			actual, 1, accumulator, finisher);
	Subscription parent = Operators.emptySubscription();

	test.onSubscribe(parent);

	assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE);

	assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
	assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);

	assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
	test.onError(new IllegalStateException("boom"));
	assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();

	assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
	test.cancel();
	assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}
 
源代码5 项目: streamex   文件: Internals.java
static <A, R> PartialCollector<Box<A>, R> partialCollector(Collector<?, A, R> c) {
    Supplier<A> supplier = c.supplier();
    BinaryOperator<A> combiner = c.combiner();
    Function<A, R> finisher = c.finisher();
    return new PartialCollector<>(() -> new Box<>(supplier.get()), (box1, box2) -> box1.a = combiner.apply(
        box1.a, box2.a), box -> finisher.apply(box.a), NO_CHARACTERISTICS);
}
 
源代码6 项目: streamex   文件: MoreCollectors.java
/**
 * Adapts a {@code Collector} accepting elements of type {@code U} to one
 * accepting elements of type {@code T} by applying a flat mapping function
 * to each input element before accumulation. The flat mapping function maps
 * an input element to a {@link Stream stream} covering zero or more output
 * elements that are then accumulated downstream. Each mapped stream is
 * {@link java.util.stream.BaseStream#close() closed} after its contents
 * have been placed downstream. (If a mapped stream is {@code null} an empty
 * stream is used, instead.)
 * 
 * <p>
 * This method is similar to {@code Collectors.flatMapping} method which
 * appears in JDK 9. However when downstream collector is
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting</a>
 * , this method will also return a short-circuiting collector.
 * 
 * @param <T> the type of the input elements
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements, which
 *        returns a stream of results
 * @param downstream a collector which will receive the elements of the
 *        stream returned by mapper
 * @return a collector which applies the mapping function to the input
 *         elements and provides the flat mapped results to the downstream
 *         collector
 * @throws NullPointerException if mapper is null, or downstream is null.
 * @since 0.4.1
 */
public static <T, U, A, R> Collector<T, ?, R> flatMapping(Function<? super T, ? extends Stream<? extends U>> mapper,
        Collector<? super U, A, R> downstream) {
    Objects.requireNonNull(mapper);
    BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
    Predicate<A> finished = finished(downstream);
    if (finished != null) {
        return new CancellableCollectorImpl<>(downstream.supplier(), (acc, t) -> {
            if (finished.test(acc))
                return;
            try (Stream<? extends U> stream = mapper.apply(t)) {
                if (stream != null) {
                    stream.spliterator().forEachRemaining(u -> {
                        downstreamAccumulator.accept(acc, u);
                        if (finished.test(acc))
                            throw new CancelException();
                    });
                }
            } catch (CancelException ex) {
                // ignore
            }
        }, downstream.combiner(), downstream.finisher(), finished, downstream.characteristics());
    }
    return Collector.of(downstream.supplier(), (acc, t) -> {
        try (Stream<? extends U> stream = mapper.apply(t)) {
            if (stream != null) {
                stream.spliterator().forEachRemaining(u -> downstreamAccumulator.accept(acc, u));
            }
        }
    }, downstream.combiner(), downstream.finisher(), downstream.characteristics().toArray(new Characteristics[0]));
}
 
源代码7 项目: streamex   文件: EntryStream.java
/**
 * Merge series of adjacent stream entries with equal keys combining the
 * corresponding values using the provided {@code Collector}.
 *
 * <p>
 * This is a <a href="package-summary.html#StreamOps">quasi-intermediate</a>
 * partial reduction operation.
 *
 * <p>
 * The key of the resulting entry is the key of the first merged entry.
 *
 * @param <R> the type of the values in the resulting stream
 * @param <A> the intermediate accumulation type of the {@code Collector}
 * @param collector a {@code Collector} which is used to combine the values
 *        of the adjacent entries with the equal keys.
 * @return a new {@code EntryStream} which keys are the keys of the original
 *         stream and the values are values of the adjacent entries with the
 *         same keys, combined using the provided collector.
 * @see StreamEx#collapse(BiPredicate, Collector)
 * @since 0.5.5
 */
public <A, R> EntryStream<K, R> collapseKeys(Collector<? super V, A, R> collector) {
    Supplier<A> supplier = collector.supplier();
    BiConsumer<A, ? super V> accumulator = collector.accumulator();
    BinaryOperator<A> combiner = collector.combiner();
    Function<A, R> finisher = collector.finisher();
    return new StreamEx<>(new CollapseSpliterator<>(equalKeys(), e -> {
        A a = supplier.get();
        accumulator.accept(a, e.getValue());
        return new PairBox<>(e.getKey(), a);
    }, (pb, e) -> {
        accumulator.accept(pb.b, e.getValue());
        return pb;
    }, (pb1, pb2) -> {
        pb1.b = combiner.apply(pb1.b, pb2.b);
        return pb1;
    }, spliterator()), context).mapToEntry(pb -> pb.a, pb -> finisher.apply(pb.b));
}
 
源代码8 项目: crate   文件: MultiActionListener.java
public MultiActionListener(int numResponses, Collector<I, S, R> collector, ActionListener<? super R> listener) {
    this(numResponses, collector.supplier(), collector.accumulator(), collector.finisher(), listener);
}
 
源代码9 项目: streamex   文件: LongCollector.java
/**
 * Adapts a {@link Collector} accepting elements of type {@code U} to a
 * {@code LongCollector} by applying a mapping function to each input
 * element before accumulation.
 *
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 */
static <U, A, R> LongCollector<?, R> mappingToObj(LongFunction<U> mapper, Collector<U, A, R> downstream) {
    BiConsumer<A, U> accumulator = downstream.accumulator();
    if (downstream instanceof MergingCollector) {
        return new LongCollectorImpl<>(downstream.supplier(), (acc, i) -> accumulator.accept(acc, mapper.apply(i)),
                ((MergingCollector<U, A, R>) downstream).merger(), downstream.finisher(), downstream
                        .characteristics());
    }
    return Box.partialCollector(downstream).asLong((box, i) -> accumulator.accept(box.a, mapper.apply(i)));
}
 
源代码10 项目: streamex   文件: IntCollector.java
/**
 * Adapts a {@link Collector} accepting elements of type {@code U} to an
 * {@code IntCollector} by applying a mapping function to each input element
 * before accumulation.
 *
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 */
static <U, A, R> IntCollector<?, R> mappingToObj(IntFunction<U> mapper, Collector<U, A, R> downstream) {
    BiConsumer<A, U> accumulator = downstream.accumulator();
    if (downstream instanceof MergingCollector) {
        return new IntCollectorImpl<>(downstream.supplier(), (acc, i) -> accumulator.accept(acc, mapper.apply(i)),
                ((MergingCollector<U, A, R>) downstream).merger(), downstream.finisher(), downstream
                        .characteristics());
    }
    return Box.partialCollector(downstream).asInt((box, i) -> accumulator.accept(box.a, mapper.apply(i)));
}
 
源代码11 项目: streamex   文件: MoreCollectors.java
/**
 * Adapts a {@code Collector} accepting elements of type {@code U} to one
 * accepting elements of type {@code T} by applying a mapping function to
 * each input element before accumulation.
 *
 * <p>
 * Unlike {@link Collectors#mapping(Function, Collector)} this method
 * returns a
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting
 * collector</a> if the downstream collector is short-circuiting.
 * 
 * @param <T> the type of the input elements
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 * @throws NullPointerException if mapper is null, or downstream is null.
 * @see Collectors#mapping(Function, Collector)
 * @since 0.4.0
 */
public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
        Collector<? super U, A, R> downstream) {
    Objects.requireNonNull(mapper);
    Predicate<A> finished = finished(downstream);
    if (finished != null) {
        BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return new CancellableCollectorImpl<>(downstream.supplier(), (acc, t) -> {
            if (!finished.test(acc))
                downstreamAccumulator.accept(acc, mapper.apply(t));
        }, downstream.combiner(), downstream.finisher(), finished, downstream.characteristics());
    }
    return Collectors.mapping(mapper, downstream);
}
 
源代码12 项目: streamex   文件: DoubleCollector.java
/**
 * Adapts a {@link Collector} accepting elements of type {@code U} to a
 * {@code DoubleCollector} by applying a mapping function to each input
 * element before accumulation.
 *
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 */
static <U, A, R> DoubleCollector<?, R> mappingToObj(DoubleFunction<U> mapper, Collector<U, A, R> downstream) {
    BiConsumer<A, U> accumulator = downstream.accumulator();
    if (downstream instanceof MergingCollector) {
        return new DoubleCollectorImpl<>(downstream.supplier(), (acc, i) -> accumulator
                .accept(acc, mapper.apply(i)), ((MergingCollector<U, A, R>) downstream).merger(), downstream
                .finisher(), downstream.characteristics());
    }
    return Box.partialCollector(downstream).asDouble((box, i) -> accumulator.accept(box.a, mapper.apply(i)));
}