io.reactivex.subscribers.TestSubscriber#assertNotTerminated ( )源码实例Demo

下面列出了io.reactivex.subscribers.TestSubscriber#assertNotTerminated ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: smallrye-mutiny   文件: UniToPublisherTest.java
@Test
public void testCancellationBetweenRequestAndValue() {
    // TODO This is a very broken implementation of "delay" - to be replace once delay is implemented
    executor = Executors.newSingleThreadExecutor();
    Publisher<Integer> publisher = Uni.createFrom().item(1).emitOn(executor).map(x -> {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x;
    }).convert().toPublisher();

    assertThat(publisher).isNotNull();
    TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(0);
    test.assertSubscribed();
    test.request(1);
    test.cancel();
    assertThat(test.isCancelled()).isTrue();
    test.assertNotTerminated();
    test.assertNever(1);
}
 
源代码2 项目: RxShell   文件: HarvesterTest.java
@Test
public void testBuffers_output() {
    String uuid = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(uuid);
    when(cmd.isOutputBufferEnabled()).thenReturn(true);

    TestSubscriber<OutputHarvester.Crop> testSubscriber = publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();
    testSubscriber.assertNotTerminated();

    publisher.onNext("some-output");
    publisher.onNext(uuid + " 255");

    testSubscriber.assertValueCount(1).assertComplete();

    Harvester.Crop crop = testSubscriber.values().get(0);
    assertThat(crop.buffer.size(), is(1));
    assertThat(crop.buffer, Matchers.contains("some-output"));
}
 
源代码3 项目: RxShell   文件: HarvesterTest.java
@Test
public void testBuffers_error() {
    String uuid = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(uuid);
    when(cmd.isErrorBufferEnabled()).thenReturn(true);

    TestSubscriber<Harvester.Crop> testSubscriber = publisher.compose(harvesterFactory.forError(publisher, cmd)).test();
    testSubscriber.assertNotTerminated();

    publisher.onNext("some-errors");
    publisher.onNext(uuid + " 255");

    testSubscriber.assertValueCount(1).assertComplete();

    Harvester.Crop crop = testSubscriber.values().get(0);
    assertThat(crop.buffer.size(), is(1));
    assertThat(crop.buffer, Matchers.contains("some-errors"));
}
 
源代码4 项目: RxShell   文件: HarvesterTest.java
@Test
public void testUpstreamPrematureCompletion_output() {
    String uuid = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(uuid);
    when(cmd.isOutputBufferEnabled()).thenReturn(true);

    TestSubscriber<OutputHarvester.Crop> testSubscriber = publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();
    testSubscriber.assertNotTerminated();

    publisher.onNext("some-output");
    publisher.onComplete();

    OutputHarvester.Crop crop = testSubscriber.assertValueCount(1).assertComplete().values().get(0);
    assertThat(crop.isComplete, is(false));
    assertThat(crop.exitCode, is(Cmd.ExitCode.INITIAL));
    assertThat(crop.buffer.size(), is(1));
    assertThat(crop.buffer, contains("some-output"));
}
 
源代码5 项目: storio   文件: RxChangesBusTest.java
@Test
public void onNextShouldSendMessagesToObserver() {
    RxChangesBus<String> rxChangesBus = new RxChangesBus<String>();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

    rxChangesBus
            .asFlowable()
            .subscribe(testSubscriber);

    List<String> messages = asList("yo", ",", "wanna", "some", "messages?");

    for (String message : messages) {
        rxChangesBus.onNext(message);
    }

    testSubscriber.assertValueSequence(messages);
    testSubscriber.assertNotTerminated();
}
 
源代码6 项目: storio   文件: ChangesBusTest.java
@Test
public void onNextShouldSendMessagesToObserverIfRxJavaIsInTheClassPath() {
    ChangesBus<String> changesBus = new ChangesBus<String>(true);

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

    Flowable<String> flowable = changesBus.asFlowable();
    assertThat(flowable).isNotNull();

    //noinspection ConstantConditions
    flowable.subscribe(testSubscriber);

    List<String> messages = asList("My", "life", "my", "rules", "please?");

    for (String message: messages) {
        changesBus.onNext(message);
    }

    testSubscriber.assertValueSequence(messages);
    testSubscriber.assertNotTerminated();
}
 
源代码7 项目: smallrye-mutiny   文件: UniToPublisherTest.java
@Test
public void testWithImmediateValueWithOneRequestAndImmediateCancellation() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();
    TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(1, true);
    test.assertSubscribed();
    assertThat(test.isCancelled()).isTrue();
    test.assertNotTerminated();
    test.assertNever(1);
}
 
