下面列出了 io.reactivex.rxjava3.subscribers.TestSubscriber 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Checks that a value emitted while one subscriber was attached is delivered to a
 * subscriber that arrives after the first subscriber has cancelled.
 */
@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

    // First subscriber: nothing is seen until the source emits.
    TestSubscriber<String> first = shared.test();
    first.assertNoValues();
    source.onNext("Foo");
    first.assertValues("Foo");
    first.cancel();

    // A subscriber attaching after the cancel is expected to receive the last value.
    TestSubscriber<String> late = shared.test();
    late.assertValues("Foo");
}
/**
 * Checks that a value emitted by the source while nobody is subscribed is not
 * delivered to the cancelled subscriber nor to a subscriber that attaches afterwards.
 */
@Test public void valueMissedWhenNoSubscribers() {
    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> cancelled = shared.test();
    cancelled.assertNoValues();
    cancelled.cancel();

    // Emitted with no active subscriber.
    source.onNext("Foo");
    cancelled.assertNoValues();

    TestSubscriber<String> late = shared.test();
    late.assertNoValues();
}
/**
 * Checks that a subscriber with zero initial demand receives nothing on subscribe,
 * and that its first request yields the most recent value rather than an older one.
 */
@Test public void backpressureHonoredWhenCached() {
    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> eager = shared.test();
    eager.assertNoValues();
    source.onNext("Foo");
    eager.assertValues("Foo");

    // Subscribe with an initial request of zero: no value may be delivered yet.
    TestSubscriber<String> lazy = shared.test(0);
    lazy.assertNoValues();

    // Replace the cached value...
    source.onNext("Bar");
    // ...and ensure new requests see it.
    lazy.request(1);
    lazy.assertValues("Bar");
}
/**
 * Checks that two independently composed streams each deliver their own last value
 * to late subscribers, i.e. "Foo" for stream A and "Bar" for stream B.
 */
@Test public void streamsDoNotShareInstances() {
    PublishProcessor<String> sourceA = PublishProcessor.create();
    Flowable<String> sharedA = sourceA.compose(ReplayingShare.<String>instance());
    TestSubscriber<String> firstA = sharedA.test();

    PublishProcessor<String> sourceB = PublishProcessor.create();
    Flowable<String> sharedB = sourceB.compose(ReplayingShare.<String>instance());
    TestSubscriber<String> firstB = sharedB.test();

    sourceA.onNext("Foo");
    firstA.assertValues("Foo");
    sourceB.onNext("Bar");
    firstB.assertValues("Bar");

    // Late subscribers must see the value of their own stream only.
    TestSubscriber<String> lateA = sharedA.test();
    lateA.assertValues("Foo");

    TestSubscriber<String> lateB = sharedB.test();
    lateB.assertValues("Bar");
}
/**
 * Checks that after the upstream completes, a new subscriber triggers a fresh
 * subscription and receives the updated start value ("initB") only.
 */
@Test public void completeClearsCacheAndResubscribes() {
    List<String> initial = new ArrayList<>();
    initial.add("initA");

    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared =
        source.startWithIterable(initial).compose(ReplayingShare.<String>instance());

    TestSubscriber<String> first = shared.test();
    first.assertValues("initA");

    TestSubscriber<String> second = shared.test();
    // The first subscriber's values are re-checked after the second one attaches.
    first.assertValues("initA");

    source.onComplete();
    first.assertComplete();
    second.assertComplete();

    // Mutate the start list so a new upstream subscription is distinguishable.
    initial.set(0, "initB");

    TestSubscriber<String> third = shared.test();
    third.assertValues("initB");
}
/**
 * Checks that after the upstream fails, a new subscriber triggers a fresh
 * subscription and receives the updated start value ("initB") only.
 */
