下面列出了 io.reactivex.Flowable#zip() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Loads the fields of a section, merges them with the calculated rule effects and
 * hands the outcome to the view.
 *
 * <p>A failure of the rule calculation is converted into {@code Result.failure(...)} so
 * that the zipped stream keeps going; any error that still reaches the subscriber is
 * rethrown wrapped in an {@link OnErrorNotImplementedException}.
 *
 * @param sectionUid uid of the section passed to the repository and to the view; may be null
 */
@Override
public void getSectionCompletion(@Nullable String sectionUid) {
    Flowable<List<FieldViewModel>> sectionFields = eventSummaryRepository.list(sectionUid, eventUid);

    // Rule effects are calculated on the computation scheduler.
    Flowable<Result<RuleEffect>> calculation = eventSummaryRepository.calculate()
            .subscribeOn(schedulerProvider.computation());
    Flowable<Result<RuleEffect>> safeCalculation =
            calculation.onErrorReturn(error -> Result.failure(new Exception(error)));

    // Pair every field list with a rule-effect result and let applyEffects merge the two.
    Flowable<List<FieldViewModel>> fieldsWithEffects =
            Flowable.zip(sectionFields, safeCalculation, this::applyEffects);

    // Subscribe on the io scheduler, deliver on the ui scheduler.
    Flowable<List<FieldViewModel>> scheduled = fieldsWithEffects
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui());

    compositeDisposable.add(
            scheduled.subscribe(
                    view.showFields(sectionUid),
                    error -> {
                        throw new OnErrorNotImplementedException(error);
                    }));
}
/**
 * Loads the fields of a section, merges them with the calculated rule effects and
 * hands the outcome to the view.
 *
 * <p>A failure of the rule calculation is converted into {@code Result.failure(...)};
 * any error that still reaches the subscriber is rethrown wrapped in an
 * {@link OnErrorNotImplementedException}.
 *
 * @param sectionUid uid of the section passed to the repository and to the view; may be null
 */
@Override
public void getSectionCompletion(@Nullable String sectionUid) {
    Flowable<List<FieldViewModel>> sectionFields = eventSummaryRepository.list(sectionUid, eventId);

    // Rule effects are calculated on the computation scheduler.
    Flowable<Result<RuleEffect>> calculation = eventSummaryRepository.calculate()
            .subscribeOn(schedulerProvider.computation());
    Flowable<Result<RuleEffect>> safeCalculation =
            calculation.onErrorReturn(error -> Result.failure(new Exception(error)));

    // Pair every field list with a rule-effect result and let applyEffects merge the two.
    Flowable<List<FieldViewModel>> fieldsWithEffects =
            Flowable.zip(sectionFields, safeCalculation, this::applyEffects);

    // NOTE(review): the stream is subscribed on the ui scheduler as well as observed on it —
    // confirm that subscribing on ui (rather than a background scheduler) is intended.
    Flowable<List<FieldViewModel>> scheduled = fieldsWithEffects
            .subscribeOn(schedulerProvider.ui())
            .observeOn(schedulerProvider.ui());

    compositeDisposable.add(
            scheduled.subscribe(
                    view.showFields(sectionUid),
                    error -> {
                        throw new OnErrorNotImplementedException(error);
                    }));
}
/**
 * Loads the program stages of an enrollment, merges them with the calculated rule
 * effects and passes the outcome to {@code view.setData}; errors are logged via Timber.
 *
 * @param programId program identifier forwarded to the repository
 * @param uid       second identifier forwarded to the repository
 */
@Override
public void getProgramStages(String programId, @NonNull String uid) {
    Flowable<List<ProgramStage>> stages =
            programStageSelectionRepository.enrollmentProgramStages(programId, uid);
    Flowable<Result<RuleEffect>> ruleEffects = programStageSelectionRepository.calculate();

    // Both sources are subscribed on the io scheduler before being zipped.
    Flowable<List<ProgramStage>> stagesOnIo = stages.subscribeOn(schedulerProvider.io());
    Flowable<Result<RuleEffect>> ruleEffectsOnIo = ruleEffects.subscribeOn(schedulerProvider.io());

    // Pair every stage list with a rule-effect result and let applyEffects merge the two.
    Flowable<List<ProgramStage>> stagesWithEffects =
            Flowable.zip(stagesOnIo, ruleEffectsOnIo, this::applyEffects);

    Flowable<List<ProgramStage>> scheduled = stagesWithEffects
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui());

    compositeDisposable.add(scheduled.subscribe(view::setData, Timber::e));
}
/**
 * Zips a {@code PublishProcessor} with an endlessly repeating {@code just(1)} source
 * (delayError = true, bufferSize = 1), prints every zipped item and then pushes two
 * values into the processor. The method contains no assertions.
 */
