类io.reactivex.rxjava3.core.ObservableSource源码实例Demo

下面列出了怎么用io.reactivex.rxjava3.core.ObservableSource的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: mobius   文件: Transformers.java
/**
 * Creates an {@link ObservableTransformer} that will flatten the provided {@link Action} into the
 * stream as a {@link Completable} every time it receives an effect from the upstream effects
 * observable. This Completable will be subscribed on the specified {@link Scheduler}. This will
 * result in calling the provided Action on the specified scheduler every time an effect is
 * dispatched to the created effect transformer.
 *
 * @param doEffect the {@link Action} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} that the action should be run on
 * @param <F> the type of Effect this transformer handles
 * @param <E> these transformers are for effects that do not result in any events; however, they
 *     still need to share the same Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream
          .flatMapCompletable(
              new Function<F, CompletableSource>() {
                @Override
                public CompletableSource apply(F f) throws Exception {
                  return scheduler == null
                      ? Completable.fromAction(doEffect)
                      : Completable.fromAction(doEffect).subscribeOn(scheduler);
                }
              })
          .toObservable();
    }
  };
}
 
源代码2 项目: mobius   文件: Transformers.java
/**
 * Creates an {@link ObservableTransformer} that will flatten the provided {@link Consumer} into
 * the stream as a {@link Completable} every time it receives an effect from the upstream effects
 * observable. This will result in calling the consumer on the specified scheduler, and passing it
 * the requested effect object.
 *
 * @param doEffect the {@link Consumer} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} to be used when invoking the consumer
 * @param <F> the type of Effect this transformer handles
 * @param <E> these transformers are for effects that do not result in any events; however, they
 *     still need to share the same Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream
          .flatMapCompletable(
              new Function<F, CompletableSource>() {
                @Override
                public CompletableSource apply(final F effect) {
                  Completable completable =
                      Completable.fromAction(
                          new Action() {
                            @Override
                            public void run() throws Throwable {
                              doEffect.accept(effect);
                            }
                          });
                  return scheduler == null ? completable : completable.subscribeOn(scheduler);
                }
              })
          .toObservable();
    }
  };
}
 
源代码3 项目: mobius   文件: Transformers.java
/**
 * Creates an {@link ObservableTransformer} that will flatten the provided {@link Function} into
 * the stream as an {@link Observable} every time it receives an effect from the upstream effects
 * observable. This will result in calling the function on the specified scheduler, and passing it
 * the requested effect object then emitting its returned value.
 *
 * @param function the {@link Function} to be invoked every time the effect is requested
 * @param scheduler the {@link Scheduler} to be used when invoking the function
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream.flatMap(
          new Function<F, ObservableSource<E>>() {
            @Override
            public ObservableSource<E> apply(@NonNull F f) {
              Observable<E> eventObservable =
                  Observable.fromSupplier(
                      new Supplier<E>() {
                        @Override
                        public E get() throws Throwable {
                          return function.apply(f);
                        }
                      });
              return scheduler == null ? eventObservable : eventObservable.subscribeOn(scheduler);
            }
          });
    }
  };
}
 
源代码4 项目: mobius   文件: RxEventSources.java
/**
 * Create an event source from the given RxJava streams.
 *
 * <p>All streams must be mapped to your event type.
 *
 * @param sources the observables you want to include in this event source
 * @param <E> the event type
 * @return an EventSource based on the provided observables
 */
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
  final Observable<E> eventSource = Observable.mergeArray(sources);
  return new EventSource<E>() {

    @Nonnull
    @Override
    public com.spotify.mobius.disposables.Disposable subscribe(
        com.spotify.mobius.functions.Consumer<E> eventConsumer) {
      final Disposable disposable =
          eventSource.subscribe(
              new Consumer<E>() {
                @Override
                public void accept(E value) throws Throwable {
                  eventConsumer.accept(value);
                }
              },
              new Consumer<Throwable>() {
                @Override
                public void accept(Throwable error) throws Throwable {
                  RxJavaPlugins.onError(error);
                }
              });
      return new com.spotify.mobius.disposables.Disposable() {
        @Override
        public void dispose() {
          disposable.dispose();
        }
      };
    }
  };
}
 
源代码5 项目: mobius   文件: RxMobiusLoop.java
@Override
public ObservableSource<M> apply(@NonNull Observable<E> upstream) {
  return Observable.create(
      new ObservableOnSubscribe<M>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<M> emitter) throws Throwable {
          final MobiusLoop<M, E, ?> loop;

          if (startEffects == null) {
            loop = loopFactory.startFrom(startModel);
          } else {
            loop = loopFactory.startFrom(startModel, startEffects);
          }

          loop.observe(
              new com.spotify.mobius.functions.Consumer<M>() {
                @Override
                public void accept(M value) {
                  emitter.onNext(value);
                }
              });
          final Disposable eventsDisposable =
              upstream.subscribe(
                  new Consumer<E>() {
                    @Override
                    public void accept(E event) throws Throwable {
                      loop.dispatchEvent(event);
                    }
                  },
                  new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Throwable {
                      emitter.onError(new UnrecoverableIncomingException(throwable));
                    }
                  });
          emitter.setCancellable(
              new Cancellable() {
                @Override
                public void cancel() throws Throwable {
                  loop.dispose();
                  eventsDisposable.dispose();
                }
              });
        }
      });
}
 
源代码6 项目: mobius   文件: RxConnectables.java
@NonNull
public static <I, O> ObservableTransformer<I, O> toTransformer(
    final Connectable<I, O> connectable) {
  return new ObservableTransformer<I, O>() {
    @Override
    public @NonNull ObservableSource<O> apply(@NonNull Observable<I> upstream) {
      return Observable.create(
          new ObservableOnSubscribe<O>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<O> emitter) throws Throwable {
              com.spotify.mobius.functions.Consumer<O> output = emitter::onNext;
              final Connection<I> input = connectable.connect(output);
              final Disposable disposable =
                  upstream.subscribe(
                      new Consumer<I>() {
                        @Override
                        public void accept(I value) throws Throwable {
                          input.accept(value);
                        }
                      },
                      new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable error) throws Throwable {
                          emitter.onError(error);
                        }
                      },
                      new Action() {
                        @Override
                        public void run() throws Throwable {
                          emitter.onComplete();
                        }
                      });

              emitter.setCancellable(
                  new Cancellable() {
                    @Override
                    public void cancel() throws Throwable {
                      disposable.dispose();
                      input.dispose();
                    }
                  });
            }
          });
    }
  };
}
 
RequestContextSupplierObservable(ObservableSource<T> source, RequestContext assemblyContext) {
    this.source = source;
    this.assemblyContext = assemblyContext;
}
 
源代码8 项目: armeria   文件: RequestContextObservable.java
RequestContextObservable(ObservableSource<T> source, RequestContext assemblyContext) {
    this.source = source;
    this.assemblyContext = assemblyContext;
}
 
RequestContextScalarSupplierObservable(ObservableSource<T> source, RequestContext assemblyContext) {
    this.source = source;
    this.assemblyContext = assemblyContext;
}
 
源代码10 项目: RxLifecycle   文件: LifecycleTransformer.java
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    return upstream.takeUntil(observable);
}
 
 类所在包
 同包方法