下面列出了io.reactivex.processors.PublishProcessor#onComplete ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void test() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestSubscriber<Integer> ts = Flowable.zip(pp1.last(1).toFlowable(), pp2.last(2).toFlowable(), (a, b) -> a + b)
.test();
pp1.onNext(3);
pp1.onComplete();
pp2.onComplete();
ts.assertResult(5);
}
@Test
public void test() {
TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();
Function<Flowable<String>, Flowable<List<String>>> f = o ->
o.buffer(o.filter(v -> v.contains("Start")),
v -> Flowable.merge(o.filter(w -> w.contains("End")),
Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
pp.publish(f)
.subscribe(System.out::println);
pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");
pp.onNext("Start");
pp.onNext("C");
scheduler.advanceTimeBy(5, TimeUnit.MINUTES);
pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();
}
@Test
public void test() {
TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();
Function<Flowable<String>, Flowable<List<String>>> f = o ->
o.buffer(o.filter(v -> v.contains("Start")),
v -> Flowable.merge(o.filter(w -> w.contains("Start")),
Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
pp.publish(f)
.doOnNext(v -> {
int s = v.size();
if (s > 1 && v.get(s - 1).contains("Start")) {
v.remove(s - 1);
}
})
.subscribe(System.out::println);
pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");
pp.onNext("Start");
pp.onNext("C");
scheduler.advanceTimeBy(5, TimeUnit.MINUTES);
pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();
}
@Test
public void test() {
PublishProcessor<Flowable<String>> flush = PublishProcessor.create();
PublishProcessor<String> restProcessor = PublishProcessor.create();
int count = 2;
int delay = 100;
TestScheduler scheduler = new TestScheduler();
Flowable<Flowable<String>> boundary = restProcessor.window(
delay, TimeUnit.MILLISECONDS, scheduler, count, true)
.mergeWith(flush);
PublishProcessor<Flowable<String>> tempBoundary = PublishProcessor.create();
restProcessor
.window(tempBoundary)
.concatMapSingle(Flowable::toList)
.filter(it -> !it.isEmpty())
.subscribe(strings -> System.out.println("emit REST call >>> " + strings));
boundary.subscribe(tempBoundary);
restProcessor.offer("1");
restProcessor.offer("2");
restProcessor.offer("3");
restProcessor.offer("4");
restProcessor.offer("5");
restProcessor.offer("6");
restProcessor.offer("7");
restProcessor.offer("8");
restProcessor.offer("9");
flush.offer(Flowable.just("flush"));
restProcessor.offer("10");
System.out.println("onComplete");
restProcessor.onComplete();
flush.onComplete();
}