下面列出了 io.reactivex.Observable#onErrorResumeNext() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Decorates the given observable with optional scheduling and error translation.
 *
 * <p>When {@code asynchronized} is true the subscription is moved to
 * {@code Schedulers.io()}; when {@code defaultCreator} is non-null, emissions are
 * observed on whatever {@code defaultCreator.create()} returns. Any upstream error
 * is passed through {@code ErrorUtils.propagateException} and re-emitted as an error.
 *
 * @param observable the source to decorate, may be {@code null}
 * @return the decorated observable, or {@code null} when the input is {@code null}
 */
private Observable wrapObservable(Observable observable) {
    if (observable == null) {
        return null;
    }

    Observable wrapped = observable;
    if (asynchronized) {
        wrapped = wrapped.subscribeOn(Schedulers.io());
    }
    if (defaultCreator != null) {
        wrapped = wrapped.observeOn(defaultCreator.create());
    }

    // Kept as an anonymous class: a lambda here could also match the
    // onErrorResumeNext(ObservableSource) overload.
    final Function<Throwable, ObservableSource> errorTranslator = new Function<Throwable, ObservableSource>() {
        @Override
        public ObservableSource apply(Throwable cause) throws Exception {
            return Observable.error(ErrorUtils.propagateException(cause));
        }
    };
    return wrapped.onErrorResumeNext(errorTranslator);
}
/**
 * Applies the configured threading and error mapping to an observable.
 *
 * <p>Subscribes on {@code Schedulers.io()} if {@code asynchronized} is set, observes on
 * the result of {@code defaultCreator.create()} if a creator is present, and replaces
 * every upstream error with the one produced by {@code ErrorUtils.propagateException}.
 *
 * @param observable the source to wrap, may be {@code null}
 * @return the wrapped observable, or {@code null} if {@code observable} is {@code null}
 */
private Observable wrapObservable(Observable observable) {
    if (observable == null) {
        return null;
    }

    Observable result = asynchronized
            ? observable.subscribeOn(Schedulers.io())
            : observable;
    if (defaultCreator != null) {
        result = result.observeOn(defaultCreator.create());
    }

    // Anonymous class rather than a lambda, so only the Function overload applies.
    return result.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
        @Override
        public ObservableSource apply(Throwable error) throws Exception {
            return Observable.error(ErrorUtils.propagateException(error));
        }
    });
}
/**
 * Wraps an observable with the configured schedulers and error conversion.
 *
 * <p>The steps, in order: subscribe on {@code Schedulers.io()} when
 * {@code asynchronized} is true; observe on {@code defaultCreator.create()} when
 * {@code defaultCreator} is non-null; finally resume any error with
 * {@code Observable.error(ErrorUtils.propagateException(throwable))}.
 *
 * @param observable the observable to wrap, may be {@code null}
 * @return the wrapped observable, or {@code null} when given {@code null}
 */
public Observable wrapObservable(Observable observable) {
    if (observable == null) {
        return null;
    }

    final Function<Throwable, ObservableSource> onFailure = new Function<Throwable, ObservableSource>() {
        @Override
        public ObservableSource apply(Throwable failure) throws Exception {
            return Observable.error(ErrorUtils.propagateException(failure));
        }
    };

    Observable chain = observable;
    if (asynchronized) {
        chain = chain.subscribeOn(Schedulers.io());
    }
    if (defaultCreator != null) {
        chain = chain.observeOn(defaultCreator.create());
    }
    return chain.onErrorResumeNext(onFailure);
}
/**
 * Loads a cached value and wraps it in a {@code CacheResult}.
 *
 * <p>Each item emitted by {@code rxCache.load(type, key, time)} becomes a
 * {@code CacheResult} constructed with {@code true} as its first argument; a
 * {@code null} item is turned into a {@link NullPointerException} error.
 *
 * @param rxCache   the cache to read from
 * @param type      the type passed to {@code rxCache.load}
 * @param key       the cache key
 * @param time      the time argument passed to {@code rxCache.load}
 * @param needEmpty when {@code true}, any error is replaced by an empty observable
 * @param <T>       the cached value type
 * @return an observable of the wrapped cache value
 */
