类io.reactivex.rxjava3.core.BackpressureStrategy源码实例Demo

下面列出了 io.reactivex.rxjava3.core.BackpressureStrategy 的 API 类实例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: RxCache   文件: CacheAndRemoteStrategy.java
/**
 * Emits the cached record for {@code key} followed by the records produced from {@code source}.
 *
 * <p>Each item coming from {@code source} is first stored through {@code rxCache.save(key, item)}
 * and then wrapped in a {@link Record} tagged {@code Source.CLOUD}. The cache stream and the
 * remote stream are concatenated (cache first) with {@code concatDelayError}, and any record
 * whose {@code getData()} is {@code null} is dropped from the result.
 *
 * @param rxCache              cache used both to load the stored record and to save fresh items
 * @param key                  cache key to load from and save under
 * @param source               upstream producing the fresh items
 * @param type                 type token forwarded to {@code load2Flowable}
 * @param backpressureStrategy strategy forwarded to {@code load2Flowable}
 * @return the concatenated, null-data-filtered stream of records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {

    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    // Persist every fresh item before handing it downstream as a CLOUD record.
    final Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<T>(Source.CLOUD, key, item);
    });

    return Flowable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(entry -> entry.getData() != null);
}
 
源代码2 项目: RxCache   文件: RemoteFirstStrategy.java
/**
 * Prefers records produced from {@code source}; switches to the cached record only when the
 * remote stream turns out to be empty.
 *
 * <p>Each item from {@code source} is saved through {@code rxCache.save(key, item)} and wrapped
 * in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache              cache used to load the stored record and to save fresh items
 * @param key                  cache key to load from and save under
 * @param source               upstream producing the fresh items
 * @param type                 type token forwarded to {@code load2Flowable}
 * @param backpressureStrategy strategy forwarded to {@code load2Flowable}
 * @return the remote stream with the cache stream as its {@code switchIfEmpty} alternative
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {

    final Flowable<Record<T>> fallback = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    final Flowable<Record<T>> fetched = source.map(item -> {
        // Store first, then expose the item as a CLOUD record.
        rxCache.save(key, item);
        return new Record<T>(Source.CLOUD, key, item);
    });

    return fetched.switchIfEmpty(fallback);
}
 
源代码3 项目: RxCache   文件: CacheFirstStrategy.java
/**
 * Prefers the cached record for {@code key}; switches to the remote stream only when the cache
 * stream turns out to be empty.
 *
 * <p>When the remote stream is used, each item from {@code source} is saved through
 * {@code rxCache.save(key, item)} and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache              cache used to load the stored record and to save fresh items
 * @param key                  cache key to load from and save under
 * @param source               upstream producing the fresh items
 * @param type                 type token forwarded to {@code load2Flowable}
 * @param backpressureStrategy strategy forwarded to {@code load2Flowable}
 * @return the cache stream with the remote stream as its {@code switchIfEmpty} alternative
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {

    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    final Flowable<Record<T>> fallback = source.map(item -> {
        // Store first, then expose the item as a CLOUD record.
        rxCache.save(key, item);
        return new Record<T>(Source.CLOUD, key, item);
    });

    return cached.switchIfEmpty(fallback);
}
 
源代码4 项目: RxCache   文件: RemoteOnlyStrategy.java
/**
 * Uses only {@code source}: every item is saved through {@code rxCache.save(key, item)} and
 * emitted as a {@link Record} tagged {@code Source.CLOUD}. The cache is never read here, so
 * {@code type} and {@code backpressureStrategy} are unused by this implementation.
 *
 * @param rxCache              cache that receives each fresh item
 * @param key                  cache key the items are saved under
 * @param source               upstream producing the fresh items
 * @param type                 unused in this method
 * @param backpressureStrategy unused in this method
 * @return the mapped remote stream
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {

    final Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<T>(Source.CLOUD, key, item);
    });

    return fetched;
}
 
源代码5 项目: RxCache   文件: CacheFirstTimeoutStrategy.java
/**
 * Prefers the cached record for {@code key} as long as it is not older than {@code timestamp}
 * milliseconds; otherwise switches to the remote stream.
 *
 * <p>A cached record passes when
 * {@code System.currentTimeMillis() - record.getCreateTime() <= timestamp}. When nothing passes,
 * the remote stream is used: each item from {@code source} is saved through
 * {@code rxCache.save(key, item)} and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache              cache used to load the stored record and to save fresh items
 * @param key                  cache key to load from and save under
 * @param source               upstream producing the fresh items
 * @param type                 type token forwarded to {@code load2Flowable}
 * @param backpressureStrategy strategy forwarded to {@code load2Flowable}
 * @return the age-filtered cache stream with the remote stream as its {@code switchIfEmpty} alternative
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {

    // Keep only cached records whose age is within the configured limit.
    final Flowable<Record<T>> freshCached = rxCache.<T>load2Flowable(key, type, backpressureStrategy)
            .filter(entry -> System.currentTimeMillis() - entry.getCreateTime() <= timestamp);

    final Flowable<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<T>(Source.CLOUD, key, item);
    });

    return freshCached.switchIfEmpty(fallback);
}
 
源代码6 项目: catnip   文件: DefaultDispatchManager.java
/**
 * Exposes this consumer as a {@link Flowable}.
 *
 * <p>On subscription the {@code internalHandler} field is pointed at the emitter's
 * {@code onNext}, and {@code close} is registered as the emitter's cancellable.
 *
 * @param backpressureStrategy strategy handed to {@code Flowable.create}
 * @return a flowable backed by this consumer
 */
