下面列出了 io.reactivex.subscribers.TestSubscriber#request() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Requests one item and cancels immediately afterwards. The mapping stage
 * sleeps for 100 ms before handing the item on, so the test expects the
 * subscriber to be cancelled without ever having received the value or a
 * terminal signal.
 */
@Test
public void testCancellationBetweenRequestAndValue() {
    // TODO This is a very broken implementation of "delay" - to be replaced once delay is implemented
    executor = Executors.newSingleThreadExecutor();
    Publisher<Integer> delayed = Uni.createFrom().item(1)
            .emitOn(executor)
            .map(value -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag so the executor thread can observe it.
                    Thread.currentThread().interrupt();
                }
                return value;
            })
            .convert().toPublisher();
    assertThat(delayed).isNotNull();

    // Start with zero demand so that nothing is emitted before the explicit request.
    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(delayed).test(0);
    subscriber.assertSubscribed();
    subscriber.request(1);
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated().assertNever(1);
}
/**
 * Subscribes with zero initial demand to a publisher backed by an already
 * known item, requests exactly one element and expects that item followed
 * by completion.
 */
@Test
public void testWithImmediateValueWithRequest() {
    Publisher<Integer> immediate = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(immediate).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(immediate).test(0);
    subscriber.assertSubscribed();
    subscriber.request(1);
    subscriber.assertResult(1).assertComplete();
}
/**
 * Same scenario as the single-request case, but the subscriber asks for 20
 * elements: the test still expects exactly one item and then completion.
 */
@Test
public void testWithImmediateValueWithRequests() {
    Publisher<Integer> immediate = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(immediate).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(immediate).test(0);
    subscriber.assertSubscribed();
    subscriber.request(20);
    subscriber.assertResult(1).assertComplete();
}
/**
 * Issues a request for zero elements and expects the stream to terminate
 * with an {@link IllegalArgumentException}.
 */
@Test
public void testInvalidRequest() {
    Publisher<Integer> immediate = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(immediate).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(immediate).test(0);
    subscriber.assertSubscribed();
    subscriber.request(0);
    subscriber.assertError(IllegalArgumentException.class).assertTerminated();
}
/**
 * Cancels after the item has already been delivered: the subscriber reports
 * itself as cancelled, yet the value and the completion signal are still
 * expected to have been received.
 */
@Test
public void testCancellationAfterValue() {
    Publisher<Integer> immediate = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(immediate).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(immediate).test(0);
    subscriber.assertSubscribed();
    subscriber.request(1);
    // The emission is immediate, so this cancel happens after the item was delivered.
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertValue(1).assertComplete();
}
/**
 * Converts a deferred Uni into a Flowable and checks laziness: the deferred
 * supplier must not have run while the subscriber has zero demand, and must
 * have run once elements are requested.
 */
@Test
public void testCreatingAFlowableWithRequest() {
    AtomicBoolean supplierInvoked = new AtomicBoolean();
    Flowable<Integer> flowable = Uni.createFrom()
            .deferred(() -> {
                supplierInvoked.set(true);
                return Uni.createFrom().item(1);
            })
            .convert().with(UniRxConverters.toFlowable());
    assertThat(flowable).isNotNull();

    // Subscribed with no demand: the supplier must still be untouched.
    TestSubscriber<Integer> subscriber = flowable.test(0);
    assertThat(supplierInvoked).isFalse();
    subscriber.assertNoValues();
    subscriber.assertSubscribed();

    subscriber.request(2);
    subscriber.assertValue(1);
    subscriber.assertComplete();
    assertThat(supplierInvoked).isTrue();
}
/**
 * Races downstream {@code request(1)} calls from the test thread against the
 * same call issued from an executor task, then checks that the upstream
 * recorded exactly one request, of {@code DEFAULT_CHUNK_SIZE}.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *         waiting for the executor task to start
 */
