下面列出了 io.reactivex.subscribers.TestSubscriber#assertValues() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Emits 1 and 2, then fails with an {@link IllegalArgumentException} that the
 * {@code retryIf} predicate accepts. The retry backoff is bound to a
 * {@link TestScheduler} whose clock is never advanced in this test; the test
 * asserts that exactly the two values were received and that the stream has
 * not completed.
 */
@Test
public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsTrue() {
    final TestScheduler testScheduler = new TestScheduler();
    final TestSubscriber<Integer> subscriber = TestSubscriber.create();
    final Exception failure = new IllegalArgumentException("boo");

    // Accepts only IllegalArgumentException as retryable.
    final Predicate<Throwable> retryOnIllegalArgument = new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable error) {
            return error instanceof IllegalArgumentException;
        }
    };

    // Two emissions followed by a forced error.
    final Flowable<Integer> failingSource =
            Flowable.just(1, 2).concatWith(Flowable.<Integer>error(failure));

    failingSource
            // retry with exponential backoff, driven by the test scheduler
            .retryWhen(
                    RetryWhen.maxRetries(2)
                            .action(log)
                            .exponentialBackoff(1, TimeUnit.MINUTES)
                            .scheduler(testScheduler)
                            .retryIf(retryOnIllegalArgument)
                            .build())
            .subscribe(subscriber);

    subscriber.assertValues(1, 2);
    subscriber.assertNotComplete();
}
/**
 * Subscribes to {@code operation} as a Flowable and verifies that it emits
 * {@code value} on subscription and emits {@code value} once more after
 * {@code changes} is pushed through {@code storIOSQLite.lowLevel()}.
 *
 * <p>The subscriber is always disposed before returning, including when an
 * assertion fails, so the subscription does not outlive this helper.
 *
 * @param operation prepared operation to observe
 * @param changes   change notification to publish after the first emission
 * @param value     value expected for both emissions
 */
public <Result, WrappedResult, Data> void verifyChangesReceived(
        @NonNull PreparedOperation<Result, WrappedResult, Data> operation,
        @NonNull Changes changes,
        @NonNull WrappedResult value
) {
    TestSubscriber<WrappedResult> testSubscriber = new TestSubscriber<WrappedResult>();
    operation
            .asRxFlowable(MISSING)
            .subscribe(testSubscriber);
    try {
        // Initial emission on subscribe.
        testSubscriber.assertValues(value);

        storIOSQLite.lowLevel().notifyAboutChanges(changes);

        // The notification must trigger a second emission of the same value.
        testSubscriber.assertValues(value, value);
    } finally {
        // Release the subscription even if one of the assertions above throws.
        testSubscriber.dispose();
    }
}
/**
 * Observes changes for "table1" and "table2", asserts nothing is received up
 * front, then publishes a change for "table2" and asserts that this exact
 * {@code Changes} instance is delivered without errors.
 */
@Test
public void observeChangesInTables_shouldReceiveIfTableWasChanged() {
    final Set<String> observedTables = new HashSet<String>(2);
    observedTables.add("table1");
    observedTables.add("table2");

    final TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChangesInTables(observedTables, LATEST).subscribe(changesSubscriber);

    // Nothing has been published yet.
    changesSubscriber.assertNoValues();

    final Changes table2Changes = Changes.newInstance("table2");
    storIOSQLite.lowLevel().notifyAboutChanges(table2Changes);

    changesSubscriber.assertValues(table2Changes);
    changesSubscriber.assertNoErrors();
    changesSubscriber.dispose();
}
/**
 * Composes the flowable from {@code newThreeErrorFlowable()} with
 * {@code GrpcRetry.ManyToMany.retryAfter} (identity operation, 10 ms delay)
 * and asserts that it ends with the single value 0, no errors, and completion.
 */
