io.reactivex.Single#flatMapPublisher ( )源码实例Demo

下面列出了io.reactivex.Single#flatMapPublisher ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: reactive-grpc   文件: ClientCalls.java
/**
 * Implements a unary → stream call as {@link Single} → {@link Flowable}, where the server responds with a
 * stream of messages.
 */
public static <TRequest, TResponse> Flowable<TResponse> oneToMany(
        final Single<TRequest> rxRequest,
        final BiConsumer<TRequest, StreamObserver<TResponse>> delegate,
        final CallOptions options) {
    try {

        final int prefetch = RxCallOptions.getPrefetch(options);
        final int lowTide = RxCallOptions.getLowTide(options);

        return rxRequest
                .flatMapPublisher(new io.reactivex.functions.Function<TRequest, Publisher<? extends TResponse>>() {
                    @Override
                    public Publisher<? extends TResponse> apply(TRequest request) {
                        final RxClientStreamObserverAndPublisher<TResponse> consumerStreamObserver =
                            new RxClientStreamObserverAndPublisher<TResponse>(null, null, prefetch, lowTide);

                        delegate.accept(request, consumerStreamObserver);

                        return consumerStreamObserver;
                    }
                });
    } catch (Throwable throwable) {
        return Flowable.error(throwable);
    }
}
 
源代码2 项目: reactive-streams-in-java   文件: RxJavaDemo.java
public static void readFile2(File file) {
    Single<BufferedReader> readerSingle = Single.just(file) //1
            .observeOn(Schedulers.io()) //2
            .map(FileReader::new)
            .map(BufferedReader::new); //3
    Flowable<String> flowable = readerSingle.flatMapPublisher(reader -> //4
            Flowable.fromIterable( //5
                    () -> Stream.generate(readLineSupplier(reader)).iterator()
            ).takeWhile(line -> !"EOF".equals(line))); //6
    flowable
            .doOnNext(it -> System.out.println("thread="
                    + Thread.currentThread().getName())) //7
            .doOnError(ex -> ex.printStackTrace())
            .blockingSubscribe(System.out::println); //8
}
 
源代码3 项目: cyclops   文件: Flowables.java
public static <T> Flowable<Single<T>> sequence(final Publisher<? extends Flowable<T>> fts) {

        io.reactivex.functions.BiFunction<Flowable<Single<T>>,Flowable<T>,Flowable<Single<T>>> combineToStream = (acc,next) ->Flowable.merge(acc,next.map(Single::just));
        Single<Flowable<Single<T>>> x = Flowable.fromPublisher(fts).reduce(Flowable.empty(), combineToStream);
        Flowable<Flowable<Single<T>>> r = x.flatMapPublisher(Flowable::just);
        return r.flatMap(i->i);
    }