下面列出了 io.reactivex.subscribers.TestSubscriber#cancel() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Requests one item and cancels immediately afterwards. The upstream passes
 * through an {@code emitOn(executor)} stage followed by a {@code map} that
 * sleeps for 100 ms, and the test checks that the subscriber reports itself as
 * cancelled, has not terminated and has not received the value 1.
 */
@Test
public void testCancellationBetweenRequestAndValue() {
    // TODO This is a very broken implementation of "delay" - to be replaced once delay is implemented
    executor = Executors.newSingleThreadExecutor();
    Publisher<Integer> delayed = Uni.createFrom().item(1)
            .emitOn(executor)
            .map(item -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException interrupted) {
                    // keep the interrupt flag set for whoever owns this thread
                    Thread.currentThread().interrupt();
                }
                return item;
            })
            .convert().toPublisher();
    assertThat(delayed).isNotNull();

    // start with zero demand so the request is issued explicitly below
    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(delayed).test(0);
    subscriber.assertSubscribed();
    subscriber.request(1);
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated();
    subscriber.assertNever(1);
}
/**
 * Sends a repeated random byte array through the buffer-to-file operator with
 * an initial demand of one, cancels, and checks that the configured page file
 * does not exist afterwards.
 */
@Test
public void testCancelDeletesPageFileWhenNoOutstandingRequests() {
    final long repetitions = 50L;
    final byte[] payload = new byte[40];
    new Random().nextBytes(payload);
    final File bufferFile = new File("target/bufferToFile-1");

    TestSubscriber<byte[]> subscriber = Flowables.repeat(payload, repetitions)
            .compose(onBackpressureBufferToFile()
                    .fileFactory(Callables.constant(bufferFile))
                    .scheduler(Schedulers.trampoline())
                    .pageSizeBytes(20000000)
                    .serializerBytes())
            .test(1)
            .assertNoErrors()
            .assertNotComplete();

    subscriber.cancel();
    assertFalse(bufferFile.exists());
}
/**
 * Subscribes with zero demand and cancels before any request is made, then
 * checks that the subscriber is cancelled, has not terminated and has not
 * received the value 1.
 */
@Test
public void testCancellationBetweenSubscriptionAndRequest() {
    Publisher<Integer> source = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(source).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(source).test(0);
    subscriber.assertSubscribed();
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated();
    subscriber.assertNever(1);
}
/**
 * Requests one item and cancels afterwards, then checks that the subscriber
 * is cancelled but still observed the value 1 and completion.
 */
@Test
public void testCancellationAfterValue() {
    Publisher<Integer> source = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(source).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(source).test(0);
    subscriber.assertSubscribed();
    subscriber.request(1);
    // Emission is immediate, so this cancel runs after the item was delivered.
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertValue(1);
    subscriber.assertComplete();
}
/**
 * Cancels a zero-demand subscriber of {@code FlowableSingleDeferUntilRequest}
 * before requesting anything and checks that no value and no terminal event
 * were received; cancel is then called a second time.
 */
@Test
public void testCancelBeforeRequest() {
    Single<Integer> source = Single.fromCallable(() -> 1);
    TestSubscriber<Integer> subscriber =
            new FlowableSingleDeferUntilRequest<Integer>(source).test(0);

    subscriber.cancel();
    subscriber.assertNoValues()
            .assertNotTerminated();

    // second cancel after the assertions; the test passes as long as it does not throw
    subscriber.cancel();
}
/**
 * Splits "boo:an", "d:you" on ":" with a demand of two, expects "boo" and
 * "and", then cancels and checks that the value count stays at two with no
 * terminal event.
 */
@Test
public void testSplitSimpleNormalCancelled() {
    Flowable<String> pieces = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"));

    TestSubscriber<String> subscriber = pieces.test(2)
            .assertValues("boo", "and")
            .assertNotTerminated();

    subscriber.cancel();
    subscriber.assertValueCount(2)
            .assertNotTerminated();
}
/**
 * Splits "boo:an", "d:you" on ":" with a demand of one, expects only "boo",
 * then cancels and checks that the value count stays at one with no terminal
 * event.
 */
@Test
public void testSplitSimpleNormalCancelledEarly() {
    Flowable<String> pieces = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"));

    TestSubscriber<String> subscriber = pieces.test(1)
            .assertValues("boo")
            .assertNotTerminated();

    subscriber.cancel();
    subscriber.assertValueCount(1)
            .assertNotTerminated();
}
/**
 * Subscribes to the split stream with zero demand, cancels before requesting,
 * and checks that a later request of one still yields no values and no
 * terminal event.
 */
@Test
public void testSplitSimpleNormalCancelledAtBeginning() {
    Flowable<String> pieces = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"));

    TestSubscriber<String> subscriber = pieces.test(0)
            .assertNoValues()
            .assertNotTerminated();

    subscriber.cancel();
    // the request is issued after cancellation on purpose
    subscriber.requestMore(1)
            .assertNoValues()
            .assertNotTerminated();
}
/**
 * Interleaves two endlessly repeating sources (1s and 2s) with a demand of
 * three, expects 1, 2, 1, then cancels and checks that requesting 100 more
 * leaves the value count at three with no terminal event.
 */
