io.reactivex.processors.PublishProcessor#onComplete() 源码实例 Demo

下面列出了 io.reactivex.processors.PublishProcessor#onComplete() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: akarnokd-misc   文件: LastZipped.java
/**
 * Zips the last element (or a default) of two processors and checks the sum.
 *
 * <p>The first processor emits 3 before completing, while the second completes
 * without emitting anything, so its {@code last(2)} default is what gets zipped.
 */
@Test
public void test() {
    PublishProcessor<Integer> first = PublishProcessor.create();
    PublishProcessor<Integer> second = PublishProcessor.create();

    // Each side contributes its final item, falling back to the given default.
    Flowable<Integer> lastOfFirst = first.last(1).toFlowable();
    Flowable<Integer> lastOfSecond = second.last(2).toFlowable();

    TestSubscriber<Integer> ts =
            Flowable.zip(lastOfFirst, lastOfSecond, (left, right) -> left + right).test();

    first.onNext(3);
    first.onComplete();
    // No onNext on the second processor: only the completion signal is sent.
    second.onComplete();

    // 3 (emitted) + 2 (default)
    ts.assertResult(5);
}
 
/**
 * Buffers items between a "Start" marker and either an "End" marker or a
 * 5-minute timer driven by a {@code TestScheduler}, printing every buffer.
 */
@Test
public void test() {
    TestScheduler scheduler = new TestScheduler();
    PublishProcessor<String> source = PublishProcessor.create();

    // The shared upstream is used three ways: as the data source, as the
    // opening indicator ("Start") and as one of the closing indicators ("End").
    Function<Flowable<String>, Flowable<List<String>>> bufferBetweenMarkers = shared -> {
        Flowable<String> openings = shared.filter(item -> item.contains("Start"));
        return shared.buffer(
                openings,
                opening -> Flowable.merge(
                        shared.filter(item -> item.contains("End")),
                        Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
    };

    source.publish(bufferBetweenMarkers)
            .subscribe(System.out::println);

    // First group: explicitly terminated by "End".
    source.onNext("Start");
    source.onNext("A");
    source.onNext("B");
    source.onNext("End");

    // Second group: no "End" is sent; virtual time is advanced by the timer period instead.
    source.onNext("Start");
    source.onNext("C");

    scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

    // Third group: terminated by "End", then the source completes.
    source.onNext("Start");
    source.onNext("D");
    source.onNext("End");
    source.onComplete();
}
 
源代码3 项目: akarnokd-misc   文件: BufferStartEndTest.java
/**
 * Variant of start/end buffering where a buffer opened by "Start" is closed by
 * a "Start" item or by a 5-minute timer on a {@code TestScheduler}.
 *
 * <p>Before printing, a trailing "Start" element is removed from any buffer
 * holding more than one element.
 */
@Test
public void test() {
    TestScheduler scheduler = new TestScheduler();
    PublishProcessor<String> source = PublishProcessor.create();

    // "Start" acts both as the opening indicator and as a closing indicator.
    Function<Flowable<String>, Flowable<List<String>>> bufferUntilNextStart = shared -> {
        Flowable<String> starts = shared.filter(item -> item.contains("Start"));
        return shared.buffer(
                starts,
                opening -> Flowable.merge(
                        shared.filter(item -> item.contains("Start")),
                        Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
    };

    source.publish(bufferUntilNextStart)
            .doOnNext(buffer -> {
                // Drop a trailing "Start" marker, but never from a single-element buffer.
                int lastIndex = buffer.size() - 1;
                if (lastIndex > 0 && buffer.get(lastIndex).contains("Start")) {
                    buffer.remove(lastIndex);
                }
            })
            .subscribe(System.out::println);

    source.onNext("Start");
    source.onNext("A");
    source.onNext("B");
    source.onNext("End");

    source.onNext("Start");
    source.onNext("C");

    // Fire the 5-minute timers that are pending on the virtual clock.
    scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

    source.onNext("Start");
    source.onNext("D");
    source.onNext("End");
    source.onComplete();
}
 
源代码4 项目: akarnokd-misc   文件: WindowBoundaryIssue.java
/**
 * Demonstrates windowing a processor by a boundary that is itself derived from
 * a time/count window of the same processor merged with a manual flush signal.
 *
 * <p>Each window is collected into a list and non-empty lists are printed.
 */
@Test
public void test() {
    final int count = 2;
    final int delay = 100;

    TestScheduler scheduler = new TestScheduler();

    PublishProcessor<String> restProcessor = PublishProcessor.create();
    PublishProcessor<Flowable<String>> flush = PublishProcessor.create();

    // Boundary signals: windows limited by delay/count (restart flag set),
    // plus whatever is pushed through the flush processor.
    // NOTE(review): the scheduler is never advanced in this method, so
    // presumably only the count limit produces boundaries - confirm.
    Flowable<Flowable<String>> boundary = restProcessor
            .window(delay, TimeUnit.MILLISECONDS, scheduler, count, true)
            .mergeWith(flush);

    // Indirection so the consumer below can subscribe before the boundary
    // source is connected.
    PublishProcessor<Flowable<String>> tempBoundary = PublishProcessor.create();

    restProcessor
            .window(tempBoundary)
            .concatMapSingle(Flowable::toList)
            .filter(batch -> !batch.isEmpty())
            .subscribe(batch -> System.out.println("emit REST call >>> " + batch));

    boundary.subscribe(tempBoundary);

    // Items "1" through "8", in order.
    for (int i = 1; i <= 8; i++) {
        restProcessor.offer(String.valueOf(i));
    }

    restProcessor.offer("9");
    // Manual boundary in the middle of a count window.
    flush.offer(Flowable.just("flush"));

    restProcessor.offer("10");

    System.out.println("onComplete");

    restProcessor.onComplete();
    flush.onComplete();
}