@Test
public void manyToManyRetryAfter() {
    // Identity operation: hands the upstream back unchanged.
    final Function<Flowable<Integer>, Flowable<Integer>> passThrough =
            new Function<Flowable<Integer>, Flowable<Integer>>() {
                @Override
                public Flowable<Integer> apply(Flowable<Integer> upstream) {
                    return upstream;
                }
            };

    final TestSubscriber<Integer> subscriber = newThreeErrorFlowable()
            .compose(GrpcRetry.ManyToMany.retryAfter(passThrough, 10, TimeUnit.MILLISECONDS))
            .test();

    subscriber.awaitTerminalEvent(1, TimeUnit.SECONDS);
    subscriber.assertValues(0);
    subscriber.assertNoErrors();
    subscriber.assertComplete();
}
/**
 * Emits 1 and 2, then fails with an {@link IllegalArgumentException} while the
 * retry policy is restricted to {@link SQLException} via
 * {@code retryWhenInstanceOf}. The test asserts that both values are received
 * and that the original exception instance reaches the subscriber.
 */
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFailsBecauseIsNotInstanceOf() {
    final TestScheduler testScheduler = new TestScheduler();
    final TestSubscriber<Integer> subscriber = TestSubscriber.create();
    final Exception failure = new IllegalArgumentException("boo");

    // Two emissions followed by a forced error.
    final Flowable<Integer> failingSource =
            Flowable.just(1, 2).concatWith(Flowable.<Integer>error(failure));

    failingSource
            // retry with exponential backoff, but only for SQLException
            .retryWhen(
                    RetryWhen.maxRetries(2)
                            .action(log)
                            .exponentialBackoff(1, TimeUnit.MINUTES)
                            .scheduler(testScheduler)
                            .retryWhenInstanceOf(SQLException.class)
                            .build())
            .subscribe(subscriber);

    subscriber.assertValues(1, 2);
    subscriber.assertError(failure);
}
/**
 * Observes all {@code ComplexObject} updates of a book and checks that each
 * {@code ComplexObject} write is reported in order, while a write of an
 * {@code int} under the same key completes but adds nothing to the observed
 * values.
 */
@Test
public void testUpdatesAllChecked() throws Exception {
    final RxPaperBook paperBook = RxPaperBook.with("UPDATES_ALL_CH", Schedulers.trampoline());
    final String entryKey = "hello";
    final ComplexObject firstValue = ComplexObject.random();

    final TestSubscriber<ComplexObject> updates = TestSubscriber.create();
    paperBook.observeAll(ComplexObject.class, BackpressureStrategy.MISSING).subscribe(updates);
    updates.assertValueCount(0);

    // First write is observed.
    paperBook.write(entryKey, firstValue).subscribe();
    updates.assertValueCount(1).assertValues(firstValue);

    // Second write is appended after the first.
    final ComplexObject secondValue = ComplexObject.random();
    paperBook.write(entryKey, secondValue).subscribe();
    updates.assertValueCount(2).assertValues(firstValue, secondValue);

    // A value of a different type: the write itself succeeds, the observed values stay the same.
    final int mismatchedValue = 3;
    paperBook.write(entryKey, mismatchedValue).test().assertComplete().assertNoErrors();
    updates.assertValueCount(2).assertValues(firstValue, secondValue);
    updates.assertNoErrors();
}
/**
 * Demonstrates the basic {@code TestSubscriber} assertions against a
 * three-element {@code Flowable}.
 */
@Test
public void testTestSubscriber() {
    final TestSubscriber<String> subscriber = new TestSubscriber<>();

    // Emits "1", "2", "3" in that order.
    Flowable.just("1", "2", "3").subscribe(subscriber);

    subscriber
            // the received values match exactly
            .assertValues("1", "2", "3")
            // "4" was never emitted
            .assertNever("4")
            // three values were received in total
            .assertValueCount(3)
            // the stream has terminated
            .assertTerminated();
}
/**
 * Inserts one item through the ContentResolver, removes it with a blocking
 * StorIO delete-by-query, and checks the row count before and after the delete.
 * Finally asserts that the observer of {@code TestItem.CONTENT_URI} received
 * two change notifications (presumably one for the insert and one for the
 * delete).
 *
 * <p>Both cursors opened by this test are closed in {@code finally} blocks so
 * they are not leaked when an assertion fails.
 */
