下面列出了 io.reactivex.subscribers.TestSubscriber#assertNoValues() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Subscribes to the change stream, checks that nothing is delivered before any notification,
 * then posts a single {@code Changes} instance and expects exactly that instance without errors.
 */
@Test
public void observeChangesAndNotifyAboutChangesShouldWorkCorrectly() {
    TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChanges(LATEST).subscribe(changesSubscriber);

    // No notification has been posted yet.
    changesSubscriber.assertNoValues();

    Changes postedChanges = Changes.newInstance("test_table", "tag");
    storIOSQLite.lowLevel().notifyAboutChanges(postedChanges);

    changesSubscriber
            .assertValue(postedChanges)
            .assertNoErrors();
    changesSubscriber.dispose();
}
/**
 * Observes "table1" and "table2", then notifies about a change in "table2" and expects the
 * subscriber to receive that {@code Changes} instance without errors.
 */
@Test
public void observeChangesInTables_shouldReceiveIfTableWasChanged() {
    Set<String> observedTables = new HashSet<String>(2);
    observedTables.add("table1");
    observedTables.add("table2");

    TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChangesInTables(observedTables, LATEST).subscribe(changesSubscriber);

    // Nothing may arrive before a notification is posted.
    changesSubscriber.assertNoValues();

    Changes table2Changes = Changes.newInstance("table2");
    storIOSQLite.lowLevel().notifyAboutChanges(table2Changes);

    changesSubscriber
            .assertValues(table2Changes)
            .assertNoErrors();
    changesSubscriber.dispose();
}
/**
 * Observes "table1" and "table2", notifies about a change in "table3" and expects the
 * subscriber to receive neither values nor errors.
 */
@Test
public void observeChangesInTables_shouldNotReceiveIfTableWasNotChanged() {
    Set<String> observedTables = new HashSet<String>(2);
    observedTables.add("table1");
    observedTables.add("table2");

    TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChangesInTables(observedTables, LATEST).subscribe(changesSubscriber);

    // "table3" is not part of the observed set.
    storIOSQLite.lowLevel().notifyAboutChanges(Changes.newInstance("table3"));

    changesSubscriber
            .assertNoValues()
            .assertNoErrors();
    changesSubscriber.dispose();
}
/**
 * Stubs the storage client, puts the expired campaign response into the cache client and
 * expects a subsequent {@code get()} to emit no values.
 */
@Test
public void get_whenInMemCacheExpired_isEmpty() {
    // Stub both storage operations used by the cache client.
    when(storageClient.write(expiredCampaignResponse)).thenReturn(fakeWrite);
    when(storageClient.read(FetchEligibleCampaignsResponse.parser())).thenReturn(fakeRead);

    campaignCacheClient.put(expiredCampaignResponse).subscribe();

    TestSubscriber<FetchEligibleCampaignsResponse> cacheReadSubscriber =
            campaignCacheClient.get().toFlowable().test();

    cacheReadSubscriber.assertNoValues();
}
/**
 * Makes the storage client return the expired campaign response and expects
 * {@code get()} on the cache client to emit no values.
 */
@Test
public void get_whenStorageCacheExpired_isEmpty() {
    when(storageClient.read(FetchEligibleCampaignsResponse.parser()))
            .thenReturn(Maybe.just(expiredCampaignResponse));

    // Subscribe via test() and assert directly on the returned subscriber.
    campaignCacheClient
            .get()
            .toFlowable()
            .test()
            .assertNoValues();
}
/**
 * Makes the storage read fail with a {@code FileNotFoundException} and expects
 * {@code get()} on the cache client to emit no values.
 */
@Test
public void get_whenBothCachesAreEmpty_isEmpty() {
    final FileNotFoundException missingFile = new FileNotFoundException();
    when(storageClient.read(FetchEligibleCampaignsResponse.parser()))
            .thenReturn(Maybe.error(missingFile));

    TestSubscriber<FetchEligibleCampaignsResponse> cacheReadSubscriber =
            campaignCacheClient.get().toFlowable().test();

    cacheReadSubscriber.assertNoValues();
}
/**
 * Subscribes with an initial request of zero, cancels before requesting anything, and checks
 * that a later request produces neither values nor a terminal event.
 */
