io.reactivex.subscribers.TestSubscriber#assertTerminated() 源码实例 Demo

下面列出了 io.reactivex.subscribers.TestSubscriber#assertTerminated() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Subscribes to the server-streaming {@code responsePressure} call with the stream limited by
 * {@code take(10)}, then checks that ten values arrived, the subscriber terminated, and the
 * service reports that the call was canceled.
 *
 * @throws InterruptedException if the pause before disposing is interrupted
 */
@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
    TestService service = new TestService();
    serverRule.getServiceRegistry().addService(service);
    RxNumbersGrpc.RxNumbersStub numbersStub = RxNumbersGrpc.newRxStub(serverRule.getChannel());

    // Each callback only logs, so the console shows how the stream progressed.
    TestSubscriber<NumberProto.Number> subscriber = Single.just(Empty.getDefaultInstance())
            .as(numbersStub::responsePressure)
            .doOnNext(n -> System.out.println(n.getNumber(0)))
            .doOnError(error -> System.out.println(error.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .take(10)
            .test();

    // Let the stream run for one second before disposing the subscriber.
    TimeUnit.SECONDS.sleep(1);
    subscriber.dispose();

    subscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
    subscriber.assertValueCount(10);
    subscriber.assertTerminated();
    assertThat(service.wasCanceled()).isTrue();

    errorRule.verifyNoError();
}
 
源代码2 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Runs an always-failing source through a retry policy of 10 retries with a 1 ms delay on the
 * trampoline scheduler, then checks that the original exception reaches the subscriber and
 * that the source was subscribed to 11 times.
 */
@Test
public void testRetryWhenMultipleRetriesWorkOnSingleDelay() {
    Exception failure = new Exception("boo");
    AtomicInteger subscriptions = new AtomicInteger();
    TestSubscriber<Object> subscriber = TestSubscriber.create();

    Flowable.error(failure)
            .doOnSubscribe(Consumers.increment(subscriptions))
            .retryWhen(RetryWhen.delay(1, TimeUnit.MILLISECONDS)
                    .scheduler(Schedulers.trampoline())
                    .maxRetries(10)
                    .build())
            .subscribe(subscriber);

    subscriber.assertTerminated();
    assertFalse(subscriber.errors().isEmpty());
    assertEquals(failure, subscriber.errors().get(0));
    // Expected count: the initial subscription plus the ten configured retries.
    assertEquals(11, subscriptions.get());
}
 
源代码3 项目: smallrye-mutiny   文件: UniToPublisherTest.java
/**
 * Converts a single-item {@code Uni} to a {@code Publisher}, subscribes with no initial demand,
 * and checks that requesting zero items yields an {@link IllegalArgumentException} and a
 * terminated subscriber.
 */
@Test
public void testInvalidRequest() {
    Publisher<Integer> source = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(source).isNotNull();

    // test(0) subscribes with an initial request of zero.
    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(source).test(0);
    subscriber.assertSubscribed();

    subscriber.request(0);
    subscriber.assertError(IllegalArgumentException.class)
            .assertTerminated();
}
 
/**
 * Subscribes a {@code TestSubscriber} to a three-element {@code Flowable} and runs its basic
 * assertions: received values, an absent value, the value count, and termination.
 */
@Test
public void testTestSubscriber() {
    TestSubscriber<String> subscriber = new TestSubscriber<>();

    // Emits "1", "2", "3" in that order.
    Flowable.just("1", "2", "3").subscribe(subscriber);

    subscriber
            // The received values match the emitted ones.
            .assertValues("1", "2", "3")
            // "4" was never emitted.
            .assertNever("4")
            // Exactly three values were received.
            .assertValueCount(3)
            // The stream reached a terminal event.
            .assertTerminated();
}
 
/**
 * Drives the bidirectional {@code twoWayPressure} call with the service's explicit-cancel flag
 * turned off, then checks that the request stream both produced at least one element and was
 * canceled.
 */
@Test
public void serverCanCancelClientStreamImplicitlyBidi() {
    TestService service = new TestService();
    serverRule.getServiceRegistry().addService(service);
    RxNumbersGrpc.RxNumbersStub numbersStub = RxNumbersGrpc.newRxStub(serverRule.getChannel());
    service.setExplicitCancel(false);

    // Flags flipped by the request stream's callbacks.
    AtomicBoolean upstreamCanceled = new AtomicBoolean(false);
    AtomicBoolean upstreamProduced = new AtomicBoolean(false);

    Flowable<NumberProto.Number> upstream = Flowable
            .fromIterable(IntStream.range(0, NUMBER_OF_STREAM_ELEMENTS)::iterator)
            .delay(10, TimeUnit.MILLISECONDS)
            .map(CancellationPropagationIntegrationTest::protoNum)
            .doOnNext(produced -> {
                upstreamProduced.set(true);
                System.out.println("Produced: " + produced.getNumber(0));
            })
            .doOnCancel(() -> {
                upstreamCanceled.set(true);
                System.out.println("Client canceled");
            });

    TestSubscriber<NumberProto.Number> subscriber = upstream
            .compose(numbersStub::twoWayPressure)
            .doOnNext(received -> System.out.println(received.getNumber(0)))
            .doOnError(error -> System.out.println(error.getMessage()))
            .test();

    // Wait at most three seconds for the response stream to terminate.
    subscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
    subscriber.assertTerminated();
    assertThat(upstreamCanceled.get()).isTrue();
    assertThat(upstreamProduced.get()).isTrue();

    errorRule.verifyNoError();
}
 
/**
 * Drives the bidirectional {@code twoWayPressure} call with the service's explicit-cancel flag
 * turned on, then checks that the request stream both produced at least one element and was
 * canceled.
 */
@Test
public void serverCanCancelClientStreamExplicitlyBidi() {
    TestService service = new TestService();
    serverRule.getServiceRegistry().addService(service);
    RxNumbersGrpc.RxNumbersStub numbersStub = RxNumbersGrpc.newRxStub(serverRule.getChannel());
    service.setExplicitCancel(true);

    // Flags flipped by the request stream's callbacks.
    AtomicBoolean upstreamCanceled = new AtomicBoolean(false);
    AtomicBoolean upstreamProduced = new AtomicBoolean(false);

    Flowable<NumberProto.Number> upstream = Flowable
            .fromIterable(IntStream.range(0, NUMBER_OF_STREAM_ELEMENTS)::iterator)
            .delay(10, TimeUnit.MILLISECONDS)
            .map(CancellationPropagationIntegrationTest::protoNum)
            .doOnNext(produced -> {
                upstreamProduced.set(true);
                System.out.println("P: " + produced.getNumber(0));
            })
            .doOnCancel(() -> {
                upstreamCanceled.set(true);
                System.out.println("Client canceled");
            });

    TestSubscriber<NumberProto.Number> subscriber = upstream
            .compose(numbersStub::twoWayPressure)
            .doOnNext(received -> System.out.println(received.getNumber(0)))
            .doOnError(error -> System.out.println(error.getMessage()))
            .test();

    // This wait has no timeout: it blocks until the response stream terminates.
    subscriber.awaitTerminalEvent();
    subscriber.assertTerminated();
    assertThat(upstreamCanceled.get()).isTrue();
    assertThat(upstreamProduced.get()).isTrue();

    errorRule.verifyNoError();
}
 
/**
 * Feeds the values "1", "2", "3" into a {@code TestSubscriber} and checks what it recorded:
 * the values themselves, the absence of "4", the number of values, and termination.
 */
@Test
public void testTestSubscriber() {
    Flowable<String> source = Flowable.just("1", "2", "3");
    TestSubscriber<String> recorder = new TestSubscriber<>();

    // Subscribing delivers "1", "2", "3" to the recorder.
    source.subscribe(recorder);

    // Values arrive exactly as emitted.
    recorder.assertValues("1", "2", "3");
    // A value that was not emitted must not appear.
    recorder.assertNever("4");
    // Three values in total.
    recorder.assertValueCount(3);
    // The source has signalled a terminal event.
    recorder.assertTerminated();
}
 
/**
 * Inserts {@code TEST_PERSON} through {@code cosmosTemplate} and expects the insert to
 * terminate without completing, carrying exactly one error that is a
 * {@link CosmosDBAccessException}.
 *
 * NOTE(review): presumably {@code TEST_PERSON} was already inserted by the fixture setup,
 * which is what makes this insert a duplicate - confirm against the test class.
 */
@Test
public void testInsertDuplicateId() {
    final Mono<Person> insertMono = cosmosTemplate.insert(TEST_PERSON,
        new PartitionKey(personInfo.getPartitionKeyFieldValue(TEST_PERSON)));
    // Parameterised rather than raw, so the compiler checks the subscription.
    final TestSubscriber<Person> testSubscriber = new TestSubscriber<>();
    insertMono.subscribe(testSubscriber);
    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNotComplete();
    testSubscriber.assertTerminated();
    assertThat(testSubscriber.errors()).hasSize(1);
    // errors() is the typed view of the error list the original reached through
    // getEvents().get(1) and a raw List cast.
    assertThat(testSubscriber.errors().get(0))
        .isInstanceOf(CosmosDBAccessException.class);
}