io.reactivex.subscribers.TestSubscriber#assertValueCount ( )源码实例Demo

下面列出了 io.reactivex.subscribers.TestSubscriber#assertValueCount() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: immutables   文件: GeodeCqTest.java
/**
 * Subscribes to the repository watcher, inserts a fixed number of persons and
 * checks that one watch event per inserted id reaches the subscriber.
 */
@Test
public void pubsub() throws Exception {

  PersonRepository repository = new PersonRepository(new GeodeBackend(GeodeSetup.of(x -> region)));

  // Subscribe first so the subscriber is already attached when the inserts happen.
  TestSubscriber<WatchEvent<Person>> watchEvents =
          Flowable.fromPublisher(repository.watcher(PersonCriteria.person).watch()).test();

  final int count = 4;
  final PersonGenerator generator = new PersonGenerator();
  int inserted = 0;
  while (inserted < count) {
    repository.insert(generator.next().withId("id" + inserted));
    inserted++;
  }

  check(region.keySet()).notEmpty();
  // ensure (de)serialization is successful
  check(region.query("true")).hasSize(count);

  watchEvents.awaitCount(count);
  watchEvents.assertNoErrors();
  watchEvents.assertValueCount(count);
  // The ids carried by the events must match the inserted ids, in any order.
  check(watchEvents.values().stream()
          .map(e -> e.newValue().get().id())
          .collect(Collectors.toList()))
          .hasContentInAnyOrder("id0", "id1", "id2", "id3");
}
 
/**
 * Requests the response stream limited by {@code take(10)}, disposes the
 * subscriber after a one-second pause, and then checks the number of received
 * values, termination, and that the service reports cancellation.
 */
@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
    TestService service = new TestService();
    serverRule.getServiceRegistry().addService(service);

    RxNumbersGrpc.RxNumbersStub numbersStub = RxNumbersGrpc.newRxStub(serverRule.getChannel());
    TestSubscriber<NumberProto.Number> subscriber = Single.just(Empty.getDefaultInstance())
            .as(numbersStub::responsePressure)
            .doOnNext(number -> System.out.println(number.getNumber(0)))
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .take(10)
            .test();

    // Give the stream one second to deliver values before disposing.
    TimeUnit.SECONDS.sleep(1);
    subscriber.dispose();

    subscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
    subscriber.assertValueCount(10);
    subscriber.assertTerminated();
    assertThat(service.wasCanceled()).isTrue();

    errorRule.verifyNoError();
}
 
源代码3 项目: RxPaper2   文件: RxPaperBookTest.java
/**
 * Observes a key with an explicit value class and checks that each write of a
 * {@code ComplexObject} is delivered, while a subsequent write of an
 * {@code int} under the same key adds no value and no error to the subscriber.
 */
@Test
public void testUpdatesChecked() throws Exception {
    RxPaperBook paperBook = RxPaperBook.with("UPDATES_CH", Schedulers.trampoline());
    final String key = "hello";
    final ComplexObject first = ComplexObject.random();
    final TestSubscriber<ComplexObject> updates = TestSubscriber.create();
    paperBook.observe(key, ComplexObject.class, BackpressureStrategy.MISSING).subscribe(updates);

    // Nothing has been written yet.
    updates.assertValueCount(0);

    // First write.
    paperBook.write(key, first).subscribe();
    updates.assertValueCount(1);
    updates.assertValues(first);

    // Second write.
    final ComplexObject second = ComplexObject.random();
    paperBook.write(key, second).subscribe();
    updates.assertValueCount(2);
    updates.assertValues(first, second);

    // Writing a value of another type completes but leaves the subscriber unchanged.
    final int mismatchedValue = 3;
    paperBook.write(key, mismatchedValue).test().assertComplete().assertNoErrors();
    updates.assertValueCount(2);
    updates.assertValues(first, second);
    updates.assertNoErrors();
}
 
源代码4 项目: storio   文件: GetCursorObserveChangesTest.java
/**
 * Subscribes to a cursor query and checks that notifying about
 * {@code tagChanges} results in a second emission.
 */
@Test
public void repeatsOperationWithQueryByChangeOfTag() {
    putUserBlocking();

    final TestSubscriber<Cursor> cursorSubscriber = new TestSubscriber<Cursor>();
    storIOSQLite.get().cursor().withQuery(query).prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursorSubscriber);

    // One value is expected right after subscribing.
    cursorSubscriber.assertValueCount(1);

    storIOSQLite.lowLevel().notifyAboutChanges(tagChanges);

    // A second value is expected after the change notification.
    cursorSubscriber.assertValueCount(2);
}
 
