下面列出了io.reactivex.subscribers.TestSubscriber#values ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void test_just_Flowable() {
Flowable<String> observable = Flowable.just("mango", "papaya", "guava");
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
observable.subscribe(testSubscriber);
List<String> items = testSubscriber.values();
testSubscriber.assertComplete();
testSubscriber.assertSubscribed();
testSubscriber.assertNoErrors();
testSubscriber.assertValueCount(3);
testSubscriber.assertValues("mango", "papaya", "guava");
}
@Test public void observeQuery() throws Exception {
enqueue("demo/locales.json");
enqueue("demo/types.json");
enqueue("demo/initial.json");
sync();
TestSubscriber<Cat> subscriber = new TestSubscriber<>();
vault.observe(Cat.class)
.where(BaseFields.REMOTE_ID + " IN(?, ?, ?)", "happycat", "nyancat", "garfield")
.limit(2)
.order(Cat$Fields.UPDATED_AT + " DESC")
.all()
.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertComplete();
List<Cat> cats = subscriber.values();
assertThat(cats).hasSize(2);
assertThat(cats.get(0).updatedAt()).isEqualTo("2013-11-18T15:58:02.018Z");
assertThat(cats.get(1).updatedAt()).isEqualTo("2013-09-04T09:19:39.027Z");
}
@Test public void observeSyncResults() throws Exception {
enqueue("demo/locales.json");
enqueue("demo/types.json");
enqueue("demo/initial.json");
TestSubscriber<SyncResult> subscriber = new TestSubscriber<>();
Vault.observeSyncResults().subscribe(subscriber);
subscriber.assertNoValues();
sync();
subscriber.assertNoErrors();
List<SyncResult> events = subscriber.values();
assertThat(events).hasSize(1);
assertThat(events.get(0).isSuccessful()).isTrue();
assertThat(events.get(0).spaceId()).isEqualTo("cfexampleapi");
}
@Test
public void testFindAll() throws TechnicalException {
// create role
Role role1 = new Role();
role1.setName("testName1");
role1.setReferenceType(ReferenceType.DOMAIN);
role1.setReferenceId(DOMAIN_ID);
Role roleCreated1 = roleRepository.create(role1).blockingGet();
Role role2 = new Role();
role2.setName("testName2");
role2.setReferenceType(ReferenceType.DOMAIN);
role2.setReferenceId(DOMAIN_ID);
Role roleCreated2 = roleRepository.create(role2).blockingGet();
// Role 3 is on domain#2.
Role role3 = new Role();
role3.setName("testName3");
role3.setReferenceType(ReferenceType.DOMAIN);
role3.setReferenceId("domain#2");
roleRepository.create(role3).blockingGet();
// fetch role
TestSubscriber<Role> testObserver = roleRepository.findAll(ReferenceType.DOMAIN, DOMAIN_ID).test();
testObserver.awaitTerminalEvent();
testObserver.assertComplete();
testObserver.assertNoErrors();
testObserver.assertValueCount(2);
List<Role> roles = testObserver.values();
assertTrue(roles.stream().anyMatch(role -> role.getId().equals(roleCreated1.getId())));
assertTrue(roles.stream().anyMatch(role -> role.getId().equals(roleCreated2.getId())));
}
@Test public void setNewQueueShouldSetShuffleOff() {
TestSubscriber<Pair<Integer, Integer>> test = queueManager.mode().take(3).test();
queueManager.shuffle();
queueManager.setQueue(new ArrayList<>(queue), 2000);
test.awaitTerminalEvent();
List<Pair<Integer, Integer>> modes = test.values();
assertThat(modes.get(0).first, is(SHUFFLE_OFF));
assertThat(modes.get(1).first, is(SHUFFLE_ALL));
assertThat(modes.get(2).first, is(SHUFFLE_OFF));
}
@Test public void shuffleModes() {
TestSubscriber<Pair<Integer, Integer>> test = queueManager.mode().take(3).test();
queueManager.shuffle();
queueManager.shuffle();
test.awaitTerminalEvent();
List<Pair<Integer, Integer>> modes = test.values();
assertThat(modes.get(0).first, is(SHUFFLE_OFF));
assertThat(modes.get(1).first, is(SHUFFLE_ALL));
assertThat(modes.get(2).first, is(SHUFFLE_OFF));
}
@Test public void repeatModes() {
TestSubscriber<Pair<Integer, Integer>> test = queueManager.mode().take(4).test();
queueManager.repeat();
queueManager.repeat();
queueManager.repeat();
test.awaitTerminalEvent();
List<Pair<Integer, Integer>> modes = test.values();
assertThat(modes.get(0).second, is(REPEAT_OFF));
assertThat(modes.get(1).second, is(REPEAT_ALL));
assertThat(modes.get(2).second, is(REPEAT_ONE));
assertThat(modes.get(3).second, is(REPEAT_OFF));
}
@Test
public void getExistedObjectExecuteAsFlowable() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
TestItem expectedItem = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());
Flowable<Optional<TestItem>> testItemFlowable = storIOContentResolver
.get()
.object(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.where(TestItem.COLUMN_VALUE + "=?")
.whereArgs("value")
.build())
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.take(1);
TestSubscriber<Optional<TestItem>> testSubscriber = new TestSubscriber<Optional<TestItem>>();
testItemFlowable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
List<Optional<TestItem>> emmitedItems = testSubscriber.values();
assertThat(emmitedItems.size()).isEqualTo(1);
assertThat(expectedItem.equalsWithoutId(emmitedItems.get(0).get())).isTrue();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void getOneExistedObjectTableUpdate() {
TestItem expectedItem = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value3").toContentValues());
Flowable<Optional<TestItem>> testItemFlowable = storIOContentResolver
.get()
.object(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.where(TestItem.COLUMN_VALUE + "=?")
.whereArgs("value")
.build())
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.take(2);
TestSubscriber<Optional<TestItem>> testSubscriber = new TestSubscriber<Optional<TestItem>>();
testItemFlowable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertValue(Optional.<TestItem>empty());
testSubscriber.assertNoErrors();
contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
List<Optional<TestItem>> emittedItems = testSubscriber.values();
assertThat(emittedItems.size()).isEqualTo(2);
assertThat(emittedItems.get(0).isPresent()).isFalse();
assertThat(expectedItem.equalsWithoutId(emittedItems.get(1).get())).isTrue();
}
@Test
@Repeat(times = 20)
public void shouldReceiveOneNotificationWithAllAffectedTablesInTransactionWithMultipleThreads() throws InterruptedException {
final String table1 = "test_table1";
final String table2 = "test_table2";
final int numberOfThreads = ConcurrencyTesting.optimalTestThreadsCount();
final TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();
storIOSQLite
.observeChanges(LATEST)
.subscribe(testSubscriber);
final StorIOSQLite.LowLevel lowLevel = storIOSQLite.lowLevel();
lowLevel.beginTransaction();
final CountDownLatch startAllThreadsLock = new CountDownLatch(1);
final CountDownLatch allThreadsFinishedLock = new CountDownLatch(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
// All threads should start "simultaneously".
startAllThreadsLock.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lowLevel.notifyAboutChanges(Changes.newInstance(table1));
lowLevel.notifyAboutChanges(Changes.newInstance(table2));
allThreadsFinishedLock.countDown();
}
}).start();
}
// Ready!
// Steady!
startAllThreadsLock.countDown(); // Go!
assertThat(allThreadsFinishedLock.await(25, SECONDS)).isTrue();
// While we in transaction, no changes should be sent.
testSubscriber.assertValueCount(0);
lowLevel.endTransaction();
testSubscriber.assertNoErrors();
List<Changes> actualChanges = testSubscriber.values();
assertThat(actualChanges).hasSize(1);
assertThat(actualChanges.get(0).affectedTables()).containsOnly("test_table1", "test_table2");
}
@Test
public void getCursorAsFlowableOnlyInitialValue() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
final TestSubscriber<Cursor> cursorTestSubscriber = new TestSubscriber<Cursor>();
storIOContentResolver
.get()
.cursor()
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.build())
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.take(1)
.subscribe(cursorTestSubscriber);
cursorTestSubscriber.awaitTerminalEvent(60, SECONDS);
cursorTestSubscriber.assertNoErrors();
List<Cursor> listOfCursors = cursorTestSubscriber.values();
assertThat(listOfCursors).hasSize(1);
Assertions.assertThat(listOfCursors.get(0)).hasCount(1);
listOfCursors.get(0).moveToFirst();
assertThat(testItemToInsert.equalsWithoutId(TestItem.fromCursor(listOfCursors.get(0))))
.isTrue();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI));
}