io.reactivex.subscribers.TestSubscriber#assertValues ( )源码实例Demo

下面列出了io.reactivex.subscribers.TestSubscriber#assertValues ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Checks that a failure accepted by the {@code retryIf} predicate is not delivered to the
 * subscriber: the two values emitted before the failure arrive and the stream stays open.
 *
 * <p>The {@code TestScheduler} is never advanced here, so the one-minute backoff is
 * presumably still pending when the assertions run.
 */
@Test
public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsTrue() {
    Exception ex = new IllegalArgumentException("boo");
    TestSubscriber<Integer> ts = TestSubscriber.create();
    TestScheduler scheduler = new TestScheduler();
    // Accept only the exception type that the source below fails with.
    Predicate<Throwable> predicate = new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable t) {
            return t instanceof IllegalArgumentException;
        }
    };
    Flowable.just(1, 2)
            // force error after the 2 emissions
            .concatWith(Flowable.<Integer>error(ex))
            // retry with backoff
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(scheduler).retryIf(predicate).build())
            // go
            .subscribe(ts);
    ts.assertValues(1, 2);
    // assertNotComplete() alone would also pass if the error had been delivered,
    // so check explicitly that the accepted failure was not propagated.
    ts.assertNoErrors();
    ts.assertNotComplete();
}
 
源代码2 项目: storio   文件: BaseOperationObserveChangesTest.java
/**
 * Subscribes to the given operation as a Flowable and checks that {@code value} is received
 * once after subscribing and a second time after {@code changes} has been published.
 *
 * @param operation prepared operation to observe
 * @param changes   notification published through {@code storIOSQLite.lowLevel()}
 * @param value     value expected for each emission
 */
public <Result, WrappedResult, Data> void verifyChangesReceived(
        @NonNull PreparedOperation<Result, WrappedResult, Data> operation,
        @NonNull Changes changes,
        @NonNull WrappedResult value
) {
    TestSubscriber<WrappedResult> subscriber = new TestSubscriber<WrappedResult>();
    operation.asRxFlowable(MISSING).subscribe(subscriber);

    // Exactly one emission is expected right after subscribing.
    subscriber.assertValues(value);

    // Publishing the change is expected to produce a second, equal emission.
    storIOSQLite.lowLevel().notifyAboutChanges(changes);
    subscriber.assertValues(value, value);
}
 
源代码3 项目: storio   文件: DefaultStorIOSQLiteTest.java
/**
 * Observes two tables and checks that a change notification for one of them is delivered,
 * while nothing is delivered before the notification is published.
 */
@Test
public void observeChangesInTables_shouldReceiveIfTableWasChanged() {
    Set<String> observedTables = new HashSet<String>(2);
    observedTables.add("table1");
    observedTables.add("table2");

    TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChangesInTables(observedTables, LATEST).subscribe(subscriber);

    // Nothing has been changed yet.
    subscriber.assertNoValues();

    Changes expected = Changes.newInstance("table2");
    storIOSQLite.lowLevel().notifyAboutChanges(expected);

    subscriber.assertValues(expected);
    subscriber.assertNoErrors();
    subscriber.dispose();
}
 
源代码4 项目: reactive-grpc   文件: GrpcRetryTest.java
/**
 * Applies {@code GrpcRetry.ManyToMany.retryAfter} with a pass-through operation and a
 * 10 ms delay to the flowable from {@code newThreeErrorFlowable()}, then expects a single
 * value {@code 0} followed by normal completion.
 */
@Test
public void manyToManyRetryAfter() {
    // Operation under retry: hands the upstream back unchanged.
    Function<Flowable<Integer>, Flowable<Integer>> passThrough =
            new Function<Flowable<Integer>, Flowable<Integer>>() {
                @Override
                public Flowable<Integer> apply(Flowable<Integer> upstream) {
                    return upstream;
                }
            };

    TestSubscriber<Integer> subscriber = newThreeErrorFlowable()
            .compose(GrpcRetry.ManyToMany.retryAfter(passThrough, 10, TimeUnit.MILLISECONDS))
            .test();

    // Wait up to one second for the stream to terminate before asserting.
    subscriber.awaitTerminalEvent(1, TimeUnit.SECONDS);
    subscriber.assertValues(0);
    subscriber.assertNoErrors();
    subscriber.assertComplete();
}
 