/**
 * Subscribes a {@code TestSubscriber} to a three-item {@code Flowable.just}
 * source and checks completion, subscription, absence of errors, and the
 * emitted values.
 */
@Test
public void test_just_Flowable() {

	// The source is a Flowable, so it is named for its content rather than "observable".
	Flowable<String> fruits = Flowable.just("mango", "papaya", "guava");
	TestSubscriber<String> testSubscriber = new TestSubscriber<>();

	fruits.subscribe(testSubscriber);

	testSubscriber.assertComplete();
	testSubscriber.assertSubscribed();
	testSubscriber.assertNoErrors();
	testSubscriber.assertValueCount(3);
	testSubscriber.assertValues("mango", "papaya", "guava");

}
 
源代码6 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
/**
 * Builds a pool with a one-minute max idle time on a {@code TestScheduler},
 * checks a member in, and expects no disposal before and exactly one disposal
 * after the scheduler is advanced by one minute.
 */
@Test
public void testMaxIdleTime() throws InterruptedException {
    TestScheduler scheduler = new TestScheduler();
    AtomicInteger created = new AtomicInteger();
    AtomicInteger disposals = new AtomicInteger();
    Pool<Integer> pool = NonBlockingPool
            .factory(() -> created.incrementAndGet())
            .healthCheck(n -> true)
            .maxSize(3)
            .maxIdleTime(1, TimeUnit.MINUTES)
            .disposer(n -> disposals.incrementAndGet())
            .scheduler(scheduler)
            .build();
    TestSubscriber<Member<Integer>> subscriber =
            new FlowableSingleDeferUntilRequest<>(pool.member())
                    .doOnNext(m -> m.checkin())
                    .doOnNext(System.out::println)
                    .doOnRequest(t -> System.out.println("test request=" + t))
                    .test(1);

    scheduler.triggerActions();
    subscriber.assertValueCount(1);
    assertEquals(0, disposals.get());

    // Advance virtual time by the configured max idle time.
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    scheduler.triggerActions();
    assertEquals(1, disposals.get());
}
 
源代码7 项目: storio   文件: GetCursorObserveChangesTest.java
/**
 * Subscribes to a cursor obtained via {@code rawQuery} and checks that
 * notifying about {@code tableChanges} results in a second emission.
 */
@Test
public void repeatsOperationWithRawQueryByChangeOfTable() {
    putUserBlocking();

    final TestSubscriber<Cursor> cursorSubscriber = new TestSubscriber<Cursor>();
    storIOSQLite.get().cursor().withQuery(rawQuery).prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursorSubscriber);

    // One value is expected right after subscribing.
    cursorSubscriber.assertValueCount(1);

    storIOSQLite.lowLevel().notifyAboutChanges(tableChanges);

    // A second value is expected after the change notification.
    cursorSubscriber.assertValueCount(2);
}
 
/**
 * Creates a single tag and checks that {@code findAll} for the same
 * organization completes without errors and emits exactly one value.
 */
@Test
public void testFindAll() throws TechnicalException {
    // Create one tag for the organization.
    Tag newTag = new Tag();
    newTag.setName("testName");
    newTag.setDescription("Description");
    newTag.setOrganizationId(ORGANIZATION_ID);
    tagRepository.create(newTag).blockingGet();

    // Fetch all tags of the organization.
    TestSubscriber<Tag> tagSubscriber = tagRepository.findAll(ORGANIZATION_ID).test();
    tagSubscriber.awaitTerminalEvent();

    tagSubscriber.assertComplete();
    tagSubscriber.assertNoErrors();
    tagSubscriber.assertValueCount(1);
}
 
源代码9 项目: storio   文件: GetCursorObserveChangesTest.java
/**
 * Subscribes to a cursor query and checks that notifying about
 * {@code tableChanges} results in a second emission.
 */
@Test
public void repeatsOperationWithQueryByChangeOfTable() {
    putUserBlocking();

    final TestSubscriber<Cursor> cursorSubscriber = new TestSubscriber<Cursor>();
    storIOSQLite.get().cursor().withQuery(query).prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursorSubscriber);

    // One value is expected right after subscribing.
    cursorSubscriber.assertValueCount(1);

    storIOSQLite.lowLevel().notifyAboutChanges(tableChanges);

    // A second value is expected after the change notification.
    cursorSubscriber.assertValueCount(2);
}
 
源代码10 项目: RxComboDetector   文件: RxComboDetectorTest.java
/**
 * Feeds a timed sequence of eight click events into
 * {@code RxComboDetector.detect(clicks, 100, 2)} on a separate thread and
 * expects the values 2, 3, 4, 2 to be emitted.
 */
