下面列出了io.reactivex.CompletableOnSubscribe#io.reactivex.CompletableObserver 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Wraps {@code observer} in a {@link DisposableCompletableObserver} whose terminal signals
 * both end up in {@code observer.onError(...)}. Nothing is forwarded once the wrapper
 * reports itself as disposed.
 *
 * @param observer the observer that receives the forwarded error signals
 * @return the wrapping observer
 */
@Override
public CompletableObserver apply(CompletableObserver observer) throws Exception {
    return new DisposableCompletableObserver() {
        @Override
        public void onError(Throwable ex) {
            if (isDisposed()) {
                return;
            }
            observer.onError(ex);
        }

        @Override
        public void onComplete() {
            if (isDisposed()) {
                return;
            }
            // NOTE(review): completion is reported as an error carrying `throwable`, which is
            // declared outside this block — confirm this conversion is intended.
            observer.onError(throwable);
        }
    };
}
/**
 * Subscribes a printing {@link CompletableObserver} to {@code Observable.range(1, 10)}
 * after {@code ignoreElements()} has been applied.
 */
@Test
public void testIgnoreElementsObservable() {
    CompletableObserver printingObserver = new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("onSubscribed");
        }

        @Override
        public void onError(Throwable e) {
            // Errors are not reported by this example.
        }

        @Override
        public void onComplete() {
            System.out.println("Completed");
        }
    };

    Observable.range(1, 10)
            .ignoreElements()
            .subscribe(printingObserver);
}
/**
 * Wraps {@code observer} in a {@link DisposableCompletableObserver} whose terminal signals
 * both end up in {@code observer.onError(...)}. Nothing is forwarded once the wrapper
 * reports itself as disposed.
 *
 * @param observer the observer that receives the forwarded error signals
 * @return the wrapping observer
 */
@Override
public CompletableObserver apply(CompletableObserver observer) throws Exception {
    return new DisposableCompletableObserver() {
        @Override
        public void onError(Throwable ex) {
            if (isDisposed()) {
                return;
            }
            observer.onError(ex);
        }

        @Override
        public void onComplete() {
            if (isDisposed()) {
                return;
            }
            // NOTE(review): completion is reported as an error carrying `throwable`, which is
            // declared outside this block — confirm this conversion is intended.
            observer.onError(throwable);
        }
    };
}
/**
 * Deletes the check point at {@code adapterPosition} through the check point data source.
 * The work is subscribed on {@code Schedulers.io()} and observed on the Android main thread.
 * On completion the entry is removed from {@code allCheckPoints} and the view is updated;
 * on error the view shows "Delete Failed".
 *
 * @param adapterPosition index into {@code allCheckPoints} of the check point to delete
 */
@Override public void onDeleteCheckPoint(int adapterPosition) {
    CompletableObserver deletionObserver = new CompletableObserver() {
        @Override public void onSubscribe(Disposable d) {
            // Nothing to do on subscription.
        }

        @Override public void onError(Throwable e) {
            getView().showError("Delete Failed");
        }

        @Override public void onComplete() {
            // Drop the entry from the backing list first, then bring the view in line with it.
            allCheckPoints.remove(adapterPosition);
            getView().removeMarker(adapterPosition);
            getView().notifyListAdapter();
        }
    };

    getCheckPointDataSource()
            .deleteCheckPoint(allCheckPoints.get(adapterPosition))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(deletionObserver);
}
/**
 * Inverse of {@link MaybeConsumers#fromCompletableObserver(CompletableObserver)}: builds a
 * {@link CompletableObserver} that reports to the given consumer.
 *
 * <p>Completion is delivered as {@code c.success(Success.SUCCESS)}; an error is converted
 * with {@code throwableToException} and delivered through {@code c.fail}. The subscription
 * callback is ignored.
 *
 * @param c the consumer that receives the translated signals
 * @return an observer forwarding to {@code c}
 */
public static CompletableObserver toCompletableObserver(final MaybeConsumer<Success> c) {
    return new CompletableObserver() {
        @Override
        public void onError(@NonNull Throwable e) {
            c.fail(throwableToException(e));
        }

        @Override
        public void onComplete() {
            c.success(Success.SUCCESS);
        }

        @Override
        public void onSubscribe(@NonNull Disposable d) {
            // do nothing
        }
    };
}
/**
 * Builds a {@link CompletableObserver} that logs every callback under {@code TAG} and, for
 * the two terminal callbacks, also appends a line to {@code textView}.
 *
 * @return the logging observer
 */
private CompletableObserver getCompletableObserver() {
    return new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, " onSubscribe : " + d.isDisposed());
        }

        @Override
        public void onError(Throwable e) {
            final String line = " onError : " + e.getMessage();
            textView.append(line);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, line);
        }

        @Override
        public void onComplete() {
            final String line = " onComplete";
            textView.append(line);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, line);
        }
    };
}
/**
 * Wraps {@link #refreshMetadataIds()} in a {@link Callable}.
 *
 * <p>Each call subscribes a logging observer to {@link #refreshMetadataIds()} and then
 * returns {@code null}; start, completion and errors are written to the debug log.
 *
 * @return Callable object to refresh metadata ids.
 */
public Callable<Void> refreshMetadataIdsCallable() {
    return () -> {
        CompletableObserver syncLogger = new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "Started SMS metadata sync.");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, e.getClass().getSimpleName() + " Error on SMS metadata sync.");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Completed SMS metadata sync.");
            }
        };
        refreshMetadataIds().subscribe(syncLogger);
        return null;
    };
}
/**
 * Subscribes {@code downstream} according to the value returned by
 * {@code rateLimiter.reservePermission()}:
 * <ul>
 *   <li>negative — the downstream gets an empty disposable followed by a
 *       {@code RequestNotPermitted} error;</li>
 *   <li>zero — the upstream is subscribed immediately;</li>
 *   <li>positive — the upstream is subscribed after a timer of that many nanoseconds.</li>
 * </ul>
 *
 * @param downstream the observer to connect to the upstream
 */
@Override
protected void subscribeActual(CompletableObserver downstream) {
    final long nanosToWait = rateLimiter.reservePermission();

    if (nanosToWait < 0) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(RequestNotPermitted.createRequestNotPermitted(rateLimiter));
        return;
    }

    if (nanosToWait == 0) {
        upstream.subscribe(new RateLimiterCompletableObserver(downstream));
        return;
    }

    // Delay the actual subscription until the reserved wait time has elapsed.
    Completable.timer(nanosToWait, TimeUnit.NANOSECONDS)
            .subscribe(() -> upstream.subscribe(new RateLimiterCompletableObserver(downstream)));
}
/**
 * Adapts a Vert.x {@code Handler<AsyncResult<T>>} to an RxJava2 {@link CompletableObserver}.
 * <p>
 * The returned observer can be subscribed to a {@code Completable} via
 * {@code Completable#subscribe(CompletableObserver)}.
 * <p>
 * The handler is invoked at most once: the first terminal signal wins. Completion is
 * delivered as a succeeded future without a result, an error as a failed future carrying
 * that error.
 *
 * @param handler the handler to adapt
 * @return the observer
 */
public static <T> CompletableObserver toObserver(Handler<AsyncResult<T>> handler) {
    // Guards the handler so that only the first terminal signal is delivered.
    AtomicBoolean completed = new AtomicBoolean();
    return new CompletableObserver() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            // The subscription is not tracked.
        }

        @Override
        public void onComplete() {
            if (completed.compareAndSet(false, true)) {
                handler.handle(io.vertx.core.Future.succeededFuture());
            }
        }

        @Override
        public void onError(Throwable error) {
            if (completed.compareAndSet(false, true)) {
                handler.handle(io.vertx.core.Future.failedFuture(error));
            }
        }
    };
}
/**
 * Subscribes {@code observer}, wrapped in a {@code LifeCompletableObserver} bound to
 * {@code scope}, to the upstream after {@code onTerminateDetach()} has been applied.
 * When {@code onMain} is set, signals are first moved to the Android main thread.
 *
 * @param observer the observer to wrap and subscribe
 */
