下面列出了 io.reactivex.subscribers.TestSubscriber#assertValueCount() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Subscribes a watcher to the person repository, inserts a fixed number of
 * generated persons and verifies that one watch event per insert arrives.
 */
@Test
public void pubsub() throws Exception {
    final int total = 4;
    PersonRepository people = new PersonRepository(new GeodeBackend(GeodeSetup.of(x -> region)));

    // The subscriber is attached before any insert happens.
    TestSubscriber<WatchEvent<Person>> watched =
            Flowable.fromPublisher(people.watcher(PersonCriteria.person).watch()).test();

    final PersonGenerator source = new PersonGenerator();
    for (int n = 0; n < total; n++) {
        people.insert(source.next().withId("id" + n));
    }

    check(region.keySet()).notEmpty();
    // ensure (de)serialization is successful
    check(region.query("true")).hasSize(total);

    // Wait for the expected number of events, then verify them.
    watched.awaitCount(total);
    watched.assertNoErrors();
    watched.assertValueCount(total);
    check(watched.values()
            .stream()
            .map(event -> event.newValue().get().id())
            .collect(Collectors.toList()))
            .hasContentInAnyOrder("id0", "id1", "id2", "id3");
}
/**
 * Calls the streaming {@code responsePressure} endpoint with the stream limited by
 * {@code take(10)}, then checks that exactly ten values were received, the
 * subscriber terminated, and the service reports that it was cancelled.
 */
@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
    TestService service = new TestService();
    serverRule.getServiceRegistry().addService(service);
    RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(serverRule.getChannel());

    TestSubscriber<NumberProto.Number> numbers =
            Single.just(Empty.getDefaultInstance())
                    .as(stub::responsePressure)
                    .doOnNext(n -> System.out.println(n.getNumber(0)))
                    .doOnError(err -> System.out.println(err.getMessage()))
                    .doOnComplete(() -> System.out.println("Completed"))
                    .doOnCancel(() -> System.out.println("Client canceled"))
                    .take(10)
                    .test();

    // Consume some work
    final long workMillis = TimeUnit.SECONDS.toMillis(1);
    Thread.sleep(workMillis);
    numbers.dispose();

    numbers.awaitTerminalEvent(3, TimeUnit.SECONDS);
    numbers.assertValueCount(10);
    numbers.assertTerminated();
    assertThat(service.wasCanceled()).isTrue();

    errorRule.verifyNoError();
}
/**
 * Observes a key with an explicit {@code ComplexObject} type and checks that each
 * write of a {@code ComplexObject} is delivered, while a later write of an
 * {@code int} under the same key leaves the received values unchanged.
 */
@Test
public void testUpdatesChecked() throws Exception {
    final String key = "hello";
    RxPaperBook book = RxPaperBook.with("UPDATES_CH", Schedulers.trampoline());

    final TestSubscriber<ComplexObject> updates = TestSubscriber.create();
    final ComplexObject first = ComplexObject.random();
    book.observe(key, ComplexObject.class, BackpressureStrategy.MISSING).subscribe(updates);
    updates.assertValueCount(0);

    // First write
    book.write(key, first).subscribe();
    updates.assertValueCount(1);
    updates.assertValues(first);

    // Second write
    final ComplexObject second = ComplexObject.random();
    book.write(key, second).subscribe();
    updates.assertValueCount(2);
    updates.assertValues(first, second);

    // Error value
    final int mismatched = 3;
    book.write(key, mismatched).test().assertComplete().assertNoErrors();
    updates.assertValueCount(2);
    updates.assertValues(first, second);
    updates.assertNoErrors();
}
/**
 * Subscribes to a cursor get-operation built from {@code query} and checks that a
 * second cursor is emitted after {@code tagChanges} is published.
 */
@Test
public void repeatsOperationWithQueryByChangeOfTag() {
    putUserBlocking();

    TestSubscriber<Cursor> cursors = new TestSubscriber<Cursor>();
    storIOSQLite.get().cursor().withQuery(query).prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursors);

    // One value right after subscribing.
    cursors.assertValueCount(1);

    // One more value after the change notification.
    storIOSQLite.lowLevel().notifyAboutChanges(tagChanges);
    cursors.assertValueCount(2);
}
/**
 * Subscribes a {@code TestSubscriber} to a three-element {@code Flowable.just(...)}
 * and verifies that it subscribed, completed without error, and received exactly
 * the three items in order.
 */
