下面列出了 io.reactivex.rxjava3.functions.Function#io.reactivex.rxjava3.core.Observer 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds the observer that consumes {@link ReportEntity} emissions.
 *
 * <p>Every emitted value is handed to {@code flush(...)}. The subscribe,
 * error and completion callbacks have empty bodies, so upstream errors are
 * silently dropped by this observer.
 *
 * @return a newly created observer on each call
 */
@Override
public Observer<ReportEntity> getReportObserver() {
    final Observer<ReportEntity> reportObserver = new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // The subscription handle is not retained.
        }

        @Override
        public void onNext(ReportEntity report) {
            flush(report);
        }

        @Override
        public void onError(Throwable error) {
            // Errors are ignored here.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };
    return reportObserver;
}
/**
 * Builds the observer that consumes {@link ReportEntity} emissions.
 *
 * <p>The {@link Disposable} received on subscription is passed to
 * {@code start(...)}, and every emitted value is passed to {@code flush(...)}.
 * The error and completion callbacks have empty bodies.
 *
 * @return a newly created observer on each call
 */
@Override
public Observer<ReportEntity> getReportObserver() {
    final Observer<ReportEntity> reportObserver = new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            start(disposable);
        }

        @Override
        public void onNext(ReportEntity report) {
            flush(report);
        }

        @Override
        public void onError(Throwable error) {
            // Errors are ignored here.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };
    return reportObserver;
}
/**
 * Builds an observer that only records what it receives.
 *
 * <p>The {@link Disposable} from the subscription is stored in {@code disp}
 * and the most recently emitted {@link ReportEntity} is stored in
 * {@code entity} (each emission overwrites the previous one). The error and
 * completion callbacks have empty bodies.
 *
 * @return a newly created observer on each call
 */
@Override
public Observer<ReportEntity> getReportObserver() {
    final Observer<ReportEntity> recordingObserver = new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            disp = disposable;
        }

        @Override
        public void onNext(ReportEntity report) {
            entity = report;
        }

        @Override
        public void onError(Throwable error) {
            // Errors are ignored here.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };
    return recordingObserver;
}
/**
 * Creates an observer that logs and collects every item it receives.
 *
 * @param name   label prefixed to each log message
 * @param result list that each received item is appended to
 * @param <T>    item type
 * @return an observer that logs items and completion at {@code fine} level
 *         and prints the stack trace of any error
 */
private static <T> Observer<T> observer(final String name, final List<T> result) {
    final Observer<T> collecting = new Observer<T>() {
        @Override
        public void onSubscribe(final Disposable disposable) {
            // The subscription handle is not needed.
        }

        @Override
        public void onNext(final T item) {
            final String message = name + ": " + item;
            logger.fine(message);
            result.add(item);
        }

        @Override
        public void onError(final Throwable error) {
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
            final String message = name + ": onComplete";
            logger.fine(message);
        }
    };
    return collecting;
}
/**
 * An observer subscribed to a relay created with a default value is expected
 * to see that default once, then each subsequently accepted value once, and
 * no terminal event.
 */
@Test
public void testThatSubscriberReceivesDefaultValueAndSubsequentEvents() {
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("default");
    final Observer<String> mock = TestHelper.mockObserver();
    relay.subscribe(mock);

    for (String value : new String[] {"one", "two", "three"}) {
        relay.accept(value);
    }

    // verify(mock) checks for exactly one invocation.
    for (String expected : new String[] {"default", "one", "two", "three"}) {
        verify(mock).onNext(expected);
    }
    verify(mock, Mockito.never()).onError(any(Throwable.class));
    verify(mock, Mockito.never()).onComplete();
}
/**
 * An observer that subscribes after the relay has already accepted a value is
 * expected to see that latest value (not the original default), then each
 * subsequently accepted value once, and no terminal event.
 */
