类io.reactivex.schedulers.TestScheduler源码实例Demo

下面列出了 io.reactivex.schedulers.TestScheduler 类的 API 用法示例代码，也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: rxjava2-lab   文件: Code04.java
/**
 * Demonstrates driving a timer-based {@code Single} with a {@code TestScheduler}:
 * a one-second timer is zipped with a constant string, and the zipped value is
 * only asserted after virtual time has been advanced past the one-second mark.
 *
 * @param args unused
 * @throws Throwable kept from the original signature
 */
public static void main(String[] args) throws Throwable {
    TestScheduler virtualClock = new TestScheduler();

    Single<Long> timer = Single.timer(1, TimeUnit.SECONDS, virtualClock);
    Single<String> greeting = Single.just("Hello");
    Single<String> zipped = Single.zip(timer, greeting, (tick, text) -> tick + " -> " + text);

    TestObserver<String> observer = zipped.test();

    // Virtual time has not moved yet, so nothing is expected.
    observer.assertNoValues();

    // 500 ms of virtual time: still short of the one-second timer.
    virtualClock.advanceTimeBy(500, TimeUnit.MILLISECONDS);
    observer.assertNoValues();

    // A further 600 ms brings the total to 1100 ms, past the timer.
    virtualClock.advanceTimeBy(600, TimeUnit.MILLISECONDS);
    observer.assertNoErrors();
    observer.assertValue("0 -> Hello");
}
 
/**
 * Per-test fixture: initialises the stubs, creates a fresh counter and
 * {@code TestScheduler}, stubs the API builder and key generator, builds the
 * wrapper under test, starts an authorization and advances virtual time by
 * one second.
 */
@Before
public void before() {
    initStubs();

    counter = new AtomicInteger(0);
    scheduler = new TestScheduler();

    // The mocked API is built lazily, keyed by the name of the running test method.
    when(apiBuilder.build()).thenReturn(
            Flowable.defer(() -> buildMockedApiWithErrorChatLoading(testNameRule.getMethodName()))
                    .toObservable());

    KeyPair testKeys = new KeyPair(TestConstants.TEST_PUBLIC_KEY, TestConstants.TEST_SECRET_KEY);
    when(keyGenerator.getKeyPairFromPassPhrase(TestConstants.TEST_PASS_PHRASE)).thenReturn(testKeys);

    wrapper = new AdamantApiWrapper(apiBuilder, keyGenerator, scheduler);

    // Keep the disposable in the field-level collection declared outside this block.
    subscritions.add(wrapper.authorize(TestConstants.TEST_PASS_PHRASE).subscribe());

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
}
 
源代码3 项目: reactive-streams-in-java   文件: RxJavaDemoTest.java
/**
 * Zips four words with a one-second interval running on a {@code TestScheduler}
 * and checks that after 2300 ms of virtual time exactly the first two pairs
 * have been emitted and the stream has not completed.
 */
@Test
public void testScheduler() {
    TestScheduler virtualTime = new TestScheduler();

    // Interval ticks are produced on the virtual clock.
    Observable<Long> ticks = Observable.interval(1, TimeUnit.SECONDS, virtualTime);
    Observable<String> words = Observable.just("foo", "bar", "biz", "baz");

    // Each word is paired with the tick that releases it.
    Observable<String> numbered = words.zipWith(ticks, (word, tick) -> tick + "-" + word);

    TestObserver<String> observer = numbered.subscribeOn(virtualTime).test();

    // 2.3 s of virtual time: two whole seconds have elapsed.
    virtualTime.advanceTimeBy(2300, TimeUnit.MILLISECONDS);

    observer.assertNoErrors();
    observer.assertValues("0-foo", "1-bar");
    observer.assertNotComplete();
}
 
/**
 * Verifies a five-element interval on a {@code TestScheduler}: nothing before
 * time moves, one value after one virtual second, and all five values once
 * the clock has been moved to the six-second mark.
 */
