io.reactivex.Observable#compose() 源码实例 Demo

下面列出了 io.reactivex.Observable#compose() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: JianshuApp   文件: SubscriptionRepository.java
/**
 * Subscription list.
 *
 * <p>Chooses the arguments for the three-argument {@code list} overload based on
 * {@code type}, then runs the resulting observable through {@code process()} via
 * {@code compose}.
 *
 * @param type            subscription category; {@code null}, or any value not handled
 *                        below, falls back to {@code list(false, null, lastUpdatedTime)}
 * @param lastUpdatedTime forwarded unchanged to the three-argument overload
 * @return the selected observable with {@code process()} applied
 */
public static Observable<SubscriptionListEntity> list(SubscriptionType type, long lastUpdatedTime) {
    Observable<SubscriptionListEntity> selected;
    if (type == SubscriptionType.collection) {
        selected = list(false, "collection", lastUpdatedTime);
    } else if (type == SubscriptionType.notebook) {
        selected = list(false, "notebook", lastUpdatedTime);
    } else if (type == SubscriptionType.user) {
        selected = list(false, "user", lastUpdatedTime);
    } else if (type == SubscriptionType.push) {
        // The only case that passes true as the first argument and no category name.
        selected = list(true, null, lastUpdatedTime);
    } else {
        selected = null;
    }

    // Default request: used for a null/unhandled type, and also when the overload
    // above handed back null.
    if (selected == null) {
        selected = list(false, null, lastUpdatedTime);
    }
    return selected.compose(process());
}
 
源代码2 项目: MaoWanAndoidClient   文件: RxSchedulers.java
/**
 * Ties {@code observable} to the {@code DESTROY} activity event of {@code context}
 * when the context is one of the three Rx* activity types checked below; any other
 * context leaves the observable untouched.
 *
 * @param context    the context to inspect
 * @param observable the stream to bind
 * @param <T>        element type of the stream
 * @return the bound stream, or {@code observable} itself when no check matches
 */
private static <T> ObservableSource<T> composeContext(Context context, Observable<T> observable) {
    if (context instanceof RxActivity) {
        RxActivity activity = (RxActivity) context;
        return (ObservableSource<T>) observable.compose(activity.bindUntilEvent(ActivityEvent.DESTROY));
    }
    if (context instanceof RxFragmentActivity) {
        RxFragmentActivity activity = (RxFragmentActivity) context;
        return (ObservableSource<T>) observable.compose(activity.bindUntilEvent(ActivityEvent.DESTROY));
    }
    if (context instanceof RxAppCompatActivity) {
        RxAppCompatActivity activity = (RxAppCompatActivity) context;
        return (ObservableSource<T>) observable.compose(activity.bindUntilEvent(ActivityEvent.DESTROY));
    }
    // Not a recognised activity type: nothing to bind to.
    return observable;
}
 
源代码3 项目: pandroid   文件: MainObserverTransformer.java
/**
 * Moves downstream delivery onto the Android main thread and, when a
 * {@code provider} is set, additionally composes the stream with
 * {@code RxLifecycleDelegate.bindLifecycle(provider)}.
 *
 * @param upstream the stream being transformed
 * @return the main-thread stream, lifecycle-bound only if {@code provider} is non-null
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    Observable<T> onMainThread = upstream.observeOn(AndroidSchedulers.mainThread());
    if (provider != null) {
        return onMainThread.compose(RxLifecycleDelegate.<T>bindLifecycle(provider));
    }
    return onMainThread;
}
 
源代码4 项目: My-MVP   文件: SimpleRequestResponseTransformer.java
/**
 * Unwraps a stream of {@code Response<T>} into a stream of bodies.
 *
 * <p>Steps, in order: optional composition with {@code mLifecycleTransformer};
 * subscription on the IO scheduler; observation on the Android main thread;
 * mapping each response to its body or to an error; routing any error to
 * {@code onRequestFailure} and completing without emitting it downstream;
 * forwarding each body to {@code onRequestSuccess}; printing "all done" when the
 * stream finishes.
 *
 * @param upstream the stream of raw responses
 * @return a stream of non-null response bodies
 */
