下面列出了 io.reactivex.rxjava3.schedulers.TestScheduler 类的 API 用法示例代码，您也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that a time-bounded relay reports exactly one buffered value right
 * after {@code accept} and none once the test clock has moved two seconds
 * forward, repeated for 1000 consecutive values.
 */
@Test
public void testSizeAndHasAnyValueTimeBounded() {
    TestScheduler clock = new TestScheduler();
    ReplayRelay<Object> relay = ReplayRelay.createWithTime(1, TimeUnit.SECONDS, clock);

    // Nothing has been accepted yet.
    assertEquals(0, relay.size());
    assertFalse(relay.hasValue());

    int value = 0;
    while (value < 1000) {
        relay.accept(value);
        assertEquals(1, relay.size());
        assertTrue(relay.hasValue());

        // Two seconds is past the one-second window configured above.
        clock.advanceTimeBy(2, TimeUnit.SECONDS);
        assertEquals(0, relay.size());
        assertFalse(relay.hasValue());

        value++;
    }
}
/**
 * Feeds the values 1, 2 and 3 into a relay bounded to one second and two
 * items, then expects a subscriber limited to a single item to finish with
 * {@code 2}.
 */
@Test
public void takeSizeAndTime() {
    TestScheduler clock = new TestScheduler();
    ReplayRelay<Integer> relay =
            ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, clock, 2);

    for (int value = 1; value <= 3; value++) {
        relay.accept(value);
    }

    relay.take(1).test().assertResult(2);
}
/**
 * Subscribes an observer that pushes {@code 2} back into the relay while it is
 * still handling {@code 1}, and expects both values to arrive in order.
 */
@Test
public void reentrantDrain() {
    TestScheduler clock = new TestScheduler();
    final ReplayRelay<Integer> relay =
            ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, clock, 2);

    TestObserver<Integer> observer = new TestObserver<Integer>() {
        @Override
        public void onNext(Integer value) {
            // Re-enter the relay before the current item is recorded.
            if (value == 1) {
                relay.accept(2);
            }
            super.onNext(value);
        }
    };

    relay.subscribe(observer);
    relay.accept(1);

    observer.assertValues(1, 2);
}
/**
 * Exercises a relay bounded to one second and two items: three values are
 * accepted, the clock moves two seconds forward, two more values are accepted,
 * and a new subscriber is expected to see only {@code 4} and {@code 5}.
 */
@Test
public void timeAndSizeRemoveCorrectNumberOfOld() {
    TestScheduler clock = new TestScheduler();
    ReplayRelay<Integer> relay =
            ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, clock, 2);

    relay.accept(1);
    relay.accept(2);
    // 1 is dropped because of maxSize, leaving two items.
    relay.accept(3);

    clock.advanceTimeBy(2, TimeUnit.SECONDS);

    // 2 is dropped because of maxSize and 3 because of age, leaving one item.
    relay.accept(4);
    // Back to two items.
    relay.accept(5);

    relay.test().assertValuesOnly(4, 5);
}
/**
 * Verifies size and value presence on a relay with a one-second time bound:
 * empty at first, one value after each {@code accept}, and empty again after
 * the test scheduler advances by two seconds. Runs 1000 rounds.
 */
@Test
public void testSizeAndHasAnyValueTimeBounded() {
    TestScheduler scheduler = new TestScheduler();
    ReplayRelay<Object> timedRelay =
            ReplayRelay.createWithTime(1, TimeUnit.SECONDS, scheduler);

    // Initial state: no values accepted.
    assertEquals(0, timedRelay.size());
    assertFalse(timedRelay.hasValue());

    int round = 0;
    while (round < 1000) {
        timedRelay.accept(round);
        assertEquals(1, timedRelay.size());
        assertTrue(timedRelay.hasValue());

        // Move the clock beyond the one-second bound.
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        assertEquals(0, timedRelay.size());
        assertFalse(timedRelay.hasValue());

        round++;
    }
}
/**
 * Accepts 1, 2 and 3 on a relay created with a one-second / two-item bound
 * and asserts that taking a single item yields {@code 2} and completes.
 */
@Test
public void takeSizeAndTime() {
    TestScheduler testClock = new TestScheduler();
    ReplayRelay<Integer> boundedRelay =
            ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testClock, 2);

    for (int item = 1; item <= 3; item++) {
        boundedRelay.accept(item);
    }

    boundedRelay.take(1).test().assertResult(2);
}
/**
 * Uses an observer whose {@code onNext} feeds {@code 2} into the relay when it
 * receives {@code 1}; the observer must end up with the values 1 and 2.
 */
