类io.reactivex.ObservableSource源码实例Demo

下面列出了怎么用 io.reactivex.ObservableSource 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。

源代码1 项目: festival   文件: SecurityUtils.java
/**
 * Checks a user against several permissions/roles and folds the individual answers into one
 * verdict.
 *
 * <p>Every entry of {@code permitsOrRoles} is checked through {@code user.rxIsAuthorized}. Once
 * all answers have been collected, they are combined in collection order with {@code &&} when
 * {@code logicType} is {@code LogicType.AND} and with {@code ||} when it is {@code LogicType.OR}.
 * For any other logic type the first collected answer is returned unchanged.
 *
 * <p>NOTE(review): with an empty {@code permitsOrRoles} array {@code answers.get(0)} throws —
 * confirm callers never pass an empty array.
 *
 * @param user the user whose authorizations are checked
 * @param permitsOrRoles the permissions or roles to check
 * @param logicType how the individual answers are combined
 * @return a stream emitting the combined verdict
 */
public static Observable<Boolean> isPermited(User user, String[] permitsOrRoles, LogicType logicType) {
    return Observable.fromArray(permitsOrRoles)
            .flatMap((Function<String, ObservableSource<Boolean>>) permitOrRole ->
                    user.rxIsAuthorized(permitOrRole).toObservable())
            .toList()
            .map(answers -> {
                // Seed with the first answer, then fold the remaining ones into it.
                boolean verdict = answers.get(0);
                for (Boolean answer : answers.subList(1, answers.size())) {
                    if (logicType == LogicType.AND) {
                        verdict = verdict && answer;
                    }
                    if (logicType == LogicType.OR) {
                        verdict = verdict || answer;
                    }
                }
                return verdict;
            })
            .toObservable();
}
 
源代码2 项目: Aurora   文件: UserModel.java
/**
 * Loads one page of users, routing the network request through the cache service.
 *
 * @param lastIdQueried id passed to the remote query and used as the dynamic cache key
 * @param update passed to {@code EvictDynamicKey}; per the original note, a refresh skips the
 *     cache while "load more" reads from it
 * @return a stream emitting the list of users carried by the cache reply
 */
@Override
public Observable<List<User>> getUsers(int lastIdQueried, boolean update) {
    Observable<List<User>> remoteUsers = mRepositoryManager
            .obtainRetrofitService(UserService.class)
            .getUsers(lastIdQueried, USERS_PER_PAGE);

    // Strips the cache envelope so callers only see the payload.
    Function<Reply<List<User>>, ObservableSource<List<User>>> unwrapReply =
            new Function<Reply<List<User>>, ObservableSource<List<User>>>() {
                @Override
                public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> reply) throws Exception {
                    return Observable.just(reply.getData());
                }
            };

    // Go through RxCache: a refresh does not read the cache, loading more does.
    return mRepositoryManager.obtainCacheService(CommonCache.class)
            .getUsers(remoteUsers, new DynamicKey(lastIdQueried), new EvictDynamicKey(update))
            .flatMap(unwrapReply);
}
 
源代码3 项目: AndroidWallet   文件: HttpMethods.java
/**
 * Subscribes {@code observer} to {@code observable} with IO/main-thread scheduling and a bounded
 * retry policy.
 *
 * <p>Errors of type {@code NetworkErrorException}, {@code ConnectException},
 * {@code SocketTimeoutException} or {@code TimeoutException} trigger a resubscription after a
 * 4000 ms timer, at most three times per call. Every other error, and any error once the three
 * retries are used up, is passed on.
 *
 * @param observable the request to run
 * @param observer the receiver of the results
 * @param <T> the element type of the request
 */
public static <T> void toSubscribe(Observable<T> observable, BaseObserver<T> observer) {
    // One-element array so the retry lambda can mutate the counter it captures.
    final int[] retriesUsed = {0};

    // Make subscribe() happen on the IO thread.
    observable.subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retryWhen(errors -> errors.flatMap((Function<Throwable, ObservableSource<?>>) error -> {
                boolean retryableType = error instanceof NetworkErrorException
                        || error instanceof ConnectException
                        || error instanceof SocketTimeoutException
                        || error instanceof TimeoutException;
                if (!retryableType || retriesUsed[0] >= 3) {
                    return Observable.error(error);
                }
                retriesUsed[0]++;
                return Observable.timer(4000, TimeUnit.MILLISECONDS);
            }))
            .subscribe(observer);
}
 
