下面列出了怎么用io.reactivex.rxjava3.processors.PublishProcessor的API类实例代码及写法,或者点击链接到github查看源代码。
@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertNoValues();
subject.onNext("Foo");
subscriber1.assertValues("Foo");
subscriber1.cancel();
TestSubscriber<String> subscriber2 = new TestSubscriber<>();
flowable.subscribe(subscriber2);
subscriber2.assertValues("Foo");
}
@Test public void valueMissedWhenNoSubscribers() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertNoValues();
subscriber1.cancel();
subject.onNext("Foo");
subscriber1.assertNoValues();
TestSubscriber<String> subscriber2 = new TestSubscriber<>();
flowable.subscribe(subscriber2);
subscriber2.assertNoValues();
}
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
flowable.subscribe();
subject.onNext("Foo");
Consumer<String> brokenAction = new Consumer<String>() {
@Override public void accept(String s) {
throw new OutOfMemoryError("broken!");
}
};
try {
flowable.subscribe(brokenAction);
fail();
} catch (OutOfMemoryError e) {
assertEquals("broken!", e.getMessage());
}
}
@Test public void backpressureHonoredWhenCached() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertNoValues();
subject.onNext("Foo");
subscriber1.assertValues("Foo");
TestSubscriber<String> subscriber2 = new TestSubscriber<>(0);
flowable.subscribe(subscriber2);
subscriber2.assertNoValues();
subject.onNext("Bar"); // Replace the cached value...
subscriber2.request(1); // ...and ensure new requests see it.
subscriber2.assertValues("Bar");
}
@Test public void streamsDoNotShareInstances() {
PublishProcessor<String> subjectA = PublishProcessor.create();
Flowable<String> flowableA = subjectA.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriberA1 = new TestSubscriber<>();
flowableA.subscribe(subscriberA1);
PublishProcessor<String> subjectB = PublishProcessor.create();
Flowable<String> flowableB = subjectB.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriberB1 = new TestSubscriber<>();
flowableB.subscribe(subscriberB1);
subjectA.onNext("Foo");
subscriberA1.assertValues("Foo");
subjectB.onNext("Bar");
subscriberB1.assertValues("Bar");
TestSubscriber<String> subscriberA2 = new TestSubscriber<>();
flowableA.subscribe(subscriberA2);
subscriberA2.assertValues("Foo");
TestSubscriber<String> subscriberB2 = new TestSubscriber<>();
flowableB.subscribe(subscriberB2);
subscriberB2.assertValues("Bar");
}
@Test public void completeClearsCacheAndResubscribes() {
List<String> start = new ArrayList<>();
start.add("initA");
PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
replayed.subscribe(subscriber1);
subscriber1.assertValues("initA");
TestSubscriber<String> observer2 = new TestSubscriber<>();
replayed.subscribe(observer2);
subscriber1.assertValues("initA");
upstream.onComplete();
subscriber1.assertComplete();
observer2.assertComplete();
start.set(0, "initB");
TestSubscriber<String> observer3 = new TestSubscriber<>();
replayed.subscribe(observer3);
observer3.assertValues("initB");
}
@Test public void errorClearsCacheAndResubscribes() {
List<String> start = new ArrayList<>();
start.add("initA");
PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
replayed.subscribe(subscriber1);
subscriber1.assertValues("initA");
TestSubscriber<String> observer2 = new TestSubscriber<>();
replayed.subscribe(observer2);
subscriber1.assertValues("initA");
RuntimeException r = new RuntimeException();
upstream.onError(r);
subscriber1.assertError(r);
observer2.assertError(r);
start.set(0, "initB");
TestSubscriber<String> observer3 = new TestSubscriber<>();
replayed.subscribe(observer3);
observer3.assertValues("initB");
}
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
List<String> start = new ArrayList<>();
start.add("initA");
PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed =
upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
replayed.subscribe(subscriber1);
subscriber1.assertValues("default", "initA");
TestSubscriber<String> observer2 = new TestSubscriber<>();
replayed.subscribe(observer2);
subscriber1.assertValues("default", "initA");
upstream.onComplete();
subscriber1.assertComplete();
observer2.assertComplete();
start.set(0, "initB");
TestSubscriber<String> observer3 = new TestSubscriber<>();
replayed.subscribe(observer3);
observer3.assertValues("default", "initB");
}
@Test public void noInitialValue() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber = new TestSubscriber<>();
flowable.subscribe(subscriber);
subscriber.assertNoValues();
}
@Test public void initialValueToNewSubscriber() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertNoValues();
subject.onNext("Foo");
subscriber1.assertValues("Foo");
TestSubscriber<String> subscriber2 = new TestSubscriber<>();
flowable.subscribe(subscriber2);
subscriber2.assertValues("Foo");
}
@Test public void refCountToUpstream() {
PublishProcessor<String> subject = PublishProcessor.create();
final AtomicInteger count = new AtomicInteger();
Flowable<String> flowable = subject //
.doOnSubscribe(new Consumer<Subscription>() {
@Override public void accept(Subscription subscription) {
count.incrementAndGet();
}
}) //
.doOnCancel(new Action() {
@Override public void run() {
count.decrementAndGet();
}
}) //
.compose(ReplayingShare.<String>instance());
TestSubscriber<String> disposable1 = flowable.subscribeWith(new TestSubscriber<String>());
assertEquals(1, count.get());
TestSubscriber<String> disposable2 = flowable.subscribeWith(new TestSubscriber<String>());
assertEquals(1, count.get());
TestSubscriber<String> disposable3 = flowable.subscribeWith(new TestSubscriber<String>());
assertEquals(1, count.get());
disposable1.cancel();
assertEquals(1, count.get());
disposable3.cancel();
assertEquals(1, count.get());
disposable2.cancel();
assertEquals(0, count.get());
}
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed = upstream.compose(ReplayingShare.<String>instance());
replayed.subscribe();
upstream.onNext("something to cache");
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.cancel();
replayed.subscribe(testSubscriber);
testSubscriber.assertNoValues();
}
@Test public void defaultValueOnSubscribe() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertValues("default");
subject.onNext("Foo");
subscriber1.assertValues("default", "Foo");
}
@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
flowable.subscribe(subscriber1);
subscriber1.assertValues("default");
subject.onNext("Foo");
subscriber1.assertValues("default", "Foo");
TestSubscriber<String> observer2 = new TestSubscriber<>();
flowable.subscribe(observer2);
observer2.assertValues("Foo");
}
@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
List<String> start = new ArrayList<>();
start.add("initA");
PublishProcessor<String> upstream = PublishProcessor.create();
Flowable<String> replayed =
upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));
TestSubscriber<String> subscriber1 = new TestSubscriber<>();
replayed.subscribe(subscriber1);
subscriber1.assertValues("default", "initA");
TestSubscriber<String> observer2 = new TestSubscriber<>();
replayed.subscribe(observer2);
subscriber1.assertValues("default", "initA");
RuntimeException r = new RuntimeException();
upstream.onError(r);
subscriber1.assertError(r);
observer2.assertError(r);
start.set(0, "initB");
TestSubscriber<String> observer3 = new TestSubscriber<>();
replayed.subscribe(observer3);
observer3.assertValues("default", "initB");
}
@Test
public void test() {
RxJavaPlugins.setErrorHandler(error -> System.out.println(error));
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> inner = PublishProcessor.create();
// switchMapDelayError will delay all errors
TestSubscriber<Integer> ts = main.switchMapDelayError(v -> inner).test();
main.onNext(1);
// the inner fails
inner.onError(new IOException());
// the consumer is still clueless
ts.assertEmpty();
// the consumer cancels
ts.cancel();
}
@Before
public void setup() {
stream = PublishProcessor.create();
lifecycle = PublishSubject.create();
}
@Before
public void setup() {
stream = PublishProcessor.create();
lifecycle = PublishSubject.create();
}
@Before
public void setup() {
stream = PublishProcessor.create();
lifecycle = PublishSubject.create();
}
/**
* Converts the EventPublisher into a Flowable.
*
* @param eventPublisher the event publisher
* @param <T> the type of the event
* @return the Flowable
*/
public static <T> Flowable<T> toFlowable(EventPublisher<T> eventPublisher) {
PublishProcessor<T> publishProcessor = PublishProcessor.create();
FlowableProcessor<T> flowableProcessor = publishProcessor.toSerialized();
eventPublisher.onEvent(flowableProcessor::onNext);
return flowableProcessor;
}