下面列出了 io.reactivex.rxjava3.core.ObservableSource 的 API 用法示例代码,您也可以点击链接到 GitHub 查看源代码。
/**
 * Builds an {@link ObservableTransformer} that runs the supplied {@link Action} once for every
 * effect emitted by the upstream effect observable.
 *
 * <p>Each incoming effect is mapped to a {@link Completable} wrapping {@code doEffect}; the effect
 * value itself is not handed to the action. If {@code scheduler} is non-null the Completable is
 * subscribed on it, otherwise no {@code subscribeOn} is applied and the action runs wherever the
 * subscription happens.
 *
 * @param doEffect the {@link Action} to run each time an effect arrives
 * @param scheduler the {@link Scheduler} to subscribe the action on, or {@code null} to skip
 *     {@code subscribeOn}
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type of the resulting stream; this transformer produces no events itself
 *     but must still share the Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream
          .flatMapCompletable(
              (F ignoredEffect) -> {
                // A fresh Completable per effect, exactly one action invocation each.
                final Completable runAction = Completable.fromAction(doEffect);
                if (scheduler == null) {
                  return runAction;
                }
                return runAction.subscribeOn(scheduler);
              })
          .toObservable();
}
/**
 * Builds an {@link ObservableTransformer} that passes every effect emitted by the upstream effect
 * observable to the supplied {@link Consumer}.
 *
 * <p>Each incoming effect is mapped to a {@link Completable} that calls {@code
 * doEffect.accept(effect)}. If {@code scheduler} is non-null the Completable is subscribed on it,
 * otherwise no {@code subscribeOn} is applied.
 *
 * @param doEffect the {@link Consumer} to invoke with each requested effect
 * @param scheduler the {@link Scheduler} to subscribe the consumer call on, or {@code null} to
 *     skip {@code subscribeOn}
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type of the resulting stream; this transformer produces no events itself
 *     but must still share the Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream
          .flatMapCompletable(
              (F effect) -> {
                final Completable consumeEffect =
                    Completable.fromAction(() -> doEffect.accept(effect));
                if (scheduler != null) {
                  return consumeEffect.subscribeOn(scheduler);
                }
                return consumeEffect;
              })
          .toObservable();
}
/**
 * Builds an {@link ObservableTransformer} that applies the supplied {@link Function} to every
 * effect emitted by the upstream effect observable and emits the function's return value as an
 * event.
 *
 * <p>Each incoming effect is mapped to an {@link Observable} created from a supplier that calls
 * {@code function.apply(effect)}. If {@code scheduler} is non-null that Observable is subscribed
 * on it, otherwise no {@code subscribeOn} is applied.
 *
 * @param function the {@link Function} to invoke with each requested effect
 * @param scheduler the {@link Scheduler} to subscribe the function call on, or {@code null} to
 *     skip {@code subscribeOn}
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream.flatMap(
          (F effect) -> {
            final Observable<E> computedEvent =
                Observable.fromSupplier(() -> function.apply(effect));
            if (scheduler != null) {
              return computedEvent.subscribeOn(scheduler);
            }
            return computedEvent;
          });
}
/**
 * Creates an {@link EventSource} backed by the given RxJava streams.
 *
 * <p>The sources are merged into a single observable. Each call to {@code subscribe} on the
 * returned event source subscribes to that merged observable, forwards every item to the supplied
 * event consumer, and routes any stream error to {@link RxJavaPlugins#onError(Throwable)}.
 * Disposing the returned Mobius disposable disposes the underlying RxJava subscription.
 *
 * <p>All streams must already be mapped to your event type.
 *
 * @param sources the observables to include in this event source
 * @param <E> the event type
 * @return an EventSource based on the provided observables
 */
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
  final Observable<E> mergedEvents = Observable.mergeArray(sources);
  return new EventSource<E>() {
    @Nonnull
    @Override
    public com.spotify.mobius.disposables.Disposable subscribe(
        com.spotify.mobius.functions.Consumer<E> eventConsumer) {
      // Bridge the Mobius consumer into RxJava; errors go to the global RxJava error hook.
      final Disposable subscription =
          mergedEvents.subscribe(eventConsumer::accept, RxJavaPlugins::onError);
      return new com.spotify.mobius.disposables.Disposable() {
        @Override
        public void dispose() {
          subscription.dispose();
        }
      };
    }
  };
}
/**
 * Turns a stream of events into a stream of models by running a Mobius loop per subscription.
 *
 * <p>On each subscription a new loop is started from {@code startModel} (including {@code
 * startEffects} when that field is non-null). Models observed from the loop are emitted
 * downstream, and upstream events are dispatched into the loop. An upstream error is reported
 * downstream wrapped in an {@link UnrecoverableIncomingException}; upstream completion is not
 * forwarded. Cancelling the downstream subscription disposes the loop first and then the upstream
 * subscription.
 *
 * @param upstream the stream of events to feed into the loop
 * @return an observable source of the models produced by the loop
 */