源代码8 项目: smallrye-mutiny   文件: UniToPublisherTest.java
@Test
public void testCancellationBetweenSubscriptionAndRequest() {
    Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
    assertThat(publisher).isNotNull();
    TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(0);
    test.assertSubscribed();
    test.cancel();
    assertThat(test.isCancelled()).isTrue();
    test.assertNotTerminated();
    test.assertNever(1);
}
 
源代码9 项目: RxShell   文件: HarvesterTest.java
@Test
public void testCommandCompletion_output() {
    String uuid = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(uuid);

    TestSubscriber<OutputHarvester.Crop> testSubscriber = publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();
    testSubscriber.assertNotTerminated();

    publisher.onNext(uuid + " 255");

    testSubscriber.assertValueCount(1).assertComplete();

    OutputHarvester.Crop crop = testSubscriber.values().get(0);
    assertThat(crop.exitCode, is(255));
}
 
源代码10 项目: RxShell   文件: HarvesterTest.java
@Test
public void testDownstreamCancel_output() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    TestSubscriber<OutputHarvester.Crop> testSubscriber = publisher.doOnCancel(latch::countDown).compose(harvesterFactory.forOutput(publisher, cmd)).test();
    testSubscriber.assertNotTerminated();

    testSubscriber.dispose();

    assertThat(latch.await(1, TimeUnit.SECONDS), is(true));
}
 
源代码11 项目: RxShell   文件: HarvesterTest.java
@Test
public void testDownstreamCancel_errors() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    TestSubscriber<OutputHarvester.Crop> testSubscriber = publisher.doOnCancel(latch::countDown).compose(harvesterFactory.forOutput(publisher, cmd)).test();
    testSubscriber.assertNotTerminated();

    testSubscriber.dispose();

    assertThat(latch.await(1, TimeUnit.SECONDS), is(true));
}
 
源代码12 项目: RxShell   文件: HarvesterTest.java
@Test
public void testBadMarker_output() {
    String uuid = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(uuid);

    TestSubscriber<OutputHarvester.Crop> testSubscriber = publisher.compose(harvesterFactory.forOutput(publisher, cmd)).test();
    testSubscriber.assertNotTerminated();

    publisher.onNext(uuid + " &/()");

    testSubscriber.awaitDone(1, TimeUnit.SECONDS).assertNoTimeout();
    OutputHarvester.Crop crop = testSubscriber.values().get(0);
    assertThat(crop.exitCode, is(Cmd.ExitCode.EXCEPTION));
}
 
源代码13 项目: RxShell   文件: HarvesterTest.java
@Test
public void testBadMarker_errors() {
    String uuid = UUID.randomUUID().toString();
    when(cmd.getMarker()).thenReturn(uuid);

    TestSubscriber<Harvester.Crop> testSubscriber = publisher.compose(harvesterFactory.forError(publisher, cmd)).test();
    testSubscriber.assertNotTerminated();

    publisher.onNext(uuid + " §$%&");

    testSubscriber.awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().assertValueCount(1);
}
 
@Test
public void testCancelBeforeRequest() {
    Single<Integer> s = Single.fromCallable(() -> {
        return 1;
    });
    TestSubscriber<Integer> ts = new FlowableSingleDeferUntilRequest<Integer>(s) //
            .test(0);
    ts.cancel();
    ts.assertNoValues();
    ts.assertNotTerminated();
    ts.cancel();
}
 
源代码15 项目: rxjava2-extras   文件: StringsSplitTest.java
@Test
public void testSplitSimpleNormalCancelled() {
    TestSubscriber<String> ts = Flowable.just("boo:an", "d:you") //
            .compose(Strings.splitSimple(":")) //
            .test(2) //
            .assertValues("boo", "and").assertNotTerminated();
    ts.cancel();
    ts.assertValueCount(2);
    ts.assertNotTerminated();
}
 
源代码16 项目: rxjava2-extras   文件: StringsSplitTest.java
@Test
public void testSplitSimpleNormalCancelledEarly() {
    TestSubscriber<String> ts = Flowable.just("boo:an", "d:you") //
            .compose(Strings.splitSimple(":")) //
            .test(1) //
            .assertValues("boo").assertNotTerminated();
    ts.cancel();
    ts.assertValueCount(1);
    ts.assertNotTerminated();
}
 
源代码17 项目: rxjava2-extras   文件: StringsSplitTest.java
@Test
public void testSplitSimpleNormalCancelledAtBeginning() {
    TestSubscriber<String> ts = Flowable.just("boo:an", "d:you") //
            .compose(Strings.splitSimple(":")) //
            .test(0) //
            .assertNoValues() //
            .assertNotTerminated();
    ts.cancel();
    ts.requestMore(1);
    ts.assertNoValues();
    ts.assertNotTerminated();
}
 
