下面列出了怎么用io.reactivex.rxjava3.core.ObservableEmitter的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Create an observable from the given event source.
*
* @param eventSource the eventSource you want to convert to an observable
* @param <E> the event type
* @return an Observable based on the provided event source
*/
@NonNull
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
return Observable.create(
new ObservableOnSubscribe<E>() {
@Override
public void subscribe(@NonNull ObservableEmitter<E> emitter) throws Throwable {
final com.spotify.mobius.disposables.Disposable disposable =
eventSource.subscribe(
new com.spotify.mobius.functions.Consumer<E>() {
@Override
public void accept(E value) {
emitter.onNext(value);
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Throwable {
disposable.dispose();
}
});
}
});
}
/**
* Converts an {@link ApolloQueryWatcher} to an asynchronous Observable.
*
* @param watcher the ApolloQueryWatcher to convert.
* @param <T> the value type
* @return the converted Observable
* @throws NullPointerException if watcher == null
*/
@NotNull
@CheckReturnValue
public static <T> Observable<Response<T>> from(@NotNull final ApolloQueryWatcher<T> watcher) {
checkNotNull(watcher, "watcher == null");
return Observable.create(new ObservableOnSubscribe<Response<T>>() {
@Override public void subscribe(final ObservableEmitter<Response<T>> emitter) throws Exception {
cancelOnObservableDisposed(emitter, watcher);
watcher.enqueueAndWatch(new ApolloCall.Callback<T>() {
@Override public void onResponse(@NotNull Response<T> response) {
if (!emitter.isDisposed()) {
emitter.onNext(response);
}
}
@Override public void onFailure(@NotNull ApolloException e) {
Exceptions.throwIfFatal(e);
if (!emitter.isDisposed()) {
emitter.onError(e);
}
}
});
}
});
}
/**
* Converts an {@link ApolloCall} to an {@link Observable}. The number of emissions this Observable will have is based
* on the {@link com.apollographql.apollo.fetcher.ResponseFetcher} used with the call.
*
* @param call the ApolloCall to convert
* @param <T> the value type.
* @return the converted Observable
* @throws NullPointerException if originalCall == null
*/
@NotNull
@CheckReturnValue
public static <T> Observable<Response<T>> from(@NotNull final ApolloCall<T> call) {
checkNotNull(call, "call == null");
return Observable.create(new ObservableOnSubscribe<Response<T>>() {
@Override public void subscribe(final ObservableEmitter<Response<T>> emitter) throws Exception {
cancelOnObservableDisposed(emitter, call);
call.enqueue(new ApolloCall.Callback<T>() {
@Override public void onResponse(@NotNull Response<T> response) {
if (!emitter.isDisposed()) {
emitter.onNext(response);
}
}
@Override public void onFailure(@NotNull ApolloException e) {
Exceptions.throwIfFatal(e);
if (!emitter.isDisposed()) {
emitter.onError(e);
}
}
@Override public void onStatusEvent(@NotNull ApolloCall.StatusEvent event) {
if (event == ApolloCall.StatusEvent.COMPLETED && !emitter.isDisposed()) {
emitter.onComplete();
}
}
});
}
});
}
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
verifyMainThread();
EmitterListener listener = new EmitterListener(emitter);
emitter.setDisposable(listener);
view.addOnAttachStateChangeListener(listener);
}
private <T> Observable<T> create(Supplier<T> supplier) {
Observable<T> observable = Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(ObservableEmitter<T> emitter) throws Exception {
try {
T response = supplier.get();
if (!emitter.isDisposed()) {
emitter.onNext(response);
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
} catch (Throwable e) {
if (!emitter.isDisposed()) {
emitter.onError(e);
}
}
}
});
if (sc == null) {
return observable.subscribeOn(Schedulers.io());
}
return observable.subscribeOn(sc).observeOn(sc);
}
@Override
public ObservableSource<M> apply(@NonNull Observable<E> upstream) {
return Observable.create(
new ObservableOnSubscribe<M>() {
@Override
public void subscribe(@NonNull ObservableEmitter<M> emitter) throws Throwable {
final MobiusLoop<M, E, ?> loop;
if (startEffects == null) {
loop = loopFactory.startFrom(startModel);
} else {
loop = loopFactory.startFrom(startModel, startEffects);
}
loop.observe(
new com.spotify.mobius.functions.Consumer<M>() {
@Override
public void accept(M value) {
emitter.onNext(value);
}
});
final Disposable eventsDisposable =
upstream.subscribe(
new Consumer<E>() {
@Override
public void accept(E event) throws Throwable {
loop.dispatchEvent(event);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
emitter.onError(new UnrecoverableIncomingException(throwable));
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Throwable {
loop.dispose();
eventsDisposable.dispose();
}
});
}
});
}
@NonNull
public static <I, O> ObservableTransformer<I, O> toTransformer(
final Connectable<I, O> connectable) {
return new ObservableTransformer<I, O>() {
@Override
public @NonNull ObservableSource<O> apply(@NonNull Observable<I> upstream) {
return Observable.create(
new ObservableOnSubscribe<O>() {
@Override
public void subscribe(@NonNull ObservableEmitter<O> emitter) throws Throwable {
com.spotify.mobius.functions.Consumer<O> output = emitter::onNext;
final Connection<I> input = connectable.connect(output);
final Disposable disposable =
upstream.subscribe(
new Consumer<I>() {
@Override
public void accept(I value) throws Throwable {
input.accept(value);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable error) throws Throwable {
emitter.onError(error);
}
},
new Action() {
@Override
public void run() throws Throwable {
emitter.onComplete();
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Throwable {
disposable.dispose();
input.dispose();
}
});
}
});
}
};
}
RemoteLndStreamObserver(ObservableEmitter<V> emitter) {
mEmitter = emitter;
}
@Override
public void subscribe(ObservableEmitter<DataItem> emitter) throws Throwable {
DataProducerTask dirScannerTask = new DataProducerTask(emitter, query);
executor.execute(dirScannerTask);
}
private static <T> void cancelOnObservableDisposed(ObservableEmitter<T> emitter, final Cancelable cancelable) {
emitter.setDisposable(getRx3Disposable(cancelable));
}
public EmitterListener(ObservableEmitter<Object> emitter) {
this.emitter = emitter;
}