io.reactivex.rxjava3.core.FlowableOnSubscribe 类的源码示例

下面列出了 io.reactivex.rxjava3.core.FlowableOnSubscribe 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: cxf   文件: FlowableRxInvokerImpl.java
/**
 * Builds a {@link Flowable} that evaluates the given supplier on subscription.
 *
 * <p>The supplier's value is emitted as a single item followed by completion; anything
 * thrown by the supplier is forwarded as an error signal. Each signal is skipped when
 * the emitter has already been cancelled. The source is created with
 * {@link BackpressureStrategy#DROP}.
 *
 * <p>When the {@code sc} field is {@code null} the flow is subscribed on
 * {@code Schedulers.io()}; otherwise it is both subscribed and observed on {@code sc}.
 *
 * @param supplier produces the value to emit
 * @param <T> type of the emitted value
 * @return the scheduled flowable
 */
private <T> Flowable<T> create(Supplier<T> supplier) {
    final FlowableOnSubscribe<T> source = emitter -> {
        try {
            final T result = supplier.get();
            if (!emitter.isCancelled()) {
                emitter.onNext(result);
            }

            // Re-check: the subscriber may have cancelled while handling onNext.
            if (!emitter.isCancelled()) {
                emitter.onComplete();
            }
        } catch (Throwable failure) {
            if (!emitter.isCancelled()) {
                emitter.onError(failure);
            }
        }
    };

    final Flowable<T> flowable = Flowable.create(source, BackpressureStrategy.DROP);

    return sc == null
        ? flowable.subscribeOn(Schedulers.io())
        : flowable.subscribeOn(sc).observeOn(sc);
}
 
源代码2 项目: apollo-android   文件: Rx3Apollo.java
/**
 * Adapts an {@link ApolloSubscriptionCall} to a {@link Flowable} of responses.
 *
 * <p>On subscription the call is executed with a callback that relays responses,
 * failures and completion to the emitter, skipping any signal once the emitter has
 * been cancelled. A termination notification is reported as an
 * {@link ApolloSubscriptionTerminatedException} through the failure path.
 *
 * @param call the subscription call to execute; must not be null
 * @param backpressureStrategy strategy passed to {@code Flowable.create}; must not be null
 * @param <T> the response data type
 * @return a flowable emitting the responses delivered by the call
 */
@NotNull
@CheckReturnValue
public static <T> Flowable<Response<T>> from(@NotNull final ApolloSubscriptionCall<T> call,
    @NotNull BackpressureStrategy backpressureStrategy) {
  checkNotNull(call, "originalCall == null");
  checkNotNull(backpressureStrategy, "backpressureStrategy == null");

  final FlowableOnSubscribe<Response<T>> source = new FlowableOnSubscribe<Response<T>>() {
    @Override public void subscribe(final FlowableEmitter<Response<T>> downstream) throws Exception {
      // NOTE(review): presumably ties cancellation of the call to disposal of the flowable;
      // the helper is defined outside this snippet - confirm.
      cancelOnFlowableDisposed(downstream, call);

      final ApolloSubscriptionCall.Callback<T> relay = new ApolloSubscriptionCall.Callback<T>() {
        @Override public void onResponse(@NotNull Response<T> response) {
          if (downstream.isCancelled()) {
            return;
          }
          downstream.onNext(response);
        }

        @Override public void onFailure(@NotNull ApolloException e) {
          // Fatal-error check runs before the cancellation check, regardless of emitter state.
          Exceptions.throwIfFatal(e);
          if (downstream.isCancelled()) {
            return;
          }
          downstream.onError(e);
        }

        @Override public void onCompleted() {
          if (downstream.isCancelled()) {
            return;
          }
          downstream.onComplete();
        }

        @Override public void onTerminated() {
          // Termination is surfaced through the regular failure path.
          onFailure(new ApolloSubscriptionTerminatedException("Subscription server unexpectedly terminated "
              + "connection"));
        }

        @Override public void onConnected() {
          // Nothing to relay on connect.
        }
      };

      call.execute(relay);
    }
  };

  return Flowable.create(source, backpressureStrategy);
}
 
 类所在包
 类方法
 同包方法