源代码4 项目: v9porn   文件: RetryWhenProcess.java
/**
 * Retry policy for {@code retryWhen}: socket timeouts are retried after {@code mInterval} seconds
 * while the attempt counter stays within {@code maxTryTime}; everything else is passed on as an
 * error.
 *
 * <p>A {@code CompositeException} is searched for nested {@code SocketTimeoutException}s. Every
 * timeout that is examined increments {@code tryTimes}, whether or not it leads to a retry.
 *
 * @param throwableObservable the stream of upstream errors
 * @return a stream that emits to request a retry, or fails to give up
 */
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Throwable error) throws Exception {
            Logger.t(TAG).d("Error:::" + error);

            if (error instanceof SocketTimeoutException) {
                if (++tryTimes <= maxTryTime) {
                    Logger.t(TAG).d("超时重试第【" + (tryTimes - 1) + "】次");
                    return Observable.timer(mInterval, TimeUnit.SECONDS);
                }
                return Observable.error(error);
            }

            if (error instanceof CompositeException) {
                // Per the original note: with RxCache in the chain the exception comes back
                // wrapped, so the nested causes have to be inspected.
                for (Throwable nested : ((CompositeException) error).getExceptions()) {
                    if (!(nested instanceof SocketTimeoutException)) {
                        continue;
                    }
                    if (++tryTimes > maxTryTime) {
                        continue;
                    }
                    Logger.t(TAG).d("带Rxcache超时重试第【" + (tryTimes - 1) + "】次");
                    return Observable.timer(mInterval, TimeUnit.SECONDS);
                }
            }

            return Observable.error(error);
        }
    });
}
 
源代码5 项目: mobius   文件: Transformers.java
/**
 * Builds an {@link ObservableTransformer} that runs {@code doEffect} once for every effect it
 * receives from upstream.
 *
 * <p>Each incoming effect is mapped to a {@link Completable} created from the action. When a
 * {@link Scheduler} is supplied the Completable is subscribed on it; with {@code null} no
 * scheduler is applied. The resulting observable never emits events.
 *
 * @param doEffect the {@link Action} to run for every effect
 * @param scheduler the {@link Scheduler} to subscribe the action on, or {@code null} for none
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type; no events are produced, but the type must match the other
 *     transformers
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      Function<F, CompletableSource> runAction =
          new Function<F, CompletableSource>() {
            @Override
            public CompletableSource apply(F effect) throws Exception {
              Completable action = Completable.fromAction(doEffect);
              if (scheduler != null) {
                return action.subscribeOn(scheduler);
              }
              return action;
            }
          };
      return effectStream.flatMapCompletable(runAction).toObservable();
    }
  };
}
 
源代码6 项目: Aurora   文件: VideoModel.java
/**
 * Loads a page of the video list, routing the network request through the cache service.
 *
 * @param type passed through to the remote video list query
 * @param lastIdQueried used as the dynamic cache key
 * @param startCount passed through to the remote query as its first argument
 * @param update passed to {@code EvictDynamicKey}; per the original note, a refresh skips the
 *     cache while "load more" reads from it
 * @return a stream emitting the video list carried by the cache reply
 */
@Override
public Observable<VideoListInfo> getVideoList(String type, String lastIdQueried,int startCount,boolean update) {
    Observable<VideoListInfo> remoteVideos = mRepositoryManager
            .obtainRetrofitService(VideoService.class)
            .getVideoList(startCount, Constants.HOME_VIDEO_LIST_PAGE_SIZE, type, Constants.UDID);

    // Strips the cache envelope so callers only see the payload.
    Function<Reply<VideoListInfo>, ObservableSource<VideoListInfo>> unwrapReply =
            new Function<Reply<VideoListInfo>, ObservableSource<VideoListInfo>>() {
                @Override
                public ObservableSource<VideoListInfo> apply(@NonNull Reply<VideoListInfo> reply) throws Exception {
                    return Observable.just(reply.getData());
                }
            };

    // Go through RxCache: a refresh does not read the cache, loading more does.
    return mRepositoryManager.obtainCacheService(CommonCache.class)
            .getVideoList(remoteVideos, new DynamicKey(lastIdQueried), new EvictDynamicKey(update))
            .flatMap(unwrapReply);
}
 
源代码7 项目: AndroidGodEye   文件: TrafficEngine.java
/**
 * Starts periodic traffic sampling: every {@code mIntervalMillis} milliseconds a sample is
 * obtained from {@code create()} and handed to {@code mProducer}. The subscription is registered
 * in {@code mCompositeDisposable}.
 */
