io.reactivex.subscribers.TestSubscriber#cancel() 源码实例 Demo

下面列出了 io.reactivex.subscribers.TestSubscriber#cancel() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: smallrye-mutiny   文件: UniToPublisherTest.java
/**
 * Requests one item and cancels straight away, while the mapper is still set to sleep
 * for 100 ms before handing the item on. Verifies that the subscriber reports
 * cancellation and has seen neither the item nor a terminal signal.
 */
@Test
public void testCancellationBetweenRequestAndValue() {
    // TODO This is a very broken implementation of "delay" - to be replaced once delay is implemented
    executor = Executors.newSingleThreadExecutor();
    Uni<Integer> slowItem = Uni.createFrom().item(1).emitOn(executor).map(value -> {
        try {
            Thread.sleep(100);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt flag set for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        return value;
    });
    Publisher<Integer> publisher = slowItem.convert().toPublisher();

    assertThat(publisher).isNotNull();

    // Subscribe with zero initial demand so the request below is the first one.
    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(0);
    subscriber.assertSubscribed();

    subscriber.request(1);
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated();
    subscriber.assertNever(1);
}
 
/**
 * Streams 50 copies of a random 40-byte array through the file-backed backpressure
 * buffer with an initial request of one, cancels, and then checks that the page file
 * does not exist.
 */
@Test
public void testCancelDeletesPageFileWhenNoOutstandingRequests() {
    long repetitions = 50L;
    byte[] payload = new byte[40];
    new Random().nextBytes(payload);
    File page = new File("target/bufferToFile-1");

    TestSubscriber<byte[]> subscriber = Flowables.repeat(payload, repetitions) //
            .compose(onBackpressureBufferToFile() //
                    .fileFactory(Callables.constant(page)) //
                    .scheduler(Schedulers.trampoline()) //
                    .pageSizeBytes(20000000) //
                    .serializerBytes()) //
            .test(1);

    // The stream must still be live (no error, no completion) before we cancel.
    subscriber.assertNoErrors();
    subscriber.assertNotComplete();

    subscriber.cancel();

    assertFalse(page.exists());
}
 
源代码3 项目: smallrye-mutiny   文件: UniToPublisherTest.java
/**
 * Subscribes with zero demand and cancels before any request is made. Verifies that
 * the subscriber is cancelled, not terminated, and never received the item.
 */
@Test
public void testCancellationBetweenSubscriptionAndRequest() {
    Uni<Integer> source = Uni.createFrom().item(1);
    Publisher<Integer> publisher = source.convert().toPublisher();
    assertThat(publisher).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(0);
    subscriber.assertSubscribed();

    // No request() call at all: cancel is the first thing the subscription sees.
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated();
    subscriber.assertNever(1);
}
 
源代码4 项目: smallrye-mutiny   文件: UniToPublisherTest.java
/**
 * Requests one item and cancels afterwards. Verifies that the subscriber is flagged
 * as cancelled yet has still received the value and the completion signal.
 */
@Test
public void testCancellationAfterValue() {
    Uni<Integer> source = Uni.createFrom().item(1);
    Publisher<Integer> publisher = source.convert().toPublisher();
    assertThat(publisher).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(0);
    subscriber.assertSubscribed();

    subscriber.request(1);
    // Immediate emission, so cancel is called after the emission.
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertValue(1);
    subscriber.assertComplete();
}
 
/**
 * Cancels a zero-demand subscriber before it has requested anything and verifies that
 * no value and no terminal event were recorded. A second cancel is then issued on the
 * already-cancelled subscriber.
 */
@Test
public void testCancelBeforeRequest() {
    Single<Integer> source = Single.fromCallable(() -> 1);
    TestSubscriber<Integer> subscriber =
            new FlowableSingleDeferUntilRequest<Integer>(source).test(0);

    subscriber.cancel();
    subscriber.assertNoValues();
    subscriber.assertNotTerminated();

    // Repeat the cancel on the already-cancelled subscriber.
    subscriber.cancel();
}
 
源代码6 项目: rxjava2-extras   文件: StringsSplitTest.java
/**
 * Splits "boo:an", "d:you" on ":" with an initial request of two, checks that "boo" and
 * "and" arrived without termination, then cancels and checks that the count is still two
 * and that no terminal event was recorded.
 */
@Test
public void testSplitSimpleNormalCancelled() {
    TestSubscriber<String> subscriber = Flowable.just("boo:an", "d:you") //
            .compose(Strings.splitSimple(":")) //
            .test(2);
    subscriber.assertValues("boo", "and");
    subscriber.assertNotTerminated();

    subscriber.cancel();

    subscriber.assertValueCount(2);
    subscriber.assertNotTerminated();
}
 
源代码7 项目: rxjava2-extras   文件: StringsSplitTest.java
/**
 * Splits "boo:an", "d:you" on ":" with an initial request of one, checks that only "boo"
 * arrived without termination, then cancels and checks that the count is still one and
 * that no terminal event was recorded.
 */
@Test
public void testSplitSimpleNormalCancelledEarly() {
    TestSubscriber<String> subscriber = Flowable.just("boo:an", "d:you") //
            .compose(Strings.splitSimple(":")) //
            .test(1);
    subscriber.assertValues("boo");
    subscriber.assertNotTerminated();

    subscriber.cancel();

    subscriber.assertValueCount(1);
    subscriber.assertNotTerminated();
}
 
源代码8 项目: rxjava2-extras   文件: StringsSplitTest.java
/**
 * Subscribes to the split stream with zero demand, cancels before anything has been
 * requested, and verifies that a request made after the cancel still yields no values
 * and no terminal event.
 */
@Test
public void testSplitSimpleNormalCancelledAtBeginning() {
    TestSubscriber<String> subscriber = Flowable.just("boo:an", "d:you") //
            .compose(Strings.splitSimple(":")) //
            .test(0);
    subscriber.assertNoValues();
    subscriber.assertNotTerminated();

    subscriber.cancel();

    // Demand signalled after cancellation must not produce anything.
    subscriber.requestMore(1);
    subscriber.assertNoValues();
    subscriber.assertNotTerminated();
}
 
/**
 * Interleaves two endlessly repeating sources (all 1s and all 2s), takes the first three
 * items, cancels, and verifies that a further request of 100 adds no values and records
 * no terminal event.
 */
@Test
public void testInterleaveCancel() {
    Flowable<Integer> ones = Flowable.just(1).repeat();
    Flowable<Integer> twos = Flowable.just(2).repeat();

    TestSubscriber<Integer> subscriber =
            Flowables.mergeInterleaved(Flowable.just(ones, twos), 2, 1, true).test(3);
    subscriber.assertValues(1, 2, 1);

    subscriber.cancel();

    // Demand after cancellation must be ignored.
    subscriber.requestMore(100);
    subscriber.assertValueCount(3);
    subscriber.assertNotTerminated();
}
 
源代码10 项目: rxjava2-extras   文件: FlowableMaxRequestTest.java
/**
 * Runs range(1, 10) through maxRequest(3) with an initial demand of four, records every
 * upstream request, then cancels. Verifies that a later request of three delivers nothing
 * more and that the upstream saw exactly the requests 3 and 1.
 */
@Test
public void checkCancel() {
    List<Long> upstreamRequests = new CopyOnWriteArrayList<Long>();

    TestSubscriber<Integer> subscriber = Flowable.range(1, 10) //
            .doOnRequest(Consumers.addLongTo(upstreamRequests)) //
            .compose(Transformers.<Integer>maxRequest(3)) //
            .test(4);
    subscriber.assertValues(1, 2, 3, 4);

    subscriber.cancel();

    // This post-cancel demand must not reach upstream or deliver more items.
    subscriber.requestMore(3);
    subscriber.assertValueCount(4);
    subscriber.assertNotTerminated();

    assertEquals(Arrays.asList(3L, 1L), upstreamRequests);
}
 
源代码11 项目: rxjava2-extras   文件: FlowableCollectWhileTest.java
/**
 * Applies toListWhile(BUFFER_TWO) to six integers with an initial demand of one, checks
 * that the single emitted list is (3, 4), cancels, and verifies that an unbounded request
 * afterwards adds no lists and records no terminal event.
 */
@Test
public void testBackpressureAndCancel() {
    TestSubscriber<List<Integer>> subscriber = Flowable.just(3, 4, 5, 6, 7, 8) //
            .compose(Transformers.toListWhile(BUFFER_TWO)) //
            .test(1);
    subscriber.assertValue(list(3, 4));
    subscriber.assertNotTerminated();

    subscriber.cancel();

    // Unbounded demand after cancellation must be ignored.
    subscriber.requestMore(Long.MAX_VALUE);
    subscriber.assertValueCount(1);
    subscriber.assertNotTerminated();
}
 
源代码12 项目: rxjava2-extras   文件: TransformersTest.java
/**
 * Subscribes to just(1, 2) composed with insert(Maybe.just(3)) at zero demand, requests
 * one item and checks that only 1 arrived, then cancels and verifies that a further
 * request of 100 delivers nothing more and records no terminal event.
 */
@Test
public void testInsertCancel() {
    TestSubscriber<Integer> subscriber = Flowable.just(1, 2) //
            .compose(Transformers.insert(Maybe.just(3))) //
            .test(0);
    subscriber.assertNoValues();

    subscriber.requestMore(1);
    subscriber.assertValues(1);

    subscriber.cancel();

    // Demand after cancellation must be ignored.
    subscriber.requestMore(100);
    subscriber.assertValues(1);
    subscriber.assertNotTerminated();
}
 
源代码13 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
/**
 * Builds a pool of maximum size one whose health check always returns {@code false},
 * drives it with a {@link TestScheduler}, and asserts the emitted value count together
 * with the dispose and health-check counters after each step.
 *
 * <p>The statements are strictly order-dependent: every assertion reflects the scheduler
 * actions and requests issued immediately before it.
 *
 * @throws Exception declared by the signature; nothing in the body throws it explicitly
 */
@Test
public void testHealthCheckWhenFails() throws Exception {
    // Virtual-time scheduler: work runs only on triggerActions()/advanceTimeBy().
    TestScheduler s = new TestScheduler();
    // count feeds the factory; disposed and healthChecks record disposer and
    // health-check invocations respectively.
    AtomicInteger count = new AtomicInteger();
    AtomicInteger disposed = new AtomicInteger();
    AtomicInteger healthChecks = new AtomicInteger();
    Pool<Integer> pool = NonBlockingPool //
            .factory(() -> count.incrementAndGet()) //
            .healthCheck(n -> {
                // Count the call, then always report the member as unhealthy.
                healthChecks.incrementAndGet();
                return false;
            }) //
            .createRetryInterval(10, TimeUnit.MINUTES) //
            .idleTimeBeforeHealthCheck(1, TimeUnit.MILLISECONDS) //
            .maxSize(1) //
            .maxIdleTime(1, TimeUnit.HOURS) //
            .disposer(n -> disposed.incrementAndGet()) //
            .scheduler(s) //
            .build();
    {
        // Repeatedly check a member out and straight back in, one checkout per
        // downstream request; the subscriber starts with a demand of one.
        TestSubscriber<Member<Integer>> ts = new FlowableSingleDeferUntilRequest<>(pool.member()) //
                .repeat() //
                .doOnNext(System.out::println) //
                .doOnNext(m -> m.checkin()) //
                .doOnRequest(t -> System.out.println("test request=" + t)) //
                .test(1);
        s.triggerActions();
        // health check doesn't get run on create
        ts.assertValueCount(1);
        assertEquals(0, disposed.get());
        assertEquals(0, healthChecks.get());
        // next request is immediate so health check does not run
        System.out.println("health check should not run because immediate");
        ts.request(1);
        s.triggerActions();
        ts.assertValueCount(2);
        assertEquals(0, disposed.get());
        assertEquals(0, healthChecks.get());

        // now try to trigger health check
        // (advance by 1 ms, the configured idleTimeBeforeHealthCheck)
        s.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        s.triggerActions();
        System.out.println("trying to trigger health check");
        ts.request(1);
        s.triggerActions();
        // No new value: the count stays at two while one health check and one
        // disposal have now been recorded.
        ts.assertValueCount(2);
        assertEquals(1, disposed.get());
        assertEquals(1, healthChecks.get());

        // checkout retry should happen after interval
        // (advance by 10 minutes, the configured createRetryInterval)
        s.advanceTimeBy(10, TimeUnit.MINUTES);
        ts.assertValueCount(3);

        // failing health check causes recreate to be scheduled
        ts.cancel();
        // already disposed so cancel has no effect
        assertEquals(1, disposed.get());
    }
}