下面列出了 io.reactivex.Observable#merge() 的实例代码。您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Returns an observable of bus events of the given type, including the stored sticky event if
 * one exists.
 *
 * <p>While holding the lock on {@code mStickyEventMap}, the bus is filtered to {@code eventType}
 * and the map is queried with {@code eventType} as the key. If an event is found, the result
 * merges the filtered bus stream with a source that emits that stored event to each subscriber;
 * otherwise the filtered bus stream is returned as is.
 *
 * @param eventType class of the events to observe, also used as the sticky-event lookup key
 * @param <T>       type of the observed events
 * @return the filtered bus stream, merged with the stored sticky event when one was found
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        final Observable<T> liveEvents = mBus.ofType(eventType);
        final Object stickyEvent = mStickyEventMap.get(eventType);
        if (stickyEvent == null) {
            return liveEvents;
        }

        // Emits the event captured above on subscription; it never signals completion itself.
        final Observable<T> stickyReplay = Observable.create(new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> emitter) throws Exception {
                emitter.onNext(eventType.cast(stickyEvent));
            }
        });
        return Observable.merge(liveEvents, stickyReplay);
    }
}
/**
 * Builds an observable that delivers events of {@code eventType}, plus the sticky event stored
 * for that type when there is one.
 *
 * <p>The lookup runs inside a block synchronized on {@code mStickyEventMap}. If no event is
 * stored under {@code eventType}, the type-filtered bus stream is returned unchanged. If one is
 * stored, that stream is merged with a source that pushes the stored event to every subscriber.
 *
 * @param eventType class used both to filter the bus and to look up the sticky event
 * @param <T>       type of the observed events
 * @return the type-filtered bus stream, optionally merged with the stored sticky event
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        final Observable<T> busEvents = mBus.ofType(eventType);
        final Object stored = mStickyEventMap.get(eventType);
        if (stored == null) {
            return busEvents;
        }

        // One-shot source: hands the stored event (cast to T) to the subscriber's emitter.
        final ObservableOnSubscribe<T> replayStored = new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> emitter) throws Exception {
                emitter.onNext(eventType.cast(stored));
            }
        };
        return Observable.merge(busEvents, Observable.create(replayStored));
    }
}
/**
 * Builds the calculator UI and wires its buttons to a running total.
 *
 * <p>A three-row table is appended to the document body: an "add" row and a "sub" row, each
 * with a number input preset to 1 and a button, plus a row holding the result div and a
 * "reset" button. Each button click is mapped to a {@link DoubleUnaryOperator}; the merged
 * operators are folded over a total that starts at 0, and every value is logged and written
 * into the result div.
 */
