下面列出了 io.reactivex.subscribers.TestSubscriber#dispose() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Subscribes to {@code responsePressure}, disposes the subscriber by hand after a short
 * pause, and checks that the service reports the cancellation.
 *
 * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
 */
@Test
public void clientCanCancelServerStreamExplicitly() throws InterruptedException {
    TestService service = new TestService();
    serverRule.getServiceRegistry().addService(service);
    RxNumbersGrpc.RxNumbersStub numbersStub = RxNumbersGrpc.newRxStub(serverRule.getChannel());

    TestSubscriber<NumberProto.Number> subscriber = Single.just(Empty.getDefaultInstance())
            .as(numbersStub::responsePressure)
            .doOnNext(n -> System.out.println(n.getNumber(0)))
            .doOnError(error -> System.out.println(error.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .test();

    // Pause, cancel from the client side, then pause again before inspecting the outcome
    TimeUnit.MILLISECONDS.sleep(250);
    subscriber.dispose();
    TimeUnit.MILLISECONDS.sleep(250);

    // Bounded wait (at most 3 seconds) for a terminal event
    subscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);

    // Cancellation may or may not deliver the last generated message due to delays in the gRPC processing thread
    assertThat(Math.abs(subscriber.valueCount() - service.getLastNumberProduced()))
            .isLessThanOrEqualTo(3);
    assertThat(service.wasCanceled()).isTrue();
    errorRule.verifyNoError();
}
/**
 * Subscribes to {@code responsePressure} with {@code take(10)} and checks that exactly ten
 * values arrive, the subscriber terminates, and the service reports the cancellation.
 *
 * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
 */
@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
    TestService service = new TestService();
    serverRule.getServiceRegistry().addService(service);
    RxNumbersGrpc.RxNumbersStub numbersStub = RxNumbersGrpc.newRxStub(serverRule.getChannel());

    TestSubscriber<NumberProto.Number> subscriber = Single.just(Empty.getDefaultInstance())
            .as(numbersStub::responsePressure)
            .doOnNext(n -> System.out.println(n.getNumber(0)))
            .doOnError(error -> System.out.println(error.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .take(10)
            .test();

    // Consume some work
    TimeUnit.SECONDS.sleep(1);
    subscriber.dispose();

    // Bounded wait (at most 3 seconds) for a terminal event
    subscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);

    subscriber.assertValueCount(10);
    subscriber.assertTerminated();
    assertThat(service.wasCanceled()).isTrue();
    errorRule.verifyNoError();
}
/**
 * Checks that {@code ChangesFilter.applyForTables} passes through only the {@code Changes}
 * of the requested table.
 */
@Test
public void applyForTables_shouldFilterRequiredTable() {
    final Flowable<Changes> source = Flowable.just(
            Changes.newInstance("table1"),
            Changes.newInstance("table2"),
            Changes.newInstance("table3"));
    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

    ChangesFilter.applyForTables(source, singleton("table2")).subscribe(subscriber);

    // All other tables should be filtered
    subscriber.assertValue(Changes.newInstance("table2"));
    subscriber.dispose();
}
/**
 * Checks that {@code ChangesFilter.applyForTags} passes through only the {@code Changes}
 * carrying the requested tag.
 */
@Test
public void applyForTags_shouldFilterRequiredTag() {
    final Flowable<Changes> source = Flowable.just(
            Changes.newInstance("table1", "tag1"),
            Changes.newInstance("table2", "tag2"),
            Changes.newInstance("table3"));
    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

    ChangesFilter.applyForTags(source, singleton("tag1")).subscribe(subscriber);

    // All other tags should be filtered
    subscriber.assertValue(Changes.newInstance("table1", "tag1"));
    subscriber.dispose();
}
/**
 * Checks that {@code ChangesFilter.applyForTablesAndTags} emits a {@code Changes} whose
 * table matches even though its tag does not.
 */
@Test
public void applyForTablesAndTags_shouldNotifyByTable() {
    final Flowable<Changes> source = Flowable.just(
            Changes.newInstance("table1", "another_tag"),
            Changes.newInstance("table2", "tag2"),
            Changes.newInstance("table3"));
    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

    ChangesFilter
            .applyForTablesAndTags(source, singleton("table1"), singleton("tag1"))
            .subscribe(subscriber);

    // All other Changes should be filtered
    subscriber.assertValue(Changes.newInstance("table1", "another_tag"));
    subscriber.dispose();
}
/**
 * Checks that {@code ChangesFilter.applyForTablesAndTags} emits a {@code Changes} whose
 * tag matches even though its table does not.
 */
@Test
public void applyForTablesAndTags_shouldNotifyByTag() {
    final Flowable<Changes> source = Flowable.just(
            Changes.newInstance("another_table", "tag1"),
            Changes.newInstance("table2", "tag2"),
            Changes.newInstance("table3"));
    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

    ChangesFilter
            .applyForTablesAndTags(source, singleton("table1"), singleton("tag1"))
            .subscribe(subscriber);

    // All other Changes should be filtered
    subscriber.assertValue(Changes.newInstance("another_table", "tag1"));
    subscriber.dispose();
}
/**
 * Checks that {@code ChangesFilter.applyForTablesAndTags} emits a {@code Changes} exactly
 * once when both its table and its tag match the filter.
 */
@Test
public void applyForTablesAndTags_shouldSendJustOnceNotificationIfBothTableAndTagAreSatisfy() {
    final Flowable<Changes> source =
            Flowable.just(Changes.newInstance("target_table", "target_tag"));
    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

    ChangesFilter
            .applyForTablesAndTags(source, singleton("target_table"), singleton("target_tag"))
            .subscribe(subscriber);

    // A single emission, equal to the one Changes that was sent
    subscriber.assertValueCount(1);
    subscriber.assertValue(Changes.newInstance("target_table", "target_tag"));
    subscriber.dispose();
}
/**
 * Checks that a {@code Changes} passed to {@code lowLevel().notifyAboutChanges} reaches a
 * subscriber of {@code observeChanges(LATEST)}.
 */
@Test
public void observeChangesAndNotifyAboutChangesShouldWorkCorrectly() {
    final Changes notified = Changes.newInstance("test_table", "tag");
    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

    storIOSQLite.observeChanges(LATEST).subscribe(subscriber);

    // No notification has been sent yet
    subscriber.assertNoValues();

    storIOSQLite.lowLevel().notifyAboutChanges(notified);

    subscriber.assertValue(notified);
    subscriber.assertNoErrors();
    subscriber.dispose();
}
/**
 * Checks that a subscriber of {@code observeChangesInTables} receives nothing when the
 * notified {@code Changes} concerns a table outside the observed set.
 */
@Test
public void observeChangesInTables_shouldNotReceiveIfTableWasNotChanged() {
    final Set<String> observedTables = new HashSet<String>(2);
    observedTables.add("table1");
    observedTables.add("table2");

    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChangesInTables(observedTables, LATEST).subscribe(subscriber);

    // "table3" is not among the observed tables
    storIOSQLite.lowLevel().notifyAboutChanges(Changes.newInstance("table3"));

    subscriber.assertNoValues();
    subscriber.assertNoErrors();
    subscriber.dispose();
}
/**
 * Checks that a subscriber of {@code observeChangesOfTags} receives a notified
 * {@code Changes} that carries one of the observed tags.
 */
@Test
public void observeChangesOfTags_shouldReceiveIfObservedTagExistInChanges() {
    final String firstTag = "tag1";
    final String secondTag = "tag2";

    final Set<String> observedTags = new HashSet<String>(2);
    observedTags.add(firstTag);
    observedTags.add(secondTag);

    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChangesOfTags(observedTags, LATEST).subscribe(subscriber);

    // No notification has been sent yet
    subscriber.assertNoValues();

    final Changes notified = Changes.newInstance("table1", firstTag);
    storIOSQLite.lowLevel().notifyAboutChanges(notified);

    subscriber.assertValues(notified);
    subscriber.assertNoErrors();
    subscriber.dispose();
}
/**
 * Checks that a subscriber of {@code observeChangesOfTags} receives nothing when the
 * notified {@code Changes} carries none of the observed tags.
 */
@Test
public void observeChangesOfTags_shouldNotReceiveIfObservedTagDoesNotExistInChanges() {
    final Set<String> observedTags = new HashSet<String>(2);
    observedTags.add("tag1");
    observedTags.add("tag2");

    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChangesOfTags(observedTags, LATEST).subscribe(subscriber);

    // "tag3" is not among the observed tags
    storIOSQLite.lowLevel().notifyAboutChanges(Changes.newInstance("table3", "tag3"));

    subscriber.assertNoValues();
    subscriber.assertNoErrors();
    subscriber.dispose();
}
/**
 * Checks that disposing the downstream subscriber triggers the {@code doOnCancel} callback
 * registered on {@code publisher} within one second.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting on the latch
 */
@Test
public void testDownstreamCancel_output() throws InterruptedException {
    CountDownLatch cancelSignal = new CountDownLatch(1);

    TestSubscriber<OutputHarvester.Crop> subscriber = publisher
            .doOnCancel(cancelSignal::countDown)
            .compose(harvesterFactory.forOutput(publisher, cmd))
            .test();

    subscriber.assertNotTerminated();
    subscriber.dispose();

    // The latch is released only by the doOnCancel callback
    boolean cancelObserved = cancelSignal.await(1, TimeUnit.SECONDS);
    assertThat(cancelObserved, is(true));
}
/**
 * Checks that an exception thrown by the {@code GetResolver} reaches the subscriber of
 * {@code asRxFlowable(LATEST)} as a {@code StorIOException} whose cause is the original
 * {@code IllegalStateException}.
 */
@Test
public void shouldWrapExceptionIntoStorIOExceptionForFlowable() {
    final StorIOSQLite sqlite = mock(StorIOSQLite.class);
    when(sqlite.observeChanges(any(BackpressureStrategy.class)))
            .thenReturn(Flowable.<Changes>empty());

    //noinspection unchecked
    final GetResolver<Cursor> failingResolver = mock(GetResolver.class);
    when(failingResolver.performGet(eq(sqlite), any(Query.class)))
            .thenThrow(new IllegalStateException("test exception"));

    final TestSubscriber<Cursor> subscriber = new TestSubscriber<Cursor>();

    new PreparedGetCursor.Builder(sqlite)
            .withQuery(Query.builder().table("test_table").observesTags("test_tag").build())
            .withGetResolver(failingResolver)
            .prepare()
            .asRxFlowable(LATEST)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent(60, SECONDS);
    subscriber.assertError(StorIOException.class);

    // Unwrap the first recorded error and inspect its cause
    final Throwable firstError = subscriber.errors().get(0);
    final StorIOException wrapper = (StorIOException) firstError;
    final IllegalStateException rootCause = (IllegalStateException) wrapper.getCause();
    assertThat(rootCause).hasMessage("test exception");

    subscriber.dispose();
}
/**
 * Checks that an exception thrown by the {@code GetResolver} reaches the subscriber of
 * {@code PreparedGetNumberOfResults.asRxFlowable(LATEST)} as exactly one
 * {@code StorIOException} whose cause is the original {@code IllegalStateException}.
 */
@Test
public void shouldWrapExceptionIntoStorIOExceptionForFlowable() {
    final StorIOSQLite sqlite = mock(StorIOSQLite.class);
    when(sqlite.observeChanges(any(BackpressureStrategy.class)))
            .thenReturn(Flowable.<Changes>empty());

    //noinspection unchecked
    final GetResolver<Integer> failingResolver = mock(GetResolver.class);
    when(failingResolver.performGet(eq(sqlite), any(Query.class)))
            .thenThrow(new IllegalStateException("test exception"));

    final TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();

    new PreparedGetNumberOfResults.Builder(sqlite)
            .withQuery(Query.builder().table("test_table").observesTags("test_tag").build())
            .withGetResolver(failingResolver)
            .prepare()
            .asRxFlowable(LATEST)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent(60, SECONDS);
    subscriber.assertError(StorIOException.class);
    assertThat(subscriber.errorCount()).isEqualTo(1);

    // Unwrap the single recorded error and inspect its cause
    final Throwable firstError = subscriber.errors().get(0);
    final StorIOException wrapper = (StorIOException) firstError;
    final IllegalStateException rootCause = (IllegalStateException) wrapper.getCause();
    assertThat(rootCause).hasMessage("test exception");

    subscriber.dispose();
}
/**
 * Checks that {@code ChangesFilter.applyForTables} emits a {@code Changes} when the
 * requested table is only one of several tables that {@code Changes} contains.
 */
@Test
public void applyForTables_shouldFilterRequiredTableWhichIsPartOfSomeChanges() {
    final TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();

    // Plain HashSet instances instead of double-brace initialization: the latter creates
    // an anonymous HashSet subclass per call site that captures the enclosing test instance.
    // Notice, that required table ("table3")
    // Is just a part of one Changes object
    final HashSet<String> emittedTables = new HashSet<String>();
    emittedTables.add("table1");
    emittedTables.add("table2");
    emittedTables.add("table3");

    // Independent copy, so the expected value does not share a set with the emitted one
    final HashSet<String> expectedTables = new HashSet<String>(emittedTables);

    ChangesFilter
            .applyForTables(
                    Flowable.just(Changes.newInstance("table1"),
                            Changes.newInstance(emittedTables)),
                    singleton("table3"))
            .subscribe(testSubscriber);

    // All other Changes should be filtered
    testSubscriber.assertValue(Changes.newInstance(expectedTables));

    testSubscriber.dispose();
}
/**
 * Checks that {@code ChangesFilter.applyForTags} emits a {@code Changes} when the
 * requested tag is only one of several tags that {@code Changes} contains.
 */
@Test
public void applyForTags_shouldFilterRequiredTagWhichIsPartOfSomeChanges() {
    final TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();

    // Plain HashSet instances instead of double-brace initialization: the latter creates
    // an anonymous HashSet subclass per call site that captures the enclosing test instance.
    final HashSet<String> tables = new HashSet<String>();
    tables.add("table1");

    final HashSet<String> tags = new HashSet<String>();
    tags.add("tag1");
    tags.add("tag2");

    Changes changes = Changes.newInstance(tables, tags);

    ChangesFilter
            .applyForTags(
                    Flowable.just(
                            changes,
                            Changes.newInstance("table3", "tag3"),
                            Changes.newInstance("table4")),
                    singleton("tag1"))
            .subscribe(testSubscriber);

    // All other tags should be filtered
    testSubscriber.assertValue(changes);

    testSubscriber.dispose();
}
/**
 * Checks that an exception thrown by the {@code GetResolver} reaches the subscriber of
 * {@code asRxFlowable(BackpressureStrategy.MISSING)} as exactly one {@code StorIOException}
 * whose cause is the original {@code IllegalStateException}.
 */
@Test
public void shouldWrapExceptionIntoStorIOExceptionForFlowable() {
    final StorIOContentResolver contentResolver = mock(StorIOContentResolver.class);
    Uri observedUri = mock(Uri.class);
    when(contentResolver.observeChangesOfUri(eq(observedUri), eq(BackpressureStrategy.MISSING)))
            .thenReturn(Flowable.<Changes>empty());

    //noinspection unchecked
    final GetResolver<Integer> failingResolver = mock(GetResolver.class);
    when(failingResolver.performGet(eq(contentResolver), any(Query.class)))
            .thenThrow(new IllegalStateException("test exception"));

    final TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();

    new PreparedGetNumberOfResults.Builder(contentResolver)
            .withQuery(Query.builder().uri(observedUri).build())
            .withGetResolver(failingResolver)
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent(60, SECONDS);
    subscriber.assertError(StorIOException.class);
    assertThat(subscriber.errors()).hasSize(1);

    // Unwrap the single recorded error and inspect its cause
    final Throwable firstError = subscriber.errors().get(0);
    final StorIOException wrapper = (StorIOException) firstError;
    final IllegalStateException rootCause = (IllegalStateException) wrapper.getCause();
    assertThat(rootCause).hasMessage("test exception");

    subscriber.dispose();
}
/**
 * Checks that a subscriber of {@code observeChangesInTable("table1", LATEST)} receives only
 * the notification about "table1" and none of those about "table2" or "table3".
 */
@Test
public void observeChangesInTable() {
    final Changes unrelatedBefore = Changes.newInstance("table2");
    final Changes observed = Changes.newInstance("table1");
    final Changes unrelatedAfter = Changes.newInstance("table3");

    final TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();
    storIOSQLite.observeChangesInTable("table1", LATEST).subscribe(subscriber);
    subscriber.assertNoValues();

    // A change of another table must not be delivered
    storIOSQLite.lowLevel().notifyAboutChanges(unrelatedBefore);
    subscriber.assertNoValues();

    // A change of the observed table must be delivered
    storIOSQLite.lowLevel().notifyAboutChanges(observed);
    subscriber.assertValue(observed);

    storIOSQLite.lowLevel().notifyAboutChanges(unrelatedAfter);

    // Subscriber should not see changes of table2 and table3
    subscriber.assertValue(observed);
    subscriber.assertNoErrors();
    subscriber.dispose();
}
/**
 * For every SDK version from 16 up to (but not including) {@code MAX_SDK_VERSION}, checks
 * that {@code RxChangesObserver.observeChanges} registers a single {@code ContentObserver},
 * ignores {@code onChange(boolean)} calls without a Uri, and emits one {@code Changes} per
 * {@code onChange(boolean, Uri)} call.
 */
@TargetApi(Build.VERSION_CODES.JELLY_BEAN)
@Test
public void shouldEmitChangesOnSdkVersionGreaterThan15() {
    for (int sdkVersion = 16; sdkVersion < MAX_SDK_VERSION; sdkVersion++) {
        ContentResolver contentResolver = mock(ContentResolver.class);
        final AtomicReference<ContentObserver> contentObserver = new AtomicReference<ContentObserver>();

        // Answer is parameterized explicitly to avoid a raw type
        doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                // Third argument of registerContentObserver() is the observer being registered
                final Object registeredObserver = invocation.getArguments()[2];

                // Save reference to ContentObserver only once to assert that it was created once
                if (contentObserver.get() == null) {
                    contentObserver.set((ContentObserver) registeredObserver);
                } else if (contentObserver.get() != registeredObserver) {
                    throw new AssertionError("More than one ContentObserver was created");
                }
                return null;
            }
        }).when(contentResolver).registerContentObserver(any(Uri.class), eq(true), any(ContentObserver.class));

        TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();

        Uri uri1 = mock(Uri.class);
        Uri uri2 = mock(Uri.class);
        Set<Uri> uris = new HashSet<Uri>(2);
        uris.add(uri1);
        uris.add(uri2);

        RxChangesObserver
                .observeChanges(
                        contentResolver,
                        uris,
                        mock(Handler.class),
                        sdkVersion,
                        BackpressureStrategy.MISSING
                )
                .subscribe(testSubscriber);

        testSubscriber.assertNotTerminated();
        testSubscriber.assertNoValues();

        // RxChangesObserver should ignore call to onChange() without Uri on sdkVersion >= 16
        contentObserver.get().onChange(false);
        testSubscriber.assertNoValues();

        // Emulate change of Uris, Flowable should react and emit Changes objects
        contentObserver.get().onChange(false, uri1);
        contentObserver.get().onChange(false, uri2);
        testSubscriber.assertValues(Changes.newInstance(uri1), Changes.newInstance(uri2));

        testSubscriber.dispose();
        testSubscriber.assertNoErrors();
    }
}
/**
 * For every SDK version from {@code MIN_SDK_VERSION} up to (but not including) 16, checks
 * that {@code RxChangesObserver.observeChanges} emits one {@code Changes} per Uri when the
 * {@code ContentObserver} registered for that Uri receives {@code onChange(boolean)}.
 */
