类io.reactivex.subscribers.TestSubscriber源码实例Demo

下面列出了 io.reactivex.subscribers.TestSubscriber 类的 API 使用示例代码及写法,您也可以点击链接到 GitHub 查看源代码。

源代码1 项目: immutables   文件: GeodeCqTest.java
/**
 * Subscribes to the person watcher, inserts four people and checks that the region holds them
 * and that one watch event per inserted id was delivered.
 */
@Test
public void pubsub() throws Exception {
  final int total = 4;
  PersonRepository people = new PersonRepository(new GeodeBackend(GeodeSetup.of(x -> region)));

  // The watcher is subscribed before any insert is issued.
  TestSubscriber<WatchEvent<Person>> watched =
      Flowable.fromPublisher(people.watcher(PersonCriteria.person).watch()).test();

  final PersonGenerator source = new PersonGenerator();
  for (int n = 0; n < total; n++) {
    people.insert(source.next().withId("id" + n));
  }

  check(region.keySet()).notEmpty();
  // Querying the region exercises (de)serialization of the stored values.
  check(region.query("true")).hasSize(total);

  watched.awaitCount(total);
  watched.assertNoErrors();
  watched.assertValueCount(total);
  check(watched.values().stream()
          .map(event -> event.newValue().get().id())
          .collect(Collectors.toList()))
      .hasContentInAnyOrder("id0", "id1", "id2", "id3");
}
 
/**
 * With a one-minute limit configured, a two-element interval stream is expected to complete
 * and to report success to the time limiter three times.
 */
@Test
public void doNotTimeout() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ofMinutes(1)));

    TestSubscriber<Long> ticks = Flowable.interval(1, TimeUnit.SECONDS)
        .take(2)
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    // Move virtual time forward by a full minute.
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    ticks.assertValueCount(2);
    ticks.assertComplete();
    then(timeLimiter).should(times(3)).onSuccess();
}
 
源代码3 项目: storio   文件: DefaultStorIOSQLiteTest.java
/**
 * Checks that a subscriber of {@code observeChanges} sees nothing until a change is notified,
 * and then receives exactly the notified {@code Changes}.
 */
@Test
public void observeChangesAndNotifyAboutChangesShouldWorkCorrectly() {
    TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChanges(LATEST).subscribe(changesSubscriber);

    // Nothing has been notified yet.
    changesSubscriber.assertNoValues();

    Changes notified = Changes.newInstance("test_table", "tag");
    storIOSQLite.lowLevel().notifyAboutChanges(notified);

    changesSubscriber.assertValue(notified);
    changesSubscriber.assertNoErrors();
    changesSubscriber.dispose();
}
 
源代码4 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Retries a failing two-element source with exponential backoff driven by a test scheduler
 * and checks what has been emitted after each simulated minute.
 */
@Test
public void testWithScheduler() {
    TestScheduler clock = new TestScheduler();
    Exception failure = new IllegalArgumentException("boo");
    TestSubscriber<Integer> subscriber = TestSubscriber.create();

    Flowable.just(1, 2)
            // fail once both values have been emitted
            .concatWith(Flowable.<Integer>error(failure))
            // retry at most twice, backing off exponentially starting at one minute
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(clock).build())
            // go
            .subscribe(subscriber);

    // the initial attempt has already run
    subscriber.assertValues(1, 2).assertNotComplete();

    // the first retry is expected after one minute
    clock.advanceTimeBy(1, TimeUnit.MINUTES);
    subscriber.assertValues(1, 2, 1, 2).assertNotComplete();

    // the second retry is expected only after two more minutes, so one more changes nothing
    clock.advanceTimeBy(1, TimeUnit.MINUTES);
    subscriber.assertValues(1, 2, 1, 2).assertNotComplete();

    clock.advanceTimeBy(1, TimeUnit.MINUTES);
    subscriber.assertValues(1, 2, 1, 2, 1, 2);
    subscriber.assertError(failure);
}
 
源代码5 项目: storio   文件: ChangesFilterTest.java
/**
 * Filters three {@code Changes} by table "table1" and tag "tag1" and expects only the change
 * whose table matches to be delivered.
 */