@Test public void errorClearsCacheAndResubscribes() {
    List<String> initial = new ArrayList<>();
    initial.add("initA");

    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared =
        source.startWithIterable(initial).compose(ReplayingShare.<String>instance());

    TestSubscriber<String> first = shared.test();
    first.assertValues("initA");

    TestSubscriber<String> second = shared.test();
    // The first subscriber's values are re-checked after the second one attaches.
    first.assertValues("initA");

    // Both active subscribers must observe the very same exception instance.
    RuntimeException failure = new RuntimeException();
    source.onError(failure);
    first.assertError(failure);
    second.assertError(failure);

    // Mutate the start list so a new upstream subscription is distinguishable.
    initial.set(0, "initB");

    TestSubscriber<String> third = shared.test();
    third.assertValues("initB");
}
/**
 * Variant of the completion test using a default value: after the upstream completes,
 * a new subscriber is expected to receive "default" followed by the updated start value.
 */
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
    List<String> initial = new ArrayList<>();
    initial.add("initA");

    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source
        .startWithIterable(initial)
        .compose(ReplayingShare.createWithDefault("default"));

    TestSubscriber<String> first = shared.test();
    first.assertValues("default", "initA");

    TestSubscriber<String> second = shared.test();
    // The first subscriber's values are re-checked after the second one attaches.
    first.assertValues("default", "initA");

    source.onComplete();
    first.assertComplete();
    second.assertComplete();

    // Mutate the start list so a new upstream subscription is distinguishable.
    initial.set(0, "initB");

    TestSubscriber<String> third = shared.test();
    third.assertValues("default", "initB");
}
/**
 * With a one-minute limit configured, a two-element interval source is expected to
 * deliver both values and complete; the limiter must record three successes.
 */
@Test
public void doNotTimeout() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ofMinutes(1)));
    Flowable<Long> twoTicks = Flowable.interval(1, TimeUnit.SECONDS).take(2);

    TestSubscriber<Long> subscriber =
        twoTicks.compose(TimeLimiterTransformer.of(timeLimiter)).test();
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.assertValueCount(2);
    subscriber.assertComplete();
    then(timeLimiter).should(times(3)).onSuccess();
}
/**
 * Uses an interval whose first tick (1s) is within the 2s limit but whose period (3s)
 * exceeds it: exactly one value is expected, followed by a {@link TimeoutException}.
 * The limiter must record one success and one timeout error.
 *
 * @throws InterruptedException if the wait for the terminal event is interrupted
 */
@Test
public void timeoutAfterInitial() throws InterruptedException {
    int limitSeconds = 2;
    int firstTickSeconds = 1;
    int periodSeconds = 3;
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ofSeconds(limitSeconds)));
    Flowable<Long> ticks = Flowable.interval(firstTickSeconds, periodSeconds, TimeUnit.SECONDS);

    TestSubscriber<Long> subscriber =
        ticks.compose(TimeLimiterTransformer.of(timeLimiter)).test();
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.await();
    subscriber.assertValueCount(1);
    subscriber.assertError(TimeoutException.class);
    then(timeLimiter).should().onSuccess();
    then(timeLimiter).should().onError(any(TimeoutException.class));
}
/**
 * Requests the "textJson" resource as JSON through the Flowable Rx invoker and expects
 * the result to be exactly {@code new HelloWorldBean("Hello", "World")}.
 *
 * @throws Exception declared by the original test; waiting on the subscriber may be
 *     interrupted
 */
@Test
public void testGetHelloWorldJson() throws Exception {
    String endpoint = "http://localhost:" + PORT + "/rx3/flowable/textJson";

    // Providers: JSON (de)serialization plus the Flowable invoker.
    List<Object> providers = new LinkedList<>();
    providers.add(new JacksonJsonProvider());
    providers.add(new FlowableRxInvokerProvider());

    WebClient client = WebClient.create(endpoint, providers);
    Flowable<HelloWorldBean> response = client.accept("application/json")
        .rx(FlowableRxInvoker.class)
        .get(HelloWorldBean.class);

    final TestSubscriber<HelloWorldBean> subscriber = response.test();

    // Wait up to three seconds for a terminal event before asserting.
    subscriber.await(3, TimeUnit.SECONDS);
    subscriber.assertResult(new HelloWorldBean("Hello", "World"));
}
/** Checks that a subscriber receives nothing when the source has not emitted yet. */
@Test public void noInitialValue() {
    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> only = shared.test();
    only.assertNoValues();
}
/**
 * Checks that a second subscriber, attaching while the first is still active,
 * immediately receives the last value emitted by the source.
 */