@TargetApi(Build.VERSION_CODES.JELLY_BEAN)
@Test
public void shouldEmitChangesOnSdkVersionLowerThan16() {
    for (int sdkVersion = MIN_SDK_VERSION; sdkVersion < 16; sdkVersion++) {
        ContentResolver contentResolver = mock(ContentResolver.class);
        final Map<Uri, ContentObserver> contentObservers = new HashMap<Uri, ContentObserver>(3);

        // Answer is parameterized explicitly to avoid a raw type
        doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                // Remember which observer was registered for which Uri
                // (first and third arguments of registerContentObserver())
                final Object[] arguments = invocation.getArguments();
                contentObservers.put((Uri) arguments[0], (ContentObserver) arguments[2]);
                return null;
            }
        }).when(contentResolver).registerContentObserver(any(Uri.class), eq(true), any(ContentObserver.class));

        TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();

        Uri uri1 = mock(Uri.class);
        Uri uri2 = mock(Uri.class);
        Set<Uri> uris = new HashSet<Uri>(2);
        uris.add(uri1);
        uris.add(uri2);

        RxChangesObserver
                .observeChanges(
                        contentResolver,
                        uris,
                        mock(Handler.class),
                        sdkVersion,
                        BackpressureStrategy.MISSING
                )
                .subscribe(testSubscriber);

        testSubscriber.assertNotTerminated();
        testSubscriber.assertNoValues();

        // Emulate change of Uris, Flowable should react and emit Changes objects
        contentObservers.get(uri1).onChange(false);
        contentObservers.get(uri2).onChange(false);
        testSubscriber.assertValues(Changes.newInstance(uri1), Changes.newInstance(uri2));

        testSubscriber.dispose();
        testSubscriber.assertNoErrors();
    }
}