( )源码实例Demo

下面列出了 ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: java-async-util   文件:
private static <T, A, R> CompletionStage<R> collectImpl(
    final Iterator<? extends CompletionStage<T>> it,
    final Collector<? super T, A, R> collector) {

  CompletionStage<A> acc = StageSupport.completedStage(collector.supplier().get());
  final BiConsumer<A, ? super T> accFun = collector.accumulator();

  while (it.hasNext()) {
     * each additional combination step runs only after all previous steps have completed
    acc = acc.thenCombine(, (a, t) -> {
      accFun.accept(a, t);
      return a;
  return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
      ? (CompletionStage<R>) acc
      : acc.thenApply(collector.finisher());

/**
 * Runs {@code statement} as a row stream and reduces the rows with {@code collector}.
 * <p>
 * The container, accumulator and finisher are taken from the collector before the query starts.
 * The stream's row handler accumulates each row into the container and the end handler applies
 * the finisher to it.
 * <p>
 * NOTE(review): this excerpt is truncated. The closing delimiters of both handlers, of the
 * {@code flatMap} lambda and of the method are missing, and no visible statement completes
 * {@code resultPromise} with {@code result}. Confirm against the complete source.
 *
 * @param statement the statement handed to {@code queryStream}
 * @param collector supplies the container, accumulator and finisher used for the reduction
 * @param <C> the collector's intermediate container type
 * @param <R> the collector's result type
 * @return the result of flat-mapping the query stream onto the future of a promise created per
 *         stream
 */
private <C, R> Future<R> executeAndCollect(Statement statement, Collector<Row, C, R> collector) {
  // A single container is created here and captured by both handlers below.
  C container = collector.supplier().get();
  BiConsumer<C, Row> accumulator = collector.accumulator();
  Function<C, R> finisher = collector.finisher();
  return queryStream(statement)
    .flatMap(cassandraRowStream -> {
      Promise<R> resultPromise = Promise.promise();
      // End of stream: turn the filled container into the final result.
      cassandraRowStream.endHandler(end -> {
        R result = finisher.apply(container);
      // Each row: accumulate it into the shared container.
      cassandraRowStream.handler(row -> {
        accumulator.accept(container, row);
      return resultPromise.future();
源代码3 项目: streamex   文件:
/**
 * Perform a partial mutable reduction using the supplied {@link Collector}
 * on a series of adjacent elements.
 * <p>
 * This is a <a href="package-summary.html#StreamOps">quasi-intermediate</a>
 * partial reduction operation.
 *
 * @param <R> the type of the elements in the resulting stream
 * @param <A> the intermediate accumulation type of the {@code Collector}
 * @param collapsible a non-interfering, stateless predicate to apply to the
 *        pair of adjacent elements of the input stream which returns true
 *        for elements which should be collected together.
 * @param collector a {@code Collector} which is used to combine the
 *        adjacent elements.
 * @return the new stream
 * @since 0.3.6
 */
public <R, A> StreamEx<R> collapse(BiPredicate<? super T, ? super T> collapsible,
        Collector<? super T, A, R> collector) {
    Supplier<A> supplier = collector.supplier();
    BiConsumer<A, ? super T> accumulator = collector.accumulator();
    // Three functions are handed to collapseInternal:
    //  1. start a new container from the supplier and put the given element into it,
    //  2. add a further element to an existing container,
    //  3. the collector's combiner, for merging two containers.
    StreamEx<A> stream = collapseInternal(collapsible, t -> {
        A acc = supplier.get();
        accumulator.accept(acc, t);
        return acc;
    }, (acc, t) -> {
        accumulator.accept(acc, t);
        return acc;
    }, collector.combiner());
    // With IDENTITY_FINISH the containers are already the results, so the stream of containers
    // is returned as-is through an unchecked cast.
    // NOTE(review): the branch for collectors that need their finisher applied, and the closing
    // braces, are not visible in this excerpt.
    if (collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
        StreamEx<R> result = (StreamEx<R>) stream;
        return result;
源代码4 项目: crate   文件:
/**
 * Creates a receiver whose accumulation state and functions come from {@code collector}.
 * <p>
 * NOTE(review): this excerpt is truncated. The argument list of
 * {@code CollectingBatchIterator.newInstance} is cut off after its third argument and the
 * constructor's closing brace is missing.
 *
 * @param collector      supplies the initial state, the accumulator and the finisher
 * @param rowConsumer    receives the lazily evaluated batch iterator at the end of construction
 * @param executor       stored in the {@code executor} field
 * @param streamers      stored in the {@code streamers} field
 * @param upstreamsCount initial value of the remaining-upstreams counter
 */
public IncrementalPageBucketReceiver(Collector<Row, T, Iterable<Row>> collector,
                                     RowConsumer rowConsumer,
                                     Executor executor,
                                     Streamer<?>[] streamers,
                                     int upstreamsCount) {
    this.state = collector.supplier().get();
    this.accumulator = collector.accumulator();
    this.finisher = collector.finisher();
    this.executor = executor;
    this.streamers = streamers;
    this.remainingUpstreams = new AtomicInteger(upstreamsCount);
    // The first two callbacks passed here are no-ops; the third supplies processingFuture.
    lazyBatchIterator = CollectingBatchIterator.newInstance(
        () -> {},
        t -> {},
        () -> processingFuture,
    // The consumer gets the iterator immediately, with null as second argument
    // (presumably "no failure" - confirm against RowConsumer).
    rowConsumer.accept(lazyBatchIterator, null);
源代码5 项目: catnip   文件:
/**
 * Reduces the values of {@code map} with the given collector.
 * <p>
 * NOTE(review): this excerpt is truncated. The closing brace of the loop and the body of the
 * {@code finally} block are not visible, so what is cleaned up there cannot be told from here.
 *
 * @param collector supplies the container, the accumulator and the finisher
 * @param <A> the collector's intermediate container type
 * @param <R> the collector's result type
 * @return the finisher's result for the container filled from {@code map.values()}
 */
public <A, R> R collect(@Nonnull final Collector<? super T, A, R> collector) {
    final A a = collector.supplier().get();
    final BiConsumer<A, ? super T> accumulator = collector.accumulator();
    try {
        // Feed every value of the backing map into the container.
        for(final T element : map.values()) {
            accumulator.accept(a, element);
        return collector.finisher().apply(a);
    } finally {
源代码6 项目: catnip   文件:
public <A, R> R collect(@Nonnull final Collector<? super T, A, R> collector) {
    final A a = collector.supplier().get();
    final BiConsumer<A, ? super T> accumulator = collector.accumulator();
    forEach(element -> accumulator.accept(a, element));
    return collector.finisher().apply(a);
源代码7 项目: j4ts   文件:
public <R, A> R collect(Collector<? super T, A, R> collector) {
    A container = collector.supplier().get();
    BiConsumer accumulator = collector.accumulator();
    chain(new StreamRowMap(new ConsumingFunction(item -> accumulator.accept(container, item))));
    return (R) container;
源代码8 项目: openCypher   文件:
public <P, A, R, T, EX extends Exception> T transform(
        ProductionTransformation<P, R, EX> transformation, P param, Collector<R, A, T> collector ) throws EX
    BiConsumer<A, R> accumulator = collector.accumulator();
    A result = collector.supplier().get();
    for ( ProductionNode production : productions.values() )
        accumulator.accept( result, production.transform( transformation, param ) );
    return collector.finisher().apply( result );
源代码9 项目: reactor-core   文件:
// Test for MonoStreamCollector.StreamCollectorSubscriber: builds a subscriber from the
// accumulator and finisher of Collectors.toList() and signals an error on it.
// NOTE(review): this excerpt is truncated. The declaration line that "actual = ..." continues,
// the statements that stood on the blank lines, and the closing brace are not visible.
public void scanStreamCollectorSubscriber() {
			actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
	Collector<String, ?, List<String>> collector = Collectors.toList();
	// The wildcard container type is force-cast to Integer so that the literal 1 below can
	// serve as the container argument.
	BiConsumer<Integer, String> accumulator = (BiConsumer<Integer, String>) collector.accumulator();
	Function<Integer, List<String>> finisher = (Function<Integer, List<String>>) collector.finisher();

	MonoStreamCollector.StreamCollectorSubscriber<String, Integer, List<String>> test = new MonoStreamCollector.StreamCollectorSubscriber<>(
			actual, 1, accumulator, finisher);
	Subscription parent = Operators.emptySubscription();




	test.onError(new IllegalStateException("boom"));

源代码10 项目: streamex   文件:
/**
 * {@inheritDoc}
 * <p>
 * If a special <a
 * href="package-summary.html#ShortCircuitReduction">short-circuiting
 * collector</a> is passed, this operation becomes short-circuiting as well.
 */
// NOTE(review): this excerpt is truncated. Several closing delimiters are missing, the
// UnorderedCancellableSpliterator argument list is cut off, and "new StreamEx<>(, true)" has
// lost part of its expression. The comments below describe only what is visible.
public <R, A> R collect(Collector<? super T, A, R> collector) {
    // A non-null predicate selects the cancellable path below; otherwise rawCollect is used.
    Predicate<A> finished = finished(collector);
    if (finished != null) {
        BiConsumer<A, ? super T> acc = collector.accumulator();
        BinaryOperator<A> combiner = collector.combiner();
        Spliterator<T> spliterator = spliterator();
        // Non-parallel case: accumulate into a single container and stop as soon as the
        // predicate reports it as finished.
        if (!isParallel()) {
            A a = collector.supplier().get();
            if (!finished.test(a)) {
                try {
                    // forEachRemaining can be much faster
                    // and take much less memory than tryAdvance for certain
                    // spliterators
                    spliterator.forEachRemaining(e -> {
                        acc.accept(a, e);
                        // The exception is only a way to leave forEachRemaining early.
                        if (finished.test(a))
                            throw new CancelException();
                } catch (CancelException ex) {
                    // ignore
            return collector.finisher().apply(a);
        // Parallel case: pick the unordered spliterator when either the source is not ORDERED
        // or the collector is UNORDERED, otherwise the ordered one.
        Spliterator<A> spltr;
        if (!spliterator.hasCharacteristics(Spliterator.ORDERED)
            || collector.characteristics().contains(Characteristics.UNORDERED)) {
            spltr = new UnorderedCancellableSpliterator<>(spliterator, collector.supplier(), acc, combiner,
        } else {
            spltr = new OrderedCancellableSpliterator<>(spliterator, collector.supplier(), acc, combiner, finished);
        return collector.finisher().apply(
            new StreamEx<>(, true), context).findFirst().get());
    return rawCollect(collector);
源代码11 项目: streamex   文件:
/**
 * Adapts a {@code Collector} accepting elements of type {@code U} to one
 * accepting elements of type {@code T} by applying a flat mapping function
 * to each input element before accumulation. The flat mapping function maps
 * an input element to a {@link Stream stream} covering zero or more output
 * elements that are then accumulated downstream. Each mapped stream is
 * {@link java.util.stream.BaseStream#close() closed} after its contents
 * have been placed downstream. (If a mapped stream is {@code null} an empty
 * stream is used, instead.)
 * <p>
 * This method is similar to {@code Collectors.flatMapping} method which
 * appears in JDK 9. However when downstream collector is
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting</a>
 * , this method will also return a short-circuiting collector.
 *
 * @param <T> the type of the input elements
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements, which
 *        returns a stream of results
 * @param downstream a collector which will receive the elements of the
 *        stream returned by mapper
 * @return a collector which applies the mapping function to the input
 *         elements and provides the flat mapped results to the downstream
 *         collector
 * @throws NullPointerException if mapper is null, or downstream is null.
 * @since 0.4.1
 */
public static <T, U, A, R> Collector<T, ?, R> flatMapping(Function<? super T, ? extends Stream<? extends U>> mapper,
        Collector<? super U, A, R> downstream) {
    BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
    Predicate<A> finished = finished(downstream);
    if (finished != null) {
        return new CancellableCollectorImpl<>(downstream.supplier(), (acc, t) -> {
            if (finished.test(acc))
            try (Stream<? extends U> stream = mapper.apply(t)) {
                if (stream != null) {
                    stream.spliterator().forEachRemaining(u -> {
                        downstreamAccumulator.accept(acc, u);
                        if (finished.test(acc))
                            throw new CancelException();
            } catch (CancelException ex) {
                // ignore
        }, downstream.combiner(), downstream.finisher(), finished, downstream.characteristics());
    return Collector.of(downstream.supplier(), (acc, t) -> {
        try (Stream<? extends U> stream = mapper.apply(t)) {
            if (stream != null) {
                stream.spliterator().forEachRemaining(u -> downstreamAccumulator.accept(acc, u));
    }, downstream.combiner(), downstream.finisher(), downstream.characteristics().toArray(new Characteristics[0]));
源代码12 项目: streamex   文件:
/**
 * Merge series of adjacent stream entries with equal keys combining the
 * corresponding values using the provided {@code Collector}.
 * <p>
 * This is a <a href="package-summary.html#StreamOps">quasi-intermediate</a>
 * partial reduction operation.
 * <p>
 * The key of the resulting entry is the key of the first merged entry.
 *
 * @param <R> the type of the values in the resulting stream
 * @param <A> the intermediate accumulation type of the {@code Collector}
 * @param collector a {@code Collector} which is used to combine the values
 *        of the adjacent entries with the equal keys.
 * @return a new {@code EntryStream} whose keys are the keys of the original
 *         stream and whose values are the values of the adjacent entries with
 *         the same keys, combined using the provided collector.
 * @see StreamEx#collapse(BiPredicate, Collector)
 * @since 0.5.5
 */
public <A, R> EntryStream<K, R> collapseKeys(Collector<? super V, A, R> collector) {
    Supplier<A> supplier = collector.supplier();
    BiConsumer<A, ? super V> accumulator = collector.accumulator();
    BinaryOperator<A> combiner = collector.combiner();
    Function<A, R> finisher = collector.finisher();
    return new StreamEx<>(new CollapseSpliterator<>(equalKeys(), e -> {
        A a = supplier.get();
        accumulator.accept(a, e.getValue());
        return new PairBox<>(e.getKey(), a);
    }, (pb, e) -> {
        accumulator.accept(pb.b, e.getValue());
        return pb;
    }, (pb1, pb2) -> {
        pb1.b = combiner.apply(pb1.b, pb2.b);
        return pb1;
    }, spliterator()), context).mapToEntry(pb -> pb.a, pb -> finisher.apply(pb.b));
源代码13 项目: crate   文件:
/**
 * Creates a listener from the functions of a {@link Collector}.
 * <p>
 * Delegates to the constructor that takes the supplier, accumulator and finisher separately.
 *
 * @param numResponses forwarded unchanged to the delegate constructor
 * @param collector    source of the supplier, accumulator and finisher
 * @param listener     forwarded unchanged to the delegate constructor
 */
public MultiActionListener(int numResponses, Collector<I, S, R> collector, ActionListener<? super R> listener) {
    this(
        numResponses,
        collector.supplier(),
        collector.accumulator(),
        collector.finisher(),
        listener
    );
}
源代码14 项目: java-async-util   文件:
/**
 * Performs a mutable reduction operation using the collector and returns a
 * CompletionStage of the result.
 * <p>
 * This is a <i>terminal method</i>.
 *
 * @param collector a {@link Collector} which will sequentially collect the contents of this
 *        iterator into an {@code R}
 * @param <A> The intermediate type of the accumulated object
 * @param <R> The final type of the accumulated object
 * @return a {@link CompletionStage} which will complete with the collected value
 * @see Stream#collect(Collector)
 */
default <R, A> CompletionStage<R> collect(final Collector<? super T, A, R> collector) {
  final A container = collector.supplier().get();
  final BiConsumer<A, ? super T> acc = collector.accumulator();
  return forEach(t -> acc.accept(container, t))
      .thenApply(ig -> AsyncIterators.finishContainer(container, collector));
源代码15 项目: streamex   文件:
/**
 * Adapts a {@link Collector} accepting elements of type {@code U} to a
 * {@code LongCollector} by applying a mapping function to each input
 * element before accumulation.
 *
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 */
static <U, A, R> LongCollector<?, R> mappingToObj(LongFunction<U> mapper, Collector<U, A, R> downstream) {
    BiConsumer<A, U> accumulator = downstream.accumulator();
    if (downstream instanceof MergingCollector) {
        return new LongCollectorImpl<>(downstream.supplier(), (acc, i) -> accumulator.accept(acc, mapper.apply(i)),
                ((MergingCollector<U, A, R>) downstream).merger(), downstream.finisher(), downstream
    return Box.partialCollector(downstream).asLong((box, i) -> accumulator.accept(box.a, mapper.apply(i)));
源代码16 项目: streamex   文件:
/**
 * Adapts a {@link Collector} accepting elements of type {@code U} to an
 * {@code IntCollector} by applying a mapping function to each input element
 * before accumulation.
 *
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 */
static <U, A, R> IntCollector<?, R> mappingToObj(IntFunction<U> mapper, Collector<U, A, R> downstream) {
    BiConsumer<A, U> accumulator = downstream.accumulator();
    if (downstream instanceof MergingCollector) {
        return new IntCollectorImpl<>(downstream.supplier(), (acc, i) -> accumulator.accept(acc, mapper.apply(i)),
                ((MergingCollector<U, A, R>) downstream).merger(), downstream.finisher(), downstream
    return Box.partialCollector(downstream).asInt((box, i) -> accumulator.accept(box.a, mapper.apply(i)));
源代码17 项目: streamex   文件:
/**
 * Returns a {@code Collector} which partitions the input elements according
 * to a {@code Predicate}, reduces the values in each partition according to
 * another {@code Collector}, and organizes them into a
 * {@code Map<Boolean, D>} whose values are the result of the downstream
 * reduction.
 * <p>
 * Unlike {@link Collectors#partitioningBy(Predicate, Collector)} this
 * method returns a
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting
 * collector</a> if the downstream collector is short-circuiting.
 *
 * @param <T> the type of the input elements
 * @param <A> the intermediate accumulation type of the downstream collector
 * @param <D> the result type of the downstream reduction
 * @param predicate a predicate used for classifying input elements
 * @param downstream a {@code Collector} implementing the downstream
 *        reduction
 * @return a {@code Collector} implementing the cascaded partitioning
 *         operation
 * @throws NullPointerException if predicate is null, or downstream is null.
 * @since 0.4.0
 * @see Collectors#partitioningBy(Predicate, Collector)
 */
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
        Collector<? super T, A, D> downstream) {
    Predicate<A> finished = finished(downstream);
    if (finished != null) {
        BiConsumer<A, ? super T> accumulator = downstream.accumulator();
        return BooleanMap.partialCollector(downstream).asCancellable((map, t) -> accumulator.accept(predicate.test(
            t) ? map.trueValue : map.falseValue, t), map -> finished.test(map.trueValue) && finished.test(
    return Collectors.partitioningBy(predicate, downstream);
源代码18 项目: streamex   文件:
/**
 * Adapts a {@code Collector} accepting elements of type {@code U} to one
 * accepting elements of type {@code T} by applying a mapping function to
 * each input element before accumulation.
 * <p>
 * Unlike {@link Collectors#mapping(Function, Collector)} this method
 * returns a
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting
 * collector</a> if the downstream collector is short-circuiting.
 *
 * @param <T> the type of the input elements
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 * @throws NullPointerException if mapper is null, or downstream is null.
 * @see Collectors#mapping(Function, Collector)
 * @since 0.4.0
 */
public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
        Collector<? super U, A, R> downstream) {
    Predicate<A> finished = finished(downstream);
    if (finished != null) {
        BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return new CancellableCollectorImpl<>(downstream.supplier(), (acc, t) -> {
            if (!finished.test(acc))
                downstreamAccumulator.accept(acc, mapper.apply(t));
        }, downstream.combiner(), downstream.finisher(), finished, downstream.characteristics());
    return Collectors.mapping(mapper, downstream);
源代码19 项目: streamex   文件:
/**
 * Returns a {@code Collector} which performs downstream reduction if all
 * elements satisfy the {@code Predicate}. The result is described as an
 * {@code Optional<R>}.
 * <p>
 * The resulting collector returns an empty optional if at least one input
 * element does not satisfy the predicate. Otherwise it returns an optional
 * which contains the result of the downstream collector.
 * <p>
 * This method returns a
 * <a href="package-summary.html#ShortCircuitReduction">short-circuiting
 * collector</a>: it may not process all the elements if some of items don't
 * satisfy the predicate or if downstream collector is a short-circuiting
 * collector.
 * <p>
 * It's guaranteed that the downstream collector is not called for elements
 * which don't satisfy the predicate.
 *
 * @param <T> the type of input elements
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of the downstream collector
 * @param predicate a non-interfering, stateless predicate to check whether
 *        the collector should proceed with the element
 * @param downstream a {@code Collector} implementing the downstream
 *        reduction
 * @return a {@code Collector} which performs downstream reduction if all
 *         elements satisfy the predicate
 * @throws NullPointerException if downstream is null.
 * @see Stream#allMatch(Predicate)
 * @see AbstractStreamEx#dropWhile(Predicate)
 * @see AbstractStreamEx#takeWhile(Predicate)
 * @since 0.6.3
 */
public static <T, A, R> Collector<T, ?, Optional<R>> ifAllMatch(Predicate<T> predicate,
        Collector<T, A, R> downstream) {
    Predicate<A> finished = finished(downstream);
    Supplier<A> supplier = downstream.supplier();
    BiConsumer<A, T> accumulator = downstream.accumulator();
    BinaryOperator<A> combiner = downstream.combiner();
    return new CancellableCollectorImpl<>(
            () -> new PairBox<>(supplier.get(), Boolean.TRUE),
            (acc, t) -> {
                if (acc.b && predicate.test(t)) {
                    accumulator.accept(acc.a, t);
                } else {
                    acc.b = Boolean.FALSE;
            (acc1, acc2) -> {
                if (acc1.b && acc2.b) {
                    acc1.a = combiner.apply(acc1.a, acc2.a);
                } else {
                    acc1.b = Boolean.FALSE;
                return acc1;
            acc -> acc.b ? Optional.of(downstream.finisher().apply(acc.a)) : Optional.empty(),
            finished == null ? acc -> !acc.b : acc -> !acc.b || finished.test(acc.a),
            downstream.characteristics().contains(Characteristics.UNORDERED) ? UNORDERED_CHARACTERISTICS
                    : NO_CHARACTERISTICS);
源代码20 项目: streamex   文件:
/**
 * Adapts a {@link Collector} accepting elements of type {@code U} to a
 * {@code DoubleCollector} by applying a mapping function to each input
 * element before accumulation.
 *
 * @param <U> type of elements accepted by downstream collector
 * @param <A> intermediate accumulation type of the downstream collector
 * @param <R> result type of collector
 * @param mapper a function to be applied to the input elements
 * @param downstream a collector which will accept mapped values
 * @return a collector which applies the mapping function to the input
 *         elements and provides the mapped results to the downstream
 *         collector
 */
static <U, A, R> DoubleCollector<?, R> mappingToObj(DoubleFunction<U> mapper, Collector<U, A, R> downstream) {
    BiConsumer<A, U> accumulator = downstream.accumulator();
    if (downstream instanceof MergingCollector) {
        return new DoubleCollectorImpl<>(downstream.supplier(), (acc, i) -> accumulator
                .accept(acc, mapper.apply(i)), ((MergingCollector<U, A, R>) downstream).merger(), downstream
                .finisher(), downstream.characteristics());
    return Box.partialCollector(downstream).asDouble((box, i) -> accumulator.accept(box.a, mapper.apply(i)));