下面列出了 io.reactivex.subscribers.TestSubscriber#assertNotTerminated() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Requests one item and cancels right away, while the mapped item is still being held
 * back by a 100 ms sleep on the executor. The subscriber must report itself as cancelled
 * and must see neither the value nor a terminal event.
 */
@Test
public void testCancellationBetweenRequestAndValue() {
    // TODO This is a very broken implementation of "delay" - to be replaced once delay is implemented
    executor = Executors.newSingleThreadExecutor();
    Publisher<Integer> delayed = Uni.createFrom().item(1)
            .emitOn(executor)
            .map(item -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt flag set for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                return item;
            })
            .convert().toPublisher();
    assertThat(delayed).isNotNull();

    // Subscribe with zero initial demand; demand is signalled explicitly below.
    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(delayed).test(0);
    subscriber.assertSubscribed();

    subscriber.request(1);
    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated();
    subscriber.assertNever(1);
}
/**
 * With output buffering enabled on the command, a line pushed before the marker line is
 * expected to be the only entry in the buffer of the single emitted crop.
 */
@Test
public void testBuffers_output() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);
    when(cmd.isOutputBufferEnabled()).thenReturn(true);

    TestSubscriber<OutputHarvester.Crop> subscriber =
            publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();
    subscriber.assertNotTerminated();

    publisher.onNext("some-output");
    // The test expects the stream to complete once the marker line has been pushed.
    publisher.onNext(marker + " 255");

    subscriber.assertValueCount(1);
    subscriber.assertComplete();

    Harvester.Crop harvested = subscriber.values().get(0);
    assertThat(harvested.buffer.size(), is(1));
    assertThat(harvested.buffer, Matchers.contains("some-output"));
}
/**
 * With error buffering enabled on the command, a line pushed before the marker line is
 * expected to be the only entry in the buffer of the single emitted crop.
 */
@Test
public void testBuffers_error() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);
    when(cmd.isErrorBufferEnabled()).thenReturn(true);

    TestSubscriber<Harvester.Crop> subscriber =
            publisher.compose(harvesterFactory.forError(publisher, cmd)).test();
    subscriber.assertNotTerminated();

    publisher.onNext("some-errors");
    // The test expects the stream to complete once the marker line has been pushed.
    publisher.onNext(marker + " 255");

    subscriber.assertValueCount(1);
    subscriber.assertComplete();

    Harvester.Crop harvested = subscriber.values().get(0);
    assertThat(harvested.buffer.size(), is(1));
    assertThat(harvested.buffer, Matchers.contains("some-errors"));
}
/**
 * Completes the upstream before any marker line was pushed. The harvester is still
 * expected to emit exactly one crop and complete; that crop must be flagged as not
 * complete, carry the INITIAL exit code and hold the buffered line.
 */
@Test
public void testUpstreamPrematureCompletion_output() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);
    when(cmd.isOutputBufferEnabled()).thenReturn(true);

    TestSubscriber<OutputHarvester.Crop> subscriber =
            publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();
    subscriber.assertNotTerminated();

    publisher.onNext("some-output");
    // No marker line is sent; the upstream simply ends.
    publisher.onComplete();

    subscriber.assertValueCount(1);
    subscriber.assertComplete();
    OutputHarvester.Crop harvested = subscriber.values().get(0);

    assertThat(harvested.isComplete, is(false));
    assertThat(harvested.exitCode, is(Cmd.ExitCode.INITIAL));
    assertThat(harvested.buffer.size(), is(1));
    assertThat(harvested.buffer, contains("some-output"));
}
/**
 * Every message pushed into the bus must reach the subscriber in the same order, and
 * the stream must stay open afterwards.
 */
@Test
public void onNextShouldSendMessagesToObserver() {
    RxChangesBus<String> bus = new RxChangesBus<String>();
    TestSubscriber<String> subscriber = new TestSubscriber<String>();
    bus.asFlowable().subscribe(subscriber);

    List<String> expected = asList("yo", ",", "wanna", "some", "messages?");
    for (int i = 0; i < expected.size(); i++) {
        bus.onNext(expected.get(i));
    }

    subscriber.assertValueSequence(expected);
    subscriber.assertNotTerminated();
}
/**
 * A bus constructed with {@code true} must expose a non-null Flowable that relays every
 * pushed message in order without terminating.
 */
