io.reactivex.MaybeObserver 类的源码示例

下面列出了 io.reactivex.MaybeObserver 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: rxjava-RxLife   文件: MaybeLife.java
/**
 * Subscribes the given observer to the upstream {@code Maybe}, wrapped in a
 * {@code LifeMaybeObserver} created with this instance's {@code scope}.
 * <p>
 * When {@code onMain} is set, the upstream is first routed through
 * {@code observeOn(AndroidSchedulers.mainThread())}.
 *
 * @param observer the downstream observer handed to the wrapper
 */
private void subscribeActual(MaybeObserver<? super T> observer) {
    final Maybe<T> source = onMain
            ? this.upStream.observeOn(AndroidSchedulers.mainThread())
            : this.upStream;

    source
            .onTerminateDetach()
            .subscribe(new LifeMaybeObserver<>(observer, scope));
}
 
源代码2 项目: RxAndroid-Sample   文件: ExampleUnitTest.java
/**
 * Subscribes a {@code MaybeObserver} to {@code elementAt(1)} of a six-element
 * {@code Observable} and prints whatever value reaches {@code onSuccess}.
 */
@Test
public void testElementAtObservable() {
    final MaybeObserver<Integer> printer = new MaybeObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            // nothing to do on subscription
        }

        @Override
        public void onSuccess(Integer integer) {
            System.out.println("onNext: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            // errors are deliberately ignored in this example
        }

        @Override
        public void onComplete() {
            // empty completion is deliberately ignored in this example
        }
    };

    Observable.just(1, 2, 3, 4, 5, 6)
            .elementAt(1)
            .subscribe(printer);
}
 
/**
 * Demonstrates {@code elementAt} on an {@code Observable} built from a fixed array of fruit names.
 * <p>
 * First prints the count of the {@code elementAt(3)} result, then subscribes a
 * {@code MaybeObserver} to {@code elementAt(10)} that prints a line for success, error or
 * completion. Index 10 lies beyond the 8 array entries.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
	final String[] fruits = { "mango", "pineapple", "apple", "mango", "papaya", "pineapple", "apple", "apple" };

	Observable.fromArray(fruits)
			.elementAt(3)
			.count()
			.subscribe(total -> System.out.println("we got: " + total + " items from the Observable"));

	final MaybeObserver<String> reporter = new MaybeObserver<String>() {

		@Override
		public void onSubscribe(Disposable disposable) {
			// the subscription is not used in this example
		}

		@Override
		public void onSuccess(String value) {
			System.out.println("value at specified position is:-"+value);
		}

		@Override
		public void onError(Throwable throwable) {
			System.out.println(throwable.getMessage());
		}

		@Override
		public void onComplete() {
			System.out.println("successfully completed");
		}
	};

	Observable.fromArray(fruits)
			.elementAt(10)
			.subscribe(reporter);
}
 
/**
 * Builds a {@code MaybeObserver} that mirrors each callback to the log and, for the
 * success, error and completion callbacks, also appends the same message plus a line
 * separator to {@code textView}.
 *
 * @return a new observer instance
 */
private MaybeObserver<Integer> getObserver() {
    return new MaybeObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            final String message = " onSubscribe : " + d.isDisposed();
            Log.d(TAG, message);
        }

        @Override
        public void onSuccess(Integer value) {
            final String message = " onSuccess : value : " + value;
            textView.append(message);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, message);
        }

        @Override
        public void onError(Throwable e) {
            final String message = " onError : " + e.getMessage();
            textView.append(message);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, message);
        }

        @Override
        public void onComplete() {
            final String message = " onComplete";
            textView.append(message);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, message);
        }
    };
}
 
