下面列出了 io.reactivex.subscribers.TestSubscriber 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Subscribes a watcher to the person repository, inserts four generated persons and
 * verifies that the region holds them and that one watch event per insert is received.
 */
@Test
public void pubsub() throws Exception {
    final int count = 4;
    PersonRepository repository = new PersonRepository(new GeodeBackend(GeodeSetup.of(x -> region)));
    // The subscription is created before any insert takes place.
    TestSubscriber<WatchEvent<Person>> watched =
            Flowable.fromPublisher(repository.watcher(PersonCriteria.person).watch()).test();
    final PersonGenerator generator = new PersonGenerator();
    for (int index = 0; index < count; index++) {
        repository.insert(generator.next().withId("id" + index));
    }
    check(region.keySet()).notEmpty();
    // ensure (de)serialization is successful
    check(region.query("true")).hasSize(count);
    // Wait for the expected number of events before asserting on them.
    watched.awaitCount(count)
            .assertNoErrors()
            .assertValueCount(count);
    check(watched.values().stream()
            .map(event -> event.newValue().get().id())
            .collect(Collectors.toList()))
            .hasContentInAnyOrder("id0", "id1", "id2", "id3");
}
/**
 * With a one-minute time limit, a two-element interval stream must deliver both values,
 * complete, and report success to the time limiter three times.
 */
@Test
public void doNotTimeout() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ofMinutes(1)));
    Flowable<Long> twoTicks = Flowable.interval(1, TimeUnit.SECONDS).take(2);
    TestSubscriber<Long> subscriber = twoTicks
            .compose(TimeLimiterTransformer.of(timeLimiter))
            .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.assertValueCount(2);
    subscriber.assertComplete();
    then(timeLimiter).should(times(3)).onSuccess();
}
/**
 * A subscriber to {@code observeChanges} sees nothing until changes are published through
 * the low-level API, and then receives exactly the published {@code Changes}.
 */
@Test
public void observeChangesAndNotifyAboutChangesShouldWorkCorrectly() {
    TestSubscriber<Changes> changesSubscriber = new TestSubscriber<>();
    storIOSQLite.observeChanges(LATEST).subscribe(changesSubscriber);
    // Nothing has been published yet.
    changesSubscriber.assertNoValues();

    Changes published = Changes.newInstance("test_table", "tag");
    storIOSQLite.lowLevel().notifyAboutChanges(published);

    changesSubscriber.assertValue(published);
    changesSubscriber.assertNoErrors();
    changesSubscriber.dispose();
}
/**
 * Drives a retry-with-backoff pipeline (max two retries) with a {@code TestScheduler} and
 * checks that values are re-emitted only once enough virtual time has passed, and that the
 * original error is delivered after the last retry.
 */
@Test
public void testWithScheduler() {
    Exception failure = new IllegalArgumentException("boo");
    TestScheduler clock = new TestScheduler();
    TestSubscriber<Integer> subscriber = TestSubscriber.create();
    // source emits two values and then fails
    Flowable<Integer> failing = Flowable.just(1, 2).concatWith(Flowable.<Integer>error(failure));
    failing
            // retry with backoff
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(clock).build())
            // go
            .subscribe(subscriber);
    subscriber.assertValues(1, 2);
    subscriber.assertNotComplete();
    // first retry fires after one minute
    clock.advanceTimeBy(1, TimeUnit.MINUTES);
    subscriber.assertValues(1, 2, 1, 2);
    subscriber.assertNotComplete();
    // the second retry is not yet due after only one more minute
    clock.advanceTimeBy(1, TimeUnit.MINUTES);
    subscriber.assertValues(1, 2, 1, 2);
    subscriber.assertNotComplete();
    // a further minute triggers the second (last) retry, after which the error surfaces
    clock.advanceTimeBy(1, TimeUnit.MINUTES);
    subscriber.assertValues(1, 2, 1, 2, 1, 2);
    subscriber.assertError(failure);
}
/**
 * Filtering by table {@code "table1"} and tag {@code "tag1"} must let through the change
 * for {@code "table1"} even though its tag differs, and nothing else.
 */
