下面列出了io.reactivex.subscribers.TestSubscriber#assertNoErrors ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void shouldHandleAddCommands() {
// given:
final Processor<ORSet.ORSetCommand<String>, ORSet.ORSetCommand<String>> inputStream = ReplayProcessor.create();
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final ORSet<String> set = new ORSet<>("ID_1");
set.subscribeTo(inputStream);
set.subscribe(subscriber);
final ORSet.AddCommand<String> command1 = new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));
final ORSet.AddCommand<String> command2 = new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("2", UUID.randomUUID()));
final ORSet.AddCommand<String> command3 = new ORSet.AddCommand<>(set.getCrdtId(), new ORSet.Element<>("1", UUID.randomUUID()));
// when:
inputStream.onNext(command1);
inputStream.onNext(command2);
inputStream.onNext(command3);
// then:
assertThat(set, hasSize(2));
assertThat(subscriber.valueCount(), is(3));
subscriber.assertNotComplete();
subscriber.assertNoErrors();
}
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForAdds() {
// given:
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final GSet<String> set = new GSet<>("ID_1");
set.subscribe(subscriber);
// when:
set.add("1");
set.add("2");
set.add("1");
// then:
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new AddCommandMatcher<>(set.getCrdtId(), "1"),
new AddCommandMatcher<>(set.getCrdtId(), "2"),
new AddCommandMatcher<>(set.getCrdtId(), "1")
));
}
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForRemoves() {
// given:
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final ORSet<String> set = new ORSet<>("ID_1");
set.subscribe(subscriber);
set.add("1");
set.add("1");
// when:
final Iterator<String> it = set.iterator();
it.next();
it.remove();
// then:
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new AddCommandMatcher<>(set.getCrdtId(), "1"),
new AddCommandMatcher<>(set.getCrdtId(), "1"),
new RemoveCommandMatcher<>(set.getCrdtId(), "1", "1")
));
}
@SuppressWarnings("unchecked")
@Test
public void shouldSendNotificationForAdds() {
// given:
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final ORSet<String> set = new ORSet<>("ID_1");
set.subscribe(subscriber);
// when:
set.add("1");
set.add("2");
set.add("1");
// then:
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new AddCommandMatcher<>(set.getCrdtId(), "1"),
new AddCommandMatcher<>(set.getCrdtId(), "2"),
new AddCommandMatcher<>(set.getCrdtId(), "1")
));
}
@Test
public void manyToManyRetryAfter() {
TestSubscriber<Integer> test = newThreeErrorFlowable()
.compose(GrpcRetry.ManyToMany.retryAfter(new Function<Flowable<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Flowable<Integer> flowable) {
return flowable;
}
}, 10, TimeUnit.MILLISECONDS))
.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertValues(0);
test.assertNoErrors();
test.assertComplete();
}
@Test
public void test_just_Flowable() {
Flowable<String> observable = Flowable.just("mango", "papaya", "guava");
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
observable.subscribe(testSubscriber);
List<String> items = testSubscriber.values();
testSubscriber.assertComplete();
testSubscriber.assertSubscribed();
testSubscriber.assertNoErrors();
testSubscriber.assertValueCount(3);
testSubscriber.assertValues("mango", "papaya", "guava");
}
@Test
public void getCursorExecuteAsBlocking() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
Cursor cursor = storIOContentResolver
.get()
.cursor()
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.build())
.prepare()
.executeAsBlocking();
Assertions.assertThat(cursor).hasCount(1);
cursor.moveToFirst();
assertThat(testItemToInsert.equalsWithoutId(TestItem.fromCursor(cursor))).isTrue();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void getCommitsInWeek() {
TestSubscriber<Commit> testSubscriber = new TestSubscriber<>();
gitHubService.getCommitsInWeek("test-user", "test-repo")
.subscribe(testSubscriber);
testSubscriber.assertNoErrors();
assertThat(testSubscriber.values())
.as("getCommitsInWeek returns commits updated by `user` within one week")
.extracting(Commit::getSha)
.containsExactly("sha1");
}
@Test
public void updateNullFieldToNotNull() {
TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(2)
.subscribe(changesTestSubscriber);
Uri insertedUri =
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value", null).toContentValues()); // firstly, optional value is null
TestItem testItem = TestItem.create(ContentUris.parseId(insertedUri), "value", "optionalValue"); // change to not null
PutResult updateResult = storIOContentResolver
.put()
.object(testItem)
.prepare()
.executeAsBlocking();
assertThat(updateResult.wasUpdated()).isTrue();
Cursor cursor = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(cursor).hasCount(1);
cursor.moveToFirst();
assertThat(testItem).isEqualTo(TestItem.fromCursor(cursor));
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void deleteObjectAsSingle() {
TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(2)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(firstDbState).hasCount(1);
//noinspection ConstantConditions
assertThat(firstDbState.moveToFirst()).isTrue();
TestItem testItem = TestItem.fromCursor(firstDbState);
DeleteResult deleteResult = storIOContentResolver
.delete()
.object(testItem)
.prepare()
.asRxSingle()
.blockingGet();
assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);
Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(secondDbState).hasCount(0);
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void itShouldSendCommandsOnUpdates() {
// given
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final GCounter counter = new GCounter(NODE_ID_1, CRDT_ID);
counter.subscribe(subscriber);
// when
counter.increment();
// then
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new UpdateCommandMatcher(CRDT_ID, HashMap.of(NODE_ID_1, 1L))
));
// when
counter.increment(42L);
// then
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new UpdateCommandMatcher(CRDT_ID, HashMap.of(NODE_ID_1, 1L)),
new UpdateCommandMatcher(CRDT_ID, HashMap.of(NODE_ID_1, 43L))
));
}
@SuppressWarnings("unchecked")
@Test
public void itShouldSendCommandsOnUpdates() {
// given
final TestSubscriber<CrdtCommand> subscriber = TestSubscriber.create();
final MVRegister<String> register = new MVRegister<>(NODE_ID_1, CRDT_ID);
register.subscribe(subscriber);
// when
register.set("Hello World");
// then
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new SetCommandMatcher<>(CRDT_ID, "Hello World")
));
// when
register.set("Hello World");
// then
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new SetCommandMatcher<>(CRDT_ID, "Hello World")
));
// when
register.set("Goodbye World");
// then
subscriber.assertNotComplete();
subscriber.assertNoErrors();
assertThat(subscriber.values(), contains(
new SetCommandMatcher<>(CRDT_ID, "Hello World"),
new SetCommandMatcher<>(CRDT_ID, "Goodbye World")
));
}
@Test
public void deleteObjectExecuteAsBlocking() {
TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(2)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(firstDbState).hasCount(1);
//noinspection ConstantConditions
assertThat(firstDbState.moveToFirst()).isTrue();
TestItem testItem = TestItem.fromCursor(firstDbState);
DeleteResult deleteResult = storIOContentResolver
.delete()
.object(testItem)
.prepare()
.executeAsBlocking();
assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);
Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(secondDbState).hasCount(0);
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void queryOneExistedObjectTableUpdate() {
User expectedUser = User.newInstance(null, "[email protected]");
putUsersBlocking(3);
final Flowable<Optional<User>> userFlowable = storIOSQLite
.get()
.object(User.class)
.withQuery(Query.builder()
.table(UserTableMeta.TABLE)
.where(UserTableMeta.COLUMN_EMAIL + "=?")
.whereArgs(expectedUser.email())
.build())
.prepare()
.asRxFlowable(LATEST)
.take(2);
TestSubscriber<Optional<User>> testSubscriber = new TestSubscriber<Optional<User>>();
userFlowable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertValue(Optional.<User>empty());
putUserBlocking(expectedUser);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertValues(Optional.<User>empty(), Optional.of(expectedUser));
}
@Test
public void queryOneNonexistedObjectTableUpdate() {
final Flowable<Optional<User>> userFlowable = storIOSQLite
.get()
.object(User.class)
.withQuery(Query.builder()
.table(UserTableMeta.TABLE)
.where(UserTableMeta.COLUMN_EMAIL + "=?")
.whereArgs("some arg")
.build())
.prepare()
.asRxFlowable(LATEST)
.take(2);
TestSubscriber<Optional<User>> testSubscriber = new TestSubscriber<Optional<User>>();
userFlowable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertValue(Optional.<User>empty());
putUserBlocking();
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertValues(Optional.<User>empty(), Optional.<User>empty());
}
@Test
public void deleteObjectAsRxFlowable() {
TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(2)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(firstDbState).hasCount(1);
//noinspection ConstantConditions
assertThat(firstDbState.moveToFirst()).isTrue();
TestItem testItem = TestItem.fromCursor(firstDbState);
DeleteResult deleteResult = storIOContentResolver
.delete()
.object(testItem)
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.blockingFirst();
assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);
Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(secondDbState).hasCount(0);
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void deleteByQueryExecuteAsBlocking() {
TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(2)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(firstDbState).hasCount(1);
DeleteResult deleteResult = storIOContentResolver
.delete()
.byQuery(DeleteQuery.builder()
.uri(TestItem.CONTENT_URI)
.build())
.prepare()
.executeAsBlocking();
assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);
Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
Assertions.assertThat(secondDbState).hasCount(0);
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
@TargetApi(Build.VERSION_CODES.JELLY_BEAN)
@Test
public void shouldEmitChangesOnSdkVersionLowerThan16() {
for (int sdkVersion = MIN_SDK_VERSION; sdkVersion < 16; sdkVersion++) {
ContentResolver contentResolver = mock(ContentResolver.class);
final Map<Uri, ContentObserver> contentObservers = new HashMap<Uri, ContentObserver>(3);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
contentObservers.put((Uri) invocation.getArguments()[0], (ContentObserver) invocation.getArguments()[2]);
return null;
}
}).when(contentResolver).registerContentObserver(any(Uri.class), eq(true), any(ContentObserver.class));
TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();
Uri uri1 = mock(Uri.class);
Uri uri2 = mock(Uri.class);
Set<Uri> uris = new HashSet<Uri>(2);
uris.add(uri1);
uris.add(uri2);
RxChangesObserver
.observeChanges(
contentResolver,
uris,
mock(Handler.class),
sdkVersion,
BackpressureStrategy.MISSING
)
.subscribe(testSubscriber);
testSubscriber.assertNotTerminated();
testSubscriber.assertNoValues();
// Emulate change of Uris, Flowable should react and emit Changes objects
contentObservers.get(uri1).onChange(false);
contentObservers.get(uri2).onChange(false);
testSubscriber.assertValues(Changes.newInstance(uri1), Changes.newInstance(uri2));
testSubscriber.dispose();
testSubscriber.assertNoErrors();
}
}
@Test
@Repeat(times = 20)
public void concurrentPutWithoutGlobalTransaction() throws InterruptedException {
final int numberOfConcurrentPuts = ConcurrencyTesting.optimalTestThreadsCount();
TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();
storIOSQLite
.observeChangesInTable(TweetTableMeta.TABLE, LATEST)
.subscribe(testSubscriber);
final CountDownLatch concurrentPutLatch = new CountDownLatch(1);
final CountDownLatch allPutsDoneLatch = new CountDownLatch(numberOfConcurrentPuts);
for (int i = 0; i < numberOfConcurrentPuts; i++) {
final int iCopy = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
concurrentPutLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
storIOSQLite
.put()
.object(Tweet.newInstance(null, 1L, "Some text: " + iCopy))
.prepare()
.executeAsBlocking();
allPutsDoneLatch.countDown();
}
}).start();
}
// Start concurrent Put operations.
concurrentPutLatch.countDown();
assertThat(allPutsDoneLatch.await(25, SECONDS)).isTrue();
testSubscriber.assertNoErrors();
// Put operation creates short-term transaction which might result in merge of some notifications.
// So we have two extreme cases:
// - no merged notifications → isEqualTo(numberOfParallelPuts)
// - all notifications merged → isEqualTo(1)
// Obviously truth is somewhere between those (depends on CPU of machine that runs test).
assertThat(testSubscriber.valueCount())
.isLessThanOrEqualTo(numberOfConcurrentPuts)
.isGreaterThanOrEqualTo(1);
}
@Test
public void getCursorAsFlowableOnlyInitialValue() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
TestItem testItemToInsert = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());
final TestSubscriber<Cursor> cursorTestSubscriber = new TestSubscriber<Cursor>();
storIOContentResolver
.get()
.cursor()
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.build())
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.take(1)
.subscribe(cursorTestSubscriber);
cursorTestSubscriber.awaitTerminalEvent(60, SECONDS);
cursorTestSubscriber.assertNoErrors();
List<Cursor> listOfCursors = cursorTestSubscriber.values();
assertThat(listOfCursors).hasSize(1);
Assertions.assertThat(listOfCursors.get(0)).hasCount(1);
listOfCursors.get(0).moveToFirst();
assertThat(testItemToInsert.equalsWithoutId(TestItem.fromCursor(listOfCursors.get(0))))
.isTrue();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI));
}