io.reactivex.Observable#merge ( )源码实例Demo

下面列出了io.reactivex.Observable#merge ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: AndroidWallet   文件: RxBus.java
/**
 * Returns the bus events of type {@code eventType}, merged with the sticky event stored
 * for that type when one exists.
 *
 * <p>The lookup in {@code mStickyEventMap} happens while holding its monitor. When a
 * sticky event is found, each subscriber of the returned observable additionally
 * receives that event from a source that only calls {@code onNext} (it never completes).
 *
 * @param eventType class used both to filter the bus and to cast the stored sticky event
 * @param <T> type of the events delivered to the subscriber
 * @return the filtered bus stream, merged with the stored sticky event if there is one
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        Observable<T> busEvents = mBus.ofType(eventType);
        final Object stickyEvent = mStickyEventMap.get(eventType);

        // Nothing stored for this type: the filtered bus stream is all there is.
        if (stickyEvent == null) {
            return busEvents;
        }

        // Hands the stored event to every new subscriber of the merged stream.
        ObservableOnSubscribe<T> replaySticky = new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> emitter) throws Exception {
                emitter.onNext(eventType.cast(stickyEvent));
            }
        };
        return Observable.merge(busEvents, Observable.create(replaySticky));
    }
}
 
源代码2 项目: AndroidWallet   文件: RxBus.java
/**
 * Returns the bus events of type {@code eventType}, merged with the sticky event stored
 * for that type when one exists.
 *
 * <p>The lookup in {@code mStickyEventMap} happens while holding its monitor. When a
 * sticky event is found, each subscriber of the returned observable additionally
 * receives that event from a source that only calls {@code onNext} (it never completes).
 *
 * @param eventType class used both to filter the bus and to cast the stored sticky event
 * @param <T> type of the events delivered to the subscriber
 * @return the filtered bus stream, merged with the stored sticky event if there is one
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        Observable<T> busEvents = mBus.ofType(eventType);
        final Object stickyEvent = mStickyEventMap.get(eventType);

        // Nothing stored for this type: the filtered bus stream is all there is.
        if (stickyEvent == null) {
            return busEvents;
        }

        // Hands the stored event to every new subscriber of the merged stream.
        ObservableOnSubscribe<T> replaySticky = new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> emitter) throws Exception {
                emitter.onNext(eventType.cast(stickyEvent));
            }
        };
        return Observable.merge(busEvents, Observable.create(replaySticky));
    }
}
 
源代码3 项目: gwt-boot-samples   文件: RxGwtEntryPoint.java
/**
 * Builds a small add/subtract/reset calculator UI and wires it with a reactive pipeline.
 *
 * <p>Every button click is turned into a {@link DoubleUnaryOperator}; the operators from
 * all three buttons are merged and folded over a running value that starts at {@code 0}.
 * Each intermediate value is logged and written into the result element.
 */
@Override
public void onModuleLoad() {
    // The element references used below are assigned (to variables declared outside this
    // method) while the table is being assembled, so this chain must run first.
    Elements.body().add(table()
            .add(tr()
                    .add(td().add(addInputElement = input(number).apply(el -> el.valueAsNumber = 1).get()))
                    .add(td().add(addButtonElement = button("add").get())))
            .add(tr()
                    .add(td().add(subInputElement = input(number).apply(el -> el.valueAsNumber = 1).get()))
                    .add(td().add(subButtonElement = button("sub").get())))
            .add(tr()
                    .add(td().add(resultDivElement = div().get()))
                    .add(td().add(resetButtonElement = button("reset").get()))));

    // One operator stream per button; the input value is read at click time.
    Observable<DoubleUnaryOperator> additions = RxElemento.fromEvent(addButtonElement, click)
            .map(ev -> addInputElement.valueAsNumber)
            .map(amount -> current -> current + amount);
    Observable<DoubleUnaryOperator> subtractions = RxElemento.fromEvent(subButtonElement, click)
            .map(ev -> subInputElement.valueAsNumber)
            .map(amount -> current -> current - amount);
    Observable<DoubleUnaryOperator> resets = RxElemento.fromEvent(resetButtonElement, click)
            .map(ev -> current -> 0);

    Observable<DoubleUnaryOperator> operations = Observable.merge(additions, subtractions, resets);

    // Apply each incoming operator to the accumulated value, starting from 0.
    Observable<Double> runningValue =
            operations.scan(0., (accumulated, operation) -> operation.applyAsDouble(accumulated));

    runningValue
            .doOnNext(value -> logger.info("value change: " + value))
            .subscribe(value -> resultDivElement.textContent = Double.toString(value));
}
 