@Test
public void deleteByQueryExecuteAsBlocking() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(firstDbState).hasCount(1);
    } finally {
        // Close the cursor even if the assertion above throws.
        if (firstDbState != null) {
            firstDbState.close();
        }
    }

    DeleteResult deleteResult = storIOContentResolver
            .delete()
            .byQuery(DeleteQuery.builder()
                    .uri(TestItem.CONTENT_URI)
                    .build())
            .prepare()
            .executeAsBlocking();
    assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(secondDbState).hasCount(0);
    } finally {
        if (secondDbState != null) {
            secondDbState.close();
        }
    }

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
/**
 * Inserts one item through the ContentResolver, removes it with a StorIO
 * delete-by-query executed as a Flowable (first emission awaited with
 * {@code blockingFirst()}), and checks the row count before and after the
 * delete. Finally asserts that the observer of {@code TestItem.CONTENT_URI}
 * received two change notifications (presumably one for the insert and one for
 * the delete).
 *
 * <p>Both cursors opened by this test are closed in {@code finally} blocks so
 * they are not leaked when an assertion fails.
 */
@Test
public void deleteByQueryAsRxFlowable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(firstDbState).hasCount(1);
    } finally {
        // Close the cursor even if the assertion above throws.
        if (firstDbState != null) {
            firstDbState.close();
        }
    }

    DeleteResult deleteResult = storIOContentResolver
            .delete()
            .byQuery(DeleteQuery.builder()
                    .uri(TestItem.CONTENT_URI)
                    .build())
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .blockingFirst();
    assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(secondDbState).hasCount(0);
    } finally {
        if (secondDbState != null) {
            secondDbState.close();
        }
    }

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
/**
 * Queries a user by e-mail as a Flowable limited to two emissions. With three
 * other users stored, the first emission is asserted to be an empty
 * {@code Optional}; after the expected user is put, the second emission is
 * asserted to contain that user.
 */
@Test
public void queryOneExistedObjectTableUpdate() {
    final User targetUser = User.newInstance(null, "[email protected]");
    putUsersBlocking(3);

    final Query byEmail = Query.builder()
            .table(UserTableMeta.TABLE)
            .where(UserTableMeta.COLUMN_EMAIL + "=?")
            .whereArgs(targetUser.email())
            .build();

    final TestSubscriber<Optional<User>> subscriber = new TestSubscriber<Optional<User>>();
    storIOSQLite
            .get()
            .object(User.class)
            .withQuery(byEmail)
            .prepare()
            .asRxFlowable(LATEST)
            .take(2)
            .subscribe(subscriber);

    // First wait: only the initial emission is asserted afterwards.
    subscriber.awaitTerminalEvent(5, SECONDS);
    subscriber.assertNoErrors();
    subscriber.assertValue(Optional.<User>empty());

    putUserBlocking(targetUser);

    // Second wait: the put above should produce the second emission.
    subscriber.awaitTerminalEvent(5, SECONDS);
    subscriber.assertNoErrors();
    subscriber.assertValues(Optional.<User>empty(), Optional.of(targetUser));
}
/**
 * Exercises {@code timeout} with a fallback that derives from the same shared
 * processor as the main stream. The processor emits 5, the test scheduler is
 * advanced by 2 seconds (past the 1-second timeout), then the processor emits
 * 10 and completes. The test asserts that the subscriber saw 5 and 10 without
 * errors.
 */
@Test
public void test5() throws Exception {
    final TestScheduler testScheduler = new TestScheduler();
    final FlowableProcessor<Long> processor = PublishProcessor.create();

    // Shared view of the processor that logs every value passing through.
    final Flowable<Long> sharedSource = processor.share().map(item -> {
        System.out.println("Received value " + item);
        return item;
    });

    // Fallback used by timeout(); built on top of the same shared source.
    final Flowable<Long> fallback = sharedSource.map(item -> {
        System.out.println("Timeout received value " + item);
        return item;
    });

    final TestSubscriber<Long> subscriber = new TestSubscriber<>();
    sharedSource
            .doOnCancel(() -> {
                System.out.println("Unsubscribed");
                new Exception().printStackTrace(System.out);
            })
            .timeout(1, TimeUnit.SECONDS, testScheduler, fallback)
            .subscribe(subscriber);

    processor.onNext(5L);
    testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
    processor.onNext(10L);
    processor.onComplete();

    subscriber.awaitTerminalEvent();
    subscriber.assertNoErrors();
    subscriber.assertValues(5L, 10L);
}
/**
 * Inserts one item through the ContentResolver, then updates it via a StorIO
 * put of content values executed as a Flowable (first emission awaited with
 * {@code blockingFirst()}). Verifies that the put is reported as an update,
 * that the single stored row equals the updated item, and that the observer of
 * {@code TestItem.CONTENT_URI} received two change notifications (presumably
 * one for the insert and one for the update).
 *
 * <p>The verification cursor is closed in a {@code finally} block, and the
 * result of {@code moveToFirst()} is asserted rather than ignored.
 */