@Test
public void testSplitSimpleNormalCancelledAtBeginning() {
    TestSubscriber<String> subscriber = Flowable.just("boo:an", "d:you") //
            .compose(Strings.splitSimple(":")) //
            .test(0);

    // Zero items requested so far: nothing emitted, not terminated.
    subscriber.assertNoValues();
    subscriber.assertNotTerminated();

    subscriber.cancel();

    // Requesting after cancellation must not change the observed state.
    subscriber.requestMore(1) //
            .assertNoValues() //
            .assertNotTerminated();
}
/**
 * Creates a server socket flowable whose accept predicate is {@code Functions.alwaysFalse()},
 * connects a client socket to the allocated port, writes some bytes and checks that the
 * subscriber receives no values.
 *
 * <p>The client socket is closed in a {@code finally} block so it is released even when
 * obtaining the stream, writing or closing the stream throws.
 *
 * @throws UnknownHostException declared for the client socket construction
 * @throws IOException if the client socket cannot be opened, written to or closed
 * @throws InterruptedException if one of the sleeps is interrupted
 */
@Test
public void testAcceptSocketRejectsAlways() throws UnknownHostException, IOException, InterruptedException {
    reset();
    TestSubscriber<Object> ts = TestSubscriber.create();
    try {
        int bufferSize = 4;
        AtomicInteger port = new AtomicInteger();
        IO.serverSocketAutoAllocatePort(Consumers.set(port)) //
                .readTimeoutMs(10000) //
                .acceptTimeoutMs(200) //
                .bufferSize(bufferSize) //
                .acceptSocketIf(Functions.alwaysFalse()) //
                .create() //
                .subscribeOn(scheduler) //
                .subscribe(ts);
        // NOTE(review): presumably gives the server time to bind and publish its port - confirm
        Thread.sleep(300);
        Socket socket = new Socket(LOCALHOST, port.get());
        try {
            OutputStream out = socket.getOutputStream();
            out.write("12345678901234567890".getBytes());
            out.close();
        } finally {
            // always release the client socket, even if the write above failed
            socket.close();
        }
        Thread.sleep(1000);
        ts.assertNoValues();
    } finally {
        // will close server socket
        ts.dispose();
    }
}
/**
 * Cancels the subscriber from inside the {@code doOnEmpty} action of an empty flowable and
 * expects the subscriber to have no values and to be completed.
 */