@Override
public void onModuleLoad() {
    // Create the widgets up front so the table layout below stays readable.
    addInputElement = input(number).apply(field -> field.valueAsNumber = 1).get();
    addButtonElement = button("add").get();
    subInputElement = input(number).apply(field -> field.valueAsNumber = 1).get();
    subButtonElement = button("sub").get();
    resultDivElement = div().get();
    resetButtonElement = button("reset").get();

    Elements.body().add(table()
            .add(tr().add(td().add(addInputElement)).add(td().add(addButtonElement)))
            .add(tr().add(td().add(subInputElement)).add(td().add(subButtonElement)))
            .add(tr().add(td().add(resultDivElement)).add(td().add(resetButtonElement))));

    // Each click becomes a function from the current total to the next total.
    Observable<DoubleUnaryOperator> additions = RxElemento.fromEvent(addButtonElement, click)
            .map(ev -> addInputElement.valueAsNumber)
            .map(amount -> total -> total + amount);
    Observable<DoubleUnaryOperator> subtractions = RxElemento.fromEvent(subButtonElement, click)
            .map(ev -> subInputElement.valueAsNumber)
            .map(amount -> total -> total - amount);
    Observable<DoubleUnaryOperator> resets = RxElemento.fromEvent(resetButtonElement, click)
            .map(ev -> total -> 0);

    Observable<DoubleUnaryOperator> action$ = Observable.merge(additions, subtractions, resets);
    action$.scan(0., (total, operation) -> operation.applyAsDouble(total))
            .doOnNext(value -> logger.info("value change: " + value))
            .subscribe(value -> resultDivElement.textContent = Double.toString(value));
}
/**
 * Exposes clicks on the 3x3 grid of {@code imageButtons} as a single stream of coordinates.
 *
 * <p>Each button's click stream is mapped to a new {@code BoardCoordinate} carrying that
 * button's row and column indices, and the nine streams are merged.
 *
 * @return an observable emitting the coordinate of each clicked button
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
    ArrayList<Observable<BoardCoordinate>> clickStreams = new ArrayList<>();
    for (int r = 0; r < 3; r++) {
        for (int c = 0; c < 3; c++) {
            // Loop counters are not effectively final, so copy them for the anonymous class.
            final int row = r;
            final int column = c;
            final Function<Object, BoardCoordinate> toCoordinate =
                    new Function<Object, BoardCoordinate>() {
                        @Override
                        public BoardCoordinate apply(Object irrelevant) throws Exception {
                            return new BoardCoordinate(row, column);
                        }
                    };
            clickStreams.add(RxView.clicks(imageButtons[r][c]).map(toCoordinate));
        }
    }
    return Observable.merge(clickStreams);
}
/**
 * Merges the click streams of every button in the 3x3 {@code imageButtons} grid.
 *
 * <p>A click on the button at {@code [row][col]} is emitted as
 * {@code new BoardCoordinate(row, col)}.
 *
 * @return one observable carrying the coordinates of all clicked buttons
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
    ArrayList<Observable<BoardCoordinate>> perSquare = new ArrayList<>();
    for (int rowIndex = 0; rowIndex < 3; rowIndex++) {
        for (int colIndex = 0; colIndex < 3; colIndex++) {
            // Final copies are required for capture by the anonymous mapper below.
            final int row = rowIndex;
            final int col = colIndex;
            final Function<Object, BoardCoordinate> clickToCoordinate =
                    new Function<Object, BoardCoordinate>() {
                        @Override
                        public BoardCoordinate apply(Object irrelevant) throws Exception {
                            return new BoardCoordinate(row, col);
                        }
                    };
            perSquare.add(RxView.clicks(imageButtons[rowIndex][colIndex]).map(clickToCoordinate));
        }
    }
    return Observable.merge(perSquare);
}
/**
 * Returns the clicks on all nine board buttons as a stream of board coordinates.
 *
 * <p>Iterates rows and columns 0..2 of {@code imageButtons}, maps each button's clicks to a
 * freshly created {@code BoardCoordinate} for that position, and merges the results.
 *
 * @return merged observable of clicked coordinates
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
    ArrayList<Observable<BoardCoordinate>> squareStreams = new ArrayList<>();
    for (int y = 0; y < 3; y++) {
        for (int x = 0; x < 3; x++) {
            // Captured by the anonymous class, hence the final copies.
            final int first = y;
            final int second = x;
            final Function<Object, BoardCoordinate> asCoordinate =
                    new Function<Object, BoardCoordinate>() {
                        @Override
                        public BoardCoordinate apply(Object irrelevant) throws Exception {
                            return new BoardCoordinate(first, second);
                        }
                    };
            squareStreams.add(RxView.clicks(imageButtons[y][x]).map(asCoordinate));
        }
    }
    return Observable.merge(squareStreams);
}
/**
 * Adds one button per game key to this view and reports which game was requested.
 *
 * <p>For every key a button is inflated from {@code R.layout.game_button}, labelled with
 * {@code gameName()}, and added as a child view. Clicks on a button are mapped to the key it
 * was created for, and all button streams are merged.
 *
 * @param gameKeys keys of the games to offer, one button each
 * @return an observable emitting the key of each clicked game button
 */
@Override
public Observable<GameKey> startGameRequest(List<? extends GameKey> gameKeys) {
    List<Observable<GameKey>> buttonClicks = new ArrayList<>();
    for (final GameKey key : gameKeys) {
        Button gameButton =
                (Button) LayoutInflater.from(getContext()).inflate(R.layout.game_button, this, false);
        gameButton.setText(key.gameName());

        // Every click on this button resolves to the key it was created for.
        Function<Object, GameKey> toKey = new Function<Object, GameKey>() {
            @Override
            public GameKey apply(Object o) throws Exception {
                return key;
            }
        };
        buttonClicks.add(RxView.clicks(gameButton).map(toKey));
        addView(gameButton);
    }
    return Observable.merge(buttonClicks);
}
/**
 * Combines the click events of the 3x3 {@code imageButtons} grid into one observable.
 *
 * <p>Each emission is a new {@code BoardCoordinate} built from the indices of the button that
 * was clicked.
 *
 * @return observable of coordinates, one per button click
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
    ArrayList<Observable<BoardCoordinate>> sources = new ArrayList<>();
    for (int outer = 0; outer < 3; outer++) {
        for (int inner = 0; inner < 3; inner++) {
            // Snapshot the indices so the anonymous mapper can capture them.
            final int rowIdx = outer;
            final int colIdx = inner;
            final Function<Object, BoardCoordinate> coordinateOfSquare =
                    new Function<Object, BoardCoordinate>() {
                        @Override
                        public BoardCoordinate apply(Object irrelevant) throws Exception {
                            return new BoardCoordinate(rowIdx, colIdx);
                        }
                    };
            sources.add(RxView.clicks(imageButtons[outer][inner]).map(coordinateOfSquare));
        }
    }
    return Observable.merge(sources);
}
/**
 * Connects the view's intents to the view-state stream.
 *
 * <p>The load-data intent is logged, then mapped to {@code Interactor.loadData()}, whose
 * results are wrapped in {@code DataLoaded}, prefixed with a {@code LoadingData} change, and
 * subscribed on the IO scheduler. The merged partial changes are reduced from an initial
 * {@code ViewState} with {@code viewStateReducer}, observed on the main thread, and handed to
 * {@code subscribeViewState} with {@code MainController::render}.
 */