@Test
public void applyForTablesAndTags_shouldNotifyByTable() {
    final TestSubscriber<Changes> filtered = new TestSubscriber<>();
    Flowable<Changes> allChanges = Flowable.just(
            Changes.newInstance("table1", "another_tag"),
            Changes.newInstance("table2", "tag2"),
            Changes.newInstance("table3"));

    ChangesFilter
            .applyForTablesAndTags(allChanges, singleton("table1"), singleton("tag1"))
            .subscribe(filtered);

    // All other Changes should be filtered
    filtered.assertValue(Changes.newInstance("table1", "another_tag"));
    filtered.dispose();
}
/**
 * Uses virtual time to check the emission order when a delayed {@code Maybe} value is
 * inserted into a stream of two delayed values: 1, then 3, then 2, then completion.
 */
@Test
public void testInsertWithDelays() {
    TestScheduler scheduler = new TestScheduler();
    Flowable<Integer> source = Flowable.just(1).delay(1, TimeUnit.SECONDS, scheduler)
            .concatWith(Flowable.just(2).delay(3, TimeUnit.SECONDS, scheduler));
    TestSubscriber<Integer> subscriber = source
            .compose(Transformers.insert(Maybe.just(3).delay(2, TimeUnit.SECONDS, scheduler)))
            .test();

    subscriber.assertNoValues();

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    subscriber.assertValues(1);

    scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
    subscriber.assertValues(1, 3);

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    subscriber.assertValues(1, 3, 2);
    subscriber.assertComplete();
}
/**
 * Filtering three table changes by {@code "table2"} must deliver only the change for
 * that table.
 */
@Test
public void applyForTables_shouldFilterRequiredTable() {
    final TestSubscriber<Changes> filtered = new TestSubscriber<>();
    Flowable<Changes> allChanges = Flowable.just(
            Changes.newInstance("table1"),
            Changes.newInstance("table2"),
            Changes.newInstance("table3"));

    ChangesFilter
            .applyForTables(allChanges, singleton("table2"))
            .subscribe(filtered);

    // All other tables should be filtered
    filtered.assertValue(Changes.newInstance("table2"));
    filtered.dispose();
}
/**
 * Builds the component graph with an {@code AppMeasurementModule} whose first argument is
 * {@code null}, simulates an app resume and expects a single successful notification.
 */
@Test
public void onAppOpen_whenAnalyticsAbsent_notifiesSubscriber() {
    AppMeasurementModule moduleWithoutAnalytics =
            new AppMeasurementModule(null, firebaseEventSubscriber);
    TestUniversalComponent analyticsLessUniversalComponent =
            universalComponentBuilder.appMeasurementModule(moduleWithoutAnalytics).build();
    TestAppComponent component =
            appComponentBuilder.universalComponent(analyticsLessUniversalComponent).build();
    FirebaseInAppMessaging inAppMessaging = component.providesFirebaseInAppMessaging();
    TestSubscriber<InAppMessage> messages = listenerToFlowable(inAppMessaging);

    simulateAppResume();

    waitUntilNotified(messages);
    assertSingleSuccessNotification(messages);
}
/**
 * A cursor flowable built from the raw query emits once on subscription and once more
 * after changes for the tag are published.
 */
@Test
public void repeatsOperationWithRawQueryByChangeOfTag() {
    TestSubscriber<Cursor> cursors = new TestSubscriber<>();
    putUserBlocking();

    storIOSQLite.get()
            .cursor()
            .withQuery(rawQuery)
            .prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursors);
    // initial emission
    cursors.assertValueCount(1);

    storIOSQLite.lowLevel().notifyAboutChanges(tagChanges);
    // re-emission triggered by the notification
    cursors.assertValueCount(2);
}
/**
 * Checks in a pool member immediately after receiving it and verifies, via virtual time,
 * that the disposer has not run before the one-minute max idle time elapses and has run
 * exactly once afterwards.
 */