@Test
public void testThatSubscriberReceivesLatestAndThenSubsequentEvents() {
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("default");
    // Replace the default before anyone subscribes.
    relay.accept("one");

    final Observer<String> mock = TestHelper.mockObserver();
    relay.subscribe(mock);

    for (String value : new String[] {"two", "three"}) {
        relay.accept(value);
    }

    verify(mock, Mockito.never()).onNext("default");
    // verify(mock) checks for exactly one invocation.
    for (String expected : new String[] {"one", "two", "three"}) {
        verify(mock).onNext(expected);
    }
    verify(mock, Mockito.never()).onError(any(Throwable.class));
    verify(mock, Mockito.never()).onComplete();
}
/**
 * An observer subscribed to a relay created with a default value is expected
 * to see that default once, then each subsequently accepted value once, and
 * no terminal event.
 */
@Test
public void testThatSubscriberReceivesDefaultValueAndSubsequentEvents() {
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("default");
    final Observer<String> mock = TestHelper.mockObserver();
    relay.subscribe(mock);

    for (String value : new String[] {"one", "two", "three"}) {
        relay.accept(value);
    }

    // verify(mock) checks for exactly one invocation.
    for (String expected : new String[] {"default", "one", "two", "three"}) {
        verify(mock).onNext(expected);
    }
    verify(mock, Mockito.never()).onError(any(Throwable.class));
    verify(mock, Mockito.never()).onComplete();
}
/**
 * An observer that subscribes after the relay has already accepted a value is
 * expected to see that latest value (not the original default), then each
 * subsequently accepted value once, and no terminal event.
 */
@Test
public void testThatSubscriberReceivesLatestAndThenSubsequentEvents() {
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("default");
    // Replace the default before anyone subscribes.
    relay.accept("one");

    final Observer<String> mock = TestHelper.mockObserver();
    relay.subscribe(mock);

    for (String value : new String[] {"two", "three"}) {
        relay.accept(value);
    }

    verify(mock, Mockito.never()).onNext("default");
    // verify(mock) checks for exactly one invocation.
    for (String expected : new String[] {"one", "two", "three"}) {
        verify(mock).onNext(expected);
    }
    verify(mock, Mockito.never()).onError(any(Throwable.class));
    verify(mock, Mockito.never()).onComplete();
}
/**
 * Interception entry point for a subscribe call.
 *
 * <p>Behaviour, in order:
 * <ol>
 *   <li>Returns the {@code NULL} constant when {@code arg0} is {@code null},
 *       when its class name starts with {@code io.reactivex.rxjava3.internal},
 *       or when it is already a {@code TracingConsumer}.</li>
 *   <li>Turns tracing on (once, guarded by {@code isTracingEnabled}).</li>
 *   <li>If {@code arg0} is an {@code Observer}, returns it wrapped in a
 *       {@code TracingObserver}.</li>
 *   <li>If {@code arg0} is not a {@code Consumer}, returns {@code NULL}.</li>
 *   <li>Otherwise, for {@code argc} of 1, 2 or 3, builds a
 *       {@code TracingConsumer} from the supplied callbacks, subscribes it to
 *       {@code thiz} and returns {@code null}; for any other {@code argc} it
 *       returns {@code null} without subscribing.</li>
 * </ol>
 *
 * <p>NOTE(review): how callers distinguish the {@code NULL} constant from a
 * plain {@code null} is defined outside this method - confirm against the
 * caller before changing either return value.
 *
 * @param thiz the object being subscribed to; cast to {@code Observable} when a consumer is wrapped
 * @param argc number of callback arguments supplied (1-3 are handled)
 * @param arg0 the observer or the on-next consumer
 * @param arg1 the on-error consumer, used when {@code argc >= 2}
 * @param arg2 the on-complete action, used when {@code argc == 3}
 * @return a {@code TracingObserver}, the {@code NULL} constant, or {@code null} as described above
 */
