io.reactivex.rxjava3.processors.PublishProcessor#onError ( )源码实例Demo

下面列出了io.reactivex.rxjava3.processors.PublishProcessor#onError ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test public void errorClearsCacheAndResubscribes() {
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("initA");

  TestSubscriber<String> observer2 = new TestSubscriber<>();
  replayed.subscribe(observer2);
  subscriber1.assertValues("initA");

  RuntimeException r = new RuntimeException();
  upstream.onError(r);
  subscriber1.assertError(r);
  observer2.assertError(r);

  start.set(0, "initB");

  TestSubscriber<String> observer3 = new TestSubscriber<>();
  replayed.subscribe(observer3);
  observer3.assertValues("initB");
}
 
@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed =
      upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("default", "initA");

  TestSubscriber<String> observer2 = new TestSubscriber<>();
  replayed.subscribe(observer2);
  subscriber1.assertValues("default", "initA");

  RuntimeException r = new RuntimeException();
  upstream.onError(r);
  subscriber1.assertError(r);
  observer2.assertError(r);

  start.set(0, "initB");

  TestSubscriber<String> observer3 = new TestSubscriber<>();
  replayed.subscribe(observer3);
  observer3.assertValues("default", "initB");
}
 
源代码3 项目: akarnokd-misc   文件: UndeliverableTest.java
@Test
public void test() {
    RxJavaPlugins.setErrorHandler(error -> System.out.println(error));

    PublishProcessor<Integer> main = PublishProcessor.create();
    PublishProcessor<Integer> inner = PublishProcessor.create();

    // switchMapDelayError will delay all errors
    TestSubscriber<Integer> ts = main.switchMapDelayError(v -> inner).test();

    main.onNext(1);

    // the inner fails
    inner.onError(new IOException());

    // the consumer is still clueless
    ts.assertEmpty();

    // the consumer cancels
    ts.cancel();

}