源代码5 项目: resilience4j   文件: MaybeCircuitBreaker.java
/**
 * Subscribes the downstream observer through a {@code CircuitBreakerMaybeObserver} when the
 * circuit breaker grants a permission; otherwise signals an error to it immediately.
 *
 * @param downstream the observer to subscribe or to reject
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> downstream) {
    final boolean permitted = circuitBreaker.tryAcquirePermission();
    if (!permitted) {
        // Rejected: hand over an empty disposable before delivering the error.
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(createCallNotPermittedException(circuitBreaker));
        return;
    }
    upstream.subscribe(new CircuitBreakerMaybeObserver(downstream));
}
 
源代码6 项目: resilience4j   文件: MaybeBulkhead.java
/**
 * Subscribes the downstream observer through a {@code BulkheadMaybeObserver} when the
 * bulkhead grants a permission; otherwise signals a bulkhead-full error to it immediately.
 *
 * @param downstream the observer to subscribe or to reject
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> downstream) {
    final boolean permitted = bulkhead.tryAcquirePermission();
    if (!permitted) {
        // Rejected: hand over an empty disposable before delivering the error.
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(BulkheadFullException.createBulkheadFullException(bulkhead));
        return;
    }
    upstream.subscribe(new BulkheadMaybeObserver(downstream));
}
 
源代码7 项目: resilience4j   文件: MaybeRateLimiter.java
/**
 * Reserves a rate-limiter permission and subscribes the downstream observer accordingly.
 * <ul>
 *   <li>negative reservation result: the observer receives a request-not-permitted error;</li>
 *   <li>zero: the upstream is subscribed immediately;</li>
 *   <li>positive: the upstream is subscribed after a timer of that many nanoseconds.</li>
 * </ul>
 *
 * @param downstream the observer to subscribe or to reject
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> downstream) {
    final long waitNanos = rateLimiter.reservePermission();

    if (waitNanos < 0) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(RequestNotPermitted.createRequestNotPermitted(rateLimiter));
        return;
    }

    if (waitNanos == 0) {
        upstream.subscribe(new RateLimiterMaybeObserver(downstream));
        return;
    }

    // Positive wait: defer the upstream subscription until the timer fires.
    Completable
        .timer(waitNanos, TimeUnit.NANOSECONDS)
        .subscribe(() -> upstream.subscribe(new RateLimiterMaybeObserver(downstream)));
}
 
源代码8 项目: vertx-rx   文件: MaybeHelper.java
/**
 * Adapts a Vert.x {@code Handler<AsyncResult<T>>} to an RxJava2 {@link MaybeObserver}.
 * <p>
 * The returned observer can be passed to {@link Maybe#subscribe(MaybeObserver)}. Only the
 * first of {@code onSuccess}, {@code onComplete} or {@code onError} reaches the handler;
 * any later signal is dropped.
 *
 * @param handler the handler to adapt
 * @param <T> the item type
 * @return the observer
 */
public static <T> MaybeObserver<T> toObserver(Handler<AsyncResult<T>> handler) {
  final AtomicBoolean terminated = new AtomicBoolean();
  return new MaybeObserver<T>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
      // the subscription is not tracked
    }
    @Override
    public void onSuccess(@NonNull T item) {
      if (!terminated.compareAndSet(false, true)) {
        return;
      }
      handler.handle(io.vertx.core.Future.succeededFuture(item));
    }
    @Override
    public void onComplete() {
      if (!terminated.compareAndSet(false, true)) {
        return;
      }
      // Empty completion is reported as a succeeded result without a value.
      handler.handle(io.vertx.core.Future.succeededFuture());
    }
    @Override
    public void onError(Throwable error) {
      if (!terminated.compareAndSet(false, true)) {
        return;
      }
      handler.handle(io.vertx.core.Future.failedFuture(error));
    }
  };
}
 
源代码9 项目: vertx-rx   文件: HelperTest.java
/**
 * A {@code Maybe} emitting a value must complete the promise successfully with that value.
 */
@Test
public void testToMaybeObserverSuccess() {
  Promise<String> outcome = Promise.promise();

  Maybe.just("foobar").subscribe(MaybeHelper.toObserver(outcome));

  assertTrue(outcome.future().succeeded());
  assertSame("foobar", outcome.future().result());
}
 
源代码10 项目: vertx-rx   文件: HelperTest.java
/**
 * An empty {@code Maybe} must complete the promise successfully with a {@code null} result.
 */
@Test
public void testToMaybeObserverEmpty() {
  Promise<String> outcome = Promise.promise();

  Maybe.<String>empty().subscribe(MaybeHelper.toObserver(outcome));

  assertTrue(outcome.future().succeeded());
  assertNull(outcome.future().result());
}
 
源代码11 项目: vertx-rx   文件: HelperTest.java
/**
 * A failing {@code Maybe} must fail the promise with the very same exception instance.
 */