@Test
public void test_interval() {
    TestScheduler virtualTime = new TestScheduler();
    Observable<Long> firstFiveTicks =
            Observable.interval(1, TimeUnit.SECONDS, virtualTime).take(5);
    TestObserver<Long> observer = new TestObserver<>();

    firstFiveTicks.subscribeOn(virtualTime).subscribe(observer);

    // Virtual time has not advanced: no signals of any kind are expected.
    observer.assertNoValues();
    observer.assertNotComplete();
    observer.assertNoErrors();

    virtualTime.advanceTimeBy(1, TimeUnit.SECONDS);
    observer.assertValueCount(1);
    observer.assertValues(0L);

    // advanceTimeTo takes an absolute virtual time rather than a delta.
    virtualTime.advanceTimeTo(6, TimeUnit.SECONDS);
    observer.assertValueCount(5);
    observer.assertValues(0L, 1L, 2L, 3L, 4L);
}
 
源代码5 项目: AndroidGodEye   文件: ThreadDumpTest.java
/**
 * Advances the computation scheduler (expected to be a {@code TestScheduler},
 * as the cast below requires) by five virtual seconds, then subscribes to the
 * THREAD module observable and asserts that it delivered a non-empty list.
 *
 * <p>If the module lookup throws {@code UninstallException} the test fails with
 * an {@link AssertionError} that carries the original exception as its cause,
 * so the failure report shows why the module was unavailable.
 */
@Test
public void work2() {
    ((TestScheduler) ThreadUtil.computationScheduler()).advanceTimeBy(5, TimeUnit.SECONDS);
    try {
        TestObserver<List<ThreadInfo>> subscriber = new TestObserver<>();
        GodEye.instance().<ThreadDump, List<ThreadInfo>>moduleObservable(GodEye.ModuleName.THREAD).subscribe(subscriber);
        subscriber.assertValue(threadInfos -> threadInfos != null && !threadInfos.isEmpty());
    } catch (UninstallException e) {
        // Preserve the cause instead of failing with an empty message.
        throw new AssertionError("THREAD module observable is unavailable", e);
    }
}
 
源代码6 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
/**
 * Builds a pool with a one-minute max idle time on a {@code TestScheduler},
 * checks one member out and straight back in, and expects the disposer to
 * have run zero times before, and once after, one virtual minute has passed.
 */
@Test
public void testMaxIdleTime() throws InterruptedException {
    TestScheduler virtualTime = new TestScheduler();
    AtomicInteger created = new AtomicInteger();
    AtomicInteger disposals = new AtomicInteger();

    Pool<Integer> pool = NonBlockingPool //
            .factory(() -> created.incrementAndGet()) //
            .healthCheck(value -> true) //
            .maxSize(3) //
            .maxIdleTime(1, TimeUnit.MINUTES) //
            .disposer(value -> disposals.incrementAndGet()) //
            .scheduler(virtualTime) //
            .build();

    // Request a single member; it is checked in again as soon as it arrives.
    TestSubscriber<Member<Integer>> subscriber = new FlowableSingleDeferUntilRequest<>( //
            pool.member()) //
                    .doOnNext(member -> member.checkin()) //
                    .doOnNext(System.out::println) //
                    .doOnRequest(requested -> System.out.println("test request=" + requested)) //
                    .test(1);

    virtualTime.triggerActions();
    subscriber.assertValueCount(1);
    assertEquals(0, disposals.get());

    // After the idle period has elapsed, exactly one disposal is expected.
    virtualTime.advanceTimeBy(1, TimeUnit.MINUTES);
    virtualTime.triggerActions();
    assertEquals(1, disposals.get());
}
 
源代码7 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
/**
 * Repeatedly checks a member out of a two-slot pool and straight back in,
 * requesting four values, then asserts that all four emitted values are the
 * very same object (reference equality).
 */
