下面列出了 io.reactivex.rxjava3.core.Observable#create() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Adapts an {@link EventSource} to an RxJava {@link Observable}.
 *
 * <p>Every subscription to the returned Observable subscribes to {@code eventSource} and relays
 * each value it delivers through the emitter's {@code onNext}. A cancellable is registered on
 * the emitter that disposes the event-source subscription.
 *
 * @param eventSource the event source to expose as an Observable
 * @param <E> the event type
 * @return an Observable that relays the events of {@code eventSource}
 */
@NonNull
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
  return Observable.create(
      emitter -> {
        // Relay every event from the source straight to the downstream observer.
        final com.spotify.mobius.disposables.Disposable sourceSubscription =
            eventSource.subscribe(event -> emitter.onNext(event));

        // Release the event-source subscription once the emitter is cancelled.
        emitter.setCancellable(() -> sourceSubscription.dispose());
      });
}
/**
 * Wraps an {@link ApolloQueryWatcher} in an Observable that relays the watcher's callbacks.
 *
 * <p>On subscription the watcher is started with {@code enqueueAndWatch}. Responses are passed
 * on through {@code onNext} and failures through {@code onError}, in both cases only while the
 * emitter has not been disposed.
 *
 * @param watcher the ApolloQueryWatcher to wrap
 * @param <T> the value type
 * @return an Observable emitting the watcher's responses
 * @throws NullPointerException if watcher == null
 */
@NotNull
@CheckReturnValue
public static <T> Observable<Response<T>> from(@NotNull final ApolloQueryWatcher<T> watcher) {
  checkNotNull(watcher, "watcher == null");

  final ObservableOnSubscribe<Response<T>> source = new ObservableOnSubscribe<Response<T>>() {
    @Override public void subscribe(final ObservableEmitter<Response<T>> emitter) throws Exception {
      // Hook up cancellation before starting the watcher.
      cancelOnObservableDisposed(emitter, watcher);

      final ApolloCall.Callback<T> relay = new ApolloCall.Callback<T>() {
        @Override public void onResponse(@NotNull Response<T> response) {
          if (emitter.isDisposed()) {
            return;
          }
          emitter.onNext(response);
        }

        @Override public void onFailure(@NotNull ApolloException e) {
          Exceptions.throwIfFatal(e);
          if (emitter.isDisposed()) {
            return;
          }
          emitter.onError(e);
        }
      };
      watcher.enqueueAndWatch(relay);
    }
  };
  return Observable.create(source);
}
/**
 * Exposes this object as an Observable.
 *
 * <p>On subscription, a handler forwarding to the emitter's {@code onNext} is stored in
 * {@code internalHandler}, and {@code close()} is registered to run when the emitter is
 * cancelled.
 *
 * <p>NOTE(review): every new subscription overwrites {@code internalHandler}, so presumably
 * only the latest subscriber receives values - confirm against callers.
 */
@Override
public Observable<T> asObservable() {
  return Observable.create(downstream -> {
    internalHandler = value -> downstream.onNext(value);
    downstream.setCancellable(() -> close());
  });
}
/**
 * Bridges a {@link CompletableFuture} to an Observable.
 *
 * <p>On subscription a completion callback is attached to {@code future}. When the future
 * completes normally its result is emitted and the Observable completes; when it completes
 * exceptionally the failure is signalled as an error. Signals are delivered only while the
 * subscriber is still subscribed: a failure that arrives after disposal is dropped instead of
 * being reported as an undeliverable exception.
 *
 * <p>Disposing the subscription does not cancel {@code future}.
 *
 * @param future the future whose outcome is relayed
 * @param <T> the result type
 * @return an Observable mirroring the outcome of {@code future}
 */
public static <T> Observable<T> futureToObservable(final CompletableFuture<T> future) {
  return Observable.create(subscriber -> {
    future.whenComplete((result, error) -> {
      if (error != null) {
        // tryOnError checks disposal atomically; a plain onError after disposal would be
        // routed to RxJavaPlugins.onError as an undeliverable exception.
        subscriber.tryOnError(error);
        return;
      }
      if (!subscriber.isDisposed()) {
        subscriber.onNext(result);
        subscriber.onComplete();
      }
    });
  });
}
/**
 * Shows the image named by {@code WindowController.fileName} in {@code imageBeforePreview}
 * and sets up {@code clickObservable}.
 *
 * <p>When subscribed, the observable installs a mouse-click handler on
 * {@code imageAfterPreview} that emits the click position as a {@code Point}, with both
 * coordinates cast to {@code int}. Clicks are ignored while
 * {@code pixelTraversalController} is null.
 *
 * @param location not used by this method
 * @param resources not used by this method
 */