@Test
public void updateContentValuesAsRxFlowable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    Uri insertedUri = contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value").toContentValues());
    TestItem testItem = TestItem.create(ContentUris.parseId(insertedUri), "value2");

    PutResult updateResult = storIOContentResolver
            .put()
            .contentValues(testItem.toContentValues())
            .withPutResolver(testItemContentValuesPutResolver)
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .blockingFirst();
    assertThat(updateResult.wasUpdated()).isTrue();

    Cursor cursor = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(cursor).hasCount(1);
        // The cursor must be positioned on the row before it is read.
        assertThat(cursor.moveToFirst()).isTrue();
        assertThat(testItem).isEqualTo(TestItem.fromCursor(cursor));
    } finally {
        // Close the cursor even if an assertion above throws.
        if (cursor != null) {
            cursor.close();
        }
    }

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
/**
 * Inserts one item through the ContentResolver, then updates it via a StorIO
 * put of the object executed as a Completable. Verifies that the single stored
 * row equals the updated item and that the observer of
 * {@code TestItem.CONTENT_URI} received two change notifications (presumably
 * one for the insert and one for the update).
 *
 * <p>The boolean returned by {@code blockingAwait(15, SECONDS)} is asserted so
 * that a timeout fails the test at the point where it happens. The
 * verification cursor is closed in a {@code finally} block, and the result of
 * {@code moveToFirst()} is asserted rather than ignored.
 */
@Test
public void updateObjectAsCompletable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    Uri insertedUri = contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value").toContentValues());
    TestItem testItem = TestItem.create(ContentUris.parseId(insertedUri), "value2");

    boolean completedInTime = storIOContentResolver
            .put()
            .object(testItem)
            .prepare()
            .asRxCompletable()
            .blockingAwait(15, SECONDS);
    // blockingAwait(timeout, unit) returns false when the timeout elapses first.
    assertThat(completedInTime).isTrue();

    Cursor cursor = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(cursor).hasCount(1);
        // The cursor must be positioned on the row before it is read.
        assertThat(cursor.moveToFirst()).isTrue();
        assertThat(testItem).isEqualTo(TestItem.fromCursor(cursor));
    } finally {
        // Close the cursor even if an assertion above throws.
        if (cursor != null) {
            cursor.close();
        }
    }

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
/**
 * Inserts one item through the ContentResolver, reads it back, deletes that
 * object via StorIO executed as a Single, and checks the row count before and
 * after the delete. Finally asserts that the observer of
 * {@code TestItem.CONTENT_URI} received two change notifications (presumably
 * one for the insert and one for the delete).
 *
 * <p>Both cursors opened by this test are closed in {@code finally} blocks so
 * they are not leaked when an assertion fails.
 */
@Test
public void deleteObjectAsSingle() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    TestItem testItem;
    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(firstDbState).hasCount(1);
        //noinspection ConstantConditions
        assertThat(firstDbState.moveToFirst()).isTrue();
        // Read the stored item before the cursor is closed.
        testItem = TestItem.fromCursor(firstDbState);
    } finally {
        // Close the cursor even if an assertion above throws.
        if (firstDbState != null) {
            firstDbState.close();
        }
    }

    DeleteResult deleteResult = storIOContentResolver
            .delete()
            .object(testItem)
            .prepare()
            .asRxSingle()
            .blockingGet();
    assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(secondDbState).hasCount(0);
    } finally {
        if (secondDbState != null) {
            secondDbState.close();
        }
    }

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
/**
 * Inserts one item through the ContentResolver, reads it back, deletes it via
 * a StorIO delete of a single-element object list executed as a Single, and
 * checks that the item is reported as deleted and that no rows remain. Finally
 * asserts that the observer of {@code TestItem.CONTENT_URI} received two
 * change notifications (presumably one for the insert and one for the delete).
 *
 * <p>Both cursors opened by this test are closed in {@code finally} blocks so
 * they are not leaked when an assertion fails.
 */
