io.reactivex.subscribers.TestSubscriber#request ( )源码实例Demo

下面列出了io.reactivex.subscribers.TestSubscriber#request ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: smallrye-mutiny   文件: UniToPublisherTest.java
@Test
public void testCancellationBetweenRequestAndValue() {
    // TODO This is a very broken implementation of "delay" - to be replace once delay is implemented
    executor = Executors.newSingleThreadExecutor();
    Publisher<Integer> publisher = Uni.createFrom().item(1).emitOn(executor).map(x -> {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x;
    }).convert().toPublisher();

    assertThat(publisher).isNotNull();
    TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(0);
    test.assertSubscribed();
    test.request(1);
    test.cancel();
    assertThat(test.isCancelled()).isTrue();
    test.assertNotTerminated();
    test.assertNever(1);
}
 
源代码2 项目: smallrye-mutiny   文件: UniToPublisherTest.java
@Test
public void testWithImmediateValueWithRequest() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();
    TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(0);
    test.assertSubscribed();
    test.request(1);
    test.assertResult(1);
    test.assertComplete();
}
 
源代码3 项目: smallrye-mutiny   文件: UniToPublisherTest.java
@Test
public void testWithImmediateValueWithRequests() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();
    TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(0);
    test.assertSubscribed();
    test.request(20);
    test.assertResult(1);
    test.assertComplete();
}
 
源代码4 项目: smallrye-mutiny   文件: UniToPublisherTest.java
@Test
public void testInvalidRequest() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();
    TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(0);
    test.assertSubscribed();
    test.request(0);
    test.assertError(IllegalArgumentException.class);
    test.assertTerminated();
}
 
源代码5 项目: smallrye-mutiny   文件: UniToPublisherTest.java
@Test
public void testCancellationAfterValue() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();
    TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(0);
    test.assertSubscribed();
    test.request(1);
    // Immediate emission, so cancel is called after the emission.
    test.cancel();
    assertThat(test.isCancelled()).isTrue();
    test.assertValue(1);
    test.assertComplete();
}
 
源代码6 项目: smallrye-mutiny   文件: UniConvertToTest.java
@Test
public void testCreatingAFlowableWithRequest() {
    AtomicBoolean called = new AtomicBoolean();
    Flowable<Integer> flowable = Uni.createFrom().deferred(() -> {
        called.set(true);
        return Uni.createFrom().item(1);
    }).convert().with(UniRxConverters.toFlowable());
    assertThat(flowable).isNotNull();
    TestSubscriber<Integer> test = flowable.test(0);
    assertThat(called).isFalse();
    test.assertNoValues().assertSubscribed();
    test.request(2);
    test.assertValue(1).assertComplete();
    assertThat(called).isTrue();
}
 
@RepeatedTest(2)
public void shouldSupportOnlySinglePrefetchTest() throws InterruptedException {
    for (int i = 0; i < 10; i++) {
        final TestSubscriber<Integer> downstream = new TestSubscriber<Integer>(0);
        final TestStreamObserverAndPublisher<Integer> processor = new TestStreamObserverAndPublisher<Integer>(null);
        final TestCallStreamObserverProducer upstream = new TestCallStreamObserverProducer(executorService, processor, 100000000);
        processor.onSubscribe(upstream);
        upstream.requested = 1; // prevents running elements sending but allows
        // checking how much elements requested at first
        processor.subscribe(downstream);

        for (int j = 0; j < 1000; j++) {
            final CountDownLatch latch = new CountDownLatch(1);
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    latch.countDown();
                    downstream.request(1);
                }
            });
            latch.await();
            downstream.request(1);
        }

        Assertions.assertThat(upstream.requestsQueue)
                  .hasSize(1)
                  .containsOnly(DEFAULT_CHUNK_SIZE);
    }
}
 
源代码8 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
@Test
public void testHealthCheckWhenFails() throws Exception {
    TestScheduler s = new TestScheduler();
    AtomicInteger count = new AtomicInteger();
    AtomicInteger disposed = new AtomicInteger();
    AtomicInteger healthChecks = new AtomicInteger();
    Pool<Integer> pool = NonBlockingPool //
            .factory(() -> count.incrementAndGet()) //
            .healthCheck(n -> {
                healthChecks.incrementAndGet();
                return false;
            }) //
            .createRetryInterval(10, TimeUnit.MINUTES) //
            .idleTimeBeforeHealthCheck(1, TimeUnit.MILLISECONDS) //
            .maxSize(1) //
            .maxIdleTime(1, TimeUnit.HOURS) //
            .disposer(n -> disposed.incrementAndGet()) //
            .scheduler(s) //
            .build();
    {
        TestSubscriber<Member<Integer>> ts = new FlowableSingleDeferUntilRequest<>(pool.member()) //
                .repeat() //
                .doOnNext(System.out::println) //
                .doOnNext(m -> m.checkin()) //
                .doOnRequest(t -> System.out.println("test request=" + t)) //
                .test(1);
        s.triggerActions();
        // health check doesn't get run on create
        ts.assertValueCount(1);
        assertEquals(0, disposed.get());
        assertEquals(0, healthChecks.get());
        // next request is immediate so health check does not run
        System.out.println("health check should not run because immediate");
        ts.request(1);
        s.triggerActions();
        ts.assertValueCount(2);
        assertEquals(0, disposed.get());
        assertEquals(0, healthChecks.get());

        // now try to trigger health check
        s.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        s.triggerActions();
        System.out.println("trying to trigger health check");
        ts.request(1);
        s.triggerActions();
        ts.assertValueCount(2);
        assertEquals(1, disposed.get());
        assertEquals(1, healthChecks.get());

        // checkout retry should happen after interval
        s.advanceTimeBy(10, TimeUnit.MINUTES);
        ts.assertValueCount(3);

        // failing health check causes recreate to be scheduled
        ts.cancel();
        // already disposed so cancel has no effect
        assertEquals(1, disposed.get());
    }
}