源代码4 项目: RIBs   文件: TicTacToeView.java
/**
 * Exposes clicks on the board buttons as one stream of coordinates.
 *
 * @return an observable merging the click streams of the 3x3 entries of
 *     {@code imageButtons}; each click is mapped to a {@link BoardCoordinate} built from
 *     the two array indices of the clicked button
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
  ArrayList<Observable<BoardCoordinate>> clickStreams = new ArrayList<>();
  for (int outer = 0; outer < 3; outer++) {
    for (int inner = 0; inner < 3; inner++) {
      // Loop counters are not final, so copy them for capture by the anonymous class.
      final int capturedOuter = outer;
      final int capturedInner = inner;
      Function<Object, BoardCoordinate> toCoordinate =
          new Function<Object, BoardCoordinate>() {
            @Override
            public BoardCoordinate apply(Object irrelevant) throws Exception {
              return new BoardCoordinate(capturedOuter, capturedInner);
            }
          };
      clickStreams.add(RxView.clicks(imageButtons[outer][inner]).map(toCoordinate));
    }
  }
  return Observable.merge(clickStreams);
}
 
源代码5 项目: RIBs   文件: TicTacToeView.java
/**
 * Exposes clicks on the board buttons as one stream of coordinates.
 *
 * @return an observable merging the click streams of the 3x3 entries of
 *     {@code imageButtons}; each click is mapped to a {@link BoardCoordinate} built from
 *     the two array indices of the clicked button
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
  ArrayList<Observable<BoardCoordinate>> clickStreams = new ArrayList<>();
  for (int outer = 0; outer < 3; outer++) {
    for (int inner = 0; inner < 3; inner++) {
      // Loop counters are not final, so copy them for capture by the anonymous class.
      final int capturedOuter = outer;
      final int capturedInner = inner;
      Function<Object, BoardCoordinate> toCoordinate =
          new Function<Object, BoardCoordinate>() {
            @Override
            public BoardCoordinate apply(Object irrelevant) throws Exception {
              return new BoardCoordinate(capturedOuter, capturedInner);
            }
          };
      clickStreams.add(RxView.clicks(imageButtons[outer][inner]).map(toCoordinate));
    }
  }
  return Observable.merge(clickStreams);
}
 
源代码6 项目: RIBs   文件: TicTacToeView.java
/**
 * Exposes clicks on the board buttons as one stream of coordinates.
 *
 * @return an observable merging the click streams of the 3x3 entries of
 *     {@code imageButtons}; each click is mapped to a {@link BoardCoordinate} built from
 *     the two array indices of the clicked button
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
  ArrayList<Observable<BoardCoordinate>> clickStreams = new ArrayList<>();
  for (int outer = 0; outer < 3; outer++) {
    for (int inner = 0; inner < 3; inner++) {
      // Loop counters are not final, so copy them for capture by the anonymous class.
      final int capturedOuter = outer;
      final int capturedInner = inner;
      Function<Object, BoardCoordinate> toCoordinate =
          new Function<Object, BoardCoordinate>() {
            @Override
            public BoardCoordinate apply(Object irrelevant) throws Exception {
              return new BoardCoordinate(capturedOuter, capturedInner);
            }
          };
      clickStreams.add(RxView.clicks(imageButtons[outer][inner]).map(toCoordinate));
    }
  }
  return Observable.merge(clickStreams);
}
 
源代码7 项目: RIBs   文件: OffGameView.java
/**
 * Adds one button per game key to this view and reports which game was picked.
 *
 * @param gameKeys keys to offer; a button labelled with {@code gameName()} is inflated
 *     from {@code R.layout.game_button} and added to this view for each of them
 * @return an observable merging the click streams of all created buttons, where each
 *     click is mapped to the {@link GameKey} its button was created for
 */