@Override
public Flowable<T> asFlowable(final BackpressureStrategy backpressureStrategy) {
    final Flowable<T> stream = Flowable.create(sink -> {
        // Order matters: install the handler before registering the cancel hook.
        internalHandler = sink::onNext;
        sink.setCancellable(this::close);
    }, backpressureStrategy);
    return stream;
}
 
源代码7 项目: RxCache   文件: NoCacheStrategy.java
/**
 * Bypasses the cache entirely: every item from {@code source} is wrapped in a {@link Record}
 * tagged {@code Source.CLOUD} without being saved. {@code rxCache}, {@code type} and
 * {@code backpressureStrategy} are unused by this implementation.
 *
 * @param rxCache              unused in this method
 * @param key                  key stored in each produced record
 * @param source               upstream producing the items
 * @param type                 unused in this method
 * @param backpressureStrategy unused in this method
 * @return the mapped remote stream
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {

    final Flowable<Record<T>> wrapped = source.map(item -> new Record<T>(Source.CLOUD, key, item));
    return wrapped;
}
 
源代码8 项目: armeria   文件: RequestContextAssemblyTest.java
/**
 * Builds a buffered {@link Flowable} that, once subscribed, submits a task to {@code pool}
 * which emits {@code count} strings and then completes.
 *
 * <p>NOTE(review): every emitted value is {@code String.valueOf(count)}, not the loop index;
 * confirm this is intended before relying on the emitted values.
 *
 * @param count number of items to emit (and the value of each emitted item)
 * @return the created flowable
 */
static Flowable<String> flowable(int count) {
    // Called for its effect only; the returned context is discarded.
    RequestContext.current();
    return Flowable.create(sink -> {
        pool.submit(() -> {
            int remaining = count;
            while (remaining-- > 0) {
                sink.onNext(String.valueOf(count));
            }
            sink.onComplete();
        });
    }, BackpressureStrategy.BUFFER);
}
 
源代码9 项目: cxf   文件: RxJava3FlowableService.java
/**
 * Returns a {@link Flowable} that, on subscription, starts a new thread which emits a
 * "Hello" bean, calls {@code sleep()}, emits a "Ciao" bean, calls {@code sleep()} again and
 * then completes. No backpressure handling is applied ({@code BackpressureStrategy.MISSING}).
 *
 * @return the created flowable
 */
@GET
@Produces("application/json")
@Path("textJsonImplicitList")
public Flowable<HelloWorldBean> getJsonImplicitList() {
    return Flowable.create(emitter -> {
        final Runnable producer = () -> {
            emitter.onNext(new HelloWorldBean("Hello"));
            sleep();
            emitter.onNext(new HelloWorldBean("Ciao"));
            sleep();
            emitter.onComplete();
        };
        // Emit from a dedicated thread so subscribe() itself returns immediately.
        new Thread(producer).start();
    }, BackpressureStrategy.MISSING);
}
 