@Test
public void test_just_Flowable() {
    Flowable<String> fruits = Flowable.just("mango", "papaya", "guava");
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();

    fruits.subscribe(testSubscriber);

    // The received values are checked through the assert* helpers below, so no
    // separate copy of values() is kept.
    testSubscriber.assertComplete();
    testSubscriber.assertSubscribed();
    testSubscriber.assertNoErrors();
    testSubscriber.assertValueCount(3);
    testSubscriber.assertValues("mango", "papaya", "guava");
}
/**
 * Builds a pool with a one-minute max idle time on a {@code TestScheduler}, checks
 * a member out and straight back in, and verifies the disposer has run once after
 * the scheduler is advanced by one minute (and not before).
 */
@Test
public void testMaxIdleTime() throws InterruptedException {
    TestScheduler scheduler = new TestScheduler();
    AtomicInteger created = new AtomicInteger();
    AtomicInteger disposed = new AtomicInteger();

    Pool<Integer> pool = NonBlockingPool
            .factory(() -> created.incrementAndGet())
            .healthCheck(n -> true)
            .maxSize(3)
            .maxIdleTime(1, TimeUnit.MINUTES)
            .disposer(n -> disposed.incrementAndGet())
            .scheduler(scheduler)
            .build();

    // Request a single member; each member is checked back in as soon as it arrives.
    TestSubscriber<Member<Integer>> members =
            new FlowableSingleDeferUntilRequest<>(pool.member())
                    .doOnNext(member -> member.checkin())
                    .doOnNext(System.out::println)
                    .doOnRequest(requested -> System.out.println("test request=" + requested))
                    .test(1);

    scheduler.triggerActions();
    members.assertValueCount(1);
    assertEquals(0, disposed.get());

    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    scheduler.triggerActions();
    assertEquals(1, disposed.get());
}
/**
 * Subscribes to a cursor get-operation built from {@code rawQuery} and checks that
 * a second cursor is emitted after {@code tableChanges} is published.
 */
@Test
public void repeatsOperationWithRawQueryByChangeOfTable() {
    putUserBlocking();

    TestSubscriber<Cursor> cursors = new TestSubscriber<Cursor>();
    storIOSQLite.get().cursor().withQuery(rawQuery).prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursors);

    // One value right after subscribing.
    cursors.assertValueCount(1);

    // One more value after the change notification.
    storIOSQLite.lowLevel().notifyAboutChanges(tableChanges);
    cursors.assertValueCount(2);
}
/**
 * Creates a single tag for {@code ORGANIZATION_ID} and verifies that
 * {@code findAll} for that organization completes without error and emits
 * exactly one tag.
 */
@Test
public void testFindAll() throws TechnicalException {
    // create tag
    Tag created = new Tag();
    created.setName("testName");
    created.setDescription("Description");
    created.setOrganizationId(ORGANIZATION_ID);
    tagRepository.create(created).blockingGet();

    // fetch tags
    TestSubscriber<Tag> foundTags = tagRepository.findAll(ORGANIZATION_ID).test();
    foundTags.awaitTerminalEvent();

    foundTags.assertComplete();
    foundTags.assertNoErrors();
    foundTags.assertValueCount(1);
}
/**
 * Subscribes to a cursor get-operation built from {@code query} and checks that a
 * second cursor is emitted after {@code tableChanges} is published.
 */
@Test
public void repeatsOperationWithQueryByChangeOfTable() {
    putUserBlocking();

    TestSubscriber<Cursor> cursors = new TestSubscriber<Cursor>();
    storIOSQLite.get().cursor().withQuery(query).prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursors);

    // One value right after subscribing.
    cursors.assertValueCount(1);

    // One more value after the change notification.
    storIOSQLite.lowLevel().notifyAboutChanges(tableChanges);
    cursors.assertValueCount(2);
}
/**
 * Feeds a scripted sequence of eight clicks, separated by 50 ms or 150 ms pauses,
 * into {@code RxComboDetector.detect(clicks, 100, 2)} and verifies that the values
 * 2, 3, 4, 2 are emitted.
 */