@Override
public Observable<GameKey> startGameRequest(List<? extends GameKey> gameKeys) {
  List<Observable<GameKey>> clickStreams = new ArrayList<>();
  for (final GameKey key : gameKeys) {
    Button gameButton =
        (Button) LayoutInflater.from(getContext()).inflate(R.layout.game_button, this, false);
    gameButton.setText(key.gameName());

    // The click payload is ignored; only the key bound to this button matters.
    Function<Object, GameKey> toGameKey =
        new Function<Object, GameKey>() {
          @Override
          public GameKey apply(Object o) throws Exception {
            return key;
          }
        };
    Observable<GameKey> keyClicks = RxView.clicks(gameButton).map(toGameKey);
    clickStreams.add(keyClicks);
    addView(gameButton);
  }
  return Observable.merge(clickStreams);
}
 
源代码8 项目: RIBs   文件: TicTacToeView.java
/**
 * Exposes clicks on the board buttons as one stream of coordinates.
 *
 * @return an observable merging the click streams of the 3x3 entries of
 *     {@code imageButtons}; each click is mapped to a {@link BoardCoordinate} built from
 *     the two array indices of the clicked button
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
  ArrayList<Observable<BoardCoordinate>> clickStreams = new ArrayList<>();
  for (int outer = 0; outer < 3; outer++) {
    for (int inner = 0; inner < 3; inner++) {
      // Loop counters are not final, so copy them for capture by the anonymous class.
      final int capturedOuter = outer;
      final int capturedInner = inner;
      Function<Object, BoardCoordinate> toCoordinate =
          new Function<Object, BoardCoordinate>() {
            @Override
            public BoardCoordinate apply(Object irrelevant) throws Exception {
              return new BoardCoordinate(capturedOuter, capturedInner);
            }
          };
      clickStreams.add(RxView.clicks(imageButtons[outer][inner]).map(toCoordinate));
    }
  }
  return Observable.merge(clickStreams);
}
 
源代码9 项目: mosby-conductor   文件: MainPresenter.java
/**
 * Wires the view intents to the view state.
 *
 * <p>The load-data intent is turned into a stream of {@code PartialStateChanges}
 * ({@code LoadingData} first, then {@code DataLoaded}), all intent streams are merged,
 * and the merged changes are reduced with {@code viewStateReducer} starting from an
 * empty {@code ViewState}. The resulting states are observed on the Android main thread
 * and handed to {@code MainController::render}.
 */
@Override
protected void bindIntents() {

    ArrayList<Observable<PartialStateChanges>> intentStreams = new ArrayList<>();

    // Each load request emits LoadingData immediately, then DataLoaded once the
    // interactor (subscribed on the io scheduler) delivers its result.
    Observable<PartialStateChanges> loadDataChanges = intent(view -> view.loadData())
            .doOnNext(ignored -> Log.d(TAG, "Intent: Load data..."))
            .flatMap(ignored -> Interactor.loadData()
                    .map(data -> (PartialStateChanges)new PartialStateChanges.DataLoaded(data))
                    .startWith(new PartialStateChanges.LoadingData())
                    .subscribeOn(Schedulers.io())
            );
    intentStreams.add(loadDataChanges);

    Observable<PartialStateChanges> mergedChanges = Observable.merge(intentStreams);
    ViewState emptyState = ViewState.builder().build();
    Observable<ViewState> viewStates = mergedChanges
            .scan(emptyState, this::viewStateReducer)
            .observeOn(AndroidSchedulers.mainThread());
    subscribeViewState(viewStates, MainController::render);
}
 
