类 io.reactivex.rxjava3.processors.PublishProcessor 的源码示例（Demo）

下面列出了 io.reactivex.rxjava3.processors.PublishProcessor 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。

/**
 * Checks that a value seen by an earlier subscriber is still replayed to a
 * subscriber that attaches after the earlier one has cancelled.
 */
@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> first = shared.test();
  first.assertNoValues();

  source.onNext("Foo");
  first.assertValues("Foo");
  first.cancel();

  // The first subscriber is gone; the late subscriber must still get "Foo".
  TestSubscriber<String> second = shared.test();
  second.assertValues("Foo");
}
 
/**
 * Checks that a value pushed into the source while nobody is subscribed is
 * not cached: neither the cancelled subscriber nor a later one receives it.
 */
@Test public void valueMissedWhenNoSubscribers() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> first = shared.test();
  first.assertNoValues();
  first.cancel();

  // Emitted with zero subscribers attached.
  source.onNext("Foo");
  first.assertNoValues();

  TestSubscriber<String> second = shared.test();
  second.assertNoValues();
}
 
/**
 * Checks that an {@link OutOfMemoryError} thrown by a consumer while the
 * cached value is being delivered propagates out of {@code subscribe}.
 */
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Prime the stream with a value before the throwing consumer subscribes.
  shared.subscribe();
  source.onNext("Foo");

  Consumer<String> throwingConsumer = new Consumer<String>() {
    @Override public void accept(String ignored) {
      throw new OutOfMemoryError("broken!");
    }
  };
  try {
    shared.subscribe(throwingConsumer);
    fail();
  } catch (OutOfMemoryError expected) {
    assertEquals("broken!", expected.getMessage());
  }
}
 
/**
 * Checks that a subscriber with no outstanding demand receives nothing, and
 * that once it requests one item it gets the most recent value ("Bar") rather
 * than the value that was current when it subscribed ("Foo").
 */
@Test public void backpressureHonoredWhenCached() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> unbounded = shared.test();
  unbounded.assertNoValues();

  source.onNext("Foo");
  unbounded.assertValues("Foo");

  // Subscribes with an initial request of zero.
  TestSubscriber<String> zeroDemand = shared.test(0);
  zeroDemand.assertNoValues();

  source.onNext("Bar"); // Replace the cached value...
  zeroDemand.request(1); // ...and ensure new requests see it.
  zeroDemand.assertValues("Bar");
}
 
/**
 * Checks that two flowables composed with {@code ReplayingShare.instance()}
 * keep separate replay state: late subscribers to A see A's value and late
 * subscribers to B see B's value.
 */
@Test public void streamsDoNotShareInstances() {
  PublishProcessor<String> sourceA = PublishProcessor.create();
  Flowable<String> sharedA = sourceA.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> earlyA = sharedA.test();

  PublishProcessor<String> sourceB = PublishProcessor.create();
  Flowable<String> sharedB = sourceB.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> earlyB = sharedB.test();

  sourceA.onNext("Foo");
  earlyA.assertValues("Foo");
  sourceB.onNext("Bar");
  earlyB.assertValues("Bar");

  TestSubscriber<String> lateA = sharedA.test();
  lateA.assertValues("Foo");

  TestSubscriber<String> lateB = sharedB.test();
  lateB.assertValues("Bar");
}
 
/**
 * Checks that upstream completion is delivered to every subscriber and that a
 * subscriber arriving afterwards triggers a fresh upstream subscription: it
 * sees the new start value ("initB"), not the previously replayed "initA".
 */
@Test public void completeClearsCacheAndResubscribes() {
  // Mutable so the start value can be swapped between upstream subscriptions.
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("initA");

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  replayed.subscribe(subscriber2);
  // The first subscriber must not be re-sent anything...
  subscriber1.assertValues("initA");
  // ...and the second subscriber must receive the replayed value (previously unchecked).
  subscriber2.assertValues("initA");

  upstream.onComplete();
  subscriber1.assertComplete();
  subscriber2.assertComplete();

  start.set(0, "initB");

  TestSubscriber<String> subscriber3 = new TestSubscriber<>();
  replayed.subscribe(subscriber3);
  subscriber3.assertValues("initB");
}
 
/**
 * Checks that an upstream error is delivered to every subscriber and that a
 * subscriber arriving afterwards triggers a fresh upstream subscription: it
 * sees the new start value ("initB"), not the previously replayed "initA".
 */