@Override
public ObservableSource<M> apply(@NonNull Observable<E> upstream) {
  return Observable.create(
      (ObservableEmitter<M> emitter) -> {
        final MobiusLoop<M, E, ?> loop;
        if (startEffects != null) {
          loop = loopFactory.startFrom(startModel, startEffects);
        } else {
          loop = loopFactory.startFrom(startModel);
        }

        // Register the model observer before wiring events in, as the original does.
        loop.observe(emitter::onNext);

        final Disposable eventSubscription =
            upstream.subscribe(
                loop::dispatchEvent,
                failure -> emitter.onError(new UnrecoverableIncomingException(failure)));

        emitter.setCancellable(
            () -> {
              loop.dispose();
              eventSubscription.dispose();
            });
      });
}
/**
 * Adapts a {@link Connectable} into an {@link ObservableTransformer}.
 *
 * <p>On each subscription the connectable is connected with an output consumer that emits to the
 * downstream observer. Upstream items are fed into the resulting {@link Connection}, and upstream
 * errors and completion are passed straight through to the downstream observer. Cancelling the
 * downstream subscription disposes the upstream subscription first and then the connection.
 *
 * @param connectable the connectable to wrap
 * @param <I> the input type accepted by the connectable
 * @param <O> the output type produced by the connectable
 * @return a transformer that routes upstream items through {@code connectable}
 */
@NonNull
public static <I, O> ObservableTransformer<I, O> toTransformer(
    final Connectable<I, O> connectable) {
  return upstream ->
      Observable.create(
          (ObservableEmitter<O> emitter) -> {
            final Connection<I> connection = connectable.connect(emitter::onNext);
            final Disposable upstreamSubscription =
                upstream.subscribe(connection::accept, emitter::onError, emitter::onComplete);
            emitter.setCancellable(
                () -> {
                  upstreamSubscription.dispose();
                  connection.dispose();
                });
          });
}
/**
 * Stores the given source and request context in this instance's fields.
 *
 * @param source the {@link ObservableSource} kept in the {@code source} field
 * @param assemblyContext the {@link RequestContext} kept in the {@code assemblyContext} field;
 *     NOTE(review): presumably the context captured when the chain was assembled - confirm
 *     against the enclosing class
 */
RequestContextSupplierObservable(ObservableSource<T> source, RequestContext assemblyContext) {
this.source = source;
this.assemblyContext = assemblyContext;
}
/**
 * Stores the given source and request context in this instance's fields.
 *
 * @param source the {@link ObservableSource} kept in the {@code source} field
 * @param assemblyContext the {@link RequestContext} kept in the {@code assemblyContext} field;
 *     NOTE(review): presumably the context captured when the chain was assembled - confirm
 *     against the enclosing class
 */
RequestContextObservable(ObservableSource<T> source, RequestContext assemblyContext) {
this.source = source;
this.assemblyContext = assemblyContext;
}
/**
 * Stores the given source and request context in this instance's fields.
 *
 * @param source the {@link ObservableSource} kept in the {@code source} field
 * @param assemblyContext the {@link RequestContext} kept in the {@code assemblyContext} field;
 *     NOTE(review): presumably the context captured when the chain was assembled - confirm
 *     against the enclosing class
 */
RequestContextScalarSupplierObservable(ObservableSource<T> source, RequestContext assemblyContext) {
this.source = source;
this.assemblyContext = assemblyContext;
}
/**
 * Returns the upstream observable with {@code takeUntil(observable)} applied.
 *
 * <p>NOTE(review): {@code observable} is declared outside this snippet; presumably it is the
 * signal whose emission ends the upstream sequence - confirm its type and origin in the
 * enclosing class.
 *
 * @param upstream the observable to bound
 * @return {@code upstream.takeUntil(observable)}
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.takeUntil(observable);
}