@Test
public void testUnsubscribeAfterActionButBeforeCompletionDoesNotAffectCompletion() {
    final TestSubscriber<Object> subscriber = TestSubscriber.create();

    // Action handed to doOnEmpty; it cancels the very subscriber under test.
    final Action cancelSubscriber = new Action() {
        @Override
        public void run() {
            subscriber.cancel();
        }
    };

    Flowable.empty() //
            .compose(Transformers.doOnEmpty(cancelSubscriber)) //
            .subscribe(subscriber);

    subscriber.assertNoValues().assertComplete();
}
/**
 * Runs a non-transactional Put of two {@code TestItem}s against mocked StorIOSQLite objects
 * and expects the flowable to emit nothing and fail with a {@code StorIOException} caused by
 * an {@code IllegalStateException}, while insert/update are never called on the low-level mock.
 */
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutTransactionWithoutAffectingDbAsFlowable() {
    final StorIOSQLite sqliteMock = mock(StorIOSQLite.class);
    final StorIOSQLite.LowLevel lowLevelMock = mock(StorIOSQLite.LowLevel.class);

    when(sqliteMock.lowLevel()).thenReturn(lowLevelMock);
    when(sqliteMock.put()).thenReturn(new PreparedPut.Builder(sqliteMock));

    final List<TestItem> itemsToPut = asList(TestItem.newInstance(), TestItem.newInstance());
    final TestSubscriber<PutResults<TestItem>> subscriber = new TestSubscriber<PutResults<TestItem>>();

    sqliteMock
            .put()
            .objects(itemsToPut)
            .useTransaction(false)
            .prepare()
            .asRxFlowable(MISSING)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent();
    subscriber.assertNoValues();

    final Throwable failure = subscriber.errors().get(0);
    assertThat(failure)
            .isInstanceOf(StorIOException.class)
            .hasCauseInstanceOf(IllegalStateException.class);

    // Interactions expected on the top-level mock.
    verify(sqliteMock).put();
    verify(sqliteMock).lowLevel();
    verify(sqliteMock).defaultRxScheduler();
    verify(sqliteMock).interceptors();

    // Only the type-mapping lookup may reach the low-level API.
    verify(lowLevelMock).typeMapping(TestItem.class);
    verify(lowLevelMock, never()).insert(any(InsertQuery.class), any(ContentValues.class));
    verify(lowLevelMock, never()).update(any(UpdateQuery.class), any(ContentValues.class));

    verifyNoMoreInteractions(sqliteMock, lowLevelMock);
}
/**
 * Runs a Put of a single {@code TestItem} against mocked StorIOContentResolver objects and
 * expects the flowable to emit nothing and fail with a {@code StorIOException} caused by an
 * {@code IllegalStateException}, while insert/update are never called on the low-level mock.
 */
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAffectingContentProviderAsFlowable() {
    final StorIOContentResolver resolverMock = mock(StorIOContentResolver.class);
    final StorIOContentResolver.LowLevel lowLevelMock = mock(StorIOContentResolver.LowLevel.class);

    when(resolverMock.lowLevel()).thenReturn(lowLevelMock);
    when(resolverMock.put()).thenReturn(new PreparedPut.Builder(resolverMock));

    final TestSubscriber<PutResult> subscriber = new TestSubscriber<PutResult>();

    resolverMock
            .put()
            .object(TestItem.newInstance())
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent();
    subscriber.assertNoValues();

    final Throwable failure = subscriber.errors().get(0);
    assertThat(failure)
            .isInstanceOf(StorIOException.class)
            .hasCauseInstanceOf(IllegalStateException.class);

    // Interactions expected on the top-level mock.
    verify(resolverMock).put();
    verify(resolverMock).lowLevel();
    verify(resolverMock).interceptors();
    verify(resolverMock).defaultRxScheduler();

    // Only the type-mapping lookup may reach the low-level API.
    verify(lowLevelMock).typeMapping(TestItem.class);
    verify(lowLevelMock, never()).insert(any(InsertQuery.class), any(ContentValues.class));
    verify(lowLevelMock, never()).update(any(UpdateQuery.class), any(ContentValues.class));

    verifyNoMoreInteractions(resolverMock, lowLevelMock);
}
/**
 * Runs a Delete of a single {@code TestItem} against mocked StorIOContentResolver objects and
 * expects the flowable to emit nothing and fail with a {@code StorIOException} that carries the
 * expected message and an {@code IllegalStateException} cause, while delete is never called on
 * the low-level mock.
 */
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAffectingContentProviderAsFlowable() {
    final StorIOContentResolver resolverMock = mock(StorIOContentResolver.class);
    final StorIOContentResolver.LowLevel lowLevelMock = mock(StorIOContentResolver.LowLevel.class);

    when(resolverMock.lowLevel()).thenReturn(lowLevelMock);
    when(resolverMock.delete()).thenReturn(new PreparedDelete.Builder(resolverMock));

    final TestSubscriber<DeleteResult> subscriber = new TestSubscriber<DeleteResult>();

    resolverMock
            .delete()
            .object(TestItem.newInstance())
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent();
    subscriber.assertNoValues();

    final Throwable failure = subscriber.errors().get(0);
    assertThat(failure)
            .isInstanceOf(StorIOException.class)
            .hasCauseInstanceOf(IllegalStateException.class)
            .hasMessage("Error has occurred during Delete operation. object = TestItem{data='null'}");

    // Interactions expected on the top-level mock.
    verify(resolverMock).delete();
    verify(resolverMock).lowLevel();
    verify(resolverMock).interceptors();
    verify(resolverMock).defaultRxScheduler();

    // Only the type-mapping lookup may reach the low-level API.
    verify(lowLevelMock).typeMapping(TestItem.class);
    verify(lowLevelMock, never()).delete(any(DeleteQuery.class));

    verifyNoMoreInteractions(resolverMock, lowLevelMock);
}
/**
 * Runs a Get of a single {@code TestItem} against mocked StorIOContentResolver objects and
 * expects the flowable to emit nothing and fail with a {@code StorIOException} caused by an
 * {@code IllegalStateException}, while query is never called on the low-level mock.
 */
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAccessingContentProviderAsFlowable() {
    final StorIOContentResolver resolverMock = mock(StorIOContentResolver.class);
    final StorIOContentResolver.LowLevel lowLevelMock = mock(StorIOContentResolver.LowLevel.class);

    when(resolverMock.lowLevel()).thenReturn(lowLevelMock);
    when(resolverMock.get()).thenReturn(new PreparedGet.Builder(resolverMock));
    // The change stream for any Uri is stubbed as an empty flowable.
    when(resolverMock.observeChangesOfUri(any(Uri.class), eq(BackpressureStrategy.MISSING)))
            .thenReturn(Flowable.<Changes>empty());

    final TestSubscriber<Optional<TestItem>> subscriber = new TestSubscriber<Optional<TestItem>>();

    resolverMock
            .get()
            .object(TestItem.class)
            .withQuery(Query.builder().uri(mock(Uri.class)).build())
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent();
    subscriber.assertNoValues();

    final Throwable failure = subscriber.errors().get(0);
    assertThat(failure)
            .isInstanceOf(StorIOException.class)
            .hasCauseInstanceOf(IllegalStateException.class);

    // Interactions expected on the top-level mock.
    verify(resolverMock).get();
    verify(resolverMock).lowLevel();
    verify(resolverMock).interceptors();
    verify(resolverMock).defaultRxScheduler();

    // Only the type-mapping lookup may reach the low-level API.
    verify(lowLevelMock).typeMapping(TestItem.class);
    verify(lowLevelMock, never()).query(any(Query.class));

    verify(resolverMock).observeChangesOfUri(any(Uri.class), eq(BackpressureStrategy.MISSING));
    verifyNoMoreInteractions(resolverMock, lowLevelMock);
}
/**
 * Makes the stubbed put resolver throw an {@code IllegalStateException} and expects the Put
 * flowable to emit nothing and fail with a {@code StorIOException} whose message starts with
 * the Put-operation prefix and whose cause is the thrown exception's type and message.
 */
