下面列出了io.reactivex.subscribers.TestSubscriber#awaitTerminalEvent ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* https://github.com/salesforce/reactive-grpc/issues/120
*/
@Test
public void chunkOperatorWorksWithConcatMap() {
int chunkSize = DEFAULT_CHUNK_SIZE;
AbstractStreamObserverAndPublisher<Long> source =
new AbstractStreamObserverAndPublisher<Long>(new ConcurrentLinkedQueue<Long>(), null){};
AsyncRangeCallStreamObserver observer = new AsyncRangeCallStreamObserver(Executors.newSingleThreadExecutor(), source, 24);
source.onSubscribe(observer);
TestSubscriber<Long> testSubscriber = Flowable.fromPublisher(source)
.concatMap(new Function<Long, Publisher<Long>>() {
@Override
public Publisher<Long> apply(Long item) throws Exception {
return Flowable.just(item).delay(3, TimeUnit.MILLISECONDS);
}
})
.test();
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoErrors();
assertThat(observer.requestsQueue).containsExactly(chunkSize);
}
@Test
public void oneToManyRetryWhen() {
TestSubscriber<Integer> test = newThreeErrorSingle()
.as(GrpcRetry.OneToMany.retryWhen(new Function<Single<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Single<Integer> single) {
return single.toFlowable();
}
}, RetryWhen.maxRetries(3).build()))
.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertValues(0);
test.assertNoErrors();
test.assertComplete();
}
@Test
public void updateContentValuesExecuteAsBlocking() {
TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(2)
.subscribe(changesTestSubscriber);
Uri insertedUri = contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value").toContentValues());
TestItem testItem = TestItem.create(ContentUris.parseId(insertedUri), "value2");
PutResult updateResult = storIOContentResolver
.put()
.contentValues(testItem.toContentValues())
.withPutResolver(testItemContentValuesPutResolver)
.prepare()
.executeAsBlocking();
assertThat(updateResult.wasUpdated()).isTrue();
Cursor cursor = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(cursor).hasCount(1);
cursor.moveToFirst();
assertThat(testItem).isEqualTo(TestItem.fromCursor(cursor));
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void getNonExistedObjectExecuteAsBlocking() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
TestItem testItem = storIOContentResolver
.get()
.object(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.where(TestItem.COLUMN_VALUE + "=?")
.whereArgs("some value")
.build())
.prepare()
.executeAsBlocking();
assertThat(testItem).isNull();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
@Test public void currentQueue() {
TestSubscriber<Pair<List<Track>, Integer>> test = queueManager.queue().take(1).test();
test.awaitTerminalEvent();
List<Track> actualQueue = test.values().get(0).first;
int actualPosition = test.values().get(0).second;
assertThat(actualQueue, IsIterableContainingInOrder.contains(
queue.get(0), queue.get(1), queue.get(2), queue.get(3), queue.get(4)));
assertThat(actualPosition, is(0));
}
@Test
public void getExistedObjectExecuteAsFlowable() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
TestItem expectedItem = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());
Flowable<Optional<TestItem>> testItemFlowable = storIOContentResolver
.get()
.object(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.where(TestItem.COLUMN_VALUE + "=?")
.whereArgs("value")
.build())
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.take(1);
TestSubscriber<Optional<TestItem>> testSubscriber = new TestSubscriber<Optional<TestItem>>();
testItemFlowable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
List<Optional<TestItem>> emmitedItems = testSubscriber.values();
assertThat(emmitedItems.size()).isEqualTo(1);
assertThat(expectedItem.equalsWithoutId(emmitedItems.get(0).get())).isTrue();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAffectingDbAsFlowable() {
final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
final StorIOSQLite.LowLevel lowLevel = mock(StorIOSQLite.LowLevel.class);
when(storIOSQLite.lowLevel()).thenReturn(lowLevel);
when(storIOSQLite.delete()).thenReturn(new PreparedDelete.Builder(storIOSQLite));
final TestSubscriber<DeleteResult> testSubscriber = new TestSubscriber<DeleteResult>();
storIOSQLite
.delete()
.object(TestItem.newInstance())
.prepare()
.asRxFlowable(MISSING)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoValues();
assertThat(testSubscriber.errors().get(0)).
hasCauseInstanceOf(IllegalStateException.class);
verify(storIOSQLite).delete();
verify(storIOSQLite).lowLevel();
verify(storIOSQLite).defaultRxScheduler();
verify(storIOSQLite).interceptors();
verify(lowLevel).typeMapping(TestItem.class);
verify(lowLevel, never()).delete(any(DeleteQuery.class));
verifyNoMoreInteractions(storIOSQLite, lowLevel);
}
@Test
public void deleteObjectsExecuteAsBlocking() {
TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(2)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(firstDbState).hasCount(1);
//noinspection ConstantConditions
assertThat(firstDbState.moveToFirst()).isTrue();
TestItem testItem = TestItem.fromCursor(firstDbState);
DeleteResults<TestItem> deleteResults = storIOContentResolver
.delete()
.objects(singletonList(testItem))
.prepare()
.executeAsBlocking();
assertThat(deleteResults.wasDeleted(testItem)).isTrue();
Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(secondDbState).hasCount(0);
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void serverCanCancelClientStreamImplicitlyBidi() {
TestService svc = new TestService();
serverRule.getServiceRegistry().addService(svc);
RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(serverRule.getChannel());
svc.setExplicitCancel(false);
AtomicBoolean requestWasCanceled = new AtomicBoolean(false);
AtomicBoolean requestDidProduce = new AtomicBoolean(false);
Flowable<NumberProto.Number> request = Flowable
.fromIterable(IntStream.range(0, NUMBER_OF_STREAM_ELEMENTS)::iterator)
.delay(10, TimeUnit.MILLISECONDS)
.map(CancellationPropagationIntegrationTest::protoNum)
.doOnNext(x -> {
requestDidProduce.set(true);
System.out.println("Produced: " + x.getNumber(0));
})
.doOnCancel(() -> {
requestWasCanceled.set(true);
System.out.println("Client canceled");
});
TestSubscriber<NumberProto.Number> observer = request
.compose(stub::twoWayPressure)
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.test();
observer.awaitTerminalEvent(3, TimeUnit.SECONDS);
observer.assertTerminated();
assertThat(requestWasCanceled.get()).isTrue();
assertThat(requestDidProduce.get()).isTrue();
errorRule.verifyNoError();
}
@Test
public void statusRuntimeExceptionTriggersHandlerFuseable() {
CallStreamObserver delegate = mock(CallStreamObserver.class);
final AtomicBoolean called = new AtomicBoolean(false);
AbstractStreamObserverAndPublisher observer = new TestStreamObserverAndPublisherWithFusion(new ArrayBlockingQueue(1), null, new Runnable() {
@Override
public void run() {
called.set(true);
}
});
observer.onSubscribe(delegate);
TestSubscriber test = Flowable.fromPublisher(observer)
.observeOn(Schedulers.trampoline())
.test();
StatusRuntimeException exception = Status.CANCELLED.asRuntimeException();
observer.onError(exception);
test.awaitTerminalEvent();
test.assertError(exception);
assertThat(called.get()).isTrue();
assertThat(observer.outputFused).isTrue();
}
@Test
public void deleteObjectsAsRxFlowable() {
TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(2)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(firstDbState).hasCount(1);
//noinspection ConstantConditions
assertThat(firstDbState.moveToFirst()).isTrue();
TestItem testItem = TestItem.fromCursor(firstDbState);
DeleteResults<TestItem> deleteResults = storIOContentResolver
.delete()
.objects(singletonList(testItem))
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.blockingFirst();
assertThat(deleteResults.wasDeleted(testItem)).isTrue();
Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(secondDbState).hasCount(0);
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAffectingContentProviderAsFlowable() {
final StorIOContentResolver storIOContentResolver = mock(StorIOContentResolver.class);
final StorIOContentResolver.LowLevel lowLevel = mock(StorIOContentResolver.LowLevel.class);
when(storIOContentResolver.lowLevel()).thenReturn(lowLevel);
when(storIOContentResolver.delete()).thenReturn(new PreparedDelete.Builder(storIOContentResolver));
final List<TestItem> items = asList(TestItem.newInstance(), TestItem.newInstance());
final TestSubscriber<DeleteResults<TestItem>> testSubscriber = new TestSubscriber<DeleteResults<TestItem>>();
storIOContentResolver
.delete()
.objects(items)
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoValues();
assertThat(testSubscriber.errors().get(0))
.isInstanceOf(StorIOException.class)
.hasCauseInstanceOf(IllegalStateException.class);
verify(storIOContentResolver).delete();
verify(storIOContentResolver).lowLevel();
verify(storIOContentResolver).interceptors();
verify(storIOContentResolver).defaultRxScheduler();
verify(lowLevel).typeMapping(TestItem.class);
verify(lowLevel, never()).delete(any(DeleteQuery.class));
verifyNoMoreInteractions(storIOContentResolver, lowLevel);
}
@Test
public void getExistedObjectExecuteAsSingle() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
TestItem expectedItem = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());
Single<Optional<TestItem>> testItemSingle = storIOContentResolver
.get()
.object(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.where(TestItem.COLUMN_VALUE + "=?")
.whereArgs("value")
.build())
.prepare()
.asRxSingle();
TestObserver<Optional<TestItem>> testObserver = new TestObserver<Optional<TestItem>>();
testItemSingle.subscribe(testObserver);
testObserver.awaitTerminalEvent(5, SECONDS);
testObserver.assertNoErrors();
List<Optional<TestItem>> emmitedItems = testObserver.values();
assertThat(emmitedItems.size()).isEqualTo(1);
assertThat(expectedItem.equalsWithoutId(emmitedItems.get(0).get())).isTrue();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithTransactionWithoutAffectingDbAsFlowable() {
final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
final StorIOSQLite.LowLevel lowLevel = mock(StorIOSQLite.LowLevel.class);
when(storIOSQLite.lowLevel()).thenReturn(lowLevel);
when(storIOSQLite.put()).thenReturn(new PreparedPut.Builder(storIOSQLite));
final List<TestItem> items = asList(TestItem.newInstance(), TestItem.newInstance());
final TestSubscriber<PutResults<TestItem>> testSubscriber = new TestSubscriber<PutResults<TestItem>>();
storIOSQLite
.put()
.objects(items)
.useTransaction(true)
.prepare()
.asRxFlowable(MISSING)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoValues();
assertThat(testSubscriber.errors().get(0))
.isInstanceOf(StorIOException.class)
.hasCauseInstanceOf(IllegalStateException.class);
verify(storIOSQLite).put();
verify(storIOSQLite).lowLevel();
verify(storIOSQLite).defaultRxScheduler();
verify(storIOSQLite).interceptors();
verify(lowLevel).typeMapping(TestItem.class);
verify(lowLevel, never()).insert(any(InsertQuery.class), any(ContentValues.class));
verify(lowLevel, never()).update(any(UpdateQuery.class), any(ContentValues.class));
verifyNoMoreInteractions(storIOSQLite, lowLevel);
}
@Test
public void getOneExistedObjectTableUpdate() {
TestItem expectedItem = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value3").toContentValues());
Flowable<Optional<TestItem>> testItemFlowable = storIOContentResolver
.get()
.object(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.where(TestItem.COLUMN_VALUE + "=?")
.whereArgs("value")
.build())
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.take(2);
TestSubscriber<Optional<TestItem>> testSubscriber = new TestSubscriber<Optional<TestItem>>();
testItemFlowable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertValue(Optional.<TestItem>empty());
testSubscriber.assertNoErrors();
contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
List<Optional<TestItem>> emittedItems = testSubscriber.values();
assertThat(emittedItems.size()).isEqualTo(2);
assertThat(emittedItems.get(0).isPresent()).isFalse();
assertThat(expectedItem.equalsWithoutId(emittedItems.get(1).get())).isTrue();
}
@Test
public void getListOfObjectsExecuteAsBlocking() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
List<TestItem> list = storIOContentResolver
.get()
.listOfObjects(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.build())
.prepare()
.executeAsBlocking();
assertThat(list).hasSize(1);
assertThat(testItemToInsert.equalsWithoutId(list.get(0))).isTrue();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void deleteObjectsAsCompletable() {
TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(2)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(firstDbState).hasCount(1);
//noinspection ConstantConditions
assertThat(firstDbState.moveToFirst()).isTrue();
TestItem testItem = TestItem.fromCursor(firstDbState);
storIOContentResolver
.delete()
.objects(singletonList(testItem))
.prepare()
.asRxCompletable()
.blockingAwait(15, SECONDS);
Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(secondDbState).hasCount(0);
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
@SuppressWarnings("unchecked")
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAccessingDbWithQueryAsFlowable() {
final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
final StorIOSQLite.LowLevel lowLevel = mock(StorIOSQLite.LowLevel.class);
when(storIOSQLite.get()).thenReturn(new PreparedGet.Builder(storIOSQLite));
when(storIOSQLite.lowLevel()).thenReturn(lowLevel);
when(storIOSQLite.observeChanges(any(BackpressureStrategy.class))).thenReturn(Flowable.<Changes>empty());
final TestSubscriber<Optional<TestItem>> testSubscriber = new TestSubscriber<Optional<TestItem>>();
storIOSQLite
.get()
.object(TestItem.class)
.withQuery(Query.builder().table("test_table").observesTags("test_tag").build())
.prepare()
.asRxFlowable(LATEST)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoValues();
Throwable error = testSubscriber.errors().get(0);
assertThat(error)
.isInstanceOf(StorIOException.class)
.hasCauseInstanceOf(IllegalStateException.class)
.hasMessage("Error has occurred during Get operation. query = Query{distinct=false, table='test_table', columns=[], where='', whereArgs=[], groupBy='', having='', orderBy='', limit='', observesTags='[test_tag]'}");
assertThat(error.getCause())
.hasMessage("This type does not have type mapping: "
+ "type = " + TestItem.class + "," +
"db was not touched by this operation, please add type mapping for this type");
verify(storIOSQLite).get();
verify(storIOSQLite).lowLevel();
verify(storIOSQLite).defaultRxScheduler();
verify(storIOSQLite).interceptors();
verify(lowLevel).typeMapping(TestItem.class);
verify(lowLevel, never()).query(any(Query.class));
verify(storIOSQLite).observeChanges(LATEST);
verifyNoMoreInteractions(storIOSQLite, lowLevel);
}
@Test
public void verifyBehaviorInCaseOfExceptionWithoutTransactionFlowable() {
final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
final StorIOSQLite.LowLevel lowLevel = mock(StorIOSQLite.LowLevel.class);
//noinspection unchecked
final PutResolver<ContentValues> putResolver = mock(PutResolver.class);
final List<ContentValues> contentValues = singletonList(mock(ContentValues.class));
when(putResolver.performPut(same(storIOSQLite), any(ContentValues.class)))
.thenThrow(new IllegalStateException("test exception"));
final TestSubscriber<PutResults<ContentValues>> testSubscriber = new TestSubscriber<PutResults<ContentValues>>();
new PreparedPutContentValuesIterable.Builder(storIOSQLite, contentValues)
.withPutResolver(putResolver)
.useTransaction(false)
.prepare()
.asRxFlowable(MISSING)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoValues();
testSubscriber.assertError(StorIOException.class);
//noinspection ThrowableResultOfMethodCallIgnored
StorIOException expected = (StorIOException) testSubscriber.errors().get(0);
IllegalStateException cause = (IllegalStateException) expected.getCause();
assertThat(cause).hasMessage("test exception");
// Main check of this test
verify(lowLevel, never()).endTransaction();
verify(storIOSQLite).lowLevel();
verify(storIOSQLite).defaultRxScheduler();
verify(storIOSQLite).interceptors();
verify(putResolver).performPut(same(storIOSQLite), any(ContentValues.class));
verifyNoMoreInteractions(storIOSQLite, lowLevel, putResolver);
}
@Test
public void cursorMustBeClosedInCaseOfExceptionForFlowable() {
final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
when(storIOSQLite.observeChanges(any(BackpressureStrategy.class))).thenReturn(Flowable.<Changes>empty());
//noinspection unchecked
final GetResolver<Object> getResolver = mock(GetResolver.class);
final Cursor cursor = mock(Cursor.class);
when(cursor.getCount()).thenReturn(10);
when(cursor.moveToNext()).thenReturn(true);
when(getResolver.performGet(eq(storIOSQLite), any(Query.class)))
.thenReturn(cursor);
when(getResolver.mapFromCursor(storIOSQLite, cursor))
.thenThrow(new IllegalStateException("test exception"));
PreparedGetObject<Object> preparedGetObject =
new PreparedGetObject<Object>(
storIOSQLite,
Object.class,
Query.builder().table("test_table").observesTags("test_tag").build(),
getResolver
);
final TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
preparedGetObject
.asRxFlowable(LATEST)
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertNoValues();
testSubscriber.assertError(StorIOException.class);
StorIOException storIOException = (StorIOException) testSubscriber.errors().get(0);
IllegalStateException cause = (IllegalStateException) storIOException.getCause();
assertThat(cause).hasMessage("test exception");
// Cursor must be closed in case of exception
verify(cursor).close();
verify(storIOSQLite).observeChanges(LATEST);
verify(getResolver).performGet(eq(storIOSQLite), any(Query.class));
verify(getResolver).mapFromCursor(storIOSQLite, cursor);
verify(cursor).getCount();
verify(cursor).moveToNext();
verify(storIOSQLite).defaultRxScheduler();
verify(storIOSQLite).interceptors();
verifyNoMoreInteractions(storIOSQLite, getResolver, cursor);
}