@Test
public void applyForTablesAndTags_shouldNotifyByTable() {
    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

    final Flowable<Changes> upstream = Flowable.just(
            Changes.newInstance("table1", "another_tag"),
            Changes.newInstance("table2", "tag2"),
            Changes.newInstance("table3"));

    ChangesFilter
            .applyForTablesAndTags(upstream, singleton("table1"), singleton("tag1"))
            .subscribe(subscriber);

    // Only the change matching by table is expected; the rest should be filtered out.
    subscriber.assertValue(Changes.newInstance("table1", "another_tag"));

    subscriber.dispose();
}
 
源代码6 项目: rxjava2-extras   文件: TransformersTest.java
/**
 * Drives a delayed source and a delayed inserted value with a test scheduler and checks the
 * values seen after each step of virtual time.
 */
@Test
public void testInsertWithDelays() {
    TestScheduler scheduler = new TestScheduler();
    TestSubscriber<Integer> subscriber = Flowable.just(1)
            .delay(1, TimeUnit.SECONDS, scheduler)
            .concatWith(Flowable.just(2).delay(3, TimeUnit.SECONDS, scheduler))
            .compose(Transformers.insert(Maybe.just(3).delay(2, TimeUnit.SECONDS, scheduler)))
            .test();

    // nothing is emitted before time advances
    subscriber.assertNoValues();

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    subscriber.assertValues(1);

    scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
    subscriber.assertValues(1, 3);

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    subscriber.assertValues(1, 3, 2);
    subscriber.assertComplete();
}
 
源代码7 项目: storio   文件: ChangesFilterTest.java
/**
 * Filters three {@code Changes} by table "table2" and expects only that table's change.
 */
@Test
public void applyForTables_shouldFilterRequiredTable() {
    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

    final Flowable<Changes> upstream = Flowable.just(
            Changes.newInstance("table1"),
            Changes.newInstance("table2"),
            Changes.newInstance("table3"));

    ChangesFilter
            .applyForTables(upstream, singleton("table2"))
            .subscribe(subscriber);

    // Changes for every other table should have been dropped.
    subscriber.assertValue(Changes.newInstance("table2"));

    subscriber.dispose();
}
 
/**
 * Builds the component graph with an {@code AppMeasurementModule} constructed with a null first
 * argument, simulates an app resume and expects a single success notification.
 */
@Test
public void onAppOpen_whenAnalyticsAbsent_notifiesSubscriber() {
  AppMeasurementModule moduleWithoutAnalytics =
      new AppMeasurementModule(null, firebaseEventSubscriber);
  TestUniversalComponent universal =
      universalComponentBuilder.appMeasurementModule(moduleWithoutAnalytics).build();
  TestAppComponent app = appComponentBuilder.universalComponent(universal).build();

  FirebaseInAppMessaging messaging = app.providesFirebaseInAppMessaging();
  TestSubscriber<InAppMessage> messages = listenerToFlowable(messaging);

  simulateAppResume();
  waitUntilNotified(messages);

  assertSingleSuccessNotification(messages);
}
 
源代码9 项目: storio   文件: GetCursorObserveChangesTest.java
/**
 * Subscribes to a cursor query built from {@code rawQuery} and expects a second emission after
 * {@code tagChanges} is notified.
 */
@Test
public void repeatsOperationWithRawQueryByChangeOfTag() {
    putUserBlocking();

    TestSubscriber<Cursor> cursors = new TestSubscriber<Cursor>();
    storIOSQLite.get().cursor().withQuery(rawQuery).prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursors);

    // one emission on subscription
    cursors.assertValueCount(1);

    storIOSQLite.lowLevel().notifyAboutChanges(tagChanges);

    // one more emission after the notification
    cursors.assertValueCount(2);
}
 
源代码10 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
/**
 * Checks out and immediately checks in one pool member, then advances virtual time by the
 * configured max idle time and expects exactly one disposer call.
 */
@Test
public void testMaxIdleTime() throws InterruptedException {
    TestScheduler scheduler = new TestScheduler();
    AtomicInteger created = new AtomicInteger();
    AtomicInteger disposals = new AtomicInteger();

    Pool<Integer> pool = NonBlockingPool
            .factory(() -> created.incrementAndGet())
            .healthCheck(n -> true)
            .maxSize(3)
            .maxIdleTime(1, TimeUnit.MINUTES)
            .disposer(n -> disposals.incrementAndGet())
            .scheduler(scheduler)
            .build();

    TestSubscriber<Member<Integer>> members =
            new FlowableSingleDeferUntilRequest<>(pool.member())
                    .doOnNext(m -> m.checkin())
                    .doOnNext(System.out::println)
                    .doOnRequest(t -> System.out.println("test request=" + t))
                    .test(1);

    scheduler.triggerActions();
    members.assertValueCount(1);
    // nothing is disposed before the idle time has elapsed
    assertEquals(0, disposals.get());

    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    scheduler.triggerActions();
    assertEquals(1, disposals.get());
}
 
