下面列出了 io.reactivex.processors.PublishProcessor#onNext() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Wraps a {@code PublishProcessor} in a {@code LiveData}, removes and re-adds the observer
 * around a {@code triggerActions()} call on the background scheduler, and checks the values
 * that reach {@code mObserver}.
 */
@Test
public void convertsFromPublisherSubscribeWithDelay() {
    final PublishProcessor<String> source = PublishProcessor.create();
    // NOTE(review): the value returned by delaySubscription(...) is discarded here, and the
    // LiveData below is built from the processor itself — confirm whether the delayed
    // Flowable was meant to be passed to fromPublisher instead.
    source.delaySubscription(100, TimeUnit.SECONDS, sBackgroundScheduler);
    final LiveData<String> converted = LiveDataReactiveStreams.fromPublisher(source);

    // First observation window: a single emission.
    converted.observe(mLifecycleOwner, mObserver);
    source.onNext("foo");

    // Detach the observer, run whatever is pending on the background scheduler, re-attach.
    converted.removeObserver(mObserver);
    sBackgroundScheduler.triggerActions();
    converted.observe(mLifecycleOwner, mObserver);

    // Second observation window: two more emissions.
    source.onNext("bar");
    source.onNext("baz");

    // "foo" is expected twice; presumably the second one is the retained value being
    // delivered again on re-observation — verify against LiveData semantics.
    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "bar", "baz")));
}
/**
 * Attaches a second observer after two values have already been emitted and checks what
 * each observer has recorded once a third value is emitted.
 */
@Test
public void convertsFromPublisherWithMultipleObservers() {
    final List<String> lateObserverOutput = new ArrayList<>();
    final PublishProcessor<String> source = PublishProcessor.create();
    final LiveData<String> converted = LiveDataReactiveStreams.fromPublisher(source);

    converted.observe(mLifecycleOwner, mObserver);
    for (String value : new String[] {"foo", "bar"}) {
        source.onNext(value);
    }

    // The late observer is expected to see only the most recent value plus anything
    // emitted afterwards.
    final Observer<String> lateObserver = new Observer<String>() {
        @Override
        public void onChanged(@Nullable String value) {
            lateObserverOutput.add(value);
        }
    };
    converted.observe(mLifecycleOwner, lateObserver);

    source.onNext("baz");

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
    assertThat(lateObserverOutput, is(Arrays.asList("bar", "baz")));
}
/**
 * Attaches a second observer, then removes the first one before a further emission, and
 * checks that only the second observer records that later value.
 */
@Test
public void convertsFromPublisherWithMultipleObserversAfterInactive() {
    final List<String> lateObserverOutput = new ArrayList<>();
    final PublishProcessor<String> source = PublishProcessor.create();
    final LiveData<String> converted = LiveDataReactiveStreams.fromPublisher(source);

    converted.observe(mLifecycleOwner, mObserver);
    for (String value : new String[] {"foo", "bar"}) {
        source.onNext(value);
    }

    // The late observer is expected to see only the most recent value plus anything
    // emitted afterwards.
    final Observer<String> lateObserver = new Observer<String>() {
        @Override
        public void onChanged(@Nullable String value) {
            lateObserverOutput.add(value);
        }
    };
    converted.observe(mLifecycleOwner, lateObserver);

    // From here on only the late observer remains attached.
    converted.removeObserver(mObserver);
    source.onNext("baz");

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar")));
    assertThat(lateObserverOutput, is(Arrays.asList("bar", "baz")));
}
/**
 * Checks that the inner processor used inside {@code flatMap} has a subscriber after the
 * outer processor emits, and has none after the outer processor signals an error.
 */
@Test
public void innerCancelled() {
    final PublishProcessor<Integer> outer = PublishProcessor.create();
    final PublishProcessor<Integer> inner = PublishProcessor.create();

    outer.flatMap(ignored -> inner).test();

    // One outer item maps to the inner processor.
    outer.onNext(1);
    final boolean subscribedAfterItem = inner.hasSubscribers();
    assertTrue("No subscribers?", subscribedAfterItem);

    // Failing the outer source.
    outer.onError(new Exception());
    final boolean subscribedAfterError = inner.hasSubscribers();
    assertFalse("Has subscribers?", subscribedAfterError);
}
/**
 * Same scenario as the {@code flatMap} variant but using {@code concatMap}: the inner
 * processor has a subscriber after the outer processor emits, and none after the outer
 * processor signals an error.
 */