@Test
public void shouldWrapExceptionIntoStorIOExceptionFlowable() {
    final PutContentValuesStub putStub = PutContentValuesStub.newPutStubForOneContentValues();
    ContentValues valuesToPut = putStub.contentValues.get(0);

    IllegalStateException resolverFailure = new IllegalStateException("test exception");
    doThrow(resolverFailure).when(putStub.putResolver).performPut(putStub.storIOSQLite, valuesToPut);

    final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();

    putStub.storIOSQLite
            .put()
            .contentValues(valuesToPut)
            .withPutResolver(putStub.putResolver)
            .prepare()
            .asRxFlowable(MISSING)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent();
    subscriber
            .assertNoValues()
            .assertError(StorIOException.class);

    //noinspection ThrowableResultOfMethodCallIgnored
    StorIOException wrapped = (StorIOException) subscriber.errors().get(0);
    assertThat(wrapped).hasMessageStartingWith("Error has occurred during Put operation. contentValues =");

    IllegalStateException wrappedCause = (IllegalStateException) wrapped.getCause();
    assertThat(wrappedCause).hasMessage("test exception");

    verify(putStub.storIOSQLite).put();
    verify(putStub.storIOSQLite).defaultRxScheduler();
    verify(putStub.storIOSQLite).interceptors();
    verifyNoMoreInteractions(putStub.storIOSQLite, putStub.lowLevel);
}
/**
 * Runs a Put of two {@code TestItem}s against mocked StorIOContentResolver objects and expects
 * the flowable to emit nothing and fail with a {@code StorIOException} caused by an
 * {@code IllegalStateException}, while insert/update are never called on the low-level mock.
 */
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAffectingContentProviderAsFlowable() {
    final StorIOContentResolver resolverMock = mock(StorIOContentResolver.class);
    final StorIOContentResolver.LowLevel lowLevelMock = mock(StorIOContentResolver.LowLevel.class);

    when(resolverMock.lowLevel()).thenReturn(lowLevelMock);
    when(resolverMock.put()).thenReturn(new PreparedPut.Builder(resolverMock));

    final List<TestItem> itemsToPut = asList(TestItem.newInstance(), TestItem.newInstance());
    final TestSubscriber<PutResults<TestItem>> subscriber = new TestSubscriber<PutResults<TestItem>>();

    resolverMock
            .put()
            .objects(itemsToPut)
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent();
    subscriber.assertNoValues();

    final Throwable failure = subscriber.errors().get(0);
    assertThat(failure)
            .isInstanceOf(StorIOException.class)
            .hasCauseInstanceOf(IllegalStateException.class);

    // Interactions expected on the top-level mock.
    verify(resolverMock).put();
    verify(resolverMock).lowLevel();
    verify(resolverMock).interceptors();
    verify(resolverMock).defaultRxScheduler();

    // Only the type-mapping lookup may reach the low-level API.
    verify(lowLevelMock).typeMapping(TestItem.class);
    verify(lowLevelMock, never()).insert(any(InsertQuery.class), any(ContentValues.class));
    verify(lowLevelMock, never()).update(any(UpdateQuery.class), any(ContentValues.class));

    verifyNoMoreInteractions(resolverMock, lowLevelMock);
}
/**
 * Runs a Get of a single {@code TestItem} with a raw query against mocked StorIOSQLite objects
 * and expects the flowable to emit nothing and fail with a {@code StorIOException} whose
 * message and cause message match the expected texts, while rawQuery is never called on the
 * low-level mock.
 */
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAccessingDbWithRawQueryAsFlowable() {
    final StorIOSQLite sqliteMock = mock(StorIOSQLite.class);
    final StorIOSQLite.LowLevel lowLevelMock = mock(StorIOSQLite.LowLevel.class);

    when(sqliteMock.get()).thenReturn(new PreparedGet.Builder(sqliteMock));
    when(sqliteMock.lowLevel()).thenReturn(lowLevelMock);

    final TestSubscriber<Optional<TestItem>> subscriber = new TestSubscriber<Optional<TestItem>>();

    sqliteMock
            .get()
            .object(TestItem.class)
            .withQuery(RawQuery.builder().query("test query").build())
            .prepare()
            .asRxFlowable(LATEST)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent();
    subscriber.assertNoValues();

    Throwable failure = subscriber.errors().get(0);

    // Outer exception: wrapper type, cause type and the full operation description.
    assertThat(failure)
            .isInstanceOf(StorIOException.class)
            .hasCauseInstanceOf(IllegalStateException.class)
            .hasMessage("Error has occurred during Get operation. query = RawQuery{query='test query', args=[], affectsTables=[], affectsTags=[], observesTables=[], observesTags=[]}");

    // Inner exception: the missing-type-mapping explanation.
    assertThat(failure.getCause())
            .hasMessage("This type does not have type mapping: "
                    + "type = " + TestItem.class + "," +
                    "db was not touched by this operation, please add type mapping for this type");

    // Interactions expected on the top-level mock.
    verify(sqliteMock).get();
    verify(sqliteMock).lowLevel();
    verify(sqliteMock).defaultRxScheduler();
    verify(sqliteMock).interceptors();

    // Only the type-mapping lookup may reach the low-level API.
    verify(lowLevelMock).typeMapping(TestItem.class);
    verify(lowLevelMock, never()).rawQuery(any(RawQuery.class));

    verifyNoMoreInteractions(sqliteMock, lowLevelMock);
}
/**
 * Observes "table1" only and posts changes for "table2", "table1" and "table3" in turn,
 * expecting the subscriber to end up with just the "table1" change and no errors.
 */
@Test
public void observeChangesInTable() {
    TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChangesInTable("table1", LATEST).subscribe(changesSubscriber);
    changesSubscriber.assertNoValues();

    // A change in a table that is not observed.
    Changes table2Changes = Changes.newInstance("table2");
    storIOSQLite.lowLevel().notifyAboutChanges(table2Changes);
    changesSubscriber.assertNoValues();

    // A change in the observed table.
    Changes table1Changes = Changes.newInstance("table1");
    storIOSQLite.lowLevel().notifyAboutChanges(table1Changes);
    changesSubscriber.assertValue(table1Changes);

    // Another change in a table that is not observed.
    Changes table3Changes = Changes.newInstance("table3");
    storIOSQLite.lowLevel().notifyAboutChanges(table3Changes);

    // Subscriber should not see changes of table2 and table3
    changesSubscriber
            .assertValue(table1Changes)
            .assertNoErrors();
    changesSubscriber.dispose();
}
/**
 * Asserts that the given subscriber has not completed and has received neither errors nor values.
 *
 * @param subscriber the test subscriber to inspect
 */
private void assertNoNotification(TestSubscriber<InAppMessage> subscriber) {
    // Fluent form: each assertion returns the subscriber, so the checks run in the same order.
    subscriber
            .assertNotComplete()
            .assertNoErrors()
            .assertNoValues();
}
/**
 * Runs a transactional Delete of one object with a delete resolver that throws, and expects
 * the flowable to fail with a {@code StorIOException} wrapping the thrown exception while the
 * transaction is begun and ended but never marked successful.
 */