源代码11 项目: resilience4j   文件: CacheTest.java
/**
 * With a cache that returns null on get and throws on put, the decorated supplier's value is
 * still returned, one miss is counted, and a CACHE_MISS followed by an ERROR event is published.
 */
@Test
public void shouldReturnValueOfSupplier() throws Throwable {
    given(cache.get("testKey")).willReturn(null);
    willThrow(new RuntimeException("Cache is not available"))
        .given(cache)
        .put("testKey", "Hello world");

    Cache<String, String> context = Cache.of(cache);
    TestSubscriber<CacheEvent.Type> eventTypes = toFlowable(context.getEventPublisher())
        .map(CacheEvent::getEventType)
        .test();
    CheckedFunction1<String, String> decorated =
        Cache.decorateCheckedSupplier(context, () -> "Hello world");

    String result = decorated.apply("testKey");

    assertThat(result).isEqualTo("Hello world");
    assertThat(context.getMetrics().getNumberOfCacheHits()).isEqualTo(0);
    assertThat(context.getMetrics().getNumberOfCacheMisses()).isEqualTo(1);
    eventTypes.assertValueCount(2);
    eventTypes.assertValues(CacheEvent.Type.CACHE_MISS, CacheEvent.Type.ERROR);
}
 
/**
 * Creates one tag for {@code ORGANIZATION_ID} and expects {@code findAll} to complete without
 * error after emitting exactly one tag.
 */
@Test
public void testFindAll() throws TechnicalException {
    // create a tag
    Tag created = new Tag();
    created.setName("testName");
    created.setDescription("Description");
    created.setOrganizationId(ORGANIZATION_ID);
    tagRepository.create(created).blockingGet();

    // fetch the tags of the organization
    TestSubscriber<Tag> tagSubscriber = tagRepository.findAll(ORGANIZATION_ID).test();
    tagSubscriber.awaitTerminalEvent();

    tagSubscriber.assertComplete();
    tagSubscriber.assertNoErrors();
    tagSubscriber.assertValueCount(1);
}
 
源代码13 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
/**
 * Repeatedly checks out and immediately checks in a pool member four times and expects all four
 * emitted member values to be equal.
 */
@Test
public void testConnectionPoolRecylesAlternating() {
    TestScheduler s = new TestScheduler();
    AtomicInteger count = new AtomicInteger();
    Pool<Integer> pool = NonBlockingPool //
            .factory(() -> count.incrementAndGet()) //
            .healthCheck(n -> true) //
            .maxSize(2) //
            .maxIdleTime(1, TimeUnit.MINUTES) //
            .scheduler(s) //
            .build();
    TestSubscriber<Integer> ts = new FlowableSingleDeferUntilRequest<>(pool.member()) //
            .repeat() //
            .doOnNext(m -> m.checkin()) //
            .map(m -> m.value()) //
            .test(4); //
    s.triggerActions();
    ts.assertValueCount(4) //
            .assertNotTerminated();
    // getEvents().get(0) is the list this test treats as the emitted values
    List<Object> list = ts.getEvents().get(0);
    // all 4 connections released were the same; compare by value because the elements are
    // boxed Integers and `==` would only test reference identity
    assertTrue(list.get(0).equals(list.get(1)));
    assertTrue(list.get(1).equals(list.get(2)));
    assertTrue(list.get(2).equals(list.get(3)));
}
 
源代码14 项目: storio   文件: RxQueryTest.java
/**
 * Stores three users, queries the first one by e-mail as a one-element Flowable and expects
 * that user wrapped in an {@code Optional}.
 */
@Test
public void queryOneExistedObjectFlowable() {
    final List<User> stored = putUsersBlocking(3);
    final User wanted = stored.get(0);

    final Flowable<Optional<User>> lookup = storIOSQLite
            .get()
            .object(User.class)
            .withQuery(Query.builder()
                    .table(UserTableMeta.TABLE)
                    .where(UserTableMeta.COLUMN_EMAIL + "=?")
                    .whereArgs(wanted.email())
                    .build())
            .prepare()
            .asRxFlowable(LATEST)
            .take(1);

    TestSubscriber<Optional<User>> subscriber = new TestSubscriber<Optional<User>>();
    lookup.subscribe(subscriber);

    // wait up to five seconds for the stream to terminate
    subscriber.awaitTerminalEvent(5, SECONDS);
    subscriber.assertNoErrors();
    subscriber.assertValue(Optional.of(wanted));
}
 