@RepeatedTest(2)
public void shouldSupportOnlySinglePrefetchTest() throws InterruptedException {
    for (int round = 0; round < 10; round++) {
        final TestSubscriber<Integer> downstream = new TestSubscriber<>(0);
        final TestStreamObserverAndPublisher<Integer> processor =
                new TestStreamObserverAndPublisher<Integer>(null);
        final TestCallStreamObserverProducer upstream =
                new TestCallStreamObserverProducer(executorService, processor, 100000000);

        processor.onSubscribe(upstream);
        // Prevents the producer from actually sending elements while still letting
        // the test check how many elements were requested at first.
        upstream.requested = 1;
        processor.subscribe(downstream);

        for (int attempt = 0; attempt < 1000; attempt++) {
            final CountDownLatch taskStarted = new CountDownLatch(1);
            executorService.execute(() -> {
                taskStarted.countDown();
                downstream.request(1);
            });
            // Wait until the task is running so both request(1) calls overlap as much as possible.
            taskStarted.await();
            downstream.request(1);
        }

        Assertions.assertThat(upstream.requestsQueue)
                .hasSize(1)
                .containsOnly(DEFAULT_CHUNK_SIZE);
    }
}
/**
 * Drives a single-member pool whose health check always fails, using a
 * {@code TestScheduler} to control time. Verifies when the health check runs,
 * when the member is disposed, and that a new member is only handed out
 * after the create-retry interval has elapsed.
 *
 * @throws Exception declared by the test signature
 */
@Test
public void testHealthCheckWhenFails() throws Exception {
    TestScheduler scheduler = new TestScheduler();
    AtomicInteger created = new AtomicInteger();
    AtomicInteger healthCheckRuns = new AtomicInteger();
    AtomicInteger disposals = new AtomicInteger();

    Pool<Integer> pool = NonBlockingPool
            .factory(() -> created.incrementAndGet())
            .healthCheck(value -> {
                healthCheckRuns.incrementAndGet();
                return false;
            })
            .createRetryInterval(10, TimeUnit.MINUTES)
            .idleTimeBeforeHealthCheck(1, TimeUnit.MILLISECONDS)
            .maxSize(1)
            .maxIdleTime(1, TimeUnit.HOURS)
            .disposer(value -> disposals.incrementAndGet())
            .scheduler(scheduler)
            .build();

    // Every member that arrives is printed and checked straight back in.
    TestSubscriber<Member<Integer>> subscriber =
            new FlowableSingleDeferUntilRequest<>(pool.member())
                    .repeat()
                    .doOnNext(System.out::println)
                    .doOnNext(member -> member.checkin())
                    .doOnRequest(amount -> System.out.println("test request=" + amount))
                    .test(1);
    scheduler.triggerActions();

    // The health check is not run when a member is created.
    subscriber.assertValueCount(1);
    assertEquals(0, disposals.get());
    assertEquals(0, healthCheckRuns.get());

    // The next request is immediate, so the health check does not run.
    System.out.println("health check should not run because immediate");
    subscriber.request(1);
    scheduler.triggerActions();
    subscriber.assertValueCount(2);
    assertEquals(0, disposals.get());
    assertEquals(0, healthCheckRuns.get());

    // Let the idle time before health check elapse, then request again to trigger it.
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    scheduler.triggerActions();
    System.out.println("trying to trigger health check");
    subscriber.request(1);
    scheduler.triggerActions();
    subscriber.assertValueCount(2);
    assertEquals(1, disposals.get());
    assertEquals(1, healthCheckRuns.get());

    // The failing health check causes a re-create to be scheduled; the checkout
    // retry should happen once the retry interval has passed.
    scheduler.advanceTimeBy(10, TimeUnit.MINUTES);
    subscriber.assertValueCount(3);

    // The member was already disposed, so cancelling has no further effect.
    subscriber.cancel();
    assertEquals(1, disposals.get());
}