@Test
public void testMaxIdleTime() throws InterruptedException {
    TestScheduler scheduler = new TestScheduler();
    AtomicInteger created = new AtomicInteger();
    AtomicInteger disposed = new AtomicInteger();
    Pool<Integer> pool = NonBlockingPool
            .factory(() -> created.incrementAndGet())
            .healthCheck(n -> true)
            .maxSize(3)
            .maxIdleTime(1, TimeUnit.MINUTES)
            .disposer(n -> disposed.incrementAndGet())
            .scheduler(scheduler)
            .build();
    TestSubscriber<Member<Integer>> subscriber =
            new FlowableSingleDeferUntilRequest<>(pool.member())
                    .doOnNext(member -> member.checkin())
                    .doOnNext(System.out::println)
                    .doOnRequest(requested -> System.out.println("test request=" + requested))
                    .test(1);

    scheduler.triggerActions();
    subscriber.assertValueCount(1);
    assertEquals(0, disposed.get());

    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    scheduler.triggerActions();
    assertEquals(1, disposed.get());
}
/**
 * When the cache lookup returns {@code null} and the subsequent put throws, the decorated
 * supplier's value is still returned, one miss is counted, and a CACHE_MISS event followed
 * by an ERROR event is published.
 */
@Test
public void shouldReturnValueOfSupplier() throws Throwable {
    given(cache.get("testKey")).willReturn(null);
    willThrow(new RuntimeException("Cache is not available"))
            .given(cache)
            .put("testKey", "Hello world");
    Cache<String, String> decoratedCache = Cache.of(cache);
    TestSubscriber<CacheEvent.Type> eventTypes = toFlowable(decoratedCache.getEventPublisher())
            .map(CacheEvent::getEventType)
            .test();
    CheckedFunction1<String, String> cached =
            Cache.decorateCheckedSupplier(decoratedCache, () -> "Hello world");

    String value = cached.apply("testKey");

    assertThat(value).isEqualTo("Hello world");
    assertThat(decoratedCache.getMetrics().getNumberOfCacheHits()).isEqualTo(0);
    assertThat(decoratedCache.getMetrics().getNumberOfCacheMisses()).isEqualTo(1);
    eventTypes.assertValueCount(2);
    eventTypes.assertValues(CacheEvent.Type.CACHE_MISS, CacheEvent.Type.ERROR);
}
/**
 * Creates one tag for the organization and verifies that {@code findAll} for the same
 * organization completes without error and emits exactly one tag.
 */
@Test
public void testFindAll() throws TechnicalException {
    // create tag
    Tag created = new Tag();
    created.setName("testName");
    created.setDescription("Description");
    created.setOrganizationId(ORGANIZATION_ID);
    tagRepository.create(created).blockingGet();

    // fetch tags
    TestSubscriber<Tag> tagSubscriber = tagRepository.findAll(ORGANIZATION_ID).test();
    tagSubscriber.awaitTerminalEvent();

    tagSubscriber.assertComplete()
            .assertNoErrors()
            .assertValueCount(1);
}
/**
 * Repeatedly requests a member from a pool of max size two, checking each member back in
 * as soon as it is received, and verifies that the four delivered values are all equal
 * (i.e. the same pooled value was handed out each time).
 */
@Test
public void testConnectionPoolRecylesAlternating() {
    TestScheduler s = new TestScheduler();
    AtomicInteger count = new AtomicInteger();
    Pool<Integer> pool = NonBlockingPool //
            .factory(() -> count.incrementAndGet()) //
            .healthCheck(n -> true) //
            .maxSize(2) //
            .maxIdleTime(1, TimeUnit.MINUTES) //
            .scheduler(s) //
            .build();
    TestSubscriber<Integer> ts = new FlowableSingleDeferUntilRequest<>(pool.member()) //
            .repeat() //
            .doOnNext(m -> m.checkin()) //
            .map(m -> m.value()) //
            .test(4); //
    s.triggerActions();
    ts.assertValueCount(4) //
            .assertNotTerminated();
    List<Object> list = ts.getEvents().get(0);
    // all 4 connections released were the same; compare by value because the elements are
    // boxed Integers and reference equality on boxed values is not reliable in general
    Object first = list.get(0);
    for (int i = 1; i < list.size(); i++) {
        assertTrue(first.equals(list.get(i)));
    }
}
/**
 * Stores three users, queries the first one by email as a flowable limited to a single
 * emission, and expects that user wrapped in an {@code Optional}.
 */
