io.reactivex.subscribers.TestSubscriber#awaitTerminalEvent ( )源码实例Demo

下面列出了io.reactivex.subscribers.TestSubscriber#awaitTerminalEvent ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: reactive-grpc   文件: BackpressureChunkingTest.java
/**
 * https://github.com/salesforce/reactive-grpc/issues/120
 */
@Test
public void chunkOperatorWorksWithConcatMap() {
    int chunkSize = DEFAULT_CHUNK_SIZE;

    AbstractStreamObserverAndPublisher<Long> source =
            new AbstractStreamObserverAndPublisher<Long>(new ConcurrentLinkedQueue<Long>(), null){};
    AsyncRangeCallStreamObserver observer = new AsyncRangeCallStreamObserver(Executors.newSingleThreadExecutor(), source, 24);
    source.onSubscribe(observer);
    TestSubscriber<Long> testSubscriber = Flowable.fromPublisher(source)
                                                  .concatMap(new Function<Long, Publisher<Long>>() {
                                                      @Override
                                                      public Publisher<Long> apply(Long item) throws Exception {
                                                          return Flowable.just(item).delay(3, TimeUnit.MILLISECONDS);
                                                      }
                                                  })
                                                  .test();

    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNoErrors();

    assertThat(observer.requestsQueue).containsExactly(chunkSize);
}
 
源代码2 项目: reactive-grpc   文件: GrpcRetryTest.java
@Test
public void oneToManyRetryWhen() {
    TestSubscriber<Integer> test = newThreeErrorSingle()
            .as(GrpcRetry.OneToMany.retryWhen(new Function<Single<Integer>, Flowable<Integer>>() {
                @Override
                public Flowable<Integer> apply(Single<Integer> single) {
                    return single.toFlowable();
                }
            }, RetryWhen.maxRetries(3).build()))
            .test();

    test.awaitTerminalEvent(1, TimeUnit.SECONDS);
    test.assertValues(0);
    test.assertNoErrors();
    test.assertComplete();
}
 
