类io.reactivex.rxjava3.schedulers.TestScheduler源码实例Demo

下面列出了 io.reactivex.rxjava3.schedulers.TestScheduler 类的 API 用法示例代码，您也可以点击链接到 GitHub 查看源代码。

源代码1 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Checks {@code size()} and {@code hasValue()} on a relay created with
 * {@code createWithTime(1, SECONDS, scheduler)}: one value is reported right after
 * {@code accept}, and none after the test scheduler is advanced by two seconds.
 */
@Test
public void testSizeAndHasAnyValueTimeBounded() {
    TestScheduler scheduler = new TestScheduler();
    ReplayRelay<Object> relay = ReplayRelay.createWithTime(1, TimeUnit.SECONDS, scheduler);

    // Nothing has been accepted yet.
    assertEquals(0, relay.size());
    assertFalse(relay.hasValue());

    int round = 0;
    while (round < 1000) {
        relay.accept(round);
        assertEquals(1, relay.size());
        assertTrue(relay.hasValue());

        // Move virtual time past the 1-second window used at creation.
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        assertEquals(0, relay.size());
        assertFalse(relay.hasValue());

        round++;
    }
}
 
源代码2 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Feeds 1, 2 and 3 into a relay created with
 * {@code createWithTimeAndSize(1, SECONDS, scheduler, 2)} and asserts that
 * {@code take(1)} yields exactly the value 2.
 */
@Test
public void takeSizeAndTime() {
    TestScheduler testScheduler = new TestScheduler();
    ReplayRelay<Integer> relay =
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testScheduler, 2);

    for (int value = 1; value <= 3; value++) {
        relay.accept(value);
    }

    relay.take(1).test().assertResult(2);
}
 
源代码3 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Subscribes an observer that calls {@code accept(2)} on the relay from inside
 * its own {@code onNext} when it receives 1, then asserts the observer saw 1, 2.
 */
@Test
public void reentrantDrain() {
    TestScheduler testScheduler = new TestScheduler();
    final ReplayRelay<Integer> relay =
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testScheduler, 2);

    TestObserver<Integer> observer = new TestObserver<Integer>() {
        @Override
        public void onNext(Integer value) {
            // Re-enter the relay before the current value is recorded.
            boolean isFirst = value == 1;
            if (isFirst) {
                relay.accept(2);
            }
            super.onNext(value);
        }
    };

    relay.subscribe(observer);
    relay.accept(1);

    observer.assertValues(1, 2);
}
 
源代码4 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Accepts three values, advances the test scheduler by two seconds, accepts two
 * more, and asserts that a new subscriber receives only 4 and 5.
 */
@Test
public void timeAndSizeRemoveCorrectNumberOfOld() {
    TestScheduler testScheduler = new TestScheduler();
    ReplayRelay<Integer> relay =
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testScheduler, 2);

    relay.accept(1);
    relay.accept(2);
    relay.accept(3); // 1 is dropped by the maxSize limit; size == 2

    testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);

    relay.accept(4); // 2 is dropped by maxSize, 3 by age; size == 1
    relay.accept(5); // size == 2

    relay.test().assertValuesOnly(4, 5);
}
 
源代码5 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Checks {@code size()} and {@code hasValue()} on a relay created with
 * {@code createWithTime(1, SECONDS, scheduler)}: one value is reported right after
 * {@code accept}, and none after the test scheduler is advanced by two seconds.
 */
@Test
public void testSizeAndHasAnyValueTimeBounded() {
    TestScheduler scheduler = new TestScheduler();
    ReplayRelay<Object> relay = ReplayRelay.createWithTime(1, TimeUnit.SECONDS, scheduler);

    // Nothing has been accepted yet.
    assertEquals(0, relay.size());
    assertFalse(relay.hasValue());

    int round = 0;
    while (round < 1000) {
        relay.accept(round);
        assertEquals(1, relay.size());
        assertTrue(relay.hasValue());

        // Move virtual time past the 1-second window used at creation.
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        assertEquals(0, relay.size());
        assertFalse(relay.hasValue());

        round++;
    }
}
 
源代码6 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Feeds 1, 2 and 3 into a relay created with
 * {@code createWithTimeAndSize(1, SECONDS, scheduler, 2)} and asserts that
 * {@code take(1)} yields exactly the value 2.
 */
@Test
public void takeSizeAndTime() {
    TestScheduler testScheduler = new TestScheduler();
    ReplayRelay<Integer> relay =
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testScheduler, 2);

    for (int value = 1; value <= 3; value++) {
        relay.accept(value);
    }

    relay.take(1).test().assertResult(2);
}
 
源代码7 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Subscribes an observer that calls {@code accept(2)} on the relay from inside
 * its own {@code onNext} when it receives 1, then asserts the observer saw 1, 2.
 */