@Test
public void reentrantDrain() {
    TestScheduler testClock = new TestScheduler();
    final ReplayRelay<Integer> boundedRelay =
            ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testClock, 2);

    TestObserver<Integer> reentrantObserver = new TestObserver<Integer>() {
        @Override
        public void onNext(Integer item) {
            // Emit a second item from inside the delivery of the first.
            if (item == 1) {
                boundedRelay.accept(2);
            }
            super.onNext(item);
        }
    };

    boundedRelay.subscribe(reentrantObserver);
    boundedRelay.accept(1);

    reentrantObserver.assertValues(1, 2);
}
/**
 * Combines size-based and age-based trimming on a relay bounded to one second
 * and two items, and asserts that a late subscriber receives only
 * {@code 4} and {@code 5} with no terminal event.
 */
@Test
public void timeAndSizeRemoveCorrectNumberOfOld() {
    TestScheduler testClock = new TestScheduler();
    ReplayRelay<Integer> boundedRelay =
            ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testClock, 2);

    boundedRelay.accept(1);
    boundedRelay.accept(2);
    // maxSize evicts 1; two items remain.
    boundedRelay.accept(3);

    testClock.advanceTimeBy(2, TimeUnit.SECONDS);

    // maxSize evicts 2 and age evicts 3; one item remains.
    boundedRelay.accept(4);
    // Two items again.
    boundedRelay.accept(5);

    boundedRelay.test().assertValuesOnly(4, 5);
}
/**
 * Verifies that the transformer built by {@code Transformers.fromAction} does
 * not run the action when an effect is emitted, but only once the test
 * scheduler's pending actions are triggered.
 */
@Test
public void effectPerformerRunsActionOnSchedulerWheneverEffectIsRequested() throws Exception {
    PublishSubject<String> effects = PublishSubject.create();
    TestAction recordedAction = new TestAction();
    TestScheduler testClock = new TestScheduler();

    effects.compose(Transformers.fromAction(recordedAction, testClock)).subscribe();
    effects.onNext("First Time");

    // The scheduler has not been triggered yet, so no run is recorded.
    assertThat(recordedAction.getRunCount(), is(0));

    testClock.triggerActions();

    assertThat(recordedAction.getRunCount(), is(1));
}
/**
 * Verifies that the transformer built by {@code Transformers.fromConsumer}
 * hands the emitted effect to the consumer only after the test scheduler's
 * pending actions are triggered.
 */
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
        throws Exception {
    PublishSubject<String> effects = PublishSubject.create();
    TestConsumer<String> recordingConsumer = new TestConsumer<>();
    TestScheduler testClock = new TestScheduler();

    effects.compose(Transformers.fromConsumer(recordingConsumer, testClock)).subscribe();
    effects.onNext("First Time");

    // Before the scheduler runs, the consumer has not seen any value.
    assertThat(recordingConsumer.getCurrentValue(), is(equalTo(null)));

    testClock.triggerActions();

    assertThat(recordingConsumer.getCurrentValue(), is("First Time"));
}
/**
 * Verifies that the transformer built by {@code Transformers.fromFunction}
 * applies the function to the emitted effect and emits its result: the string
 * {@code "Hello"} mapped through a length function yields {@code 5}.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndEmitsReturnedEvents() {
    PublishSubject<String> effects = PublishSubject.create();
    TestScheduler testClock = new TestScheduler();
    Function<String, Integer> lengthOf = text -> text.length();

    TestObserver<Integer> events =
            effects.compose(Transformers.fromFunction(lengthOf, testClock)).test();

    effects.onNext("Hello");
    testClock.triggerActions();

    events.assertValue(5);
}
/**
 * Verifies that when the function given to {@code Transformers.fromFunction}
 * throws a {@link RuntimeException}, the downstream observer receives an error
 * of that type after the test scheduler's actions are triggered.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndErrorsForUnhandledExceptions() {
    PublishSubject<String> effects = PublishSubject.create();
    TestScheduler testClock = new TestScheduler();

    // A function that fails for every input.
    Function<String, Integer> alwaysFailing =
            effect -> {
                throw new RuntimeException("Something bad happened");
            };

    TestObserver<Integer> events =
            effects.compose(Transformers.fromFunction(alwaysFailing, testClock)).test();

    effects.onNext("Hello");
    testClock.triggerActions();

    events.assertError(RuntimeException.class);
}
/**
 * Accepts one value on a relay bounded to one second and two items, advances
 * the test clock by one second, and expects a new subscriber to receive
 * nothing.
 */
@Test
public void timedSkipOld() {
    TestScheduler clock = new TestScheduler();
    ReplayRelay<Integer> relay =
            ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, clock, 2);

    relay.accept(1);
    clock.advanceTimeBy(1, TimeUnit.SECONDS);

    relay.test().assertEmpty();
}
/**
 * Starts a prefetch for the episode-hero-name query, observes it on a
 * {@link TestScheduler} that is never advanced, disposes the subscription, and
 * asserts that the observer is disposed without having completed.
 */