源代码3 项目: storio   文件: PutOperationTest.java
@Test
public void updateContentValuesExecuteAsBlocking() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    Uri insertedUri = contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value").toContentValues());

    TestItem testItem = TestItem.create(ContentUris.parseId(insertedUri), "value2");

    PutResult updateResult = storIOContentResolver
            .put()
            .contentValues(testItem.toContentValues())
            .withPutResolver(testItemContentValuesPutResolver)
            .prepare()
            .executeAsBlocking();

    assertThat(updateResult.wasUpdated()).isTrue();

    Cursor cursor = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);

    Assertions.assertThat(cursor).hasCount(1);

    cursor.moveToFirst();

    assertThat(testItem).isEqualTo(TestItem.fromCursor(cursor));

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码4 项目: storio   文件: GetOperationTest.java
@Test
public void getNonExistedObjectExecuteAsBlocking() {
    final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(1)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    TestItem testItem = storIOContentResolver
            .get()
            .object(TestItem.class)
            .withQuery(Query.builder()
                    .uri(TestItem.CONTENT_URI)
                    .where(TestItem.COLUMN_VALUE + "=?")
                    .whereArgs("some value")
                    .build())
            .prepare()
            .executeAsBlocking();

    assertThat(testItem).isNull();

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码5 项目: klingar   文件: QueueManagerTest.java
@Test public void currentQueue() {
  TestSubscriber<Pair<List<Track>, Integer>> test = queueManager.queue().take(1).test();
  test.awaitTerminalEvent();

  List<Track> actualQueue = test.values().get(0).first;
  int actualPosition = test.values().get(0).second;

  assertThat(actualQueue, IsIterableContainingInOrder.contains(
      queue.get(0), queue.get(1), queue.get(2), queue.get(3), queue.get(4)));
  assertThat(actualPosition, is(0));
}
 
源代码6 项目: storio   文件: GetOperationTest.java
@Test
public void getExistedObjectExecuteAsFlowable() {
    final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(1)
            .subscribe(changesTestSubscriber);

    TestItem expectedItem = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());

    Flowable<Optional<TestItem>> testItemFlowable = storIOContentResolver
            .get()
            .object(TestItem.class)
            .withQuery(Query.builder()
                    .uri(TestItem.CONTENT_URI)
                    .where(TestItem.COLUMN_VALUE + "=?")
                    .whereArgs("value")
                    .build())
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .take(1);

    TestSubscriber<Optional<TestItem>> testSubscriber = new TestSubscriber<Optional<TestItem>>();
    testItemFlowable.subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent(5, SECONDS);
    testSubscriber.assertNoErrors();

    List<Optional<TestItem>> emmitedItems = testSubscriber.values();
    assertThat(emmitedItems.size()).isEqualTo(1);
    assertThat(expectedItem.equalsWithoutId(emmitedItems.get(0).get())).isTrue();

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码7 项目: storio   文件: PreparedDeleteObjectTest.java
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAffectingDbAsFlowable() {
    final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
    final StorIOSQLite.LowLevel lowLevel = mock(StorIOSQLite.LowLevel.class);

    when(storIOSQLite.lowLevel()).thenReturn(lowLevel);

    when(storIOSQLite.delete()).thenReturn(new PreparedDelete.Builder(storIOSQLite));

    final TestSubscriber<DeleteResult> testSubscriber = new TestSubscriber<DeleteResult>();

    storIOSQLite
            .delete()
            .object(TestItem.newInstance())
            .prepare()
            .asRxFlowable(MISSING)
            .subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNoValues();
    assertThat(testSubscriber.errors().get(0)).
            hasCauseInstanceOf(IllegalStateException.class);

    verify(storIOSQLite).delete();
    verify(storIOSQLite).lowLevel();
    verify(storIOSQLite).defaultRxScheduler();
    verify(storIOSQLite).interceptors();
    verify(lowLevel).typeMapping(TestItem.class);
    verify(lowLevel, never()).delete(any(DeleteQuery.class));
    verifyNoMoreInteractions(storIOSQLite, lowLevel);
}
 
源代码8 项目: storio   文件: DeleteOperationTest.java
@Test
public void deleteObjectsExecuteAsBlocking() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(firstDbState).hasCount(1);

    //noinspection ConstantConditions
    assertThat(firstDbState.moveToFirst()).isTrue();

    TestItem testItem = TestItem.fromCursor(firstDbState);

    DeleteResults<TestItem> deleteResults = storIOContentResolver
            .delete()
            .objects(singletonList(testItem))
            .prepare()
            .executeAsBlocking();

    assertThat(deleteResults.wasDeleted(testItem)).isTrue();

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(secondDbState).hasCount(0);

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
@Test
public void serverCanCancelClientStreamImplicitlyBidi() {
    TestService svc = new TestService();
    serverRule.getServiceRegistry().addService(svc);

    RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(serverRule.getChannel());

    svc.setExplicitCancel(false);

    AtomicBoolean requestWasCanceled = new AtomicBoolean(false);
    AtomicBoolean requestDidProduce = new AtomicBoolean(false);

    Flowable<NumberProto.Number> request = Flowable
            .fromIterable(IntStream.range(0, NUMBER_OF_STREAM_ELEMENTS)::iterator)
            .delay(10, TimeUnit.MILLISECONDS)
            .map(CancellationPropagationIntegrationTest::protoNum)
            .doOnNext(x -> {
                requestDidProduce.set(true);
                System.out.println("Produced: " + x.getNumber(0));
            })
            .doOnCancel(() -> {
                requestWasCanceled.set(true);
                System.out.println("Client canceled");
            });

    TestSubscriber<NumberProto.Number> observer = request
            .compose(stub::twoWayPressure)
            .doOnNext(number -> System.out.println(number.getNumber(0)))
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .test();

    observer.awaitTerminalEvent(3, TimeUnit.SECONDS);
    observer.assertTerminated();
    assertThat(requestWasCanceled.get()).isTrue();
    assertThat(requestDidProduce.get()).isTrue();

    errorRule.verifyNoError();
}
 
@Test
public void statusRuntimeExceptionTriggersHandlerFuseable() {
    CallStreamObserver delegate = mock(CallStreamObserver.class);
    final AtomicBoolean called = new AtomicBoolean(false);

    AbstractStreamObserverAndPublisher observer = new TestStreamObserverAndPublisherWithFusion(new ArrayBlockingQueue(1), null, new Runnable() {
        @Override
        public void run() {
            called.set(true);
        }
    });

    observer.onSubscribe(delegate);

    TestSubscriber test = Flowable.fromPublisher(observer)
                                  .observeOn(Schedulers.trampoline())
                                  .test();

    StatusRuntimeException exception = Status.CANCELLED.asRuntimeException();
    observer.onError(exception);

    test.awaitTerminalEvent();
    test.assertError(exception);

    assertThat(called.get()).isTrue();
    assertThat(observer.outputFused).isTrue();
}
 
源代码11 项目: storio   文件: DeleteOperationTest.java
@Test
public void deleteObjectsAsRxFlowable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(firstDbState).hasCount(1);

    //noinspection ConstantConditions
    assertThat(firstDbState.moveToFirst()).isTrue();

    TestItem testItem = TestItem.fromCursor(firstDbState);

    DeleteResults<TestItem> deleteResults = storIOContentResolver
            .delete()
            .objects(singletonList(testItem))
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .blockingFirst();

    assertThat(deleteResults.wasDeleted(testItem)).isTrue();

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(secondDbState).hasCount(0);

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAffectingContentProviderAsFlowable() {
    final StorIOContentResolver storIOContentResolver = mock(StorIOContentResolver.class);
    final StorIOContentResolver.LowLevel lowLevel = mock(StorIOContentResolver.LowLevel.class);

    when(storIOContentResolver.lowLevel()).thenReturn(lowLevel);

    when(storIOContentResolver.delete()).thenReturn(new PreparedDelete.Builder(storIOContentResolver));

    final List<TestItem> items = asList(TestItem.newInstance(), TestItem.newInstance());

    final TestSubscriber<DeleteResults<TestItem>> testSubscriber = new TestSubscriber<DeleteResults<TestItem>>();

    storIOContentResolver
            .delete()
            .objects(items)
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNoValues();
    assertThat(testSubscriber.errors().get(0))
            .isInstanceOf(StorIOException.class)
            .hasCauseInstanceOf(IllegalStateException.class);

    verify(storIOContentResolver).delete();
    verify(storIOContentResolver).lowLevel();
    verify(storIOContentResolver).interceptors();
    verify(storIOContentResolver).defaultRxScheduler();
    verify(lowLevel).typeMapping(TestItem.class);
    verify(lowLevel, never()).delete(any(DeleteQuery.class));
    verifyNoMoreInteractions(storIOContentResolver, lowLevel);
}
 
源代码13 项目: storio   文件: GetOperationTest.java
@Test
public void getExistedObjectExecuteAsSingle() {
    final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(1)
            .subscribe(changesTestSubscriber);

    TestItem expectedItem = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());

    Single<Optional<TestItem>> testItemSingle = storIOContentResolver
            .get()
            .object(TestItem.class)
            .withQuery(Query.builder()
                    .uri(TestItem.CONTENT_URI)
                    .where(TestItem.COLUMN_VALUE + "=?")
                    .whereArgs("value")
                    .build())
            .prepare()
            .asRxSingle();

    TestObserver<Optional<TestItem>> testObserver = new TestObserver<Optional<TestItem>>();
    testItemSingle.subscribe(testObserver);

    testObserver.awaitTerminalEvent(5, SECONDS);
    testObserver.assertNoErrors();

    List<Optional<TestItem>> emmitedItems = testObserver.values();
    assertThat(emmitedItems.size()).isEqualTo(1);
    assertThat(expectedItem.equalsWithoutId(emmitedItems.get(0).get())).isTrue();

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
 
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithTransactionWithoutAffectingDbAsFlowable() {
    final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
    final StorIOSQLite.LowLevel lowLevel = mock(StorIOSQLite.LowLevel.class);

    when(storIOSQLite.lowLevel()).thenReturn(lowLevel);

    when(storIOSQLite.put()).thenReturn(new PreparedPut.Builder(storIOSQLite));

    final List<TestItem> items = asList(TestItem.newInstance(), TestItem.newInstance());

    final TestSubscriber<PutResults<TestItem>> testSubscriber = new TestSubscriber<PutResults<TestItem>>();

    storIOSQLite
            .put()
            .objects(items)
            .useTransaction(true)
            .prepare()
            .asRxFlowable(MISSING)
            .subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNoValues();
    assertThat(testSubscriber.errors().get(0))
            .isInstanceOf(StorIOException.class)
            .hasCauseInstanceOf(IllegalStateException.class);

    verify(storIOSQLite).put();
    verify(storIOSQLite).lowLevel();
    verify(storIOSQLite).defaultRxScheduler();
    verify(storIOSQLite).interceptors();
    verify(lowLevel).typeMapping(TestItem.class);
    verify(lowLevel, never()).insert(any(InsertQuery.class), any(ContentValues.class));
    verify(lowLevel, never()).update(any(UpdateQuery.class), any(ContentValues.class));
    verifyNoMoreInteractions(storIOSQLite, lowLevel);
}
 
源代码15 项目: storio   文件: GetOperationTest.java
@Test
public void getOneExistedObjectTableUpdate() {
    TestItem expectedItem = TestItem.create(null, "value");

    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value3").toContentValues());

    Flowable<Optional<TestItem>> testItemFlowable = storIOContentResolver
            .get()
            .object(TestItem.class)
            .withQuery(Query.builder()
                    .uri(TestItem.CONTENT_URI)
                    .where(TestItem.COLUMN_VALUE + "=?")
                    .whereArgs("value")
                    .build())
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .take(2);

    TestSubscriber<Optional<TestItem>> testSubscriber = new TestSubscriber<Optional<TestItem>>();
    testItemFlowable.subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent(5, SECONDS);
    testSubscriber.assertValue(Optional.<TestItem>empty());
    testSubscriber.assertNoErrors();

    contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());

    testSubscriber.awaitTerminalEvent(5, SECONDS);
    testSubscriber.assertNoErrors();

    List<Optional<TestItem>> emittedItems = testSubscriber.values();
    assertThat(emittedItems.size()).isEqualTo(2);
    assertThat(emittedItems.get(0).isPresent()).isFalse();
    assertThat(expectedItem.equalsWithoutId(emittedItems.get(1).get())).isTrue();
}
 