@Test
public void reentrantDrain() {
    TestScheduler testScheduler = new TestScheduler();
    final ReplayRelay<Integer> relay =
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testScheduler, 2);

    TestObserver<Integer> observer = new TestObserver<Integer>() {
        @Override
        public void onNext(Integer value) {
            // Re-enter the relay before the current value is recorded.
            boolean isFirst = value == 1;
            if (isFirst) {
                relay.accept(2);
            }
            super.onNext(value);
        }
    };

    relay.subscribe(observer);
    relay.accept(1);

    observer.assertValues(1, 2);
}
 
源代码8 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Accepts three values, advances the test scheduler by two seconds, accepts two
 * more, and asserts that a new subscriber receives only 4 and 5.
 */
@Test
public void timeAndSizeRemoveCorrectNumberOfOld() {
    TestScheduler testScheduler = new TestScheduler();
    ReplayRelay<Integer> relay =
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testScheduler, 2);

    relay.accept(1);
    relay.accept(2);
    relay.accept(3); // 1 is dropped by the maxSize limit; size == 2

    testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);

    relay.accept(4); // 2 is dropped by maxSize, 3 by age; size == 1
    relay.accept(5); // size == 2

    relay.test().assertValuesOnly(4, 5);
}
 
源代码9 项目: mobius   文件: TransformersTest.java
/**
 * Asserts that the action's run count is 0 right after the upstream emits and
 * becomes 1 only once {@code triggerActions()} is called on the test scheduler.
 */
@Test
public void effectPerformerRunsActionOnSchedulerWheneverEffectIsRequested() throws Exception {
  PublishSubject<String> effects = PublishSubject.create();
  TestAction testAction = new TestAction();
  TestScheduler testScheduler = new TestScheduler();
  effects.compose(Transformers.fromAction(testAction, testScheduler)).subscribe();

  effects.onNext("First Time");

  // The scheduler has not been triggered yet, so no run is recorded.
  assertThat(testAction.getRunCount(), is(0));

  testScheduler.triggerActions();
  assertThat(testAction.getRunCount(), is(1));
}
 
源代码10 项目: mobius   文件: TransformersTest.java
/**
 * Asserts that the consumer's current value is {@code null} right after the
 * upstream emits and equals the emitted effect once {@code triggerActions()} is
 * called on the test scheduler.
 */
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
    throws Exception {
  PublishSubject<String> effects = PublishSubject.create();
  TestConsumer<String> testConsumer = new TestConsumer<>();
  TestScheduler testScheduler = new TestScheduler();
  effects.compose(Transformers.fromConsumer(testConsumer, testScheduler)).subscribe();

  effects.onNext("First Time");

  // The scheduler has not been triggered yet, so the consumer holds no value.
  assertThat(testConsumer.getCurrentValue(), is(equalTo(null)));

  testScheduler.triggerActions();
  assertThat(testConsumer.getCurrentValue(), is("First Time"));
}
 
源代码11 项目: mobius   文件: TransformersTest.java
/**
 * Emits "Hello" through a transformer built from a string-length function and
 * asserts that the observer receives 5 after the test scheduler is triggered.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndEmitsReturnedEvents() {
  PublishSubject<String> effects = PublishSubject.create();
  TestScheduler testScheduler = new TestScheduler();
  Function<String, Integer> lengthOf = text -> text.length();
  TestObserver<Integer> events =
      effects.compose(Transformers.fromFunction(lengthOf, testScheduler)).test();

  effects.onNext("Hello");
  testScheduler.triggerActions();

  events.assertValue(5);
}
 
源代码12 项目: mobius   文件: TransformersTest.java
/**
 * Emits "Hello" through a transformer built from a function that always throws
 * and asserts that the observer receives a {@code RuntimeException} error after
 * the test scheduler is triggered.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndErrorsForUnhandledExceptions() {
  PublishSubject<String> effects = PublishSubject.create();
  TestScheduler testScheduler = new TestScheduler();
  Function<String, Integer> alwaysFails =
      effect -> {
        throw new RuntimeException("Something bad happened");
      };
  TestObserver<Integer> events =
      effects.compose(Transformers.fromFunction(alwaysFails, testScheduler)).test();

  effects.onNext("Hello");
  testScheduler.triggerActions();

  events.assertError(RuntimeException.class);
}
 
源代码13 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Accepts a single value, advances the test scheduler by one second, and asserts
 * that a subscriber arriving afterwards receives nothing.
 */
@Test
public void timedSkipOld() {
    TestScheduler testScheduler = new TestScheduler();
    ReplayRelay<Integer> relay =
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testScheduler, 2);

    relay.accept(1);
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    relay.test().assertEmpty();
}
 