@Test
public void deleteObjectsAsSingle() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    TestItem testItem;
    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(firstDbState).hasCount(1);
        //noinspection ConstantConditions
        assertThat(firstDbState.moveToFirst()).isTrue();
        // Read the stored item before the cursor is closed.
        testItem = TestItem.fromCursor(firstDbState);
    } finally {
        // Close the cursor even if an assertion above throws.
        if (firstDbState != null) {
            firstDbState.close();
        }
    }

    DeleteResults<TestItem> deleteResults = storIOContentResolver
            .delete()
            .objects(singletonList(testItem))
            .prepare()
            .asRxSingle()
            .blockingGet();
    assertThat(deleteResults.wasDeleted(testItem)).isTrue();

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(secondDbState).hasCount(0);
    } finally {
        if (secondDbState != null) {
            secondDbState.close();
        }
    }

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
/**
 * Inserts one item through the ContentResolver, then updates it via a StorIO
 * put of content values executed as a Single. Verifies that the put is
 * reported as an update, that the single stored row equals the updated item,
 * and that the observer of {@code TestItem.CONTENT_URI} received two change
 * notifications (presumably one for the insert and one for the update).
 *
 * <p>The verification cursor is closed in a {@code finally} block, and the
 * result of {@code moveToFirst()} is asserted rather than ignored.
 */
@Test
public void updateContentValuesAsSingle() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    Uri insertedUri = contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value").toContentValues());
    TestItem testItem = TestItem.create(ContentUris.parseId(insertedUri), "value2");

    PutResult updateResult = storIOContentResolver
            .put()
            .contentValues(testItem.toContentValues())
            .withPutResolver(testItemContentValuesPutResolver)
            .prepare()
            .asRxSingle()
            .blockingGet();
    assertThat(updateResult.wasUpdated()).isTrue();

    Cursor cursor = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(cursor).hasCount(1);
        // The cursor must be positioned on the row before it is read.
        assertThat(cursor.moveToFirst()).isTrue();
        assertThat(testItem).isEqualTo(TestItem.fromCursor(cursor));
    } finally {
        // Close the cursor even if an assertion above throws.
        if (cursor != null) {
            cursor.close();
        }
    }

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
/**
 * Inserts one item through the ContentResolver, reads it back, deletes that
 * object via StorIO executed as a Flowable (first emission awaited with
 * {@code blockingFirst()}), and checks the row count before and after the
 * delete. Finally asserts that the observer of {@code TestItem.CONTENT_URI}
 * received two change notifications (presumably one for the insert and one for
 * the delete).
 *
 * <p>Both cursors opened by this test are closed in {@code finally} blocks so
 * they are not leaked when an assertion fails.
 */
@Test
public void deleteObjectAsRxFlowable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    TestItem testItem;
    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(firstDbState).hasCount(1);
        //noinspection ConstantConditions
        assertThat(firstDbState.moveToFirst()).isTrue();
        // Read the stored item before the cursor is closed.
        testItem = TestItem.fromCursor(firstDbState);
    } finally {
        // Close the cursor even if an assertion above throws.
        if (firstDbState != null) {
            firstDbState.close();
        }
    }

    DeleteResult deleteResult = storIOContentResolver
            .delete()
            .object(testItem)
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .blockingFirst();
    assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    try {
        Assertions.assertThat(secondDbState).hasCount(0);
    } finally {
        if (secondDbState != null) {
            secondDbState.close();
        }
    }

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
/**
 * Opens a transaction, then has 100 threads call {@code notifyAboutChanges}
 * for the same table as close to simultaneously as a start latch allows.
 * Asserts that the subscriber has received nothing while the transaction is
 * still open, and that after {@code endTransaction()} it holds exactly one
 * {@code Changes} value for that table and no errors.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting
 *                              for the worker threads to finish
 */