@Test
public void testConnectionPoolRecylesAlternating() {
    TestScheduler virtualTime = new TestScheduler();
    AtomicInteger created = new AtomicInteger();

    Pool<Integer> pool = NonBlockingPool //
            .factory(() -> created.incrementAndGet()) //
            .healthCheck(value -> true) //
            .maxSize(2) //
            .maxIdleTime(1, TimeUnit.MINUTES) //
            .scheduler(virtualTime) //
            .build();

    TestSubscriber<Integer> subscriber = new FlowableSingleDeferUntilRequest<>(pool.member()) //
            .repeat() //
            .doOnNext(member -> member.checkin()) //
            .map(member -> member.value()) //
            .test(4); //

    virtualTime.triggerActions();
    subscriber.assertValueCount(4) //
            .assertNotTerminated();

    // Index 0 of getEvents() holds the onNext values.
    List<Object> values = subscriber.getEvents().get(0);

    // Every consecutive pair must be the identical instance.
    for (int i = 0; i < 3; i++) {
        assertTrue(values.get(i) == values.get(i + 1));
    }
}
 
源代码8 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
/**
 * Uses a scheduler from {@code createScheduleToDelayCreation} (defined outside
 * this block) so that no member is delivered until virtual time advances.
 * After the first member arrives, a second request is made and the first
 * member is checked in; the test expects the second request to be satisfied
 * without advancing time any further.
 */
@Test
public void testMemberAvailableAfterCreationScheduledIsUsedImmediately() throws InterruptedException {
    TestScheduler virtualTime = new TestScheduler();
    Scheduler delayingScheduler = createScheduleToDelayCreation(virtualTime);
    AtomicInteger created = new AtomicInteger();

    Pool<Integer> pool = NonBlockingPool //
            .factory(() -> created.incrementAndGet()) //
            .createRetryInterval(10, TimeUnit.MINUTES) //
            .maxSize(2) //
            .maxIdleTime(1, TimeUnit.HOURS) //
            .scheduler(delayingScheduler) //
            .build();

    List<Member<Integer>> received = new ArrayList<>();

    // First request: nothing is delivered until the virtual clock moves.
    pool.member().doOnSuccess(received::add).subscribe();
    assertEquals(0, received.size());

    virtualTime.advanceTimeBy(1, TimeUnit.MINUTES);
    virtualTime.triggerActions();
    assertEquals(1, received.size());

    // Second request, followed by returning the first member to the pool.
    pool.member().doOnSuccess(received::add).subscribe();
    received.get(0).checkin();
    virtualTime.triggerActions();
    assertEquals(2, received.size());
}
 
源代码9 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
/**
 * Closes a freshly built pool and then requests a member, expecting the
 * subscriber to receive a {@code PoolClosedException} and no values.
 */
@Test
public void testSubscribeWhenPoolClosedEmitsError() throws Exception {
    TestScheduler virtualTime = new TestScheduler();
    AtomicInteger created = new AtomicInteger();
    AtomicInteger disposals = new AtomicInteger();

    Pool<Integer> pool = NonBlockingPool //
            .factory(() -> created.incrementAndGet()) //
            .healthCheck(value -> true) //
            .maxSize(3) //
            .maxIdleTime(1, TimeUnit.MINUTES) //
            .disposer(value -> disposals.incrementAndGet()) //
            .scheduler(virtualTime) //
            .build();

    pool.close();

    // Requesting from the closed pool must signal the error and no value.
    TestSubscriber<Member<Integer>> subscriber =
            new FlowableSingleDeferUntilRequest<>(pool.member()).test(1);
    subscriber.assertError(PoolClosedException.class);
    subscriber.assertNoValues();
}
 
源代码10 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Checks the timing of retries configured with {@code maxRetries(2)} and
 * {@code exponentialBackoff(1, TimeUnit.MINUTES)} on a {@code TestScheduler}.
 * The source emits 1 and 2 and then fails; the assertions expect a
 * resubscription after the first virtual minute, none after the second, a
 * further one after the third, and finally the original error.
 * {@code log} is defined outside this block.
 */