@Test
public void testToMaybeObserverFailure() {
  Promise<String> outcome = Promise.promise();
  RuntimeException failure = new RuntimeException();

  Maybe.<String>error(failure).subscribe(MaybeHelper.toObserver(outcome));

  assertTrue(outcome.future().failed());
  assertSame(failure, outcome.future().cause());
}
 
源代码12 项目: rxjava-RxLife   文件: LifeMaybeObserver.java
/**
 * Creates an observer that keeps a reference to the given downstream observer.
 *
 * @param downstream the observer stored in the {@code downstream} field
 * @param scope      the scope passed on to the superclass constructor
 */
LifeMaybeObserver(MaybeObserver<? super T> downstream, Scope scope) {
    super(scope);
    this.downstream = downstream;
}
 
/**
 * Demonstrates {@code reduce} on an {@code Observable}: the six numbers are folded by
 * addition and the resulting {@code Maybe} is observed. For the values used here the
 * sum is 72, which is what {@code onSuccess} is expected to print.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
	Integer[] numbers = { 1, 2, 13, 34, 12, 10 };
	Observable<Integer> source1 = Observable.fromArray(numbers);

	source1.reduce(new BiFunction<Integer, Integer, Integer>() {

		@Override
		public Integer apply(Integer value1, Integer value2) throws Exception {
			// Accumulator: running sum over 1, 2, 13, 34, 12, 10
			return value1 + value2;
		}
	}).subscribe(new MaybeObserver<Integer>() {

		@Override
		public void onComplete() {
			System.out.println("completed2");
		}

		@Override
		public void onError(Throwable throwable) {
			System.out.println(throwable.getMessage());
		}

		@Override
		public void onSubscribe(Disposable arg0) {
			// the subscription is not used in this example
		}

		@Override
		public void onSuccess(Integer value) {
			System.out.println(value);
		}
	});
}
 
源代码14 项目: retrocache   文件: MaybeThrowingTest.java
/**
 * Creates a forwarding observer around the given delegate.
 *
 * @param delegate the observer stored in the {@code delegate} field
 */
ForwardingObserver(MaybeObserver<T> delegate) {
    this.delegate = delegate;
}
 
源代码15 项目: resilience4j   文件: AbstractMaybeObserver.java
/**
 * Creates the observer and stores the downstream observer it wraps.
 *
 * @param downstreamObserver the downstream observer; passed through {@code requireNonNull}
 *                           (presumably {@code java.util.Objects.requireNonNull}, which
 *                           rejects {@code null} - confirm against the file's imports)
 */
public AbstractMaybeObserver(MaybeObserver<? super T> downstreamObserver) {
    this.downstreamObserver = requireNonNull(downstreamObserver);
}
 
源代码16 项目: resilience4j   文件: MaybeCircuitBreaker.java
/**
 * Creates the observer and records the construction time.
 *
 * @param downstreamObserver the downstream observer, handed to the superclass constructor
 */
CircuitBreakerMaybeObserver(MaybeObserver<? super T> downstreamObserver) {
    super(downstreamObserver);
    // Capture the start timestamp in nanoseconds at construction time.
    this.start = System.nanoTime();
}
 
源代码17 项目: resilience4j   文件: MaybeBulkhead.java
/**
 * Creates the observer around the given downstream observer.
 *
 * @param downstreamObserver the downstream observer, handed to the superclass constructor
 */
BulkheadMaybeObserver(MaybeObserver<? super T> downstreamObserver) {
    super(downstreamObserver);
}
 
源代码18 项目: resilience4j   文件: MaybeRateLimiter.java
/**
 * Creates the observer around the given downstream observer.
 *
 * @param downstreamObserver the downstream observer, handed to the superclass constructor
 */
RateLimiterMaybeObserver(MaybeObserver<? super T> downstreamObserver) {
    super(downstreamObserver);
}
 
源代码19 项目: armeria   文件: RequestContextCallableMaybe.java
/**
 * Subscribes to {@code source} while the assembly context is pushed, wrapping the given
 * observer in a {@code RequestContextMaybeObserver} that carries the same context.
 *
 * @param s the downstream observer to wrap and subscribe
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        // Build the wrapper inside the pushed scope, then subscribe it.
        final MaybeObserver<? super T> contextAware =
                new RequestContextMaybeObserver<>(s, assemblyContext);
        source.subscribe(contextAware);
    }
}
 
源代码20 项目: armeria   文件: RequestContextMaybeObserver.java
/**
 * Creates an observer that keeps the wrapped observer together with a request context.
 *
 * @param actual          the observer stored in the {@code actual} field
 * @param assemblyContext the context stored in the {@code assemblyContext} field
 */
