下面列出了 io.reactivex.Single#flatMapPublisher() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Adapts a unary-request / streaming-response call to Rx types: the request arrives as a
 * {@link Single} and the server's stream of messages is exposed as a {@link Flowable}.
 *
 * <p>When {@code rxRequest} emits its value, a fresh {@code RxClientStreamObserverAndPublisher}
 * is created, handed to {@code delegate} together with the request, and then returned as the
 * publisher whose items make up the resulting {@link Flowable}.
 *
 * @param rxRequest the single request message to send
 * @param delegate  invoked with the request and the observer that will receive the responses
 * @param options   call options from which the prefetch and low-tide values are read
 *                  (presumably back-pressure tuning; confirm against {@code RxCallOptions})
 * @return a {@link Flowable} of responses, or {@link Flowable#error(Throwable)} if anything is
 *         thrown while the pipeline is being assembled
 */
public static <TRequest, TResponse> Flowable<TResponse> oneToMany(
        final Single<TRequest> rxRequest,
        final BiConsumer<TRequest, StreamObserver<TResponse>> delegate,
        final CallOptions options) {
    try {
        final int prefetchSize = RxCallOptions.getPrefetch(options);
        final int lowTideMark = RxCallOptions.getLowTide(options);

        // Maps the emitted request to the publisher of its responses; kept as an anonymous
        // class (not a lambda) exactly like the surrounding code does.
        final io.reactivex.functions.Function<TRequest, Publisher<? extends TResponse>> startCall =
                new io.reactivex.functions.Function<TRequest, Publisher<? extends TResponse>>() {
                    @Override
                    public Publisher<? extends TResponse> apply(TRequest request) {
                        final RxClientStreamObserverAndPublisher<TResponse> responseStream =
                                new RxClientStreamObserverAndPublisher<TResponse>(
                                        null, null, prefetchSize, lowTideMark);
                        delegate.accept(request, responseStream);
                        return responseStream;
                    }
                };

        return rxRequest.flatMapPublisher(startCall);
    } catch (Throwable throwable) {
        // Surface assembly-time failures through the reactive error channel instead of throwing.
        return Flowable.error(throwable);
    }
}
/**
 * Reads {@code file} line by line through an RxJava pipeline and prints every line to
 * standard output, blocking the calling thread until the stream terminates.
 *
 * <p>The file is opened on an I/O scheduler thread. Lines are pulled from
 * {@code readLineSupplier(reader)} until the literal string {@code "EOF"} is produced
 * (presumably the supplier's end-of-file sentinel; confirm against {@code readLineSupplier}).
 * The underlying reader is closed when the line stream completes, fails, or is cancelled.
 *
 * @param file the file to read
 */
public static void readFile2(File file) {
    // Hop to the I/O scheduler before opening the file so the open happens off the caller's thread.
    Single<BufferedReader> readerSingle = Single.just(file)
            .observeOn(Schedulers.io())
            .map(FileReader::new)
            .map(BufferedReader::new);

    // Turn the single reader into a stream of its lines.
    Flowable<String> flowable = readerSingle.flatMapPublisher(reader ->
            Flowable.fromIterable(
                    // Endless iterator of lines; bounded below by the "EOF" check.
                    () -> Stream.generate(readLineSupplier(reader)).iterator())
                    .takeWhile(line -> !"EOF".equals(line))
                    // Release the file handle on completion, error or cancellation;
                    // previously the reader was never closed.
                    .doFinally(reader::close));

    flowable
            .doOnNext(it -> System.out.println("thread="
                    + Thread.currentThread().getName()))
            .doOnError(ex -> ex.printStackTrace())
            // Block the caller until the whole file has been printed.
            .blockingSubscribe(System.out::println);
}
/**
 * Flattens a publisher of {@link Flowable}s into a single {@link Flowable} in which every
 * element is wrapped in a {@link Single}.
 *
 * <p>The incoming flowables are folded, starting from an empty flowable, by merging the
 * accumulated result with the next flowable after mapping each of its elements through
 * {@link Single#just(Object)}. The one flowable produced by that fold is then unwrapped
 * into the returned stream.
 *
 * @param fts publisher emitting the flowables to combine
 * @param <T> element type of the incoming flowables
 * @return a flowable of {@link Single}s, one per element of the merged inputs
 */
public static <T> Flowable<Single<T>> sequence(final Publisher<? extends Flowable<T>> fts) {
    // Explicitly typed seed so the fold's accumulator type is fixed.
    final Flowable<Single<T>> seed = Flowable.empty();

    return Flowable.fromPublisher(fts)
            .reduce(seed, (merged, incoming) -> Flowable.merge(merged, incoming.map(Single::just)))
            // Single<Flowable<...>> -> Flowable<Flowable<...>> -> flattened Flowable<...>
            .flatMapPublisher(Flowable::just)
            .flatMap(inner -> inner);
}