下面列出了 io.reactivex.subscribers.TestSubscriber#assertTerminated() 的实例代码，您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Calls the {@code responsePressure} stub method, caps the resulting stream with
 * {@code take(10)}, and verifies that ten values arrived, the subscriber terminated,
 * {@code svc.wasCanceled()} reports {@code true}, and the error rule saw no error.
 *
 * @throws InterruptedException if the thread is interrupted while pausing
 */
@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
    TestService service = new TestService();
    serverRule.getServiceRegistry().addService(service);
    RxNumbersGrpc.RxNumbersStub client = RxNumbersGrpc.newRxStub(serverRule.getChannel());

    TestSubscriber<NumberProto.Number> subscriber = Single.just(Empty.getDefaultInstance())
            .as(client::responsePressure)
            .doOnNext(n -> System.out.println(n.getNumber(0)))
            .doOnError(e -> System.out.println(e.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .take(10)
            .test();

    // Let the stream run for a second before disposing the subscriber.
    TimeUnit.SECONDS.sleep(1);
    subscriber.dispose();

    // Bounded wait, then check what the subscriber recorded.
    subscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
    subscriber.assertValueCount(10)
            .assertTerminated();
    assertThat(service.wasCanceled()).isTrue();
    errorRule.verifyNoError();
}
/**
 * Subscribes to a Flowable that always fails, retried through a {@code RetryWhen} policy
 * with a single 1 ms delay on the trampoline scheduler and {@code maxRetries(10)}.
 * Verifies the subscriber terminates with the original exception and that the source
 * was subscribed to 11 times.
 */
@Test
public void testRetryWhenMultipleRetriesWorkOnSingleDelay() {
    Exception failure = new Exception("boo");
    AtomicInteger subscriptions = new AtomicInteger();
    TestSubscriber<Object> subscriber = TestSubscriber.create();

    Flowable.error(failure)
            .doOnSubscribe(Consumers.increment(subscriptions))
            .retryWhen(RetryWhen
                    .delay(1, TimeUnit.MILLISECONDS)
                    .scheduler(Schedulers.trampoline())
                    .maxRetries(10)
                    .build())
            .subscribe(subscriber);

    subscriber.assertTerminated();
    assertFalse(subscriber.errors().isEmpty());
    assertEquals(failure, subscriber.errors().get(0));
    // 11 = the first subscription plus the 10 permitted retries.
    assertEquals(11, subscriptions.get());
}
/**
 * Converts a single-item {@code Uni} to a {@code Publisher}, subscribes with an initial
 * demand of zero, then issues {@code request(0)} and expects the subscriber to end with
 * an {@link IllegalArgumentException}.
 */
@Test
public void testInvalidRequest() {
    Publisher<Integer> source = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(source).isNotNull();

    // Start with no demand so that nothing is delivered before the invalid request.
    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(source).test(0L);
    subscriber.assertSubscribed();

    subscriber.request(0);

    subscriber.assertError(IllegalArgumentException.class)
            .assertTerminated();
}
/**
 * Demonstrates the basic {@code TestSubscriber} assertions against a three-element
 * Flowable: emitted values, an absent value, the value count, and termination.
 */
@Test
public void testTestSubscriber() {
    // test() creates a TestSubscriber and subscribes it to the "1", "2", "3" source.
    TestSubscriber<String> subscriber = Flowable.just("1", "2", "3").test();

    // The received values match, in order.
    subscriber.assertValues("1", "2", "3");
    // "4" was never emitted.
    subscriber.assertNever("4");
    // Exactly three values were received.
    subscriber.assertValueCount(3);
    // The stream reached a terminal event.
    subscriber.assertTerminated();
}
/**
 * Drives the bidirectional {@code twoWayPressure} call with explicit cancel turned off on
 * the service, then verifies that the response subscriber terminated, that the request
 * stream produced at least one item, and that the request stream was cancelled.
 */
@Test
public void serverCanCancelClientStreamImplicitlyBidi() {
    TestService service = new TestService();
    serverRule.getServiceRegistry().addService(service);
    RxNumbersGrpc.RxNumbersStub client = RxNumbersGrpc.newRxStub(serverRule.getChannel());
    service.setExplicitCancel(false);

    // Flags flipped from the request stream's callbacks; both start out false.
    AtomicBoolean canceledFlag = new AtomicBoolean();
    AtomicBoolean producedFlag = new AtomicBoolean();

    Flowable<NumberProto.Number> outbound = Flowable
            .fromIterable(IntStream.range(0, NUMBER_OF_STREAM_ELEMENTS)::iterator)
            .delay(10, TimeUnit.MILLISECONDS)
            .map(CancellationPropagationIntegrationTest::protoNum)
            .doOnNext(item -> {
                producedFlag.set(true);
                System.out.println("Produced: " + item.getNumber(0));
            })
            .doOnCancel(() -> {
                canceledFlag.set(true);
                System.out.println("Client canceled");
            });

    TestSubscriber<NumberProto.Number> responses = outbound
            .compose(client::twoWayPressure)
            .doOnNext(reply -> System.out.println(reply.getNumber(0)))
            .doOnError(e -> System.out.println(e.getMessage()))
            .test();

    // Bounded wait for the response stream to finish.
    responses.awaitTerminalEvent(3, TimeUnit.SECONDS);
    responses.assertTerminated();
    assertThat(canceledFlag.get()).isTrue();
    assertThat(producedFlag.get()).isTrue();
    errorRule.verifyNoError();
}
/**
 * Drives the bidirectional {@code twoWayPressure} call with explicit cancel turned on for
 * the service, then verifies that the response subscriber terminated, that the request
 * stream produced at least one item, and that the request stream was cancelled.
 */
@Test
public void serverCanCancelClientStreamExplicitlyBidi() {
    TestService svc = new TestService();
    serverRule.getServiceRegistry().addService(svc);
    RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(serverRule.getChannel());
    svc.setExplicitCancel(true);

    // Flags flipped from the request stream's callbacks.
    AtomicBoolean requestWasCanceled = new AtomicBoolean(false);
    AtomicBoolean requestDidProduce = new AtomicBoolean(false);

    Flowable<NumberProto.Number> request = Flowable
            .fromIterable(IntStream.range(0, NUMBER_OF_STREAM_ELEMENTS)::iterator)
            .delay(10, TimeUnit.MILLISECONDS)
            .map(CancellationPropagationIntegrationTest::protoNum)
            .doOnNext(n -> {
                requestDidProduce.set(true);
                System.out.println("P: " + n.getNumber(0));
            })
            .doOnCancel(() -> {
                requestWasCanceled.set(true);
                System.out.println("Client canceled");
            });

    TestSubscriber<NumberProto.Number> observer = request
            .compose(stub::twoWayPressure)
            .doOnNext(number -> System.out.println(number.getNumber(0)))
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .test();

    // Bound the wait so a missed cancellation fails the test at assertTerminated()
    // instead of blocking the whole run indefinitely.
    observer.awaitTerminalEvent(3, TimeUnit.SECONDS);
    observer.assertTerminated();
    assertThat(requestWasCanceled.get()).isTrue();
    assertThat(requestDidProduce.get()).isTrue();
    errorRule.verifyNoError();
}
/**
 * Subscribes a {@code TestSubscriber} to a Flowable of "1", "2", "3" and checks the
 * received values, the absence of "4", the value count, and termination.
 */
@Test
public void testTestSubscriber() {
    TestSubscriber<String> subscriber = new TestSubscriber<>();

    // Emit "1", "2", "3" into the subscriber.
    Flowable.just("1", "2", "3").subscribe(subscriber);

    subscriber
            // values are equal and in order
            .assertValues("1", "2", "3")
            // "4" was never received
            .assertNever("4")
            // exactly three values were received
            .assertValueCount(3)
            // a terminal event was received
            .assertTerminated();
}
/**
 * Inserts {@code TEST_PERSON} through the reactive template and expects the operation to
 * terminate without completing, carrying exactly one error of type
 * {@link CosmosDBAccessException}.
 *
 * <p>NOTE(review): the test name implies {@code TEST_PERSON} already exists when this runs,
 * presumably inserted by fixture setup outside this block. Confirm.
 */
@Test
public void testInsertDuplicateId() {
    final Mono<Person> insertMono = cosmosTemplate.insert(TEST_PERSON,
            new PartitionKey(personInfo.getPartitionKeyFieldValue(TEST_PERSON)));
    // Parameterized rather than raw, so no unchecked conversion happens on subscribe.
    final TestSubscriber<Person> testSubscriber = new TestSubscriber<>();
    insertMono.subscribe(testSubscriber);
    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNotComplete();
    testSubscriber.assertTerminated();
    assertThat(testSubscriber.errors()).hasSize(1);
    // errors() is the typed view of what getEvents().get(1) exposed through a raw cast.
    assertThat(testSubscriber.errors().get(0))
            .isInstanceOf(CosmosDBAccessException.class);
}