public void work() {
    // Obtains one sample per tick; concatMap keeps the samples in tick order.
    Function<Long, ObservableSource<TrafficInfo>> sampleOnTick =
            new Function<Long, ObservableSource<TrafficInfo>>() {
                @Override
                public ObservableSource<TrafficInfo> apply(Long tick) throws Exception {
                    ThreadUtil.ensureWorkThread("TrafficEngine apply");
                    return create();
                }
            };

    // Forwards each sample to the producer.
    Consumer<TrafficInfo> publishSample = new Consumer<TrafficInfo>() {
        @Override
        public void accept(TrafficInfo sample) throws Exception {
            ThreadUtil.ensureWorkThread("TrafficEngine accept");
            mProducer.produce(sample);
        }
    };

    mCompositeDisposable.add(
            Observable.interval(mIntervalMillis, TimeUnit.MILLISECONDS)
                    .subscribeOn(ThreadUtil.computationScheduler())
                    .observeOn(ThreadUtil.computationScheduler())
                    .concatMap(sampleOnTick)
                    .subscribe(publishSample));
}
 
源代码8 项目: Collection-Android   文件: SchedulerTransformer.java
/**
 * Applies the thread scheduling selected by {@code mSchedulerType} to {@code upstream}.
 *
 * <ul>
 *   <li>{@code _io_main}: subscribe and unsubscribe on IO, observe on the main thread</li>
 *   <li>{@code _io_io}: subscribe, unsubscribe and observe on IO</li>
 *   <li>{@code _io}: observe on IO</li>
 *   <li>{@code _main}: observe on the main thread</li>
 *   <li>anything else: {@code upstream} is returned unchanged</li>
 * </ul>
 *
 * @param upstream the source to schedule
 * @return the scheduled source
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    switch (mSchedulerType) {
        case _io_main:
            return upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(AndroidSchedulers.mainThread());
        case _io_io:
            return upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(RxSchedulerUtils.io(mIOExecutor));
        case _io:
            return upstream.observeOn(RxSchedulerUtils.io(mIOExecutor));
        case _main:
            return upstream.observeOn(AndroidSchedulers.mainThread());
        default:
            // Unknown scheduler type: leave the stream untouched.
            return upstream;
    }
}
 
源代码9 项目: TikTok   文件: RxUtils.java
/**
 * Thread-scheduling transformer: subscribes and unsubscribes the upstream on the IO scheduler
 * and delivers results on the Android main thread.
 *
 * <p>NOTE(review): the empty {@code doOnNext} and the second {@code subscribeOn} look redundant;
 * they are kept so the operator chain is unchanged — confirm before removing.
 *
 * @param <T> the element type, passed through unchanged
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> schedulersTransformer() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            Observable<T> onIo = upstream
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io());

            Observable<T> tapped = onIo.doOnNext(new Consumer<T>() {
                @Override
                public void accept(T item) throws Exception {
                    // Intentionally empty.
                }
            });

            return tapped
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码10 项目: Tangram-Android   文件: PojoDataParser.java
/**
 * {@inheritDoc}
 *
 * <p>This implementation maps every {@code ParseGroupsOp} to the result of {@code parseGroup}
 * called with the operation's two arguments.
 */
@NonNull
@Override
public ObservableTransformer<ParseGroupsOp, List<Card>> getGroupTransformer() {
    return new ObservableTransformer<ParseGroupsOp, List<Card>>() {
        @Override
        public ObservableSource<List<Card>> apply(Observable<ParseGroupsOp> upstream) {
            Function<ParseGroupsOp, List<Card>> parseOp = new Function<ParseGroupsOp, List<Card>>() {
                @Override
                public List<Card> apply(ParseGroupsOp op) throws Exception {
                    return parseGroup(op.getArg1(), op.getArg2());
                }
            };
            return upstream.map(parseOp);
        }
    };
}
 
源代码11 项目: java-unified-sdk   文件: PushClient.java
/**
 * Decorates a request observable with this client's threading and error conventions.
 *
 * <p>When {@code asynchronized} is set the observable is subscribed on the IO scheduler; when a
 * {@code defaultCreator} is present its scheduler is used for observation. Any error is replaced
 * by the result of {@code ErrorUtils.propagateException}.
 *
 * @param observable the observable to decorate; may be {@code null}
 * @return the decorated observable, or {@code null} when {@code observable} is {@code null}
 */
private Observable wrapObservable(Observable observable) {
  if (observable == null) {
    return null;
  }

  Observable wrapped = observable;
  if (asynchronized) {
    wrapped = wrapped.subscribeOn(Schedulers.io());
  }
  if (defaultCreator != null) {
    wrapped = wrapped.observeOn(defaultCreator.create());
  }

  return wrapped.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
    @Override
    public ObservableSource apply(Throwable cause) throws Exception {
      return Observable.error(ErrorUtils.propagateException(cause));
    }
  });
}
 