@Override
protected void bindIntents() {
    Observable<PartialStateChanges> loadDataChanges = intent(view -> view.loadData())
            .doOnNext(ignored -> Log.d(TAG, "Intent: Load data..."))
            .flatMap(ignored -> Interactor.loadData()
                    .map(data -> (PartialStateChanges) new PartialStateChanges.DataLoaded(data))
                    .startWith(new PartialStateChanges.LoadingData())
                    .subscribeOn(Schedulers.io()));

    // Only one intent today, but it still goes through a list so it is merged like the original.
    ArrayList<Observable<PartialStateChanges>> intentStreams = new ArrayList<>();
    intentStreams.add(loadDataChanges);
    Observable<PartialStateChanges> allIntents = Observable.merge(intentStreams);

    ViewState initialState = ViewState.builder().build();
    subscribeViewState(
            allIntents.scan(initialState, this::viewStateReducer)
                    .observeOn(AndroidSchedulers.mainThread()),
            MainController::render);
}
/**
 * Demonstrates forking a published source into two branches and merging them again.
 *
 * <p>The source is {@code range(1, 5)} with its subscription delayed by one second. Each
 * subscription to the deferred observable publishes the source (printing "Subscribed!" on
 * subscribe), merges {@code take(1)} and {@code takeLast(1)} of the published stream, and then
 * connects it. The deferred observable is subscribed three times, printing every item, and the
 * test thread then sleeps for ten seconds (presumably so the delayed emissions can be printed).
 *
 * @throws Exception if the sleep is interrupted
 */
@Test
public void test() throws Exception {
    Observable<Integer> delayedRange = Observable.range(1, 5)
            .delaySubscription(1, TimeUnit.SECONDS);

    Observable<Integer> forkAndJoin = Observable.defer(() -> {
        ConnectableObservable<Integer> published = delayedRange
                .doOnSubscribe(d -> System.out.println("Subscribed!"))
                .publish();
        // Build both branches before connecting.
        Observable<Integer> firstAndLast =
                Observable.merge(published.take(1), published.takeLast(1));
        published.connect();
        return firstAndLast;
    });

    for (int run = 0; run < 3; run++) {
        forkAndJoin.subscribe(System.out::println);
    }
    Thread.sleep(10_000);
}
/**
 * Merges the answer streams of the four answer cards into one observable.
 *
 * @return the merged result of {@code onOneAnswer} for {@code cardAnswer1} to {@code cardAnswer4}
 */
@Override
public Observable<Integer> onAnswer() {
    final Observable<Integer> firstCard = onOneAnswer(cardAnswer1);
    final Observable<Integer> secondCard = onOneAnswer(cardAnswer2);
    final Observable<Integer> thirdCard = onOneAnswer(cardAnswer3);
    final Observable<Integer> fourthCard = onOneAnswer(cardAnswer4);
    return Observable.merge(firstCard, secondCard, thirdCard, fourthCard);
}
/**
 * Merges an output's value and error relays with the disconnection router's observable.
 *
 * <p>Three sources are merged: {@code disconnectionRouter.asErrorOnlyObservable()} (presumably
 * signalling only errors, judging by its name), the output's {@code valueRelay}, and the
 * output's {@code errorRelay} flat-mapped through {@code errorMapper}.
 *
 * @param output holder of the value and error relays to combine
 * @param <T>    type of the values emitted by the output
 * @return the merged observable
 */
