下面列出了 io.reactivex.Observable#compose() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Subscription list.
 *
 * <p>Selects the arguments for the three-argument {@code list} overload from
 * {@code type}: {@code collection}, {@code notebook} and {@code user} pass their
 * own name with {@code false}; {@code push} passes {@code true} with a
 * {@code null} name. A {@code null} or unmatched type (or a {@code null} result
 * from the selected call) falls back to {@code list(false, null, lastUpdatedTime)}.
 * The chosen observable is then composed with {@code process()}.
 *
 * @param type            the subscription type to list; may be {@code null}
 * @param lastUpdatedTime forwarded unchanged to the underlying {@code list} call
 * @return the selected observable composed with {@code process()}
 */
public static Observable<SubscriptionListEntity> list(SubscriptionType type, long lastUpdatedTime) {
    Observable<SubscriptionListEntity> source = null;

    if (type == SubscriptionType.collection) {
        source = list(false, "collection", lastUpdatedTime);
    } else if (type == SubscriptionType.notebook) {
        source = list(false, "notebook", lastUpdatedTime);
    } else if (type == SubscriptionType.user) {
        source = list(false, "user", lastUpdatedTime);
    } else if (type == SubscriptionType.push) {
        source = list(true, null, lastUpdatedTime);
    }

    // Nothing selected above: use the untyped listing.
    if (source == null) {
        source = list(false, null, lastUpdatedTime);
    }
    return source.compose(process());
}
/**
 * Composes {@code observable} with the {@code bindUntilEvent(ActivityEvent.DESTROY)}
 * transformer of {@code context} when the context is an {@code RxActivity},
 * {@code RxFragmentActivity} or {@code RxAppCompatActivity} (checked in that order).
 * Any other context leaves the observable untouched.
 *
 * @param context    the context to test against the supported activity types
 * @param observable the stream to compose
 * @param <T>        the element type of the stream
 * @return the composed stream, or {@code observable} itself for other contexts
 */
private static <T> ObservableSource<T> composeContext(Context context, Observable<T> observable) {
    if (context instanceof RxActivity) {
        RxActivity activity = (RxActivity) context;
        return (ObservableSource<T>) observable.compose(activity.bindUntilEvent(ActivityEvent.DESTROY));
    }
    if (context instanceof RxFragmentActivity) {
        RxFragmentActivity activity = (RxFragmentActivity) context;
        return (ObservableSource<T>) observable.compose(activity.bindUntilEvent(ActivityEvent.DESTROY));
    }
    if (context instanceof RxAppCompatActivity) {
        RxAppCompatActivity activity = (RxAppCompatActivity) context;
        return (ObservableSource<T>) observable.compose(activity.bindUntilEvent(ActivityEvent.DESTROY));
    }
    // Not one of the supported activity types: nothing to bind to.
    return observable;
}
/**
 * Switches {@code upstream} to observe on {@code AndroidSchedulers.mainThread()} and,
 * when {@code provider} is non-null, additionally composes the result with
 * {@code RxLifecycleDelegate.bindLifecycle(provider)}.
 *
 * @param upstream the stream to transform
 * @return the main-thread stream, lifecycle-bound when a provider is set
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    Observable<T> onMainThread = upstream.observeOn(AndroidSchedulers.mainThread());
    if (provider != null) {
        return onMainThread.compose(RxLifecycleDelegate.<T>bindLifecycle(provider));
    }
    return onMainThread;
}
/**
 * Unwraps a stream of {@code Response<T>} into a stream of bodies.
 *
 * <p>Steps, in order: compose with {@code mLifecycleTransformer} when it is non-null;
 * subscribe on {@code Schedulers.io()}; observe on
 * {@code AndroidSchedulers.mainThread()}; map each response to its body, or to an
 * error when the response is unsuccessful or the body is {@code null}; on any error
 * call {@code onRequestFailure} and complete with an empty stream; pass each body to
 * {@code onRequestSuccess}; finally print {@code "all done"} to standard output.
 *
 * @param upstream the stream of responses
 * @return the stream of response bodies; errors are routed to {@code onRequestFailure}
 *         and replaced by an empty stream
 */