@SuppressWarnings("unchecked")
public static Object enter(final Object thiz, final int argc, final Object arg0, final Object arg1, final Object arg2) {
    if (arg0 == null) {
        return NULL;
    }
    if (arg0.getClass().getName().startsWith("io.reactivex.rxjava3.internal")) {
        return NULL;
    }
    if (arg0 instanceof TracingConsumer) {
        return NULL;
    }

    if (!isTracingEnabled) {
        isTracingEnabled = true;
        TracingRxJava3Utils.enableTracing();
    }

    if (arg0 instanceof Observer) {
        return new TracingObserver<>((Observer<?>)arg0, "observer", GlobalTracer.get());
    }
    if (!(arg0 instanceof Consumer)) {
        return NULL;
    }

    final TracingConsumer<Object> wrapped;
    switch (argc) {
        case 1:
            wrapped = new TracingConsumer<>((Consumer<Object>)arg0, "consumer", GlobalTracer.get());
            break;
        case 2:
            wrapped = new TracingConsumer<>((Consumer<Object>)arg0, (Consumer<Throwable>)arg1, "consumer", GlobalTracer.get());
            break;
        case 3:
            wrapped = new TracingConsumer<>((Consumer<Object>)arg0, (Consumer<Throwable>)arg1, (Action)arg2, "consumer", GlobalTracer.get());
            break;
        default:
            // Unsupported arity: nothing is subscribed.
            return null;
    }

    ((Observable<Object>)thiz).subscribe(wrapped);
    return null;
}
/**
 * Attaches a new observer to this relay.
 *
 * <p>A {@code BehaviorDisposable} is created for the observer, handed to it
 * via {@code onSubscribe}, and then registered with {@code add}. If the
 * disposable is already cancelled after registration it is removed again;
 * otherwise {@code emitFirst()} is invoked on it.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    final BehaviorDisposable<T> subscription = new BehaviorDisposable<T>(observer, this);
    observer.onSubscribe(subscription);
    add(subscription);

    if (!subscription.cancelled) {
        subscription.emitFirst();
        return;
    }
    // Cancelled before or during registration: undo the add.
    remove(subscription);
}
/**
 * Attaches a new observer to this relay and replays the buffer to it.
 *
 * <p>A {@code ReplayDisposable} is created and handed to the observer via
 * {@code onSubscribe}. If it is already cancelled at that point nothing else
 * happens. Otherwise it is registered with {@code add}; if registration
 * succeeded but the disposable has been cancelled in the meantime, it is
 * removed and no replay takes place. In every remaining case
 * {@code buffer.replay} is invoked for it.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    final ReplayDisposable<T> subscription = new ReplayDisposable<T>(observer, this);
    observer.onSubscribe(subscription);

    if (subscription.cancelled) {
        return;
    }
    if (add(subscription) && subscription.cancelled) {
        // Cancelled while being registered: undo the add and skip the replay.
        remove(subscription);
        return;
    }
    buffer.replay(subscription);
}
/**
 * Attaches a new observer to this relay.
 *
 * <p>A {@code PublishDisposable} is created for the observer, handed to it
 * via {@code onSubscribe}, and registered with {@code add}.
 *
 * @param t the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> t) {
    final PublishDisposable<T> subscription = new PublishDisposable<T>(t, this);
    t.onSubscribe(subscription);
    add(subscription);

    // A dispose that raced with a successful add would have found nothing to
    // remove, so the removal is repeated here.
    final boolean disposedMeanwhile = subscription.isDisposed();
    if (disposedMeanwhile) {
        remove(subscription);
    }
}
/**
 * Runs ten rounds in which the relay accepts a new value and a short-lived
 * chain ({@code firstElement -> toObservable -> flatMap}) is subscribed to it.
 * Each round expects exactly the doubled current value followed by
 * completion, and no error.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    // FIXME was plain null which is not allowed
    final BehaviorRelay<String> src = BehaviorRelay.createDefault("null");

    // Maps a value "x" to a single-item observable emitting "x, x".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        final InOrder order = inOrder(mock);
        final String current = String.valueOf(turn);
        final String expected = current + ", " + current;

        src.accept(current);
        System.out.printf("Turn: %d%n", turn);

        src.firstElement()
            .toObservable()
            .flatMap(duplicate)
            .subscribe(new DefaultObserver<String>() {
                @Override
                public void onNext(String item) {
                    mock.onNext(item);
                }

                @Override
                public void onError(Throwable error) {
                    mock.onError(error);
                }

                @Override
                public void onComplete() {
                    mock.onComplete();
                }
            });

        order.verify(mock).onNext(expected);
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
/**
 * Subscribing through {@code take(1)} to a relay that holds a default value
 * is expected to deliver that value, complete without error, and leave the
 * relay with no observers.
 */