<T> Observable<CacheResult<T>> loadCache(final RxCache rxCache, Type type, final String key, final long time, final boolean needEmpty) {
    final Function<T, ObservableSource<CacheResult<T>>> toCacheResult = new Function<T, ObservableSource<CacheResult<T>>>() {
        @Override
        public ObservableSource<CacheResult<T>> apply(@NonNull T cached) throws Exception {
            if (cached != null) {
                return Observable.just(new CacheResult<T>(true, cached));
            }
            return Observable.error(new NullPointerException("Not find the cache!"));
        }
    };

    final Observable<CacheResult<T>> fromCache = rxCache.<T>load(type, key, time).flatMap(toCacheResult);
    if (!needEmpty) {
        return fromCache;
    }

    // Caller asked for "empty instead of error": swallow any failure and complete.
    return fromCache.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends CacheResult<T>>>() {
        @Override
        public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable ignored) throws Exception {
            return Observable.empty();
        }
    });
}
/**
 * Runs every item of {@code source} through {@code saveCacheSync} and emits the results.
 *
 * @param rxCache   the cache handed to {@code saveCacheSync}
 * @param key       the cache key handed to {@code saveCacheSync}
 * @param source    the observable supplying the items
 * @param target    the cache target handed to {@code saveCacheSync}
 * @param needEmpty when {@code true}, any error is replaced by an empty observable
 * @param <T>       the item type
 * @return an observable of the values produced by {@code saveCacheSync}
 */
public static <T> Observable<CacheResult<T>> loadRemoteSync(final RxCache rxCache, final String key, Observable<T> source, final CacheTarget target, final boolean needEmpty) {
    final Function<T, ObservableSource<CacheResult<T>>> saveItem = new Function<T, ObservableSource<CacheResult<T>>>() {
        @Override
        public ObservableSource<CacheResult<T>> apply(@NonNull T item) throws Exception {
            return saveCacheSync(rxCache, key, item, target);
        }
    };

    final Observable<CacheResult<T>> saved = source.flatMap(saveItem);
    if (!needEmpty) {
        return saved;
    }

    // Caller asked for "empty instead of error": swallow any failure and complete.
    return saved.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends CacheResult<T>>>() {
        @Override
        public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable ignored) throws Exception {
            return Observable.empty();
        }
    });
}
/**
 * Builds an observable for the latest APK version with a cross-source fallback.
 *
 * <p>Two sources are prepared: CoolApk (parsed by {@code ApkVersionHelper::parseFromCoolApk})
 * and GitHub. For GitHub, the release body is split on {@code <br/>} or {@code <br>};
 * with two or more parts, the second part is used when {@code isInChina()} is true and
 * the first otherwise; with fewer parts, the body with the break tags removed is used.
 *
 * <p>When {@code isInChina()} is true CoolApk is queried first and GitHub is the
 * fallback on error; otherwise the roles are swapped.
 *
 * @return an observable of the latest {@code ApkVersion}
 */
public static Observable<ApkVersion> getLatestVersion() {
    boolean isInChina = isInChina();

    CoolApkService coolApk = ServiceGenerator.getInstance()
            .createService(ApiConst.COOLAPK_BASE_URL, CoolApkService.class);
    Observable<ApkVersion> dataFromCoolApk = coolApk
            .getLatestRelease(BuildConfig.APPLICATION_ID)
            .map(ApkVersionHelper::parseFromCoolApk);

    GithubService github = ServiceGenerator.getInstance()
            .createService(ApiConst.GITHUB_BASE_URL, GithubService.class);
    Observable<ApkVersion> dataFromGithub = github
            .getLatestRelease(ApiConst.GITHUB_USERNAME, ApiConst.GITHUB_REPO_NAME)
            .map(release -> {
                String lineBreak = "<br/>|<br>";
                String[] parts = release.getBody().split(lineBreak);
                String versionInfo;
                if (parts.length < 2) {
                    // No usable split: use the body with the break tags stripped.
                    versionInfo = release.getBody().replaceAll(lineBreak, "");
                } else {
                    // Second part is used in China, first part elsewhere.
                    versionInfo = (isInChina ? parts[1] : parts[0]).trim();
                }
                return new ApkVersion(release.getName(), versionInfo);
            });

    // In China: CoolApk first, GitHub on error. Elsewhere: GitHub first, CoolApk on error.
    final Observable<ApkVersion> primary = isInChina ? dataFromCoolApk : dataFromGithub;
    final Observable<ApkVersion> fallback = isInChina ? dataFromGithub : dataFromCoolApk;
    return primary.onErrorResumeNext(throwable -> fallback);
}
/**
 * Attaches this object's error-resume function to the given observable.
 *
 * <p>An {@link IllegalArgumentException} with the message {@code "Original call"} is
 * created here and passed to {@code onError}, which supplies the resume function.
 *
 * @param observable the observable to protect
 * @param <T>        the element type
 * @return {@code observable} with {@code onErrorResumeNext} applied
 */
