下面列出了 io.reactivex.rxjava3.core.CompletableObserver 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Subscribes {@code downstream} to {@code upstream}, gated by the rate limiter.
 *
 * <p>The value returned by {@code rateLimiter.reservePermission()} selects the path:
 * <ul>
 *   <li>negative: {@code downstream} receives {@code onSubscribe} with
 *       {@code EmptyDisposable.INSTANCE} followed by {@code onError} carrying a
 *       {@code RequestNotPermitted}; {@code upstream} is never subscribed;</li>
 *   <li>zero: {@code upstream} is subscribed immediately;</li>
 *   <li>positive: {@code upstream} is subscribed once a timer of that many
 *       nanoseconds completes.</li>
 * </ul>
 *
 * @param downstream the observer to be wrapped and connected to {@code upstream}
 */
@Override
protected void subscribeActual(CompletableObserver downstream) {
    final long waitNanos = rateLimiter.reservePermission();

    // Negative reservation: reject without touching upstream.
    if (waitNanos < 0) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(RequestNotPermitted.createRequestNotPermitted(rateLimiter));
        return;
    }

    // No wait required: connect right away.
    if (waitNanos == 0) {
        upstream.subscribe(new RateLimiterCompletableObserver(downstream));
        return;
    }

    // Positive wait: defer the upstream subscription until the timer completes.
    // NOTE(review): the value returned by this subscribe(...) call is discarded,
    // so nothing here can cancel the pending timer — confirm that is intended.
    Completable.timer(waitNanos, TimeUnit.NANOSECONDS)
        .subscribe(() -> upstream.subscribe(new RateLimiterCompletableObserver(downstream)));
}
/**
 * Subscribes {@code downstream} to {@code upstream} only if the circuit breaker
 * grants a permission.
 *
 * <p>When {@code circuitBreaker.tryAcquirePermission()} returns {@code false},
 * {@code downstream} receives {@code onSubscribe} with {@code EmptyDisposable.INSTANCE}
 * followed by {@code onError} with the exception built by
 * {@code createCallNotPermittedException(circuitBreaker)}; {@code upstream} is not subscribed.
 *
 * @param downstream the observer to be wrapped and connected to {@code upstream}
 */
@Override
protected void subscribeActual(CompletableObserver downstream) {
    final boolean permitted = circuitBreaker.tryAcquirePermission();
    if (!permitted) {
        // Rejected: complete the subscription handshake, then fail immediately.
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(createCallNotPermittedException(circuitBreaker));
        return;
    }
    upstream.subscribe(new CircuitBreakerCompletableObserver(downstream));
}
/**
 * Subscribes {@code downstream} to {@code upstream} only if the bulkhead grants
 * a permission.
 *
 * <p>When {@code bulkhead.tryAcquirePermission()} returns {@code false},
 * {@code downstream} receives {@code onSubscribe} with {@code EmptyDisposable.INSTANCE}
 * followed by {@code onError} carrying a {@code BulkheadFullException};
 * {@code upstream} is not subscribed.
 *
 * @param downstream the observer to be wrapped and connected to {@code upstream}
 */
@Override
protected void subscribeActual(CompletableObserver downstream) {
    final boolean permitted = bulkhead.tryAcquirePermission();
    if (!permitted) {
        // Rejected: complete the subscription handshake, then fail immediately.
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(BulkheadFullException.createBulkheadFullException(bulkhead));
        return;
    }
    upstream.subscribe(new BulkheadCompletableObserver(downstream));
}
/**
 * Hands the subscription off to {@code executor} by submitting a new
 * {@code CompletableDataItemTask} built from {@code dataQuery} and the observer.
 *
 * @param observer the observer passed to the task
 */
@Override
protected void subscribeActual(CompletableObserver observer) {
    final CompletableDataItemTask task = new CompletableDataItemTask(dataQuery, observer);
    executor.execute(task);
}
/**
 * Creates a task that stores the given query and observer for later use.
 *
 * @param dataQuery the query stored in this task
 * @param observer  the observer stored in this task
 */
public CompletableDataItemTask(SingleDataQuery dataQuery, CompletableObserver observer) {
    this.observer = observer;
    this.dataQuery = dataQuery;
}
CircuitBreakerCompletableObserver(CompletableObserver downstreamObserver) {
super(downstreamObserver);
this.start = System.nanoTime();
}
/**
 * Creates an observer that wraps {@code downstreamObserver}; all setup is
 * delegated to the superclass constructor.
 *
 * @param downstreamObserver the observer forwarded to the superclass constructor
 */
BulkheadCompletableObserver(CompletableObserver downstreamObserver) {
super(downstreamObserver);
}
public AbstractCompletableObserver(CompletableObserver downstreamObserver) {
this.downstreamObserver = requireNonNull(downstreamObserver);
}
/**
 * Creates an observer that wraps {@code downstreamObserver}; all setup is
 * delegated to the superclass constructor.
 *
 * @param downstreamObserver the observer forwarded to the superclass constructor
 */
RateLimiterCompletableObserver(CompletableObserver downstreamObserver) {
super(downstreamObserver);
}
/**
 * Subscribes to {@code source} with a {@code RequestContextCompletableObserver}
 * wrapping {@code s}, while the handle returned by {@code assemblyContext.push()}
 * is open. The handle is closed when the try block exits.
 *
 * @param s the observer to wrap together with {@code assemblyContext}
 */
@Override
protected void subscribeActual(CompletableObserver s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        // Build the wrapper inside the pushed scope, then subscribe with it.
        final RequestContextCompletableObserver contextAware =
                new RequestContextCompletableObserver(s, assemblyContext);
        source.subscribe(contextAware);
    }
}
/**
 * Creates an observer that holds the wrapped observer and the request context
 * it was assembled with.
 *
 * @param actual          the observer stored as {@code actual}
 * @param assemblyContext the context stored as {@code assemblyContext}
 */
RequestContextCompletableObserver(CompletableObserver actual, RequestContext assemblyContext) {
    this.assemblyContext = assemblyContext;
    this.actual = actual;
}