源代码12 项目: java-unified-sdk   文件: StorageClient.java
/**
 * Decorates a request observable with this client's threading and error conventions.
 *
 * <p>When {@code asynchronized} is set the observable is subscribed on the IO scheduler; when a
 * {@code defaultCreator} is present its scheduler is used for observation. Any error is replaced
 * by the result of {@code ErrorUtils.propagateException}.
 *
 * @param observable the observable to decorate; may be {@code null}
 * @return the decorated observable, or {@code null} when {@code observable} is {@code null}
 */
public Observable wrapObservable(Observable observable) {
  if (observable == null) {
    return null;
  }

  Observable wrapped = observable;
  if (asynchronized) {
    wrapped = wrapped.subscribeOn(Schedulers.io());
  }
  if (defaultCreator != null) {
    wrapped = wrapped.observeOn(defaultCreator.create());
  }

  return wrapped.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
    @Override
    public ObservableSource apply(Throwable cause) throws Exception {
      return Observable.error(ErrorUtils.propagateException(cause));
    }
  });
}
 
源代码13 项目: InstantAppSample   文件: TripRepository.java
/**
 * Looks up a single trip by id among the trips returned by {@code getTrips()}.
 *
 * <p>The trip lists are flattened into individual trips, filtered by id, and reduced with
 * {@code singleElement()}.
 *
 * @param tripId the id to look for
 * @return the matching trip, if any
 */
public Maybe<Trip> getTrip(final String tripId) {
    // Turns each emitted list into a stream of its elements.
    final Function<List<Trip>, ObservableSource<? extends Trip>> flattenTrips =
            new Function<List<Trip>, ObservableSource<? extends Trip>>() {
                @Override
                public ObservableSource<? extends Trip> apply(List<Trip> trips) throws Exception {
                    return Observable.fromIterable(trips);
                }
            };

    final Predicate<Trip> hasRequestedId = new Predicate<Trip>() {
        @Override
        public boolean test(Trip candidate) throws Exception {
            return candidate.getId().equals(tripId);
        }
    };

    return getTrips()
            .toObservable()
            .flatMap(flattenTrips)
            .filter(hasRequestedId)
            .singleElement();
}
 
/**
 * Seeds the option table from the bundled JSON asset when the table is empty.
 *
 * <p>If {@code mOptionRepository.isOptionEmpty()} reports {@code true}, the asset named by
 * {@code AppConstants.SEED_DATABASE_OPTIONS} is parsed into a list of options (only fields
 * annotated for exposure are read) and saved. Otherwise {@code false} is emitted.
 *
 * @return the result of saving the seed options, or {@code false} when nothing was seeded
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    return mOptionRepository.isOptionEmpty()
            .concatMap(new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    if (!isEmpty) {
                        // Options already present: nothing to seed.
                        return Observable.just(false);
                    }
                    Type optionListType = new TypeToken<List<Option>>() {
                    }.getType();
                    List<Option> seedOptions = gson.fromJson(
                            FileUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            optionListType);
                    return mOptionRepository.saveOptionList(seedOptions);
                }
            });
}
 
/**
 * Reports, for every upstream item, how many whole seconds have passed since this transformer
 * was applied, then forwards the item unchanged.
 *
 * <p>The start timestamp is taken when this method is called (the {@code Date} is created
 * eagerly), not when the resulting stream is subscribed.
 *
 * @param upstream the source whose items are timed
 * @return a stream emitting the same items as {@code upstream}
 */
@Override
public ObservableSource<R> apply(Observable<R> upstream) {
    Observable<Date> startedAt = Observable.just(new Date());

    return Observable.combineLatest(startedAt, upstream, Pair::create)
            .doOnNext(stamped -> {
                long elapsedMillis = new Date().getTime() - stamped.first.getTime();
                long elapsedSeconds = elapsedMillis / 1000;
                timerAction.accept(elapsedSeconds);
            })
            .map(stamped -> stamped.second);
}
 
源代码16 项目: WanAndroid   文件: RxUtils.java
/**
 * Pre-processes API responses: unwraps the payload of successful responses and turns failed ones
 * into errors.
 *
 * <p>A response with error code {@code 0} is replaced by {@code createObservable(data)}; any
 * other response fails the stream with an {@code ApiException} carrying the response's error code
 * and message.
 *
 * @param <T> the payload type
 * @return the unwrapping transformer
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult(){
    return new ObservableTransformer<BaseResponse<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
            Function<BaseResponse<T>, ObservableSource<T>> unwrap =
                    new Function<BaseResponse<T>, ObservableSource<T>>() {
                        @Override
                        public ObservableSource<T> apply(BaseResponse<T> response) throws Exception {
                            if (response.getErrorCode() != 0) {
                                // Surface the server-reported failure as an exception.
                                return Observable.error(
                                        new ApiException(response.getErrorCode(), response.getErrorMsg()));
                            }
                            // Emit the payload the caller asked for.
                            return createObservable(response.getData());
                        }
                    };
            return upstream.flatMap(unwrap);
        }
    };
}
 
源代码17 项目: V2EX   文件: TopicService.java
/**
 * Creates a topic: loads the "post topic" page of {@code node}, extracts the {@code once} value
 * from it, submits the topic and reports the parsed result to {@code listener} on the main
 * thread.
 *
 * @param iResponsibleView passed to the {@code RxObserver2} that receives the result
 * @param title the topic title
 * @param content the topic body
 * @param node the node the topic is posted to
 * @param listener callback for the result
 */
