下面列出了 io.reactivex.Flowable#subscribeOn() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Runs a simulated expensive computation on a background scheduler and prints
 * its result from a different scheduler.
 *
 * <p>The call returns immediately; the value and the completion message are
 * printed asynchronously roughly one second later. RxJava operators do not
 * modify the {@code Flowable} they are called on but return a new one, so every
 * operator result must be kept in the chain that is finally subscribed to.
 */
public static void runComputation() {
    // 1: the callable is not invoked here, only when a subscriber arrives.
    Flowable<String> source = Flowable.fromCallable(() -> {
        Thread.sleep(1000); // imitate expensive computation
        return "Done";
    });
    // Keep the Flowable returned by doOnComplete; calling it without using the
    // result would leave the completion callback out of the pipeline entirely.
    Flowable<String> announced =
            source.doOnComplete(() -> System.out.println("Completed runComputation"));
    // 2: subscribe (and therefore run the callable) on the IO scheduler.
    Flowable<String> background = announced.subscribeOn(Schedulers.io());
    // 3: deliver downstream signals on the single-threaded scheduler.
    Flowable<String> foreground = background.observeOn(Schedulers.single());
    // 4: print the value, or the stack trace if the computation failed.
    foreground.subscribe(System.out::println, Throwable::printStackTrace);
}
/**
 * Asserts that {@code matchThem} applied to a sequence shifted by one leading
 * element ({@code 0, 1..n}) and the plain range {@code 1..n} emits exactly the
 * sequence {@code 1..n}.
 *
 * @param n     number of elements in the range
 * @param async when {@code true}, the shifted source is subscribed on the
 *              computation scheduler
 */
private void testShifted(int n, boolean async) {
    final Flowable<Integer> shifted = Flowable.just(0).concatWith(Flowable.range(1, n));
    final Flowable<Integer> left = async
            ? shifted.subscribeOn(Schedulers.computation())
            : shifted;
    final Flowable<Integer> right = Flowable.range(1, n);

    final Flowable<Integer> expected = Flowable.range(1, n);
    final boolean sameSequence =
            Flowable.sequenceEqual(matchThem(left, right), expected).blockingGet();
    assertTrue(sameSequence);
}
/**
 * Wraps a supplier call in a {@code Flowable} that emits the supplied value and
 * then completes. Anything thrown while doing so is forwarded through
 * {@code onError}. Every signal is skipped once the downstream has cancelled.
 *
 * <p>When the {@code sc} scheduler (declared outside this method) is
 * {@code null}, the result is subscribed on {@code Schedulers.io()}; otherwise
 * {@code sc} is used both for subscription and for observation.
 *
 * @param supplier produces the single value to emit
 * @param <T>      type of the emitted value
 * @return the scheduled {@code Flowable}
 */
private <T> Flowable<T> create(Supplier<T> supplier) {
    final FlowableOnSubscribe<T> producer = emitter -> {
        try {
            final T value = supplier.get();
            if (!emitter.isCancelled()) {
                emitter.onNext(value);
            }
            // Checked again: the downstream may cancel while handling onNext.
            if (!emitter.isCancelled()) {
                emitter.onComplete();
            }
        } catch (Throwable failure) {
            if (!emitter.isCancelled()) {
                emitter.onError(failure);
            }
        }
    };
    final Flowable<T> upstream = Flowable.create(producer, BackpressureStrategy.DROP);
    return sc == null
            ? upstream.subscribeOn(Schedulers.io())
            : upstream.subscribeOn(sc).observeOn(sc);
}
/**
 * Applies the default Rx scheduler of the given {@code StorIOSQLite} to the
 * flowable, if one is configured.
 *
 * @param storIOSQLite instance whose {@code defaultRxScheduler()} is consulted
 * @param flowable     flowable to schedule
 * @param <T>          element type of the flowable
 * @return the flowable subscribed on the default scheduler, or the flowable
 *         itself when the default scheduler is {@code null}
 */
@CheckResult
@NonNull
public static <T> Flowable<T> subscribeOn(
        @NonNull StorIOSQLite storIOSQLite,
        @NonNull Flowable<T> flowable
) {
    final Scheduler defaultScheduler = storIOSQLite.defaultRxScheduler();
    if (defaultScheduler == null) {
        return flowable;
    }
    return flowable.subscribeOn(defaultScheduler);
}
/**
 * Applies the default Rx scheduler of the given {@code StorIOContentResolver}
 * to the flowable, if one is configured.
 *
 * @param storIOContentResolver instance whose {@code defaultRxScheduler()} is
 *                              consulted
 * @param flowable              flowable to schedule
 * @param <T>                   element type of the flowable
 * @return the flowable subscribed on the default scheduler, or the flowable
 *         itself when the default scheduler is {@code null}
 */
@CheckResult
@NonNull
public static <T> Flowable<T> subscribeOn(
        @NonNull StorIOContentResolver storIOContentResolver,
        @NonNull Flowable<T> flowable
) {
    final Scheduler defaultScheduler = storIOContentResolver.defaultRxScheduler();
    if (defaultScheduler == null) {
        return flowable;
    }
    return flowable.subscribeOn(defaultScheduler);
}
/**
 * Creates an {@code IO} backed by a {@code Flowable} that obtains its value
 * from the supplier upon subscription and is subscribed on the given scheduler.
 *
 * @param s   supplier of the value
 * @param ex  scheduler passed to {@code subscribeOn}
 * @param <T> type of the produced value
 * @return a {@code FlowableIO} wrapping the scheduled flowable
 */
public static <T> IO<T> of(Supplier<? extends T> s, Scheduler ex) {
    // A lambda (not s::get) so that s is only dereferenced when the callable runs.
    final Flowable<T> deferred = Flowable.fromCallable(() -> s.get());
    final Flowable<T> scheduled = deferred.subscribeOn(ex);
    return new FlowableIO<T>(scheduled);
}