@Override
public ObservableSource<T> apply(Observable<Response<T>> upstream) {
    Observable<Response<T>> source = upstream;
    if (mLifecycleTransformer != null) {
        source = source.compose(mLifecycleTransformer);
    }

    return source
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(response -> {
                // Unsuccessful responses and empty bodies both become errors.
                if (!response.isSuccessful()) {
                    return Observable.error(new Throwable("network failed"));
                }
                T body = response.body();
                if (body == null) {
                    return Observable.error(new Throwable("response body = null"));
                }
                return Observable.just(body);
            })
            .onErrorResumeNext(error -> {
                // Report the failure, then end the stream quietly instead of
                // propagating the error to the subscriber.
                onRequestFailure(error);
                return Observable.empty();
            })
            .doOnNext(this::onRequestSuccess)
            .doFinally(() -> System.out.print("all done"));
}
 
源代码5 项目: My-MVP   文件: SimpleRequestTransformer.java
/**
 * Applies the optional {@code mLifecycleTransformer}, then subscribes on the IO
 * scheduler and observes on the Android main thread.
 *
 * @param upstream the stream being transformed
 * @return the stream with the scheduling (and, if configured, the lifecycle
 *         transformer) applied
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    Observable<T> source = upstream;
    if (mLifecycleTransformer != null) {
        source = source.compose(mLifecycleTransformer);
    }

    Observable<T> onIo = source.subscribeOn(Schedulers.io());
    return onIo.observeOn(AndroidSchedulers.mainThread());
}
 
源代码6 项目: NGA-CLIENT-VER-OPEN-SOURCE   文件: BaseRxTask.java
/**
 * Issues {@code mService.post(url)} on the IO scheduler, delivers the result on
 * the Android main thread, and reports it through {@code callBack}.
 *
 * <p>When {@code mLifecycleProvider} is set, the request is bound until
 * {@code FragmentEvent.DETACH}. The active subscription is kept in
 * {@code mSubscription} and cleared on every callback.
 *
 * @param url      the address passed to the service
 * @param callBack receives the response string, or the error's message on failure
 */
public void post(String url, OnHttpCallBack<String> callBack) {
    Observable<String> request = mService.post(url)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    if (mLifecycleProvider != null) {
        request = request.compose(mLifecycleProvider.bindUntilEvent(FragmentEvent.DETACH));
    }

    request.subscribe(new BaseSubscriber<String>() {

        @Override
        public void onSubscribe(Subscription subscription) {
            super.onSubscribe(subscription);
            // Remember the in-flight subscription.
            mSubscription = subscription;
        }

        @Override
        public void onNext(@NonNull String response) {
            mSubscription = null;
            callBack.onSuccess(response);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            mSubscription = null;
            callBack.onError(error.getMessage());
        }

        @Override
        public void onComplete() {
            mSubscription = null;
        }
    });
}
 
源代码7 项目: NGA-CLIENT-VER-OPEN-SOURCE   文件: BaseRxTask.java
/**
 * Issues {@code mService.get(url)} on the IO scheduler, delivers the result on
 * the Android main thread, and reports it through {@code callBack}.
 *
 * <p>When {@code mLifecycleProvider} is set, the request is bound until
 * {@code FragmentEvent.DETACH}. The active subscription is kept in
 * {@code mSubscription} and cleared on every callback.
 *
 * @param url      the address passed to the service
 * @param callBack receives the response string, or the error's message on failure
 */
public void get(String url, OnHttpCallBack<String> callBack) {
    Observable<String> request = mService.get(url)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    if (mLifecycleProvider != null) {
        request = request.compose(mLifecycleProvider.bindUntilEvent(FragmentEvent.DETACH));
    }

    request.subscribe(new BaseSubscriber<String>() {

        @Override
        public void onSubscribe(Subscription subscription) {
            super.onSubscribe(subscription);
            // Remember the in-flight subscription.
            mSubscription = subscription;
        }

        @Override
        public void onNext(@NonNull String response) {
            mSubscription = null;
            callBack.onSuccess(response);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            mSubscription = null;
            callBack.onError(error.getMessage());
        }

        @Override
        public void onComplete() {
            mSubscription = null;
        }
    });
}
 
源代码8 项目: mobius   文件: MobiusEffectRouter.java
/**
 * Runs the incoming stream through {@code mergedTransformer}.
 *
 * @param effects the stream to transform
 * @return the stream produced by composing {@code effects} with {@code mergedTransformer}
 */
@Override
public Observable<E> apply(Observable<F> effects) {
  final Observable<E> composed = effects.compose(mergedTransformer);
  return composed;
}