源代码15 项目: storio   文件: ChangesFilterTest.java
/**
 * A change that matches both the table and the tag filter is expected to be delivered once.
 */
@Test
public void applyForTablesAndTags_shouldSendJustOnceNotificationIfBothTableAndTagAreSatisfy() {
    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

    final Flowable<Changes> upstream =
            Flowable.just(Changes.newInstance("target_table", "target_tag"));

    ChangesFilter
            .applyForTablesAndTags(upstream, singleton("target_table"), singleton("target_tag"))
            .subscribe(subscriber);

    // exactly one value, equal to the emitted change
    subscriber.assertValueCount(1);
    subscriber.assertValue(Changes.newInstance("target_table", "target_tag"));

    subscriber.dispose();
}
 
/**
 * Makes the storage write fail while storing an impression, then re-stubs the read and expects
 * {@code getAllImpressions} to return what the storage client reads.
 */
@Test
public void storeImpression_writeErrors_doesNotSetInMemoryCache() {
  CampaignImpressionList fromStorage = CampaignImpressionList.getDefaultInstance();

  // First phase: the read is empty and the write errors.
  when(storageClient.write(any(CampaignImpressionList.class)))
      .thenReturn(Completable.error(new IOException()));
  when(storageClient.read(any(CampaignImpressionsParser.class))).thenReturn(Maybe.empty());
  impressionStorageClient.storeImpression(campaignImpression).subscribe();

  // Second phase: the read now yields a list.
  when(storageClient.read(any(CampaignImpressionsParser.class)))
      .thenReturn(Maybe.just(fromStorage));

  TestSubscriber<CampaignImpressionList> impressions =
      impressionStorageClient.getAllImpressions().toFlowable().test();

  assertThat(getPlainValues(impressions)).containsExactly(fromStorage);
}
 
源代码17 项目: RxShell   文件: CmdProcessorTest.java
/**
 * Submits 100 commands of eleven echo lines each, every one with its own output processor, and
 * expects each result to complete and each output subscriber to receive eleven values.
 */
@Test
public void testCommand_callback_async() {
    processor.attach(session);

    final int submissions = 100;
    List<EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>>> pending = new ArrayList<>();
    for (int outer = 0; outer < submissions; outer++) {
        List<String> lines = new ArrayList<>();
        for (int inner = 0; inner < 10; inner++) {
            lines.add("echo " + inner);
        }
        lines.add("echo " + outer);

        PublishProcessor<String> output = PublishProcessor.create();
        TestSubscriber<String> outputSubscriber = output
                .observeOn(Schedulers.newThread())
                .doOnEach(stringNotification -> TestHelper.sleep(1))
                .test();
        final Cmd cmd = Cmd.builder(lines).outputProcessor(output).build();
        final TestObserver<Cmd.Result> resultObserver =
                processor.submit(cmd).subscribeOn(Schedulers.newThread()).test();
        pending.add(new EnvVar<>(resultObserver, outputSubscriber));
    }

    // Wait for each result and its output in submission order.
    for (EnvVar<TestObserver<Cmd.Result>, TestSubscriber<String>> pair : pending) {
        pair.first.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertComplete();
        pair.second.awaitDone(5, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(11);
    }
}
 
源代码18 项目: reactive-grpc   文件: BackpressureChunkingTest.java
/**
 * Subscribes with an unbounded request to a source fed by an asynchronous range observer and
 * checks the sequence of upstream request sizes that were recorded.
 */
@Test
public void chunkOperatorCorrectlyChunksInfiniteRequest() {
    int chunkSize = DEFAULT_CHUNK_SIZE;

    int partOfChunk = TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE;
    int num = chunkSize * 2;

    // Keep a handle on the executor so its worker thread can be released when the test ends.
    java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        AbstractStreamObserverAndPublisher<Long> source =
                new TestStreamObserverAndPublisherWithFusion<Long>(new ConcurrentLinkedQueue<Long>(), null);
        AsyncRangeCallStreamObserver observer = new AsyncRangeCallStreamObserver(executor, source, num);
        source.onSubscribe(observer);
        TestSubscriber<Long> testSubscriber = Flowable.fromPublisher(source)
                                                      .test();

        testSubscriber.awaitTerminalEvent();
        testSubscriber.assertComplete();

        assertThat(observer.requestsQueue).containsExactly(chunkSize, partOfChunk, partOfChunk, partOfChunk);
        assertThat(source.outputFused).isFalse();
    } finally {
        // Shut down even if an assertion fails; otherwise the non-daemon thread leaks.
        executor.shutdownNow();
    }
}
 