@Override
public ObservableSource<T> apply(Observable<Response<T>> upstream) {
    Observable<Response<T>> source = upstream;
    if (mLifecycleTransformer != null) {
        source = source.compose(mLifecycleTransformer);
    }
    return source
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(response -> {
                if (!response.isSuccessful()) {
                    return Observable.error(new Throwable("network failed"));
                }
                T body = response.body();
                if (body == null) {
                    return Observable.error(new Throwable("response body = null"));
                }
                return Observable.just(body);
            })
            .onErrorResumeNext(throwable -> {
                // Report the failure, then end the stream without an error.
                onRequestFailure(throwable);
                return Observable.empty();
            })
            .doOnNext(this::onRequestSuccess)
            .doFinally(() -> System.out.print("all done"));
}
/**
 * Composes {@code upstream} with {@code mLifecycleTransformer} when it is non-null,
 * then subscribes on {@code Schedulers.io()} and observes on
 * {@code AndroidSchedulers.mainThread()}.
 *
 * @param upstream the stream to transform
 * @return the stream with the scheduling (and optional lifecycle transformer) applied
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    Observable<T> source = upstream;
    if (mLifecycleTransformer != null) {
        source = source.compose(mLifecycleTransformer);
    }
    Observable<T> subscribedOnIo = source.subscribeOn(Schedulers.io());
    return subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
}
/**
 * Requests {@code url} through {@code mService.post} and reports the outcome to
 * {@code callBack}.
 *
 * @param url      the URL handed to {@code mService.post}
 * @param callBack receives each emitted string via {@code onSuccess}, or the
 *                 throwable's message via {@code onError}
 */
public void post(String url, OnHttpCallBack<String> callBack) {
    subscribeForString(mService.post(url), callBack);
}

/**
 * Requests {@code url} through {@code mService.get} and reports the outcome to
 * {@code callBack}.
 *
 * @param url      the URL handed to {@code mService.get}
 * @param callBack receives each emitted string via {@code onSuccess}, or the
 *                 throwable's message via {@code onError}
 */
public void get(String url, OnHttpCallBack<String> callBack) {
    subscribeForString(mService.get(url), callBack);
}

/**
 * Shared subscription pipeline for {@link #post} and {@link #get}: subscribes on
 * {@code Schedulers.io()}, observes on {@code AndroidSchedulers.mainThread()},
 * composes with {@code mLifecycleProvider.bindUntilEvent(FragmentEvent.DETACH)} when
 * a provider is set, and forwards results to {@code callBack}.
 *
 * <p>{@code mSubscription} holds the subscription from {@code onSubscribe} and is
 * reset to {@code null} on every {@code onNext}, {@code onError} and
 * {@code onComplete}.
 *
 * @param request  the string stream returned by the service call
 * @param callBack the listener to notify
 */
private void subscribeForString(Observable<String> request, final OnHttpCallBack<String> callBack) {
    Observable<String> observable = request
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
    if (mLifecycleProvider != null) {
        observable = observable.compose(mLifecycleProvider.bindUntilEvent(FragmentEvent.DETACH));
    }
    observable.subscribe(new BaseSubscriber<String>() {
        @Override
        public void onError(@NonNull Throwable throwable) {
            mSubscription = null;
            callBack.onError(throwable.getMessage());
        }

        @Override
        public void onComplete() {
            mSubscription = null;
        }

        @Override
        public void onNext(@NonNull String s) {
            mSubscription = null;
            callBack.onSuccess(s);
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            super.onSubscribe(subscription);
            mSubscription = subscription;
        }
    });
}
/**
 * Applies {@code mergedTransformer} to the incoming stream.
 *
 * @param effects the stream of {@code F} values to transform
 * @return the stream of {@code E} values produced by {@code mergedTransformer}
 */
@Override
public Observable<E> apply(Observable<F> effects) {
    final Observable<E> composed = effects.compose(mergedTransformer);
    return composed;
}