@Test
public void onNextShouldSendMessagesToObserverIfRxJavaIsInTheClassPath() {
    ChangesBus<String> bus = new ChangesBus<String>(true);
    TestSubscriber<String> subscriber = new TestSubscriber<String>();

    Flowable<String> stream = bus.asFlowable();
    assertThat(stream).isNotNull();

    //noinspection ConstantConditions
    stream.subscribe(subscriber);

    List<String> expected = asList("My", "life", "my", "rules", "please?");
    for (int i = 0; i < expected.size(); i++) {
        bus.onNext(expected.get(i));
    }

    subscriber.assertValueSequence(expected);
    subscriber.assertNotTerminated();
}
/**
 * Subscribes via {@code test(1, true)} to a publisher backed by an already available
 * item. The subscriber must be cancelled and must observe neither the value nor a
 * terminal event.
 */
@Test
public void testWithImmediateValueWithOneRequestAndImmediateCancellation() {
    Publisher<Integer> source = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(source).isNotNull();

    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(source).test(1, true);

    subscriber.assertSubscribed();
    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated();
    subscriber.assertNever(1);
}
/**
 * Subscribes with zero demand and cancels before ever requesting. The subscriber must
 * be cancelled and must observe neither the value nor a terminal event.
 */
@Test
public void testCancellationBetweenSubscriptionAndRequest() {
    Publisher<Integer> source = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(source).isNotNull();

    // Zero initial demand: nothing is requested before the cancel below.
    TestSubscriber<Integer> subscriber = Flowable.fromPublisher(source).test(0);
    subscriber.assertSubscribed();

    subscriber.cancel();

    assertThat(subscriber.isCancelled()).isTrue();
    subscriber.assertNotTerminated();
    subscriber.assertNever(1);
}
/**
 * Pushing the marker followed by " 255" must yield exactly one crop, complete the
 * stream, and report 255 as the crop's exit code.
 */
@Test
public void testCommandCompletion_output() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);

    TestSubscriber<OutputHarvester.Crop> subscriber =
            publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();
    subscriber.assertNotTerminated();

    publisher.onNext(marker + " 255");

    subscriber.assertValueCount(1);
    subscriber.assertComplete();
    OutputHarvester.Crop harvested = subscriber.values().get(0);
    assertThat(harvested.exitCode, is(255));
}
/**
 * Disposing the downstream subscriber of the output harvester must cancel the upstream,
 * observed here through a latch released in {@code doOnCancel}.
 */
@Test
public void testDownstreamCancel_output() throws InterruptedException {
    final CountDownLatch cancelled = new CountDownLatch(1);

    TestSubscriber<OutputHarvester.Crop> subscriber = publisher
            .doOnCancel(cancelled::countDown)
            .compose(harvesterFactory.forOutput(publisher, cmd))
            .test();
    subscriber.assertNotTerminated();

    subscriber.dispose();

    // Give the cancellation up to one second to reach the upstream.
    assertThat(cancelled.await(1, TimeUnit.SECONDS), is(true));
}
/**
 * Disposing the downstream subscriber of the error harvester must cancel the upstream,
 * observed here through a latch released in {@code doOnCancel}.
 *
 * <p>This variant goes through {@code harvesterFactory.forError(...)}. It previously
 * used {@code forOutput(...)}, which made it a duplicate of the output test and left
 * the error harvester's cancellation path untested.
 */
@Test
public void testDownstreamCancel_errors() throws InterruptedException {
    final CountDownLatch cancelled = new CountDownLatch(1);

    TestSubscriber<Harvester.Crop> subscriber = publisher
            .doOnCancel(cancelled::countDown)
            .compose(harvesterFactory.forError(publisher, cmd))
            .test();
    subscriber.assertNotTerminated();

    subscriber.dispose();

    // Give the cancellation up to one second to reach the upstream.
    assertThat(cancelled.await(1, TimeUnit.SECONDS), is(true));
}
/**
 * A marker line whose suffix is not a number must still terminate the stream within a
 * second, and the first emitted crop must carry the EXCEPTION exit code.
 */