@Test
public void testDetect() {
    Flowable<Integer> clicks = Flowable
            .create(emitter -> {
                // Four clicks, 50 ms apart.
                emitter.onNext(1);
                sleepSilently(50);
                emitter.onNext(1);
                sleepSilently(50);
                emitter.onNext(1);
                sleepSilently(50);
                emitter.onNext(1);

                sleepSilently(150);

                // Two clicks, 50 ms apart.
                emitter.onNext(1);
                sleepSilently(50);
                emitter.onNext(1);

                // Two single clicks, each preceded by a 150 ms pause.
                sleepSilently(150);
                emitter.onNext(1);

                sleepSilently(150);
                emitter.onNext(1);

                emitter.onComplete();
            }, BackpressureStrategy.BUFFER);

    TestSubscriber<Integer> comboSubscriber = new TestSubscriber<>();
    RxComboDetector.detect(clicks, 100, 2)
            .subscribeOn(Schedulers.from(command -> new Thread(command).start()))
            .subscribe(comboSubscriber);

    comboSubscriber.awaitTerminalEvent(30, TimeUnit.SECONDS);
    comboSubscriber.assertValueCount(4);
    comboSubscriber.assertValues(2, 3, 4, 2);
}
 
/**
 * Demonstrates basic {@code TestSubscriber} assertions against a source that
 * emits "1", "2", "3".
 */
@Test
public void testTestSubscriber() {

    TestSubscriber<String> subscriber = new TestSubscriber<>();
    // Emits "1", "2", "3" in order to the subscriber.
    Flowable<String> source = Flowable.just("1", "2", "3");
    source.subscribe(subscriber);

    // The emitted values match exactly.
    subscriber.assertValues("1", "2", "3");
    // "4" was never emitted.
    subscriber.assertNever("4");
    // Exactly three values were received.
    subscriber.assertValueCount(3);
    // The subscriber has terminated.
    subscriber.assertTerminated();
}
 
源代码12 项目: rxjava2-jdbc   文件: DatabaseTest.java
/**
 * Runs the same select twice against a pool with a maximum size of one that is
 * driven by a {@code TestScheduler} and configured with the supplied health
 * check. Each select is expected to have no value until the scheduler time is
 * advanced.
 *
 * @param healthy health check predicate handed to the pool builder
 * @throws InterruptedException if one of the sleeps is interrupted
 */
private void testHealthCheck(Predicate<Connection> healthy) throws InterruptedException {
    TestScheduler testScheduler = new TestScheduler();

    NonBlockingConnectionPool connectionPool = Pools
            .nonBlocking()
            .connectionProvider(DatabaseCreator.connectionProvider())
            .maxIdleTime(10, TimeUnit.MINUTES)
            .idleTimeBeforeHealthCheck(0, TimeUnit.MINUTES)
            .healthCheck(healthy)
            .scheduler(testScheduler)
            .maxPoolSize(1)
            .build();

    try (Database database = Database.from(connectionPool)) {
        // First select: nothing arrives until virtual time moves forward.
        TestSubscriber<Integer> firstSelect = database
                .select("select score from person where name=?")
                .parameter("FRED")
                .getAs(Integer.class)
                .test();
        firstSelect.assertValueCount(0);
        firstSelect.assertNotComplete();
        testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        firstSelect.assertValueCount(1);
        firstSelect.assertComplete();

        // Second select through the same pool.
        TestSubscriber<Integer> secondSelect = database
                .select("select score from person where name=?")
                .parameter("FRED")
                .getAs(Integer.class)
                .test();
        secondSelect.assertValueCount(0);
        log.debug("done2");
        testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        Thread.sleep(200);
        secondSelect.assertValueCount(1);
        Thread.sleep(200);
        secondSelect.assertValue(21);
        secondSelect.assertComplete();
    }
}
 
源代码13 项目: rxjava2-extras   文件: StringsSplitTest.java
/**
 * Requests two items from the split stream, then cancels and checks that the
 * two received values are retained and no terminal event was seen.
 */
@Test
public void testSplitSimpleNormalCancelled() {
    TestSubscriber<String> subscriber = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"))
            .test(2);
    subscriber.assertValues("boo", "and");
    subscriber.assertNotTerminated();

    subscriber.cancel();

    subscriber.assertValueCount(2);
    subscriber.assertNotTerminated();
}
 
源代码14 项目: rxjava2-extras   文件: StringsSplitTest.java
/**
 * Requests a single item from the split stream, then cancels and checks that
 * the one received value is retained and no terminal event was seen.
 */
