下面列出了怎么用io.reactivex.exceptions.CompositeException的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
Logger.t(TAG).d("Error:::" + throwable);
if (throwable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
Logger.t(TAG).d("超时重试第【" + (tryTimes - 1) + "】次");
return Observable.timer(mInterval, TimeUnit.SECONDS);
} else if (throwable instanceof CompositeException) {
CompositeException compositeException = (CompositeException) throwable;
//结合rxcache会把异常进行包裹才会返回,需要解析提取
for (Throwable innerthrowable : compositeException.getExceptions()) {
if (innerthrowable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
Logger.t(TAG).d("带Rxcache超时重试第【" + (tryTimes - 1) + "】次");
return Observable.timer(mInterval, TimeUnit.SECONDS);
}
}
}
return Observable.error(throwable);
}
});
}
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
Logger.t(TAG).d("Error:::" + throwable);
if (throwable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
Logger.t(TAG).d("超时重试第【" + (tryTimes - 1) + "】次");
return Observable.timer(mInterval, TimeUnit.SECONDS);
} else if (throwable instanceof CompositeException) {
CompositeException compositeException = (CompositeException) throwable;
//结合rxcache会把异常进行包裹才会返回,需要解析提取
for (Throwable innerthrowable : compositeException.getExceptions()) {
if (innerthrowable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
Logger.t(TAG).d("带Rxcache超时重试第【" + (tryTimes - 1) + "】次");
return Observable.timer(mInterval, TimeUnit.SECONDS);
}
}
}
return Observable.error(throwable);
}
});
}
private void showErrors(CompositeException exceptions) {
ArrayList<ErrorCause> errors = new ArrayList<>(exceptions.size());
for (Throwable throwable : exceptions.getExceptions())
errors.add(ErrorCause.fromThrowable(throwable));
showErrors(errors);
}
@Override
public void onViewCreated(@NonNull View view, @Nullable Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);
ButterKnife.bind(this, view);
setupViews(view);
progressBar.setMax(sources.size());
setProgress(0);
disposable = Observable.mergeDelayError(sources)
.observeOn(AndroidSchedulers.mainThread(), true)
.subscribeOn(Schedulers.newThread())
.doFinally(() -> {
done();
if (autoDismiss)
dismiss();
})
.subscribe(item -> {
listener.onProgress(item);
setProgress((int) (progressBar.getProgress() + 1));
}, err -> {
// Note: progress is useless here since errors are delayed
if (err instanceof CompositeException) showErrors(((CompositeException) err));
else showErrors(err);
});
}
@Override
public void onError(Throwable t) {
if (isDisposed()) {
RxJavaPlugins.onError(t);
return;
}
lazySet(SubscriptionHelper.CANCELLED);
try {
removeObserver();
downstream.onError(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
@Override
public void onError(Throwable t) {
if (isDisposed()) {
RxJavaPlugins.onError(t);
return;
}
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onError(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
@Override
public void onError(Throwable t) {
if (isDisposed()) {
RxJavaPlugins.onError(t);
return;
}
lazySet(SubscriptionHelper.CANCELLED);
try {
removeObserver();
downstream.onError(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
@Override
public void onError(Throwable t) {
if (isDisposed()) {
RxJavaPlugins.onError(t);
return;
}
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onError(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
@Override
public void onError(Throwable t) {
if (isDisposed()) {
RxJavaPlugins.onError(t);
return;
}
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onError(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
@Override
public void onError(Throwable t) {
if (get() != SubscriptionHelper.CANCELLED) {
// lazySet(SubscriptionHelper.CANCELLED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
} else {
RxJavaPlugins.onError(t);
}
}
@Test(expected = BlockingOperationError.class)
public void testBlockingCallInsideRxJavaFlowable() throws Throwable {
try {
Flowable.timer(10, TimeUnit.MILLISECONDS)
.doOnEach(it -> Thread.sleep(10))
.blockingFirst();
} catch (CompositeException e) {
throw e.getExceptions().get(0);
}
}
@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = mOriginalCall.clone();
observer.onSubscribe(new CallDisposable(call));
boolean terminated = false;
try {
Response<T> response = getResponse(call);
if (mCachingActive && !mCachingSystem.contains(CacheUtils.urlToKey(mOriginalCall.request().url()))) {
mCachingSystem.put(
CacheUtils.urlToKey(call.request().url()), CacheUtils.responseToBytes(mRetrofit, response, mResponseType, mAnnotations));
}
if (!call.isCanceled()) {
observer.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!call.isCanceled()) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}
@Override
public void onError(Throwable throwable) {
try {
mObserver.onNext(Result.<R>error(throwable));
} catch (Throwable t) {
try {
mObserver.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
return;
}
mObserver.onComplete();
}
@Override
public void onNext(Response<R> response) {
if (response.isSuccessful()) {
mObserver.onNext(response.body());
} else {
mTerminated = true;
Throwable t = new HttpException(response);
try {
mObserver.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
@Override
public void onResponse(Call<T> call, Response<T> response) {
if (call.isCanceled()) {
return;
}
if (mCachingActive) {
mCachingSystem.put(
CacheUtils.urlToKey(call.request().url()), CacheUtils.responseToBytes(mRetrofit, response, mResponseType, mAnnotations));
}
try {
mObserver.onNext(response);
if (!call.isCanceled()) {
mTerminated = true;
mObserver.onComplete();
}
} catch (Throwable t) {
if (mTerminated) {
RxJavaPlugins.onError(t);
} else if (!call.isCanceled()) {
try {
mObserver.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}
@Override
public void onFailure(Call<T> call, Throwable t) {
if (call.isCanceled()) {
return;
}
try {
mObserver.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
observer.onSubscribe(new CallDisposable(call));
boolean terminated = false;
try {
Response<T> response = call.execute();
if (!call.isCanceled()) {
observer.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!call.isCanceled()) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}
@Override
public void onError(Throwable throwable) {
try {
observer.onNext(Result.<R>error(throwable));
} catch (Throwable t) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
return;
}
observer.onComplete();
}
@Override
public void onNext(Response<R> response) {
if (response.isSuccessful()) {
observer.onNext(response.body());
} else {
terminated = true;
Throwable t = new HttpException(response);
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
@Override
public void onError(Response<T> response) {
if (call.isCanceled()) return;
Throwable throwable = response.getException();
try {
terminated = true;
observer.onError(throwable);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(throwable, inner));
}
}
@Test
public void givenSubscriberAndError_whenExceptionOccurOnError_thenCompositeExceptionThrown() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.doOnError(throwable -> {
throw new RuntimeException("unexcepted");
})
.subscribe(testObserver);
testObserver.assertError(CompositeException.class);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
public static ApiException handleException(Throwable e) {
//使用RxCache之后返回的是包裹的CompositeException,一般包含2个异常,rxcache异常和原本的异常
Logger.t(TAG).d("开始解析错误------");
if (e instanceof CompositeException) {
CompositeException compositeException = (CompositeException) e;
for (Throwable throwable : compositeException.getExceptions()) {
if (!(throwable instanceof RxCacheException)) {
e = throwable;
Logger.t(TAG).d("其他异常:" + throwable.getMessage());
} else {
Logger.t(TAG).d("RxCache 异常");
}
}
}
ApiException ex;
if (e instanceof HttpException) {
HttpException httpException = (HttpException) e;
ex = new ApiException(httpException, httpException.code());
ex.message = httpException.getMessage();
//如果是403,尝试让用户手动验证
if (httpException.code() == 403) {
EventBus.getDefault().post(new NeedCheckGoogleRecaptchaEvent());
}
return ex;
} else if (e instanceof JsonParseException
|| e instanceof JSONException
|| e instanceof JsonSerializer
|| e instanceof NotSerializableException
|| e instanceof ParseException) {
ex = new ApiException(e, Error.PARSE_ERROR);
ex.message = "数据解析错误";
return ex;
} else if (e instanceof ClassCastException) {
ex = new ApiException(e, Error.CAST_ERROR);
ex.message = "类型转换错误";
return ex;
} else if (e instanceof ConnectException) {
ex = new ApiException(e, Error.NETWORD_ERROR);
ex.message = "连接失败";
return ex;
} else if (e instanceof javax.net.ssl.SSLHandshakeException) {
ex = new ApiException(e, Error.SSL_ERROR);
ex.message = "证书验证失败";
return ex;
} else if (e instanceof ConnectTimeoutException) {
ex = new ApiException(e, Error.TIMEOUT_ERROR);
ex.message = "网络连接超时";
return ex;
} else if (e instanceof java.net.SocketTimeoutException) {
ex = new ApiException(e, Error.TIMEOUT_ERROR);
ex.message = "网络连接超时";
return ex;
} else if (e instanceof UnknownHostException) {
ex = new ApiException(e, Error.UNKNOWNHOST_ERROR);
ex.message = "无法解析该域名";
return ex;
} else if (e instanceof NullPointerException) {
if (!BuildConfig.DEBUG) {
//Bugsnag.notify(new Throwable("NullPointerException:" + MyApplication.getInstance().getDataManager().getPorn9VideoAddress() + ":::" + MyApplication.getInstance().getDataManager().getPorn9ForumAddress(), e), Severity.WARNING);
}
ex = new ApiException(e, Error.NULLPOINTER_EXCEPTION);
ex.message = "NullPointerException";
return ex;
} else if (e instanceof VideoException) {
ex = new ApiException(e, Error.PARSE_VIDEO_URL_ERROR);
ex.message = e.getMessage();
return ex;
} else if (e instanceof FavoriteException) {
ex = new ApiException(e, Error.FAVORITE_VIDEO_ERROR);
ex.message = e.getMessage();
return ex;
} else if (e instanceof DaoException) {
ex = new ApiException(e, Error.GREEN_DAO_ERROR);
ex.message = "数据库错误";
return ex;
} else if (e instanceof MessageException) {
ex = new ApiException(e, Error.COMMON_MESSAGE_ERROR);
ex.message = e.getMessage();
return ex;
} else {
ex = new ApiException(e, Error.UNKNOWN);
ex.message = "未知错误:" + e.getMessage();
return ex;
}
}
public static ApiException handleException(Throwable e) {
//使用RxCache之后返回的是包裹的CompositeException,一般包含2个异常,rxcache异常和原本的异常
Logger.t(TAG).d("开始解析错误------");
if (e instanceof CompositeException) {
CompositeException compositeException = (CompositeException) e;
for (Throwable throwable : compositeException.getExceptions()) {
if (!(throwable instanceof RxCacheException)) {
e = throwable;
Logger.t(TAG).d("其他异常:" + throwable.getMessage());
} else {
Logger.t(TAG).d("RxCache 异常");
}
}
}
ApiException ex;
if (e instanceof HttpException) {
HttpException httpException = (HttpException) e;
ex = new ApiException(httpException, httpException.code());
ex.message = httpException.getMessage();
//如果是403,尝试让用户手动验证
if (httpException.code() == 403) {
EventBus.getDefault().post(new NeedCheckGoogleRecaptchaEvent());
}
return ex;
} else if (e instanceof JsonParseException
|| e instanceof JSONException
|| e instanceof JsonSerializer
|| e instanceof NotSerializableException
|| e instanceof ParseException) {
ex = new ApiException(e, Error.PARSE_ERROR);
ex.message = "数据解析错误";
return ex;
} else if (e instanceof ClassCastException) {
ex = new ApiException(e, Error.CAST_ERROR);
ex.message = "类型转换错误";
return ex;
} else if (e instanceof ConnectException) {
ex = new ApiException(e, Error.NETWORD_ERROR);
ex.message = "连接失败";
return ex;
} else if (e instanceof javax.net.ssl.SSLHandshakeException) {
ex = new ApiException(e, Error.SSL_ERROR);
ex.message = "证书验证失败";
return ex;
} else if (e instanceof ConnectTimeoutException) {
ex = new ApiException(e, Error.TIMEOUT_ERROR);
ex.message = "网络连接超时";
return ex;
} else if (e instanceof java.net.SocketTimeoutException) {
ex = new ApiException(e, Error.TIMEOUT_ERROR);
ex.message = "网络连接超时";
return ex;
} else if (e instanceof UnknownHostException) {
ex = new ApiException(e, Error.UNKNOWNHOST_ERROR);
ex.message = "无法解析该域名";
return ex;
} else if (e instanceof NullPointerException) {
if (!BuildConfig.DEBUG) {
//Bugsnag.notify(new Throwable("NullPointerException:" + MyApplication.getInstance().getDataManager().getPorn9VideoAddress() + ":::" + MyApplication.getInstance().getDataManager().getPorn9ForumAddress(), e), Severity.WARNING);
}
ex = new ApiException(e, Error.NULLPOINTER_EXCEPTION);
ex.message = "NullPointerException";
return ex;
} else if (e instanceof VideoException) {
ex = new ApiException(e, Error.PARSE_VIDEO_URL_ERROR);
ex.message = e.getMessage();
return ex;
} else if (e instanceof FavoriteException) {
ex = new ApiException(e, Error.FAVORITE_VIDEO_ERROR);
ex.message = e.getMessage();
return ex;
} else if (e instanceof DaoException) {
ex = new ApiException(e, Error.GREEN_DAO_ERROR);
ex.message = "数据库错误";
return ex;
} else if (e instanceof MessageException) {
ex = new ApiException(e, Error.COMMON_MESSAGE_ERROR);
ex.message = e.getMessage();
return ex;
} else {
ex = new ApiException(e, Error.UNKNOWN);
ex.message = "未知错误:" + e.getMessage();
return ex;
}
}