源代码10 项目: akarnokd-misc   文件: PublishFuncExample.java
/**
 * Demonstrates forking one published source into two branches and joining them again.
 *
 * <p>Every subscription to {@code forkAndJoin} publishes the delayed range anew, merges
 * its first and its last element, and connects. The method subscribes three times,
 * printing whatever is emitted, and then sleeps so the delayed sources can run.
 */
@Test
public void test() throws Exception {
    Observable<Integer> delayedRange = Observable.range(1, 5)
            .delaySubscription(1, TimeUnit.SECONDS);

    // Both branches consume the same upstream: one keeps the head, one keeps the tail.
    Function<Observable<Integer>, Observable<Integer>> firstAndLast = shared ->
            Observable.merge(shared.take(1), shared.takeLast(1));

    Observable<Integer> forkAndJoin = Observable.defer(() -> {
        ConnectableObservable<Integer> published = delayedRange
                .doOnSubscribe(s -> System.out.println("Subscribed!"))
                .publish();
        Observable<Integer> joined = firstAndLast.apply(published);
        // connect() is invoked here, before the joined observable is handed back.
        published.connect();
        return joined;
    });

    for (int attempt = 0; attempt < 3; attempt++) {
        forkAndJoin.subscribe(System.out::println);
    }

    Thread.sleep(10000);
}
 
源代码11 项目: memorize   文件: QuizActivity.java
/**
 * Combines the answer streams of the four answer cards into one.
 *
 * @return the merge of {@code onOneAnswer(...)} applied to {@code cardAnswer1} through
 *     {@code cardAnswer4}
 */
// NOTE(review): what each Integer represents is decided by onOneAnswer, which is not
// visible here — confirm before relying on it.
@Override
public Observable<Integer> onAnswer() {
    return Observable.merge(
            onOneAnswer(cardAnswer1),
            onOneAnswer(cardAnswer2),
            onOneAnswer(cardAnswer3),
            onOneAnswer(cardAnswer4));
}
 
源代码12 项目: RxAndroidBle   文件: RxBleGattCallback.java
/**
 * Merges the value stream of {@code output} with the two sources that can signal a failure.
 *
 * @param output holder of the {@code valueRelay} and {@code errorRelay} to combine
 * @param <T> type of the values carried by {@code output}
 * @return the merge of the disconnection router's error-only observable, the value relay,
 *     and the error relay flat-mapped through {@code errorMapper}
 */
private <T> Observable<T> withDisconnectionHandling(Output<T> output) {
    Observable<T> disconnectionErrors = disconnectionRouter.<T>asErrorOnlyObservable();
    Observable<T> values = output.valueRelay;
    // The cast only adapts the static type of the mapped error stream.
    // NOTE(review): presumably errorMapper never emits items — confirm.
    //noinspection unchecked
    Observable<T> mappedErrors = (Observable<T>) output.errorRelay.flatMap(errorMapper);
    return Observable.merge(disconnectionErrors, values, mappedErrors);
}
 
/**
 * Downloads reserved values for every attribute returned by {@code getGeneratedAttributes()},
 * i.e. it applies the per-attribute download (see
 * {@link #downloadReservedValues(String, Integer)}) to each generated attribute.
 *
 * <p>A single {@code BooleanWrapper}, initially {@code false}, is shared by all
 * per-attribute downloads.
 *
 * @param numberOfValuesToFillUp An optional maximum number of values to reserve
 * @return An Observable that merges the progress notifications of all per-attribute downloads.
 */
