下面列出了io.reactivex.Single#compose ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void oneToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Single<HelloRequest> req = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Single<HelloResponse> resp = req.compose(stub::sayHello);
AtomicReference<String> clientThreadName = new AtomicReference<>();
TestObserver<String> testObserver = resp
.map(HelloResponse::getMessage)
.doOnSuccess(x -> clientThreadName.set(Thread.currentThread().getName()))
.test();
testObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
assertThat(clientThreadName.get()).isEqualTo("TheGrpcClient");
assertThat(serverThreadName.get()).isEqualTo("TheGrpcServer");
}
@Test
public void zeroMessageResponseOneToOne() {
serverRule.getServiceRegistry().addService(new MissingUnaryResponseService());
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(serverRule.getChannel());
Single<HelloRequest> req = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Single<HelloResponse> resp = req.compose(stub::sayHello);
TestObserver<String> testObserver = resp.map(HelloResponse::getMessage).test();
testObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
testObserver.assertError(StatusRuntimeException.class);
testObserver.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.CANCELLED.getCode());
}
@Test
public void oneToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Single<HelloRequest> req = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Single<HelloResponse> resp = req.compose(stub::sayHello);
TestObserver<String> testObserver = resp.map(HelloResponse::getMessage).test();
testObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
testObserver.assertValue("Hello rxjava");
}
@Override
public Publisher<Message> createPublisher(long elements) {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Single<Message> request = Single.just(toMessage((int) elements));
Single<Message> publisher = request.compose(stub::oneToOne);
return publisher.toFlowable();
}
@Override
public Publisher<Message> createFailedPublisher() {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Single<Message> request = Single.just(toMessage(TckService.KABOOM));
Single<Message> publisher = request.compose(stub::oneToOne);
return publisher.toFlowable();
}
@Override
public SingleSource<T> apply(Single<T> upstream) {
Single<T> tObservable = upstream
.observeOn(AndroidSchedulers.mainThread());
if (provider == null) {
return tObservable;
}
return tObservable.compose(RxLifecycleDelegate.<T>bindLifecycle(provider));
}
@Test
public void fourKindsOfRequestAtOnce() throws Exception {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
// == MAKE REQUESTS ==
// One to One
Single<HelloRequest> req1 = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Single<HelloResponse> resp1 = req1.compose(stub::sayHello);
// One to Many
Single<HelloRequest> req2 = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Flowable<HelloResponse> resp2 = req2.as(stub::sayHelloRespStream);
// Many to One
Flowable<HelloRequest> req3 = Flowable.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build());
Single<HelloResponse> resp3 = req3.as(stub::sayHelloReqStream);
// Many to Many
Flowable<HelloRequest> req4 = Flowable.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build(),
HelloRequest.newBuilder().setName("d").build(),
HelloRequest.newBuilder().setName("e").build());
Flowable<HelloResponse> resp4 = req4.compose(stub::sayHelloBothStream);
// == VERIFY RESPONSES ==
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
// Run all four verifications in parallel
try {
// One to One
ListenableFuture<Boolean> oneToOne = executorService.submit(() -> {
TestObserver<String> testObserver1 = resp1.map(HelloResponse::getMessage).test();
testObserver1.awaitTerminalEvent(1, TimeUnit.SECONDS);
testObserver1.assertValue("Hello rxjava");
return true;
});
// One to Many
ListenableFuture<Boolean> oneToMany = executorService.submit(() -> {
TestSubscriber<String> testSubscriber1 = resp2.map(HelloResponse::getMessage).test();
testSubscriber1.awaitTerminalEvent(1, TimeUnit.SECONDS);
testSubscriber1.assertValues("Hello rxjava", "Hi rxjava", "Greetings rxjava");
return true;
});
// Many to One
ListenableFuture<Boolean> manyToOne = executorService.submit(() -> {
TestObserver<String> testObserver2 = resp3.map(HelloResponse::getMessage).test();
testObserver2.awaitTerminalEvent(1, TimeUnit.SECONDS);
testObserver2.assertValue("Hello a and b and c");
return true;
});
// Many to Many
ListenableFuture<Boolean> manyToMany = executorService.submit(() -> {
TestSubscriber<String> testSubscriber2 = resp4.map(HelloResponse::getMessage).test();
testSubscriber2.awaitTerminalEvent(1, TimeUnit.SECONDS);
testSubscriber2.assertValues("Hello a and b", "Hello c and d", "Hello e");
testSubscriber2.assertComplete();
return true;
});
@SuppressWarnings("unchecked")
ListenableFuture<List<Boolean>> allFutures = Futures.allAsList(Lists.newArrayList(oneToOne, oneToMany, manyToOne, manyToMany));
// Block for response
List<Boolean> results = allFutures.get(3, TimeUnit.SECONDS);
assertThat(results).containsExactly(true, true, true, true);
} finally {
executorService.shutdown();
}
}