@Test
public void testDetect() {
    // Pause after each of the first seven clicks, in milliseconds; the eighth click
    // is followed directly by completion.
    final int[] pausesMillis = {50, 50, 50, 150, 50, 150, 150};

    Flowable<Integer> mockClicks = Flowable.create(emitter -> {
        for (int pause : pausesMillis) {
            emitter.onNext(1);
            sleepSilently(pause);
        }
        emitter.onNext(1);
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);

    TestSubscriber<Integer> combos = new TestSubscriber<>();
    // Subscribe on a scheduler that starts a fresh thread for each task.
    RxComboDetector.detect(mockClicks, 100, 2)
            .subscribeOn(Schedulers.from(task -> new Thread(task).start()))
            .subscribe(combos);

    combos.awaitTerminalEvent(30, TimeUnit.SECONDS);
    combos.assertValueCount(4);
    combos.assertValues(2, 3, 4, 2);
}
/**
 * Walks through the basic {@code TestSubscriber} assertions against a
 * {@code Flowable} that emits "1", "2", "3".
 */
@Test
public void testTestSubscriber() {
    TestSubscriber<String> probe = new TestSubscriber<>();

    // Emit "1", "2", "3" in order.
    Flowable.just("1", "2", "3").subscribe(probe);

    // The received values match exactly.
    probe.assertValues("1", "2", "3");
    // "4" was never received.
    probe.assertNever("4");
    // Exactly three values were received.
    probe.assertValueCount(3);
    // The subscriber reached a terminal state.
    probe.assertTerminated();
}
/**
 * Runs the same single-row select twice against a one-connection pool that uses
 * the supplied health check and a {@code TestScheduler}, advancing the scheduler
 * by one minute before each result is expected.
 *
 * @param healthy health-check predicate installed on the pool
 */
private void testHealthCheck(Predicate<Connection> healthy) throws InterruptedException {
    TestScheduler scheduler = new TestScheduler();
    NonBlockingConnectionPool pool = Pools
            .nonBlocking()
            .connectionProvider(DatabaseCreator.connectionProvider())
            .maxIdleTime(10, TimeUnit.MINUTES)
            .idleTimeBeforeHealthCheck(0, TimeUnit.MINUTES)
            .healthCheck(healthy)
            .scheduler(scheduler)
            .maxPoolSize(1)
            .build();

    try (Database db = Database.from(pool)) {
        // First query: nothing is delivered until the scheduler is advanced.
        TestSubscriber<Integer> firstRun = db
                .select("select score from person where name=?")
                .parameter("FRED")
                .getAs(Integer.class)
                .test();
        firstRun.assertValueCount(0).assertNotComplete();
        scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        firstRun.assertValueCount(1).assertComplete();

        // Second query on the same database.
        TestSubscriber<Integer> secondRun = db
                .select("select score from person where name=?")
                .parameter("FRED")
                .getAs(Integer.class)
                .test()
                .assertValueCount(0);
        log.debug("done2");
        scheduler.advanceTimeBy(1, TimeUnit.MINUTES);

        // Real-time pauses surround the count assertion before the final checks.
        Thread.sleep(200);
        secondRun.assertValueCount(1);
        Thread.sleep(200);
        secondRun.assertValue(21).assertComplete();
    }
}
/**
 * Requests two items from a {@code splitSimple(":")} stream over "boo:an" and
 * "d:you", expects "boo" and "and", then cancels and checks that nothing further
 * arrived and no terminal event was seen.
 */
@Test
public void testSplitSimpleNormalCancelled() {
    TestSubscriber<String> parts = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"))
            .test(2);
    parts.assertValues("boo", "and");
    parts.assertNotTerminated();

    parts.cancel();

    parts.assertValueCount(2);
    parts.assertNotTerminated();
}
/**
 * Requests one item from a {@code splitSimple(":")} stream over "boo:an" and
 * "d:you", expects "boo", then cancels and checks that nothing further arrived and
 * no terminal event was seen.
 */
@Test
public void testSplitSimpleNormalCancelledEarly() {
    TestSubscriber<String> parts = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"))
            .test(1);
    parts.assertValues("boo");
    parts.assertNotTerminated();

    parts.cancel();

    parts.assertValueCount(1);
    parts.assertNotTerminated();
}
/**
 * Applies {@code maxRequest(3)} to a ten-element range, requests four items, then
 * cancels and requests more; verifies no further items arrive and that the
 * recorded upstream requests were 3 followed by 1.
 */
@Test
public void checkCancel() {
    List<Long> upstreamRequests = new CopyOnWriteArrayList<Long>();

    TestSubscriber<Integer> numbers = Flowable.range(1, 10)
            .doOnRequest(Consumers.addLongTo(upstreamRequests))
            .compose(Transformers.<Integer>maxRequest(3))
            .test(4);
    numbers.assertValues(1, 2, 3, 4);

    // A request issued after cancellation.
    numbers.cancel();
    numbers.requestMore(3);

    numbers.assertValueCount(4);
    numbers.assertNotTerminated();
    assertEquals(Arrays.asList(3L, 1L), upstreamRequests);
}
/**
 * Observes a key through {@code observeUnsafe} and checks that two
 * {@code ComplexObject} writes are delivered in order; then attaches a second
 * subscriber whose callbacks all call {@code Assert.fail} (except
 * {@code onSubscribe}) and writes an {@code int} under the same key.
 */
@Test
public void testUpdatesUnchecked() throws Exception {
    final String key = "hello";
    RxPaperBook book = RxPaperBook.with("UPDATES_UNCH", Schedulers.trampoline());

    final TestSubscriber<ComplexObject> updates = TestSubscriber.create();
    final ComplexObject first = ComplexObject.random();
    book.<ComplexObject>observeUnsafe(key, BackpressureStrategy.MISSING).subscribe(updates);
    updates.assertValueCount(0);

    // First write
    book.write(key, first).subscribe();
    updates.assertValueCount(1);
    updates.assertValue(first);

    // Second write
    final ComplexObject second = ComplexObject.random();
    book.write(key, second).subscribe();
    updates.assertValueCount(2);
    updates.assertValues(first, second);

    // Error value
    final int mismatched = 3;
    book.<ComplexObject>observeUnsafe(key, BackpressureStrategy.MISSING)
            .subscribe(new Subscriber<ComplexObject>() {
                @Override
                public void onSubscribe(Subscription s) {
                }

                @Override
                public void onNext(ComplexObject complexObject) {
                    Assert.fail("Expected nothing");
                }

                @Override
                public void onError(Throwable e) {
                    Assert.fail(e.getMessage());
                }

                @Override
                public void onComplete() {
                    Assert.fail("Expected nothing");
                }
            });
    book.write(key, mismatched).test().assertComplete().assertNoErrors();
}
/**
 * Observes the whole book through {@code observeAllUnsafe} and checks that two
 * {@code ComplexObject} writes are delivered in order; then attaches a second
 * subscriber whose callbacks all call {@code Assert.fail} (except
 * {@code onSubscribe}) and writes an {@code int} under the same key.
 */
@Test
public void testUpdatesAllUnchecked() throws Exception {
    final String key = "hello";
    RxPaperBook book = RxPaperBook.with("UPDATES_ALL_UNCH", Schedulers.trampoline());

    final TestSubscriber<ComplexObject> updates = TestSubscriber.create();
    final ComplexObject first = ComplexObject.random();
    book.<ComplexObject>observeAllUnsafe(BackpressureStrategy.MISSING).subscribe(updates);
    updates.assertValueCount(0);

    // First write
    book.write(key, first).subscribe();
    updates.assertValueCount(1);
    updates.assertValue(first);

    // Second write
    final ComplexObject second = ComplexObject.random();
    book.write(key, second).subscribe();
    updates.assertValueCount(2);
    updates.assertValues(first, second);

    // Error value
    final int mismatched = 3;
    book.<ComplexObject>observeAllUnsafe(BackpressureStrategy.MISSING)
            .subscribe(new Subscriber<ComplexObject>() {
                @Override
                public void onSubscribe(Subscription s) {
                }

                @Override
                public void onNext(ComplexObject complexObject) {
                    Assert.fail("Expected nothing");
                }

                @Override
                public void onError(Throwable e) {
                    Assert.fail(e.getMessage());
                }

                @Override
                public void onComplete() {
                    Assert.fail("Expected nothing");
                }
            });
    book.write(key, mismatched).test().assertComplete().assertNoErrors();
}
/**
 * Opens a transaction, lets a batch of threads run without publishing any change,
 * and verifies the change subscriber has no values both before and after the
 * transaction is ended.
 */
@Test
public void shouldNotReceiveNotificationIfNoChangesAfterTransactionEnd() throws InterruptedException {
    final int threadCount = 100;

    final TestSubscriber<Changes> changesProbe = new TestSubscriber<Changes>();
    storIOSQLite.observeChanges(LATEST).subscribe(changesProbe);

    final StorIOSQLite.LowLevel lowLevel = storIOSQLite.lowLevel();
    lowLevel.beginTransaction();

    final CountDownLatch startGate = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(threadCount);

    // The task keeps no per-thread state, so a single instance is shared by all threads.
    final Runnable worker = new Runnable() {
        @Override
        public void run() {
            try {
                // Every thread blocks here until the gate opens, so they start together.
                startGate.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            finished.countDown();
        }
    };
    for (int started = 0; started < threadCount; started++) {
        new Thread(worker).start();
    }

    // Release all threads at once and wait for them to finish.
    startGate.countDown();
    assertThat(finished.await(20, SECONDS)).isTrue();

    // Still inside the transaction: nothing may have been delivered.
    changesProbe.assertValueCount(0);

    lowLevel.endTransaction();

    changesProbe.assertNoErrors();
    changesProbe.assertNoValues();
}
/**
 * Opens a transaction, has a batch of threads each publish a change for the same
 * table, and verifies that nothing is delivered while the transaction is open and
 * exactly one {@code Changes} value for that table is delivered after it ends.
 */
@Test
public void shouldReceiveOneNotificationInTransactionWithMultipleThreads() throws InterruptedException {
    final String table = "test_table";
    final int threadCount = 100;

    final TestSubscriber<Changes> changesProbe = new TestSubscriber<Changes>();
    storIOSQLite.observeChanges(LATEST).subscribe(changesProbe);

    storIOSQLite.lowLevel().beginTransaction();

    final CountDownLatch startGate = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(threadCount);

    // The task keeps no per-thread state, so a single instance is shared by all threads.
    final Runnable notifier = new Runnable() {
        @Override
        public void run() {
            try {
                // Every thread blocks here until the gate opens, so they start together.
                startGate.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            storIOSQLite.lowLevel().notifyAboutChanges(Changes.newInstance(table));
            finished.countDown();
        }
    };
    for (int started = 0; started < threadCount; started++) {
        new Thread(notifier).start();
    }

    // Release all threads at once and wait for them to finish.
    startGate.countDown();
    assertThat(finished.await(20, SECONDS)).isTrue();

    // Still inside the transaction: nothing may have been delivered.
    changesProbe.assertValueCount(0);

    storIOSQLite.lowLevel().endTransaction();

    changesProbe.assertNoErrors();
    changesProbe.assertValues(Changes.newInstance(table));
}
/**
 * Verifies that the operation's Flowable asks the content resolver for its default
 * scheduler and emits its single value only once {@code scheduler} has run its
 * pending actions.
 *
 * @param operation prepared operation under test
 */
private void check(@NonNull PreparedOperation operation) {
    verify(storIOContentResolver).defaultRxScheduler();

    TestSubscriber probe = new TestSubscriber();
    //noinspection unchecked
    operation.asRxFlowable(BackpressureStrategy.MISSING).subscribe(probe);

    // Nothing is delivered before the scheduler runs.
    probe.assertNoValues();

    scheduler.triggerActions();
    probe.assertValueCount(1);
}