下面列出了io.reactivex.rxjava3.processors.PublishProcessor#compose ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertNoValues();
subject.onNext("Foo");
subscriber1.assertValues("Foo");
subscriber1.cancel();
TestSubscriber<String> subscriber2 = new TestSubscriber<>();
flowable.subscribe(subscriber2);
subscriber2.assertValues("Foo");
}
@Test public void valueMissedWhenNoSubscribers() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertNoValues();
subscriber1.cancel();
subject.onNext("Foo");
subscriber1.assertNoValues();
TestSubscriber<String> subscriber2 = new TestSubscriber<>();
flowable.subscribe(subscriber2);
subscriber2.assertNoValues();
}
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
flowable.subscribe();
subject.onNext("Foo");
Consumer<String> brokenAction = new Consumer<String>() {
@Override public void accept(String s) {
throw new OutOfMemoryError("broken!");
}
};
try {
flowable.subscribe(brokenAction);
fail();
} catch (OutOfMemoryError e) {
assertEquals("broken!", e.getMessage());
}
}
@Test public void backpressureHonoredWhenCached() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertNoValues();
subject.onNext("Foo");
subscriber1.assertValues("Foo");
TestSubscriber<String> subscriber2 = new TestSubscriber<>(0);
flowable.subscribe(subscriber2);
subscriber2.assertNoValues();
subject.onNext("Bar"); // Replace the cached value...
subscriber2.request(1); // ...and ensure new requests see it.
subscriber2.assertValues("Bar");
}
@Test public void streamsDoNotShareInstances() {
PublishProcessor<String> subjectA = PublishProcessor.create();
Flowable<String> flowableA = subjectA.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriberA1 = new TestSubscriber<>();
flowableA.subscribe(subscriberA1);
PublishProcessor<String> subjectB = PublishProcessor.create();
Flowable<String> flowableB = subjectB.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriberB1 = new TestSubscriber<>();
flowableB.subscribe(subscriberB1);
subjectA.onNext("Foo");
subscriberA1.assertValues("Foo");
subjectB.onNext("Bar");
subscriberB1.assertValues("Bar");
TestSubscriber<String> subscriberA2 = new TestSubscriber<>();
flowableA.subscribe(subscriberA2);
subscriberA2.assertValues("Foo");
TestSubscriber<String> subscriberB2 = new TestSubscriber<>();
flowableB.subscribe(subscriberB2);
subscriberB2.assertValues("Bar");
}
@Test public void noInitialValue() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber = new TestSubscriber<>();
flowable.subscribe(subscriber);
subscriber.assertNoValues();
}
@Test public void initialValueToNewSubscriber() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertNoValues();
subject.onNext("Foo");
subscriber1.assertValues("Foo");
TestSubscriber<String> subscriber2 = new TestSubscriber<>();
flowable.subscribe(subscriber2);
subscriber2.assertValues("Foo");
}
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed = upstream.compose(ReplayingShare.<String>instance());
replayed.subscribe();
upstream.onNext("something to cache");
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.cancel();
replayed.subscribe(testSubscriber);
testSubscriber.assertNoValues();
}
@Test public void defaultValueOnSubscribe() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertValues("default");
subject.onNext("Foo");
subscriber1.assertValues("default", "Foo");
}
@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertValues("default");
subject.onNext("Foo");
subscriber1.assertValues("default", "Foo");
TestSubscriber<String> observer2 = new TestSubscriber<>();
flowable.subscribe(observer2);
observer2.assertValues("Foo");
}