@Test
public void prefetchIsCanceledWhenDisposed() throws Exception {
    server.enqueue(Utils.INSTANCE.mockResponse(FILE_EPISODE_HERO_NAME_WITH_ID));

    TestObserver<EpisodeHeroNameQuery.Data> observer = new TestObserver<>();
    EpisodeHeroNameQuery query = new EpisodeHeroNameQuery(Input.fromNullable(EMPIRE));
    // This scheduler is never triggered in the test.
    TestScheduler idleScheduler = new TestScheduler();

    Disposable subscription = Rx3Apollo
            .from(apolloClient.prefetch(query))
            .observeOn(idleScheduler)
            .subscribeWith(observer);

    subscription.dispose();

    observer.assertNotComplete();
    assertThat(observer.isDisposed()).isTrue();
}
/**
 * Checks that a value accepted one second before subscription is not replayed
 * by a relay created with a one-second / two-item bound.
 */
@Test
public void timedSkipOld() {
    TestScheduler testClock = new TestScheduler();
    ReplayRelay<Integer> boundedRelay =
            ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, testClock, 2);

    boundedRelay.accept(1);
    testClock.advanceTimeBy(1, TimeUnit.SECONDS);

    boundedRelay.test().assertEmpty();
}
/**
 * Returns the {@link TestScheduler} held in the {@code testScheduler} field.
 *
 * @return the current value of {@code testScheduler}
 */
public TestScheduler getTestScheduler() {
    return this.testScheduler;
}
/**
 * Test fixture initialisation: assigns a new {@code PublishSubject} to
 * {@code lifecycle} and a new {@link TestScheduler} to {@code testScheduler}.
 */
@Before
public void setup() {
    this.lifecycle = PublishSubject.create();
    this.testScheduler = new TestScheduler();
}
/**
 * Test fixture initialisation: assigns a new {@code PublishSubject} to
 * {@code lifecycle} and a new {@link TestScheduler} to {@code testScheduler}.
 */
@Before
public void setup() {
    this.lifecycle = PublishSubject.create();
    this.testScheduler = new TestScheduler();
}
/**
 * Test fixture initialisation: assigns a new {@code PublishSubject} to
 * {@code lifecycle} and a new {@link TestScheduler} to {@code testScheduler}.
 */
@Before
public void setup() {
    this.lifecycle = PublishSubject.create();
    this.testScheduler = new TestScheduler();
}
/**
 * Subscribes a mock observer to a one-second time-bounded relay after the
 * first value has aged by one second, then verifies that {@code 1} is never
 * delivered, that {@code 2} and {@code 3} are delivered, and that no error is
 * signalled.
 */
@Test
public void testReplayTimestampedDirectly() {
    TestScheduler clock = new TestScheduler();
    ReplayRelay<Integer> relay = ReplayRelay.createWithTime(1, TimeUnit.SECONDS, clock);

    relay.accept(1);
    clock.advanceTimeBy(1, TimeUnit.SECONDS);

    Observer<Integer> mockObserver = TestHelper.mockObserver();
    relay.subscribe(mockObserver);

    relay.accept(2);
    clock.advanceTimeBy(1, TimeUnit.SECONDS);
    relay.accept(3);
    clock.advanceTimeBy(1, TimeUnit.SECONDS);
    // The completion call (source.onComplete()) is disabled in this test.
    clock.advanceTimeBy(1, TimeUnit.SECONDS);

    verify(mockObserver, never()).onError(any(Throwable.class));
    verify(mockObserver, never()).onNext(1);
    verify(mockObserver).onNext(2);
    verify(mockObserver).onNext(3);
    // Completion is not verified because the completion call above is disabled.
}
/**
 * Drives a relay with a one-second time bound using a test scheduler: value
 * {@code 1} is accepted one second before a mock observer subscribes, values
 * {@code 2} and {@code 3} afterwards. The observer must see 2 and 3, never 1,
 * and never an error.
 */
@Test
public void testReplayTimestampedDirectly() {
    TestScheduler testClock = new TestScheduler();
    ReplayRelay<Integer> timedRelay =
            ReplayRelay.createWithTime(1, TimeUnit.SECONDS, testClock);

    timedRelay.accept(1);
    testClock.advanceTimeBy(1, TimeUnit.SECONDS);

    Observer<Integer> downstream = TestHelper.mockObserver();
    timedRelay.subscribe(downstream);

    timedRelay.accept(2);
    testClock.advanceTimeBy(1, TimeUnit.SECONDS);
    timedRelay.accept(3);
    testClock.advanceTimeBy(1, TimeUnit.SECONDS);
    // The completion call (source.onComplete()) is disabled in this test.
    testClock.advanceTimeBy(1, TimeUnit.SECONDS);

    verify(downstream, never()).onError(any(Throwable.class));
    verify(downstream, never()).onNext(1);
    verify(downstream).onNext(2);
    verify(downstream).onNext(3);
    // Completion is not verified because the completion call above is disabled.
}