@Test public void errorClearsCacheAndResubscribes() {
  // Mutable so the start value can be swapped between upstream subscriptions.
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("initA");

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  replayed.subscribe(subscriber2);
  // The first subscriber must not be re-sent anything...
  subscriber1.assertValues("initA");
  // ...and the second subscriber must receive the replayed value (previously unchecked).
  subscriber2.assertValues("initA");

  RuntimeException failure = new RuntimeException();
  upstream.onError(failure);
  subscriber1.assertError(failure);
  subscriber2.assertError(failure);

  start.set(0, "initB");

  TestSubscriber<String> subscriber3 = new TestSubscriber<>();
  replayed.subscribe(subscriber3);
  subscriber3.assertValues("initB");
}
 
/**
 * Same scenario as the completion test, but with
 * {@code ReplayingShare.createWithDefault("default")}: after the upstream
 * completes, a new subscriber starts again from the default value followed by
 * the new start value ("initB").
 */
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
  // Mutable so the start value can be swapped between upstream subscriptions.
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed =
      upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("default", "initA");

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  replayed.subscribe(subscriber2);
  // The first subscriber must not be re-sent anything...
  subscriber1.assertValues("default", "initA");
  // ...and the late subscriber gets the latest emission instead of the default
  // (previously unchecked).
  subscriber2.assertValues("initA");

  upstream.onComplete();
  subscriber1.assertComplete();
  subscriber2.assertComplete();

  start.set(0, "initB");

  TestSubscriber<String> subscriber3 = new TestSubscriber<>();
  replayed.subscribe(subscriber3);
  subscriber3.assertValues("default", "initB");
}
 
/**
 * Checks that a subscriber receives nothing when the source has not emitted
 * and no default value was configured.
 */
@Test public void noInitialValue() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> onlySubscriber = shared.test();
  onlySubscriber.assertNoValues();
}
 
/**
 * Checks that a subscriber joining while another is still attached is
 * immediately handed the most recent value.
 */
@Test public void initialValueToNewSubscriber() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> first = shared.test();
  first.assertNoValues();

  source.onNext("Foo");
  first.assertValues("Foo");

  // The first subscriber is still active when the second one joins.
  TestSubscriber<String> second = shared.test();
  second.assertValues("Foo");
}
 
/**
 * Checks reference counting towards the upstream: a counter incremented on
 * upstream subscribe and decremented on upstream cancel stays at 1 while any
 * of three subscribers remains, and drops to 0 once the last one cancels.
 */
@Test public void refCountToUpstream() {
  PublishProcessor<String> source = PublishProcessor.create();
  final AtomicInteger upstreamSubscriptions = new AtomicInteger();

  Consumer<Subscription> countUp = new Consumer<Subscription>() {
    @Override public void accept(Subscription ignored) {
      upstreamSubscriptions.incrementAndGet();
    }
  };
  Action countDown = new Action() {
    @Override public void run() {
      upstreamSubscriptions.decrementAndGet();
    }
  };
  Flowable<String> shared = source
      .doOnSubscribe(countUp)
      .doOnCancel(countDown)
      .compose(ReplayingShare.<String>instance());

  TestSubscriber<String> first = shared.test();
  assertEquals(1, upstreamSubscriptions.get());

  TestSubscriber<String> second = shared.test();
  assertEquals(1, upstreamSubscriptions.get());

  TestSubscriber<String> third = shared.test();
  assertEquals(1, upstreamSubscriptions.get());

  first.cancel();
  assertEquals(1, upstreamSubscriptions.get());

  third.cancel();
  assertEquals(1, upstreamSubscriptions.get());

  // Only when the final subscriber leaves is the upstream cancelled.
  second.cancel();
  assertEquals(0, upstreamSubscriptions.get());
}
 
/**
 * Checks that a subscriber which was cancelled before it subscribed does not
 * receive the replayed value.
 */
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Keep one subscriber attached so the emission below is observed.
  shared.subscribe();
  source.onNext("something to cache");

  TestSubscriber<String> alreadyCancelled = new TestSubscriber<>();
  alreadyCancelled.cancel();
  shared.subscribe(alreadyCancelled);
  alreadyCancelled.assertNoValues();
}
 
/**
 * Checks that with {@code createWithDefault("default")} the first subscriber
 * gets the default value on subscribe, followed by real emissions.
 */
