下面列出了io.reactivex.Flowable#observeOn ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public Publisher<T> apply(Flowable<T> upstream) {
switch (mSchedulerType) {
case _main:
return upstream.observeOn(AndroidSchedulers.mainThread());
case _io:
return upstream.observeOn(RxSchedulerUtils.io(mIOExecutor));
case _io_main:
return upstream
.subscribeOn(RxSchedulerUtils.io(mIOExecutor))
.unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
.observeOn(AndroidSchedulers.mainThread());
case _io_io:
return upstream
.subscribeOn(RxSchedulerUtils.io(mIOExecutor))
.unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
.observeOn(RxSchedulerUtils.io(mIOExecutor));
default:
break;
}
return upstream;
}
/**
* 用于处理订阅事件在那个线程中执行
*
* @param observable d
* @param subscriberMethod d
* @return Observable
*/
private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
Scheduler scheduler;
switch (subscriberMethod.threadMode) {
case MAIN:
scheduler = AndroidSchedulers.mainThread();
break;
case NEW_THREAD:
scheduler = Schedulers.newThread();
break;
case CURRENT_THREAD:
scheduler = Schedulers.trampoline();
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
}
return observable.observeOn(scheduler);
}
/**
* 用于处理订阅事件在那个线程中执行
*
* @param observable d
* @param subscriberMethod d
* @return Observable
*/
private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
Scheduler scheduler;
switch (subscriberMethod.threadMode) {
case MAIN:
scheduler = AndroidSchedulers.mainThread();
break;
case NEW_THREAD:
scheduler = Schedulers.newThread();
break;
case CURRENT_THREAD:
scheduler = Schedulers.trampoline();
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
}
return observable.observeOn(scheduler);
}
private void subscribeActual(Subscriber<? super T> s) {
Flowable<T> upStream = this.upStream;
if (onMain) {
upStream = upStream.observeOn(AndroidSchedulers.mainThread());
}
upStream.onTerminateDetach().subscribe(new LifeSubscriber<>(s, scope));
}
private <T> void subscribe(final Object subscriber,
final String tag,
final boolean isSticky,
final Scheduler scheduler,
final Callback<T> callback) {
Utils.requireNonNull(subscriber, tag, callback);
final Class<T> typeClass = Utils.getTypeClassFromParadigm(callback);
final Consumer<T> onNext = new Consumer<T>() {
@Override
public void accept(T t) {
callback.onEvent(t);
}
};
if (isSticky) {
final TagMessage stickyEvent = CacheUtils.getInstance().findStickyEvent(typeClass, tag);
if (stickyEvent != null) {
Flowable<T> stickyFlowable = Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) {
emitter.onNext(typeClass.cast(stickyEvent.mEvent));
}
}, BackpressureStrategy.LATEST);
if (scheduler != null) {
stickyFlowable = stickyFlowable.observeOn(scheduler);
}
Disposable stickyDisposable = FlowableUtils.subscribe(stickyFlowable, onNext, mOnError);
CacheUtils.getInstance().addDisposable(subscriber, stickyDisposable);
} else {
Utils.logW("sticky event is empty.");
}
}
Disposable disposable = FlowableUtils.subscribe(
toFlowable(typeClass, tag, scheduler), onNext, mOnError
);
CacheUtils.getInstance().addDisposable(subscriber, disposable);
}
public static void runComputation() {
Flowable<String> source = Flowable.fromCallable(() -> { //1
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
source.doOnComplete(() -> System.out.println("Completed runComputation"));
Flowable<String> background = source.subscribeOn(Schedulers.io()); //2
Flowable<String> foreground = background.observeOn(Schedulers.single());//3
foreground.subscribe(System.out::println, Throwable::printStackTrace);//4
}
@Override
public Publisher<T> apply(Flowable<T> upstream) {
Flowable<T> tObservable = upstream
.observeOn(AndroidSchedulers.mainThread());
if (provider == null) {
return tObservable;
}
return tObservable.compose(RxLifecycleDelegate.<T>bindLifecycle(provider));
}
public <T> FlowableTransformer<T, T> applyFlowableMainThread() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> flowable) {
return flowable.observeOn(AndroidSchedulers.mainThread());
}
};
}
public <T> FlowableTransformer<T, T> applyFlowableAsysnc() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> flowable) {
return flowable.observeOn(AndroidSchedulers.mainThread());
}
};
}
public <T> FlowableTransformer<T, T> applyFlowableCompute() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> flowable) {
return flowable.observeOn(AndroidSchedulers.mainThread());
}
};
}
/**
* 回到主线程
*
* @param flowable 被观察者
*/
public static <T> Flowable<T> toMain(Flowable<T> flowable) {
return flowable.observeOn(AndroidSchedulers.mainThread());
}
/**
* 回到io线程
*
* @param flowable 被观察者
*/
public static <T> Flowable<T> toIo(Flowable<T> flowable) {
return flowable.observeOn(Schedulers.io());
}