@Test
public void testWithScheduler() {
    Exception ex = new IllegalArgumentException("boo");
    TestSubscriber<Integer> ts = TestSubscriber.create();
    TestScheduler scheduler = new TestScheduler();
    Flowable.just(1, 2)
            // force an error after the 2 emissions
            .concatWith(Flowable.<Integer>error(ex))
            // retry with backoff
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(scheduler).build())
            // go
            .subscribe(ts);
    // first attempt has run and failed; no retry yet because virtual time has not moved
    ts.assertValues(1, 2);
    ts.assertNotComplete();
    // first retry is expected once one virtual minute has elapsed
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    ts.assertValues(1, 2, 1, 2);
    ts.assertNotComplete();
    // the test expects the next wait to be longer than 1 minute (presumably 2 minutes
    // under exponential backoff), so advancing by 1 minute should do nothing
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    ts.assertValues(1, 2, 1, 2);
    ts.assertNotComplete();
    // one more minute: the second retry runs, fails, and the error is delivered
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    ts.assertValues(1, 2, 1, 2, 1, 2);
    ts.assertError(ex);
}
 
源代码11 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Configures the retry builder with
 * {@code failWhenInstanceOf(IllegalArgumentException.class)} and feeds it a
 * source that fails with an {@code IllegalArgumentException}. The test expects
 * the error to reach the subscriber without the scheduler being advanced.
 * {@code log} is defined outside this block.
 */
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFails() {
    Exception ex = new IllegalArgumentException("boo");
    TestSubscriber<Integer> ts = TestSubscriber.create();
    TestScheduler scheduler = new TestScheduler();
    Flowable.just(1, 2)
            // force an error after the 2 emissions
            .concatWith(Flowable.<Integer>error(ex))
            // retry with backoff
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(scheduler).failWhenInstanceOf(IllegalArgumentException.class).build())
            // go
            .subscribe(ts);
    // virtual time is never advanced, so only the first attempt's values are expected
    ts.assertValues(1, 2);
    ts.assertError(ex);
}
 
源代码12 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Configures the retry builder with
 * {@code retryWhenInstanceOf(SQLException.class)} while the source fails with
 * an {@code IllegalArgumentException}. The test expects the error to reach
 * the subscriber without the scheduler being advanced.
 * {@code log} is defined outside this block.
 */
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFailsBecauseIsNotInstanceOf() {
    Exception ex = new IllegalArgumentException("boo");
    TestSubscriber<Integer> ts = TestSubscriber.create();
    TestScheduler scheduler = new TestScheduler();
    Flowable.just(1, 2)
            // force an error after the 2 emissions
            .concatWith(Flowable.<Integer>error(ex))
            // retry with backoff
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(scheduler).retryWhenInstanceOf(SQLException.class).build())
            // go
            .subscribe(ts);
    // virtual time is never advanced, so only the first attempt's values are expected
    ts.assertValues(1, 2);
    ts.assertError(ex);
}
 
源代码13 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Configures the retry builder with
 * {@code retryWhenInstanceOf(IllegalArgumentException.class)} and feeds it a
 * source that fails with an {@code IllegalArgumentException}. Without
 * advancing the scheduler, the test checks that the first attempt's values
 * arrived and that the subscriber has not completed.
 * {@code log} is defined outside this block.
 */
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionAllowed() {
    Exception ex = new IllegalArgumentException("boo");
    TestSubscriber<Integer> ts = TestSubscriber.create();
    TestScheduler scheduler = new TestScheduler();
    Flowable.just(1, 2)
            // force an error after the 2 emissions
            .concatWith(Flowable.<Integer>error(ex))
            // retry with backoff
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(scheduler).retryWhenInstanceOf(IllegalArgumentException.class).build())
            // go
            .subscribe(ts);
    // virtual time is never advanced, so no retry has run yet
    ts.assertValues(1, 2);
    ts.assertNotComplete();
}
 
源代码14 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Configures the retry builder with {@code retryIf} and a predicate that
 * returns true for {@code IllegalArgumentException}, which is the error the
 * source fails with. Without advancing the scheduler, the test checks that
 * the first attempt's values arrived and that the subscriber has not
 * completed. {@code log} is defined outside this block.
 */
