io.reactivex.processors.PublishProcessor#onNext ( )源码实例Demo

下面列出了io.reactivex.processors.PublishProcessor#onNext ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
public void convertsFromPublisherSubscribeWithDelay() {
    PublishProcessor<String> processor = PublishProcessor.create();
    processor.delaySubscription(100, TimeUnit.SECONDS, sBackgroundScheduler);
    LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);

    liveData.observe(mLifecycleOwner, mObserver);

    processor.onNext("foo");
    liveData.removeObserver(mObserver);
    sBackgroundScheduler.triggerActions();
    liveData.observe(mLifecycleOwner, mObserver);

    processor.onNext("bar");
    processor.onNext("baz");

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "bar", "baz")));
}
 
@Test
public void convertsFromPublisherWithMultipleObservers() {
    final List<String> output2 = new ArrayList<>();
    PublishProcessor<String> processor = PublishProcessor.create();
    LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);

    liveData.observe(mLifecycleOwner, mObserver);

    processor.onNext("foo");
    processor.onNext("bar");

    // The second observer should only get the newest value and any later values.
    liveData.observe(mLifecycleOwner, new Observer<String>() {
        @Override
        public void onChanged(@Nullable String s) {
            output2.add(s);
        }
    });

    processor.onNext("baz");

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
    assertThat(output2, is(Arrays.asList("bar", "baz")));
}
 
@Test
public void convertsFromPublisherWithMultipleObserversAfterInactive() {
    final List<String> output2 = new ArrayList<>();
    PublishProcessor<String> processor = PublishProcessor.create();
    LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);

    liveData.observe(mLifecycleOwner, mObserver);

    processor.onNext("foo");
    processor.onNext("bar");

    // The second observer should only get the newest value and any later values.
    liveData.observe(mLifecycleOwner, new Observer<String>() {
        @Override
        public void onChanged(@Nullable String s) {
            output2.add(s);
        }
    });

    liveData.removeObserver(mObserver);
    processor.onNext("baz");

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar")));
    assertThat(output2, is(Arrays.asList("bar", "baz")));
}
 
源代码4 项目: akarnokd-misc   文件: FlatMapWithTwoErrors.java
@Test
public void innerCancelled() {
    PublishProcessor<Integer> pp1 = PublishProcessor.create();
    PublishProcessor<Integer> pp2 = PublishProcessor.create();
    
    pp1
    .flatMap(v -> pp2)
    .test();

    pp1.onNext(1);
    assertTrue("No subscribers?", pp2.hasSubscribers());

    pp1.onError(new Exception());
    
    assertFalse("Has subscribers?", pp2.hasSubscribers());
}
 
源代码5 项目: akarnokd-misc   文件: FlatMapWithTwoErrors.java
@Test
public void innerCancelled2() {
    PublishProcessor<Integer> pp1 = PublishProcessor.create();
    PublishProcessor<Integer> pp2 = PublishProcessor.create();
    
    pp1
    .concatMap(v -> pp2)
    .test();

    pp1.onNext(1);
    assertTrue("No subscribers?", pp2.hasSubscribers());

    pp1.onError(new Exception());
    
    assertFalse("Has subscribers?", pp2.hasSubscribers());
}
 
@Test
public void convertsFromPublisher() {
    PublishProcessor<String> processor = PublishProcessor.create();
    LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);

    liveData.observe(mLifecycleOwner, mObserver);

    processor.onNext("foo");
    processor.onNext("bar");
    processor.onNext("baz");

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
}
 
@Test
public void convertsFromPublisherAfterInactive() {
    PublishProcessor<String> processor = PublishProcessor.create();
    LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);

    liveData.observe(mLifecycleOwner, mObserver);
    processor.onNext("foo");
    liveData.removeObserver(mObserver);
    processor.onNext("bar");

    liveData.observe(mLifecycleOwner, mObserver);
    processor.onNext("baz");

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "baz")));
}
 
源代码8 项目: akarnokd-misc   文件: LastZipped.java
@Test
public void test() {
    PublishProcessor<Integer> pp1 = PublishProcessor.create();
    PublishProcessor<Integer> pp2 = PublishProcessor.create();

    TestSubscriber<Integer> ts = Flowable.zip(pp1.last(1).toFlowable(), pp2.last(2).toFlowable(), (a, b) -> a + b)
    .test();

    pp1.onNext(3);
    pp1.onComplete();
    pp2.onComplete();

    ts.assertResult(5);
}
 