@Test public void initialValueToNewSubscriber() {
    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> first = shared.test();
    first.assertNoValues();
    source.onNext("Foo");
    first.assertValues("Foo");

    // The first subscriber stays subscribed while the second one joins.
    TestSubscriber<String> second = shared.test();
    second.assertValues("Foo");
}
/**
 * Checks that the upstream is subscribed once regardless of how many downstream
 * subscribers attach, and is cancelled only when the last of them cancels.
 * A counter is incremented on upstream subscribe and decremented on upstream cancel.
 */
@Test public void refCountToUpstream() {
    final AtomicInteger upstreamSubscriptions = new AtomicInteger();
    PublishProcessor<String> source = PublishProcessor.create();

    Flowable<String> counted = source
        .doOnSubscribe(new Consumer<Subscription>() {
            @Override public void accept(Subscription subscription) {
                upstreamSubscriptions.incrementAndGet();
            }
        })
        .doOnCancel(new Action() {
            @Override public void run() {
                upstreamSubscriptions.decrementAndGet();
            }
        });
    Flowable<String> shared = counted.compose(ReplayingShare.<String>instance());

    // Three subscribers attach; the upstream count must stay at one.
    TestSubscriber<String> first = shared.test();
    assertEquals(1, upstreamSubscriptions.get());
    TestSubscriber<String> second = shared.test();
    assertEquals(1, upstreamSubscriptions.get());
    TestSubscriber<String> third = shared.test();
    assertEquals(1, upstreamSubscriptions.get());

    // Cancelling all but the last subscriber keeps the upstream connected.
    first.cancel();
    assertEquals(1, upstreamSubscriptions.get());
    third.cancel();
    assertEquals(1, upstreamSubscriptions.get());

    // The final cancel releases the upstream.
    second.cancel();
    assertEquals(0, upstreamSubscriptions.get());
}
/**
 * Checks that a subscriber which is already cancelled before it subscribes does not
 * receive the previously emitted value.
 */
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

    // Keep one subscription active so that the emission below has a subscriber.
    shared.subscribe();
    source.onNext("something to cache");

    // Cancel first, subscribe second: no value may reach this subscriber.
    TestSubscriber<String> preCancelled = new TestSubscriber<>();
    preCancelled.cancel();
    shared.subscribe(preCancelled);
    preCancelled.assertNoValues();
}
/**
 * Checks that, with a default configured, the first subscriber receives "default" on
 * subscribe and subsequent source emissions after it.
 */
@Test public void defaultValueOnSubscribe() {
    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source.compose(ReplayingShare.createWithDefault("default"));

    TestSubscriber<String> first = shared.test();
    first.assertValues("default");

    source.onNext("Foo");
    first.assertValues("default", "Foo");
}
/**
 * Checks that once the source has emitted, a newly attached subscriber receives the
 * latest emission ("Foo") and not the configured default.
 */
@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source.compose(ReplayingShare.createWithDefault("default"));

    TestSubscriber<String> first = shared.test();
    first.assertValues("default");
    source.onNext("Foo");
    first.assertValues("default", "Foo");

    // The second subscriber must see only the latest real emission.
    TestSubscriber<String> second = shared.test();
    second.assertValues("Foo");
}
/**
 * Variant of the error test using a default value: after the upstream fails, a new
 * subscriber is expected to receive "default" followed by the updated start value.
 */