@Test
public void innerCancelled2() {
    final PublishProcessor<Integer> outer = PublishProcessor.create();
    final PublishProcessor<Integer> inner = PublishProcessor.create();

    outer.concatMap(ignored -> inner).test();

    // One outer item maps to the inner processor.
    outer.onNext(1);
    final boolean subscribedAfterItem = inner.hasSubscribers();
    assertTrue("No subscribers?", subscribedAfterItem);

    // Failing the outer source.
    outer.onError(new Exception());
    final boolean subscribedAfterError = inner.hasSubscribers();
    assertFalse("Has subscribers?", subscribedAfterError);
}
/**
 * Emits three values through a {@code PublishProcessor} wrapped by
 * {@code LiveDataReactiveStreams.fromPublisher} and checks that {@code mObserver}
 * recorded all of them in order.
 */
@Test
public void convertsFromPublisher() {
    final PublishProcessor<String> source = PublishProcessor.create();
    final LiveData<String> converted = LiveDataReactiveStreams.fromPublisher(source);

    converted.observe(mLifecycleOwner, mObserver);

    for (String value : new String[] {"foo", "bar", "baz"}) {
        source.onNext(value);
    }

    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
}
/**
 * Emits one value while observed, one while no observer is attached, and one after the
 * observer is re-attached, then checks the recorded sequence.
 */
@Test
public void convertsFromPublisherAfterInactive() {
    final PublishProcessor<String> source = PublishProcessor.create();
    final LiveData<String> converted = LiveDataReactiveStreams.fromPublisher(source);

    // Observed: "foo" is recorded.
    converted.observe(mLifecycleOwner, mObserver);
    source.onNext("foo");

    // Not observed: "bar" is emitted while no observer is attached.
    converted.removeObserver(mObserver);
    source.onNext("bar");

    // Observed again: "baz" is recorded.
    converted.observe(mLifecycleOwner, mObserver);
    source.onNext("baz");

    // "bar" is absent and "foo" appears twice in the expected output; presumably the
    // retained value is delivered again on re-observation — verify against LiveData semantics.
    assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "baz")));
}
/**
 * Zips the last element (with defaults 1 and 2) of two processors by summing them.
 * The first processor emits 3 before completing and the second completes without emitting,
 * and the test asserts a single result of 5.
 */
@Test
public void test() {
    final PublishProcessor<Integer> first = PublishProcessor.create();
    final PublishProcessor<Integer> second = PublishProcessor.create();

    final Flowable<Integer> lastOfFirst = first.last(1).toFlowable();
    final Flowable<Integer> lastOfSecond = second.last(2).toFlowable();
    final Flowable<Integer> summed = Flowable.zip(lastOfFirst, lastOfSecond, (a, b) -> a + b);
    final TestSubscriber<Integer> subscriber = summed.test();

    first.onNext(3);
    first.onComplete();
    // The second processor completes without emitting anything.
    second.onComplete();

    subscriber.assertResult(5);
}
/**
 * Buffers items of a published processor into lists: a buffer is opened by every item
 * containing "Start" and closed by either an item containing "End" or a 5-minute timer on a
 * {@code TestScheduler}. Each emitted list is printed to standard output.
 */