private void subscribeActual(CompletableObserver observer) {
    final Completable source = onMain
            ? this.upStream.observeOn(AndroidSchedulers.mainThread())
            : this.upStream;
    source.onTerminateDetach().subscribe(new LifeCompletableObserver(observer, scope));
}
/**
 * Creates a {@link Completable} that completes immediately, subscribes to it on
 * {@code Schedulers.io()} and prints which terminal callback was invoked.
 */
@Test
public void testCompletableObservable() {
    Completable immediate = Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter emitter) {
            emitter.onComplete();
        }
    });

    CompletableObserver printingObserver = new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {
            // Nothing to do on subscription.
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError is called" + e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete is called");
        }
    };

    // NOTE(review): the subscription runs on the io scheduler and nothing here waits for it,
    // so the method may return before either callback prints — confirm that is acceptable.
    immediate.subscribeOn(Schedulers.io()).subscribe(printingObserver);
}
/**
 * Returns a {@link Completable} that, when subscribed, performs a blocking sign-in through
 * {@code mSyncAuthClient} and completes once the call returns.
 *
 * <p>If the sign-in is interrupted, a cancel result is passed to {@code processLogInResult},
 * the {@link InterruptedException} is delivered to the observer, and the thread's interrupt
 * flag is restored.
 *
 * @param activity the activity handed to the auth client
 * @param payload the authentication payload handed to the auth client
 * @return a Completable wrapping the sign-in call
 */