public static void postTopic(IResponsibleView iResponsibleView,
                             String title,
                             String content,
                             String node,
                             ResponseListener<Topic> listener) {
    final int syntax = 1;

    topicApi.getPostTopicPage(node)
            .compose(RxUtil.io2computation())
            .flatMap((Function<String, ObservableSource<String>>) postPageHtml -> {
                int once = HtmlUtil.getOnceFromPostTopicPage(postPageHtml);
                if (once <= 0) {
                    ErrorEnum.ERROR_PARSE_HTML.throwThis();
                }
                return topicApi.createTopic(node, title, content, once, syntax);
            })
            .map(resultHtml -> {
                ErrorEnum.ERROR_CREATE_TOPIC_TOO_OFTEN.check(resultHtml);
                ErrorEnum.ERROR_CREATE_TOPIC_NEED_VERIFY_EMAIL.check(resultHtml);
                return HtmlUtil.getTopicAndReplies(resultHtml);
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new RxObserver2<>(iResponsibleView, listener));
}
 
源代码18 项目: V2EX   文件: UserService.java
/**
 * Pre-login step that fetches the captcha image.
 *
 * <p>Loads the login page, stores the login field names parsed from it in {@code fieldNames},
 * requests the captcha using the fourth field name, and delivers the bitmap to {@code callBack}
 * on the main thread. Failures are reported through {@code returnFailed}.
 */
public void preLogin(){
    mUserApi.getLoginPage()
            .compose(RxUtil.io2computation())
            .flatMap((Function<String, ObservableSource<Bitmap>>) loginPageHtml -> {
                ErrorEnum.ERROR_AUTH_IP_BE_BANED.check(loginPageHtml);
                fieldNames = HtmlUtil.getLoginFieldName(loginPageHtml);
                return mUserApi.getCaptcha(fieldNames[3]);
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new RxObserver<Bitmap>() {
                @Override
                public void _onError(String message) {
                    super._onError(message);
                    returnFailed(message);
                }

                @Override
                public void _onNext(Bitmap captcha) {
                    callBack.onNextResult(captcha);
                }
            });
}
 
源代码19 项目: V2EX   文件: UserService.java
/**
 * Reads the account information from the settings page and reports the result.
 *
 * <p>The page is checked against the two "login required" error markers before the account is
 * extracted with {@code getAccount}. The outcome is delivered on the main thread.
 *
 * @param responseListener callback receiving the account or the failure message
 */
public static void getInfo(ResponseListener<Account> responseListener){
    mUserApi.getSettingPage()
            .compose(RxUtil.io2computation())
            .flatMap((Function<String, ObservableSource<Account>>) settingPageHtml -> {
                ErrorEnum.ERR_PAGE_NEED_LOGIN.check(settingPageHtml);
                ErrorEnum.ERR_PAGE_NEED_LOGIN0.check(settingPageHtml);
                return getAccount(settingPageHtml);
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new RxObserver<Account>() {
                @Override
                public void _onNext(Account loaded) {
                    responseListener.onComplete(loaded);
                }

                @Override
                public void _onError(String message) {
                    super._onError(message);
                    responseListener.onFailed(message);
                }
            });
}
 
源代码20 项目: V2EX   文件: LoginServiceTest.java
/**
 * Demonstrates {@code flatMap}: a source emitting {@code "a"} is mapped to the numbers 1, 2, 3.
 * The string and each number are printed to standard output.
 */
@Test
public void test1(){
    io.reactivex.Observable
            .create((ObservableOnSubscribe<String>) source -> {
                source.onNext("a");
                source.onComplete();
            })
            .flatMap((Function<String, ObservableSource<Integer>>) letter -> {
                System.out.println(letter);
                return io.reactivex.Observable.just(1,2,3);
            })
            .subscribe((Consumer<Integer>) number -> System.out.println(number));
}
 
源代码21 项目: RxBeacon   文件: RxBeacon.java
/**
 * Emits a {@code RxBeaconRange} every time the beacon manager reports beacons for the region
 * returned by {@code getRegion()}.
 *
 * <p>After {@code startup()} emits, a {@code RangeNotifier} is registered and ranging is
 * started. The notifier is removed again when the subscriber disposes (or the stream
 * terminates), so the emitter and everything downstream of it are no longer referenced by the
 * beacon manager.
 *
 * <p>Ranging itself is left running on dispose: other subscribers may still range the same
 * region.
 *
 * @return a stream of ranging results
 */
public Observable<RxBeaconRange> beaconsInRegion() {
    return startup()
            .flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
                @Override
                public ObservableSource<RxBeaconRange> apply(@NonNull Boolean aBoolean) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
                        @Override
                        public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> objectObservableEmitter) throws Exception {
                            final RangeNotifier rangeNotifier = new RangeNotifier() {
                                @Override
                                public void didRangeBeaconsInRegion(Collection<Beacon> collection, Region region) {
                                    objectObservableEmitter.onNext(new RxBeaconRange(collection, region));
                                }
                            };
                            // Register the cleanup first so it also runs if starting to range fails.
                            objectObservableEmitter.setCancellable(new io.reactivex.functions.Cancellable() {
                                @Override
                                public void cancel() throws Exception {
                                    beaconManager.removeRangeNotifier(rangeNotifier);
                                }
                            });
                            beaconManager.addRangeNotifier(rangeNotifier);
                            beaconManager.startRangingBeaconsInRegion(getRegion());
                        }
                    });
                }
            });
}
 
