io.reactivex.Flowable#zip() 源码实例 Demo

下面列出了 io.reactivex.Flowable#zip() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Requests the fields of a section and the calculated rule effects, zips the two
 * streams through {@code applyEffects} and forwards every combined list to the
 * consumer returned by {@code view.showFields(sectionUid)}.
 *
 * @param sectionUid uid handed to the repository and to the view; may be {@code null}
 */
@Override
public void getSectionCompletion(@Nullable String sectionUid) {
    Flowable<List<FieldViewModel>> sectionFields =
            eventSummaryRepository.list(sectionUid, eventUid);

    // A failing rule calculation is converted into a failure Result instead of an error signal.
    Flowable<Result<RuleEffect>> ruleResults = eventSummaryRepository.calculate()
            .subscribeOn(schedulerProvider.computation())
            .onErrorReturn(error -> Result.failure(new Exception(error)));

    // One field list is paired with one rule result per emission.
    Flowable<List<FieldViewModel>> combined =
            Flowable.zip(sectionFields, ruleResults, this::applyEffects);

    compositeDisposable.add(
            combined
                    .subscribeOn(schedulerProvider.io())
                    .observeOn(schedulerProvider.ui())
                    .subscribe(
                            view.showFields(sectionUid),
                            error -> {
                                // Errors reaching this point are rethrown wrapped.
                                throw new OnErrorNotImplementedException(error);
                            }));
}
 
/**
 * Requests the fields of a section and the calculated rule effects, zips the two
 * streams through {@code applyEffects} and forwards every combined list to the
 * consumer returned by {@code view.showFields(sectionUid)}.
 *
 * @param sectionUid uid handed to the repository and to the view; may be {@code null}
 */
@Override
public void getSectionCompletion(@Nullable String sectionUid) {
    Flowable<List<FieldViewModel>> fieldsFlowable = eventSummaryRepository.list(sectionUid, eventId);

    // A failing rule calculation is converted into a failure Result instead of an error signal.
    Flowable<Result<RuleEffect>> ruleEffectFlowable = eventSummaryRepository.calculate()
            .subscribeOn(schedulerProvider.computation())
            .onErrorReturn(throwable -> Result.failure(new Exception(throwable)));

    // Combining results of two repositories into a single stream.
    Flowable<List<FieldViewModel>> viewModelsFlowable = Flowable.zip(fieldsFlowable, ruleEffectFlowable,
            this::applyEffects);

    // Subscribe upstream on the io scheduler so that subscribing to the repository
    // stream does not happen on the UI scheduler; only result delivery uses ui().
    compositeDisposable.add(viewModelsFlowable
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribe(view.showFields(sectionUid), throwable -> {
                // Errors reaching this point are rethrown wrapped.
                throw new OnErrorNotImplementedException(throwable);
            }));
}
 
/**
 * Requests the program stages for the given program and uid together with the
 * calculated rule effects, zips both streams through {@code applyEffects} and
 * passes each result to {@code view.setData}. Errors are logged with Timber.
 *
 * @param programId program identifier passed to the repository
 * @param uid       second identifier passed to the repository; must not be {@code null}
 */
@Override
public void getProgramStages(String programId, @NonNull String uid) {
    Flowable<List<ProgramStage>> stages = programStageSelectionRepository
            .enrollmentProgramStages(programId, uid)
            .subscribeOn(schedulerProvider.io());

    Flowable<Result<RuleEffect>> ruleResults = programStageSelectionRepository
            .calculate()
            .subscribeOn(schedulerProvider.io());

    // Each source is subscribed on the io scheduler; zip pairs their emissions.
    Flowable<List<ProgramStage>> stagesWithEffects =
            Flowable.zip(stages, ruleResults, this::applyEffects);

    compositeDisposable.add(
            stagesWithEffects
                    .subscribeOn(schedulerProvider.io())
                    .observeOn(schedulerProvider.ui())
                    .subscribe(view::setData, Timber::e));
}
 