@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
    List<String> initial = new ArrayList<>();
    initial.add("initA");

    PublishProcessor<String> source = PublishProcessor.create();
    Flowable<String> shared = source
        .startWithIterable(initial)
        .compose(ReplayingShare.createWithDefault("default"));

    TestSubscriber<String> first = shared.test();
    first.assertValues("default", "initA");

    TestSubscriber<String> second = shared.test();
    // The first subscriber's values are re-checked after the second one attaches.
    first.assertValues("default", "initA");

    // Both active subscribers must observe the very same exception instance.
    RuntimeException failure = new RuntimeException();
    source.onError(failure);
    first.assertError(failure);
    second.assertError(failure);

    // Mutate the start list so a new upstream subscription is distinguishable.
    initial.set(0, "initB");

    TestSubscriber<String> third = shared.test();
    third.assertValues("default", "initB");
}
/**
 * Checks that a non-timeout failure from the source reaches the subscriber as a
 * {@link RuntimeException} and is reported to the limiter via {@code onError}.
 */
@Test
public void otherError() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ZERO));
    Flowable<Object> failing = Flowable.error(new RuntimeException());

    TestSubscriber<Object> subscriber =
        failing.compose(TimeLimiterTransformer.of(timeLimiter)).test();
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.assertError(RuntimeException.class);
    then(timeLimiter).should().onError(any(RuntimeException.class));
}
/**
 * With a zero-duration limit and a source whose first tick is one minute away, the
 * subscriber is expected to fail with {@link TimeoutException}, which must also be
 * reported to the limiter.
 */
@Test
public void timeout() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ZERO));
    Flowable<Long> slowTicks = Flowable.interval(1, TimeUnit.MINUTES);

    TestSubscriber<Long> subscriber =
        slowTicks.compose(TimeLimiterTransformer.of(timeLimiter)).test();
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.assertError(TimeoutException.class);
    then(timeLimiter).should().onError(any(TimeoutException.class));
}
/**
 * With a zero-duration limit and an empty source delayed by one minute, the subscriber
 * is expected to fail with {@link TimeoutException}, which must also be reported to
 * the limiter.
 */
@Test
public void timeoutEmpty() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ZERO));
    Flowable<Object> delayedEmpty = Flowable.empty().delay(1, TimeUnit.MINUTES);

    TestSubscriber<Object> subscriber =
        delayedEmpty.compose(TimeLimiterTransformer.of(timeLimiter)).test();
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.assertError(TimeoutException.class);
    then(timeLimiter).should().onError(any(TimeoutException.class));
}
/**
 * With a one-minute limit, an immediately empty source is expected to complete and
 * the limiter must record a success.
 */
@Test
public void doNotTimeoutEmpty() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ofMinutes(1)));
    Flowable<Object> nothing = Flowable.empty();

    TestSubscriber<Object> subscriber =
        nothing.compose(TimeLimiterTransformer.of(timeLimiter)).test();

    subscriber.assertComplete();
    then(timeLimiter).should().onSuccess();
}
/**
 * Binds the stream to the lifecycle using CORRESPONDING_EVENTS and emits no lifecycle
 * events: both stream values must arrive and the stream must not complete.
 */
@Test
public void noEvents() {
    TestSubscriber<String> observed = new TestSubscriber<>();
    stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .subscribe(observed);

    stream.onNext("1");
    stream.onNext("2");

    observed.assertValues("1", "2");
    observed.assertNotComplete();
}
/**
 * Binds the stream using CORRESPONDING_EVENTS and emits only "create" on the lifecycle:
 * both stream values must arrive and the stream must not complete.
 */
@Test
public void oneStartEvent() {
    TestSubscriber<String> observed = new TestSubscriber<>();
    stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .subscribe(observed);

    lifecycle.onNext("create");
    stream.onNext("1");
    stream.onNext("2");

    observed.assertValues("1", "2");
    observed.assertNotComplete();
}
/**
 * Binds the stream using CORRESPONDING_EVENTS and emits "create" then "start" on the
 * lifecycle: both stream values must arrive and the stream must not complete.
 */