@Test
public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsTrue() {
    Exception ex = new IllegalArgumentException("boo");
    TestSubscriber<Integer> ts = TestSubscriber.create();
    TestScheduler scheduler = new TestScheduler();
    // accepts exactly the kind of error the source below produces
    Predicate<Throwable> predicate = new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable t) {
            return t instanceof IllegalArgumentException;
        }
    };
    Flowable.just(1, 2)
            // force an error after the 2 emissions
            .concatWith(Flowable.<Integer>error(ex))
            // retry with backoff
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(scheduler).retryIf(predicate).build())
            // go
            .subscribe(ts);
    // virtual time is never advanced, so no retry has run yet
    ts.assertValues(1, 2);
    ts.assertNotComplete();
}
 
源代码15 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Configures the retry builder with {@code retryIf} and
 * {@code Predicates.alwaysFalse()}. The test expects the source's error to
 * reach the subscriber without the scheduler being advanced.
 * {@code log} is defined outside this block.
 */
@Test
public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsFalse() {
    Exception ex = new IllegalArgumentException("boo");
    TestSubscriber<Integer> ts = TestSubscriber.create();
    TestScheduler scheduler = new TestScheduler();
    Predicate<Throwable> predicate = Predicates.alwaysFalse();
    Flowable.just(1, 2)
            // force an error after the 2 emissions
            .concatWith(Flowable.<Integer>error(ex))
            // retry with backoff
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(scheduler).retryIf(predicate).build())
            // go
            .subscribe(ts);
    // virtual time is never advanced, so only the first attempt's values are expected
    ts.assertValues(1, 2);
    ts.assertError(ex);
}
 
源代码16 项目: rxjava2-extras   文件: TransformersTest.java
/**
 * Runs {@code Transformers.insert} over a source made of 1 (delayed 1 s)
 * followed by 2 (delayed 3 s), inserting a {@code Maybe} of 3 delayed by 2 s.
 * Stepping virtual time 1 s, 2 s and 1 s, the test expects the values to
 * accumulate as [1], [1, 3], [1, 3, 2] and then completion.
 */
@Test
public void testInsertWithDelays() {
    TestScheduler clock = new TestScheduler();

    Flowable<Integer> head = Flowable.just(1).delay(1, TimeUnit.SECONDS, clock);
    Flowable<Integer> tail = Flowable.just(2).delay(3, TimeUnit.SECONDS, clock);
    Maybe<Integer> inserted = Maybe.just(3).delay(2, TimeUnit.SECONDS, clock);

    TestSubscriber<Integer> subscriber = head //
            .concatWith(tail) //
            .compose(Transformers.insert(inserted)) //
            .test();

    subscriber.assertNoValues();

    clock.advanceTimeBy(1, TimeUnit.SECONDS);
    subscriber.assertValues(1);

    clock.advanceTimeBy(2, TimeUnit.SECONDS);
    subscriber.assertValues(1, 3);

    clock.advanceTimeBy(1, TimeUnit.SECONDS);
    subscriber.assertValues(1, 3, 2);
    subscriber.assertComplete();
}
 
源代码17 项目: akarnokd-misc   文件: ThrottleSampleTest.java
/**
 * Demo for {@code throttleFirstSample} (defined outside this block) on a
 * {@code Flowable}: each number is emitted after that many virtual
 * milliseconds, passed through the 100 ms throttle, and printed with the
 * virtual time at which it arrived. Nothing is asserted.
 */
@Test
public void test() {
    TestScheduler clock = new TestScheduler();

    // Each value doubles as its own emission delay in milliseconds.
    Flowable<Integer> delays = Flowable.fromArray(
            100,                // expected output: 100 at T=100
            110, 120, 130, 150, // expected output: 150 at T=200
            250, 260,           // expected output: 260 at T=300
            400                 // expected output: 400 at T=400
    );

    delays
            .flatMap(delayMs -> Flowable.timer(delayMs, TimeUnit.MILLISECONDS, clock).map(tick -> delayMs))
            .compose(throttleFirstSample(100, TimeUnit.MILLISECONDS, clock))
            .subscribe(value -> System.out.println(value + " at T=" + clock.now(TimeUnit.MILLISECONDS)));

    // One virtual second covers every scheduled timer above.
    clock.advanceTimeBy(1, TimeUnit.SECONDS);
}
 
源代码18 项目: akarnokd-misc   文件: ThrottleSampleTest.java
/**
 * Demo for {@code throttleFirstSampleObservable} (defined outside this block)
 * on an {@code Observable}: each number is emitted after that many virtual
 * milliseconds, passed through the 100 ms throttle, and printed with the
 * virtual time at which it arrived. Nothing is asserted.
 */
@Test
public void testObservable() {
    TestScheduler clock = new TestScheduler();

    // Each value doubles as its own emission delay in milliseconds.
    Observable<Integer> delays = Observable.fromArray(
            100,                // expected output: 100 at T=100
            110, 120, 130, 150, // expected output: 150 at T=200
            250, 260,           // expected output: 260 at T=300
            400                 // expected output: 400 at T=400
    );

    delays
            .flatMap(delayMs -> Observable.timer(delayMs, TimeUnit.MILLISECONDS, clock).map(tick -> delayMs))
            .compose(throttleFirstSampleObservable(100, TimeUnit.MILLISECONDS, clock))
            .subscribe(value -> System.out.println(value + " at T=" + clock.now(TimeUnit.MILLISECONDS)));

    // One virtual second covers every scheduled timer above.
    clock.advanceTimeBy(1, TimeUnit.SECONDS);
}
 
源代码19 项目: akarnokd-misc   文件: SingleConcatTest.java
/**
 * Concatenates four {@code Single<List<Integer>>} sources (the first delayed
 * by two virtual seconds) via {@code concatMapSingle} and folds their lists
 * into one, expecting [1, 2, 3, 4, 5] after three virtual seconds.
 *
 * <p>The accumulator is created with {@code reduceWith(ArrayList::new, ...)}
 * so every subscription gets its own list; a seed instance passed to
 * {@code reduce} would be shared and mutated across subscribers.
 */
@Test
public void test() {
  TestScheduler testScheduler = new TestScheduler();

  final Single<List<Integer>> first = Single.timer(2, TimeUnit.SECONDS, testScheduler)
      .map(u -> Arrays.asList(1, 2, 3));
  final Single<List<Integer>> second = Single.just(Collections.emptyList());
  final Single<List<Integer>> third = Single.just(Collections.singletonList(4));
  final Single<List<Integer>> fourth = Single.just(Collections.singletonList(5));

  Single<List<Integer>> subject = Observable
      .fromIterable(Arrays.asList(first, second, third, fourth))
      .concatMapSingle(single -> single)
      // Fresh accumulator per subscriber instead of one shared mutable seed.
      .reduceWith(ArrayList::new, (seed, items) -> {
        seed.addAll(items);
        return seed;
      });

  TestObserver<List<Integer>> testObserver = subject.test();
  testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

  System.out.println(testObserver.values());
  testObserver.assertValue(list -> list.equals(Arrays.asList(1, 2, 3, 4, 5)));
  // NOTE(review): the original author reported that 5 was missing here because
  // the fourth source was never subscribed; whether that still happens depends
  // on the RxJava version in use.
}
 
源代码20 项目: tutorials   文件: RxFlatmapAndSwitchmapUnitTest.java
/**
 * Maps each keyword through {@code flatMap} to two results delayed by ten
 * virtual seconds, collects everything into a list, and after one virtual
 * minute expects results for all five keywords to be present.
 */
@Test
public void givenObservable_whenFlatmap_shouldAssertAllItemsReturned() {
    // given
    final TestScheduler virtualTime = new TestScheduler();
    final List<String> keywords = Arrays.asList("b", "bo", "boo", "book", "books");
    List<String> collected = new ArrayList<>();

    // when
    Observable.fromIterable(keywords)
      .flatMap(keyword -> Observable
        .just(keyword + " FirstResult", keyword + " SecondResult")
        .delay(10, TimeUnit.SECONDS, virtualTime))
      .toList()
      .doOnSuccess(collected::addAll)
      .subscribe();

    virtualTime.advanceTimeBy(1, TimeUnit.MINUTES);

    // then: every inner observable contributed both of its results
    assertThat(collected, hasItems("b FirstResult", "b SecondResult",
      "boo FirstResult", "boo SecondResult",
      "bo FirstResult", "bo SecondResult",
      "book FirstResult", "book SecondResult",
      "books FirstResult", "books SecondResult"));
}
 
