下面列出了 io.reactivex.subscribers.TestSubscriber#assertResult() 的示例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Fetches {@code /rx2/flowable/textJson} from the local server as a
 * {@code Flowable<HelloWorldBean>} and verifies that exactly one bean,
 * {@code HelloWorldBean("Hello", "World")}, is delivered followed by completion.
 *
 * <p>The client is configured with a Jackson JSON provider and the Flowable
 * Rx invoker provider.
 *
 * @throws Exception declared for compatibility with the original signature
 */
@Test
public void testGetHelloWorldJson() throws Exception {
    String address = "http://localhost:" + PORT + "/rx2/flowable/textJson";
    List<Object> providers = new LinkedList<>();
    providers.add(new JacksonJsonProvider());
    providers.add(new FlowableRxInvokerProvider());
    WebClient wc = WebClient.create(address, providers);
    Flowable<HelloWorldBean> obs = wc.accept("application/json")
        .rx(FlowableRxInvoker.class)
        .get(HelloWorldBean.class);
    final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
    obs.subscribe(subscriber);
    // awaitDone (unlike await, whose boolean result was previously ignored)
    // cancels the subscription when the wait times out, so a slow or hung
    // request does not stay subscribed after the test has failed.
    subscriber.awaitDone(3, TimeUnit.SECONDS);
    subscriber.assertResult(new HelloWorldBean("Hello", "World"));
}
/**
 * Converts a {@code Uni} holding the item {@code 1} into a {@code Publisher},
 * subscribes with zero initial demand, then requests a single item and
 * checks that {@code 1} is emitted and the stream completes.
 */
@Test
public void testWithImmediateValueWithRequest() {
    Publisher<Integer> source = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(source).isNotNull();

    // Start with no demand so nothing can be emitted before request() is called.
    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(source).test(0);
    subscriber.assertSubscribed().request(1);

    subscriber.assertResult(1).assertComplete();
}
/**
 * Converts a {@code Uni} holding the item {@code 1} into a {@code Publisher},
 * subscribes with zero initial demand, then requests 20 items and checks
 * that only {@code 1} is emitted before the stream completes.
 */
@Test
public void testWithImmediateValueWithRequests() {
    Publisher<Integer> source = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(source).isNotNull();

    // Start with no demand; the request below asks for more than is available.
    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(source).test(0);
    subscriber.assertSubscribed().request(20);

    subscriber.assertResult(1).assertComplete();
}
/**
 * Checks that {@code Flowable.rangeLong(10, 5)} emits 10 through 14 and
 * completes, and that the value at index 2 is 12.
 */
@Test
public void testOperator_range() {
    TestSubscriber<Long> rangeSubscriber = Flowable.rangeLong(10, 5).test();

    rangeSubscriber
        .assertResult(10L, 11L, 12L, 13L, 14L)
        .assertValueAt(2, value -> value == 12L);
}
/**
 * Zips the last element of two processors (with defaults 1 and 2) by
 * summing them. The first processor emits 3 before completing and the
 * second completes without emitting, so the expected sum is 5.
 */
@Test
public void test() {
    PublishProcessor<Integer> first = PublishProcessor.create();
    PublishProcessor<Integer> second = PublishProcessor.create();

    Flowable<Integer> lastOfFirst = first.last(1).toFlowable();
    Flowable<Integer> lastOfSecond = second.last(2).toFlowable();
    Flowable<Integer> summed = Flowable.zip(lastOfFirst, lastOfSecond, (x, y) -> x + y);

    // Subscribe before any signal is pushed into the processors.
    TestSubscriber<Integer> ts = summed.test();

    first.onNext(3);
    first.onComplete();
    second.onComplete();

    ts.assertResult(5);
}
/**
 * Fetches {@code /rx2/flowable/textAsync} from the local server as a
 * {@code Flowable<String>}, doubles each received string, and verifies that
 * the single result is {@code "Hello, world!Hello, world!"} followed by
 * completion.
 *
 * @throws Exception declared for compatibility with the original signature
 */
@Test
public void testGetHelloWorldAsyncObservable() throws Exception {
    String address = "http://localhost:" + PORT + "/rx2/flowable/textAsync";
    WebClient wc = WebClient.create(address,
        Collections.singletonList(new FlowableRxInvokerProvider()));
    Flowable<String> obs = wc.accept("text/plain")
        .rx(FlowableRxInvoker.class)
        .get(String.class);
    final TestSubscriber<String> subscriber = new TestSubscriber<>();
    obs.map(s -> s + s).subscribe(subscriber);
    // awaitDone (unlike await, whose boolean result was previously ignored)
    // cancels the subscription when the wait times out, so a slow or hung
    // request does not stay subscribed after the test has failed.
    subscriber.awaitDone(2, TimeUnit.SECONDS);
    subscriber.assertResult("Hello, world!Hello, world!");
}