@Test
public void testTakeOneSubscriber() {
    final BehaviorRelay<Integer> relay = BehaviorRelay.createDefault(1);
    final Observer<Object> mock = TestHelper.mockObserver();

    relay.take(1).subscribe(mock);

    // Downstream saw the single value and a normal completion.
    verify(mock).onNext(1);
    verify(mock).onComplete();
    verify(mock, never()).onError(any(Throwable.class));

    // The relay itself no longer tracks the finished subscriber.
    assertEquals(0, relay.subscriberCount());
    assertFalse(relay.hasObservers());
}
/**
 * Checks the disposable handed to an observer by a fresh relay: it must start
 * out not disposed and report disposed right after {@code dispose()} is called.
 */
@Test
public void innerDisposed() {
    final Observer<Object> selfDisposing = new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            assertFalse(disposable.isDisposed());
            disposable.dispose();
            assertTrue(disposable.isDisposed());
        }

        @Override
        public void onNext(Object item) {
            // Not relevant for this test.
        }

        @Override
        public void onError(Throwable error) {
            // Not relevant for this test.
        }

        @Override
        public void onComplete() {
            // Not relevant for this test.
        }
    };

    BehaviorRelay.create().subscribe(selfDisposing);
}
/**
 * A relay should remain usable after its only observer is disposed: a first
 * observer receives a value and is then disposed, a value is emitted while
 * nobody is subscribed, and a second observer subscribed afterwards receives
 * the next emitted value.
 */
@Test
public void testReSubscribe() {
    final PublishRelay<Integer> relay = PublishRelay.create();

    // Round one: attach an observer and emit a value to it.
    final Observer<Integer> firstMock = TestHelper.mockObserver();
    final TestObserver<Integer> firstObserver = new TestObserver<Integer>(firstMock);
    relay.subscribe(firstObserver);
    relay.accept(1);

    final InOrder firstOrder = inOrder(firstMock);
    firstOrder.verify(firstMock).onNext(1);
    firstOrder.verifyNoMoreInteractions();

    // Detach the first observer, then emit with no observer attached.
    firstObserver.dispose();
    relay.accept(2);

    // Round two: a fresh observer gets the value emitted after it subscribed.
    final Observer<Integer> secondMock = TestHelper.mockObserver();
    final TestObserver<Integer> secondObserver = new TestObserver<Integer>(secondMock);
    relay.subscribe(secondObserver);
    relay.accept(3);

    final InOrder secondOrder = inOrder(secondMock);
    secondOrder.verify(secondMock).onNext(3);
    secondOrder.verifyNoMoreInteractions();

    secondObserver.dispose();
}
/**
 * Attaches a new observer to this relay.
 *
 * <p>A {@code BehaviorDisposable} is created for the observer, handed to it
 * via {@code onSubscribe}, and then registered with {@code add}. If the
 * disposable is already cancelled after registration it is removed again;
 * otherwise {@code emitFirst()} is invoked on it.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    final BehaviorDisposable<T> subscription = new BehaviorDisposable<T>(observer, this);
    observer.onSubscribe(subscription);
    add(subscription);

    if (!subscription.cancelled) {
        subscription.emitFirst();
        return;
    }
    // Cancelled before or during registration: undo the add.
    remove(subscription);
}
/**
 * Attaches a new observer to this relay and replays the buffer to it.
 *
 * <p>A {@code ReplayDisposable} is created and handed to the observer via
 * {@code onSubscribe}. If it is already cancelled at that point nothing else
 * happens. Otherwise it is registered with {@code add}; if registration
 * succeeded but the disposable has been cancelled in the meantime, it is
 * removed and no replay takes place. In every remaining case
 * {@code buffer.replay} is invoked for it.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    final ReplayDisposable<T> subscription = new ReplayDisposable<T>(observer, this);
    observer.onSubscribe(subscription);

    if (subscription.cancelled) {
        return;
    }
    if (add(subscription) && subscription.cancelled) {
        // Cancelled while being registered: undo the add and skip the replay.
        remove(subscription);
        return;
    }
    buffer.replay(subscription);
}
/**
 * Attaches a new observer to this relay.
 *
 * <p>A {@code PublishDisposable} is created for the observer, handed to it
 * via {@code onSubscribe}, and registered with {@code add}.
 *
 * @param t the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> t) {
    final PublishDisposable<T> subscription = new PublishDisposable<T>(t, this);
    t.onSubscribe(subscription);
    add(subscription);

    // A dispose that raced with a successful add would have found nothing to
    // remove, so the removal is repeated here.
    final boolean disposedMeanwhile = subscription.isDisposed();
    if (disposedMeanwhile) {
        remove(subscription);
    }
}
/**
 * Runs ten rounds in which the relay accepts a new value and a short-lived
 * chain ({@code firstElement -> toObservable -> flatMap}) is subscribed to it.
 * Each round expects exactly the doubled current value followed by
 * completion, and no error.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    // FIXME was plain null which is not allowed
    final BehaviorRelay<String> src = BehaviorRelay.createDefault("null");

    // Maps a value "x" to a single-item observable emitting "x, x".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        final InOrder order = inOrder(mock);
        final String current = String.valueOf(turn);
        final String expected = current + ", " + current;

        src.accept(current);
        System.out.printf("Turn: %d%n", turn);

        src.firstElement()
            .toObservable()
            .flatMap(duplicate)
            .subscribe(new DefaultObserver<String>() {
                @Override
                public void onNext(String item) {
                    mock.onNext(item);
                }

                @Override
                public void onError(Throwable error) {
                    mock.onError(error);
                }

                @Override
                public void onComplete() {
                    mock.onComplete();
                }
            });

        order.verify(mock).onNext(expected);
        order.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
/**
 * Subscribing through {@code take(1)} to a relay that holds a default value
 * is expected to deliver that value, complete without error, and leave the
 * relay with no observers.
 */