public <T> Observable<T> exceptionHandling(Observable<T> observable) {
    // Created at call time, before any error occurs, and handed to onError.
    final IllegalArgumentException callSite = new IllegalArgumentException("Original call");
    // Explicitly typed so that only the Function overload of onErrorResumeNext applies.
    final Function<? super Throwable, ? extends ObservableSource<? extends T>> onFailure =
            this.onError(callSite);
    return observable.onErrorResumeNext(onFailure);
}
/**
 * Converts a 404 failure of a rest call into an empty {@link Optional}.
 *
 * @param call the rest call
 * @param <T> the type of the response
 * @return an Observable that emits {@code Optional.empty()} when the call fails with a
 *     {@code RepositoryCallException} whose status code is 404. Every other error is
 *     propagated unchanged.
 */
private <T> Observable<Optional<T>> emptyOnNotFound(Observable<Optional<T>> call) {
    return call.onErrorResumeNext(failure -> {
        boolean notFound = failure instanceof RepositoryCallException
                && ((RepositoryCallException) failure).getStatusCode() == 404;
        if (!notFound) {
            return Observable.error(failure);
        }
        return Observable.just(Optional.empty());
    });
}
/**
 * Loads a cached entry, subscribing on {@code Schedulers.io()}.
 *
 * @param rxCache   the cache to read from
 * @param key       the cache key
 * @param type      the type passed to {@code rxCache.load}
 * @param needEmpty when {@code true}, any error is replaced by an empty observable
 * @param <T>       the cached value type
 * @return the observable returned by {@code rxCache.load(key, type)}, subscribed on the io scheduler
 */
public static <T> Observable<CacheResult<T>> loadCache(final RxCache rxCache, final String key, Type type, final boolean needEmpty) {
    final Observable<CacheResult<T>> fromCache = rxCache.<T>load(key, type).subscribeOn(Schedulers.io());
    if (!needEmpty) {
        return fromCache;
    }

    // Caller asked for "empty instead of error": swallow any failure and complete.
    final Function<Throwable, ObservableSource<? extends CacheResult<T>>> toEmpty =
            new Function<Throwable, ObservableSource<? extends CacheResult<T>>>() {
                @Override
                public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable ignored) throws Exception {
                    return Observable.empty();
                }
            };
    return fromCache.onErrorResumeNext(toEmpty);
}
/**
 * Attaches this object's {@code onError} method as the error-resume function.
 *
 * @param observable the observable to protect
 * @param <T>        the element type
 * @return {@code observable} with {@code onErrorResumeNext(this::onError)} applied
 */
public <T> Observable<T> exceptionHandling(Observable<T> observable) {
    // Explicitly typed so the method reference resolves against the Function overload only.
    final Function<? super Throwable, ? extends ObservableSource<? extends T>> onFailure =
            this::onError;
    return observable.onErrorResumeNext(onFailure);
}
/**
 * Applies a new {@code HttpResponseFunc} as the error-resume function of the upstream.
 *
 * @param upstream the observable being transformed
 * @return {@code upstream} with {@code onErrorResumeNext} applied
 */
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
    final HttpResponseFunc<T> errorHandler = new HttpResponseFunc<T>();
    return upstream.onErrorResumeNext(errorHandler);
}