源代码21 项目: tutorials   文件: RxFlatmapAndSwitchmapUnitTest.java
/**
 * Maps each keyword through {@code switchMap} to two results delayed by ten
 * virtual seconds, collects everything into a list, and after one virtual
 * minute expects only the two results of the last keyword.
 */
@Test
public void givenObservable_whenSwitchmap_shouldAssertLatestItemReturned() {
    // given
    final TestScheduler virtualTime = new TestScheduler();
    final List<String> keywords = Arrays.asList("b", "bo", "boo", "book", "books");
    List<String> collected = new ArrayList<>();

    // when
    Observable.fromIterable(keywords)
      .switchMap(keyword -> Observable
        .just(keyword + " FirstResult", keyword + " SecondResult")
        .delay(10, TimeUnit.SECONDS, virtualTime))
      .toList()
      .doOnSuccess(collected::addAll)
      .subscribe();

    virtualTime.advanceTimeBy(1, TimeUnit.MINUTES);

    // then: only the most recent inner observable is expected to deliver
    assertEquals(2, collected.size());
    assertThat(collected, hasItems("books FirstResult", "books SecondResult"));
}
 
源代码22 项目: mobius   文件: TransformersTest.java
/**
 * Checks that {@code Transformers.fromAction} does not run the action when
 * an effect is emitted, but only once the scheduler's pending actions are
 * triggered.
 */
@Test
public void effectPerformerRunsActionOnSchedulerWheneverEffectIsRequested() throws Exception {
  TestScheduler virtualTime = new TestScheduler();
  TestAction recordedAction = new TestAction();
  PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromAction(recordedAction, virtualTime)).subscribe();

  effects.onNext("First Time");

  // The work is queued on the scheduler, so it has not run yet.
  assertThat(recordedAction.getRunCount(), is(0));

  virtualTime.triggerActions();
  assertThat(recordedAction.getRunCount(), is(1));
}
 
源代码23 项目: mobius   文件: TransformersTest.java
/**
 * Checks that {@code Transformers.fromConsumer} hands the emitted effect to
 * the consumer only once the scheduler's pending actions are triggered.
 */
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
    throws Exception {
  TestScheduler virtualTime = new TestScheduler();
  TestConsumer<String> recordingConsumer = new TestConsumer<>();
  PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromConsumer(recordingConsumer, virtualTime)).subscribe();

  effects.onNext("First Time");

  // Nothing has been delivered before the scheduler runs.
  assertThat(recordingConsumer.getCurrentValue(), is(equalTo(null)));

  virtualTime.triggerActions();
  assertThat(recordingConsumer.getCurrentValue(), is("First Time"));
}
 
源代码24 项目: mobius   文件: TransformersTest.java
/**
 * Checks that {@code Transformers.fromFunction} applies the function to the
 * emitted effect and emits the result: "Hello" is mapped to its length, 5.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndEmitsReturnedEvents() {
  TestScheduler virtualTime = new TestScheduler();
  PublishSubject<String> effects = PublishSubject.create();
  Function<String, Integer> toLength = String::length;

  TestObserver<Integer> events =
      effects.compose(Transformers.fromFunction(toLength, virtualTime)).test();

  effects.onNext("Hello");
  virtualTime.triggerActions();

  events.assertValue(5);
}
 
源代码25 项目: mobius   文件: TransformersTest.java
/**
 * Checks that an exception thrown by the function passed to
 * {@code Transformers.fromFunction} reaches the observer as an error of type
 * {@code RuntimeException}.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndErrorsForUnhandledExceptions() {
  TestScheduler virtualTime = new TestScheduler();
  PublishSubject<String> effects = PublishSubject.create();

  // A handler that always fails.
  Function<String, Integer> failing =
      effect -> {
        throw new RuntimeException("Something bad happened");
      };

  TestObserver<Integer> events =
      effects.compose(Transformers.fromFunction(failing, virtualTime)).test();

  effects.onNext("Hello");
  virtualTime.triggerActions();

  events.assertError(RuntimeException.class);
}
 
/**
 * Per-test fixture: creates a fresh {@code TestScheduler}, builds the login
 * presenter with the mocked interactor, a scheduler provider backed by that
 * scheduler and a new {@code CompositeDisposable}, then attaches the mocked
 * view.
 */