@Test
public void testTakeOneSubscriber() {
    final BehaviorRelay<Integer> relay = BehaviorRelay.createDefault(1);
    final Observer<Object> mock = TestHelper.mockObserver();

    relay.take(1).subscribe(mock);

    // Downstream saw the single value and a normal completion.
    verify(mock).onNext(1);
    verify(mock).onComplete();
    verify(mock, never()).onError(any(Throwable.class));

    // The relay itself no longer tracks the finished subscriber.
    assertEquals(0, relay.subscriberCount());
    assertFalse(relay.hasObservers());
}
/**
 * Checks the disposable handed to an observer by a fresh relay: it must start
 * out not disposed and report disposed right after {@code dispose()} is called.
 */
@Test
public void innerDisposed() {
    final Observer<Object> selfDisposing = new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            assertFalse(disposable.isDisposed());
            disposable.dispose();
            assertTrue(disposable.isDisposed());
        }

        @Override
        public void onNext(Object item) {
            // Not relevant for this test.
        }

        @Override
        public void onError(Throwable error) {
            // Not relevant for this test.
        }

        @Override
        public void onComplete() {
            // Not relevant for this test.
        }
    };

    BehaviorRelay.create().subscribe(selfDisposing);
}
/**
 * A relay should remain usable after its only observer is disposed: a first
 * observer receives a value and is then disposed, a value is emitted while
 * nobody is subscribed, and a second observer subscribed afterwards receives
 * the next emitted value.
 */
@Test
public void testReSubscribe() {
    final PublishRelay<Integer> relay = PublishRelay.create();

    // Round one: attach an observer and emit a value to it.
    final Observer<Integer> firstMock = TestHelper.mockObserver();
    final TestObserver<Integer> firstObserver = new TestObserver<Integer>(firstMock);
    relay.subscribe(firstObserver);
    relay.accept(1);

    final InOrder firstOrder = inOrder(firstMock);
    firstOrder.verify(firstMock).onNext(1);
    firstOrder.verifyNoMoreInteractions();

    // Detach the first observer, then emit with no observer attached.
    firstObserver.dispose();
    relay.accept(2);

    // Round two: a fresh observer gets the value emitted after it subscribed.
    final Observer<Integer> secondMock = TestHelper.mockObserver();
    final TestObserver<Integer> secondObserver = new TestObserver<Integer>(secondMock);
    relay.subscribe(secondObserver);
    relay.accept(3);

    final InOrder secondOrder = inOrder(secondMock);
    secondOrder.verify(secondMock).onNext(3);
    secondOrder.verifyNoMoreInteractions();

    secondObserver.dispose();
}
/**
 * Subscribes downstream to the upstream source only if the circuit breaker
 * grants a permission.
 *
 * <p>When {@code tryAcquirePermission()} returns {@code false}, downstream
 * receives an empty disposable followed by the exception produced by
 * {@code createCallNotPermittedException}; upstream is never subscribed.
 *
 * @param downstream the observer to protect
 */
