下面列出了io.reactivex.Flowable#as ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void zeroMessageResponseManyToOne() {
serverRule.getServiceRegistry().addService(new MissingUnaryResponseService());
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(serverRule.getChannel());
Flowable<HelloRequest> req = Flowable.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build());
Single<HelloResponse> resp = req.as(stub::sayHelloReqStream);
TestObserver<String> testObserver = resp.map(HelloResponse::getMessage).test();
testObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
testObserver.assertError(StatusRuntimeException.class);
testObserver.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.CANCELLED.getCode());
}
@Test
public void manyToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Flowable<HelloRequest> req = Flowable.just(HelloRequest.getDefaultInstance());
Single<HelloResponse> resp = req.as(stub::sayHelloReqStream);
TestObserver<HelloResponse> test = resp.test();
test.awaitTerminalEvent(3, TimeUnit.SECONDS);
test.assertError(t -> t instanceof StatusRuntimeException);
// Flowable requests get canceled when unexpected errors happen
test.assertError(t -> ((StatusRuntimeException)t).getStatus().getCode() == Status.Code.INTERNAL);
}
@Test
public void manyToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Flowable<HelloRequest> req = Flowable.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build());
Single<HelloResponse> resp = req.as(stub::sayHelloReqStream);
TestObserver<String> testObserver = resp.map(HelloResponse::getMessage).test();
testObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
testObserver.assertValue("Hello a and b and c");
}
@Override
public Publisher<Message> createPublisher(long elements) {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Flowable<Message> request = Flowable.range(0, (int)elements).map(this::toMessage);
Single<Message> publisher = request.as(stub::manyToOne);
return publisher.toFlowable();
}
@Override
public Publisher<Message> createFailedPublisher() {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Flowable<Message> request = Flowable.just(toMessage(TckService.KABOOM));
Single<Message> publisher = request.as(stub::manyToOne);
return publisher.toFlowable();
}
@Test
public void fourKindsOfRequestAtOnce() throws Exception {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
// == MAKE REQUESTS ==
// One to One
Single<HelloRequest> req1 = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Single<HelloResponse> resp1 = req1.compose(stub::sayHello);
// One to Many
Single<HelloRequest> req2 = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Flowable<HelloResponse> resp2 = req2.as(stub::sayHelloRespStream);
// Many to One
Flowable<HelloRequest> req3 = Flowable.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build());
Single<HelloResponse> resp3 = req3.as(stub::sayHelloReqStream);
// Many to Many
Flowable<HelloRequest> req4 = Flowable.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build(),
HelloRequest.newBuilder().setName("d").build(),
HelloRequest.newBuilder().setName("e").build());
Flowable<HelloResponse> resp4 = req4.compose(stub::sayHelloBothStream);
// == VERIFY RESPONSES ==
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
// Run all four verifications in parallel
try {
// One to One
ListenableFuture<Boolean> oneToOne = executorService.submit(() -> {
TestObserver<String> testObserver1 = resp1.map(HelloResponse::getMessage).test();
testObserver1.awaitTerminalEvent(1, TimeUnit.SECONDS);
testObserver1.assertValue("Hello rxjava");
return true;
});
// One to Many
ListenableFuture<Boolean> oneToMany = executorService.submit(() -> {
TestSubscriber<String> testSubscriber1 = resp2.map(HelloResponse::getMessage).test();
testSubscriber1.awaitTerminalEvent(1, TimeUnit.SECONDS);
testSubscriber1.assertValues("Hello rxjava", "Hi rxjava", "Greetings rxjava");
return true;
});
// Many to One
ListenableFuture<Boolean> manyToOne = executorService.submit(() -> {
TestObserver<String> testObserver2 = resp3.map(HelloResponse::getMessage).test();
testObserver2.awaitTerminalEvent(1, TimeUnit.SECONDS);
testObserver2.assertValue("Hello a and b and c");
return true;
});
// Many to Many
ListenableFuture<Boolean> manyToMany = executorService.submit(() -> {
TestSubscriber<String> testSubscriber2 = resp4.map(HelloResponse::getMessage).test();
testSubscriber2.awaitTerminalEvent(1, TimeUnit.SECONDS);
testSubscriber2.assertValues("Hello a and b", "Hello c and d", "Hello e");
testSubscriber2.assertComplete();
return true;
});
@SuppressWarnings("unchecked")
ListenableFuture<List<Boolean>> allFutures = Futures.allAsList(Lists.newArrayList(oneToOne, oneToMany, manyToOne, manyToMany));
// Block for response
List<Boolean> results = allFutures.get(3, TimeUnit.SECONDS);
assertThat(results).containsExactly(true, true, true, true);
} finally {
executorService.shutdown();
}
}