源代码22 项目: RxGallery   文件: MainActivity.java
/**
 * Captures a photo into {@code outputUri} and shows the resulting URI (or the error message) in
 * a toast.
 *
 * <p>On API level M and above the {@code WRITE_EXTERNAL_STORAGE} permission is requested first;
 * if it is denied nothing is captured. The subscription is stored in {@code disposable}.
 *
 * @param outputUri where the captured photo is written
 */
private void takePhoto(final Uri outputUri) {
    Observable<Boolean> permissionObservable;
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
        permissionObservable = new RxPermissions(this).request(Manifest.permission.WRITE_EXTERNAL_STORAGE);
    } else {
        permissionObservable = Observable.just(true);
    }

    Function<Boolean, ObservableSource<Uri>> captureIfGranted = new Function<Boolean, ObservableSource<Uri>>() {
        @Override
        public ObservableSource<Uri> apply(@NonNull Boolean granted) throws Exception {
            if (granted) {
                return RxGallery.photoCapture(MainActivity.this, outputUri).toObservable();
            }
            return Observable.empty();
        }
    };

    Consumer<Uri> showResult = new Consumer<Uri>() {
        @Override
        public void accept(Uri captured) throws Exception {
            Toast.makeText(MainActivity.this, captured.toString(), Toast.LENGTH_LONG).show();
        }
    };

    Consumer<Throwable> showError = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable failure) throws Exception {
            Toast.makeText(MainActivity.this, failure.getMessage(), Toast.LENGTH_LONG).show();
        }
    };

    disposable = permissionObservable
            .flatMap(captureIfGranted)
            .subscribe(showResult, showError);
}
 
源代码23 项目: Tangram-Android   文件: PojoDataParser.java
/**
 * {@inheritDoc}
 *
 * <p>This implementation maps every {@code ParseComponentsOp} to the result of
 * {@code parseComponent} called with the operation's three arguments.
 */
@NonNull
@Override
public ObservableTransformer<ParseComponentsOp, List<BaseCell>> getComponentTransformer() {
    return new ObservableTransformer<ParseComponentsOp, List<BaseCell>>() {
        @Override
        public ObservableSource<List<BaseCell>> apply(Observable<ParseComponentsOp> upstream) {
            Function<ParseComponentsOp, List<BaseCell>> parseOp =
                    new Function<ParseComponentsOp, List<BaseCell>>() {
                        @Override
                        public List<BaseCell> apply(ParseComponentsOp op) throws Exception {
                            return parseComponent(op.getArg1(), op.getArg2(), op.getArg3());
                        }
                    };
            return upstream.map(parseOp);
        }
    };
}
 
源代码24 项目: NewFastFrame   文件: QQVideoDetailPresenter.java
/**
 * Resolves the real address for {@code url} by scraping the {@code QQQ} parser page.
 *
 * <p>The page is fetched with Jsoup on the IO scheduler. The text between the first
 * {@code "url:"} marker and the last {@code "pic: pic"} marker is taken as the candidate, with
 * one leading and two trailing characters stripped (presumably the quote and the trailing
 * {@code ",} around the value — verify against a live response).
 *
 * <p>If the markers are missing or out of order, the value is too short, or the candidate is not
 * a valid URL, the lookup is delegated to {@code getDetailDataForThree}. Previously a page
 * without the expected markers failed with a {@code StringIndexOutOfBoundsException} instead of
 * falling back.
 *
 * @param url address of the page to resolve
 * @return a stream emitting the resolved address
 */
