下面列出了怎么用io.reactivex.MaybeObserver的API类实例代码及写法,或者点击链接到github查看源代码。
private void subscribeActual(MaybeObserver<? super T> observer) {
Maybe<T> upStream = this.upStream;
if (onMain) {
upStream = upStream.observeOn(AndroidSchedulers.mainThread());
}
upStream.onTerminateDetach().subscribe(new LifeMaybeObserver<>(observer, scope));
}
@Test
public void testElementAtObservable() {
Observable.just(1, 2, 3, 4, 5, 6)
.elementAt(1)
.subscribe(new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
System.out.println("onNext: " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
public static void main(String[] args) {
// TODO Auto-generated method stub
String[] fruits = { "mango", "pineapple", "apple", "mango", "papaya", "pineapple", "apple", "apple" };
Observable.fromArray(fruits).elementAt(3).count()
.subscribe(item -> System.out.println("we got: " + item + " items from the Observable"));
Observable.fromArray(fruits).elementAt(10).subscribe(new MaybeObserver<String>() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("successfully completed");
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
System.out.println(throwable.getMessage());
}
@Override
public void onSubscribe(Disposable disposable) {
// TODO Auto-generated method stub
}
@Override
public void onSuccess(String value) {
// TODO Auto-generated method stub
System.out.println("value at specified position is:-"+value);
}
});
}
private MaybeObserver<Integer> getObserver() {
return new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onSuccess(Integer value) {
textView.append(" onSuccess : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onSuccess : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
@Override
protected void subscribeActual(MaybeObserver<? super T> downstream) {
if (circuitBreaker.tryAcquirePermission()) {
upstream.subscribe(new CircuitBreakerMaybeObserver(downstream));
} else {
downstream.onSubscribe(EmptyDisposable.INSTANCE);
downstream.onError(createCallNotPermittedException(circuitBreaker));
}
}
@Override
protected void subscribeActual(MaybeObserver<? super T> downstream) {
if (bulkhead.tryAcquirePermission()) {
upstream.subscribe(new BulkheadMaybeObserver(downstream));
} else {
downstream.onSubscribe(EmptyDisposable.INSTANCE);
downstream.onError(BulkheadFullException.createBulkheadFullException(bulkhead));
}
}
@Override
protected void subscribeActual(MaybeObserver<? super T> downstream) {
long waitDuration = rateLimiter.reservePermission();
if (waitDuration >= 0) {
if (waitDuration > 0) {
Completable.timer(waitDuration, TimeUnit.NANOSECONDS)
.subscribe(() -> upstream.subscribe(new RateLimiterMaybeObserver(downstream)));
} else {
upstream.subscribe(new RateLimiterMaybeObserver(downstream));
}
} else {
downstream.onSubscribe(EmptyDisposable.INSTANCE);
downstream.onError(RequestNotPermitted.createRequestNotPermitted(rateLimiter));
}
}
/**
* Adapts an Vert.x {@code Handler<AsyncResult<T>>} to an RxJava2 {@link MaybeObserver}.
* <p>
* The returned observer can be subscribed to an {@link Maybe#subscribe(MaybeObserver)}.
*
* @param handler the handler to adapt
* @return the observer
*/
public static <T> MaybeObserver<T> toObserver(Handler<AsyncResult<T>> handler) {
AtomicBoolean completed = new AtomicBoolean();
return new MaybeObserver<T>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onComplete() {
if (completed.compareAndSet(false, true)) {
handler.handle(io.vertx.core.Future.succeededFuture());
}
}
@Override
public void onSuccess(@NonNull T item) {
if (completed.compareAndSet(false, true)) {
handler.handle(io.vertx.core.Future.succeededFuture(item));
}
}
@Override
public void onError(Throwable error) {
if (completed.compareAndSet(false, true)) {
handler.handle(io.vertx.core.Future.failedFuture(error));
}
}
};
}
@Test
public void testToMaybeObserverSuccess() {
Promise<String> promise = Promise.promise();
MaybeObserver<String> observer = MaybeHelper.toObserver(promise);
Maybe<String> s = Maybe.just("foobar");
s.subscribe(observer);
assertTrue(promise.future().succeeded());
assertSame("foobar", promise.future().result());
}
@Test
public void testToMaybeObserverEmpty() {
Promise<String> promise = Promise.promise();
MaybeObserver<String> observer = MaybeHelper.toObserver(promise);
Maybe<String> s = Maybe.empty();
s.subscribe(observer);
assertTrue(promise.future().succeeded());
assertNull(promise.future().result());
}
@Test
public void testToMaybeObserverFailure() {
Promise<String> promise = Promise.promise();
MaybeObserver<String> observer = MaybeHelper.toObserver(promise);
RuntimeException cause = new RuntimeException();
Maybe<String> s = Maybe.error(cause);
s.subscribe(observer);
assertTrue(promise.future().failed());
assertSame(cause, promise.future().cause());
}
LifeMaybeObserver(MaybeObserver<? super T> downstream, Scope scope) {
super(scope);
this.downstream = downstream;
}
public static void main(String[] args) {
Integer[] numbers = { 1, 2, 13, 34, 12, 10 };
Observable<Integer> source1 = Observable.fromArray(numbers);
source1.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer value1, Integer value2) throws Exception {
// TODO Auto-generated method stub
// 1, 2, 13, 34, 12, 10
int sum = 0;
return value1 + value2;
}
}).subscribe(new MaybeObserver<Integer>() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("completed2");
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
System.out.println(throwable.getMessage());
}
@Override
public void onSubscribe(Disposable arg0) {
// TODO Auto-generated method stub
}
@Override
public void onSuccess(Integer value) {
// TODO Auto-generated method stub
System.out.println(value);
}
});
}
ForwardingObserver(MaybeObserver<T> delegate) {
this.delegate = delegate;
}
public AbstractMaybeObserver(MaybeObserver<? super T> downstreamObserver) {
this.downstreamObserver = requireNonNull(downstreamObserver);
}
CircuitBreakerMaybeObserver(MaybeObserver<? super T> downstreamObserver) {
super(downstreamObserver);
this.start = System.nanoTime();
}
BulkheadMaybeObserver(MaybeObserver<? super T> downstreamObserver) {
super(downstreamObserver);
}
RateLimiterMaybeObserver(MaybeObserver<? super T> downstreamObserver) {
super(downstreamObserver);
}
@Override
protected void subscribeActual(MaybeObserver<? super T> s) {
try (SafeCloseable ignored = assemblyContext.push()) {
source.subscribe(new RequestContextMaybeObserver<>(s, assemblyContext));
}
}
RequestContextMaybeObserver(MaybeObserver<T> actual, RequestContext assemblyContext) {
this.actual = actual;
this.assemblyContext = assemblyContext;
}
@Override
protected void subscribeActual(MaybeObserver<? super T> s) {
try (SafeCloseable ignored = assemblyContext.push()) {
source.subscribe(new RequestContextMaybeObserver<>(s, assemblyContext));
}
}
@Override
protected void subscribeActual(MaybeObserver<? super T> s) {
try (SafeCloseable ignored = assemblyContext.push()) {
source.subscribe(new RequestContextMaybeObserver<>(s, assemblyContext));
}
}
TraceContextMaybeObserver(
MaybeObserver<T> downstream, CurrentTraceContext contextScoper, TraceContext assembled) {
this.downstream = downstream;
this.contextScoper = contextScoper;
this.assembled = assembled;
}
public static <T> MaybeObserver<T> wrap(MaybeObserver<T> downstream,
CurrentTraceContext contextScoper, TraceContext assembled) {
return new TraceContextMaybeObserver<>(downstream, contextScoper, assembled);
}
@Override protected void subscribeActual(MaybeObserver<? super Integer> o) {
delegate.subscribe(o);
}
@Override protected void subscribeActual(MaybeObserver<? super Integer> o) {
delegate.subscribe(o);
}
/**
* Wraps the observer so that its callbacks run in the assembly context. This does not affect any
* subscription callbacks.
*/
@Override protected void subscribeActual(MaybeObserver<? super T> o) {
source.subscribe(Wrappers.wrap(o, contextScoper, assembled));
}
/**
* Wraps the observer so that its callbacks run in the assembly context. This does not affect any
* subscription callbacks.
*/
@Override protected void subscribeActual(MaybeObserver<? super T> o) {
source.subscribe(new TraceContextMaybeObserver<>(o, contextScoper, assembled));
}