源代码5 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * Checks that a failure whose type is not listed in {@code retryWhenInstanceOf} is passed
 * on to the subscriber after the values emitted before it.
 */
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFailsBecauseIsNotInstanceOf() {
    TestScheduler testScheduler = new TestScheduler();
    TestSubscriber<Integer> subscriber = TestSubscriber.create();
    Exception failure = new IllegalArgumentException("boo");

    Flowable.just(1, 2)
            // fail once both values have been emitted
            .concatWith(Flowable.<Integer>error(failure))
            // retry with backoff, but only for SQLException
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(testScheduler).retryWhenInstanceOf(SQLException.class).build())
            // go
            .subscribe(subscriber);

    subscriber.assertValues(1, 2);
    subscriber.assertError(failure);
}
 
源代码6 项目: RxPaper2   文件: RxPaperBookTest.java
/**
 * Observes all {@code ComplexObject} updates of a book and checks that each write of a
 * {@code ComplexObject} is reported, while a write of an {@code int} under the same key
 * completes without adding a value or an error to the typed observer.
 */
@Test
public void testUpdatesAllChecked() throws Exception {
    final String key = "hello";
    RxPaperBook book = RxPaperBook.with("UPDATES_ALL_CH", Schedulers.trampoline());
    final TestSubscriber<ComplexObject> updates = TestSubscriber.create();
    final ComplexObject firstValue = ComplexObject.random();

    book.observeAll(ComplexObject.class, BackpressureStrategy.MISSING).subscribe(updates);
    updates.assertValueCount(0);

    // First write is reported.
    book.write(key, firstValue).subscribe();
    updates.assertValueCount(1);
    updates.assertValues(firstValue);

    // Second write under the same key is reported as well.
    final ComplexObject secondValue = ComplexObject.random();
    book.write(key, secondValue).subscribe();
    updates.assertValueCount(2);
    updates.assertValues(firstValue, secondValue);

    // A value of another type must not show up in the ComplexObject stream.
    final int wrongValue = 3;
    book.write(key, wrongValue).test().assertComplete().assertNoErrors();
    updates.assertValueCount(2);
    updates.assertValues(firstValue, secondValue);
    updates.assertNoErrors();
}
 
/**
 * Demonstrates the basic {@code TestSubscriber} assertions on a three-element Flowable.
 */
@Test
public void testTestSubscriber() {
    TestSubscriber<String> subscriber = new TestSubscriber<>();

    // Emit "1", "2", "3" into the subscriber.
    Flowable.just("1", "2", "3").subscribe(subscriber);

    subscriber
            // the received values match, in order
            .assertValues("1", "2", "3")
            // "4" was never received
            .assertNever("4")
            // exactly three values were received
            .assertValueCount(3)
            // the stream has terminated
            .assertTerminated();
}
 
源代码8 项目: storio   文件: DeleteOperationTest.java
/**
 * Inserts one item, deletes it with a blocking delete-by-query call and checks the reported
 * row count, the table content before and after, and that two change notifications for
 * {@code TestItem.CONTENT_URI} were observed (presumably one for the insert and one for
 * the delete).
 */
@Test
public void deleteByQueryExecuteAsBlocking() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(firstDbState).hasCount(1);
    // Release the cursor once it has been inspected so the test does not leak it.
    firstDbState.close();

    DeleteResult deleteResult = storIOContentResolver
            .delete()
            .byQuery(DeleteQuery.builder()
                    .uri(TestItem.CONTENT_URI)
                    .build())
            .prepare()
            .executeAsBlocking();

    assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(secondDbState).hasCount(0);
    secondDbState.close();

    // take(2) completes the stream after the second notification.
    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码9 项目: storio   文件: DeleteOperationTest.java