源代码19 项目: wurmloch-crdt   文件: ORSetTest.java
/**
 * Adds "1" twice to an ORSet, removes it through the iterator and expects two add commands
 * followed by one remove command on the subscriber.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForRemoves() {
    // given: a subscribed set that already contains the element
    final TestSubscriber<CrdtCommand> commands = TestSubscriber.create();
    final ORSet<String> orSet = new ORSet<>("ID_1");
    orSet.subscribe(commands);

    orSet.add("1");
    orSet.add("1");

    // when: the element is removed via the iterator
    final Iterator<String> cursor = orSet.iterator();
    cursor.next();
    cursor.remove();

    // then: the stream is still open and carries add, add, remove
    commands.assertNotComplete();
    commands.assertNoErrors();
    assertThat(commands.values(), contains(
            new AddCommandMatcher<>(orSet.getCrdtId(), "1"),
            new AddCommandMatcher<>(orSet.getCrdtId(), "1"),
            new RemoveCommandMatcher<>(orSet.getCrdtId(), "1", "1")
    ));
}
 
源代码20 项目: storio   文件: GetCursorObserveChangesTest.java
/**
 * Subscribes to a cursor query built from {@code query} and expects a second emission after
 * {@code tagChanges} is notified.
 */
@Test
public void repeatsOperationWithQueryByChangeOfTag() {
    putUserBlocking();

    TestSubscriber<Cursor> cursors = new TestSubscriber<Cursor>();
    storIOSQLite.get().cursor().withQuery(query).prepare()
            .asRxFlowable(MISSING)
            .subscribe(cursors);

    // one emission on subscription
    cursors.assertValueCount(1);

    storIOSQLite.lowLevel().notifyAboutChanges(tagChanges);

    // one more emission after the notification
    cursors.assertValueCount(2);
}
 
/**
 * Checks that {@code vanillaCampaign} is reported as impressed both before and after clearing
 * impressions for a fetch response that only contains {@code experimentalCampaign}.
 */
@Test
public void clearImpressions_doesNotClearImpressionForUnfetchedCampaign() {
  // Precondition: the vanilla campaign is reported as impressed.
  TestSubscriber<Boolean> before =
      impressionStorageClient.isImpressed(vanillaCampaign).toFlowable().test();
  assertThat(before.getEvents().get(0)).containsExactly(true);

  // Clear impressions using a fetch response that holds a different campaign.
  FetchEligibleCampaignsResponse otherResponse =
      FetchEligibleCampaignsResponse.newBuilder().addMessages(experimentalCampaign).build();
  impressionStorageClient.clearImpressions(otherResponse).subscribe();

  // The vanilla campaign must still be reported as impressed.
  TestSubscriber<Boolean> after =
      impressionStorageClient.isImpressed(vanillaCampaign).toFlowable().test();
  assertThat(after.getEvents().get(0)).containsExactly(true);
}
 
源代码22 项目: reactive-grpc   文件: GrpcRetryTest.java
/**
 * Composes the three-error Flowable with an immediate many-to-many retry around an identity
 * operation and expects a single value 0 followed by normal completion.
 */
@Test
public void manyToManyRetryImmediately() {
    // Operation under retry: hands the upstream back unchanged.
    Function<Flowable<Integer>, Flowable<Integer>> identity =
            new Function<Flowable<Integer>, Flowable<Integer>>() {
                @Override
                public Flowable<Integer> apply(Flowable<Integer> upstream) {
                    return upstream;
                }
            };

    TestSubscriber<Integer> subscriber = newThreeErrorFlowable()
            .compose(GrpcRetry.ManyToMany.retryImmediately(identity))
            .test();

    subscriber.awaitTerminalEvent(1, TimeUnit.SECONDS);
    subscriber.assertValues(0);
    subscriber.assertNoErrors();
    subscriber.assertComplete();
}
 
