io.reactivex.Flowable#as ( )源码实例Demo

下面列出了io.reactivex.Flowable#as ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
public void zeroMessageResponseManyToOne() {
    serverRule.getServiceRegistry().addService(new MissingUnaryResponseService());

    RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(serverRule.getChannel());
    Flowable<HelloRequest> req = Flowable.just(
            HelloRequest.newBuilder().setName("a").build(),
            HelloRequest.newBuilder().setName("b").build(),
            HelloRequest.newBuilder().setName("c").build());

    Single<HelloResponse> resp = req.as(stub::sayHelloReqStream);

    TestObserver<String> testObserver = resp.map(HelloResponse::getMessage).test();
    testObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
    testObserver.assertError(StatusRuntimeException.class);
    testObserver.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.CANCELLED.getCode());
}
 
@Test
public void manyToOne() {
    RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
    Flowable<HelloRequest> req = Flowable.just(HelloRequest.getDefaultInstance());
    Single<HelloResponse> resp = req.as(stub::sayHelloReqStream);
    TestObserver<HelloResponse> test = resp.test();

    test.awaitTerminalEvent(3, TimeUnit.SECONDS);
    test.assertError(t -> t instanceof StatusRuntimeException);
    // Flowable requests get canceled when unexpected errors happen
    test.assertError(t -> ((StatusRuntimeException)t).getStatus().getCode() == Status.Code.INTERNAL);
}
 
源代码3 项目: reactive-grpc   文件: EndToEndIntegrationTest.java
@Test
public void manyToOne() {
    RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
    Flowable<HelloRequest> req = Flowable.just(
            HelloRequest.newBuilder().setName("a").build(),
            HelloRequest.newBuilder().setName("b").build(),
            HelloRequest.newBuilder().setName("c").build());

    Single<HelloResponse> resp = req.as(stub::sayHelloReqStream);

    TestObserver<String> testObserver = resp.map(HelloResponse::getMessage).test();
    testObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
    testObserver.assertValue("Hello a and b and c");
}
 
@Override
public Publisher<Message> createPublisher(long elements) {
    RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
    Flowable<Message> request = Flowable.range(0, (int)elements).map(this::toMessage);
    Single<Message> publisher = request.as(stub::manyToOne);

    return publisher.toFlowable();
}
 
@Override
public Publisher<Message> createFailedPublisher() {
    RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
    Flowable<Message> request = Flowable.just(toMessage(TckService.KABOOM));
    Single<Message> publisher = request.as(stub::manyToOne);

    return publisher.toFlowable();
}
 
@Test
public void fourKindsOfRequestAtOnce() throws Exception {
    RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);

    // == MAKE REQUESTS ==
    // One to One
    Single<HelloRequest> req1 = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
    Single<HelloResponse> resp1 = req1.compose(stub::sayHello);

    // One to Many
    Single<HelloRequest> req2 = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
    Flowable<HelloResponse> resp2 = req2.as(stub::sayHelloRespStream);

    // Many to One
    Flowable<HelloRequest> req3 = Flowable.just(
            HelloRequest.newBuilder().setName("a").build(),
            HelloRequest.newBuilder().setName("b").build(),
            HelloRequest.newBuilder().setName("c").build());

    Single<HelloResponse> resp3 = req3.as(stub::sayHelloReqStream);

    // Many to Many
    Flowable<HelloRequest> req4 = Flowable.just(
            HelloRequest.newBuilder().setName("a").build(),
            HelloRequest.newBuilder().setName("b").build(),
            HelloRequest.newBuilder().setName("c").build(),
            HelloRequest.newBuilder().setName("d").build(),
            HelloRequest.newBuilder().setName("e").build());

    Flowable<HelloResponse> resp4 = req4.compose(stub::sayHelloBothStream);

    // == VERIFY RESPONSES ==
    ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    // Run all four verifications in parallel
    try {
        // One to One
        ListenableFuture<Boolean> oneToOne = executorService.submit(() -> {
            TestObserver<String> testObserver1 = resp1.map(HelloResponse::getMessage).test();
            testObserver1.awaitTerminalEvent(1, TimeUnit.SECONDS);
            testObserver1.assertValue("Hello rxjava");
            return true;
        });

        // One to Many
        ListenableFuture<Boolean> oneToMany = executorService.submit(() -> {
            TestSubscriber<String> testSubscriber1 = resp2.map(HelloResponse::getMessage).test();
            testSubscriber1.awaitTerminalEvent(1, TimeUnit.SECONDS);
            testSubscriber1.assertValues("Hello rxjava", "Hi rxjava", "Greetings rxjava");
            return true;
        });

        // Many to One
        ListenableFuture<Boolean> manyToOne = executorService.submit(() -> {
            TestObserver<String> testObserver2 = resp3.map(HelloResponse::getMessage).test();
            testObserver2.awaitTerminalEvent(1, TimeUnit.SECONDS);
            testObserver2.assertValue("Hello a and b and c");
            return true;
        });

        // Many to Many
        ListenableFuture<Boolean> manyToMany = executorService.submit(() -> {
            TestSubscriber<String> testSubscriber2 = resp4.map(HelloResponse::getMessage).test();
            testSubscriber2.awaitTerminalEvent(1, TimeUnit.SECONDS);
            testSubscriber2.assertValues("Hello a and b", "Hello c and d", "Hello e");
            testSubscriber2.assertComplete();
            return true;
        });

        @SuppressWarnings("unchecked")
        ListenableFuture<List<Boolean>> allFutures = Futures.allAsList(Lists.newArrayList(oneToOne, oneToMany, manyToOne, manyToMany));
        // Block for response
        List<Boolean> results = allFutures.get(3, TimeUnit.SECONDS);
        assertThat(results).containsExactly(true, true, true, true);

    } finally {
        executorService.shutdown();
    }
}