private Observable<String> getDetailDataForTwo(String url) {
    return Observable.just(url).subscribeOn(Schedulers.io())
            .flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) throws Exception {
                    StringBuilder stringBuilder = new StringBuilder("http://all.baiyug.cn:2021/QQQ/index.php").append("?url=")
                            .append(s);
                    Document document = Jsoup.connect(stringBuilder.toString()).header("Referer", "http://app.baiyug.cn").get();
                    String content = document.outerHtml();
                    String start = "url:";
                    String end = "pic: pic";
                    int startIndex = content.indexOf(start);
                    int endIndex = content.lastIndexOf(end);
                    int valueStart = startIndex + start.length();
                    // Only slice when both markers exist and are in the expected order.
                    if (startIndex >= 0 && endIndex >= valueStart) {
                        String url1 = content.substring(valueStart, endIndex).trim();
                        // One leading and two trailing characters are stripped below.
                        if (url1.length() >= 3) {
                            String realUrl = url1.substring(1, url1.length() - 2);
                            if (URLUtil.isValidUrl(realUrl)) {
                                return Observable.just(realUrl);
                            }
                        }
                    }
                    // Unparseable page or invalid address: try the next resolver.
                    return getDetailDataForThree(url);
                }
            });
}
 
源代码25 项目: NewFastFrame   文件: QQVideoDetailPresenter.java
/**
 * Resolves the real address for {@code url} by scraping the {@code QQ} parser page.
 *
 * <p>The page is fetched with Jsoup on the IO scheduler. The text between the first
 * {@code "url:"} marker and the last {@code "pic: pic"} marker is taken as the candidate, with
 * one leading and two trailing characters stripped (presumably the quote and the trailing
 * {@code ",} around the value — verify against a live response).
 *
 * <p>If the markers are missing or out of order, the value is too short, or the candidate is not
 * a valid URL, the lookup is delegated to {@code getDetailDataForThree}. Previously a page
 * without the expected markers failed with a {@code StringIndexOutOfBoundsException} instead of
 * falling back.
 *
 * @param url address of the page to resolve
 * @return a stream emitting the resolved address
 */
public Observable<String> getDetailDataForFour(String url) {
    return Observable.just(url).subscribeOn(Schedulers.io())
            .flatMap((Function<String, ObservableSource<String>>) s -> {
                StringBuilder stringBuilder = new StringBuilder("http://all.baiyug.cn:2021/QQ/index.php").append("?url=")
                        .append(s);
                Document document = Jsoup.connect(stringBuilder.toString()).header("Referer", "http://app.baiyug.cn").get();
                String content = document.outerHtml();
                String start = "url:";
                String end = "pic: pic";
                int startIndex = content.indexOf(start);
                int endIndex = content.lastIndexOf(end);
                int valueStart = startIndex + start.length();
                // Only slice when both markers exist and are in the expected order.
                if (startIndex >= 0 && endIndex >= valueStart) {
                    String url1 = content.substring(valueStart, endIndex).trim();
                    // One leading and two trailing characters are stripped below.
                    if (url1.length() >= 3) {
                        String realUrl = url1.substring(1, url1.length() - 2);
                        if (URLUtil.isValidUrl(realUrl)) {
                            return Observable.just(realUrl);
                        }
                    }
                }
                // Unparseable page or invalid address: try the next resolver.
                return getDetailDataForThree(url);
            });
}
 
/**
 * Reports, for every upstream item, how many whole seconds have passed since this transformer
 * was applied, then forwards the item unchanged.
 *
 * <p>The start timestamp is taken when this method is called (the {@code Date} is created
 * eagerly), not when the resulting stream is subscribed.
 *
 * @param upstream the source whose items are timed
 * @return a stream emitting the same items as {@code upstream}
 */
@Override
public ObservableSource<R> apply(Observable<R> upstream) {
    Observable<Date> startedAt = Observable.just(new Date());

    return Observable.combineLatest(startedAt, upstream, Pair::create)
            .doOnNext(stamped -> {
                long elapsedMillis = new Date().getTime() - stamped.first.getTime();
                long elapsedSeconds = elapsedMillis / 1000;
                timerAction.accept(elapsedSeconds);
            })
            .map(stamped -> stamped.second);
}
 