@Test
public void testSplitSimpleNormalCancelledEarly() {
    TestSubscriber<String> subscriber = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"))
            .test(1);
    subscriber.assertValues("boo");
    subscriber.assertNotTerminated();

    subscriber.cancel();

    subscriber.assertValueCount(1);
    subscriber.assertNotTerminated();
}
 
源代码15 项目: rxjava2-extras   文件: FlowableMaxRequestTest.java
/**
 * Requests four items through {@code maxRequest(3)}, cancels, requests three
 * more, and checks that no further values arrive and that the recorded
 * upstream requests are 3 followed by 1.
 */
@Test
public void checkCancel() {
    List<Long> upstreamRequests = new CopyOnWriteArrayList<Long>();
    TestSubscriber<Integer> subscriber = Flowable.range(1, 10)
            .doOnRequest(Consumers.addLongTo(upstreamRequests))
            .compose(Transformers.<Integer>maxRequest(3))
            .test(4);
    subscriber.assertValues(1, 2, 3, 4);

    subscriber.cancel();
    // This request is made after cancellation.
    subscriber.requestMore(3);

    subscriber.assertValueCount(4);
    subscriber.assertNotTerminated();
    assertEquals(Arrays.asList(3L, 1L), upstreamRequests);
}
 
源代码16 项目: RxPaper2   文件: RxPaperBookTest.java
/**
 * Observes a key through {@code observeUnsafe} and checks that two written
 * {@code ComplexObject} values are delivered in order. A second subscriber,
 * which fails the test on any value, error or completion, is then attached
 * before an {@code int} is written under the same key.
 */
@Test
public void testUpdatesUnchecked() throws Exception {
    RxPaperBook paperBook = RxPaperBook.with("UPDATES_UNCH", Schedulers.trampoline());
    final String key = "hello";
    final ComplexObject first = ComplexObject.random();
    final TestSubscriber<ComplexObject> updates = TestSubscriber.create();
    paperBook.<ComplexObject>observeUnsafe(key, BackpressureStrategy.MISSING).subscribe(updates);

    // Nothing has been written yet.
    updates.assertValueCount(0);

    // First write.
    paperBook.write(key, first).subscribe();
    updates.assertValueCount(1);
    updates.assertValue(first);

    // Second write.
    final ComplexObject second = ComplexObject.random();
    paperBook.write(key, second).subscribe();
    updates.assertValueCount(2);
    updates.assertValues(first, second);

    // Error value: this subscriber fails the test on any signal except onSubscribe.
    paperBook.<ComplexObject>observeUnsafe(key, BackpressureStrategy.MISSING)
            .subscribe(new Subscriber<ComplexObject>() {
                @Override
                public void onSubscribe(Subscription s) {

                }

                @Override
                public void onNext(ComplexObject complexObject) {
                    Assert.fail("Expected nothing");
                }

                @Override
                public void onError(Throwable e) {
                    Assert.fail(e.getMessage());
                }

                @Override
                public void onComplete() {
                    Assert.fail("Expected nothing");
                }
            });
    final int mismatchedValue = 3;
    paperBook.write(key, mismatchedValue).test().assertComplete().assertNoErrors();
}
 
源代码17 项目: RxPaper2   文件: RxPaperBookTest.java
/**
 * Observes all keys through {@code observeAllUnsafe} and checks that two
 * written {@code ComplexObject} values are delivered in order. A second
 * subscriber, which fails the test on any value, error or completion, is then
 * attached before an {@code int} is written under the same key.
 */
@Test
public void testUpdatesAllUnchecked() throws Exception {
    RxPaperBook paperBook = RxPaperBook.with("UPDATES_ALL_UNCH", Schedulers.trampoline());
    final String key = "hello";
    final ComplexObject first = ComplexObject.random();
    final TestSubscriber<ComplexObject> updates = TestSubscriber.create();
    paperBook.<ComplexObject>observeAllUnsafe(BackpressureStrategy.MISSING).subscribe(updates);

    // Nothing has been written yet.
    updates.assertValueCount(0);

    // First write.
    paperBook.write(key, first).subscribe();
    updates.assertValueCount(1);
    updates.assertValue(first);

    // Second write.
    final ComplexObject second = ComplexObject.random();
    paperBook.write(key, second).subscribe();
    updates.assertValueCount(2);
    updates.assertValues(first, second);

    // Error value: this subscriber fails the test on any signal except onSubscribe.
    paperBook.<ComplexObject>observeAllUnsafe(BackpressureStrategy.MISSING)
            .subscribe(new Subscriber<ComplexObject>() {
                @Override
                public void onSubscribe(Subscription s) {

                }

                @Override
                public void onNext(ComplexObject complexObject) {
                    Assert.fail("Expected nothing");
                }

                @Override
                public void onError(Throwable e) {
                    Assert.fail(e.getMessage());
                }

                @Override
                public void onComplete() {
                    Assert.fail("Expected nothing");
                }
            });
    final int mismatchedValue = 3;
    paperBook.write(key, mismatchedValue).test().assertComplete().assertNoErrors();
}
 
