下面列出了 io.reactivex.rxjava3.core.BackpressureStrategy 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Emits the cached record for {@code key} followed by records built from {@code source}.
 *
 * <p>Every item coming from {@code source} is first written to {@code rxCache} under
 * {@code key} and then wrapped in a {@link Record} tagged {@code Source.CLOUD}. The cache
 * stream and the remote stream are concatenated in that order with
 * {@code concatDelayError}, so a failure of the first stream does not prevent the second
 * one from running. Records whose data is {@code null} are dropped from the result.
 *
 * @param rxCache              cache that is read through {@code load2Flowable} and written through {@code save}
 * @param key                  cache key used for both the lookup and the save
 * @param source               upstream that supplies fresh values
 * @param type                 type token handed to {@code load2Flowable}
 * @param backpressureStrategy strategy handed to {@code load2Flowable}
 * @return the concatenated, null-filtered stream of records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache,
                                        String key,
                                        Flowable<T> source,
                                        Type type,
                                        BackpressureStrategy backpressureStrategy) {
    Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    // Saving happens inside map(), so the cache is refreshed as each upstream item passes through.
    Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return Flowable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
/**
 * Prefers records built from {@code source} and falls back to the cache only when the
 * source completes without emitting anything.
 *
 * <p>Every item coming from {@code source} is written to {@code rxCache} under {@code key}
 * and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * <p>NOTE(review): the fallback is {@code switchIfEmpty}, so an error from {@code source}
 * is propagated rather than replaced by the cached value; confirm that is intended.
 *
 * @param rxCache              cache that is read through {@code load2Flowable} and written through {@code save}
 * @param key                  cache key used for both the lookup and the save
 * @param source               upstream that supplies fresh values
 * @param type                 type token handed to {@code load2Flowable}
 * @param backpressureStrategy strategy handed to {@code load2Flowable}
 * @return the remote records, or the cached ones when the remote stream is empty
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache,
                                        String key,
                                        Flowable<T> source,
                                        Type type,
                                        BackpressureStrategy backpressureStrategy) {
    Flowable<Record<T>> fallback = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return fetched.switchIfEmpty(fallback);
}
/**
 * Prefers the cached record and consults {@code source} only when the cache stream
 * completes without emitting anything.
 *
 * <p>When the source is used, every item it produces is written to {@code rxCache} under
 * {@code key} and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache              cache that is read through {@code load2Flowable} and written through {@code save}
 * @param key                  cache key used for both the lookup and the save
 * @param source               upstream that supplies fresh values
 * @param type                 type token handed to {@code load2Flowable}
 * @param backpressureStrategy strategy handed to {@code load2Flowable}
 * @return the cached records, or the remote ones when the cache stream is empty
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache,
                                        String key,
                                        Flowable<T> source,
                                        Type type,
                                        BackpressureStrategy backpressureStrategy) {
    Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    Flowable<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return cached.switchIfEmpty(fallback);
}
/**
 * Uses {@code source} only: the cache is never read, but it is written to.
 *
 * <p>Every item coming from {@code source} is saved to {@code rxCache} under {@code key}
 * and then wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache              cache that receives each upstream item through {@code save}
 * @param key                  cache key used for the save and stored in the record
 * @param source               upstream that supplies the values
 * @param type                 not used by this implementation
 * @param backpressureStrategy not used by this implementation
 * @return the stream of records built from {@code source}
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache,
                                        String key,
                                        Flowable<T> source,
                                        Type type,
                                        BackpressureStrategy backpressureStrategy) {
    return source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });
}
/**
 * Prefers a sufficiently recent cached record and falls back to {@code source} otherwise.
 *
 * <p>A cached record is kept only when the time elapsed since its
 * {@code getCreateTime()} is at most the {@code timestamp} field of the enclosing object.
 * If no cached record survives that check, {@code source} is subscribed instead; each
 * item it produces is saved to {@code rxCache} under {@code key} and wrapped in a
 * {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache              cache that is read through {@code load2Flowable} and written through {@code save}
 * @param key                  cache key used for both the lookup and the save
 * @param source               upstream that supplies fresh values
 * @param type                 type token handed to {@code load2Flowable}
 * @param backpressureStrategy strategy handed to {@code load2Flowable}
 * @return fresh-enough cached records, or the remote ones when none qualify
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache,
                                        String key,
                                        Flowable<T> source,
                                        Type type,
                                        BackpressureStrategy backpressureStrategy) {
    Flowable<Record<T>> loaded = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    // The clock is read per record at evaluation time, not once at assembly time.
    Flowable<Record<T>> stillFresh = loaded.filter(
            record -> System.currentTimeMillis() - record.getCreateTime() <= timestamp);

    Flowable<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return stillFresh.switchIfEmpty(fallback);
}
/**
 * Creates a {@link Flowable} fed by this object's {@code internalHandler}.
 *
 * <p>On each subscription the {@code internalHandler} field is replaced with a handler
 * that forwards its argument to the new emitter's {@code onNext}, and {@code close()} is
 * registered to run when that subscription is cancelled.
 *
 * @param backpressureStrategy strategy passed to {@code Flowable.create}
 * @return the created Flowable
 */
@Override
public Flowable<T> asFlowable(final BackpressureStrategy backpressureStrategy) {
    return Flowable.create(sink -> {
        // Overwrites any handler installed by a previous subscription.
        internalHandler = item -> sink.onNext(item);
        sink.setCancellable(() -> close());
    }, backpressureStrategy);
}
/**
 * Uses {@code source} only and leaves the cache untouched.
 *
 * <p>Each upstream item is wrapped in a {@link Record} tagged {@code Source.CLOUD};
 * nothing is read from or written to {@code rxCache}.
 *
 * @param rxCache              not used by this implementation
 * @param key                  key stored in each produced record
 * @param source               upstream that supplies the values
 * @param type                 not used by this implementation
 * @param backpressureStrategy not used by this implementation
 * @return the stream of records built from {@code source}
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache,
                                        String key,
                                        Flowable<T> source,
                                        Type type,
                                        BackpressureStrategy backpressureStrategy) {
    return source.map(item -> new Record<>(Source.CLOUD, key, item));
}
/**
 * Builds a {@link Flowable} that emits {@code count} strings from a task submitted to
 * {@code pool} and then completes.
 *
 * <p>{@code RequestContext.current()} is invoked at assembly time and its result is
 * discarded.
 *
 * <p>NOTE(review): every emitted element is {@code String.valueOf(count)}, not the loop
 * position; confirm that is intended.
 *
 * @param count number of elements to emit; zero or a negative value emits none
 * @return a Flowable created with {@code BackpressureStrategy.BUFFER}
 */
static Flowable<String> flowable(int count) {
    RequestContext.current();
    return Flowable.create(emitter -> {
        pool.submit(() -> {
            int remaining = count;
            while (remaining > 0) {
                emitter.onNext(String.valueOf(count));
                remaining--;
            }
            emitter.onComplete();
        });
    }, BackpressureStrategy.BUFFER);
}
/**
 * Streams two {@code HelloWorldBean} values ("Hello", then "Ciao") from a freshly
 * started thread, calling {@code sleep()} after each value, and then completes.
 *
 * @return a Flowable created with {@code BackpressureStrategy.MISSING}
 */
@GET
@Produces("application/json")
@Path("textJsonImplicitList")
public Flowable<HelloWorldBean> getJsonImplicitList() {
    return Flowable.create(emitter -> {
        final Runnable producer = () -> {
            emitter.onNext(new HelloWorldBean("Hello"));
            sleep();
            emitter.onNext(new HelloWorldBean("Ciao"));
            sleep();
            emitter.onComplete();
        };
        // Emission happens off the subscribing thread.
        new Thread(producer).start();
    }, BackpressureStrategy.MISSING);
}
/**
 * Wraps a single {@code supplier.get()} call in a {@link Flowable}.
 *
 * <p>On subscription the supplier is invoked once; its result is emitted and the stream
 * completes, unless the emitter has been cancelled in the meantime. Any {@link Throwable}
 * raised in the process is delivered through {@code onError}, again only when the emitter
 * is not cancelled.
 *
 * <p>Scheduling depends on the {@code sc} field: when it is {@code null} the Flowable
 * subscribes on {@code Schedulers.io()}; otherwise it both subscribes and observes on
 * {@code sc}.
 *
 * @param supplier produces the single value to emit
 * @return the scheduled Flowable, created with {@code BackpressureStrategy.DROP}
 */
private <T> Flowable<T> create(Supplier<T> supplier) {
    Flowable<T> deferred = Flowable.create(emitter -> {
        try {
            T result = supplier.get();
            if (!emitter.isCancelled()) {
                emitter.onNext(result);
            }
            if (!emitter.isCancelled()) {
                emitter.onComplete();
            }
        } catch (Throwable failure) {
            if (!emitter.isCancelled()) {
                emitter.onError(failure);
            }
        }
    }, BackpressureStrategy.DROP);

    return sc == null
            ? deferred.subscribeOn(Schedulers.io())
            : deferred.subscribeOn(sc).observeOn(sc);
}
/**
 * Convenience overload of {@code asFlowable(BackpressureStrategy)} that uses
 * {@code BackpressureStrategy.BUFFER}.
 *
 * @return the result of {@code asFlowable(BackpressureStrategy.BUFFER)}
 */
default Flowable<T> asFlowable() {
    final BackpressureStrategy defaultStrategy = BackpressureStrategy.BUFFER;
    return asFlowable(defaultStrategy);
}
/**
 * Creates a {@link Flowable} of {@code DataItem} driven by a new
 * {@code FlowableDataProducer} built from the {@code executor} field and the given query.
 *
 * @param dataQuery query handed to the producer
 * @return a Flowable created with {@code BackpressureStrategy.BUFFER}
 */
@Override
public Flowable<DataItem> getDataFlowWithBackPressure(DataQuery dataQuery) {
    final FlowableDataProducer producer = new FlowableDataProducer(executor, dataQuery);
    return Flowable.create(producer, BackpressureStrategy.BUFFER);
}
/**
 * Uses the cache only: returns whatever {@code rxCache.load2Flowable} yields for
 * {@code key} and never subscribes to {@code source}.
 *
 * @param rxCache              cache that is read through {@code load2Flowable}
 * @param key                  cache key to look up
 * @param source               not used by this implementation
 * @param type                 type token handed to {@code load2Flowable}
 * @param backpressureStrategy strategy handed to {@code load2Flowable}
 * @return the cache-backed stream of records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache,
                                        String key,
                                        Flowable<T> source,
                                        Type type,
                                        BackpressureStrategy backpressureStrategy) {
    Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);
    return cached;
}
/**
 * Convenience overload of {@code from(call, backpressureStrategy)} that uses
 * {@code BackpressureStrategy.LATEST}.
 *
 * @param call the subscription call to adapt
 * @return the result of {@code from(call, BackpressureStrategy.LATEST)}
 */
@NotNull
@CheckReturnValue
public static <T> Flowable<Response<T>> from(@NotNull ApolloSubscriptionCall<T> call) {
    final BackpressureStrategy defaultStrategy = BackpressureStrategy.LATEST;
    return from(call, defaultStrategy);
}
/**
 * Adapts an {@code ApolloSubscriptionCall} to a {@link Flowable} of responses.
 *
 * <p>On subscription, {@code cancelOnFlowableDisposed(emitter, call)} is invoked and the
 * call is executed with a callback that forwards events to the emitter. Every forward is
 * skipped when the emitter is already cancelled:
 * <ul>
 *   <li>{@code onResponse} forwards to {@code onNext};</li>
 *   <li>{@code onFailure} first calls {@code Exceptions.throwIfFatal} and then forwards to
 *       {@code onError};</li>
 *   <li>{@code onCompleted} forwards to {@code onComplete};</li>
 *   <li>{@code onTerminated} is reported as a failure carrying an
 *       {@code ApolloSubscriptionTerminatedException};</li>
 *   <li>{@code onConnected} is ignored.</li>
 * </ul>
 *
 * @param call                 the subscription call to adapt; checked with {@code checkNotNull}
 * @param backpressureStrategy strategy passed to {@code Flowable.create}; checked with {@code checkNotNull}
 * @return a Flowable that executes {@code call} when subscribed
 */
@NotNull
@CheckReturnValue
public static <T> Flowable<Response<T>> from(@NotNull final ApolloSubscriptionCall<T> call,
        @NotNull BackpressureStrategy backpressureStrategy) {
    checkNotNull(call, "originalCall == null");
    checkNotNull(backpressureStrategy, "backpressureStrategy == null");

    return Flowable.create(emitter -> {
        cancelOnFlowableDisposed(emitter, call);
        call.execute(new ApolloSubscriptionCall.Callback<T>() {
            @Override
            public void onResponse(@NotNull Response<T> response) {
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onNext(response);
            }

            @Override
            public void onFailure(@NotNull ApolloException e) {
                Exceptions.throwIfFatal(e);
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onError(e);
            }

            @Override
            public void onCompleted() {
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onComplete();
            }

            @Override
            public void onTerminated() {
                // Termination is routed through the same path as any other failure.
                onFailure(new ApolloSubscriptionTerminatedException("Subscription server unexpectedly terminated "
                        + "connection"));
            }

            @Override
            public void onConnected() {
                // Nothing to forward on connect.
            }
        });
    }, backpressureStrategy);
}
/**
 * Limits {@code upstream} with {@code takeUntil}, using the {@code observable} field
 * converted to a Flowable with {@code BackpressureStrategy.LATEST} as the stop signal.
 *
 * @param upstream the Flowable to limit
 * @return {@code upstream} bounded by the converted {@code observable}
 */
@Override
public Publisher<T> apply(Flowable<T> upstream) {
    final Flowable<?> stopSignal = observable.toFlowable(BackpressureStrategy.LATEST);
    return upstream.takeUntil(stopSignal);
}
/**
 * Returns a Flowable that emits the results of the given Query one at a time and calls
 * onComplete after the last result has been processed.
 * Delegates to {@code flowableOneByOne(query, strategy)} with BackpressureStrategy.BUFFER.
 *
 * @param query the query whose results are emitted
 * @return the Flowable produced by the two-argument overload
 */
public static <T> Flowable<T> flowableOneByOne(final Query<T> query) {
    final BackpressureStrategy defaultStrategy = BackpressureStrategy.BUFFER;
    return flowableOneByOne(query, defaultStrategy);
}
/**
 * Returns a Flowable that emits the results of the given Query one at a time and calls
 * onComplete after the last result has been processed.
 * The emission itself is delegated to {@code createListItemEmitter(query, emitter)},
 * which is invoked on each subscription.
 *
 * @param query    the query whose results are emitted
 * @param strategy the BackpressureStrategy passed to {@code Flowable.create}
 * @return the created Flowable
 */
public static <T> Flowable<T> flowableOneByOne(final Query<T> query, BackpressureStrategy strategy) {
    return Flowable.create(emitter -> {
        createListItemEmitter(query, emitter);
    }, strategy);
}
/**
 * Returns a {@link Flowable} of {@code T} for this object.
 *
 * <p>NOTE(review): what is emitted and when is defined by the implementation; confirm
 * against the implementing class.
 *
 * @param backpressureStrategy backpressure strategy for the returned Flowable
 *                             (presumably forwarded to {@code Flowable.create}; verify per implementation)
 * @return a Flowable of {@code T}
 */
Flowable<T> asFlowable(BackpressureStrategy backpressureStrategy);
/**
 * Produces a stream of {@link Record}s for {@code key} from the given cache and/or source.
 *
 * <p>NOTE(review): how (and whether) {@code rxCache} and {@code source} are consulted,
 * and in which order, is decided by each implementation; this declaration fixes only
 * the signature.
 *
 * @param rxCache              the cache available to the implementation
 * @param key                  the cache key the records relate to
 * @param source               the upstream Flowable available to the implementation
 * @param type                 type information for the cached value
 *                             (presumably used when loading from the cache; verify per implementation)
 * @param backpressureStrategy backpressure strategy available to the implementation
 * @return a Publisher of records
 */
<T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy);