下面列出了 io.reactivex.rxjava3.core.SingleObserver 的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Subscribes {@code downstream} to the upstream source, gated by the circuit breaker.
 *
 * <p>If {@code circuitBreaker.tryAcquirePermission()} returns {@code false}, the
 * downstream is handed {@code EmptyDisposable.INSTANCE} via {@code onSubscribe} and
 * then terminated with the exception built by
 * {@code createCallNotPermittedException(circuitBreaker)}; the upstream is never
 * subscribed in that case.
 *
 * @param downstream the observer to wrap and subscribe, or to reject
 */
@Override
protected void subscribeActual(SingleObserver<? super T> downstream) {
    // Rejection path first: no permission means the upstream is left untouched.
    if (!circuitBreaker.tryAcquirePermission()) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(createCallNotPermittedException(circuitBreaker));
        return;
    }

    upstream.subscribe(new CircuitBreakerSingleObserver(downstream));
}
/**
 * Subscribes {@code downstream} to the upstream source, gated by the bulkhead.
 *
 * <p>If {@code bulkhead.tryAcquirePermission()} returns {@code false}, the downstream
 * is handed {@code EmptyDisposable.INSTANCE} via {@code onSubscribe} and then
 * terminated with the exception from
 * {@code BulkheadFullException.createBulkheadFullException(bulkhead)}; the upstream
 * is never subscribed in that case.
 *
 * @param downstream the observer to wrap and subscribe, or to reject
 */
@Override
protected void subscribeActual(SingleObserver<? super T> downstream) {
    // Rejection path first: no permission means the upstream is left untouched.
    if (!bulkhead.tryAcquirePermission()) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(BulkheadFullException.createBulkheadFullException(bulkhead));
        return;
    }

    upstream.subscribe(new BulkheadSingleObserver(downstream));
}
/**
 * Subscribes {@code downstream} to the upstream source, gated by the rate limiter.
 *
 * <p>The value returned by {@code rateLimiter.reservePermission()} is interpreted as
 * a wait time in nanoseconds:
 * <ul>
 *   <li>negative: the downstream is handed {@code EmptyDisposable.INSTANCE} and then
 *       terminated with
 *       {@code RequestNotPermitted.createRequestNotPermitted(rateLimiter)};</li>
 *   <li>zero: the upstream is subscribed immediately;</li>
 *   <li>positive: the upstream subscription is deferred behind a
 *       {@code Completable.timer} of that many nanoseconds.</li>
 * </ul>
 *
 * @param downstream the observer to wrap and subscribe, or to reject
 */
@Override
protected void subscribeActual(SingleObserver<? super T> downstream) {
    final long nanosToWait = rateLimiter.reservePermission();

    if (nanosToWait < 0) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(RequestNotPermitted.createRequestNotPermitted(rateLimiter));
        return;
    }

    if (nanosToWait == 0) {
        upstream.subscribe(new RateLimiterSingleObserver(downstream));
        return;
    }

    // NOTE(review): the Disposable returned by subscribe(...) is discarded, so the
    // pending timer cannot be cancelled from here - confirm this is intended.
    Completable.timer(nanosToWait, TimeUnit.NANOSECONDS)
        .subscribe(() -> upstream.subscribe(new RateLimiterSingleObserver(downstream)));
}
/**
 * Creates an observer that forwards to {@code downstreamObserver} and captures the
 * construction time.
 *
 * @param downstreamObserver the observer passed on to the superclass constructor
 */
CircuitBreakerSingleObserver(SingleObserver<? super T> downstreamObserver) {
super(downstreamObserver);
// Snapshot of System.nanoTime() at construction; presumably the reference point
// for measuring call duration later - confirm against the rest of the class.
this.start = System.nanoTime();
}
/**
 * Creates an observer that delegates entirely to the superclass constructor.
 *
 * @param downstreamObserver the observer passed on to the superclass constructor
 */
BulkheadSingleObserver(SingleObserver<? super T> downstreamObserver) {
super(downstreamObserver);
}
/**
 * Stores the downstream observer after passing it through {@code requireNonNull}.
 *
 * @param downstreamObserver the observer to keep in {@code this.downstreamObserver};
 *     presumably rejected with a {@code NullPointerException} when {@code null},
 *     assuming {@code requireNonNull} is {@code java.util.Objects.requireNonNull} -
 *     TODO confirm against the file's static imports
 */
public AbstractSingleObserver(SingleObserver<? super T> downstreamObserver) {
this.downstreamObserver = requireNonNull(downstreamObserver);
}
/**
 * Creates an observer that delegates entirely to the superclass constructor.
 *
 * @param downstreamObserver the observer passed on to the superclass constructor
 */
RateLimiterSingleObserver(SingleObserver<? super T> downstreamObserver) {
super(downstreamObserver);
}
/**
 * Creates an observer holding the wrapped observer and the request context it was
 * assembled with.
 *
 * @param actual the observer stored in {@code this.actual}
 * @param assemblyContext the context stored in {@code this.assemblyContext}
 */
RequestContextSingleObserver(SingleObserver<T> actual, RequestContext assemblyContext) {
    // Plain field capture; the two assignments are independent of each other.
    this.assemblyContext = assemblyContext;
    this.actual = actual;
}
/**
 * Subscribes to {@code source} with {@code s} wrapped in a
 * {@code RequestContextSingleObserver}, keeping the resource returned by
 * {@code assemblyContext.push()} open for the duration of the subscribe call.
 *
 * @param s the observer to wrap and hand to {@code source}
 */
@Override
protected void subscribeActual(SingleObserver<? super T> s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        // Built inside the try scope so construction also happens after push().
        final SingleObserver<? super T> contextAwareObserver =
                new RequestContextSingleObserver<>(s, assemblyContext);
        source.subscribe(contextAwareObserver);
    }
}
/**
 * Subscribes to {@code source} with {@code s} wrapped in a
 * {@code RequestContextSingleObserver}, keeping the resource returned by
 * {@code assemblyContext.push()} open for the duration of the subscribe call.
 *
 * @param s the observer to wrap and hand to {@code source}
 */
@Override
protected void subscribeActual(SingleObserver<? super T> s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        // Built inside the try scope so construction also happens after push().
        final SingleObserver<? super T> contextAwareObserver =
                new RequestContextSingleObserver<>(s, assemblyContext);
        source.subscribe(contextAwareObserver);
    }
}
/**
 * Subscribes to {@code source} with {@code s} wrapped in a
 * {@code RequestContextSingleObserver}, keeping the resource returned by
 * {@code assemblyContext.push()} open for the duration of the subscribe call.
 *
 * @param s the observer to wrap and hand to {@code source}
 */
@Override
protected void subscribeActual(SingleObserver<? super T> s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        // Built inside the try scope so construction also happens after push().
        final SingleObserver<? super T> contextAwareObserver =
                new RequestContextSingleObserver<>(s, assemblyContext);
        source.subscribe(contextAwareObserver);
    }
}