下面列出了 io.reactivex.rxjava3.processors.PublishProcessor#onError() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Verifies that an upstream error reaches every active subscriber and that a subscription
 * made after the error starts over from the (by then updated) initial values.
 */
@Test
public void errorClearsCacheAndResubscribes() {
  // Mutable seed list: its single element is swapped once the error has been delivered.
  List<String> seed = new ArrayList<>();
  seed.add("initA");

  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source
      .startWithIterable(seed)
      .compose(ReplayingShare.<String>instance());

  TestSubscriber<String> first = shared.test();
  first.assertValues("initA");

  // Adding a second subscriber must leave what the first one has seen unchanged.
  // NOTE(review): the values received by the second subscriber are never asserted here.
  TestSubscriber<String> second = shared.test();
  first.assertValues("initA");

  RuntimeException failure = new RuntimeException();
  source.onError(failure);
  first.assertError(failure);
  second.assertError(failure);

  seed.set(0, "initB");

  // A fresh subscriber after the error is expected to see only the new initial value.
  TestSubscriber<String> third = shared.test();
  third.assertValues("initB");
}
/**
 * Same scenario as the plain error/resubscribe test, but with a default value configured:
 * the first subscriber and a subscriber arriving after the error are both expected to see
 * {@code "default"} ahead of the current initial value.
 */
@Test
public void errorClearsCacheAndResubscribesStartingWithDefault() {
  // Mutable seed list: its single element is swapped once the error has been delivered.
  List<String> seed = new ArrayList<>();
  seed.add("initA");

  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source
      .startWithIterable(seed)
      .compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> first = shared.test();
  first.assertValues("default", "initA");

  // Adding a second subscriber must leave what the first one has seen unchanged.
  // NOTE(review): the values received by the second subscriber are never asserted here.
  TestSubscriber<String> second = shared.test();
  first.assertValues("default", "initA");

  RuntimeException failure = new RuntimeException();
  source.onError(failure);
  first.assertError(failure);
  second.assertError(failure);

  seed.set(0, "initB");

  // A fresh subscriber after the error is expected to get the default again, then "initB".
  TestSubscriber<String> third = shared.test();
  third.assertValues("default", "initB");
}
/**
 * Demonstrates that an inner failure under {@code switchMapDelayError} is not signalled to the
 * subscriber before the subscriber cancels.
 *
 * <p>The scenario installs a global RxJava error handler that prints whatever error it is
 * handed. That handler is process-wide state, so it is removed again in a {@code finally}
 * block to keep it from leaking into tests that run afterwards.
 */
@Test
public void test() {
  RxJavaPlugins.setErrorHandler(error -> System.out.println(error));
  try {
    PublishProcessor<Integer> main = PublishProcessor.create();
    PublishProcessor<Integer> inner = PublishProcessor.create();

    // switchMapDelayError will delay all errors
    TestSubscriber<Integer> ts = main.switchMapDelayError(v -> inner).test();

    main.onNext(1);

    // the inner fails
    inner.onError(new IOException());

    // the consumer is still clueless
    ts.assertEmpty();

    // the consumer cancels
    ts.cancel();
  } finally {
    // Passing null removes the custom handler installed above, even if an assertion failed.
    RxJavaPlugins.setErrorHandler(null);
  }
}