@Test
public void testInterleaveCancel() {
    Flowable<Integer> ones = Flowable.just(1).repeat();
    Flowable<Integer> twos = Flowable.just(2).repeat();

    TestSubscriber<Integer> subscriber = Flowables
            .mergeInterleaved(Flowable.just(ones, twos), 2, 1, true)
            .test(3)
            .assertValues(1, 2, 1);

    subscriber.cancel();
    subscriber.requestMore(100)
            .assertValueCount(3)
            .assertNotTerminated();
}
/**
 * Applies {@code maxRequest(3)} to a range of 1..10 with a demand of four,
 * recording the upstream request amounts. After cancelling, a further request
 * of three must not add values or terminate the subscriber, and the recorded
 * upstream requests must be exactly 3 then 1.
 */
@Test
public void checkCancel() {
    List<Long> upstreamRequests = new CopyOnWriteArrayList<>();

    TestSubscriber<Integer> subscriber = Flowable.range(1, 10)
            .doOnRequest(Consumers.addLongTo(upstreamRequests))
            .compose(Transformers.<Integer>maxRequest(3))
            .test(4)
            .assertValues(1, 2, 3, 4);

    subscriber.cancel();
    subscriber.requestMore(3)
            .assertValueCount(4)
            .assertNotTerminated();

    assertEquals(Arrays.asList(3L, 1L), upstreamRequests);
}
/**
 * Runs {@code toListWhile(BUFFER_TWO)} over 3..8 with a demand of one,
 * expects the single list [3, 4], then cancels and checks that an unbounded
 * request yields no further lists and no terminal event.
 */
@Test
public void testBackpressureAndCancel() {
    TestSubscriber<List<Integer>> subscriber = Flowable.just(3, 4, 5, 6, 7, 8)
            .compose(Transformers.toListWhile(BUFFER_TWO))
            .test(1)
            .assertValue(list(3, 4))
            .assertNotTerminated();

    subscriber.cancel();
    subscriber.requestMore(Long.MAX_VALUE)
            .assertValueCount(1)
            .assertNotTerminated();
}
/**
 * Applies {@code insert(Maybe.just(3))} to the source 1, 2 starting with zero
 * demand, requests one item and expects 1, then cancels and checks that
 * requesting 100 more leaves the values at just 1 with no terminal event.
 */
@Test
public void testInsertCancel() {
    TestSubscriber<Integer> subscriber = Flowable.just(1, 2)
            .compose(Transformers.insert(Maybe.just(3)))
            .test(0)
            .assertNoValues()
            .requestMore(1)
            .assertValues(1);

    subscriber.cancel();
    subscriber.requestMore(100)
            .assertValues(1)
            .assertNotTerminated();
}
/**
 * Drives a pool of maximum size one whose health check always returns
 * {@code false}, using a {@code TestScheduler} to control time. Members are
 * checked out repeatedly and checked straight back in; the test tracks how
 * many health checks and disposals have happened after each step.
 *
 * @throws Exception declared by the test signature; nothing in the body
 *                   throws a checked exception directly
 */
@Test
public void testHealthCheckWhenFails() throws Exception {
    TestScheduler scheduler = new TestScheduler();
    AtomicInteger created = new AtomicInteger();
    AtomicInteger disposals = new AtomicInteger();
    AtomicInteger checks = new AtomicInteger();

    Pool<Integer> pool = NonBlockingPool
            .factory(() -> created.incrementAndGet())
            .healthCheck(value -> {
                // count the call, then always report the member as unhealthy
                checks.incrementAndGet();
                return false;
            })
            .createRetryInterval(10, TimeUnit.MINUTES)
            .idleTimeBeforeHealthCheck(1, TimeUnit.MILLISECONDS)
            .maxSize(1)
            .maxIdleTime(1, TimeUnit.HOURS)
            .disposer(value -> disposals.incrementAndGet())
            .scheduler(scheduler)
            .build();

    TestSubscriber<Member<Integer>> subscriber =
            new FlowableSingleDeferUntilRequest<>(pool.member())
                    .repeat()
                    .doOnNext(System.out::println)
                    .doOnNext(member -> member.checkin())
                    .doOnRequest(requested -> System.out.println("test request=" + requested))
                    .test(1);
    scheduler.triggerActions();

    // first checkout: one member delivered, no health check and no disposal yet
    subscriber.assertValueCount(1);
    assertEquals(0, disposals.get());
    assertEquals(0, checks.get());

    // second request follows immediately (no virtual time has passed),
    // so the health check is still expected not to have run
    System.out.println("health check should not run because immediate");
    subscriber.request(1);
    scheduler.triggerActions();
    subscriber.assertValueCount(2);
    assertEquals(0, disposals.get());
    assertEquals(0, checks.get());

    // let the 1 ms idle threshold elapse, then request again
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    scheduler.triggerActions();
    System.out.println("trying to trigger health check");
    subscriber.request(1);
    scheduler.triggerActions();
    // no new member is delivered; one health check and one disposal are recorded
    subscriber.assertValueCount(2);
    assertEquals(1, disposals.get());
    assertEquals(1, checks.get());

    // after the 10 minute create-retry interval a third member is delivered
    scheduler.advanceTimeBy(10, TimeUnit.MINUTES);
    subscriber.assertValueCount(3);

    subscriber.cancel();
    // cancelling is expected to leave the disposal count unchanged at 1
    assertEquals(1, disposals.get());
}