@Test public void defaultValueOnSubscribe() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> first = shared.test();
  first.assertValues("default");

  source.onNext("Foo");
  first.assertValues("default", "Foo");
}
 
/**
 * Checks that once the source has emitted, a newly arriving subscriber gets
 * that latest emission and not the configured default value.
 */
@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> first = shared.test();
  first.assertValues("default");

  source.onNext("Foo");
  first.assertValues("default", "Foo");

  // Only "Foo" is expected here; "default" must not be replayed.
  TestSubscriber<String> second = shared.test();
  second.assertValues("Foo");
}
 
/**
 * Same scenario as the error test, but with
 * {@code ReplayingShare.createWithDefault("default")}: after the upstream
 * fails, a new subscriber starts again from the default value followed by the
 * new start value ("initB").
 */
@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
  // Mutable so the start value can be swapped between upstream subscriptions.
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed =
      upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("default", "initA");

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  replayed.subscribe(subscriber2);
  // The first subscriber must not be re-sent anything...
  subscriber1.assertValues("default", "initA");
  // ...and the late subscriber gets the latest emission instead of the default
  // (previously unchecked).
  subscriber2.assertValues("initA");

  RuntimeException failure = new RuntimeException();
  upstream.onError(failure);
  subscriber1.assertError(failure);
  subscriber2.assertError(failure);

  start.set(0, "initB");

  TestSubscriber<String> subscriber3 = new TestSubscriber<>();
  replayed.subscribe(subscriber3);
  subscriber3.assertValues("default", "initB");
}
 
源代码16 项目: akarnokd-misc   文件: UndeliverableTest.java
/**
 * Walks through a delayed inner error that the consumer never observes: the
 * inner source of a {@code switchMapDelayError} fails, the subscriber still
 * sees nothing, and then the subscriber cancels.
 *
 * <p>A printing error handler is installed for the duration of the test and
 * removed afterwards so the process-wide hook does not leak into other tests.
 */
@Test
public void test() {
    RxJavaPlugins.setErrorHandler(error -> System.out.println(error));
    try {
        PublishProcessor<Integer> main = PublishProcessor.create();
        PublishProcessor<Integer> inner = PublishProcessor.create();

        // switchMapDelayError will delay all errors
        TestSubscriber<Integer> ts = main.switchMapDelayError(v -> inner).test();

        main.onNext(1);

        // the inner fails
        inner.onError(new IOException());

        // the consumer is still clueless
        ts.assertEmpty();

        // the consumer cancels
        // NOTE(review): presumably the held-back IOException is then routed to
        // the error handler installed above — confirm against RxJava behavior.
        ts.cancel();
    } finally {
        // Passing null removes the custom handler installed at the top of the test.
        RxJavaPlugins.setErrorHandler(null);
    }
}
 
/**
 * Runs before each test: assigns a new {@code PublishSubject} to
 * {@code lifecycle} and a new {@code PublishProcessor} to {@code stream}.
 */
@Before
public void setup() {
    lifecycle = PublishSubject.create();
    stream = PublishProcessor.create();
}
 
/**
 * Runs before each test: assigns a new {@code PublishSubject} to
 * {@code lifecycle} and a new {@code PublishProcessor} to {@code stream}.
 */
@Before
public void setup() {
    lifecycle = PublishSubject.create();
    stream = PublishProcessor.create();
}
 
/**
 * Runs before each test: assigns a new {@code PublishSubject} to
 * {@code lifecycle} and a new {@code PublishProcessor} to {@code stream}.
 */
@Before
public void setup() {
    lifecycle = PublishSubject.create();
    stream = PublishProcessor.create();
}
 
源代码20 项目: resilience4j   文件: RxJava3Adapter.java
/**
 * Exposes the events of an {@link EventPublisher} as a {@link Flowable}.
 *
 * <p>A serialized {@code PublishProcessor} is created and registered as the
 * publisher's event consumer, so every event handed to the consumer is
 * forwarded to the processor's {@code onNext}. The processor itself is
 * returned as the Flowable.
 *
 * @param eventPublisher the event publisher
 * @param <T>            the type of the event
 * @return the Flowable
 */
public static <T> Flowable<T> toFlowable(EventPublisher<T> eventPublisher) {
    FlowableProcessor<T> serialized = PublishProcessor.<T>create().toSerialized();
    eventPublisher.onEvent(event -> serialized.onNext(event));
    return serialized;
}
 
 类所在包
 同包方法