/**
 * Inserts one item, deletes it with a delete-by-query operation consumed as a Flowable and
 * checks the reported row count, the table content before and after, and that two change
 * notifications for {@code TestItem.CONTENT_URI} were observed (presumably one for the
 * insert and one for the delete).
 */
@Test
public void deleteByQueryAsRxFlowable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(firstDbState).hasCount(1);
    // Release the cursor once it has been inspected so the test does not leak it.
    firstDbState.close();

    DeleteResult deleteResult = storIOContentResolver
            .delete()
            .byQuery(DeleteQuery.builder()
                    .uri(TestItem.CONTENT_URI)
                    .build())
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .blockingFirst();

    assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(secondDbState).hasCount(0);
    secondDbState.close();

    // take(2) completes the stream after the second notification.
    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码10 项目: storio   文件: RxQueryTest.java
/**
 * Queries a single user by e-mail as a Flowable limited to two emissions and checks that
 * the first emission is empty (the user is not stored yet) and that the second emission,
 * produced after the user is stored, contains that user.
 */
@Test
public void queryOneExistedObjectTableUpdate() {
    // NOTE(review): this literal looks like an e-mail obfuscation placeholder rather than
    // a real address - confirm the intended value.
    User expectedUser = User.newInstance(null, "[email protected]");
    putUsersBlocking(3);

    TestSubscriber<Optional<User>> subscriber = new TestSubscriber<Optional<User>>();

    storIOSQLite
            .get()
            .object(User.class)
            .withQuery(Query.builder()
                    .table(UserTableMeta.TABLE)
                    .where(UserTableMeta.COLUMN_EMAIL + "=?")
                    .whereArgs(expectedUser.email())
                    .build())
            .prepare()
            .asRxFlowable(LATEST)
            .take(2)
            .subscribe(subscriber);

    // Only one of the two emissions exists so far; the stream is not expected to
    // terminate here, so this wait is presumably there to give the first value time to arrive.
    subscriber.awaitTerminalEvent(5, SECONDS);
    subscriber.assertNoErrors();
    subscriber.assertValue(Optional.<User>empty());

    putUserBlocking(expectedUser);

    // The second emission completes the take(2) stream.
    subscriber.awaitTerminalEvent(5, SECONDS);
    subscriber.assertNoErrors();
    subscriber.assertValues(Optional.<User>empty(), Optional.of(expectedUser));
}
 
源代码11 项目: akarnokd-misc   文件: TimeoutCancelv2Test.java
/**
 * Pushes two values through a shared processor guarded by a one-second timeout on a
 * {@code TestScheduler}; the fallback is the same shared source. Virtual time is advanced
 * past the timeout between the two values, and both values are expected downstream.
 */
@Test
public void test5() throws Exception {
    TestScheduler scheduler = new TestScheduler();
    FlowableProcessor<Long> processor = PublishProcessor.create();

    // Shared source that logs every value passing through.
    Flowable<Long> loggedSource = processor.share().map(item -> {
        System.out.println("Received value " + item);
        return item;
    });

    // Fallback used after the timeout; built on top of the same shared source.
    Flowable<Long> fallback = loggedSource.map(item -> {
        System.out.println("Timeout received value " + item);
        return item;
    });

    TestSubscriber<Long> ts = new TestSubscriber<>();
    loggedSource
            .doOnCancel(() -> {
                System.out.println("Unsubscribed");
                // Print a stack trace to show where the cancellation came from.
                new Exception().printStackTrace(System.out);
            })
            .timeout(1, TimeUnit.SECONDS, scheduler, fallback)
            .subscribe(ts);

    processor.onNext(5L);
    // Move virtual time beyond the one-second timeout.
    scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
    processor.onNext(10L);
    processor.onComplete();

    ts.awaitTerminalEvent();
    ts.assertNoErrors();
    ts.assertValues(5L, 10L);
}
 
源代码12 项目: storio   文件: PutOperationTest.java
/**
 * Inserts one item, updates it by putting content values through an operation consumed as
 * a Flowable and checks that the put reports an update, that the stored row equals the
 * updated item, and that two change notifications for {@code TestItem.CONTENT_URI} were
 * observed (presumably one for the insert and one for the update).
 */
