下面列出了 io.reactivex.subscribers.TestSubscriber#assertSubscribed() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Cancels a subscription after one item has been requested but before it is delivered.
 *
 * <p>The item is emitted on a single-thread executor and passes through a mapper that
 * sleeps for 100 ms. The subscriber starts with zero demand, requests one item and
 * cancels right away. It must then report itself as cancelled, must not be terminated,
 * and must never have received the value {@code 1}.
 */
@Test
public void testCancellationBetweenRequestAndValue() {
    // TODO This is a very broken implementation of "delay" - to be replaced once delay is implemented
    executor = Executors.newSingleThreadExecutor();
    Uni<Integer> delayedUni = Uni.createFrom().item(1).emitOn(executor).map(item -> {
        try {
            Thread.sleep(100);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag so the executor thread can observe it.
            Thread.currentThread().interrupt();
        }
        return item;
    });
    Publisher<Integer> publisher = delayedUni.convert().toPublisher();
    assertThat(publisher).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(0);
    subscriber.assertSubscribed();
    subscriber.request(1);
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated().assertNever(1);
}
/**
 * Subscribes an explicitly created {@link TestSubscriber} to a three-element
 * {@code Flowable.just(...)} and checks the outcome.
 *
 * <p>The subscriber must have completed, been subscribed, seen no errors, and received
 * exactly the three values in emission order.
 */
@Test
public void test_just_Flowable() {
    Flowable<String> flowable = Flowable.just("mango", "papaya", "guava");
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    flowable.subscribe(testSubscriber);

    // The received values are checked through the subscriber's own assertions,
    // so no separate copy of values() is kept.
    testSubscriber.assertComplete();
    testSubscriber.assertSubscribed();
    testSubscriber.assertNoErrors();
    testSubscriber.assertValueCount(3);
    testSubscriber.assertValues("mango", "papaya", "guava");
}
/**
 * Requests a single item from a publisher backed by a {@code Uni} holding {@code 1}.
 *
 * <p>The subscriber starts with zero demand. After {@code request(1)} it must have
 * received exactly {@code 1} and completed.
 */
@Test
public void testWithImmediateValueWithRequest() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(0);
    subscriber.assertSubscribed();
    subscriber.request(1);
    subscriber.assertResult(1).assertComplete();
}
/**
 * Requests more items (20) than the single-item publisher can ever produce.
 *
 * <p>The subscriber starts with zero demand. After {@code request(20)} it must have
 * received exactly {@code 1} and completed.
 */
@Test
public void testWithImmediateValueWithRequests() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(0);
    subscriber.assertSubscribed();
    subscriber.request(20);
    subscriber.assertResult(1).assertComplete();
}
/**
 * Issues a request for zero items.
 *
 * <p>The subscriber is expected to be terminated with an
 * {@link IllegalArgumentException}.
 */
@Test
public void testInvalidRequest() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(0);
    subscriber.assertSubscribed();
    subscriber.request(0);
    subscriber.assertError(IllegalArgumentException.class).assertTerminated();
}
/**
 * Creates the test subscriber with an initial request of one item and the cancel flag set.
 *
 * <p>The subscriber must report itself as cancelled, must not be terminated, and must
 * never have received the value {@code 1}.
 */
@Test
public void testWithImmediateValueWithOneRequestAndImmediateCancellation() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(1, true);
    subscriber.assertSubscribed();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated().assertNever(1);
}
/**
 * Cancels the subscription before any item has been requested.
 *
 * <p>The subscriber starts with zero demand and cancels without calling
 * {@code request}. It must report itself as cancelled, must not be terminated, and
 * must never have received the value {@code 1}.
 */
@Test
public void testCancellationBetweenSubscriptionAndRequest() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(0);
    subscriber.assertSubscribed();
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated().assertNever(1);
}
/**
 * Cancels the subscription after the single item has already been delivered.
 *
 * <p>The subscriber must report itself as cancelled while still holding the value
 * {@code 1} and the completion signal.
 */
@Test
public void testCancellationAfterValue() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(publisher).test(0);
    subscriber.assertSubscribed();
    subscriber.request(1);
    // The item is emitted immediately on request, so this cancel happens after the emission.
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertValue(1).assertComplete();
}
/**
 * Subscribes to a {@code Flowable} that emits one item and then fails.
 *
 * <p>The source is built with {@code Flowable.create} using the {@code LATEST}
 * backpressure strategy. It emits {@code 1} and then signals a
 * {@link RuntimeException}. The subscriber must have been subscribed and must have
 * received an error of that type.
 */
@Test
public void testSubscriberWithException() {
    Flowable<Integer> failing = Flowable.create(emitter -> {
        emitter.onNext(1);
        emitter.onError(new RuntimeException());
    }, BackpressureStrategy.LATEST);

    TestSubscriber<Integer> subscriber = failing.test();
    subscriber.assertSubscribed().assertError(RuntimeException.class);
}