/**
 * With a two-second limit, an interval that emits after one second and then every three
 * seconds is expected to deliver one value and then fail with a {@code TimeoutException}.
 */
@Test
public void timeoutAfterInitial() throws InterruptedException {
    final int limitSeconds = 2;
    final int firstTickSeconds = 1;
    final int periodSeconds = 3;
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ofSeconds(limitSeconds)));

    TestSubscriber<Long> ticks = Flowable
        .interval(firstTickSeconds, periodSeconds, TimeUnit.SECONDS)
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    ticks.await()
        .assertValueCount(1)
        .assertError(TimeoutException.class);
    // one success for the delivered value, one error for the timeout
    then(timeLimiter).should().onSuccess();
    then(timeLimiter).should().onError(any(TimeoutException.class));
}
 
源代码24 项目: cxf   文件: JAXRSRxJava2FlowableTest.java
/**
 * Requests the JSON endpoint through the Flowable Rx invoker and expects a single
 * {@code HelloWorldBean("Hello", "World")} followed by completion.
 */
@Test
public void testGetHelloWorldJson() throws Exception {
    List<Object> providers = new LinkedList<>();
    providers.add(new JacksonJsonProvider());
    providers.add(new FlowableRxInvokerProvider());

    String endpoint = "http://localhost:" + PORT + "/rx2/flowable/textJson";
    WebClient client = WebClient.create(endpoint, providers);
    Flowable<HelloWorldBean> response = client.accept("application/json")
        .rx(FlowableRxInvoker.class)
        .get(HelloWorldBean.class);

    final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
    response.subscribe(subscriber);

    // wait up to three seconds before asserting the result
    subscriber.await(3, TimeUnit.SECONDS);
    subscriber.assertResult(new HelloWorldBean("Hello", "World"));
}
 
源代码25 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Restricts retries to {@code SQLException} and expects an {@code IllegalArgumentException}
 * to be passed straight through after the two source values.
 */
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFailsBecauseIsNotInstanceOf() {
    TestScheduler clock = new TestScheduler();
    Exception failure = new IllegalArgumentException("boo");
    TestSubscriber<Integer> subscriber = TestSubscriber.create();

    Flowable.just(1, 2)
            // fail once both values have been emitted
            .concatWith(Flowable.<Integer>error(failure))
            // retry with backoff, but only for SQLException
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(clock).retryWhenInstanceOf(SQLException.class).build())
            // go
            .subscribe(subscriber);

    subscriber.assertValues(1, 2);
    subscriber.assertError(failure);
}
 
/**
 * Signals a CANCELLED status exception to the observer/publisher and expects the subscriber to
 * see that error and the supplied {@code Runnable} to have run.
 */
@Test
public void statusExceptionTriggersHandler() {
    final AtomicBoolean handlerRan = new AtomicBoolean(false);
    CallStreamObserver delegate = mock(CallStreamObserver.class);

    // Handler that only records that it was invoked.
    Runnable recordingHandler = new Runnable() {
        @Override
        public void run() {
            handlerRan.set(true);
        }
    };
    AbstractStreamObserverAndPublisher observer =
            new AbstractStreamObserverAndPublisher(new ArrayBlockingQueue(1), null, recordingHandler) { };

    observer.onSubscribe(delegate);

    TestSubscriber subscriber = Flowable.fromPublisher(observer).test();

    StatusException cancelled = Status.CANCELLED.asException();
    observer.onError(cancelled);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(cancelled);

    assertThat(handlerRan.get()).isTrue();
    assertThat(observer.outputFused).isFalse();
}
 