@Test
public void updateContentValuesAsRxFlowable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    Uri insertedUri = contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value").toContentValues());

    // Same id as the inserted row, new value.
    TestItem testItem = TestItem.create(ContentUris.parseId(insertedUri), "value2");

    PutResult updateResult = storIOContentResolver
            .put()
            .contentValues(testItem.toContentValues())
            .withPutResolver(testItemContentValuesPutResolver)
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .blockingFirst();

    assertThat(updateResult.wasUpdated()).isTrue();

    Cursor cursor = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);

    Assertions.assertThat(cursor).hasCount(1);

    // Fail here, rather than inside fromCursor, if the cursor cannot be positioned.
    assertThat(cursor.moveToFirst()).isTrue();

    TestItem storedItem = TestItem.fromCursor(cursor);
    // Release the cursor once the row has been read so the test does not leak it.
    cursor.close();

    assertThat(testItem).isEqualTo(storedItem);

    // take(2) completes the stream after the second notification.
    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码13 项目: storio   文件: PutOperationTest.java
/**
 * Inserts one item, updates it by putting an object through an operation consumed as a
 * Completable and checks that the stored row equals the updated item and that two change
 * notifications for {@code TestItem.CONTENT_URI} were observed (presumably one for the
 * insert and one for the update).
 */
@Test
public void updateObjectAsCompletable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    Uri insertedUri = contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value").toContentValues());

    // Same id as the inserted row, new value.
    TestItem testItem = TestItem.create(ContentUris.parseId(insertedUri), "value2");

    // blockingAwait(timeout) returns false when the timeout elapses; do not continue
    // silently with a put that has not finished.
    assertThat(storIOContentResolver
            .put()
            .object(testItem)
            .prepare()
            .asRxCompletable()
            .blockingAwait(15, SECONDS)).isTrue();

    Cursor cursor = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);

    Assertions.assertThat(cursor).hasCount(1);

    // Fail here, rather than inside fromCursor, if the cursor cannot be positioned.
    assertThat(cursor.moveToFirst()).isTrue();

    TestItem storedItem = TestItem.fromCursor(cursor);
    // Release the cursor once the row has been read so the test does not leak it.
    cursor.close();

    assertThat(testItem).isEqualTo(storedItem);

    // take(2) completes the stream after the second notification.
    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码14 项目: storio   文件: DeleteOperationTest.java
/**
 * Inserts one item, reads it back, deletes that object through an operation consumed as a
 * Single and checks the reported row count, the table content before and after, and that
 * two change notifications for {@code TestItem.CONTENT_URI} were observed (presumably one
 * for the insert and one for the delete).
 */
@Test
public void deleteObjectAsSingle() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(firstDbState).hasCount(1);

    //noinspection ConstantConditions
    assertThat(firstDbState.moveToFirst()).isTrue();

    // Read the stored row back so the delete targets the persisted item.
    TestItem testItem = TestItem.fromCursor(firstDbState);
    // Release the cursor once the row has been read so the test does not leak it.
    firstDbState.close();

    DeleteResult deleteResult = storIOContentResolver
            .delete()
            .object(testItem)
            .prepare()
            .asRxSingle()
            .blockingGet();

    assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(secondDbState).hasCount(0);
    secondDbState.close();

    // take(2) completes the stream after the second notification.
    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码15 项目: storio   文件: DeleteOperationTest.java
/**
 * Inserts one item, reads it back, deletes it as a single-element collection through an
 * operation consumed as a Single and checks that the item is reported as deleted, the
 * table content before and after, and that two change notifications for
 * {@code TestItem.CONTENT_URI} were observed (presumably one for the insert and one for
 * the delete).
 */