源代码18 项目: rxjava2-extras   文件: FlowableMaxRequestTest.java
@Test
public void checkCancel() {
    List<Long> requests = new CopyOnWriteArrayList<Long>();
    TestSubscriber<Integer> ts = Flowable.range(1, 10) //
            .doOnRequest(Consumers.addLongTo(requests)) //
            .compose(Transformers.<Integer>maxRequest(3)) //
            .test(4).assertValues(1, 2, 3, 4); //
    ts.cancel();
    ts.requestMore(3);
    ts.assertValueCount(4);
    ts.assertNotTerminated();
    assertEquals(Arrays.asList(3L, 1L), requests);
}
 
源代码19 项目: storio   文件: RxChangesObserverTest.java
@TargetApi(Build.VERSION_CODES.JELLY_BEAN)
@Test
public void shouldEmitChangesOnSdkVersionLowerThan16() {
    for (int sdkVersion = MIN_SDK_VERSION; sdkVersion < 16; sdkVersion++) {
        ContentResolver contentResolver = mock(ContentResolver.class);
        final Map<Uri, ContentObserver> contentObservers = new HashMap<Uri, ContentObserver>(3);

        doAnswer(new Answer() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                contentObservers.put((Uri) invocation.getArguments()[0], (ContentObserver) invocation.getArguments()[2]);
                return null;
            }
        }).when(contentResolver).registerContentObserver(any(Uri.class), eq(true), any(ContentObserver.class));

        TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();

        Uri uri1 = mock(Uri.class);
        Uri uri2 = mock(Uri.class);
        Set<Uri> uris = new HashSet<Uri>(2);
        uris.add(uri1);
        uris.add(uri2);

        RxChangesObserver
                .observeChanges(
                        contentResolver,
                        uris,
                        mock(Handler.class),
                        sdkVersion,
                        BackpressureStrategy.MISSING
                )
                .subscribe(testSubscriber);

        testSubscriber.assertNotTerminated();
        testSubscriber.assertNoValues();

        // Emulate change of Uris, Flowable should react and emit Changes objects
        contentObservers.get(uri1).onChange(false);
        contentObservers.get(uri2).onChange(false);
        testSubscriber.assertValues(Changes.newInstance(uri1), Changes.newInstance(uri2));

        testSubscriber.dispose();
        testSubscriber.assertNoErrors();
    }
}
 
源代码20 项目: storio   文件: RxChangesObserverTest.java
@TargetApi(Build.VERSION_CODES.JELLY_BEAN)
@Test
public void shouldEmitChangesOnSdkVersionGreaterThan15() {
    for (int sdkVersion = 16; sdkVersion < MAX_SDK_VERSION; sdkVersion++) {
        ContentResolver contentResolver = mock(ContentResolver.class);
        final AtomicReference<ContentObserver> contentObserver = new AtomicReference<ContentObserver>();

        doAnswer(new Answer() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                // Save reference to ContentObserver only once to assert that it was created once
                if (contentObserver.get() == null) {
                    contentObserver.set((ContentObserver) invocation.getArguments()[2]);
                } else if (contentObserver.get() != invocation.getArguments()[2]) {
                    throw new AssertionError("More than one ContentObserver was created");
                }
                return null;
            }
        }).when(contentResolver).registerContentObserver(any(Uri.class), eq(true), any(ContentObserver.class));

        TestSubscriber<Changes> testSubscriber = new TestSubscriber<Changes>();

        Uri uri1 = mock(Uri.class);
        Uri uri2 = mock(Uri.class);
        Set<Uri> uris = new HashSet<Uri>(2);
        uris.add(uri1);
        uris.add(uri2);

        RxChangesObserver
                .observeChanges(
                        contentResolver,
                        uris,
                        mock(Handler.class),
                        sdkVersion,
                        BackpressureStrategy.MISSING
                )
                .subscribe(testSubscriber);

        testSubscriber.assertNotTerminated();
        testSubscriber.assertNoValues();

        // RxChangesObserver should ignore call to onChange() without Uri on sdkVersion >= 16
        contentObserver.get().onChange(false);
        testSubscriber.assertNoValues();

        // Emulate change of Uris, Flowable should react and emit Changes objects
        contentObserver.get().onChange(false, uri1);
        contentObserver.get().onChange(false, uri2);

        testSubscriber.assertValues(Changes.newInstance(uri1), Changes.newInstance(uri2));

        testSubscriber.dispose();
        testSubscriber.assertNoErrors();
    }
}