@Test
public void queryOneExistedObjectFlowable() {
    final List<User> storedUsers = putUsersBlocking(3);
    final User expectedUser = storedUsers.get(0);
    final Query byEmail = Query.builder()
            .table(UserTableMeta.TABLE)
            .where(UserTableMeta.COLUMN_EMAIL + "=?")
            .whereArgs(expectedUser.email())
            .build();
    final Flowable<Optional<User>> userFlowable = storIOSQLite
            .get()
            .object(User.class)
            .withQuery(byEmail)
            .prepare()
            .asRxFlowable(LATEST)
            .take(1);

    TestSubscriber<Optional<User>> userSubscriber = new TestSubscriber<>();
    userFlowable.subscribe(userSubscriber);

    // wait up to five seconds for the terminal event
    userSubscriber.awaitTerminalEvent(5, SECONDS);
    userSubscriber.assertNoErrors();
    userSubscriber.assertValue(Optional.of(expectedUser));
}
/**
 * A change that matches both the table filter and the tag filter must be delivered
 * exactly once.
 */
@Test
public void applyForTablesAndTags_shouldSendJustOnceNotificationIfBothTableAndTagAreSatisfy() {
    final TestSubscriber<Changes> filtered = new TestSubscriber<>();
    Flowable<Changes> matchingChange =
            Flowable.just(Changes.newInstance("target_table", "target_tag"));

    ChangesFilter
            .applyForTablesAndTags(matchingChange, singleton("target_table"), singleton("target_tag"))
            .subscribe(filtered);

    filtered.assertValueCount(1);
    filtered.assertValue(Changes.newInstance("target_table", "target_tag"));
    filtered.dispose();
}
/**
 * When the storage write fails while storing an impression, a later
 * {@code getAllImpressions} must return what the storage client reads back.
 */
@Test
public void storeImpression_writeErrors_doesNotSetInMemoryCache() {
    CampaignImpressionList readBackList = CampaignImpressionList.getDefaultInstance();

    // Writes fail with an IOException; the first read yields nothing.
    when(storageClient.write(any(CampaignImpressionList.class)))
            .thenReturn(Completable.error(new IOException()));
    when(storageClient.read(any(CampaignImpressionsParser.class)))
            .thenReturn(Maybe.empty());
    impressionStorageClient.storeImpression(campaignImpression).subscribe();

    // Re-stub the read so a subsequent read returns a known list.
    when(storageClient.read(any(CampaignImpressionsParser.class)))
            .thenReturn(Maybe.just(readBackList));
    TestSubscriber<CampaignImpressionList> impressions =
            impressionStorageClient.getAllImpressions().toFlowable().test();

    assertThat(getPlainValues(impressions)).containsExactly(readBackList);
}
/**
 * Submits 100 commands of eleven {@code echo} lines each, every one with its own output
 * processor, and verifies that each result completes and each output subscriber receives
 * eleven values within five seconds.
 */
@Test
public void testCommand_callback_async() {
    processor.attach(session);
    int cnt = 100;
    List<EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>>> observerPairs = new ArrayList<>();
    for (int commandIndex = 0; commandIndex < cnt; commandIndex++) {
        List<String> commandLines = new ArrayList<>();
        for (int line = 0; line < 10; line++) {
            commandLines.add("echo " + line);
        }
        commandLines.add("echo " + commandIndex);

        PublishProcessor<String> outputListener = PublishProcessor.create();
        TestSubscriber<String> outputObserver = outputListener
                .observeOn(Schedulers.newThread())
                .doOnEach(stringNotification -> TestHelper.sleep(1))
                .test();
        final Cmd cmd = Cmd.builder(commandLines).outputProcessor(outputListener).build();
        final TestObserver<Cmd.Result> resultObserver =
                processor.submit(cmd).subscribeOn(Schedulers.newThread()).test();
        observerPairs.add(new EnvVar<>(resultObserver, outputObserver));
    }
    for (EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>> pair : observerPairs) {
        pair.first.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertComplete();
        pair.second.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(11);
    }
}
/**
 * Subscribes with unbounded demand to a publisher fed by an asynchronous range observer
 * and verifies the sequence of upstream request sizes (one full chunk followed by
 * partial-chunk replenishments) and that output fusion was not used.
 */
@Test
public void chunkOperatorCorrectlyChunksInfiniteRequest() {
    int chunkSize = DEFAULT_CHUNK_SIZE;
    int partOfChunk = TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE;
    int num = chunkSize * 2;
    // Keep a handle on the executor so its worker thread can be released when the test ends.
    java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        AbstractStreamObserverAndPublisher<Long> source =
                new TestStreamObserverAndPublisherWithFusion<Long>(new ConcurrentLinkedQueue<Long>(), null);
        AsyncRangeCallStreamObserver observer = new AsyncRangeCallStreamObserver(executor, source, num);
        source.onSubscribe(observer);
        TestSubscriber<Long> testSubscriber = Flowable.fromPublisher(source)
                .test();
        testSubscriber.awaitTerminalEvent();
        testSubscriber.assertComplete();
        assertThat(observer.requestsQueue).containsExactly(chunkSize, partOfChunk, partOfChunk, partOfChunk);
        assertThat(source.outputFused).isFalse();
    } finally {
        // shutdown() lets already-submitted work finish and then stops the worker thread
        executor.shutdown();
    }
}
/**
 * Adds the same element twice to an {@code ORSet}, removes it through the iterator and
 * expects two add commands followed by one remove command on the subscriber.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForRemoves() {
    // given:
    final ORSet<String> set = new ORSet<>("ID_1");
    final TestSubscriber<CrdtCommand> commands = TestSubscriber.create();
    set.subscribe(commands);
    set.add("1");
    set.add("1");

    // when:
    final Iterator<String> iterator = set.iterator();
    iterator.next();
    iterator.remove();

    // then:
    commands.assertNotComplete();
    commands.assertNoErrors();
    assertThat(
            commands.values(),
            contains(
                    new AddCommandMatcher<>(set.getCrdtId(), "1"),
                    new AddCommandMatcher<>(set.getCrdtId(), "1"),
                    new RemoveCommandMatcher<>(set.getCrdtId(), "1", "1")));
}
/**
 * A cursor flowable built from the query emits once on subscription and once more after
 * changes for the tag are published.
 */
@Test
public void repeatsOperationWithQueryByChangeOfTag() {
    TestSubscriber<Cursor> cursors = new TestSubscriber<>();
    putUserBlocking();

    storIOSQLite.get()
            .cursor()
            .withQuery(query)
            .prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursors);
    // initial emission
    cursors.assertValueCount(1);

    storIOSQLite.lowLevel().notifyAboutChanges(tagChanges);
    // re-emission triggered by the notification
    cursors.assertValueCount(2);
}
/**
 * Clearing impressions for a fetch response that contains only a different campaign must
 * leave the impression state of {@code vanillaCampaign} untouched.
 */
@Test
public void clearImpressions_doesNotClearImpressionForUnfetchedCampaign() {
    // verify initial campaign is impressed.
    TestSubscriber<Boolean> impressedBefore =
            impressionStorageClient.isImpressed(vanillaCampaign).toFlowable().test();
    assertThat(impressedBefore.getEvents().get(0)).containsExactly(true);

    // clear impressions for a fetch response containing a different campaign.
    // This simulates having received the campaign again from the server.
    FetchEligibleCampaignsResponse otherCampaignResponse =
            FetchEligibleCampaignsResponse.newBuilder().addMessages(experimentalCampaign).build();
    impressionStorageClient.clearImpressions(otherCampaignResponse).subscribe();

    // Verify campaign is still impressed.
    TestSubscriber<Boolean> impressedAfter =
            impressionStorageClient.isImpressed(vanillaCampaign).toFlowable().test();
    assertThat(impressedAfter.getEvents().get(0)).containsExactly(true);
}
/**
 * Wraps a pass-through operation in {@code GrpcRetry.ManyToMany.retryImmediately} and
 * expects the flowable from {@code newThreeErrorFlowable()} to end up emitting {@code 0}
 * and completing without error.
 */
@Test
public void manyToManyRetryImmediately() {
    // Operation under retry: hands the flowable back unchanged.
    Function<Flowable<Integer>, Flowable<Integer>> passThrough =
            new Function<Flowable<Integer>, Flowable<Integer>>() {
                @Override
                public Flowable<Integer> apply(Flowable<Integer> flowable) {
                    return flowable;
                }
            };
    TestSubscriber<Integer> subscriber = newThreeErrorFlowable()
            .compose(GrpcRetry.ManyToMany.retryImmediately(passThrough))
            .test();

    subscriber.awaitTerminalEvent(1, TimeUnit.SECONDS);

    subscriber.assertValues(0);
    subscriber.assertNoErrors();
    subscriber.assertComplete();
}
/**
 * With a two-second limit and an interval that emits after one second and then every three
 * seconds, the stream must deliver one value and then fail with a {@code TimeoutException};
 * the time limiter is notified of one success and one timeout error.
 */