源代码10 项目: cxf   文件: FlowableRxInvokerImpl.java
/**
 * Wraps a blocking {@code supplier} call in a single-item {@link Flowable}.
 *
 * <p>On subscription the supplier is invoked; its result is emitted followed by completion,
 * and anything it throws is routed to {@code onError}. Each signal is skipped when the
 * emitter has already been cancelled. When the {@code sc} field is {@code null} the flowable
 * is subscribed on {@code Schedulers.io()}; otherwise {@code sc} is used for both
 * {@code subscribeOn} and {@code observeOn}.
 *
 * @param supplier produces the single value to emit
 * @return the scheduled flowable
 */
private <T> Flowable<T> create(Supplier<T> supplier) {
    final Flowable<T> pending = Flowable.<T>create(emitter -> {
        try {
            final T response = supplier.get();
            if (!emitter.isCancelled()) {
                emitter.onNext(response);
            }

            // Re-check: the subscriber may have cancelled while handling onNext.
            if (!emitter.isCancelled()) {
                emitter.onComplete();
            }
        } catch (Throwable failure) {
            if (!emitter.isCancelled()) {
                emitter.onError(failure);
            }
        }
    }, BackpressureStrategy.DROP);

    return sc == null
            ? pending.subscribeOn(Schedulers.io())
            : pending.subscribeOn(sc).observeOn(sc);
}
 
源代码11 项目: catnip   文件: MessageConsumer.java
/**
 * Convenience overload of {@code asFlowable(BackpressureStrategy)} that uses
 * {@code BackpressureStrategy.BUFFER}.
 *
 * @return this consumer as a buffered flowable
 */
default Flowable<T> asFlowable() {
    final BackpressureStrategy defaultStrategy = BackpressureStrategy.BUFFER;
    return asFlowable(defaultStrategy);
}
 
源代码12 项目: java-11-examples   文件: DataServiceImpl.java
/**
 * Creates a buffered {@link Flowable} driven by a new {@code FlowableDataProducer} built from
 * the {@code executor} field and the given query.
 *
 * @param dataQuery query handed to the producer
 * @return the created flowable
 */
@Override
public Flowable<DataItem> getDataFlowWithBackPressure(DataQuery dataQuery) {
    final FlowableDataProducer producer = new FlowableDataProducer(executor, dataQuery);
    return Flowable.create(producer, BackpressureStrategy.BUFFER);
}
 
源代码13 项目: RxCache   文件: CacheOnlyStrategy.java
/**
 * Uses only the cache: returns whatever {@code rxCache.load2Flowable} yields for {@code key}.
 * {@code source} is never subscribed by this implementation.
 *
 * @param rxCache              cache to load from
 * @param key                  cache key to load
 * @param source               unused in this method
 * @param type                 type token forwarded to {@code load2Flowable}
 * @param backpressureStrategy strategy forwarded to {@code load2Flowable}
 * @return the cache stream
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {

    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);
    return cached;
}
 
源代码14 项目: apollo-android   文件: Rx3Apollo.java
/**
 * Converts an {@code ApolloSubscriptionCall} to a {@link Flowable} using
 * {@code BackpressureStrategy.LATEST}.
 *
 * @param call the subscription call to convert
 * @return the flowable produced by the two-argument {@code from} overload
 */
@NotNull
@CheckReturnValue
public static <T> Flowable<Response<T>> from(@NotNull ApolloSubscriptionCall<T> call) {
  final BackpressureStrategy defaultStrategy = BackpressureStrategy.LATEST;
  return from(call, defaultStrategy);
}
 
源代码15 项目: apollo-android   文件: Rx3Apollo.java
/**
 * Converts an {@code ApolloSubscriptionCall} to a {@link Flowable} with the given backpressure
 * strategy.
 *
 * <p>On subscription the call is tied to the emitter through {@code cancelOnFlowableDisposed}
 * and then executed with a callback that forwards responses, failures and completion to the
 * emitter, skipping any signal once the emitter is cancelled. A server-side termination is
 * reported as an {@code ApolloSubscriptionTerminatedException} through the failure path.
 *
 * @param call                 the subscription call to convert; must not be {@code null}
 * @param backpressureStrategy strategy handed to {@code Flowable.create}; must not be {@code null}
 * @return the created flowable
 */
