io.reactivex.rxjava3.processors.PublishProcessor#compose ( )源码实例Demo

下面列出了io.reactivex.rxjava3.processors.PublishProcessor#compose ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertNoValues();

  subject.onNext("Foo");
  subscriber1.assertValues("Foo");
  subscriber1.cancel();

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  flowable.subscribe(subscriber2);
  subscriber2.assertValues("Foo");
}
 
@Test public void valueMissedWhenNoSubscribers() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertNoValues();
  subscriber1.cancel();

  subject.onNext("Foo");
  subscriber1.assertNoValues();

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  flowable.subscribe(subscriber2);
  subscriber2.assertNoValues();
}
 
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  flowable.subscribe();
  subject.onNext("Foo");

  Consumer<String> brokenAction = new Consumer<String>() {
    @Override public void accept(String s) {
      throw new OutOfMemoryError("broken!");
    }
  };
  try {
    flowable.subscribe(brokenAction);
    fail();
  } catch (OutOfMemoryError e) {
    assertEquals("broken!", e.getMessage());
  }
}
 
@Test public void backpressureHonoredWhenCached() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertNoValues();

  subject.onNext("Foo");
  subscriber1.assertValues("Foo");

  TestSubscriber<String> subscriber2 = new TestSubscriber<>(0);
  flowable.subscribe(subscriber2);
  subscriber2.assertNoValues();

  subject.onNext("Bar"); // Replace the cached value...
  subscriber2.request(1); // ...and ensure new requests see it.
  subscriber2.assertValues("Bar");
}
 
@Test public void streamsDoNotShareInstances() {
  PublishProcessor<String> subjectA = PublishProcessor.create();
  Flowable<String> flowableA = subjectA.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> subscriberA1 = new TestSubscriber<>();
  flowableA.subscribe(subscriberA1);

  PublishProcessor<String> subjectB = PublishProcessor.create();
  Flowable<String> flowableB = subjectB.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> subscriberB1 = new TestSubscriber<>();
  flowableB.subscribe(subscriberB1);

  subjectA.onNext("Foo");
  subscriberA1.assertValues("Foo");
  subjectB.onNext("Bar");
  subscriberB1.assertValues("Bar");

  TestSubscriber<String> subscriberA2 = new TestSubscriber<>();
  flowableA.subscribe(subscriberA2);
  subscriberA2.assertValues("Foo");

  TestSubscriber<String> subscriberB2 = new TestSubscriber<>();
  flowableB.subscribe(subscriberB2);
  subscriberB2.assertValues("Bar");
}
 
@Test public void noInitialValue() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber = new TestSubscriber<>();
  flowable.subscribe(subscriber);
  subscriber.assertNoValues();
}
 
@Test public void initialValueToNewSubscriber() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertNoValues();

  subject.onNext("Foo");
  subscriber1.assertValues("Foo");

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  flowable.subscribe(subscriber2);
  subscriber2.assertValues("Foo");
}
 
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed = upstream.compose(ReplayingShare.<String>instance());
  replayed.subscribe();
  upstream.onNext("something to cache");

  TestSubscriber<String> testSubscriber = new TestSubscriber<>();
  testSubscriber.cancel();
  replayed.subscribe(testSubscriber);
  testSubscriber.assertNoValues();
}
 
@Test public void defaultValueOnSubscribe() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertValues("default");

  subject.onNext("Foo");
  subscriber1.assertValues("default", "Foo");
}
 
@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertValues("default");

  subject.onNext("Foo");
  subscriber1.assertValues("default", "Foo");

  TestSubscriber<String> observer2 = new TestSubscriber<>();
  flowable.subscribe(observer2);
  observer2.assertValues("Foo");
}