@Test
public void timeoutAfterInitial() throws InterruptedException {
    final int timeout = 2;
    final int initialDelay = 1;
    final int periodDelay = 3;
    Duration limit = Duration.ofSeconds(timeout);
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(limit));
    Flowable<Long> ticks = Flowable.interval(initialDelay, periodDelay, TimeUnit.SECONDS);
    TestSubscriber<Long> subscriber = ticks
            .compose(TimeLimiterTransformer.of(timeLimiter))
            .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.await();
    subscriber.assertValueCount(1);
    subscriber.assertError(TimeoutException.class);
    then(timeLimiter).should().onSuccess();
    then(timeLimiter).should().onError(any(TimeoutException.class));
}
/**
 * Fetches {@code /rx2/flowable/textJson} from the local server through the flowable Rx
 * invoker and expects a single {@code HelloWorldBean("Hello", "World")} followed by
 * completion.
 */
@Test
public void testGetHelloWorldJson() throws Exception {
    List<Object> providers = new LinkedList<>();
    providers.add(new JacksonJsonProvider());
    providers.add(new FlowableRxInvokerProvider());
    String address = "http://localhost:" + PORT + "/rx2/flowable/textJson";
    WebClient client = WebClient.create(address, providers);

    Flowable<HelloWorldBean> beans = client
            .accept("application/json")
            .rx(FlowableRxInvoker.class)
            .get(HelloWorldBean.class);
    final TestSubscriber<HelloWorldBean> beanSubscriber = new TestSubscriber<>();
    beans.subscribe(beanSubscriber);

    // give the HTTP call up to three seconds to finish
    beanSubscriber.await(3, TimeUnit.SECONDS);
    beanSubscriber.assertResult(new HelloWorldBean("Hello", "World"));
}
/**
 * When retries are restricted to {@code SQLException} and the source fails with an
 * {@code IllegalArgumentException}, no retry happens: the two values are emitted once and
 * the original error is delivered.
 */
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFailsBecauseIsNotInstanceOf() {
    Exception failure = new IllegalArgumentException("boo");
    TestScheduler clock = new TestScheduler();
    TestSubscriber<Integer> subscriber = TestSubscriber.create();
    // source emits two values and then fails
    Flowable<Integer> failing = Flowable.just(1, 2).concatWith(Flowable.<Integer>error(failure));
    failing
            // retry with backoff
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(clock).retryWhenInstanceOf(SQLException.class).build())
            // go
            .subscribe(subscriber);
    subscriber.assertValues(1, 2);
    subscriber.assertError(failure);
}
/**
 * Passing a {@code StatusException} to {@code onError} must run the {@code Runnable} given
 * to the constructor and forward the same exception to the downstream subscriber, without
 * output fusion.
 */