@Test
public void testBadMarker_output() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);

    TestSubscriber<OutputHarvester.Crop> subscriber =
            publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();
    subscriber.assertNotTerminated();

    publisher.onNext(marker + " &/()");

    subscriber.awaitDone(1, TimeUnit.SECONDS);
    subscriber.assertNoTimeout();
    OutputHarvester.Crop harvested = subscriber.values().get(0);
    assertThat(harvested.exitCode, is(Cmd.ExitCode.EXCEPTION));
}
/**
 * A marker line whose suffix is not a number must still terminate the error harvester's
 * stream within a second, with exactly one crop emitted.
 */
@Test
public void testBadMarker_errors() {
    final String marker = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(marker);

    TestSubscriber<Harvester.Crop> subscriber =
            publisher.compose(harvesterFactory.forError(publisher, cmd)).test();
    subscriber.assertNotTerminated();

    publisher.onNext(marker + " §$%&");

    subscriber.awaitDone(1, TimeUnit.SECONDS);
    subscriber.assertNoTimeout();
    subscriber.assertValueCount(1);
}
/**
 * Cancelling before any demand was signalled must leave the subscriber without values
 * and without a terminal event; a second cancel must be accepted as well.
 */
@Test
public void testCancelBeforeRequest() {
    Single<Integer> source = Single.fromCallable(() -> 1);

    TestSubscriber<Integer> subscriber =
            new FlowableSingleDeferUntilRequest<Integer>(source).test(0);

    subscriber.cancel();
    subscriber.assertNoValues();
    subscriber.assertNotTerminated();

    // Cancel once more on the already cancelled subscriber.
    subscriber.cancel();
}
/**
 * With a demand of two, splitting on ":" yields "boo" and "and" without terminating;
 * cancelling afterwards leaves both the value count and the open state untouched.
 */
@Test
public void testSplitSimpleNormalCancelled() {
    TestSubscriber<String> subscriber = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"))
            .test(2);
    subscriber.assertValues("boo", "and");
    subscriber.assertNotTerminated();

    subscriber.cancel();

    subscriber.assertValueCount(2);
    subscriber.assertNotTerminated();
}
/**
 * With a demand of one, splitting on ":" yields only "boo" without terminating;
 * cancelling afterwards leaves both the value count and the open state untouched.
 */
@Test
public void testSplitSimpleNormalCancelledEarly() {
    TestSubscriber<String> subscriber = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"))
            .test(1);
    subscriber.assertValues("boo");
    subscriber.assertNotTerminated();

    subscriber.cancel();

    subscriber.assertValueCount(1);
    subscriber.assertNotTerminated();
}
/**
 * With zero demand nothing is emitted; after cancelling, a late request must not
 * produce any value or terminal event.
 */
@Test
public void testSplitSimpleNormalCancelledAtBeginning() {
    TestSubscriber<String> subscriber = Flowable.just("boo:an", "d:you")
            .compose(Strings.splitSimple(":"))
            .test(0);
    subscriber.assertNoValues();
    subscriber.assertNotTerminated();

    subscriber.cancel();
    // Demand signalled after the cancel must be ignored.
    subscriber.requestMore(1);

    subscriber.assertNoValues();
    subscriber.assertNotTerminated();
}
/**
 * With {@code maxRequest(3)} and a downstream demand of four, the upstream is expected
 * to see requests of 3 and then 1. A request issued after cancellation must neither
 * deliver further values nor show up in the recorded upstream requests.
 */
@Test
public void checkCancel() {
    List<Long> upstreamRequests = new CopyOnWriteArrayList<Long>();

    TestSubscriber<Integer> subscriber = Flowable.range(1, 10)
            .doOnRequest(Consumers.addLongTo(upstreamRequests))
            .compose(Transformers.<Integer>maxRequest(3))
            .test(4);
    subscriber.assertValues(1, 2, 3, 4);

    subscriber.cancel();
    subscriber.requestMore(3);

    subscriber.assertValueCount(4);
    subscriber.assertNotTerminated();
    assertEquals(Arrays.asList(3L, 1L), upstreamRequests);
}
/**
 * For every SDK level from MIN_SDK_VERSION up to (but excluding) 16: captures the
 * ContentObserver registered for each Uri, fires {@code onChange(false)} on both, and
 * expects one Changes object per Uri, in order, with no errors.
 */