@Override
protected void subscribeActual(Observer<? super T> downstream) {
    final boolean permitted = circuitBreaker.tryAcquirePermission();
    if (!permitted) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(createCallNotPermittedException(circuitBreaker));
        return;
    }
    upstream.subscribe(new CircuitBreakerObserver(downstream));
}
/**
 * Subscribes downstream to the upstream source only if the bulkhead grants a
 * permission.
 *
 * <p>When {@code tryAcquirePermission()} returns {@code false}, downstream
 * receives an empty disposable followed by a {@code BulkheadFullException};
 * upstream is never subscribed.
 *
 * @param downstream the observer to protect
 */
@Override
protected void subscribeActual(Observer<? super T> downstream) {
    final boolean permitted = bulkhead.tryAcquirePermission();
    if (!permitted) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(BulkheadFullException.createBulkheadFullException(bulkhead));
        return;
    }
    upstream.subscribe(new BulkheadObserver(downstream));
}
/**
 * Subscribes downstream to the upstream source according to the value
 * returned by {@code rateLimiter.reservePermission()}.
 *
 * <ul>
 *   <li>Negative: downstream receives an empty disposable followed by the
 *       exception from {@code createRequestNotPermitted}; upstream is never
 *       subscribed.</li>
 *   <li>Zero: upstream is subscribed immediately.</li>
 *   <li>Positive: the value is used as a delay in nanoseconds for a timer,
 *       and upstream is subscribed when that timer completes.</li>
 * </ul>
 *
 * @param downstream the observer to protect
 */
@Override
protected void subscribeActual(Observer<? super T> downstream) {
    final long waitNanos = rateLimiter.reservePermission();

    if (waitNanos < 0) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(createRequestNotPermitted(rateLimiter));
        return;
    }
    if (waitNanos == 0) {
        upstream.subscribe(new RateLimiterObserver(downstream));
        return;
    }
    // Permission reserved for later: defer the upstream subscription.
    Completable.timer(waitNanos, TimeUnit.NANOSECONDS)
        .subscribe(() -> upstream.subscribe(new RateLimiterObserver(downstream)));
}
/**
 * Subscribes the given observer by delegating to {@code observable.subscribe}.
 *
 * @param observer the observer passed through unchanged to {@code observable}
 */
public void subscribe(Observer<T> observer) {
observable.subscribe(observer);
}
/**
 * Creates the sequential observable for the given tracer and subscribes a
 * collecting observer to it.
 *
 * @param name   label passed to {@code observer(...)}
 * @param result list passed to {@code observer(...)}
 * @param tracer tracer passed to {@code createSequentialObservable}
 */
private static void executeSequentialObservable(final String name, final List<Integer> result, final MockTracer tracer) {
    // The observable is created before the observer, as the receiver of the call.
    createSequentialObservable(tracer, false).subscribe(observer(name, result));
}
/**
 * Subscribes to {@code source} through a {@code SubscriberAdapter} that wraps
 * the given observer together with {@code EMPTY_DISPOSABLE}.
 *
 * @param observer the observer to wrap
 */
@Override
protected void subscribeActual(@NonNull Observer<? super Object> observer) {
    final SubscriberAdapter adapter = new SubscriberAdapter(observer, EMPTY_DISPOSABLE);
    source.subscribe(adapter);
}
/**
 * Creates an adapter holding the given observer and disposable.
 *
 * @param observer   stored in the {@code observer} field
 * @param disposable stored in the {@code disposable} field
 */
private SubscriberAdapter(Observer<? super Object> observer, Disposable disposable) {
    this.disposable = disposable;
    this.observer = observer;
}