@Test
public void shouldFinishTransactionIfExceptionHasOccurredFlowable() {
    final StorIOSQLite sqliteMock = mock(StorIOSQLite.class);
    final StorIOSQLite.LowLevel lowLevelMock = mock(StorIOSQLite.LowLevel.class);
    when(sqliteMock.lowLevel()).thenReturn(lowLevelMock);

    //noinspection unchecked
    final DeleteResolver<Object> failingResolver = mock(DeleteResolver.class);
    when(failingResolver.performDelete(same(sqliteMock), any()))
            .thenThrow(new IllegalStateException("test exception"));

    final TestSubscriber<DeleteResults<Object>> subscriber = new TestSubscriber<DeleteResults<Object>>();

    new PreparedDeleteCollectionOfObjects.Builder<Object>(sqliteMock, singletonList(new Object()))
            .useTransaction(true)
            .withDeleteResolver(failingResolver)
            .prepare()
            .asRxFlowable(MISSING)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent();
    subscriber
            .assertNoValues()
            .assertError(StorIOException.class);

    //noinspection ThrowableResultOfMethodCallIgnored
    StorIOException wrapped = (StorIOException) subscriber.errors().get(0);
    IllegalStateException wrappedCause = (IllegalStateException) wrapped.getCause();
    assertThat(wrappedCause).hasMessage("test exception");

    // Transaction must be opened and closed, but not marked successful.
    verify(lowLevelMock).beginTransaction();
    verify(lowLevelMock, never()).setTransactionSuccessful();
    verify(lowLevelMock).endTransaction();

    verify(sqliteMock).lowLevel();
    verify(sqliteMock).interceptors();
    verify(sqliteMock).defaultRxScheduler();
    verify(failingResolver).performDelete(same(sqliteMock), any());
    verifyNoMoreInteractions(sqliteMock, lowLevelMock, failingResolver);
}
/**
 * Verifies that {@code defaultRxScheduler()} was called on {@code storIOContentResolver},
 * subscribes to the operation's flowable, and checks that no value is present before
 * {@code scheduler.triggerActions()} and exactly one value afterwards.
 *
 * @param operation prepared operation whose flowable is subscribed to
 */
private void check(@NonNull PreparedOperation operation) {
    verify(storIOContentResolver).defaultRxScheduler();

    TestSubscriber rawSubscriber = new TestSubscriber();
    //noinspection unchecked
    operation.asRxFlowable(BackpressureStrategy.MISSING).subscribe(rawSubscriber);

    // Nothing is delivered until the scheduler's actions are triggered.
    rawSubscriber.assertNoValues();

    scheduler.triggerActions();

    rawSubscriber.assertValueCount(1);
}