public Observable<D2Progress> downloadAllReservedValues(Integer numberOfValuesToFillUp) {
    List<Observable<D2Progress>> downloads = new ArrayList<>();
    BooleanWrapper systemInfoDownloaded = new BooleanWrapper(false);

    for (TrackedEntityAttribute generatedAttribute : getGeneratedAttributes()) {
        Observable<D2Progress> download = downloadValuesForOrgUnits(
                generatedAttribute.uid(), numberOfValuesToFillUp, systemInfoDownloaded);
        downloads.add(download);
    }

    return Observable.merge(downloads);
}
 
源代码14 项目: xio   文件: ReverseProxyFunctionalTest.java
/**
 * Issues one asynchronous request per entry of {@code clients} against the proxy's
 * {@code /foo/} URL and merges the responses.
 *
 * @param post forwarded unchanged to {@code requestAsync}
 * @return the merged responses of all clients; each request also receives the client's
 *     position in {@code clients} as reported by {@code indexOf}
 */
private Observable<IndexResponse> multipleAsyncRequests(boolean post) {
  String url = url(proxyPort(), "/foo/");
  // One inner observable per client; the iterator is obtained lazily through the lambda.
  Observable<Observable<IndexResponse>> pendingResponses =
      Observable.fromIterable(() -> clients.iterator())
          .map(
              client -> requestAsync(client, url, post, clients.indexOf(client)).toObservable());
  return Observable.merge(pendingResponses);
}
 
源代码15 项目: RxCentralBle   文件: JellyBeanScanner.java
/**
 * Returns the scan stream for this scanner.
 *
 * @return the merge of {@code sharedScanData} and the observable view of
 *     {@code getErrorSubject()}
 */
// NOTE(review): presumably the error subject only signals failures so that they
// terminate the merged stream — confirm against getErrorSubject().
@Override
public Observable<ScanData> scan() {
  return Observable.merge(sharedScanData, getErrorSubject().toObservable());
}
 
源代码16 项目: GankGirl   文件: RxPermissions.java
/**
 * Picks the source that should start a request.
 *
 * @param trigger optional trigger; may be {@code null}
 * @param pending source merged with {@code trigger} when a trigger is present
 * @return the merge of {@code trigger} and {@code pending}, or an observable emitting
 *     just {@code TRIGGER} when {@code trigger} is {@code null}
 */
private Observable<?> oneOf(Observable<?> trigger, Observable<?> pending) {
    if (trigger != null) {
        return Observable.merge(trigger, pending);
    }
    // No external trigger supplied: fire once with the TRIGGER constant.
    return Observable.just(TRIGGER);
}
 
源代码17 项目: RxPermissions   文件: RxPermissions.java
/**
 * Picks the source that should start a request.
 *
 * @param trigger optional trigger; may be {@code null}
 * @param pending source merged with {@code trigger} when a trigger is present
 * @return the merge of {@code trigger} and {@code pending}, or an observable emitting
 *     just {@code TRIGGER} when {@code trigger} is {@code null}
 */
private Observable<?> oneOf(Observable<?> trigger, Observable<?> pending) {
    if (trigger != null) {
        return Observable.merge(trigger, pending);
    }
    // No external trigger supplied: fire once with the TRIGGER constant.
    return Observable.just(TRIGGER);
}
 
/**
 * Downloads the system info and the events described by {@code params}, reporting
 * progress for both through one stream.
 *
 * @param params download parameters forwarded to {@code downloadEventsInternal}
 * @return the merged progress notifications of both downloads; they share one
 *     {@code D2ProgressManager} created with the argument {@code 2}
 */
public Observable<D2Progress> downloadSingleEvents(ProgramDataDownloadParams params) {
    D2ProgressManager progressManager = new D2ProgressManager(2);
    Observable<D2Progress> systemInfoProgress = downloadSystemInfo(progressManager);
    Observable<D2Progress> eventsProgress = downloadEventsInternal(params, progressManager);
    return Observable.merge(systemInfoProgress, eventsProgress);
}