private <T> Observable<T> withDisconnectionHandling(Output<T> output) {
    final Observable<T> disconnections = disconnectionRouter.<T>asErrorOnlyObservable();
    //noinspection unchecked
    final Observable<T> mappedErrors = (Observable<T>) output.errorRelay.flatMap(errorMapper);
    return Observable.merge(disconnections, output.valueRelay, mappedErrors);
}
/**
 * Downloads reserved values for every attribute returned by {@code getGeneratedAttributes()}.
 *
 * <p>For each attribute, {@code downloadValuesForOrgUnits} is called with the attribute's uid,
 * {@code numberOfValuesToFillUp}, and a single {@code BooleanWrapper} (initially {@code false})
 * shared by all the calls. The resulting observables are merged.
 *
 * @param numberOfValuesToFillUp An optional maximum number of values to reserve; passed through
 *                               unchanged to each per-attribute download
 * @return An Observable that notifies about the progress.
 */
public Observable<D2Progress> downloadAllReservedValues(Integer numberOfValuesToFillUp) {
    BooleanWrapper systemInfoDownloaded = new BooleanWrapper(false);
    List<Observable<D2Progress>> downloads = new ArrayList<>();
    for (TrackedEntityAttribute generatedAttribute : getGeneratedAttributes()) {
        Observable<D2Progress> download = downloadValuesForOrgUnits(
                generatedAttribute.uid(), numberOfValuesToFillUp, systemInfoDownloaded);
        downloads.add(download);
    }
    return Observable.merge(downloads);
}
/**
 * Issues one asynchronous request per client and merges all the responses.
 *
 * <p>Every client in {@code clients} calls {@code requestAsync} against the proxy's
 * {@code /foo/} URL, passing the client's index in {@code clients} as the last argument.
 *
 * @param post forwarded to {@code requestAsync}; presumably selects POST over another method
 * @return the merged response streams of all the clients
 */
private Observable<IndexResponse> multipleAsyncRequests(boolean post) {
    final String targetUrl = url(proxyPort(), "/foo/");
    return Observable.merge(
            // The lambda asks {@code clients} for a fresh iterator each time it is invoked.
            Observable.fromIterable(() -> clients.iterator())
                    .map(client -> {
                        int clientIndex = clients.indexOf(client);
                        return requestAsync(client, targetUrl, post, clientIndex).toObservable();
                    }));
}
/**
 * Returns the shared scan data merged with the observable form of the error subject.
 *
 * @return the merge of {@code sharedScanData} and {@code getErrorSubject().toObservable()}
 */
@Override
public Observable<ScanData> scan() {
    final Observable<ScanData> dataWithErrors =
            Observable.merge(sharedScanData, getErrorSubject().toObservable());
    return dataWithErrors;
}
/**
 * Merges {@code trigger} with {@code pending}, or emits {@code TRIGGER} when there is no trigger.
 *
 * @param trigger the trigger stream; may be {@code null}
 * @param pending the stream merged with a non-null {@code trigger}
 * @return {@code Observable.just(TRIGGER)} if {@code trigger} is {@code null}, otherwise the
 *         merge of both streams
 */
private Observable<?> oneOf(Observable<?> trigger, Observable<?> pending) {
    if (trigger != null) {
        return Observable.merge(trigger, pending);
    }
    // No trigger supplied: emit the TRIGGER constant instead.
    return Observable.just(TRIGGER);
}
/**
 * Chooses between a single {@code TRIGGER} emission and the merge of two streams.
 *
 * @param trigger the trigger stream, or {@code null} if none was supplied
 * @param pending the stream merged with {@code trigger} when it is present
 * @return {@code Observable.just(TRIGGER)} for a {@code null} trigger, else
 *         {@code Observable.merge(trigger, pending)}
 */
private Observable<?> oneOf(Observable<?> trigger, Observable<?> pending) {
    final boolean hasTrigger = trigger != null;
    if (hasTrigger) {
        return Observable.merge(trigger, pending);
    } else {
        return Observable.just(TRIGGER);
    }
}
/**
 * Downloads system info and single events, reporting progress through one merged observable.
 *
 * <p>A new {@code D2ProgressManager} constructed with the argument 2 is shared by
 * {@code downloadSystemInfo} and {@code downloadEventsInternal}.
 *
 * @param params download parameters forwarded to {@code downloadEventsInternal}
 * @return the merged progress streams of both downloads
 */
public Observable<D2Progress> downloadSingleEvents(ProgramDataDownloadParams params) {
    final D2ProgressManager sharedProgress = new D2ProgressManager(2);
    final Observable<D2Progress> systemInfoPhase = downloadSystemInfo(sharedProgress);
    final Observable<D2Progress> eventsPhase = downloadEventsInternal(params, sharedProgress);
    return Observable.merge(systemInfoPhase, eventsPhase);
}