@Test
public void test() {
TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();

Function<Flowable<String>, Flowable<List<String>>> f = o -> 
        o.buffer(o.filter(v -> v.contains("Start")), 
                v -> Flowable.merge(o.filter(w -> w.contains("End")), 
                        Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

pp.publish(f)
.subscribe(System.out::println);

pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");

pp.onNext("Start");
pp.onNext("C");

scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();
    
}
 
源代码10 项目: akarnokd-misc   文件: BufferStartEndTest.java
@Test
public void test() {
    TestScheduler scheduler = new TestScheduler();
    PublishProcessor<String> pp = PublishProcessor.create();

    Function<Flowable<String>, Flowable<List<String>>> f = o -> 
            o.buffer(o.filter(v -> v.contains("Start")), 
                     v -> Flowable.merge(o.filter(w -> w.contains("Start")), 
                                         Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

    pp.publish(f)
    .doOnNext(v -> {
        int s = v.size();
        if (s > 1 && v.get(s - 1).contains("Start")) {
            v.remove(s - 1);
        }
    })
    .subscribe(System.out::println);

    pp.onNext("Start");
    pp.onNext("A");
    pp.onNext("B");
    pp.onNext("End");

    pp.onNext("Start");
    pp.onNext("C");

    scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

    pp.onNext("Start");
    pp.onNext("D");
    pp.onNext("End");
    pp.onComplete();
}
 
源代码11 项目: storio   文件: AbstractEmissionCheckerTest.java
@Test
public void shouldAssertThatNoExpectedValuesLeft() {
    Queue<String> expectedValues = new LinkedList<String>();

    expectedValues.add("1");
    expectedValues.add("2");
    expectedValues.add("3");

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
        @NonNull
        @Override
        public Disposable subscribe() {
            return publishProcessor
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            onNextObtained(s);
                        }
                    });
        }
    };

    Disposable disposable = emissionChecker.subscribe();

    publishProcessor.onNext("1");

    // "1"
    emissionChecker.awaitNextExpectedValue();

    publishProcessor.onNext("2");

    // "2"
    emissionChecker.awaitNextExpectedValue();

    publishProcessor.onNext("3");

    // "3"
    emissionChecker.awaitNextExpectedValue();

    // Should not throw exception
    emissionChecker.assertThatNoExpectedValuesLeft();

    disposable.dispose();
}
 
源代码12 项目: storio   文件: AbstractEmissionCheckerTest.java
@Test
public void shouldStoreItemsInQueueAndThenAwaitNextExpectedValues() {
    final Queue<String> expectedValues = new LinkedList<String>();

    expectedValues.add("1");
    expectedValues.add("2");
    expectedValues.add("3");

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    final AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
        @NonNull
        @Override
        public Disposable subscribe() {
            return publishProcessor
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            onNextObtained(s);
                        }
                    });
        }
    };

    final Disposable disposable = emissionChecker.subscribe();

    // Notice: We emit several values before awaiting any of them

    publishProcessor.onNext("1");
    publishProcessor.onNext("2");
    publishProcessor.onNext("3");

    // Now we should successfully await all these items one by one
    emissionChecker.awaitNextExpectedValue();
    emissionChecker.awaitNextExpectedValue();
    emissionChecker.awaitNextExpectedValue();

    emissionChecker.assertThatNoExpectedValuesLeft();

    disposable.dispose();
}
 
源代码13 项目: storio   文件: AbstractEmissionCheckerTest.java
@Test
public void shouldThrowExceptionBecauseFlowableEmittedUnexpectedItemAfterExpectedSequence() {
    List<Throwable> errors = TestHelper.trackPluginErrors();

    final Queue<String> expectedValues = new LinkedList<String>();

    expectedValues.add("1");
    expectedValues.add("2");
    expectedValues.add("3");

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    final AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
        @NonNull
        @Override
        public Disposable subscribe() {
            return publishProcessor
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            onNextObtained(s);
                        }
                    });
        }
    };

    final Disposable disposable = emissionChecker.subscribe();

    publishProcessor.onNext("1");
    publishProcessor.onNext("2");
    publishProcessor.onNext("3");

    emissionChecker.awaitNextExpectedValue();
    emissionChecker.awaitNextExpectedValue();
    emissionChecker.awaitNextExpectedValue();

    emissionChecker.assertThatNoExpectedValuesLeft();

    assertThat(errors).isEmpty();

    publishProcessor.onNext("4");

    assertThat(errors).hasSize(1);
    assertThat(errors.get(0).getCause())
            .hasMessage("Received emission, but no more emissions were expected: obtained 4, expectedValues = [], obtainedValues = []");

    disposable.dispose();
}