RequestContextMaybeObserver(MaybeObserver<T> actual, RequestContext assemblyContext) {
    this.actual = actual;
    this.assemblyContext = assemblyContext;
}
 
/**
 * Subscribes to {@code source} while the assembly context is pushed, wrapping the given
 * observer in a {@code RequestContextMaybeObserver} that carries the same context.
 *
 * @param s the downstream observer to wrap and subscribe
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        // Build the wrapper inside the pushed scope, then subscribe it.
        final MaybeObserver<? super T> contextAware =
                new RequestContextMaybeObserver<>(s, assemblyContext);
        source.subscribe(contextAware);
    }
}
 
源代码22 项目: armeria   文件: RequestContextMaybe.java
/**
 * Subscribes to {@code source} while the assembly context is pushed, wrapping the given
 * observer in a {@code RequestContextMaybeObserver} that carries the same context.
 *
 * @param s the downstream observer to wrap and subscribe
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> s) {
    try (SafeCloseable ignored = assemblyContext.push()) {
        // Build the wrapper inside the pushed scope, then subscribe it.
        final MaybeObserver<? super T> contextAware =
                new RequestContextMaybeObserver<>(s, assemblyContext);
        source.subscribe(contextAware);
    }
}
 
源代码23 项目: brave   文件: TraceContextMaybeObserver.java
/**
 * Creates an observer that keeps the downstream observer together with the trace-context
 * collaborators passed in.
 *
 * @param downstream    the observer stored in the {@code downstream} field
 * @param contextScoper the {@code CurrentTraceContext} stored in the {@code contextScoper} field
 * @param assembled     the {@code TraceContext} stored in the {@code assembled} field
 */
TraceContextMaybeObserver(
  MaybeObserver<T> downstream, CurrentTraceContext contextScoper, TraceContext assembled) {
  this.downstream = downstream;
  this.contextScoper = contextScoper;
  this.assembled = assembled;
}
 
源代码24 项目: brave   文件: Wrappers.java
/**
 * Wraps the given observer in a new {@code TraceContextMaybeObserver}.
 *
 * @param downstream    the observer to wrap
 * @param contextScoper the {@code CurrentTraceContext} passed to the wrapper
 * @param assembled     the {@code TraceContext} passed to the wrapper
 * @param <T>           the item type of the observer
 * @return the wrapping observer
 */
public static <T> MaybeObserver<T> wrap(MaybeObserver<T> downstream,
  CurrentTraceContext contextScoper, TraceContext assembled) {
  return new TraceContextMaybeObserver<>(downstream, contextScoper, assembled);
}
 
/**
 * Forwards the subscription unchanged to {@code delegate}.
 *
 * @param o the observer to subscribe to the delegate
 */
@Override protected void subscribeActual(MaybeObserver<? super Integer> o) {
  delegate.subscribe(o);
}
 
/**
 * Forwards the subscription unchanged to {@code delegate}.
 *
 * @param o the observer to subscribe to the delegate
 */
@Override protected void subscribeActual(MaybeObserver<? super Integer> o) {
  delegate.subscribe(o);
}
 
源代码27 项目: brave   文件: TraceContextCallableMaybe.java
/**
 * Wraps the observer so that its callbacks run in the assembly context. This does not affect any
 * subscription callbacks.
 * <p>
 * The wrapper is obtained from {@code Wrappers.wrap} and subscribed to {@code source}.
 *
 * @param o the downstream observer to wrap
 */
@Override protected void subscribeActual(MaybeObserver<? super T> o) {
  source.subscribe(Wrappers.wrap(o, contextScoper, assembled));
}
 
源代码28 项目: brave   文件: TraceContextMaybe.java
/**
 * Wraps the observer so that its callbacks run in the assembly context. This does not affect any
 * subscription callbacks.
 * <p>
 * The wrapper is a new {@code TraceContextMaybeObserver}, subscribed to {@code source}.
 *
 * @param o the downstream observer to wrap
 */
@Override protected void subscribeActual(MaybeObserver<? super T> o) {
  source.subscribe(new TraceContextMaybeObserver<>(o, contextScoper, assembled));
}
 
 类所在包
 类方法
 同包方法