@TargetApi(Build.VERSION_CODES.JELLY_BEAN)
@Test
public void shouldEmitChangesOnSdkVersionLowerThan16() {
    for (int sdkVersion = MIN_SDK_VERSION; sdkVersion < 16; sdkVersion++) {
        ContentResolver resolver = mock(ContentResolver.class);
        final Map<Uri, ContentObserver> observersByUri = new HashMap<Uri, ContentObserver>(3);

        // Remember which observer was registered for which Uri.
        doAnswer(new Answer() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                Object[] args = invocation.getArguments();
                observersByUri.put((Uri) args[0], (ContentObserver) args[2]);
                return null;
            }
        }).when(resolver).registerContentObserver(any(Uri.class), eq(true), any(ContentObserver.class));

        TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

        Uri firstUri = mock(Uri.class);
        Uri secondUri = mock(Uri.class);
        Set<Uri> observedUris = new HashSet<Uri>(2);
        observedUris.add(firstUri);
        observedUris.add(secondUri);

        RxChangesObserver
                .observeChanges(resolver, observedUris, mock(Handler.class), sdkVersion, BackpressureStrategy.MISSING)
                .subscribe(subscriber);

        subscriber.assertNotTerminated();
        subscriber.assertNoValues();

        // Emulate change of Uris, Flowable should react and emit Changes objects
        observersByUri.get(firstUri).onChange(false);
        observersByUri.get(secondUri).onChange(false);

        subscriber.assertValues(Changes.newInstance(firstUri), Changes.newInstance(secondUri));

        subscriber.dispose();
        subscriber.assertNoErrors();
    }
}
/**
 * For every SDK level from 16 up to (but excluding) MAX_SDK_VERSION: verifies that a
 * single ContentObserver instance is registered, that {@code onChange(false)} without
 * a Uri emits nothing, and that {@code onChange(false, uri)} emits one Changes object
 * per Uri, in order, with no errors.
 */
@TargetApi(Build.VERSION_CODES.JELLY_BEAN)
@Test
public void shouldEmitChangesOnSdkVersionGreaterThan15() {
    for (int sdkVersion = 16; sdkVersion < MAX_SDK_VERSION; sdkVersion++) {
        ContentResolver resolver = mock(ContentResolver.class);
        final AtomicReference<ContentObserver> sharedObserver = new AtomicReference<ContentObserver>();

        doAnswer(new Answer() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                Object registered = invocation.getArguments()[2];
                // Save reference to ContentObserver only once to assert that it was created once
                if (sharedObserver.get() == null) {
                    sharedObserver.set((ContentObserver) registered);
                } else if (sharedObserver.get() != registered) {
                    throw new AssertionError("More than one ContentObserver was created");
                }
                return null;
            }
        }).when(resolver).registerContentObserver(any(Uri.class), eq(true), any(ContentObserver.class));

        TestSubscriber<Changes> subscriber = new TestSubscriber<Changes>();

        Uri firstUri = mock(Uri.class);
        Uri secondUri = mock(Uri.class);
        Set<Uri> observedUris = new HashSet<Uri>(2);
        observedUris.add(firstUri);
        observedUris.add(secondUri);

        RxChangesObserver
                .observeChanges(resolver, observedUris, mock(Handler.class), sdkVersion, BackpressureStrategy.MISSING)
                .subscribe(subscriber);

        subscriber.assertNotTerminated();
        subscriber.assertNoValues();

        // RxChangesObserver should ignore call to onChange() without Uri on sdkVersion >= 16
        sharedObserver.get().onChange(false);
        subscriber.assertNoValues();

        // Emulate change of Uris, Flowable should react and emit Changes objects
        sharedObserver.get().onChange(false, firstUri);
        sharedObserver.get().onChange(false, secondUri);

        subscriber.assertValues(Changes.newInstance(firstUri), Changes.newInstance(secondUri));

        subscriber.dispose();
        subscriber.assertNoErrors();
    }
}