源代码14 项目: apollo-android   文件: Rx3ApolloTest.java
/**
 * Subscribes to a prefetch wrapped by {@code Rx3Apollo.from}, disposes the
 * subscription immediately, and asserts that the observer has not completed and
 * reports itself as disposed.
 */
@Test
public void prefetchIsCanceledWhenDisposed() throws Exception {
  server.enqueue(Utils.INSTANCE.mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID));

  TestObserver<EpisodeHeroNameQuery.Data> observer = new TestObserver<>();

  // Signals are observed on a TestScheduler that this test never advances.
  Disposable subscription =
      Rx3Apollo.from(apolloClient.prefetch(new EpisodeHeroNameQuery(Input.fromNullable(EMPIRE))))
          .observeOn(new TestScheduler())
          .subscribeWith(observer);

  subscription.dispose();

  observer.assertNotComplete();
  assertThat(observer.isDisposed()).isTrue();
}
 
源代码15 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Accepts a single value, advances the test scheduler by one second, and asserts
 * that a subscriber arriving afterwards receives nothing.
 */
@Test
public void timedSkipOld() {
    TestScheduler testScheduler = new TestScheduler();
    ReplayRelay<Integer> relay =
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testScheduler, 2);

    relay.accept(1);
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    relay.test().assertEmpty();
}
 
源代码16 项目: resilience4j   文件: TestSchedulerRule.java
/**
 * Returns the {@code TestScheduler} currently held by this object.
 *
 * @return the value of the {@code testScheduler} field
 */
public TestScheduler getTestScheduler() {
    return this.testScheduler;
}
 
/**
 * Runs before each test: assigns a new {@code PublishSubject} to {@code lifecycle}
 * and a new {@code TestScheduler} to {@code testScheduler}.
 */
@Before
public void setup() {
    this.lifecycle = PublishSubject.create();
    this.testScheduler = new TestScheduler();
}
 
/**
 * Runs before each test: assigns a new {@code PublishSubject} to {@code lifecycle}
 * and a new {@code TestScheduler} to {@code testScheduler}.
 */
@Before
public void setup() {
    this.lifecycle = PublishSubject.create();
    this.testScheduler = new TestScheduler();
}
 
/**
 * Runs before each test: assigns a new {@code PublishSubject} to {@code lifecycle}
 * and a new {@code TestScheduler} to {@code testScheduler}.
 */
@Before
public void setup() {
    this.lifecycle = PublishSubject.create();
    this.testScheduler = new TestScheduler();
}
 
源代码20 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Accepts 1, advances the test scheduler by one second, then subscribes a mock
 * observer and accepts 2 and 3 with one-second advances in between. Verifies the
 * observer received 2 and 3 but never 1 and never an error.
 */
@Test
public void testReplayTimestampedDirectly() {
    TestScheduler testScheduler = new TestScheduler();
    ReplayRelay<Integer> relay = ReplayRelay.createWithTime(1, TimeUnit.SECONDS, testScheduler);

    // Value accepted one second of virtual time before the observer subscribes.
    relay.accept(1);
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    Observer<Integer> mockObserver = TestHelper.mockObserver();
    relay.subscribe(mockObserver);

    relay.accept(2);
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    relay.accept(3);
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    // Note: no onComplete is signalled in this test; that call is disabled.

    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    verify(mockObserver, never()).onError(any(Throwable.class));
    verify(mockObserver, never()).onNext(1);
    verify(mockObserver).onNext(2);
    verify(mockObserver).onNext(3);
    // Completion is likewise not verified.
}
 
源代码21 项目: RxRelay   文件: ReplayRelayTest.java
/**
 * Accepts 1, advances the test scheduler by one second, then subscribes a mock
 * observer and accepts 2 and 3 with one-second advances in between. Verifies the
 * observer received 2 and 3 but never 1 and never an error.
 */
@Test
public void testReplayTimestampedDirectly() {
    TestScheduler testScheduler = new TestScheduler();
    ReplayRelay<Integer> relay = ReplayRelay.createWithTime(1, TimeUnit.SECONDS, testScheduler);

    // Value accepted one second of virtual time before the observer subscribes.
    relay.accept(1);
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    Observer<Integer> mockObserver = TestHelper.mockObserver();
    relay.subscribe(mockObserver);

    relay.accept(2);
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    relay.accept(3);
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    // Note: no onComplete is signalled in this test; that call is disabled.

    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    verify(mockObserver, never()).onError(any(Throwable.class));
    verify(mockObserver, never()).onNext(1);
    verify(mockObserver).onNext(2);
    verify(mockObserver).onNext(3);
    // Completion is likewise not verified.
}
 
 类所在包
 同包方法