io.reactivex.Flowable#observeOn ( )源码实例Demo

下面列出了io.reactivex.Flowable#observeOn ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: Collection-Android   文件: SchedulerTransformer.java
@Override
public Publisher<T> apply(Flowable<T> upstream) {
    switch (mSchedulerType) {
        case _main:
            return upstream.observeOn(AndroidSchedulers.mainThread());
        case _io:
            return upstream.observeOn(RxSchedulerUtils.io(mIOExecutor));
        case _io_main:
            return upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(AndroidSchedulers.mainThread());
        case _io_io:
            return upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(RxSchedulerUtils.io(mIOExecutor));
        default:
            break;
    }
    return upstream;
}
 
源代码2 项目: RxBus2   文件: RxBus.java
/**
 * 用于处理订阅事件在那个线程中执行
 *
 * @param observable       d
 * @param subscriberMethod d
 * @return Observable
 */
private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
    Scheduler scheduler;
    switch (subscriberMethod.threadMode) {
        case MAIN:
            scheduler = AndroidSchedulers.mainThread();
            break;

        case NEW_THREAD:
            scheduler = Schedulers.newThread();
            break;

        case CURRENT_THREAD:
            scheduler = Schedulers.trampoline();
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
    }
    return observable.observeOn(scheduler);
}
 
源代码3 项目: YiZhi   文件: RxBus.java
/**
 * 用于处理订阅事件在那个线程中执行
 *
 * @param observable       d
 * @param subscriberMethod d
 * @return Observable
 */
private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
    Scheduler scheduler;
    switch (subscriberMethod.threadMode) {
        case MAIN:
            scheduler = AndroidSchedulers.mainThread();
            break;

        case NEW_THREAD:
            scheduler = Schedulers.newThread();
            break;

        case CURRENT_THREAD:
            scheduler = Schedulers.trampoline();
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
    }
    return observable.observeOn(scheduler);
}
 
源代码4 项目: rxjava-RxLife   文件: FlowableLife.java
private void subscribeActual(Subscriber<? super T> s) {
    Flowable<T> upStream = this.upStream;
    if (onMain) {
        upStream = upStream.observeOn(AndroidSchedulers.mainThread());
    }
    upStream.onTerminateDetach().subscribe(new LifeSubscriber<>(s, scope));
}
 
源代码5 项目: RxBus   文件: RxBus.java
private <T> void subscribe(final Object subscriber,
                           final String tag,
                           final boolean isSticky,
                           final Scheduler scheduler,
                           final Callback<T> callback) {
    Utils.requireNonNull(subscriber, tag, callback);

    final Class<T> typeClass = Utils.getTypeClassFromParadigm(callback);

    final Consumer<T> onNext = new Consumer<T>() {
        @Override
        public void accept(T t) {
            callback.onEvent(t);
        }
    };

    if (isSticky) {
        final TagMessage stickyEvent = CacheUtils.getInstance().findStickyEvent(typeClass, tag);
        if (stickyEvent != null) {
            Flowable<T> stickyFlowable = Flowable.create(new FlowableOnSubscribe<T>() {
                @Override
                public void subscribe(FlowableEmitter<T> emitter) {
                    emitter.onNext(typeClass.cast(stickyEvent.mEvent));
                }
            }, BackpressureStrategy.LATEST);
            if (scheduler != null) {
                stickyFlowable = stickyFlowable.observeOn(scheduler);
            }
            Disposable stickyDisposable = FlowableUtils.subscribe(stickyFlowable, onNext, mOnError);
            CacheUtils.getInstance().addDisposable(subscriber, stickyDisposable);
        } else {
            Utils.logW("sticky event is empty.");
        }
    }
    Disposable disposable = FlowableUtils.subscribe(
            toFlowable(typeClass, tag, scheduler), onNext, mOnError
    );
    CacheUtils.getInstance().addDisposable(subscriber, disposable);
}
 
源代码6 项目: reactive-streams-in-java   文件: RxJavaDemo.java
public static void runComputation() {
    Flowable<String> source = Flowable.fromCallable(() -> { //1
        Thread.sleep(1000); //  imitate expensive computation
        return "Done";
    });
    source.doOnComplete(() -> System.out.println("Completed runComputation"));

    Flowable<String> background = source.subscribeOn(Schedulers.io()); //2

    Flowable<String> foreground = background.observeOn(Schedulers.single());//3

    foreground.subscribe(System.out::println, Throwable::printStackTrace);//4
}
 
源代码7 项目: pandroid   文件: MainObserverTransformer.java
@Override
public Publisher<T> apply(Flowable<T> upstream) {
    Flowable<T> tObservable = upstream
            .observeOn(AndroidSchedulers.mainThread());
    if (provider == null) {
        return tObservable;
    }
    return tObservable.compose(RxLifecycleDelegate.<T>bindLifecycle(provider));
}
 
源代码8 项目: RxJava2-Android-Samples   文件: RxSchedulers.java
public <T> FlowableTransformer<T, T> applyFlowableMainThread() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> flowable) {
            return flowable.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码9 项目: RxJava2-Android-Samples   文件: RxSchedulers.java
public <T> FlowableTransformer<T, T> applyFlowableAsysnc() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> flowable) {
            return flowable.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码10 项目: RxJava2-Android-Samples   文件: RxSchedulers.java
public <T> FlowableTransformer<T, T> applyFlowableCompute() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> flowable) {
            return flowable.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码11 项目: Collection-Android   文件: RxSchedulerUtils.java
/**
 * 回到主线程
 *
 * @param flowable 被观察者
 */
public static <T> Flowable<T> toMain(Flowable<T> flowable) {
    return flowable.observeOn(AndroidSchedulers.mainThread());
}
 
源代码12 项目: Collection-Android   文件: RxSchedulerUtils.java
/**
 * 回到io线程
 *
 * @param flowable 被观察者
 */
public static <T> Flowable<T> toIo(Flowable<T> flowable) {
    return flowable.observeOn(Schedulers.io());
}