下面列出了怎么用io.reactivex.ObservableSource的API类实例代码及写法,或者点击链接到github查看源代码。
public static Observable<Boolean> isPermited(User user, String[] permitsOrRoles, LogicType logicType) {
return Observable.fromArray(permitsOrRoles)
.flatMap(new Function<String, ObservableSource<Boolean>>() {
@Override
public ObservableSource<Boolean> apply(String role) throws Exception {
return user.rxIsAuthorized(role)
.toObservable();
}
})
.toList()
.map(new Function<List<Boolean>, Boolean>() {
@Override
public Boolean apply(List<Boolean> results) throws Exception {
boolean finalResult = results.get(0);
for (int i = 1; i < results.size(); i++) {
if (logicType == LogicType.AND) {
finalResult = finalResult && results.get(i);
}
if (logicType == LogicType.OR) {
finalResult = finalResult || results.get(i);
}
}
return finalResult;
}
}).toObservable();
}
@Override
public Observable<List<User>> getUsers(int lastIdQueried, boolean update) {
Observable<List<User>> users = mRepositoryManager.obtainRetrofitService(UserService.class)
.getUsers(lastIdQueried, USERS_PER_PAGE);
//使用rxcache缓存,上拉刷新则不读取缓存,加载更多读取缓存
return mRepositoryManager.obtainCacheService(CommonCache.class)
.getUsers(users
, new DynamicKey(lastIdQueried)
, new EvictDynamicKey(update))
.flatMap(new Function<Reply<List<User>>, ObservableSource<List<User>>>() {
@Override
public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> listReply) throws Exception {
return Observable.just(listReply.getData());
}
});
}
/**
* 订阅请求
*/
public static <T> void toSubscribe(Observable<T> observable, BaseObserver<T> observer) {
// 指定subscribe()发生在IO线程
observable.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
private int mRetryCount;
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) {
return throwableObservable.flatMap((Function<Throwable, ObservableSource<?>>) throwable -> {
boolean exceptionType = (throwable instanceof NetworkErrorException
|| throwable instanceof ConnectException
|| throwable instanceof SocketTimeoutException
|| throwable instanceof TimeoutException) && mRetryCount < 3;
if (exceptionType) {
mRetryCount++;
return Observable.timer(4000, TimeUnit.MILLISECONDS);
}
return Observable.error(throwable);
});
}
})
.subscribe(observer);
}
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
Logger.t(TAG).d("Error:::" + throwable);
if (throwable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
Logger.t(TAG).d("超时重试第【" + (tryTimes - 1) + "】次");
return Observable.timer(mInterval, TimeUnit.SECONDS);
} else if (throwable instanceof CompositeException) {
CompositeException compositeException = (CompositeException) throwable;
//结合rxcache会把异常进行包裹才会返回,需要解析提取
for (Throwable innerthrowable : compositeException.getExceptions()) {
if (innerthrowable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
Logger.t(TAG).d("带Rxcache超时重试第【" + (tryTimes - 1) + "】次");
return Observable.timer(mInterval, TimeUnit.SECONDS);
}
}
}
return Observable.error(throwable);
}
});
}
/**
* Creates an {@link ObservableTransformer} that will flatten the provided {@link Action} into the
* stream as a {@link Completable} every time it receives an effect from the upstream effects
* observable. This Completable will be subscribed on the specified {@link Scheduler}. This will
* result in calling the provided Action on the specified scheduler every time an effect is
* dispatched to the created effect transformer.
*
* @param doEffect the {@link Action} to be run every time the effect is requested
* @param scheduler the {@link Scheduler} that the action should be run on
* @param <F> the type of Effect this transformer handles
* @param <E> these transformers are for effects that do not result in any events; however, they
* still need to share the same Event type
* @return an {@link ObservableTransformer} that can be used with a {@link
* SubtypeEffectHandlerBuilder}.
*/
static <F, E> ObservableTransformer<F, E> fromAction(
final Action doEffect, @Nullable final Scheduler scheduler) {
return new ObservableTransformer<F, E>() {
@Override
public ObservableSource<E> apply(Observable<F> effectStream) {
return effectStream
.flatMapCompletable(
new Function<F, CompletableSource>() {
@Override
public CompletableSource apply(F f) throws Exception {
return scheduler == null
? Completable.fromAction(doEffect)
: Completable.fromAction(doEffect).subscribeOn(scheduler);
}
})
.toObservable();
}
};
}
@Override
public Observable<VideoListInfo> getVideoList(String type, String lastIdQueried,int startCount,boolean update) {
Observable<VideoListInfo> videoInfo = mRepositoryManager.obtainRetrofitService(VideoService.class)
.getVideoList(startCount, Constants.HOME_VIDEO_LIST_PAGE_SIZE,type,Constants.UDID);
//使用rxcache缓存,上拉刷新则不读取缓存,加载更多读取缓存
return mRepositoryManager.obtainCacheService(CommonCache.class)
.getVideoList(videoInfo
, new DynamicKey(lastIdQueried)
, new EvictDynamicKey(update))
.flatMap(new Function<Reply<VideoListInfo>, ObservableSource<VideoListInfo>>() {
@Override
public ObservableSource<VideoListInfo> apply(@NonNull Reply<VideoListInfo> listReply) throws Exception {
return Observable.just(listReply.getData());
}
});
}
public void work() {
mCompositeDisposable.add(Observable.interval(mIntervalMillis, TimeUnit.MILLISECONDS)
.subscribeOn(ThreadUtil.computationScheduler())
.observeOn(ThreadUtil.computationScheduler())
.concatMap(new Function<Long, ObservableSource<TrafficInfo>>() {
@Override
public ObservableSource<TrafficInfo> apply(Long aLong) throws Exception {
ThreadUtil.ensureWorkThread("TrafficEngine apply");
return create();
}
}).subscribe(new Consumer<TrafficInfo>() {
@Override
public void accept(TrafficInfo food) throws Exception {
ThreadUtil.ensureWorkThread("TrafficEngine accept");
mProducer.produce(food);
}
}));
}
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
switch (mSchedulerType) {
case _main:
return upstream.observeOn(AndroidSchedulers.mainThread());
case _io:
return upstream.observeOn(RxSchedulerUtils.io(mIOExecutor));
case _io_main:
return upstream
.subscribeOn(RxSchedulerUtils.io(mIOExecutor))
.unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
.observeOn(AndroidSchedulers.mainThread());
case _io_io:
return upstream
.subscribeOn(RxSchedulerUtils.io(mIOExecutor))
.unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
.observeOn(RxSchedulerUtils.io(mIOExecutor));
default:
break;
}
return upstream;
}
/**
* 线程调度器
*/
public static <T> ObservableTransformer<T, T> schedulersTransformer() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.doOnNext(new Consumer<T>() {
@Override
public void accept(T t) throws Exception {
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
/**
* {@inheritDoc}
*/
@NonNull
@Override
public ObservableTransformer<ParseGroupsOp, List<Card>> getGroupTransformer() {
return new ObservableTransformer<ParseGroupsOp, List<Card>>() {
@Override
public ObservableSource<List<Card>> apply(Observable<ParseGroupsOp> upstream) {
return upstream.map(new Function<ParseGroupsOp, List<Card>>() {
@Override
public List<Card> apply(ParseGroupsOp parseGroupsOp) throws Exception {
return parseGroup(parseGroupsOp.getArg1(), parseGroupsOp.getArg2());
}
});
}
};
}
private Observable wrapObservable(Observable observable) {
if (null == observable) {
return null;
}
if (asynchronized) {
observable = observable.subscribeOn(Schedulers.io());
}
if (null != defaultCreator) {
observable = observable.observeOn(defaultCreator.create());
}
observable = observable.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
@Override
public ObservableSource apply(Throwable throwable) throws Exception {
return Observable.error(ErrorUtils.propagateException(throwable));
}
});
return observable;
}
public Observable wrapObservable(Observable observable) {
if (null == observable) {
return null;
}
if (asynchronized) {
observable = observable.subscribeOn(Schedulers.io());
}
if (null != defaultCreator) {
observable = observable.observeOn(defaultCreator.create());
}
observable = observable.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
@Override
public ObservableSource apply(Throwable throwable) throws Exception {
return Observable.error(ErrorUtils.propagateException(throwable));
}
});
return observable;
}
public Maybe<Trip> getTrip(final String tripId) {
return getTrips()
.toObservable()
.flatMap(new Function<List<Trip>, ObservableSource<? extends Trip>>() {
@Override
public ObservableSource<? extends Trip> apply(List<Trip> tripList) throws Exception {
return Observable.fromIterable(tripList);
}
})
.filter(new Predicate<Trip>() {
@Override
public boolean test(Trip trip) throws Exception {
return trip.getId().equals(tripId);
}
})
.singleElement();
}
@Override
public Observable<Boolean> seedDatabaseOptions() {
GsonBuilder builder = new GsonBuilder().excludeFieldsWithoutExposeAnnotation();
final Gson gson = builder.create();
return mOptionRepository.isOptionEmpty()
.concatMap(new Function<Boolean, ObservableSource<? extends Boolean>>() {
@Override
public ObservableSource<? extends Boolean> apply(Boolean isEmpty)
throws Exception {
if (isEmpty) {
Type type = new TypeToken<List<Option>>() {
}.getType();
List<Option> optionList = gson.fromJson(
FileUtils.loadJSONFromAsset(
mContext,
AppConstants.SEED_DATABASE_OPTIONS),
type);
return mOptionRepository.saveOptionList(optionList);
}
return Observable.just(false);
}
});
}
@Override
public ObservableSource<R> apply(Observable<R> upstream) {
return Observable.combineLatest(
Observable.just(new Date()),
upstream,
Pair::create
)
.doOnNext((pair) -> {
Date currentTime = new Date();
long diff = currentTime.getTime() - pair.first.getTime();
long diffSeconds = diff / 1000;
timerAction.accept(diffSeconds);
})
.map(pair -> pair.second);
}
/**
* 对结果进行预处理
*/
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult(){
return new ObservableTransformer<BaseResponse<T>, T>() {
@Override
public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
return upstream.flatMap(new Function<BaseResponse<T>, ObservableSource<T>>() {
@Override
public ObservableSource<T> apply(BaseResponse<T> tBaseResponse) throws Exception {
if(tBaseResponse.getErrorCode() == 0){
return createObservable(tBaseResponse.getData());//创建我们需要的数据
}
return Observable.error(
new ApiException(tBaseResponse.getErrorCode(), tBaseResponse.getErrorMsg())//创建一个异常
);
}
});
}
};
}
public static void postTopic(IResponsibleView iResponsibleView,
String title,
String content,
String node,
ResponseListener<Topic> listener){
int syntax = 1;
topicApi.getPostTopicPage(node)
.compose(RxUtil.io2computation())
.flatMap((Function<String, ObservableSource<String>>) s -> {
int once = HtmlUtil.getOnceFromPostTopicPage(s);
if (once < 1){
ErrorEnum.ERROR_PARSE_HTML.throwThis();
}
return topicApi.createTopic(node, title, content, once, syntax);
})
.map(s -> {
ErrorEnum.ERROR_CREATE_TOPIC_TOO_OFTEN.check(s);
ErrorEnum.ERROR_CREATE_TOPIC_NEED_VERIFY_EMAIL.check(s);
return HtmlUtil.getTopicAndReplies(s);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new RxObserver2<>(iResponsibleView, listener));
}
/**
* 预登录获取验证码
*
*/
public void preLogin(){
mUserApi.getLoginPage()
.compose(RxUtil.io2computation())
.flatMap((Function<String, ObservableSource<Bitmap>>) s -> {
ErrorEnum.ERROR_AUTH_IP_BE_BANED.check(s);
fieldNames = HtmlUtil.getLoginFieldName(s);
return mUserApi.getCaptcha(fieldNames[3]);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new RxObserver<Bitmap>() {
@Override
public void _onNext(Bitmap bitmap) {
callBack.onNextResult(bitmap);
}
@Override
public void _onError(String msg) {
super._onError(msg);
returnFailed(msg);
}
});
}
/**
* 从设置页面获取用户信息并返回结果
*
* @param responseListener 请求结果回调接口
*/
public static void getInfo(ResponseListener<Account> responseListener){
mUserApi.getSettingPage()
.compose(RxUtil.io2computation())
.flatMap((Function<String, ObservableSource<Account>>) s -> {
ErrorEnum.ERR_PAGE_NEED_LOGIN.check(s);
ErrorEnum.ERR_PAGE_NEED_LOGIN0.check(s);
return getAccount(s);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new RxObserver<Account>() {
@Override
public void _onError(String msg) {
super._onError(msg);
responseListener.onFailed(msg);
}
@Override
public void _onNext(Account account) {
responseListener.onComplete(account);
}
});
}
@Test
public void test1(){
io.reactivex.Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("a");
emitter.onComplete();
}
})
.flatMap(new Function<String, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(String s) throws Exception {
System.out.println(s);
return io.reactivex.Observable.just(1,2,3);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
}
public Observable<RxBeaconRange> beaconsInRegion() {
return startup()
.flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
@Override
public ObservableSource<RxBeaconRange> apply(@NonNull Boolean aBoolean) throws Exception {
return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> objectObservableEmitter) throws Exception {
beaconManager.addRangeNotifier(new RangeNotifier() {
@Override
public void didRangeBeaconsInRegion(Collection<Beacon> collection, Region region) {
objectObservableEmitter.onNext(new RxBeaconRange(collection, region));
}
});
beaconManager.startRangingBeaconsInRegion(getRegion());
}
});
}
});
}
private void takePhoto(final Uri outputUri) {
Observable<Boolean> permissionObservable = Observable.just(true);
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
permissionObservable = new RxPermissions(this).request(Manifest.permission.WRITE_EXTERNAL_STORAGE);
}
disposable = permissionObservable.flatMap(new Function<Boolean, ObservableSource<Uri>>() {
@Override
public ObservableSource<Uri> apply(@NonNull Boolean granted) throws Exception {
if (!granted) {
return Observable.empty();
}
return RxGallery.photoCapture(MainActivity.this, outputUri).toObservable();
}
}).subscribe(new Consumer<Uri>() {
@Override
public void accept(Uri uri) throws Exception {
Toast.makeText(MainActivity.this, uri.toString(), Toast.LENGTH_LONG).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, throwable.getMessage(), Toast.LENGTH_LONG).show();
}
});
}
/**
* {@inheritDoc}
*/
@NonNull
@Override
public ObservableTransformer<ParseComponentsOp, List<BaseCell>> getComponentTransformer() {
return new ObservableTransformer<ParseComponentsOp, List<BaseCell>>() {
@Override
public ObservableSource<List<BaseCell>> apply(Observable<ParseComponentsOp> upstream) {
return upstream.map(new Function<ParseComponentsOp, List<BaseCell>>() {
@Override
public List<BaseCell> apply(ParseComponentsOp parseComponentsOp) throws Exception {
return parseComponent(parseComponentsOp.getArg1(), parseComponentsOp.getArg2(), parseComponentsOp.getArg3());
}
});
}
};
}
private Observable<String> getDetailDataForTwo(String url) {
return Observable.just(url).subscribeOn(Schedulers.io())
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
StringBuilder stringBuilder = new StringBuilder("http://all.baiyug.cn:2021/QQQ/index.php").append("?url=")
.append(s);
Document document = Jsoup.connect(stringBuilder.toString()).header("Referer", "http://app.baiyug.cn").get();
String content = document.outerHtml();
String start = "url:";
int startIndex = content.indexOf(start);
String end = "pic: pic";
String url1 = content.substring(startIndex + start.length(), content.lastIndexOf(end)).trim();
String realUrl = url1.substring(1, url1.length() - 2);
if (URLUtil.isValidUrl(realUrl)) {
return Observable.just(realUrl);
} else {
return getDetailDataForThree(url);
}
}
});
}
public Observable<String> getDetailDataForFour(String url) {
return Observable.just(url).subscribeOn(Schedulers.io())
.flatMap((Function<String, ObservableSource<String>>) s -> {
StringBuilder stringBuilder = new StringBuilder("http://all.baiyug.cn:2021/QQ/index.php").append("?url=")
.append(s);
Document document = Jsoup.connect(stringBuilder.toString()).header("Referer", "http://app.baiyug.cn").get();
String content = document.outerHtml();
String start = "url:";
int startIndex = content.indexOf(start);
String end = "pic: pic";
String url1 = content.substring(startIndex + start.length(), content.lastIndexOf(end)).trim();
String realUrl = url1.substring(1, url1.length() - 2);
if (URLUtil.isValidUrl(realUrl)) {
return Observable.just(realUrl);
} else {
return getDetailDataForThree(url);
}
});
}
@Override
public ObservableSource<R> apply(Observable<R> upstream) {
return Observable.combineLatest(
Observable.just(new Date()),
upstream,
Pair::create
)
.doOnNext((pair) -> {
Date currentTime = new Date();
long diff = currentTime.getTime() - pair.first.getTime();
long diffSeconds = diff / 1000;
timerAction.accept(diffSeconds);
})
.map(pair -> pair.second);
}
private void initSearchFeature(StudentAdapter adapter) {
RxSearchObservable.fromSearchView(svKey)
.debounce(500, TimeUnit.MILLISECONDS)
.filter(text -> !text.isEmpty())
.distinctUntilChanged()
.switchMap(new Function<String, ObservableSource<ArrayList<String>>>() {
@Override
public ObservableSource<ArrayList<String>> apply(@NonNull String key) throws Exception {
return DataSource.getSearchData(key);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(names -> {
adapter.removeAllNames();
adapter.addStudentNames(names);
});
}
/**
* 统一返回结果处理
* @param <T> 指定的泛型类型
* @return ObservableTransformer
*/
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() {
return new ObservableTransformer<BaseResponse<T>, T>() {
@Override
public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
return upstream.flatMap(new Function<BaseResponse<T>, ObservableSource<T>>() {
@Override
public ObservableSource<T> apply(BaseResponse<T> baseResponse) throws Exception {
if (baseResponse.getErrorCode() == BaseResponse.SUCCESS
&& baseResponse.getData() != null
) {
return createData(baseResponse.getData());
} else {
return Observable.error(new Exception(baseResponse.getErrorMsg()));
}
}
});
}
};
}
/**
* 缓存transformer
*
* @param cacheMode 缓存类型
* @param type 缓存clazz
*/
@SuppressWarnings(value={"unchecked", "deprecation"})
public <T> ObservableTransformer<T, CacheResult<T>> transformer(CacheMode cacheMode, final Type type) {
final IStrategy strategy = loadStrategy(cacheMode);//获取缓存策略
return new ObservableTransformer<T, CacheResult<T>>() {
@Override
public ObservableSource<CacheResult<T>> apply(@NonNull Observable<T> upstream) {
HttpLog.i("cackeKey=" + RxCache.this.cacheKey);
Type tempType = type;
if (type instanceof ParameterizedType) {//自定义ApiResult
Class<T> cls = (Class) ((ParameterizedType) type).getRawType();
if (CacheResult.class.isAssignableFrom(cls)) {
tempType = Utils.getParameterizedType(type, 0);
}
}
return strategy.execute(RxCache.this, RxCache.this.cacheKey, RxCache.this.cacheTime, upstream, tempType);
}
};
}
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
return new ObservableTransformer<PayResult, PayResult>() {
@Override
public ObservableSource<PayResult> apply(Observable<PayResult> upstream) {
return upstream.map(new Function<PayResult, PayResult>() {
@Override
public PayResult apply(PayResult payResult) throws Exception {
if (!payResult.isSucceed()) {
throw new PayFailedException(payResult.getResultStatus(), payResult.getErrInfo());
}
return payResult;
}
});
}
};
}