@Override
public void initialize(URL location, ResourceBundle resources) {
  final String imageUri = new File(WindowController.fileName).toURI().toString();
  imageBeforePreview.setImage(new Image(imageUri));

  clickObservable = Observable.create(emitter -> {
    imageAfterPreview.setOnMouseClicked(click -> {
      // Only report clicks once a traversal controller is available.
      if (pixelTraversalController != null) {
        emitter.onNext(new Point((int) click.getX(), (int) click.getY()));
      }
    });
  });
}
/**
 * Returns an Observable that notifies about data changes in the given store.
 * Once a transaction is committed, the classes with changed Objects are emitted.
 *
 * <p>Values are forwarded only while the emitter is not disposed; cancelling the emitter
 * cancels the underlying {@link DataSubscription}.
 */
@SuppressWarnings("rawtypes") // BoxStore observer may return any (entity) type.
public static Observable<Class> observable(BoxStore boxStore) {
  return Observable.create(downstream -> {
    final DataSubscription storeSubscription = boxStore.subscribe().observer(changedClass -> {
      if (downstream.isDisposed()) {
        return;
      }
      downstream.onNext(changedClass);
    });
    downstream.setCancellable(() -> storeSubscription.cancel());
  });
}
/**
 * Returns an Observable that emits the results of the given Query as Lists.
 * It never completes, so updates keep arriving when the underlying data changes
 * (see {@link Query#subscribe()} for details).
 *
 * <p>Results are forwarded only while the emitter is not disposed; cancelling the emitter
 * cancels the underlying {@link DataSubscription}.
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
  return Observable.create(downstream -> {
    final DataSubscription querySubscription = query.subscribe().observer(results -> {
      if (downstream.isDisposed()) {
        return;
      }
      downstream.onNext(results);
    });
    downstream.setCancellable(() -> querySubscription.cancel());
  });
}
/**
 * Wraps an {@link ApolloCall} in an {@link Observable}. How many responses are emitted depends
 * on the {@link com.apollographql.apollo.fetcher.ResponseFetcher} used with the call.
 *
 * <p>On subscription the call is enqueued. Responses are passed on through {@code onNext},
 * failures through {@code onError}, and a {@code COMPLETED} status event completes the
 * Observable - each only while the emitter has not been disposed.
 *
 * @param call the ApolloCall to wrap
 * @param <T> the value type
 * @return an Observable emitting the call's responses
 * @throws NullPointerException if call == null
 */
@NotNull
@CheckReturnValue
public static <T> Observable<Response<T>> from(@NotNull final ApolloCall<T> call) {
  checkNotNull(call, "call == null");

  final ObservableOnSubscribe<Response<T>> source = new ObservableOnSubscribe<Response<T>>() {
    @Override public void subscribe(final ObservableEmitter<Response<T>> emitter) throws Exception {
      // Hook up cancellation before the call is enqueued.
      cancelOnObservableDisposed(emitter, call);

      final ApolloCall.Callback<T> relay = new ApolloCall.Callback<T>() {
        @Override public void onResponse(@NotNull Response<T> response) {
          if (emitter.isDisposed()) {
            return;
          }
          emitter.onNext(response);
        }

        @Override public void onFailure(@NotNull ApolloException e) {
          Exceptions.throwIfFatal(e);
          if (emitter.isDisposed()) {
            return;
          }
          emitter.onError(e);
        }

        @Override public void onStatusEvent(@NotNull ApolloCall.StatusEvent event) {
          if (event != ApolloCall.StatusEvent.COMPLETED || emitter.isDisposed()) {
            return;
          }
          emitter.onComplete();
        }
      };
      call.enqueue(relay);
    }
  };
  return Observable.create(source);
}
/**
 * Builds an Observable that calls {@code supplier} on subscription, emits the value it
 * returns and then completes. Anything thrown while doing so is signalled through
 * {@code onError}. Every signal is skipped if the emitter is already disposed.
 *
 * <p>When the {@code sc} field is null the Observable is subscribed on
 * {@code Schedulers.io()}; otherwise {@code sc} is used for both subscribing and observing.
 *
 * @param supplier produces the single value to emit
 * @param <T> the value type
 * @return the scheduled Observable
 */