@Test
public void test() {
    final Flowable<Integer> endlessOnes = Flowable.just(1).repeat();
    final FlowableProcessor<Integer> manualSource = PublishProcessor.create();

    // The zipper ignores both inputs and always yields 1.
    final Flowable<Integer> zipped =
            Flowable.zip(manualSource, endlessOnes, (fromProcessor, fromRepeat) -> 1, true, 1);

    zipped.subscribe(System.out::println);

    manualSource.onNext(2);
    manualSource.onNext(3);
}
/**
 * Source annotated as outgoing "X": pairs the letters "a", "b", "c" with the ticks of a
 * 10 ms interval and emits only the letter of each pair.
 *
 * @return the zipped stream of letters
 */
@Outgoing("X")
public Flowable<String> x() {
    Flowable<String> letters = Flowable.fromArray("a", "b", "c");
    Flowable<Long> ticks = Flowable.interval(10, TimeUnit.MILLISECONDS);
    // The tick is only used for pacing; its value is discarded.
    return Flowable.zip(letters, ticks, (letter, tick) -> letter);
}
/**
 * Source annotated as outgoing "X": pairs the letters "a", "b", "c" with the ticks of a
 * 10 ms interval and emits only the letter of each pair.
 *
 * @return the zipped stream of letters
 */
@Outgoing("X")
public Flowable<String> x() {
    Flowable<String> letters = Flowable.fromArray("a", "b", "c");
    Flowable<Long> ticks = Flowable.interval(10, TimeUnit.MILLISECONDS);
    // The tick is only used for pacing; its value is discarded.
    return Flowable.zip(letters, ticks, (letter, tick) -> letter);
}
/**
 * Source annotated as outgoing "X": pairs the letters "a", "b", "c" with the ticks of a
 * 10 ms interval and emits only the letter of each pair.
 *
 * @return the zipped stream of letters
 */
@Outgoing("X")
public Flowable<String> x() {
    Flowable<String> letters = Flowable.fromArray("a", "b", "c");
    Flowable<Long> ticks = Flowable.interval(10, TimeUnit.MILLISECONDS);
    // The tick is only used for pacing; its value is discarded.
    return Flowable.zip(letters, ticks, (letter, tick) -> letter);
}
/**
 * Prepares the fixtures: two Integer arrays of {@code firstLen} and {@code secondLen}
 * elements (all 777), a baseline Flowable over one of them, and four zip variants that
 * differ in which source is subscribed on the computation scheduler.
 */
@Setup
public void setup() {
    Integer[] first = new Integer[firstLen];
    Integer[] second = new Integer[secondLen];
    Arrays.fill(first, 777);
    Arrays.fill(second, 777);

    // The baseline streams the second array only when it is strictly longer than the first.
    Integer[] baselineData = firstLen < secondLen ? second : first;
    baseline = Flowable.fromArray(baselineData);

    Flowable<Integer> firstSource = Flowable.fromArray(first);
    Flowable<Integer> secondSource = Flowable.fromArray(second);
    Flowable<Integer> firstOnComputation = firstSource.subscribeOn(Schedulers.computation());
    Flowable<Integer> secondOnComputation = secondSource.subscribeOn(Schedulers.computation());

    BiFunction<Integer, Integer, Integer> plus = Integer::sum;

    bothSync = Flowable.zip(firstSource, secondSource, plus);
    firstSync = Flowable.zip(firstSource, secondOnComputation, plus);
    secondSync = Flowable.zip(firstOnComputation, secondSource, plus);
    bothAsync = Flowable.zip(firstOnComputation, secondOnComputation, plus);

    // Equivalent to Math.min(firstLen, secondLen) < 100.
    small = firstLen < 100 || secondLen < 100;
}