@Override
public Completable logIn(@NonNull FragmentActivity activity, AuthenticationPayload payload) {
    return new Completable() {
        @Override
        protected void subscribeActual(CompletableObserver observer) {
            // NOTE(review): observer.onSubscribe(...) is never called before the terminal
            // signal — verify against the Completable contract for custom sources.
            try {
                mSyncAuthClient.signIn(activity, payload);
                observer.onComplete();
            } catch (InterruptedException e) {
                processLogInResult(Result.cancel());
                observer.onError(e);
                // Catching InterruptedException clears the flag; restore it last so the
                // callbacks above run exactly as before and callers can still see the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    };
}
/**
 * Returns a {@link Completable} that, when subscribed, performs a blocking Okta sign-out
 * through {@code mSyncAuthClient} and completes once the call returns.
 *
 * <p>If the sign-out is interrupted, a cancel result is passed to
 * {@code processSignOutResult}, the {@link InterruptedException} is delivered to the
 * observer, and the thread's interrupt flag is restored.
 *
 * @param activity the activity handed to the auth client
 * @return a Completable wrapping the sign-out call
 */
@Override
public Completable signOutOfOkta(@NonNull FragmentActivity activity) {
    return new Completable() {
        @Override
        protected void subscribeActual(CompletableObserver observer) {
            // NOTE(review): observer.onSubscribe(...) is never called before the terminal
            // signal — verify against the Completable contract for custom sources.
            try {
                mSyncAuthClient.signOutOfOkta(activity);
                observer.onComplete();
            } catch (InterruptedException e) {
                processSignOutResult(Result.cancel());
                observer.onError(e);
                // Catching InterruptedException clears the flag; restore it last so the
                // callbacks above run exactly as before and callers can still see the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    };
}
/**
 * Demo entry point: turns an array of fruit names into an Observable, discards the items
 * with {@code ignoreElements()} and prints the subscription time and the time taken until
 * completion.
 *
 * @param args unused
 */
public static void main(String[] args) {
    String[] fruits = { "mango", "pineapple", "apple", "mango", "papaya" };

    CompletableObserver timingObserver = new CompletableObserver() {
        // Wall-clock time in milliseconds captured when the subscription starts.
        long startedAt = 0;

        @Override
        public void onSubscribe(Disposable disposable) {
            startedAt = System.currentTimeMillis();
            System.out.println(disposable.isDisposed() + "\t" + startedAt);
        }

        @Override
        public void onComplete() {
            System.out.println("completed");
            long elapsedMillis = System.currentTimeMillis() - startedAt;
            System.out.println("process completedin: " + elapsedMillis + "ms");
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }
    };

    Observable.fromArray(fruits)
            .ignoreElements()
            .subscribe(timingObserver);
}
/**
 * Bridges a {@link CompletableObserver} to the {@code MaybeConsumer<Success>} interface.
 *
 * <p>{@code MaybeConsumer<Success>} is very similar to RxJava's {@link CompletableObserver}:
 * both wait for either a signal that a process has succeeded, or that it has failed with
 * some exception. While both interfaces are still in use, it is useful to be able to switch
 * between them.
 *
 * @param o the observer that receives the translated signals
 * @return a {@code MaybeConsumer<Success>} that pipes {@link MaybeConsumer#success(Object)}
 *     to {@link CompletableObserver#onComplete()}, and {@link MaybeConsumer#fail(Exception)}
 *     to {@link CompletableObserver#onError(Throwable)}
 */
public static MaybeConsumer<Success> fromCompletableObserver(final CompletableObserver o) {
    return new MaybeConsumer<Success>() {
        @Override
        public void fail(Exception e) {
            o.onError(e);
        }

        @Override
        public void success(Success value) {
            // The success value itself carries no information; only the signal is forwarded.
            o.onComplete();
        }
    };
}
/**
 * Creates a {@link CompletableObserver} that ignores subscription and completion and passes
 * any error, together with {@code tag} and {@code operation}, to {@code complain}.
 *
 * @param tag value forwarded to {@code complain} with the error
 * @param operation value forwarded to {@code complain} with the error
 * @return the error-reporting observer
 */
public static CompletableObserver observe(String tag, String operation) {
    return new CompletableObserver() {
        @Override
        public void onError(Throwable e) {
            complain(e, tag, operation);
        }

        @Override
        public void onComplete() {
            // Completion needs no handling.
        }

        @Override
        public void onSubscribe(Disposable d) {
            // Subscription needs no handling.
        }
    };
}
/**
 * Subscribes {@code downstream} to the upstream through a
 * {@code CircuitBreakerCompletableObserver} when {@code circuitBreaker.tryAcquirePermission()}
 * succeeds. Otherwise the downstream gets an empty disposable followed by the exception
 * built by {@code createCallNotPermittedException}.
 *
 * @param downstream the observer to connect
 */
@Override
protected void subscribeActual(CompletableObserver downstream) {
    if (!circuitBreaker.tryAcquirePermission()) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(createCallNotPermittedException(circuitBreaker));
        return;
    }
    upstream.subscribe(new CircuitBreakerCompletableObserver(downstream));
}
/**
 * Subscribes {@code downstream} to the upstream through a {@code BulkheadCompletableObserver}
 * when {@code bulkhead.tryAcquirePermission()} succeeds. Otherwise the downstream gets an
 * empty disposable followed by a {@code BulkheadFullException}.
 *
 * @param downstream the observer to connect
 */
@Override
protected void subscribeActual(CompletableObserver downstream) {
    if (!bulkhead.tryAcquirePermission()) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(BulkheadFullException.createBulkheadFullException(bulkhead));
        return;
    }
    upstream.subscribe(new BulkheadCompletableObserver(downstream));
}
/**
 * Checks that an observer built by {@code CompletableHelper.toObserver} succeeds the promise
 * with a {@code null} result when subscribed to a completed {@link Completable}.
 */
@Test
public void testToCompletableObserverSuccess() {
    Promise<String> outcome = Promise.promise();
    CompletableObserver adapter = CompletableHelper.toObserver(outcome);

    Completable.complete().subscribe(adapter);

    assertTrue(outcome.future().succeeded());
    assertNull(outcome.future().result());
}
/**
 * Checks that an observer built by {@code CompletableHelper.toObserver} fails the promise
 * with the very same exception instance emitted by a failing {@link Completable}.
 */
@Test
public void testToCompletableObserverFailure() {
    Promise<String> outcome = Promise.promise();
    CompletableObserver adapter = CompletableHelper.toObserver(outcome);
    RuntimeException expected = new RuntimeException();

    Completable.error(expected).subscribe(adapter);

    assertTrue(outcome.future().failed());
    assertSame(expected, outcome.future().cause());
}
/**
 * Creates an observer that passes {@code scope} to the superclass constructor and keeps a
 * reference to {@code downstream}.
 *
 * @param downstream observer stored in the {@code downstream} field
 * @param scope value handed to {@code super(scope)}
 */
LifeCompletableObserver(CompletableObserver downstream,Scope scope) {
super(scope);
this.downstream = downstream;
}
/**
 * Creates an observer that keeps a reference to {@code delegate}.
 *
 * @param delegate observer stored in the {@code delegate} field
 */
ForwardingCompletableObserver(CompletableObserver delegate) {
this.delegate = delegate;
}
/**
 * Creates an observer around {@code downstreamObserver} and records the creation time.
 *
 * @param downstreamObserver observer handed to the superclass constructor
 */
CircuitBreakerCompletableObserver(CompletableObserver downstreamObserver) {
super(downstreamObserver);
// Capture the construction time from System.nanoTime() in the start field.
this.start = System.nanoTime();
}
/**
 * Creates an observer around {@code downstreamObserver}; all work is delegated to the
 * superclass constructor.
 *
 * @param downstreamObserver observer handed to the superclass constructor
 */
BulkheadCompletableObserver(CompletableObserver downstreamObserver) {
super(downstreamObserver);
}
/**
 * Creates an observer that stores {@code downstreamObserver} after passing it through
 * {@code requireNonNull}.
 *
 * @param downstreamObserver observer stored in the {@code downstreamObserver} field
 */
public AbstractCompletableObserver(CompletableObserver downstreamObserver) {
this.downstreamObserver = requireNonNull(downstreamObserver);
}
/**
 * Creates an observer around {@code downstreamObserver}; all work is delegated to the
 * superclass constructor.
 *
 * @param downstreamObserver observer handed to the superclass constructor
 */
RateLimiterCompletableObserver(CompletableObserver downstreamObserver) {
super(downstreamObserver);
}
/**
 * Subscribes {@code s} to {@code source} through a {@code RequestContextCompletableObserver}
 * while {@code assemblyContext} is pushed. The handle returned by {@code push()} is closed
 * as soon as the subscribe call returns.
 *
 * @param s the observer to wrap and subscribe
 */
@Override
protected void subscribeActual(CompletableObserver s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        final RequestContextCompletableObserver contextAware =
                new RequestContextCompletableObserver(s, assemblyContext);
        source.subscribe(contextAware);
    }
}
/**
 * Subscribes {@code s} to {@code source} through a {@code RequestContextCompletableObserver}
 * while {@code assemblyContext} is pushed. The handle returned by {@code push()} is closed
 * as soon as the subscribe call returns.
 *
 * @param s the observer to wrap and subscribe
 */
@Override
protected void subscribeActual(CompletableObserver s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        final RequestContextCompletableObserver contextAware =
                new RequestContextCompletableObserver(s, assemblyContext);
        source.subscribe(contextAware);
    }
}
/**
 * Subscribes {@code s} to {@code source} through a {@code RequestContextCompletableObserver}
 * while {@code assemblyContext} is pushed. The handle returned by {@code push()} is closed
 * as soon as the subscribe call returns.
 *
 * @param s the observer to wrap and subscribe
 */
@Override
protected void subscribeActual(CompletableObserver s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        final RequestContextCompletableObserver contextAware =
                new RequestContextCompletableObserver(s, assemblyContext);
        source.subscribe(contextAware);
    }
}
/**
 * Creates an observer that keeps references to the wrapped observer and the request context.
 *
 * @param actual observer stored in the {@code actual} field
 * @param assemblyContext context stored in the {@code assemblyContext} field
 */
RequestContextCompletableObserver(CompletableObserver actual, RequestContext assemblyContext) {
this.actual = actual;
this.assemblyContext = assemblyContext;
}