private <T> Observable<T> create(Supplier<T> supplier) {
  Observable<T> source = Observable.create(emitter -> {
    try {
      T value = supplier.get();
      if (!emitter.isDisposed()) {
        emitter.onNext(value);
      }
      // Disposal is re-checked because it may have happened while onNext was running.
      if (!emitter.isDisposed()) {
        emitter.onComplete();
      }
    } catch (Throwable failure) {
      if (!emitter.isDisposed()) {
        emitter.onError(failure);
      }
    }
  });

  if (sc != null) {
    return source.subscribeOn(sc).observeOn(sc);
  }
  return source.subscribeOn(Schedulers.io());
}
/**
 * Transforms a stream of events into a stream of models by running a {@link MobiusLoop}.
 *
 * <p>For each subscription a loop is started from {@code startModel} (and
 * {@code startEffects} when that field is non-null). Models observed from the loop are
 * emitted downstream, and upstream events are dispatched into the loop. An upstream error is
 * wrapped in an {@link UnrecoverableIncomingException} and signalled through {@code onError};
 * no upstream completion handler is registered. Cancelling the emitter disposes the loop and
 * then the upstream subscription.
 *
 * @param upstream the stream of events to feed into the loop
 * @return a source of the models produced by the loop
 */
@Override
public ObservableSource<M> apply(@NonNull Observable<E> upstream) {
  return Observable.<M>create(
      emitter -> {
        final MobiusLoop<M, E, ?> loop =
            startEffects == null
                ? loopFactory.startFrom(startModel)
                : loopFactory.startFrom(startModel, startEffects);

        // Models flow out of the loop to the downstream observer.
        loop.observe(model -> emitter.onNext(model));

        // Events flow from upstream into the loop.
        final Disposable eventsDisposable =
            upstream.subscribe(
                event -> loop.dispatchEvent(event),
                throwable -> emitter.onError(new UnrecoverableIncomingException(throwable)));

        emitter.setCancellable(
            () -> {
              loop.dispose();
              eventsDisposable.dispose();
            });
      });
}
/**
 * Adapts a {@link Connectable} to an {@link ObservableTransformer}.
 *
 * <p>For each subscription to the transformed stream, the connectable is connected with an
 * output consumer that forwards to the emitter's {@code onNext}. Upstream items are passed to
 * the resulting {@link Connection}, and upstream errors and completion are passed to the
 * emitter. Cancelling the emitter disposes the upstream subscription and then the connection.
 *
 * @param connectable the connectable to adapt
 * @param <I> the input type
 * @param <O> the output type
 * @return a transformer backed by {@code connectable}
 */
@NonNull
public static <I, O> ObservableTransformer<I, O> toTransformer(
    final Connectable<I, O> connectable) {
  return upstream ->
      Observable.<O>create(
          emitter -> {
            final Connection<I> input =
                connectable.connect(outputValue -> emitter.onNext(outputValue));

            final Disposable upstreamSubscription =
                upstream.subscribe(
                    inputValue -> input.accept(inputValue),
                    error -> emitter.onError(error),
                    () -> emitter.onComplete());

            emitter.setCancellable(
                () -> {
                  upstreamSubscription.dispose();
                  input.dispose();
                });
          });
}
/**
 * Creates an Observable of {@code DataItem}s backed by a new {@code ObservableDataProducer}
 * constructed from the {@code executor} field and the given query.
 *
 * @param dataQuery the query handed to the producer
 * @return an Observable created from that producer
 */
@Override
public Observable<DataItem> getDataFlow(DataQuery dataQuery) {
  final ObservableDataProducer producer = new ObservableDataProducer(executor, dataQuery);
  return Observable.create(producer);
}
/**
 * Wraps a single {@code executeCommand} invocation in an Observable: on subscription the
 * command is executed, its result is emitted, and the Observable completes.
 *
 * @param command the command to execute
 * @param request the request passed to the command
 * @return an Observable emitting the command's result
 */
private Observable<RedisToken> execute(RespCommand command, Request request) {
  return Observable.create(emitter -> {
    final RedisToken reply = executeCommand(command, request);
    emitter.onNext(reply);
    emitter.onComplete();
  });
}