下面列出了怎么用io.reactivex.schedulers.TestScheduler的API类实例代码及写法,或者点击链接到github查看源代码。
public static void main(String[] args) throws Throwable {
TestScheduler scheduler = new TestScheduler();
Single<Long> s1 = Single.timer(1, TimeUnit.SECONDS, scheduler);
Single<String> s2 = Single.just("Hello");
Single<String> r = Single.zip(s1, s2, (t, s) -> t + " -> " + s);
TestObserver<String> testObserver = r.test();
testObserver.assertNoValues();
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
testObserver.assertNoValues();
scheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);
testObserver
.assertNoErrors()
.assertValue("0 -> Hello");
}
@Before
public void before() {
initStubs();
counter = new AtomicInteger(0);
scheduler = new TestScheduler();
when(apiBuilder.build())
.thenReturn(
Flowable.defer(() -> buildMockedApiWithErrorChatLoading(testNameRule.getMethodName())).toObservable()
);
when(keyGenerator.getKeyPairFromPassPhrase(TestConstants.TEST_PASS_PHRASE))
.thenReturn(new KeyPair(TestConstants.TEST_PUBLIC_KEY, TestConstants.TEST_SECRET_KEY));
wrapper = new AdamantApiWrapper(apiBuilder, keyGenerator, scheduler);
Disposable subscribe = wrapper
.authorize(TestConstants.TEST_PASS_PHRASE)
.subscribe();
subscritions.add(subscribe);
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
}
@Test
public void testScheduler() {
TestScheduler scheduler = new TestScheduler(); //1
Observable<Long> tick = Observable
.interval(1, TimeUnit.SECONDS, scheduler); //2
Observable<String> observable =
Observable.just("foo", "bar", "biz", "baz") //3
.zipWith(tick, (string, index) -> index + "-" + string);//4
TestObserver<String> testObserver = observable
.subscribeOn(scheduler).test();//5
scheduler.advanceTimeBy(2300, TimeUnit.MILLISECONDS);//6
testObserver.assertNoErrors(); //7
testObserver.assertValues("0-foo", "1-bar");
testObserver.assertNotComplete();
}
@Test
public void test_interval()
{
TestScheduler testScheduler=new TestScheduler();
Observable<Long>observable=Observable.interval(1, TimeUnit.SECONDS,testScheduler).take(5);
TestObserver<Long> testObserver=new TestObserver<>();
observable.subscribeOn(testScheduler).subscribe(testObserver);
testObserver.assertNoValues();
testObserver.assertNotComplete();
testObserver.assertNoErrors();
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testObserver.assertValueCount(1);
testObserver.assertValues(0l);
testScheduler.advanceTimeTo(6, TimeUnit.SECONDS);
testObserver.assertValueCount(5);
testObserver.assertValues(0l,1l,2l,3l,4l);
}
@Test
public void work2() {
((TestScheduler) ThreadUtil.computationScheduler()).advanceTimeBy(5, TimeUnit.SECONDS);
try {
TestObserver<List<ThreadInfo>> subscriber = new TestObserver<>();
GodEye.instance().<ThreadDump, List<ThreadInfo>>moduleObservable(GodEye.ModuleName.THREAD).subscribe(subscriber);
subscriber.assertValue(new Predicate<List<ThreadInfo>>() {
@Override
public boolean test(List<ThreadInfo> threadInfos) throws Exception {
return threadInfos != null && !threadInfos.isEmpty();
}
});
} catch (UninstallException e) {
Assert.fail();
}
}
@Test
public void testMaxIdleTime() throws InterruptedException {
TestScheduler s = new TestScheduler();
AtomicInteger count = new AtomicInteger();
AtomicInteger disposed = new AtomicInteger();
Pool<Integer> pool = NonBlockingPool //
.factory(() -> count.incrementAndGet()) //
.healthCheck(n -> true) //
.maxSize(3) //
.maxIdleTime(1, TimeUnit.MINUTES) //
.disposer(n -> disposed.incrementAndGet()) //
.scheduler(s) //
.build();
TestSubscriber<Member<Integer>> ts = new FlowableSingleDeferUntilRequest<>( //
pool.member()) //
.doOnNext(m -> m.checkin()) //
.doOnNext(System.out::println) //
.doOnRequest(t -> System.out.println("test request=" + t)) //
.test(1);
s.triggerActions();
ts.assertValueCount(1);
assertEquals(0, disposed.get());
s.advanceTimeBy(1, TimeUnit.MINUTES);
s.triggerActions();
assertEquals(1, disposed.get());
}
@Test
public void testConnectionPoolRecylesAlternating() {
TestScheduler s = new TestScheduler();
AtomicInteger count = new AtomicInteger();
Pool<Integer> pool = NonBlockingPool //
.factory(() -> count.incrementAndGet()) //
.healthCheck(n -> true) //
.maxSize(2) //
.maxIdleTime(1, TimeUnit.MINUTES) //
.scheduler(s) //
.build();
TestSubscriber<Integer> ts = new FlowableSingleDeferUntilRequest<>(pool.member()) //
.repeat() //
.doOnNext(m -> m.checkin()) //
.map(m -> m.value()) //
.test(4); //
s.triggerActions();
ts.assertValueCount(4) //
.assertNotTerminated();
List<Object> list = ts.getEvents().get(0);
// all 4 connections released were the same
assertTrue(list.get(0) == list.get(1));
assertTrue(list.get(1) == list.get(2));
assertTrue(list.get(2) == list.get(3));
}
@Test
public void testMemberAvailableAfterCreationScheduledIsUsedImmediately() throws InterruptedException {
TestScheduler ts = new TestScheduler();
Scheduler s = createScheduleToDelayCreation(ts);
AtomicInteger count = new AtomicInteger();
Pool<Integer> pool = NonBlockingPool //
.factory(() -> count.incrementAndGet()) //
.createRetryInterval(10, TimeUnit.MINUTES) //
.maxSize(2) //
.maxIdleTime(1, TimeUnit.HOURS) //
.scheduler(s) //
.build();
List<Member<Integer>> list = new ArrayList<Member<Integer>>();
pool.member().doOnSuccess(m -> list.add(m)).subscribe();
assertEquals(0, list.size());
ts.advanceTimeBy(1, TimeUnit.MINUTES);
ts.triggerActions();
assertEquals(1, list.size());
pool.member().doOnSuccess(m -> list.add(m)).subscribe();
list.get(0).checkin();
ts.triggerActions();
assertEquals(2, list.size());
}
@Test
public void testSubscribeWhenPoolClosedEmitsError() throws Exception {
TestScheduler s = new TestScheduler();
AtomicInteger count = new AtomicInteger();
AtomicInteger disposed = new AtomicInteger();
Pool<Integer> pool = NonBlockingPool //
.factory(() -> count.incrementAndGet()) //
.healthCheck(n -> true) //
.maxSize(3) //
.maxIdleTime(1, TimeUnit.MINUTES) //
.disposer(n -> disposed.incrementAndGet()) //
.scheduler(s) //
.build();
pool.close();
new FlowableSingleDeferUntilRequest<>( //
pool.member()) //
.test(1) //
.assertError(PoolClosedException.class) //
.assertNoValues();
}
@Test
public void testWithScheduler() {
Exception ex = new IllegalArgumentException("boo");
TestSubscriber<Integer> ts = TestSubscriber.create();
TestScheduler scheduler = new TestScheduler();
Flowable.just(1, 2)
// force error after 3 emissions
.concatWith(Flowable.<Integer>error(ex))
// retry with backoff
.retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
.scheduler(scheduler).build())
// go
.subscribe(ts);
ts.assertValues(1, 2);
ts.assertNotComplete();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
ts.assertValues(1, 2, 1, 2);
ts.assertNotComplete();
// next wait is 2 seconds so advancing by 1 should do nothing
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
ts.assertValues(1, 2, 1, 2);
ts.assertNotComplete();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
ts.assertValues(1, 2, 1, 2, 1, 2);
ts.assertError(ex);
}
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFails() {
Exception ex = new IllegalArgumentException("boo");
TestSubscriber<Integer> ts = TestSubscriber.create();
TestScheduler scheduler = new TestScheduler();
Flowable.just(1, 2)
// force error after 3 emissions
.concatWith(Flowable.<Integer>error(ex))
// retry with backoff
.retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
.scheduler(scheduler).failWhenInstanceOf(IllegalArgumentException.class).build())
// go
.subscribe(ts);
ts.assertValues(1, 2);
ts.assertError(ex);
}
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFailsBecauseIsNotInstanceOf() {
Exception ex = new IllegalArgumentException("boo");
TestSubscriber<Integer> ts = TestSubscriber.create();
TestScheduler scheduler = new TestScheduler();
Flowable.just(1, 2)
// force error after 3 emissions
.concatWith(Flowable.<Integer>error(ex))
// retry with backoff
.retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
.scheduler(scheduler).retryWhenInstanceOf(SQLException.class).build())
// go
.subscribe(ts);
ts.assertValues(1, 2);
ts.assertError(ex);
}
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionAllowed() {
Exception ex = new IllegalArgumentException("boo");
TestSubscriber<Integer> ts = TestSubscriber.create();
TestScheduler scheduler = new TestScheduler();
Flowable.just(1, 2)
// force error after 3 emissions
.concatWith(Flowable.<Integer>error(ex))
// retry with backoff
.retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
.scheduler(scheduler).retryWhenInstanceOf(IllegalArgumentException.class).build())
// go
.subscribe(ts);
ts.assertValues(1, 2);
ts.assertNotComplete();
}
@Test
public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsTrue() {
Exception ex = new IllegalArgumentException("boo");
TestSubscriber<Integer> ts = TestSubscriber.create();
TestScheduler scheduler = new TestScheduler();
Predicate<Throwable> predicate = new Predicate<Throwable>() {
@Override
public boolean test(Throwable t) {
return t instanceof IllegalArgumentException;
}
};
Flowable.just(1, 2)
// force error after 3 emissions
.concatWith(Flowable.<Integer>error(ex))
// retry with backoff
.retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
.scheduler(scheduler).retryIf(predicate).build())
// go
.subscribe(ts);
ts.assertValues(1, 2);
ts.assertNotComplete();
}
@Test
public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsFalse() {
Exception ex = new IllegalArgumentException("boo");
TestSubscriber<Integer> ts = TestSubscriber.create();
TestScheduler scheduler = new TestScheduler();
Predicate<Throwable> predicate = Predicates.alwaysFalse();
Flowable.just(1, 2)
// force error after 3 emissions
.concatWith(Flowable.<Integer>error(ex))
// retry with backoff
.retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
.scheduler(scheduler).retryIf(predicate).build())
// go
.subscribe(ts);
ts.assertValues(1, 2);
ts.assertError(ex);
}
@Test
public void testInsertWithDelays() {
TestScheduler s = new TestScheduler();
TestSubscriber<Integer> ts = //
Flowable.just(1).delay(1, TimeUnit.SECONDS, s) //
.concatWith(Flowable.just(2).delay(3, TimeUnit.SECONDS, s)) //
.compose(Transformers.insert(Maybe.just(3).delay(2, TimeUnit.SECONDS, s))) //
.test();
ts.assertNoValues();
s.advanceTimeBy(1, TimeUnit.SECONDS);
ts.assertValues(1);
s.advanceTimeBy(2, TimeUnit.SECONDS);
ts.assertValues(1, 3);
s.advanceTimeBy(1, TimeUnit.SECONDS);
ts.assertValues(1, 3, 2);
ts.assertComplete();
}
@Test
public void test() {
TestScheduler tsch = new TestScheduler();
Flowable.fromArray(
100, // should emit 100 at T=100
110, 120, 130, 150, // should emit 150 at T=200
250, 260, // should emit 260 at T=300
400 // should emit 400 at T=400
)
.flatMap(v -> Flowable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
.compose(throttleFirstSample(100, TimeUnit.MILLISECONDS, tsch))
.subscribe(v -> System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS)));
tsch.advanceTimeBy(1, TimeUnit.SECONDS);
}
@Test
public void testObservable() {
TestScheduler tsch = new TestScheduler();
Observable.fromArray(
100, // should emit 100 at T=100
110, 120, 130, 150, // should emit 150 at T=200
250, 260, // should emit 260 at T=300
400 // should emit 400 at T=400
)
.flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
.compose(throttleFirstSampleObservable(100, TimeUnit.MILLISECONDS, tsch))
.subscribe(v -> System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS)));
tsch.advanceTimeBy(1, TimeUnit.SECONDS);
}
@Test
public void test() {
TestScheduler testScheduler = new TestScheduler();
final Single<List<Integer>> first = Single.timer(2, TimeUnit.SECONDS, testScheduler)
.map(u -> Arrays.asList(1, 2, 3));
final Single<List<Integer>> second = Single.just(Collections.emptyList());
final Single<List<Integer>> third = Single.just(Collections.singletonList(4));
final Single<List<Integer>> fourth = Single.just(Collections.singletonList(5));
Single<List<Integer>> subject = Observable
.fromIterable(Arrays.asList(first, second, third, fourth))
.concatMapSingle(single -> single)
.reduce(new ArrayList<>(), (seed, items) -> {
seed.addAll(items);
return seed;
});
TestObserver<List<Integer>> testObserver = subject.test();
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
System.out.println(testObserver.values());
testObserver.assertValue(list -> list.equals(Arrays.asList(1, 2, 3, 4, 5)));
// 5 is currently missing ; fourth was never subscribed in the first place
}
@Test
public void givenObservable_whenFlatmap_shouldAssertAllItemsReturned() {
//given
List<String> actualOutput = new ArrayList<>();
final TestScheduler scheduler = new TestScheduler();
final List<String> keywordToSearch = Arrays.asList("b", "bo", "boo", "book", "books");
//when
Observable.fromIterable(keywordToSearch)
.flatMap(s -> Observable
.just(s + " FirstResult", s + " SecondResult")
.delay(10, TimeUnit.SECONDS, scheduler))
.toList()
.doOnSuccess(s -> actualOutput.addAll(s))
.subscribe();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
//then
assertThat(actualOutput, hasItems("b FirstResult", "b SecondResult",
"boo FirstResult", "boo SecondResult",
"bo FirstResult", "bo SecondResult",
"book FirstResult", "book SecondResult",
"books FirstResult", "books SecondResult"));
}
@Test
public void givenObservable_whenSwitchmap_shouldAssertLatestItemReturned() {
//given
List<String> actualOutput = new ArrayList<>();
final TestScheduler scheduler = new TestScheduler();
final List<String> keywordToSearch = Arrays.asList("b", "bo", "boo", "book", "books");
//when
Observable.fromIterable(keywordToSearch)
.switchMap(s -> Observable
.just(s + " FirstResult", s + " SecondResult")
.delay(10, TimeUnit.SECONDS, scheduler))
.toList()
.doOnSuccess(s -> actualOutput.addAll(s))
.subscribe();
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
//then
assertEquals(2, actualOutput.size());
assertThat(actualOutput, hasItems("books FirstResult", "books SecondResult"));
}
@Test
public void effectPerformerRunsActionOnSchedulerWheneverEffectIsRequested() throws Exception {
PublishSubject<String> upstream = PublishSubject.create();
TestAction action = new TestAction();
TestScheduler scheduler = new TestScheduler();
upstream.compose(Transformers.fromAction(action, scheduler)).subscribe();
upstream.onNext("First Time");
assertThat(action.getRunCount(), is(0));
scheduler.triggerActions();
assertThat(action.getRunCount(), is(1));
}
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
throws Exception {
PublishSubject<String> upstream = PublishSubject.create();
TestConsumer<String> consumer = new TestConsumer<>();
TestScheduler scheduler = new TestScheduler();
upstream.compose(Transformers.fromConsumer(consumer, scheduler)).subscribe();
upstream.onNext("First Time");
assertThat(consumer.getCurrentValue(), is(equalTo(null)));
scheduler.triggerActions();
assertThat(consumer.getCurrentValue(), is("First Time"));
}
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndEmitsReturnedEvents() {
PublishSubject<String> upstream = PublishSubject.create();
TestScheduler scheduler = new TestScheduler();
Function<String, Integer> function = s -> s.length();
TestObserver<Integer> observer =
upstream.compose(Transformers.fromFunction(function, scheduler)).test();
upstream.onNext("Hello");
scheduler.triggerActions();
observer.assertValue(5);
}
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndErrorsForUnhandledExceptions() {
PublishSubject<String> upstream = PublishSubject.create();
TestScheduler scheduler = new TestScheduler();
Function<String, Integer> function =
s -> {
throw new RuntimeException("Something bad happened");
};
TestObserver<Integer> observer =
upstream.compose(Transformers.fromFunction(function, scheduler)).test();
upstream.onNext("Hello");
scheduler.triggerActions();
observer.assertError(RuntimeException.class);
}
@Before
public void setUp() throws Exception {
CompositeDisposable compositeDisposable = new CompositeDisposable();
mTestScheduler = new TestScheduler();
TestSchedulerProvider testSchedulerProvider = new TestSchedulerProvider(mTestScheduler);
mLoginPresenter = new LoginPresenter<>(
mMockLoginMvpInteractor,
testSchedulerProvider,
compositeDisposable);
mLoginPresenter.onAttach(mMockLoginMvpView);
}
@Before
public void setUp() throws Exception {
mTestScheduler = new TestScheduler();
TestSchedulerProvider testSchedulerProvider = new TestSchedulerProvider(mTestScheduler);
mLoginViewModel = new LoginViewModel(mMockDataManager, testSchedulerProvider);
mLoginViewModel.setNavigator(mLoginCallback);
}
@Test
void pollsResponse() {
ResponseQueue responseQueue = new ResponseQueue();
TestScheduler scheduler = new TestScheduler();
Observable<Response> observable = responseStream(scheduler);
MaybeSubject<Response> maybeSubject = MaybeSubject.create();
observable.firstElement().subscribe(maybeSubject);
responseQueue.offer(ResponseType.PING, maybeSubject);
assertThat(responseQueue.peekResponse(ResponseType.PING)).isFalse();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
assertThat(responseQueue.peekResponse(ResponseType.PING)).isTrue();
}
public static void setupRxjava() {
TestScheduler testScheduler = new TestScheduler();
RxJavaPlugins.setComputationSchedulerHandler(new Function<Scheduler, Scheduler>() {
@Override
public Scheduler apply(Scheduler scheduler) throws Exception {
return testScheduler;
}
});
}
@Test
public void work2() {
((TestScheduler) ThreadUtil.computationScheduler()).advanceTimeBy(5, TimeUnit.SECONDS);
try {
List<TrafficInfo> trafficInfos = GodEye.instance().<Traffic, TrafficInfo>moduleObservable(GodEye.ModuleName.TRAFFIC).test().values();
Assert.assertTrue(trafficInfos != null && !trafficInfos.isEmpty());
} catch (UninstallException e) {
Assert.fail();
}
}