源代码16 项目: storio   文件: GetOperationTest.java
@Test
public void getListOfObjectsExecuteAsBlocking() {
    final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(1)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    List<TestItem> list = storIOContentResolver
            .get()
            .listOfObjects(TestItem.class)
            .withQuery(Query.builder()
                    .uri(TestItem.CONTENT_URI)
                    .build())
            .prepare()
            .executeAsBlocking();

    assertThat(list).hasSize(1);

    assertThat(testItemToInsert.equalsWithoutId(list.get(0))).isTrue();

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码17 项目: storio   文件: DeleteOperationTest.java
@Test
public void deleteObjectsAsCompletable() {
    TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();

    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(2)
            .subscribe(changesTestSubscriber);

    TestItem testItemToInsert = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, testItemToInsert.toContentValues());

    Cursor firstDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(firstDbState).hasCount(1);

    //noinspection ConstantConditions
    assertThat(firstDbState.moveToFirst()).isTrue();

    TestItem testItem = TestItem.fromCursor(firstDbState);

    storIOContentResolver
            .delete()
            .objects(singletonList(testItem))
            .prepare()
            .asRxCompletable()
            .blockingAwait(15, SECONDS);

    Cursor secondDbState = contentResolver.query(TestItem.CONTENT_URI, null, null, null, null);
    Assertions.assertThat(secondDbState).hasCount(0);

    changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
    changesTestSubscriber.assertNoErrors();
    changesTestSubscriber.assertValues(Changes.newInstance(TestItem.CONTENT_URI), Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码18 项目: storio   文件: PreparedGetObjectTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldThrowExceptionIfNoTypeMappingWasFoundWithoutAccessingDbWithQueryAsFlowable() {
    final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
    final StorIOSQLite.LowLevel lowLevel = mock(StorIOSQLite.LowLevel.class);

    when(storIOSQLite.get()).thenReturn(new PreparedGet.Builder(storIOSQLite));
    when(storIOSQLite.lowLevel()).thenReturn(lowLevel);
    when(storIOSQLite.observeChanges(any(BackpressureStrategy.class))).thenReturn(Flowable.<Changes>empty());

    final TestSubscriber<Optional<TestItem>> testSubscriber = new TestSubscriber<Optional<TestItem>>();

    storIOSQLite
            .get()
            .object(TestItem.class)
            .withQuery(Query.builder().table("test_table").observesTags("test_tag").build())
            .prepare()
            .asRxFlowable(LATEST)
            .subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNoValues();
    Throwable error = testSubscriber.errors().get(0);

    assertThat(error)
            .isInstanceOf(StorIOException.class)
            .hasCauseInstanceOf(IllegalStateException.class)
            .hasMessage("Error has occurred during Get operation. query = Query{distinct=false, table='test_table', columns=[], where='', whereArgs=[], groupBy='', having='', orderBy='', limit='', observesTags='[test_tag]'}");

    assertThat(error.getCause())
            .hasMessage("This type does not have type mapping: "
                    + "type = " + TestItem.class + "," +
                    "db was not touched by this operation, please add type mapping for this type");

    verify(storIOSQLite).get();
    verify(storIOSQLite).lowLevel();
    verify(storIOSQLite).defaultRxScheduler();
    verify(storIOSQLite).interceptors();
    verify(lowLevel).typeMapping(TestItem.class);
    verify(lowLevel, never()).query(any(Query.class));
    verify(storIOSQLite).observeChanges(LATEST);
    verifyNoMoreInteractions(storIOSQLite, lowLevel);
}
 
@Test
public void verifyBehaviorInCaseOfExceptionWithoutTransactionFlowable() {
    final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
    final StorIOSQLite.LowLevel lowLevel = mock(StorIOSQLite.LowLevel.class);

    //noinspection unchecked
    final PutResolver<ContentValues> putResolver = mock(PutResolver.class);

    final List<ContentValues> contentValues = singletonList(mock(ContentValues.class));

    when(putResolver.performPut(same(storIOSQLite), any(ContentValues.class)))
            .thenThrow(new IllegalStateException("test exception"));

    final TestSubscriber<PutResults<ContentValues>> testSubscriber = new TestSubscriber<PutResults<ContentValues>>();

    new PreparedPutContentValuesIterable.Builder(storIOSQLite, contentValues)
            .withPutResolver(putResolver)
            .useTransaction(false)
            .prepare()
            .asRxFlowable(MISSING)
            .subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNoValues();
    testSubscriber.assertError(StorIOException.class);

    //noinspection ThrowableResultOfMethodCallIgnored
    StorIOException expected = (StorIOException) testSubscriber.errors().get(0);

    IllegalStateException cause = (IllegalStateException) expected.getCause();
    assertThat(cause).hasMessage("test exception");

    // Main check of this test
    verify(lowLevel, never()).endTransaction();

    verify(storIOSQLite).lowLevel();
    verify(storIOSQLite).defaultRxScheduler();
    verify(storIOSQLite).interceptors();
    verify(putResolver).performPut(same(storIOSQLite), any(ContentValues.class));
    verifyNoMoreInteractions(storIOSQLite, lowLevel, putResolver);
}
 
源代码20 项目: storio   文件: PreparedGetObjectTest.java
@Test
public void cursorMustBeClosedInCaseOfExceptionForFlowable() {
    final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);

    when(storIOSQLite.observeChanges(any(BackpressureStrategy.class))).thenReturn(Flowable.<Changes>empty());

    //noinspection unchecked
    final GetResolver<Object> getResolver = mock(GetResolver.class);

    final Cursor cursor = mock(Cursor.class);

    when(cursor.getCount()).thenReturn(10);

    when(cursor.moveToNext()).thenReturn(true);

    when(getResolver.performGet(eq(storIOSQLite), any(Query.class)))
            .thenReturn(cursor);

    when(getResolver.mapFromCursor(storIOSQLite, cursor))
            .thenThrow(new IllegalStateException("test exception"));

    PreparedGetObject<Object> preparedGetObject =
            new PreparedGetObject<Object>(
                    storIOSQLite,
                    Object.class,
                    Query.builder().table("test_table").observesTags("test_tag").build(),
                    getResolver
            );

    final TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();

    preparedGetObject
            .asRxFlowable(LATEST)
            .subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent();

    testSubscriber.assertNoValues();
    testSubscriber.assertError(StorIOException.class);

    StorIOException storIOException = (StorIOException) testSubscriber.errors().get(0);

    IllegalStateException cause = (IllegalStateException) storIOException.getCause();
    assertThat(cause).hasMessage("test exception");

    // Cursor must be closed in case of exception
    verify(cursor).close();

    verify(storIOSQLite).observeChanges(LATEST);
    verify(getResolver).performGet(eq(storIOSQLite), any(Query.class));
    verify(getResolver).mapFromCursor(storIOSQLite, cursor);
    verify(cursor).getCount();
    verify(cursor).moveToNext();
    verify(storIOSQLite).defaultRxScheduler();
    verify(storIOSQLite).interceptors();

    verifyNoMoreInteractions(storIOSQLite, getResolver, cursor);
}