下面列出了io.reactivex.Observable#toFlowable ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = mAsync
? new CallEnqueueObservable<>(mCachingSystem, call, mResponseType, mAnnotations, mRetrofit)
: new CallExecuteObservable<>(mCachingSystem, call, mResponseType, mAnnotations, mRetrofit);
Observable<?> observable;
if (mResult) {
observable = new ResultObservable<>(responseObservable);
} else if (mBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (mScheduler != null) {
observable = observable.subscribeOn(mScheduler);
}
if (mFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (mSingle) {
return observable.singleOrError();
}
if (mMaybe) {
return observable.singleElement();
}
if (mCompletable) {
return observable.ignoreElements();
}
return observable;
}
@GraphQLSubscription
public Publisher<Integer> tick() {
Observable<Integer> observable = Observable.create(emitter -> {
emitter.onNext(1);
Thread.sleep(1000);
emitter.onNext(2);
Thread.sleep(1000);
emitter.onComplete();
});
return observable.toFlowable(BackpressureStrategy.BUFFER);
}
@SuppressWarnings("unchecked")
@Override
public <T> Publisher<T> toRSPublisher(Observable instance) {
return instance.toFlowable(BackpressureStrategy.MISSING);
}
@Test public void whenFlowableIsCreatedFromObservable_thenItIsProperlyInitialized() throws InterruptedException {
Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable.toFlowable(BackpressureStrategy.BUFFER);
assertNotNull(integerFlowable);
}