下面列出了io.reactivex.subscribers.TestSubscriber#assertError ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFails() {
Exception ex = new IllegalArgumentException("boo");
TestSubscriber<Integer> ts = TestSubscriber.create();
TestScheduler scheduler = new TestScheduler();
Flowable.just(1, 2)
// force error after 3 emissions
.concatWith(Flowable.<Integer>error(ex))
// retry with backoff
.retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
.scheduler(scheduler).failWhenInstanceOf(IllegalArgumentException.class).build())
// go
.subscribe(ts);
ts.assertValues(1, 2);
ts.assertError(ex);
}
@Test
public void serverErrorSignalsUpstreamCancellationBidi() {
serverRule.getServiceRegistry().addService(new ExplodeAfterFiveService());
RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(serverRule.getChannel());
AtomicBoolean upstreamCancel = new AtomicBoolean(false);
TestSubscriber<NumberProto.Number> subscriber = Flowable.range(0, Integer.MAX_VALUE)
.map(this::protoNum)
.doOnCancel(() -> upstreamCancel.set(true))
.compose(stub::twoWayPressure)
.doOnNext(i -> System.out.println(i.getNumber(0)))
.test();
subscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
subscriber.assertError(StatusRuntimeException.class);
assertThat(upstreamCancel.get()).isTrue();
}
@Test
public void shouldCallOnError() {
Throwable throwable = new IllegalStateException("Test exception");
//noinspection unchecked
PreparedOperation<String, Optional<String>, String> preparedOperation = mock(PreparedOperation.class);
when(preparedOperation.executeAsBlocking()).thenThrow(throwable);
TestSubscriber<Optional<String>> testSubscriber = TestSubscriber.create();
Flowable
.create(new FlowableOnSubscribeExecuteAsBlockingOptional<String, String>(preparedOperation), MISSING)
.subscribe(testSubscriber);
testSubscriber.assertError(throwable);
testSubscriber.assertNoValues();
testSubscriber.assertNotComplete();
}
@Test
public void shouldCallOnError() {
Throwable throwable = new IllegalStateException("Test exception");
//noinspection unchecked
PreparedOperation<String, String, String> preparedOperation = mock(PreparedOperation.class);
when(preparedOperation.executeAsBlocking()).thenThrow(throwable);
TestSubscriber<String> testSubscriber = TestSubscriber.create();
Flowable
.create(new FlowableOnSubscribeExecuteAsBlocking<String, String, String>(preparedOperation), BackpressureStrategy.MISSING)
.subscribe(testSubscriber);
testSubscriber.assertError(throwable);
testSubscriber.assertNoValues();
testSubscriber.assertNotComplete();
}
@Test
public void testInvalidRequest() {
Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
assertThat(publisher).isNotNull();
TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(0);
test.assertSubscribed();
test.request(0);
test.assertError(IllegalArgumentException.class);
test.assertTerminated();
}
@Test
public void shouldWrapExceptionIntoStorIOExceptionForFlowable() {
final StorIOContentResolver storIOContentResolver = mock(StorIOContentResolver.class);
Uri testUri = mock(Uri.class);
when(storIOContentResolver.observeChangesOfUri(eq(testUri), eq(BackpressureStrategy.MISSING)))
.thenReturn(Flowable.<Changes>empty());
//noinspection unchecked
final GetResolver<Integer> getResolver = mock(GetResolver.class);
when(getResolver.performGet(eq(storIOContentResolver), any(Query.class)))
.thenThrow(new IllegalStateException("test exception"));
final TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();
new PreparedGetNumberOfResults.Builder(storIOContentResolver)
.withQuery(Query.builder().uri(testUri).build())
.withGetResolver(getResolver)
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(60, SECONDS);
testSubscriber.assertError(StorIOException.class);
assertThat(testSubscriber.errors()).hasSize(1);
StorIOException storIOException = (StorIOException) testSubscriber.errors().get(0);
IllegalStateException cause = (IllegalStateException) storIOException.getCause();
assertThat(cause).hasMessage("test exception");
testSubscriber.dispose();
}
@Test
public void shouldWrapExceptionIntoStorIOExceptionFlowable() {
final Stub stub = Stub.newInstanceApplyNotEmptyAffectedTablesAndTags();
IllegalStateException testException = new IllegalStateException("test exception");
doThrow(testException).when(stub.lowLevel).executeSQL(stub.rawQuery);
final TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
stub.storIOSQLite
.executeSQL()
.withQuery(stub.rawQuery)
.prepare()
.asRxFlowable(MISSING)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoValues();
testSubscriber.assertError(StorIOException.class);
//noinspection ThrowableResultOfMethodCallIgnored
StorIOException expected = (StorIOException) testSubscriber.errors().get(0);
IllegalStateException cause = (IllegalStateException) expected.getCause();
assertThat(cause).hasMessage("test exception");
verify(stub.storIOSQLite).executeSQL();
verify(stub.storIOSQLite).defaultRxScheduler();
verify(stub.storIOSQLite).lowLevel();
verify(stub.lowLevel).executeSQL(stub.rawQuery);
verify(stub.storIOSQLite).interceptors();
verifyNoMoreInteractions(stub.storIOSQLite, stub.lowLevel);
}
@Test
public void getAllImpressions_readError_notifiesError() {
when(storageClient.read(any(CampaignImpressionsParser.class)))
.thenReturn(Maybe.error(new IOException()));
TestSubscriber<CampaignImpressionList> subscriber =
impressionStorageClient.getAllImpressions().toFlowable().test();
subscriber.assertError(IOException.class);
}
@Test
public void otherError() {
given(timeLimiter.getTimeLimiterConfig())
.willReturn(toConfig(Duration.ZERO));
TestSubscriber<Object> subscriber = Flowable.error(new RuntimeException())
.compose(TimeLimiterTransformer.of(timeLimiter))
.test();
testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);
subscriber.assertError(RuntimeException.class);
then(timeLimiter).should()
.onError(any(RuntimeException.class));
}
@Test
public void testSubscriberWithException() {
Flowable<Integer> flowable = Flowable.create(source -> {
source.onNext(1);
source.onError(new RuntimeException());
}, BackpressureStrategy.LATEST);
TestSubscriber<Integer> ts = flowable.test();
ts.assertSubscribed();
ts.assertError(RuntimeException.class);
}
@Test
public void createWithError() {
Exception failure = new Exception("Boom");
PublisherStage<Object> boom = factory.create(null, () -> failure);
TestSubscriber<Object> test = boom.get().test();
test.assertError(failure);
}
@Test
public void oneToMany() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Flowable<HelloResponse> resp = Single.just(HelloRequest.getDefaultInstance()).as(stub::sayHelloRespStream);
TestSubscriber<HelloResponse> test = resp
.doOnNext(System.out::println)
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.doOnComplete(() -> System.out.println("Completed"))
.doOnCancel(() -> System.out.println("Client canceled"))
.test();
test.awaitTerminalEvent(3, TimeUnit.SECONDS);
test.assertError(t -> t instanceof StatusRuntimeException);
test.assertError(t -> ((StatusRuntimeException)t).getStatus().getCode() == Status.Code.INTERNAL);
}
@Test
public void manyToMany() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Flowable<HelloRequest> req = Flowable.just(HelloRequest.getDefaultInstance());
Flowable<HelloResponse> resp = req.compose(stub::sayHelloBothStream);
TestSubscriber<HelloResponse> test = resp.test();
test.awaitTerminalEvent(3, TimeUnit.SECONDS);
test.assertError(t -> t instanceof StatusRuntimeException);
test.assertError(t -> ((StatusRuntimeException)t).getStatus().getCode() == Status.Code.INTERNAL);
}
@Test
public void testGetHelloWorldAsyncObservable404() throws Exception {
String address = "http://localhost:" + PORT + "/rx2/flowable/textAsync404";
Invocation.Builder b = ClientBuilder.newClient().register(new FlowableRxInvokerProvider())
.target(address).request();
final TestSubscriber<String> subscriber = new TestSubscriber<>();
b.rx(FlowableRxInvoker.class)
.get(String.class)
.subscribe(subscriber);
subscriber.await(1, TimeUnit.SECONDS);
subscriber.assertError(NotFoundException.class);
}
@Test
public void statusRuntimeExceptionTriggersHandlerFuseable() {
CallStreamObserver delegate = mock(CallStreamObserver.class);
final AtomicBoolean called = new AtomicBoolean(false);
AbstractStreamObserverAndPublisher observer = new TestStreamObserverAndPublisherWithFusion(new ArrayBlockingQueue(1), null, new Runnable() {
@Override
public void run() {
called.set(true);
}
});
observer.onSubscribe(delegate);
TestSubscriber test = Flowable.fromPublisher(observer)
.observeOn(Schedulers.trampoline())
.test();
StatusRuntimeException exception = Status.CANCELLED.asRuntimeException();
observer.onError(exception);
test.awaitTerminalEvent();
test.assertError(exception);
assertThat(called.get()).isTrue();
assertThat(observer.outputFused).isTrue();
}
@Test
public void testErrorDeferred() {
Single<Integer> s = Single.fromCallable(() -> {
throw new RuntimeException("boo");
});
TestSubscriber<Integer> ts = new FlowableSingleDeferUntilRequest<Integer>(s) //
.test();
ts.assertError(RuntimeException.class);
ts.assertNoValues();
}
@Test
public void shouldWrapExceptionIntoStorIOExceptionForFlowable() {
final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
when(storIOSQLite.observeChanges(any(BackpressureStrategy.class))).thenReturn(Flowable.<Changes>empty());
//noinspection unchecked
final GetResolver<Cursor> getResolver = mock(GetResolver.class);
when(getResolver.performGet(eq(storIOSQLite), any(Query.class)))
.thenThrow(new IllegalStateException("test exception"));
final TestSubscriber<Cursor> testSubscriber = new TestSubscriber<Cursor>();
new PreparedGetCursor.Builder(storIOSQLite)
.withQuery(Query.builder().table("test_table").observesTags("test_tag").build())
.withGetResolver(getResolver)
.prepare()
.asRxFlowable(LATEST)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(60, SECONDS);
testSubscriber.assertError(StorIOException.class);
StorIOException storIOException = (StorIOException) testSubscriber.errors().get(0);
IllegalStateException cause = (IllegalStateException) storIOException.getCause();
assertThat(cause).hasMessage("test exception");
testSubscriber.dispose();
}
@Test public void whenMissingStrategyUsed_thenException() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable.toFlowable(BackpressureStrategy.MISSING).observeOn(Schedulers.computation()).test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
@Test
public void shouldFinishTransactionIfExceptionHasOccurredFlowable() {
final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
final StorIOSQLite.LowLevel lowLevel = mock(StorIOSQLite.LowLevel.class);
when(storIOSQLite.lowLevel()).thenReturn(lowLevel);
//noinspection unchecked
final PutResolver<Object> putResolver = mock(PutResolver.class);
when(putResolver.performPut(same(storIOSQLite), any()))
.thenThrow(new IllegalStateException("test exception"));
final List<Object> objects = singletonList(new Object());
final TestSubscriber<PutResults<Object>> testSubscriber = new TestSubscriber<PutResults<Object>>();
new PreparedPutCollectionOfObjects.Builder<Object>(storIOSQLite, objects)
.useTransaction(true)
.withPutResolver(putResolver)
.prepare()
.asRxFlowable(MISSING)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoValues();
testSubscriber.assertError(StorIOException.class);
//noinspection ThrowableResultOfMethodCallIgnored
StorIOException expected = (StorIOException) testSubscriber.errors().get(0);
IllegalStateException cause = (IllegalStateException) expected.getCause();
assertThat(cause).hasMessage("test exception");
verify(lowLevel).beginTransaction();
verify(lowLevel, never()).setTransactionSuccessful();
verify(lowLevel).endTransaction();
verify(storIOSQLite).lowLevel();
verify(storIOSQLite).defaultRxScheduler();
verify(storIOSQLite).interceptors();
verify(putResolver).performPut(same(storIOSQLite), any());
verifyNoMoreInteractions(storIOSQLite, lowLevel, putResolver);
}
@Test
public void cursorMustBeClosedInCaseOfExceptionForFlowable() {
final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
when(storIOSQLite.observeChanges(any(BackpressureStrategy.class))).thenReturn(Flowable.<Changes>empty());
//noinspection unchecked
final GetResolver<Object> getResolver = mock(GetResolver.class);
final Cursor cursor = mock(Cursor.class);
when(cursor.getCount()).thenReturn(10);
when(cursor.moveToNext()).thenReturn(true);
when(getResolver.performGet(eq(storIOSQLite), any(Query.class)))
.thenReturn(cursor);
when(getResolver.mapFromCursor(storIOSQLite, cursor))
.thenThrow(new IllegalStateException("test exception"));
PreparedGetObject<Object> preparedGetObject =
new PreparedGetObject<Object>(
storIOSQLite,
Object.class,
Query.builder().table("test_table").observesTags("test_tag").build(),
getResolver
);
final TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
preparedGetObject
.asRxFlowable(LATEST)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoValues();
testSubscriber.assertError(StorIOException.class);
StorIOException storIOException = (StorIOException) testSubscriber.errors().get(0);
IllegalStateException cause = (IllegalStateException) storIOException.getCause();
assertThat(cause).hasMessage("test exception");
// Cursor must be closed in case of exception
verify(cursor).close();
verify(storIOSQLite).observeChanges(LATEST);
verify(getResolver).performGet(eq(storIOSQLite), any(Query.class));
verify(getResolver).mapFromCursor(storIOSQLite, cursor);
verify(cursor).getCount();
verify(cursor).moveToNext();
verify(storIOSQLite).defaultRxScheduler();
verify(storIOSQLite).interceptors();
verifyNoMoreInteractions(storIOSQLite, getResolver, cursor);
}