@Test
public void deleteObjectsAsSingle() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(firstDbState).hasCount(1);

    //noinspection ConstantConditions
    assertThat(firstDbState.moveToFirst()).isTrue();

    // Read the stored row back so the delete targets the persisted item.
    TestItem testItem = TestItem.fromCursor(firstDbState);
    // Release the cursor once the row has been read so the test does not leak it.
    firstDbState.close();

    DeleteResults<TestItem> deleteResults = storIOContentResolver
            .delete()
            .objects(singletonList(testItem))
            .prepare()
            .asRxSingle()
            .blockingGet();

    assertThat(deleteResults.wasDeleted(testItem)).isTrue();

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(secondDbState).hasCount(0);
    secondDbState.close();

    // take(2) completes the stream after the second notification.
    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码16 项目: storio   文件: PutOperationTest.java
/**
 * Inserts one item, updates it by putting content values through an operation consumed as
 * a Single and checks that the put reports an update, that the stored row equals the
 * updated item, and that two change notifications for {@code TestItem.CONTENT_URI} were
 * observed (presumably one for the insert and one for the update).
 */
@Test
public void updateContentValuesAsSingle() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    Uri insertedUri = contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value").toContentValues());

    // Same id as the inserted row, new value.
    TestItem testItem = TestItem.create(ContentUris.parseId(insertedUri), "value2");

    PutResult updateResult = storIOContentResolver
            .put()
            .contentValues(testItem.toContentValues())
            .withPutResolver(testItemContentValuesPutResolver)
            .prepare()
            .asRxSingle()
            .blockingGet();

    assertThat(updateResult.wasUpdated()).isTrue();

    Cursor cursor = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);

    Assertions.assertThat(cursor).hasCount(1);

    // Fail here, rather than inside fromCursor, if the cursor cannot be positioned.
    assertThat(cursor.moveToFirst()).isTrue();

    TestItem storedItem = TestItem.fromCursor(cursor);
    // Release the cursor once the row has been read so the test does not leak it.
    cursor.close();

    assertThat(testItem).isEqualTo(storedItem);

    // take(2) completes the stream after the second notification.
    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码17 项目: storio   文件: DeleteOperationTest.java
/**
 * Inserts one item, reads it back, deletes that object through an operation consumed as a
 * Flowable and checks the reported row count, the table content before and after, and that
 * two change notifications for {@code TestItem.CONTENT_URI} were observed (presumably one
 * for the insert and one for the delete).
 */
@Test
public void deleteObjectAsRxFlowable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(firstDbState).hasCount(1);

    //noinspection ConstantConditions
    assertThat(firstDbState.moveToFirst()).isTrue();

    // Read the stored row back so the delete targets the persisted item.
    TestItem testItem = TestItem.fromCursor(firstDbState);
    // Release the cursor once the row has been read so the test does not leak it.
    firstDbState.close();

    DeleteResult deleteResult = storIOContentResolver
            .delete()
            .object(testItem)
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .blockingFirst();

    assertThat(deleteResult.numberOfRowsDeleted()).isEqualTo(1);

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(secondDbState).hasCount(0);
    secondDbState.close();

    // take(2) completes the stream after the second notification.
    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码18 项目: storio   文件: NotifyAboutChangesTest.java
/**
 * Starts 100 threads that each publish a change notification for the same table while a
 * transaction is open, and checks that the subscriber sees nothing until the transaction
 * ends and exactly one {@code Changes} value afterwards.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the
 *                              worker threads to finish
 */
@Test
public void shouldReceiveOneNotificationInTransactionWithMultipleThreads() throws InterruptedException {
    final String table = "test_table";
    final int numberOfThreads = 100;

    final TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();

    storIOSQLite
            .observeChanges(LATEST)
            .subscribe(testSubscriber);

    // Open the transaction before any worker thread publishes a notification.
    storIOSQLite
            .lowLevel()
            .beginTransaction();

    // startAllThreadsLock releases all workers at once;
    // allThreadsFinishedLock reaches zero when every worker has published its notification.
    final CountDownLatch startAllThreadsLock = new CountDownLatch(1);
    final CountDownLatch allThreadsFinishedLock = new CountDownLatch(numberOfThreads);

    for (int i = 0; i < numberOfThreads; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // All threads should start "simultaneously".
                    startAllThreadsLock.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

                storIOSQLite
                        .lowLevel()
                        .notifyAboutChanges(Changes.newInstance(table));

                allThreadsFinishedLock.countDown();
            }
        }).start();
    }

    // Ready!
    // Steady!
    startAllThreadsLock.countDown(); // Go!

    // Wait up to 20 seconds for every worker to finish; fail if they do not.
    assertThat(allThreadsFinishedLock.await(20, SECONDS)).isTrue();

    // While we in transaction, no changes should be sent.
    testSubscriber.assertValueCount(0);

    storIOSQLite
            .lowLevel()
            .endTransaction();

    // After the transaction ends, a single notification for the table is expected.
    testSubscriber.assertNoErrors();
    testSubscriber.assertValues(Changes.newInstance(table));
}
 