@Before
public void setUp() throws Exception {
    mTestScheduler = new TestScheduler();
    mLoginPresenter = new LoginPresenter<>(
            mMockLoginMvpInteractor,
            new TestSchedulerProvider(mTestScheduler),
            new CompositeDisposable());
    mLoginPresenter.onAttach(mMockLoginMvpView);
}
 
/**
 * Per-test fixture: creates a fresh {@code TestScheduler}, builds the login
 * view model with the mocked data manager and a scheduler provider backed by
 * that scheduler, then installs the navigator callback.
 */
@Before
public void setUp() throws Exception {
    mTestScheduler = new TestScheduler();
    mLoginViewModel = new LoginViewModel(
            mMockDataManager,
            new TestSchedulerProvider(mTestScheduler));
    mLoginViewModel.setNavigator(mLoginCallback);
}
 
源代码28 项目: ocraft-s2client   文件: ResponseQueueTest.java
/**
 * Offers a pending PING response to a {@code ResponseQueue} and checks that
 * {@code peekResponse} reports false before, and true after, one virtual
 * second has passed on the scheduler feeding {@code responseStream} (defined
 * outside this block).
 */
@Test
void pollsResponse() {
    TestScheduler clock = new TestScheduler();
    ResponseQueue queue = new ResponseQueue();

    // The first element of the response stream is bridged into the subject.
    Observable<Response> responses = responseStream(clock);
    MaybeSubject<Response> pending = MaybeSubject.create();
    responses.firstElement().subscribe(pending);

    queue.offer(ResponseType.PING, pending);

    assertThat(queue.peekResponse(ResponseType.PING)).isFalse();

    clock.advanceTimeBy(1, TimeUnit.SECONDS);

    assertThat(queue.peekResponse(ResponseType.PING)).isTrue();
}
 
源代码29 项目: AndroidGodEye   文件: ThreadHelper.java
/**
 * Installs an RxJava computation-scheduler handler that ignores the scheduler
 * it is given and always returns one shared {@code TestScheduler} instance
 * created here.
 */
public static void setupRxjava() {
    TestScheduler sharedTestScheduler = new TestScheduler();
    RxJavaPlugins.setComputationSchedulerHandler(original -> sharedTestScheduler);
}
 
源代码30 项目: AndroidGodEye   文件: TrafficTest.java
/**
 * Advances the computation scheduler (expected to be a {@code TestScheduler},
 * as the cast below requires) by five virtual seconds, then subscribes to the
 * TRAFFIC module observable and asserts that at least one value was received.
 *
 * <p>If the module lookup throws {@code UninstallException} the test fails with
 * an {@link AssertionError} that carries the original exception as its cause,
 * so the failure report shows why the module was unavailable.
 */
@Test
public void work2() {
    ((TestScheduler) ThreadUtil.computationScheduler()).advanceTimeBy(5, TimeUnit.SECONDS);
    try {
        List<TrafficInfo> trafficInfos = GodEye.instance().<Traffic, TrafficInfo>moduleObservable(GodEye.ModuleName.TRAFFIC).test().values();
        Assert.assertTrue(trafficInfos != null && !trafficInfos.isEmpty());
    } catch (UninstallException e) {
        // Preserve the cause instead of failing with an empty message.
        throw new AssertionError("TRAFFIC module observable is unavailable", e);
    }
}
 
 类所在包
 同包方法