@Test
public void shouldReceiveOneNotificationInTransactionWithMultipleThreads() throws InterruptedException {
final String table = "test_table";
final int numberOfThreads = 100;
final TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();
// Subscribe before the transaction starts so every notification is observable.
storIOSQLite
.observeChanges(LATEST)
.subscribe(testSubscriber);
storIOSQLite
.lowLevel()
.beginTransaction();
// Single-count latch released once by the test thread to start all workers together.
final CountDownLatch startAllThreadsLock = new CountDownLatch(1);
// Counted down once by each worker after it has sent its notification.
final CountDownLatch allThreadsFinishedLock = new CountDownLatch(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
// All threads should start "simultaneously".
startAllThreadsLock.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
storIOSQLite
.lowLevel()
.notifyAboutChanges(Changes.newInstance(table));
allThreadsFinishedLock.countDown();
}
}).start();
}
// Ready!
// Steady!
startAllThreadsLock.countDown(); // Go!
// Wait (up to 20 seconds) until every worker has sent its notification.
assertThat(allThreadsFinishedLock.await(20, SECONDS)).isTrue();
// While we are in the transaction, no changes should be sent.
testSubscriber.assertValueCount(0);
storIOSQLite
.lowLevel()
.endTransaction();
testSubscriber.assertNoErrors();
// After the transaction ends, exactly one Changes value for the table is expected.
testSubscriber.assertValues(Changes.newInstance(table));
}
/**
 * Connects two stores before any CRDT exists, creates a CRDT in the first
 * store and looks the same CRDT up in the second one, then sends commands
 * alternately from both sides. Asserts that each store published exactly one
 * CRDT definition and that both CRDT subscribers observed all eight commands
 * in the order they were sent.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldSendCommandsToConnectedStore() {
    // given:
    final CrdtStore firstStore = new CrdtStore(NODE_ID_1);
    firstStore.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
    final TestSubscriber<CrdtDefinition> firstStoreSubscriber = TestSubscriber.create();
    firstStore.subscribe(firstStoreSubscriber);

    final CrdtStore secondStore = new CrdtStore(NODE_ID_2);
    secondStore.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
    final TestSubscriber<CrdtDefinition> secondStoreSubscriber = TestSubscriber.create();
    secondStore.subscribe(secondStoreSubscriber);

    firstStore.connect(secondStore);

    final SimpleCrdt firstCrdt = firstStore.createCrdt(SimpleCrdt.class, CRDT_ID);
    final TestSubscriber<CrdtCommand> firstCrdtSubscriber = TestSubscriber.create();
    firstCrdt.subscribe(firstCrdtSubscriber);

    // The second store is asked for the CRDT by id instead of creating its own.
    final SimpleCrdt secondCrdt = (SimpleCrdt) secondStore.findCrdt(CRDT_ID).get();
    final TestSubscriber<CrdtCommand> secondCrdtSubscriber = TestSubscriber.create();
    secondCrdt.subscribe(secondCrdtSubscriber);

    // when:
    final SimpleCommand singleFromFirst = new SimpleCommand(NODE_ID_1, CRDT_ID);
    firstCrdt.sendCommands(singleFromFirst);

    final SimpleCommand singleFromSecond = new SimpleCommand(NODE_ID_2, CRDT_ID);
    secondCrdt.sendCommands(singleFromSecond);

    final SimpleCommand batchFromFirstA = new SimpleCommand(NODE_ID_1, CRDT_ID);
    final SimpleCommand batchFromFirstB = new SimpleCommand(NODE_ID_1, CRDT_ID);
    final SimpleCommand batchFromFirstC = new SimpleCommand(NODE_ID_1, CRDT_ID);
    firstCrdt.sendCommands(batchFromFirstA, batchFromFirstB, batchFromFirstC);

    final SimpleCommand batchFromSecondA = new SimpleCommand(NODE_ID_2, CRDT_ID);
    final SimpleCommand batchFromSecondB = new SimpleCommand(NODE_ID_2, CRDT_ID);
    final SimpleCommand batchFromSecondC = new SimpleCommand(NODE_ID_2, CRDT_ID);
    secondCrdt.sendCommands(batchFromSecondA, batchFromSecondB, batchFromSecondC);

    // then: both sides must see the same sequence, in sending order
    final CrdtCommand[] expectedInSendingOrder = {
            singleFromFirst,
            singleFromSecond,
            batchFromFirstA,
            batchFromFirstB,
            batchFromFirstC,
            batchFromSecondA,
            batchFromSecondB,
            batchFromSecondC
    };

    assertThat(firstStoreSubscriber.valueCount(), is(1));
    firstStoreSubscriber.assertNotComplete();
    firstStoreSubscriber.assertNoErrors();
    firstCrdtSubscriber.assertValues(expectedInSendingOrder);
    firstCrdtSubscriber.assertNotComplete();
    firstCrdtSubscriber.assertNoErrors();

    assertThat(secondStoreSubscriber.valueCount(), is(1));
    secondStoreSubscriber.assertNotComplete();
    secondStoreSubscriber.assertNoErrors();
    secondCrdtSubscriber.assertValues(expectedInSendingOrder);
    secondCrdtSubscriber.assertNotComplete();
    secondCrdtSubscriber.assertNoErrors();
}
/**
 * Creates the same CRDT independently in two unconnected stores, sends
 * commands on both sides, and only then connects the stores. Asserts that each
 * store published exactly one CRDT definition and that each CRDT subscriber
 * observed its own four commands first, followed by the four commands of the
 * other side.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldSendAllCommandsAfterConnect() {
    // given:
    final CrdtStore firstStore = new CrdtStore(NODE_ID_1);
    firstStore.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
    final TestSubscriber<CrdtDefinition> firstStoreSubscriber = TestSubscriber.create();
    firstStore.subscribe(firstStoreSubscriber);

    final CrdtStore secondStore = new CrdtStore(NODE_ID_2);
    secondStore.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
    final TestSubscriber<CrdtDefinition> secondStoreSubscriber = TestSubscriber.create();
    secondStore.subscribe(secondStoreSubscriber);

    final SimpleCrdt firstCrdt = firstStore.createCrdt(SimpleCrdt.class, CRDT_ID);
    final TestSubscriber<CrdtCommand> firstCrdtSubscriber = TestSubscriber.create();
    firstCrdt.subscribe(firstCrdtSubscriber);

    final SimpleCrdt secondCrdt = secondStore.createCrdt(SimpleCrdt.class, CRDT_ID);
    final TestSubscriber<CrdtCommand> secondCrdtSubscriber = TestSubscriber.create();
    secondCrdt.subscribe(secondCrdtSubscriber);

    final SimpleCommand singleFromFirst = new SimpleCommand(NODE_ID_1, CRDT_ID);
    firstCrdt.sendCommands(singleFromFirst);

    final SimpleCommand singleFromSecond = new SimpleCommand(NODE_ID_2, CRDT_ID);
    secondCrdt.sendCommands(singleFromSecond);

    final SimpleCommand batchFromFirstA = new SimpleCommand(NODE_ID_1, CRDT_ID);
    final SimpleCommand batchFromFirstB = new SimpleCommand(NODE_ID_1, CRDT_ID);
    final SimpleCommand batchFromFirstC = new SimpleCommand(NODE_ID_1, CRDT_ID);
    firstCrdt.sendCommands(batchFromFirstA, batchFromFirstB, batchFromFirstC);

    final SimpleCommand batchFromSecondA = new SimpleCommand(NODE_ID_2, CRDT_ID);
    final SimpleCommand batchFromSecondB = new SimpleCommand(NODE_ID_2, CRDT_ID);
    final SimpleCommand batchFromSecondC = new SimpleCommand(NODE_ID_2, CRDT_ID);
    secondCrdt.sendCommands(batchFromSecondA, batchFromSecondB, batchFromSecondC);

    // when:
    firstStore.connect(secondStore);

    // then: each side sees its own commands first, then the other side's
    final CrdtCommand[] expectedOnFirst = {
            singleFromFirst,
            batchFromFirstA,
            batchFromFirstB,
            batchFromFirstC,
            singleFromSecond,
            batchFromSecondA,
            batchFromSecondB,
            batchFromSecondC
    };
    final CrdtCommand[] expectedOnSecond = {
            singleFromSecond,
            batchFromSecondA,
            batchFromSecondB,
            batchFromSecondC,
            singleFromFirst,
            batchFromFirstA,
            batchFromFirstB,
            batchFromFirstC
    };

    assertThat(firstStoreSubscriber.valueCount(), is(1));
    firstStoreSubscriber.assertNotComplete();
    firstStoreSubscriber.assertNoErrors();
    firstCrdtSubscriber.assertValues(expectedOnFirst);
    firstCrdtSubscriber.assertNotComplete();
    firstCrdtSubscriber.assertNoErrors();

    assertThat(secondStoreSubscriber.valueCount(), is(1));
    secondStoreSubscriber.assertNotComplete();
    secondStoreSubscriber.assertNoErrors();
    secondCrdtSubscriber.assertValues(expectedOnSecond);
    secondCrdtSubscriber.assertNotComplete();
    secondCrdtSubscriber.assertNoErrors();
}