下面列出了 io.reactivex.SingleObserver#onError() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Wraps {@code observer} in a {@link SingleObserver} that permits only one subscription.
 *
 * <p>{@code onSuccess} and {@code onError} are forwarded to {@code observer} unchanged.
 * {@code onSubscribe} is forwarded only the first time it is invoked; any later invocation
 * throws instead of reaching {@code observer}.
 *
 * @param observer the downstream observer that receives the forwarded signals
 * @return the guarding observer
 */
@Override
public SingleObserver<? super T> apply(final SingleObserver<? super T> observer) {
    return new SingleObserver<T>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // getAndSet(true) yields the previous flag value, so only the first caller sees false.
            final boolean alreadySubscribed = subscribedOnce.getAndSet(true);
            if (alreadySubscribed) {
                // NOTE(review): NullPointerException is presumably deliberate (RxJava's subscribe()
                // rethrows NPEs untouched) -- confirm before changing the type.
                throw new NullPointerException("You cannot directly subscribe to a gRPC service multiple times " +
                        "concurrently. Use Flowable.share() instead.");
            }
            observer.onSubscribe(disposable);
        }

        @Override
        public void onSuccess(T value) {
            observer.onSuccess(value);
        }

        @Override
        public void onError(Throwable error) {
            observer.onError(error);
        }
    };
}
/**
 * Handles a new subscription, which represents a request to check out a member from the pool.
 *
 * <p>The observer is handed a {@code MemberSingleObserver} as its disposable. If the pool is
 * already closed the observer is failed with a {@code PoolClosedException} and nothing else
 * happens. Otherwise the wrapper is registered; if it was disposed in the meantime it is removed
 * again, else the requested count is incremented. Finally {@code drain()} is invoked.
 *
 * @param observer the observer waiting for a pool member
 */
@Override
protected void subscribeActual(SingleObserver<? super Member<T>> observer) {
    // checking out a member from the pool is modelled as a subscription
    // to the singleton MemberSingle
    final MemberSingleObserver<T> memberObserver = new MemberSingleObserver<T>(observer, this);
    observer.onSubscribe(memberObserver);
    if (pool.isClosed()) {
        observer.onError(new PoolClosedException());
        return;
    }
    add(memberObserver);
    if (!memberObserver.isDisposed()) {
        // bump requested by one, retrying until the compare-and-set succeeds
        Observers<T> current;
        do {
            current = observers.get();
        } while (!observers.compareAndSet(current, current.withRequested(current.requested + 1)));
    } else {
        // disposed while being added: undo the registration
        remove(memberObserver);
    }
    log.debug("subscribed");
    drain();
}
/**
 * Adapts a {@link SingleObserver} to the {@code MaybeConsumer<T>} interface.
 *
 * <p>{@code MaybeConsumer<T>} is very similar to {@code SingleObserver<T>} in RxJava. Both are
 * looking for either a signal that a computation has succeeded and returned a value of type T, or
 * that it has failed with some exception. For the time period where we are still using both
 * interfaces, we will find it useful to be able to switch between them.
 *
 * <p>The returned consumer never calls {@code onSubscribe} on the observer; it only forwards the
 * two terminal signals.
 *
 * @param <T> the type of the successful value
 * @param o the observer that receives the forwarded signals; any observer able to accept a
 *     {@code T} (that is, a {@code SingleObserver<? super T>}) is allowed
 * @return a {@code MaybeConsumer<T>} that pipes {@code MaybeConsumer#success(Object)} to {@link
 *     SingleObserver#onSuccess(Object)}, and {@code MaybeConsumer#fail(Exception)} to {@link
 *     SingleObserver#onError(Throwable)}
 */
public static <T> MaybeConsumer<T> fromSingleObserver(final SingleObserver<? super T> o) {
    return new MaybeConsumer<T>() {
        @Override
        public void success(T value) {
            o.onSuccess(value);
        }

        @Override
        public void fail(Exception e) {
            o.onError(e);
        }
    };
}
/**
 * Subscribes {@code downstream} to the upstream source only if the circuit breaker grants a
 * permission; otherwise signals an empty disposable followed by a call-not-permitted error.
 *
 * @param downstream the observer to subscribe or reject
 */
@Override
protected void subscribeActual(SingleObserver<? super T> downstream) {
    if (!circuitBreaker.tryAcquirePermission()) {
        // Rejected: onSubscribe must precede the terminal onError signal.
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(createCallNotPermittedException(circuitBreaker));
        return;
    }
    upstream.subscribe(new CircuitBreakerSingleObserver(downstream));
}
/**
 * Subscribes {@code downstream} to the upstream source only if the bulkhead grants a permission;
 * otherwise signals an empty disposable followed by a bulkhead-full error.
 *
 * @param downstream the observer to subscribe or reject
 */
@Override
protected void subscribeActual(SingleObserver<? super T> downstream) {
    if (!bulkhead.tryAcquirePermission()) {
        // Rejected: onSubscribe must precede the terminal onError signal.
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(BulkheadFullException.createBulkheadFullException(bulkhead));
        return;
    }
    upstream.subscribe(new BulkheadSingleObserver(downstream));
}
/**
 * Reserves a permission from the rate limiter and subscribes {@code downstream} accordingly.
 *
 * <ul>
 *   <li>negative reservation result: the observer is rejected with an empty disposable followed
 *       by a request-not-permitted error;</li>
 *   <li>zero: the upstream subscription happens immediately;</li>
 *   <li>positive: the upstream subscription is deferred by that many nanoseconds using a
 *       {@code Completable} timer.</li>
 * </ul>
 *
 * @param downstream the observer to subscribe or reject
 */
@Override
protected void subscribeActual(SingleObserver<? super T> downstream) {
    final long waitDuration = rateLimiter.reservePermission();

    if (waitDuration < 0) {
        // Rejected: onSubscribe must precede the terminal onError signal.
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(RequestNotPermitted.createRequestNotPermitted(rateLimiter));
        return;
    }

    if (waitDuration == 0) {
        upstream.subscribe(new RateLimiterSingleObserver(downstream));
        return;
    }

    // Positive wait: the observer wrapper is created only once the timer fires.
    Completable.timer(waitDuration, TimeUnit.NANOSECONDS)
        .subscribe(() -> upstream.subscribe(new RateLimiterSingleObserver(downstream)));
}