io.reactivex.Flowable#subscribeOn ( )源码实例Demo

下面列出了io.reactivex.Flowable#subscribeOn ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: reactive-streams-in-java   文件: RxJavaDemo.java
public static void runComputation() {
    Flowable<String> source = Flowable.fromCallable(() -> { //1
        Thread.sleep(1000); //  imitate expensive computation
        return "Done";
    });
    source.doOnComplete(() -> System.out.println("Completed runComputation"));

    Flowable<String> background = source.subscribeOn(Schedulers.io()); //2

    Flowable<String> foreground = background.observeOn(Schedulers.single());//3

    foreground.subscribe(System.out::println, Throwable::printStackTrace);//4
}
 
源代码2 项目: rxjava2-extras   文件: FlowableMatchTest.java
private void testShifted(int n, boolean async) {
    Flowable<Integer> a = Flowable.just(0).concatWith(Flowable.range(1, n));
    if (async) {
        a = a.subscribeOn(Schedulers.computation());
    }
    Flowable<Integer> b = Flowable.range(1, n);
    assertTrue(Flowable.sequenceEqual(matchThem(a, b), Flowable.range(1, n)).blockingGet());
}
 
源代码3 项目: cxf   文件: FlowableRxInvokerImpl.java
private <T> Flowable<T> create(Supplier<T> supplier) {
    Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> emitter) throws Exception {
            try {
                T response = supplier.get();
                if (!emitter.isCancelled()) {
                    emitter.onNext(response);
                }
                
                if (!emitter.isCancelled()) {
                    emitter.onComplete();
                }
            } catch (Throwable e) {
                if (!emitter.isCancelled()) {
                    emitter.onError(e);
                }
            }
        }
    }, BackpressureStrategy.DROP);
    
    if (sc == null) {
        return flowable.subscribeOn(Schedulers.io());
    }
    
    return flowable.subscribeOn(sc).observeOn(sc);
}
 
源代码4 项目: storio   文件: RxJavaUtils.java
@CheckResult
@NonNull
public static <T> Flowable<T> subscribeOn(
        @NonNull StorIOSQLite storIOSQLite,
        @NonNull Flowable<T> flowable
) {
    final Scheduler scheduler = storIOSQLite.defaultRxScheduler();
    return scheduler != null ? flowable.subscribeOn(scheduler) : flowable;
}
 
源代码5 项目: storio   文件: RxJavaUtils.java
@CheckResult
@NonNull
public static <T> Flowable<T> subscribeOn(
        @NonNull StorIOContentResolver storIOContentResolver,
        @NonNull Flowable<T> flowable
) {
    final Scheduler scheduler = storIOContentResolver.defaultRxScheduler();
    return scheduler != null ? flowable.subscribeOn(scheduler) : flowable;
}
 
源代码6 项目: cyclops   文件: FlowableIO.java
public static <T> IO<T> of(Supplier<? extends T> s, Scheduler ex){
    Flowable<T> x = Flowable.fromCallable(() -> s.get());
    x = x.subscribeOn(ex);
    return new FlowableIO<T>(x);
}