@Test
public void statusExceptionTriggersHandler() {
    CallStreamObserver delegate = mock(CallStreamObserver.class);
    final AtomicBoolean called = new AtomicBoolean(false);
    // Records that it was invoked.
    Runnable handler = new Runnable() {
        @Override
        public void run() {
            called.set(true);
        }
    };
    AbstractStreamObserverAndPublisher observer =
            new AbstractStreamObserverAndPublisher(new ArrayBlockingQueue(1), null, handler) { };
    observer.onSubscribe(delegate);
    TestSubscriber test = Flowable.fromPublisher(observer).test();

    StatusException exception = Status.CANCELLED.asException();
    observer.onError(exception);

    test.awaitTerminalEvent();
    test.assertError(exception);
    assertThat(called.get()).isTrue();
    assertThat(observer.outputFused).isFalse();
}
/**
 * Every {@code add} on an {@code ORSet}, including a repeated element, must publish an add
 * command to the subscriber, in call order.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForAdds() {
    // given:
    final ORSet<String> set = new ORSet<>("ID_1");
    final TestSubscriber<CrdtCommand> commands = TestSubscriber.create();
    set.subscribe(commands);

    // when:
    set.add("1");
    set.add("2");
    set.add("1");

    // then:
    commands.assertNotComplete();
    commands.assertNoErrors();
    assertThat(
            commands.values(),
            contains(
                    new AddCommandMatcher<>(set.getCrdtId(), "1"),
                    new AddCommandMatcher<>(set.getCrdtId(), "2"),
                    new AddCommandMatcher<>(set.getCrdtId(), "1")));
}
/**
 * Every {@code add} on a {@code GSet}, including a repeated element, must publish an add
 * command to the subscriber, in call order.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForAdds() {
    // given:
    final GSet<String> set = new GSet<>("ID_1");
    final TestSubscriber<CrdtCommand> commands = TestSubscriber.create();
    set.subscribe(commands);

    // when:
    set.add("1");
    set.add("2");
    set.add("1");

    // then:
    commands.assertNotComplete();
    commands.assertNoErrors();
    assertThat(
            commands.values(),
            contains(
                    new AddCommandMatcher<>(set.getCrdtId(), "1"),
                    new AddCommandMatcher<>(set.getCrdtId(), "2"),
                    new AddCommandMatcher<>(set.getCrdtId(), "1")));
}
/**
 * Verifies that closing a shell session does not by itself terminate the output and error
 * line subscribers: they must stay active while the output/error streams are still open,
 * and terminate only once those streams have been closed.
 */
@Test
public void testProcessCompletion_linereaders_dont_terminate_early() throws IOException {
RxShell rxShell = new RxShell(rxProcess);
// Open a session and take the first emitted value.
RxShell.Session session = rxShell.open().test().awaitCount(1).assertNoTimeout().values().get(0);
TestSubscriber<String> outputObs = session.outputLines().test();
TestSubscriber<String> errorObs = session.errorLines().test();
// Closing the session is expected to emit 0 within one second.
session.close().test().awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().assertValue(0);
// The command stream is closed by now ...
await().atMost(1, TimeUnit.SECONDS).until(() -> cmdStream.isOpen(), is(false));
// ... but after a one-second poll delay both line subscribers are still not terminated,
await().pollDelay(1, TimeUnit.SECONDS).until(outputObs::isTerminated, is(false));
await().pollDelay(1, TimeUnit.SECONDS).until(errorObs::isTerminated, is(false));
// and the output and error streams are still open.
await().atMost(1, TimeUnit.SECONDS).until(() -> outputStream.isOpen(), is(true));
await().atMost(1, TimeUnit.SECONDS).until(() -> errorStream.isOpen(), is(true));
// Close the streams explicitly.
outputStream.close();
errorStream.close();
await().atMost(1, TimeUnit.SECONDS).until(() -> outputStream.isOpen(), is(false));
await().atMost(1, TimeUnit.SECONDS).until(() -> errorStream.isOpen(), is(false));
// Only now should the line subscribers terminate.
await().atMost(1, TimeUnit.SECONDS).until(outputObs::isTerminated, is(true));
await().atMost(1, TimeUnit.SECONDS).until(errorObs::isTerminated, is(true));
}
/**
 * When online, the suggestions for {@code "Hello"} must complete without error and every
 * emitted item must be a {@code SimpleSuggestionItem} with a non-empty value; when offline,
 * the check is delegated to {@code testFailureNoEvents}.
 */
@Test
public void getSuggestions_ValuesNotEmpty() {
    Flowable<SimpleSuggestionItem> suggestions = suggestionSource.getSuggestions("Hello");
    if (!Util.isOnline()) {
        testFailureNoEvents(suggestions);
        return;
    }

    final TestSubscriber<SimpleSuggestionItem> subscriber = suggestions.test();
    subscriber.assertNoErrors();
    subscriber.assertComplete();

    // The first entry of getEvents() holds the onNext values.
    final List<Object> emitted = subscriber.getEvents().get(0);
    for (Object candidate : emitted) {
        Assert.assertTrue(candidate instanceof SimpleSuggestionItem);
        SimpleSuggestionItem item = (SimpleSuggestionItem) candidate;
        Assert.assertNotNull(item.value());
        Assert.assertFalse(TextUtils.isEmpty(item.value()));
    }
}