@Test
public void twoOpenEvents() {
    TestSubscriber<String> observed = new TestSubscriber<>();
    stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .subscribe(observed);

    lifecycle.onNext("create");
    stream.onNext("1");
    lifecycle.onNext("start");
    stream.onNext("2");

    observed.assertValues("1", "2");
    observed.assertNotComplete();
}
/**
 * Binds the stream using CORRESPONDING_EVENTS and emits "create" followed by "destroy":
 * only the value sent before "destroy" must arrive and the stream must be completed.
 */
@Test
public void openAndCloseEvent() {
    TestSubscriber<String> observed = new TestSubscriber<>();
    stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .subscribe(observed);

    lifecycle.onNext("create");
    stream.onNext("1");
    lifecycle.onNext("destroy");
    // Sent after the closing event; must not be delivered.
    stream.onNext("2");

    observed.assertValues("1");
    observed.assertComplete();
}
/**
 * Binds the stream to the lifecycle with the single-argument {@code bind} and emits no
 * lifecycle event: both stream values must arrive and the stream must not complete.
 */
@Test
public void noEvent() {
    TestSubscriber<String> observed = new TestSubscriber<>();
    stream
        .compose(RxLifecycle.<String, String>bind(lifecycle))
        .subscribe(observed);

    stream.onNext("1");
    stream.onNext("2");

    observed.assertValues("1", "2");
    observed.assertNotComplete();
}
/**
 * Binds the stream with the single-argument {@code bind} and emits one lifecycle event
 * ("stop"): only the value sent before it must arrive and the stream must be completed.
 */
@Test
public void oneEvent() {
    TestSubscriber<String> observed = new TestSubscriber<>();
    stream
        .compose(RxLifecycle.<String, String>bind(lifecycle))
        .subscribe(observed);

    stream.onNext("1");
    lifecycle.onNext("stop");
    // Sent after the lifecycle event; must not be delivered.
    stream.onNext("2");

    observed.assertValues("1");
    observed.assertComplete();
}
/**
 * Binds the stream until the "stop" event and emits no lifecycle events: both stream
 * values must arrive and the stream must not complete.
 */
@Test
public void noEvents() {
    TestSubscriber<String> observed = new TestSubscriber<>();
    stream
        .compose(RxLifecycle.<String, String>bindUntilEvent(lifecycle, "stop"))
        .subscribe(observed);

    stream.onNext("1");
    stream.onNext("2");

    observed.assertValues("1", "2");
    observed.assertNotComplete();
}
/**
 * Binds the stream until the "stop" event and emits a different event ("keep going"):
 * both stream values must arrive and the stream must not complete.
 */
@Test
public void oneWrongEvent() {
    TestSubscriber<String> observed = new TestSubscriber<>();
    stream
        .compose(RxLifecycle.<String, String>bindUntilEvent(lifecycle, "stop"))
        .subscribe(observed);

    stream.onNext("1");
    // Not the event the binding waits for.
    lifecycle.onNext("keep going");
    stream.onNext("2");

    observed.assertValues("1", "2");
    observed.assertNotComplete();
}
/**
 * Binds the stream until the "stop" event and emits "keep going" followed by "stop":
 * the values sent before "stop" must arrive, the one after it must not, and the stream
 * must be completed.
 */
@Test
public void twoEvents() {
    TestSubscriber<String> observed = new TestSubscriber<>();
    stream
        .compose(RxLifecycle.<String, String>bindUntilEvent(lifecycle, "stop"))
        .subscribe(observed);

    stream.onNext("1");
    lifecycle.onNext("keep going");
    stream.onNext("2");
    lifecycle.onNext("stop");
    // Sent after the terminating event; must not be delivered.
    stream.onNext("3");

    observed.assertValues("1", "2");
    observed.assertComplete();
}