源代码4 项目: akarnokd-misc   文件: ZipInfinite.java
/**
 * Zips a processor with an endlessly repeating source, prints every zipped value
 * and then pushes two items into the processor.
 */
@Test
public void test() {
    final FlowableProcessor<Integer> processor = PublishProcessor.create();
    final Flowable<Integer> endlessOnes = Flowable.just(1).repeat();

    // The zipper ignores both inputs and always yields 1; trailing arguments are
    // the delayError flag (true) and the buffer size (1).
    final Flowable<Integer> zipped = Flowable.zip(
            processor,
            endlessOnes,
            (fromProcessor, fromRepeat) -> 1,
            true,
            1);

    zipped.subscribe(value -> System.out.println(value));

    processor.onNext(2);
    processor.onNext(3);
}
 
/**
 * Produces "a", "b" and "c" for channel "X", each paired with one tick of a
 * 10 millisecond interval; the tick value itself is dropped.
 *
 * @return the zipped stream of letters
 */
@Outgoing("X")
public Flowable<String> x() {
    Flowable<String> letters = Flowable.fromArray("a", "b", "c");
    Flowable<Long> ticks = Flowable.interval(10, TimeUnit.MILLISECONDS);
    return Flowable.zip(letters, ticks, (letter, tick) -> letter);
}
 
/**
 * Produces "a", "b" and "c" for channel "X". Every letter is zipped with one
 * emission of a 10 millisecond interval, and only the letter is kept.
 *
 * @return the zipped stream of letters
 */
@Outgoing("X")
public Flowable<String> x() {
    Flowable<Long> pacer = Flowable.interval(10, TimeUnit.MILLISECONDS);
    return Flowable.zip(
            Flowable.fromArray("a", "b", "c"),
            pacer,
            (item, ignoredTick) -> item);
}
 
/**
 * Produces "a", "b" and "c" for channel "X" by zipping the fixed values with a
 * 10 millisecond interval and discarding the interval's counter.
 *
 * @return the zipped stream of letters
 */
@Outgoing("X")
public Flowable<String> x() {
    Flowable<String> values = Flowable.fromArray("a", "b", "c");
    return Flowable.zip(
            values,
            Flowable.interval(10, TimeUnit.MILLISECONDS),
            (value, counter) -> value);
}
 
源代码8 项目: akarnokd-misc   文件: ZipPerf.java
/**
 * Prepares the benchmark state: two arrays of {@code firstLen} and {@code secondLen}
 * elements filled with 777, a baseline source, and four zip variants in which
 * neither, one, or both inputs are subscribed on the computation scheduler.
 */
@Setup
public void setup() {
    Integer[] firstValues = new Integer[firstLen];
    Integer[] secondValues = new Integer[secondLen];
    Arrays.fill(firstValues, 777);
    Arrays.fill(secondValues, 777);

    // The baseline streams the second array only when it is strictly longer than the first.
    baseline = Flowable.fromArray(firstLen >= secondLen ? firstValues : secondValues);

    Flowable<Integer> first = Flowable.fromArray(firstValues);
    Flowable<Integer> second = Flowable.fromArray(secondValues);
    BiFunction<Integer, Integer, Integer> sum = Integer::sum;

    // Both inputs are consumed on the subscribing thread.
    bothSync = Flowable.zip(first, second, sum);

    // Only the second input is moved to the computation scheduler.
    firstSync = Flowable.zip(first, second.subscribeOn(Schedulers.computation()), sum);

    // Only the first input is moved to the computation scheduler.
    secondSync = Flowable.zip(first.subscribeOn(Schedulers.computation()), second, sum);

    // Both inputs are moved to the computation scheduler.
    bothAsync = Flowable.zip(
            first.subscribeOn(Schedulers.computation()),
            second.subscribeOn(Schedulers.computation()),
            sum);

    // True when the shorter of the two inputs has fewer than 100 elements.
    small = firstLen < 100 || secondLen < 100;
}