@Test
public void test() {
    final TestScheduler scheduler = new TestScheduler();
    final PublishProcessor<String> source = PublishProcessor.create();

    final Function<Flowable<String>, Flowable<List<String>>> bufferBetweenMarkers = shared -> {
        final Flowable<String> openings = shared.filter(item -> item.contains("Start"));
        return shared.buffer(openings,
                opening -> Flowable.merge(
                        shared.filter(item -> item.contains("End")),
                        Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
    };

    source.publish(bufferBetweenMarkers)
            .subscribe(System.out::println);

    // First group ends with an explicit "End"; the second one is left open.
    for (String item : new String[] {"Start", "A", "B", "End", "Start", "C"}) {
        source.onNext(item);
    }

    // Move virtual time forward by the timer's delay.
    scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

    for (String item : new String[] {"Start", "D", "End"}) {
        source.onNext(item);
    }
    source.onComplete();
}
/**
 * Variant of the Start/End buffering example in which a buffer is opened by every item
 * containing "Start" and closed by either another item containing "Start" or a 5-minute
 * timer on a {@code TestScheduler}. A trailing "Start" element is removed from each list
 * before it is printed to standard output.
 */
@Test
public void test() {
    final TestScheduler scheduler = new TestScheduler();
    final PublishProcessor<String> source = PublishProcessor.create();

    final Function<Flowable<String>, Flowable<List<String>>> bufferUntilNextStart = shared -> {
        final Flowable<String> openings = shared.filter(item -> item.contains("Start"));
        return shared.buffer(openings,
                opening -> Flowable.merge(
                        shared.filter(item -> item.contains("Start")),
                        Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
    };

    source.publish(bufferUntilNextStart)
            .doOnNext(batch -> {
                // Drop the last element when it is a "Start" marker and is not the only element.
                final int lastIndex = batch.size() - 1;
                if (lastIndex > 0 && batch.get(lastIndex).contains("Start")) {
                    batch.remove(lastIndex);
                }
            })
            .subscribe(System.out::println);

    for (String item : new String[] {"Start", "A", "B", "End", "Start", "C"}) {
        source.onNext(item);
    }

    // Move virtual time forward by the timer's delay.
    scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

    for (String item : new String[] {"Start", "D", "End"}) {
        source.onNext(item);
    }
    source.onComplete();
}
/**
 * Emits the expected values "1", "2", "3" one at a time, awaiting each one right after it is
 * emitted, and then asserts that the checker has no expected values left.
 */
@Test
public void shouldAssertThatNoExpectedValuesLeft() {
    final String[] sequence = {"1", "2", "3"};

    Queue<String> expectedValues = new LinkedList<String>();
    for (String value : sequence) {
        expectedValues.add(value);
    }

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
        @NonNull
        @Override
        public Disposable subscribe() {
            // Forward every emission of the processor to the checker.
            final Consumer<String> forwardToChecker = new Consumer<String>() {
                @Override
                public void accept(@NonNull String emitted) throws Exception {
                    onNextObtained(emitted);
                }
            };
            return publishProcessor.subscribe(forwardToChecker);
        }
    };

    Disposable disposable = emissionChecker.subscribe();

    // Emit and await each value in turn.
    for (String value : sequence) {
        publishProcessor.onNext(value);
        emissionChecker.awaitNextExpectedValue();
    }

    // Should not throw exception
    emissionChecker.assertThatNoExpectedValuesLeft();

    disposable.dispose();
}
/**
 * Emits all three expected values up front, then awaits them one by one, and finally
 * asserts that the checker has no expected values left.
 */
@Test
public void shouldStoreItemsInQueueAndThenAwaitNextExpectedValues() {
    final String[] sequence = {"1", "2", "3"};

    final Queue<String> expectedValues = new LinkedList<String>();
    for (String value : sequence) {
        expectedValues.add(value);
    }

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    final AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
        @NonNull
        @Override
        public Disposable subscribe() {
            // Forward every emission of the processor to the checker.
            final Consumer<String> forwardToChecker = new Consumer<String>() {
                @Override
                public void accept(@NonNull String emitted) throws Exception {
                    onNextObtained(emitted);
                }
            };
            return publishProcessor.subscribe(forwardToChecker);
        }
    };

    final Disposable disposable = emissionChecker.subscribe();

    // Note: every value is emitted before any of them is awaited.
    for (String value : sequence) {
        publishProcessor.onNext(value);
    }

    // Each of the already-emitted values should now be awaited successfully, one per call.
    for (int awaited = 0; awaited < sequence.length; awaited++) {
        emissionChecker.awaitNextExpectedValue();
    }

    emissionChecker.assertThatNoExpectedValuesLeft();

    disposable.dispose();
}
/**
 * Consumes the full expected sequence "1", "2", "3", then emits an extra "4" and checks
 * that exactly one error was recorded in the tracked plugin-error list, with a cause
 * carrying the "no more emissions were expected" message.
 */
@Test
public void shouldThrowExceptionBecauseFlowableEmittedUnexpectedItemAfterExpectedSequence() {
    // NOTE(review): nothing in this method undoes trackPluginErrors() — confirm that the
    // error tracking is reset elsewhere (e.g. in a test teardown).
    List<Throwable> errors = TestHelper.trackPluginErrors();

    final String[] sequence = {"1", "2", "3"};

    final Queue<String> expectedValues = new LinkedList<String>();
    for (String value : sequence) {
        expectedValues.add(value);
    }

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    final AbstractEmissionChecker<String> emissionChecker = new AbstractEmissionChecker<String>(expectedValues) {
        @NonNull
        @Override
        public Disposable subscribe() {
            // Forward every emission of the processor to the checker.
            final Consumer<String> forwardToChecker = new Consumer<String>() {
                @Override
                public void accept(@NonNull String emitted) throws Exception {
                    onNextObtained(emitted);
                }
            };
            return publishProcessor.subscribe(forwardToChecker);
        }
    };

    final Disposable disposable = emissionChecker.subscribe();

    // Emit the whole expected sequence, then await each value.
    for (String value : sequence) {
        publishProcessor.onNext(value);
    }
    for (int awaited = 0; awaited < sequence.length; awaited++) {
        emissionChecker.awaitNextExpectedValue();
    }

    emissionChecker.assertThatNoExpectedValuesLeft();
    // Nothing has been recorded as an error so far.
    assertThat(errors).isEmpty();

    // One emission beyond the expected sequence.
    publishProcessor.onNext("4");

    assertThat(errors).hasSize(1);
    final Throwable cause = errors.get(0).getCause();
    assertThat(cause)
            .hasMessage("Received emission, but no more emissions were expected: obtained 4, expectedValues = [], obtainedValues = []");

    disposable.dispose();
}