@NotNull
@CheckReturnValue
public static <T> Flowable<Response<T>> from(@NotNull final ApolloSubscriptionCall<T> call,
    @NotNull BackpressureStrategy backpressureStrategy) {
  checkNotNull(call, "originalCall == null");
  checkNotNull(backpressureStrategy, "backpressureStrategy == null");
  return Flowable.create((FlowableEmitter<Response<T>> emitter) -> {
    cancelOnFlowableDisposed(emitter, call);

    final ApolloSubscriptionCall.Callback<T> bridge = new ApolloSubscriptionCall.Callback<T>() {
      @Override public void onResponse(@NotNull Response<T> response) {
        if (emitter.isCancelled()) {
          return;
        }
        emitter.onNext(response);
      }

      @Override public void onFailure(@NotNull ApolloException e) {
        // Fatal errors are rethrown rather than delivered downstream.
        Exceptions.throwIfFatal(e);
        if (emitter.isCancelled()) {
          return;
        }
        emitter.onError(e);
      }

      @Override public void onCompleted() {
        if (emitter.isCancelled()) {
          return;
        }
        emitter.onComplete();
      }

      @Override public void onTerminated() {
        // Termination is surfaced through the regular failure path.
        onFailure(new ApolloSubscriptionTerminatedException("Subscription server unexpectedly terminated "
            + "connection"));
      }

      @Override public void onConnected() {
        // Nothing to forward for the connected event.
      }
    };

    call.execute(bridge);
  }, backpressureStrategy);
}
 
源代码16 项目: RxLifecycle   文件: LifecycleTransformer.java
/**
 * Mirrors {@code upstream} until the {@code observable} field, converted to a flowable with
 * {@code BackpressureStrategy.LATEST}, emits.
 *
 * @param upstream the flowable to bound
 * @return {@code upstream} limited by {@code takeUntil}
 */
@Override
public Publisher<T> apply(Flowable<T> upstream) {
    final Flowable<?> stopSignal = observable.toFlowable(BackpressureStrategy.LATEST);
    return upstream.takeUntil(stopSignal);
}
 
源代码17 项目: objectbox-java   文件: RxQuery.java
/**
 * Returns a Flowable that emits the results of the given Query one at a time and calls
 * onComplete after the last result has been processed.
 * Delegates to the two-argument overload with BackpressureStrategy.BUFFER.
 */
public static <T> Flowable<T> flowableOneByOne(final Query<T> query) {
    final BackpressureStrategy defaultStrategy = BackpressureStrategy.BUFFER;
    return flowableOneByOne(query, defaultStrategy);
}
 
源代码18 项目: objectbox-java   文件: RxQuery.java
/**
 * Returns a Flowable that emits the results of the given Query one at a time and calls
 * onComplete after the last result has been processed.
 * The supplied BackpressureStrategy is handed to Flowable.create; emission itself is set up
 * by createListItemEmitter.
 */
public static <T> Flowable<T> flowableOneByOne(final Query<T> query, BackpressureStrategy strategy) {
    return Flowable.create(sink -> {
        createListItemEmitter(query, sink);
    }, strategy);
}
 
源代码19 项目: catnip   文件: MessageConsumer.java
/**
 * Exposes this consumer as a {@link Flowable}.
 *
 * @param backpressureStrategy the backpressure strategy the returned flowable should use
 * @return a flowable view of this consumer
 */
Flowable<T> asFlowable(BackpressureStrategy backpressureStrategy); 
源代码20 项目: RxCache   文件: FlowableStrategy.java
/**
 * Applies this caching strategy to {@code source} and returns the resulting stream of records.
 *
 * @param rxCache              the cache instance available to the strategy
 * @param key                  the cache key the strategy operates on
 * @param source               the upstream flowable of fresh items
 * @param type                 the type token describing {@code T}
 * @param backpressureStrategy the backpressure strategy available to the strategy
 * @return a publisher of {@link Record}s as decided by the implementing strategy
 */
<T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy); 
 类所在包
 同包方法