源代码19 项目: wurmloch-crdt   文件: CrdtStoreTest.java
/**
 * Connects two stores before any command is sent, then sends commands alternately through
 * the CRDT of each store and checks that both CRDT subscribers receive all eight commands
 * in the order in which they were sent.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldSendCommandsToConnectedStore() {
    // given:
    final CrdtStore store1 = new CrdtStore(NODE_ID_1);
    store1.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
    final TestSubscriber<CrdtDefinition> store1Subscriber = TestSubscriber.create();
    store1.subscribe(store1Subscriber);

    final CrdtStore store2 = new CrdtStore(NODE_ID_2);
    store2.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
    final TestSubscriber<CrdtDefinition> store2Subscriber = TestSubscriber.create();
    store2.subscribe(store2Subscriber);

    // The stores are connected before the CRDT is created and before any command is sent.
    store1.connect(store2);

    final SimpleCrdt crdt1 = store1.createCrdt(SimpleCrdt.class, CRDT_ID);
    final TestSubscriber<CrdtCommand> crdt1Subscriber = TestSubscriber.create();
    crdt1.subscribe(crdt1Subscriber);

    // The CRDT is only created on store1; store2 is expected to already know it by id.
    final SimpleCrdt crdt2 = (SimpleCrdt) store2.findCrdt(CRDT_ID).get();
    final TestSubscriber<CrdtCommand> crdt2Subscriber = TestSubscriber.create();
    crdt2.subscribe(crdt2Subscriber);

    // when:
    final SimpleCommand command1_1 = new SimpleCommand(NODE_ID_1, CRDT_ID);
    crdt1.sendCommands(command1_1);

    final SimpleCommand command2_1 = new SimpleCommand(NODE_ID_2, CRDT_ID);
    crdt2.sendCommands(command2_1);

    final SimpleCommand command3_1 = new SimpleCommand(NODE_ID_1, CRDT_ID);
    final SimpleCommand command3_2 = new SimpleCommand(NODE_ID_1, CRDT_ID);
    final SimpleCommand command3_3 = new SimpleCommand(NODE_ID_1, CRDT_ID);
    crdt1.sendCommands(command3_1, command3_2, command3_3);

    final SimpleCommand command4_1 = new SimpleCommand(NODE_ID_2, CRDT_ID);
    final SimpleCommand command4_2 = new SimpleCommand(NODE_ID_2, CRDT_ID);
    final SimpleCommand command4_3 = new SimpleCommand(NODE_ID_2, CRDT_ID);
    crdt2.sendCommands(command4_1, command4_2, command4_3);

    // then:
    // Each store subscriber is expected to have received exactly one CRDT definition.
    assertThat(store1Subscriber.valueCount(), is(1));
    store1Subscriber.assertNotComplete();
    store1Subscriber.assertNoErrors();

    // Both CRDT subscribers are expected to see every command in sending order.
    crdt1Subscriber.assertValues(
            command1_1,
            command2_1,
            command3_1,
            command3_2,
            command3_3,
            command4_1,
            command4_2,
            command4_3
    );
    crdt1Subscriber.assertNotComplete();
    crdt1Subscriber.assertNoErrors();

    assertThat(store2Subscriber.valueCount(), is(1));
    store2Subscriber.assertNotComplete();
    store2Subscriber.assertNoErrors();

    crdt2Subscriber.assertValues(
            command1_1,
            command2_1,
            command3_1,
            command3_2,
            command3_3,
            command4_1,
            command4_2,
            command4_3
    );
    crdt2Subscriber.assertNotComplete();
    crdt2Subscriber.assertNoErrors();

}
 
源代码20 项目: wurmloch-crdt   文件: CrdtStoreTest.java
/**
 * Creates the same CRDT independently on two unconnected stores, sends commands through
 * each of them, and only then connects the stores. Each CRDT subscriber is expected to see
 * its own store's commands first, followed by the commands of the other store.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldSendAllCommandsAfterConnect() {
    // given:
    final CrdtStore store1 = new CrdtStore(NODE_ID_1);
    store1.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
    final TestSubscriber<CrdtDefinition> store1Subscriber = TestSubscriber.create();
    store1.subscribe(store1Subscriber);

    final CrdtStore store2 = new CrdtStore(NODE_ID_2);
    store2.registerFactory(SimpleCrdt.class, SimpleCrdt::new);
    final TestSubscriber<CrdtDefinition> store2Subscriber = TestSubscriber.create();
    store2.subscribe(store2Subscriber);


    // The stores are not connected yet, so the CRDT is created on each store separately.
    final SimpleCrdt crdt1 = store1.createCrdt(SimpleCrdt.class, CRDT_ID);
    final TestSubscriber<CrdtCommand> crdt1Subscriber = TestSubscriber.create();
    crdt1.subscribe(crdt1Subscriber);
    final SimpleCrdt crdt2 = store2.createCrdt(SimpleCrdt.class, CRDT_ID);
    final TestSubscriber<CrdtCommand> crdt2Subscriber = TestSubscriber.create();
    crdt2.subscribe(crdt2Subscriber);

    final SimpleCommand command1_1 = new SimpleCommand(NODE_ID_1, CRDT_ID);
    crdt1.sendCommands(command1_1);

    final SimpleCommand command2_1 = new SimpleCommand(NODE_ID_2, CRDT_ID);
    crdt2.sendCommands(command2_1);

    final SimpleCommand command3_1 = new SimpleCommand(NODE_ID_1, CRDT_ID);
    final SimpleCommand command3_2 = new SimpleCommand(NODE_ID_1, CRDT_ID);
    final SimpleCommand command3_3 = new SimpleCommand(NODE_ID_1, CRDT_ID);
    crdt1.sendCommands(command3_1, command3_2, command3_3);

    final SimpleCommand command4_1 = new SimpleCommand(NODE_ID_2, CRDT_ID);
    final SimpleCommand command4_2 = new SimpleCommand(NODE_ID_2, CRDT_ID);
    final SimpleCommand command4_3 = new SimpleCommand(NODE_ID_2, CRDT_ID);
    crdt2.sendCommands(command4_1, command4_2, command4_3);

    // when:
    store1.connect(store2);

    // then:
    // Each store subscriber is expected to have received exactly one CRDT definition.
    assertThat(store1Subscriber.valueCount(), is(1));
    store1Subscriber.assertNotComplete();
    store1Subscriber.assertNoErrors();

    // crdt1: commands sent through crdt1 first, then those sent through crdt2.
    crdt1Subscriber.assertValues(
            command1_1,
            command3_1,
            command3_2,
            command3_3,
            command2_1,
            command4_1,
            command4_2,
            command4_3
    );
    crdt1Subscriber.assertNotComplete();
    crdt1Subscriber.assertNoErrors();

    assertThat(store2Subscriber.valueCount(), is(1));
    store2Subscriber.assertNotComplete();
    store2Subscriber.assertNoErrors();

    // crdt2: commands sent through crdt2 first, then those sent through crdt1.
    crdt2Subscriber.assertValues(
            command2_1,
            command4_1,
            command4_2,
            command4_3,
            command1_1,
            command3_1,
            command3_2,
            command3_3
    );
    crdt2Subscriber.assertNotComplete();
    crdt2Subscriber.assertNoErrors();
}