下面列出了io.reactivex.rxjava3.core.BackpressureStrategy#io.reactivex.rxjava3.core.FlowableEmitter 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Bridges a query to a {@link FlowableEmitter}: an observer is registered on the
 * query's subscription, and every element of a delivered result is pushed to the
 * emitter one by one, after which the emitter is completed.
 *
 * <p>Emission stops as soon as the emitter reports cancellation; in that case no
 * further items and no completion signal are sent. The data subscription is
 * cancelled when the emitter is cancelled.
 *
 * @param query   the query whose results are forwarded
 * @param emitter the downstream emitter receiving the individual items
 */
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
    final DataSubscription subscription = query.subscribe().observer(results -> {
        for (T item : results) {
            // Downstream is gone: stop immediately, without completing.
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onNext(item);
        }
        if (emitter.isCancelled()) {
            return;
        }
        emitter.onComplete();
    });
    // Tie the lifetime of the data subscription to the emitter.
    emitter.setCancellable(() -> subscription.cancel());
}
/**
 * Wraps a supplier in a {@link Flowable} that emits the supplied value and then
 * completes, using {@link BackpressureStrategy#DROP}.
 *
 * <p>Anything thrown while obtaining or emitting the value is forwarded to the
 * emitter as an error, unless the emitter has already been cancelled.
 *
 * <p>If the {@code sc} field is {@code null} the flowable is subscribed on
 * {@code Schedulers.io()}; otherwise it is both subscribed and observed on
 * {@code sc}.
 *
 * @param supplier produces the single value to emit
 * @return the scheduled flowable
 */
private <T> Flowable<T> create(Supplier<T> supplier) {
    final FlowableOnSubscribe<T> source = emitter -> {
        try {
            final T value = supplier.get();
            // Cancellation is permanent, so bailing out early skips both signals.
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onNext(value);
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onComplete();
        } catch (Throwable failure) {
            if (!emitter.isCancelled()) {
                emitter.onError(failure);
            }
        }
    };
    final Flowable<T> flowable = Flowable.create(source, BackpressureStrategy.DROP);
    return sc == null
            ? flowable.subscribeOn(Schedulers.io())
            : flowable.subscribeOn(sc).observeOn(sc);
}
/**
 * Creates a {@code DataProducerTask} for the given emitter and the enclosing
 * {@code query}, and hands it to {@code executor} for execution.
 *
 * <p>NOTE(review): presumably the task feeds items into the emitter on the
 * executor's thread - confirm against {@code DataProducerTask}.
 *
 * @param emitter the emitter passed on to the producer task
 */
@Override
public void subscribe(FlowableEmitter<DataItem> emitter) throws Throwable {
    executor.execute(new DataProducerTask(emitter, query));
}
/**
 * Converts an {@link ApolloSubscriptionCall} into a {@link Flowable} of responses.
 *
 * <p>On subscription the call is registered for cancellation with the emitter and
 * then executed. Responses, failures and completion are forwarded to the emitter
 * as long as it has not been cancelled. A terminated connection is reported as an
 * {@link ApolloSubscriptionTerminatedException} through the failure path.
 *
 * @param call                 the subscription call to execute; must not be null
 * @param backpressureStrategy the strategy handed to {@code Flowable.create}; must not be null
 * @return a flowable emitting every response delivered by the call
 */
@NotNull
@CheckReturnValue
public static <T> Flowable<Response<T>> from(@NotNull final ApolloSubscriptionCall<T> call,
        @NotNull BackpressureStrategy backpressureStrategy) {
    checkNotNull(call, "originalCall == null");
    checkNotNull(backpressureStrategy, "backpressureStrategy == null");
    return Flowable.create(emitter -> {
        // Hook up cancellation before the call starts producing events.
        cancelOnFlowableDisposed(emitter, call);
        final ApolloSubscriptionCall.Callback<T> callback = new ApolloSubscriptionCall.Callback<T>() {
            @Override
            public void onResponse(@NotNull Response<T> response) {
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onNext(response);
            }

            @Override
            public void onFailure(@NotNull ApolloException e) {
                Exceptions.throwIfFatal(e);
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onError(e);
            }

            @Override
            public void onCompleted() {
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onComplete();
            }

            @Override
            public void onTerminated() {
                // A dropped connection is surfaced through the regular failure path.
                onFailure(new ApolloSubscriptionTerminatedException("Subscription server unexpectedly terminated "
                        + "connection"));
            }

            @Override
            public void onConnected() {
                // Nothing to forward for the connected event.
            }
        };
        call.execute(callback);
    }, backpressureStrategy);
}
/**
 * Registers the given cancelable with the emitter by wrapping it via
 * {@code getRx3Disposable} and installing the result as the emitter's disposable.
 *
 * @param emitter    the emitter that receives the disposable
 * @param cancelable the resource to wrap and attach to the emitter
 */
private static <T> void cancelOnFlowableDisposed(FlowableEmitter<T> emitter, final Cancelable cancelable) {
    final Disposable disposable = getRx3Disposable(cancelable);
    emitter.setDisposable(disposable);
}