源代码27 项目: RxAndroid-Examples   文件: MainActivity.java
/**
 * Wires the search view {@code svKey} to the adapter: non-empty queries are debounced by 500 ms,
 * de-duplicated against the previous query, looked up through {@code DataSource.getSearchData},
 * and the result replaces the names shown by {@code adapter}.
 *
 * <p>{@code switchMap} is used, so only the result of the latest query is delivered.
 *
 * <p>NOTE(review): no error consumer is supplied to {@code subscribe} and the subscription is not
 * kept — confirm this is intended.
 *
 * @param adapter the adapter showing the search results
 */
private void initSearchFeature(StudentAdapter adapter) {
    RxSearchObservable.fromSearchView(svKey)
            .debounce(500, TimeUnit.MILLISECONDS)
            .filter(query -> !query.isEmpty())
            .distinctUntilChanged()
            .switchMap((Function<String, ObservableSource<ArrayList<String>>>) query ->
                    DataSource.getSearchData(query))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(matches -> {
                adapter.removeAllNames();
                adapter.addStudentNames(matches);
            });
}
 
源代码28 项目: ViewPagerHelper   文件: RxUtils.java
/**
 * Uniform handling of API results.
 *
 * <p>A response whose error code equals {@code BaseResponse.SUCCESS} and whose data is not
 * {@code null} is replaced by {@code createData(data)}. Every other response fails the stream
 * with an {@code Exception} carrying the response's error message.
 *
 * @param <T> the payload type
 * @return ObservableTransformer that unwraps the payload
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() {
    return new ObservableTransformer<BaseResponse<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
            Function<BaseResponse<T>, ObservableSource<T>> unwrap =
                    new Function<BaseResponse<T>, ObservableSource<T>>() {
                        @Override
                        public ObservableSource<T> apply(BaseResponse<T> response) throws Exception {
                            boolean failed = response.getErrorCode() != BaseResponse.SUCCESS
                                    || response.getData() == null;
                            if (failed) {
                                return Observable.error(new Exception(response.getErrorMsg()));
                            }
                            return createData(response.getData());
                        }
                    };
            return upstream.flatMap(unwrap);
        }
    };
}
 
源代码29 项目: RxEasyHttp   文件: RxCache.java
/**
 * Cache transformer: runs the upstream through the cache strategy selected by {@code cacheMode}.
 *
 * <p>When {@code type} is a parameterized type whose raw class is assignable to
 * {@code CacheResult}, its first type argument (as returned by
 * {@code Utils.getParameterizedType(type, 0)}) is passed to the strategy instead of {@code type}
 * itself.
 *
 * @param cacheMode the cache mode used to load the strategy
 * @param type      the type of the cached value
 * @param <T>       the element type of the upstream
 * @return a transformer producing {@code CacheResult}-wrapped values
 */
@SuppressWarnings(value={"unchecked", "deprecation"})
public <T> ObservableTransformer<T, CacheResult<T>> transformer(CacheMode cacheMode, final Type type) {
    // Resolve the cache strategy once, up front.
    final IStrategy strategy = loadStrategy(cacheMode);
    return new ObservableTransformer<T, CacheResult<T>>() {
        @Override
        public ObservableSource<CacheResult<T>> apply(@NonNull Observable<T> upstream) {
            HttpLog.i("cackeKey=" + RxCache.this.cacheKey);

            Type payloadType = type;
            if (type instanceof ParameterizedType) {
                // Custom ApiResult: unwrap CacheResult<X> to X.
                Type rawType = ((ParameterizedType) type).getRawType();
                if (CacheResult.class.isAssignableFrom((Class<?>) rawType)) {
                    payloadType = Utils.getParameterizedType(type, 0);
                }
            }

            return strategy.execute(
                    RxCache.this, RxCache.this.cacheKey, RxCache.this.cacheTime, upstream, payloadType);
        }
    };
}
 
源代码30 项目: RxPay   文件: RxPayUtils.java
/**
 * Transformer that lets successful pay results through and fails the stream with a
 * {@code PayFailedException} (built from the result status and error info) for unsuccessful
 * ones.
 *
 * @return the checking transformer
 */
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
    return new ObservableTransformer<PayResult, PayResult>() {
        @Override
        public ObservableSource<PayResult> apply(Observable<PayResult> upstream) {
            Function<PayResult, PayResult> requireSuccess = new Function<PayResult, PayResult>() {
                @Override
                public PayResult apply(PayResult result) throws Exception {
                    if (result.isSucceed()) {
                        return result;
                    }
                    throw new PayFailedException(result.getResultStatus(), result.getErrInfo());
                }
            };
            return upstream.map(requireSuccess);
        }
    };
}
 
 类所在包
 类方法
 同包方法