下面列出了 io.reactivex.rxjava3.core.FlowableOnSubscribe 这个 API 类的用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
private <T> Flowable<T> create(Supplier<T> supplier) {
Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) throws Exception {
try {
T response = supplier.get();
if (!emitter.isCancelled()) {
emitter.onNext(response);
}
if (!emitter.isCancelled()) {
emitter.onComplete();
}
} catch (Throwable e) {
if (!emitter.isCancelled()) {
emitter.onError(e);
}
}
}
}, BackpressureStrategy.DROP);
if (sc == null) {
return flowable.subscribeOn(Schedulers.io());
}
return flowable.subscribeOn(sc).observeOn(sc);
}
@NotNull
@CheckReturnValue
public static <T> Flowable<Response<T>> from(@NotNull final ApolloSubscriptionCall<T> call,
@NotNull BackpressureStrategy backpressureStrategy) {
checkNotNull(call, "originalCall == null");
checkNotNull(backpressureStrategy, "backpressureStrategy == null");
return Flowable.create(new FlowableOnSubscribe<Response<T>>() {
@Override public void subscribe(final FlowableEmitter<Response<T>> emitter) throws Exception {
cancelOnFlowableDisposed(emitter, call);
call.execute(
new ApolloSubscriptionCall.Callback<T>() {
@Override public void onResponse(@NotNull Response<T> response) {
if (!emitter.isCancelled()) {
emitter.onNext(response);
}
}
@Override public void onFailure(@NotNull ApolloException e) {
Exceptions.throwIfFatal(e);
if (!emitter.isCancelled()) {
emitter.onError(e);
}
}
@Override public void onCompleted() {
if (!emitter.isCancelled()) {
emitter.onComplete();
}
}
@Override public void onTerminated() {
onFailure(new ApolloSubscriptionTerminatedException("Subscription server unexpectedly terminated "
+ "connection"));
}
@Override public void onConnected() {
}
}
);
}
}, backpressureStrategy);
}