源代码18 项目: storio   文件: NotifyAboutChangesTest.java
/**
 * Opens a transaction, lets 100 threads run that send no change notifications,
 * and checks that the changes subscriber has no values and no errors both
 * during the transaction and after it ends.
 */
@Test
public void shouldNotReceiveNotificationIfNoChangesAfterTransactionEnd() throws InterruptedException {
    final int numberOfThreads = 100;

    final TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();

    storIOSQLite
            .observeChanges(LATEST)
            .subscribe(changesSubscriber);

    final StorIOSQLite.LowLevel lowLevel = storIOSQLite.lowLevel();

    lowLevel.beginTransaction();

    final CountDownLatch startSignal = new CountDownLatch(1);
    final CountDownLatch finishedSignal = new CountDownLatch(numberOfThreads);

    // The task keeps no state of its own, so one instance serves every thread.
    final Runnable worker = new Runnable() {
        @Override
        public void run() {
            try {
                // All threads should start "simultaneously".
                startSignal.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            finishedSignal.countDown();
        }
    };

    for (int started = 0; started < numberOfThreads; started++) {
        new Thread(worker).start();
    }

    // Release all waiting threads at once.
    startSignal.countDown();

    assertThat(finishedSignal.await(20, SECONDS)).isTrue();

    // While we are in the transaction, no changes should be sent.
    changesSubscriber.assertValueCount(0);

    lowLevel.endTransaction();

    changesSubscriber.assertNoErrors();
    changesSubscriber.assertNoValues();
}
 
源代码19 项目: storio   文件: NotifyAboutChangesTest.java
/**
 * Opens a transaction, lets 100 threads each notify about a change of the same
 * table, and checks that the subscriber has no values while the transaction is
 * open and exactly one {@code Changes} value for that table after it ends.
 */
@Test
public void shouldReceiveOneNotificationInTransactionWithMultipleThreads() throws InterruptedException {
    final String table = "test_table";
    final int numberOfThreads = 100;

    final TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();

    storIOSQLite
            .observeChanges(LATEST)
            .subscribe(changesSubscriber);

    storIOSQLite
            .lowLevel()
            .beginTransaction();

    final CountDownLatch startSignal = new CountDownLatch(1);
    final CountDownLatch finishedSignal = new CountDownLatch(numberOfThreads);

    // The task keeps no state of its own, so one instance serves every thread.
    final Runnable notifier = new Runnable() {
        @Override
        public void run() {
            try {
                // All threads should start "simultaneously".
                startSignal.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            storIOSQLite
                    .lowLevel()
                    .notifyAboutChanges(Changes.newInstance(table));

            finishedSignal.countDown();
        }
    };

    for (int started = 0; started < numberOfThreads; started++) {
        new Thread(notifier).start();
    }

    // Release all waiting threads at once.
    startSignal.countDown();

    assertThat(finishedSignal.await(20, SECONDS)).isTrue();

    // While we are in the transaction, no changes should be sent.
    changesSubscriber.assertValueCount(0);

    storIOSQLite
            .lowLevel()
            .endTransaction();

    changesSubscriber.assertNoErrors();
    changesSubscriber.assertValues(Changes.newInstance(table));
}
 
源代码20 项目: storio   文件: SchedulerChecker.java
/**
 * Verifies that the default Rx scheduler was requested, then subscribes to the
 * operation as a Flowable and checks that it emits nothing until the scheduler
 * actions are triggered, and exactly one value afterwards.
 *
 * @param operation prepared operation to subscribe to
 */
private void check(@NonNull PreparedOperation operation) {
    verify(storIOContentResolver).defaultRxScheduler();

    TestSubscriber testSubscriber = new TestSubscriber();
    //noinspection unchecked
    operation.asRxFlowable(BackpressureStrategy.MISSING).subscribe(testSubscriber);

    // Nothing may be emitted before the scheduler runs.
    testSubscriber.assertNoValues();

    scheduler.triggerActions();

    // Exactly one value is expected once the scheduled work has run.
    testSubscriber.assertValueCount(1);
}