源代码27 项目: wurmloch-crdt   文件: ORSetTest.java
/**
 * Adds "1", "2" and "1" again to an ORSet and expects three add commands in that order.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForAdds() {
    // given: a subscribed, empty set
    final TestSubscriber<CrdtCommand> commands = TestSubscriber.create();
    final ORSet<String> orSet = new ORSet<>("ID_1");
    orSet.subscribe(commands);

    // when: three adds, the last one repeating the first element
    orSet.add("1");
    orSet.add("2");
    orSet.add("1");

    // then: the stream is still open and carries one command per add
    commands.assertNotComplete();
    commands.assertNoErrors();
    assertThat(commands.values(), contains(
            new AddCommandMatcher<>(orSet.getCrdtId(), "1"),
            new AddCommandMatcher<>(orSet.getCrdtId(), "2"),
            new AddCommandMatcher<>(orSet.getCrdtId(), "1")
    ));
}
 
源代码28 项目: wurmloch-crdt   文件: GSetTest.java
/**
 * Adds "1", "2" and "1" again to a GSet and expects three add commands in that order.
 */
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForAdds() {
    // given: a subscribed, empty set
    final TestSubscriber<CrdtCommand> commands = TestSubscriber.create();
    final GSet<String> gSet = new GSet<>("ID_1");
    gSet.subscribe(commands);

    // when: three adds, the last one repeating the first element
    gSet.add("1");
    gSet.add("2");
    gSet.add("1");

    // then: the stream is still open and carries one command per add
    commands.assertNotComplete();
    commands.assertNoErrors();
    assertThat(commands.values(), contains(
            new AddCommandMatcher<>(gSet.getCrdtId(), "1"),
            new AddCommandMatcher<>(gSet.getCrdtId(), "2"),
            new AddCommandMatcher<>(gSet.getCrdtId(), "1")
    ));
}
 
源代码29 项目: RxShell   文件: RxShellTest.java
/**
 * Checks that the output and error line subscribers of a session do not terminate when the
 * session is closed, and only terminate once the output and error streams have been closed.
 * The order of the waits below is part of what the test verifies.
 */
@Test
public void testProcessCompletion_linereaders_dont_terminate_early() throws IOException {
    RxShell rxShell = new RxShell(rxProcess);
    // Open a session and take the first emitted value.
    RxShell.Session session = rxShell.open().test().awaitCount(1).assertNoTimeout().values().get(0);
    TestSubscriber<String> outputObs = session.outputLines().test();
    TestSubscriber<String> errorObs = session.errorLines().test();

    // Closing the session is expected to finish within a second with value 0.
    session.close().test().awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().assertValue(0);

    // The command stream is expected to be closed shortly after.
    await().atMost(1, TimeUnit.SECONDS).until(() -> cmdStream.isOpen(), is(false));

    // After a one-second poll delay neither line subscriber should have terminated.
    await().pollDelay(1, TimeUnit.SECONDS).until(outputObs::isTerminated, is(false));
    await().pollDelay(1, TimeUnit.SECONDS).until(errorObs::isTerminated, is(false));

    // The output and error streams are still open at this point.
    await().atMost(1, TimeUnit.SECONDS).until(() -> outputStream.isOpen(), is(true));
    await().atMost(1, TimeUnit.SECONDS).until(() -> errorStream.isOpen(), is(true));

    // Close both streams explicitly.
    outputStream.close();
    errorStream.close();

    await().atMost(1, TimeUnit.SECONDS).until(() -> outputStream.isOpen(), is(false));
    await().atMost(1, TimeUnit.SECONDS).until(() -> errorStream.isOpen(), is(false));

    // Only now are the line subscribers expected to terminate.
    await().atMost(1, TimeUnit.SECONDS).until(outputObs::isTerminated, is(true));
    await().atMost(1, TimeUnit.SECONDS).until(errorObs::isTerminated, is(true));
}
 
源代码30 项目: rxSuggestions   文件: GoogleSuggestionSourceTest.java
/**
 * When online, expects the suggestions for "Hello" to complete without error and every emitted
 * item to be a {@code SimpleSuggestionItem} with a non-empty value; when offline, delegates to
 * {@code testFailureNoEvents}.
 */
@Test
public void getSuggestions_ValuesNotEmpty() {
    Flowable<SimpleSuggestionItem> suggestions = suggestionSource.getSuggestions("Hello");

    if (!Util.isOnline()) {
        testFailureNoEvents(suggestions);
        return;
    }

    final TestSubscriber<SimpleSuggestionItem> subscriber = suggestions.test();
    subscriber.assertNoErrors();
    subscriber.assertComplete();

    // The first event list is the one this test inspects as the emitted items.
    final List<Object> emitted = subscriber.getEvents().get(0);
    for (Object candidate : emitted) {
        Assert.assertTrue(candidate instanceof SimpleSuggestionItem);
        SimpleSuggestionItem item = (SimpleSuggestionItem) candidate;
        Assert.assertNotNull(item.value());
        Assert.assertFalse(TextUtils.isEmpty(item.value()));
    }
}
 
 类所在包
 同包方法