io.reactivex.rxjava3.annotations.NonNull源码实例Demo

类io.reactivex.rxjava3.annotations.NonNull源码实例Demo

下面列出了io.reactivex.rxjava3.annotations.NonNull 类实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: mobius   文件: RxEventSources.java
/**
 * Adapts an {@link EventSource} to an {@link Observable}.
 *
 * <p>Each subscription to the returned observable subscribes to the event source, forwards every
 * delivered event to the emitter's {@code onNext}, and disposes the event source subscription when
 * the emitter is cancelled.
 *
 * @param eventSource the eventSource you want to convert to an observable
 * @param <E> the event type
 * @return an Observable based on the provided event source
 */
@NonNull
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
  return Observable.create(
      emitter -> {
        final com.spotify.mobius.disposables.Disposable subscription =
            eventSource.subscribe(emitter::onNext);
        // Tie the lifetime of the event source subscription to the emitter.
        emitter.setCancellable(subscription::dispose);
      });
}
 
源代码2 项目: mobius   文件: Transformers.java
/**
 * Builds an {@link ObservableTransformer} that flat-maps every upstream effect into an {@link
 * Observable} which invokes the given {@link Function} with that effect and emits the value it
 * returns. When a scheduler is supplied, the per-effect observable is subscribed on it; when the
 * scheduler is {@code null}, no {@code subscribeOn} is applied.
 *
 * @param function the {@link Function} to be invoked every time the effect is requested
 * @param scheduler the {@link Scheduler} to be used when invoking the function, or {@code null}
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream.flatMap(
          effect -> {
            // Defer the function call until the inner observable is subscribed to.
            final Observable<E> events = Observable.fromSupplier(() -> function.apply(effect));
            if (scheduler == null) {
              return events;
            }
            return events.subscribeOn(scheduler);
          });
}
 
源代码3 项目: RxCache   文件: CacheAndRemoteStrategy.java
/**
 * Emits the cached record followed by the remote records, skipping any record whose data is null.
 * Each remote item is saved to the cache under {@code key} and wrapped as a {@code Source.CLOUD}
 * record. Errors are delayed until both streams have terminated.
 *
 * @param rxCache cache used to load the stored record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream
 * @param type type token passed to the cache load call
 * @return the concatenated cache-then-remote stream
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type);

    final Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return Flowable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
 
源代码4 项目: RxCache   文件: CacheAndRemoteStrategy.java
/**
 * Emits the cached record followed by the remote records, skipping any record whose data is null.
 * Each remote item is saved to the cache under {@code key} and wrapped as a {@code Source.CLOUD}
 * record. Errors are delayed until both streams have terminated.
 *
 * @param rxCache cache used to load the stored record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream
 * @param type type token passed to the cache load call
 * @param backpressureStrategy strategy passed to the cache load call
 * @return the concatenated cache-then-remote stream
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    final Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return Flowable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
 
源代码5 项目: RxCache   文件: CacheAndRemoteStrategy.java
/**
 * Concatenates the cached record and the remote result (errors delayed), drops records whose data
 * is null, and returns the first remaining record. The remote item is saved to the cache under
 * {@code key} and wrapped as a {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the stored record and to save the remote item
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote source
 * @param type type token passed to the cache load call
 * @return a Maybe with the first record that carries non-null data
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> cached = rxCache.<T>load2Maybe(key, type);

    final Maybe<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return Maybe.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null)
            .firstElement();
}
 
源代码6 项目: RxCache   文件: CacheAndRemoteStrategy.java
/**
 * Emits the cached record followed by the remote records, skipping any record whose data is null.
 * Each remote item is saved to the cache under {@code key} and wrapped as a {@code Source.CLOUD}
 * record. Errors are delayed until both streams have terminated.
 *
 * @param rxCache cache used to load the stored record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream
 * @param type type token passed to the cache load call
 * @return the concatenated cache-then-remote stream
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> cached = rxCache.<T>load2Observable(key, type);

    final Observable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return Observable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
 
源代码7 项目: RxCache   文件: RemoteFirstStrategy.java
/**
 * Prefers the remote stream and switches to the cached record only when the remote stream is
 * empty. Each remote item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the fallback record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream
 * @param type type token passed to the cache load call
 * @return the remote records, or the cached record if the remote stream is empty
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> fallback = rxCache.<T>load2Flowable(key, type);

    final Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return fetched.switchIfEmpty(fallback);
}
 
源代码8 项目: RxCache   文件: RemoteFirstStrategy.java
/**
 * Prefers the remote stream and switches to the cached record only when the remote stream is
 * empty. Each remote item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the fallback record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream
 * @param type type token passed to the cache load call
 * @param backpressureStrategy strategy passed to the cache load call
 * @return the remote records, or the cached record if the remote stream is empty
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> fallback = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    final Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return fetched.switchIfEmpty(fallback);
}
 
源代码9 项目: RxCache   文件: RemoteFirstStrategy.java
/**
 * Prefers the remote result and switches to the cached record only when the remote source is
 * empty. The remote item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the fallback record and to save the remote item
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote source
 * @param type type token passed to the cache load call
 * @return the remote record, or the cached record if the remote source is empty
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> fallback = rxCache.<T>load2Maybe(key, type);

    final Maybe<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return fetched.switchIfEmpty(fallback);
}
 
源代码10 项目: RxCache   文件: RemoteFirstStrategy.java
/**
 * Prefers the remote stream and switches to the cached record only when the remote stream is
 * empty. Each remote item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the fallback record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream
 * @param type type token passed to the cache load call
 * @return the remote records, or the cached record if the remote stream is empty
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> fallback = rxCache.<T>load2Observable(key, type);

    final Observable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return fetched.switchIfEmpty(fallback);
}
 
源代码11 项目: RxCache   文件: CacheFirstStrategy.java
/**
 * Prefers the cached record and switches to the remote stream only when the cache stream is
 * empty. Each remote item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the stored record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream used as fallback
 * @param type type token passed to the cache load call
 * @return the cached record, or the remote records if the cache stream is empty
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type);

    final Flowable<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return cached.switchIfEmpty(fallback);
}
 
源代码12 项目: RxCache   文件: CacheFirstStrategy.java
/**
 * Prefers the cached record and switches to the remote stream only when the cache stream is
 * empty. Each remote item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the stored record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream used as fallback
 * @param type type token passed to the cache load call
 * @param backpressureStrategy strategy passed to the cache load call
 * @return the cached record, or the remote records if the cache stream is empty
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);

    final Flowable<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return cached.switchIfEmpty(fallback);
}
 
源代码13 项目: RxCache   文件: CacheFirstStrategy.java
/**
 * Prefers the cached record and switches to the remote source only when the cache is empty. The
 * remote item is saved to the cache under {@code key} and wrapped as a {@code Source.CLOUD}
 * record.
 *
 * @param rxCache cache used to load the stored record and to save the remote item
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote source used as fallback
 * @param type type token passed to the cache load call
 * @return the cached record, or the remote record if the cache is empty
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> cached = rxCache.<T>load2Maybe(key, type);

    final Maybe<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return cached.switchIfEmpty(fallback);
}
 
源代码14 项目: RxCache   文件: CacheFirstStrategy.java
/**
 * Prefers the cached record and switches to the remote stream only when the cache stream is
 * empty. Each remote item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the stored record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream used as fallback
 * @param type type token passed to the cache load call
 * @return the cached record, or the remote records if the cache stream is empty
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> cached = rxCache.<T>load2Observable(key, type);

    final Observable<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return cached.switchIfEmpty(fallback);
}
 
源代码15 项目: RxCache   文件: RemoteOnlyStrategy.java
/**
 * Uses only the remote stream: each item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record. The cache is never read here and {@code type} is not used.
 *
 * @param rxCache cache that receives every remote item
 * @param key cache key used for saving and tagging the records
 * @param source the remote stream
 * @param type unused by this strategy
 * @return the remote items wrapped as records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    return source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });
}
 
源代码16 项目: RxCache   文件: RemoteOnlyStrategy.java
/**
 * Uses only the remote stream: each item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record. The cache is never read here; {@code type} and
 * {@code backpressureStrategy} are not used.
 *
 * @param rxCache cache that receives every remote item
 * @param key cache key used for saving and tagging the records
 * @param source the remote stream
 * @param type unused by this strategy
 * @param backpressureStrategy unused by this strategy
 * @return the remote items wrapped as records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    return source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });
}
 
源代码17 项目: RxCache   文件: RemoteOnlyStrategy.java
/**
 * Uses only the remote source: its item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record. The cache is never read here and {@code type} is not used.
 *
 * @param rxCache cache that receives the remote item
 * @param key cache key used for saving and tagging the record
 * @param source the remote source
 * @param type unused by this strategy
 * @return the remote item wrapped as a record
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    return source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });
}
 
源代码18 项目: RxCache   文件: RemoteOnlyStrategy.java
/**
 * Uses only the remote stream: each item is saved to the cache under {@code key} and wrapped as a
 * {@code Source.CLOUD} record. The cache is never read here and {@code type} is not used.
 *
 * @param rxCache cache that receives every remote item
 * @param key cache key used for saving and tagging the records
 * @param source the remote stream
 * @param type unused by this strategy
 * @return the remote items wrapped as records
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    return source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });
}
 
源代码19 项目: RxCache   文件: CacheFirstTimeoutStrategy.java
/**
 * Prefers a cached record that is still fresh, i.e. whose age (current time minus its create
 * time) does not exceed {@code timestamp}; otherwise switches to the remote stream. Each remote
 * item is saved to the cache under {@code key} and wrapped as a {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the stored record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream used as fallback
 * @param type type token passed to the cache load call
 * @return the fresh cached record, or the remote records if none passes the age check
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> freshCached = rxCache.<T>load2Flowable(key, type)
            .filter(record -> System.currentTimeMillis() - record.getCreateTime() <= timestamp);

    final Flowable<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return freshCached.switchIfEmpty(fallback);
}
 
源代码20 项目: RxCache   文件: CacheFirstTimeoutStrategy.java
/**
 * Prefers a cached record that is still fresh, i.e. whose age (current time minus its create
 * time) does not exceed {@code timestamp}; otherwise switches to the remote stream. Each remote
 * item is saved to the cache under {@code key} and wrapped as a {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the stored record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream used as fallback
 * @param type type token passed to the cache load call
 * @param backpressureStrategy strategy passed to the cache load call
 * @return the fresh cached record, or the remote records if none passes the age check
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> freshCached = rxCache.<T>load2Flowable(key, type, backpressureStrategy)
            .filter(record -> System.currentTimeMillis() - record.getCreateTime() <= timestamp);

    final Flowable<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return freshCached.switchIfEmpty(fallback);
}
 
源代码21 项目: RxCache   文件: CacheFirstTimeoutStrategy.java
/**
 * Prefers a cached record that is still fresh, i.e. whose age (current time minus its create
 * time) does not exceed {@code timestamp}; otherwise switches to the remote source. The remote
 * item is saved to the cache under {@code key} and wrapped as a {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the stored record and to save the remote item
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote source used as fallback
 * @param type type token passed to the cache load call
 * @return the fresh cached record, or the remote record if none passes the age check
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> freshCached = rxCache.<T>load2Maybe(key, type)
            .filter(record -> System.currentTimeMillis() - record.getCreateTime() <= timestamp);

    final Maybe<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return freshCached.switchIfEmpty(fallback);
}
 
源代码22 项目: RxCache   文件: CacheFirstTimeoutStrategy.java
/**
 * Prefers a cached record that is still fresh, i.e. whose age (current time minus its create
 * time) does not exceed {@code timestamp}; otherwise switches to the remote stream. Each remote
 * item is saved to the cache under {@code key} and wrapped as a {@code Source.CLOUD} record.
 *
 * @param rxCache cache used to load the stored record and to save remote items
 * @param key cache key used for loading, saving and tagging the records
 * @param source the remote stream used as fallback
 * @param type type token passed to the cache load call
 * @return the fresh cached record, or the remote records if none passes the age check
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> freshCached = rxCache.<T>load2Observable(key, type)
            .filter(record -> System.currentTimeMillis() - record.getCreateTime() <= timestamp);

    final Observable<Record<T>> fallback = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });

    return freshCached.switchIfEmpty(fallback);
}
 
源代码23 项目: mobius   文件: RxEventSources.java
/**
 * Builds an {@link EventSource} backed by the merge of the given RxJava sources.
 *
 * <p>All streams must be mapped to your event type. Subscribing to the returned event source
 * subscribes to the merged observable: every item is handed to the event consumer, and any error
 * is routed to {@link RxJavaPlugins#onError}. Disposing the returned disposable disposes the
 * underlying RxJava subscription.
 *
 * @param sources the observables you want to include in this event source
 * @param <E> the event type
 * @return an EventSource based on the provided observables
 */
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
  final Observable<E> merged = Observable.mergeArray(sources);
  return new EventSource<E>() {

    @Nonnull
    @Override
    public com.spotify.mobius.disposables.Disposable subscribe(
        com.spotify.mobius.functions.Consumer<E> eventConsumer) {
      final Disposable rxSubscription =
          merged.subscribe(eventConsumer::accept, RxJavaPlugins::onError);
      // Expose the RxJava disposable through the Mobius Disposable interface.
      return rxSubscription::dispose;
    }
  };
}
 
源代码24 项目: mobius   文件: MergedTransformer.java
/**
 * Publishes the input so it is shared, composes the shared stream with every transformer in
 * {@code transformers}, and merges the resulting observables into one.
 *
 * @param input the upstream observable
 * @return the merge of all transformed streams
 */
@Override
public Observable<R> apply(@NonNull Observable<T> input) {
  return input.publish(
      shared -> {
        final List<Observable<R>> branches = new ArrayList<>();
        for (ObservableTransformer<T, R> each : transformers) {
          branches.add(shared.compose(each));
        }
        return Observable.merge(branches);
      });
}
 
源代码25 项目: RxCache   文件: NoCacheStrategy.java
/**
 * Bypasses the cache: every remote item is simply wrapped as a {@code Source.CLOUD} record.
 * {@code rxCache} and {@code type} are not used.
 *
 * @param rxCache unused by this strategy
 * @param key key stored in each record
 * @param source the remote stream
 * @param type unused by this strategy
 * @return the remote items wrapped as records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    return source.map(item -> new Record<>(Source.CLOUD, key, item));
}
 
源代码26 项目: RxCache   文件: NoCacheStrategy.java
/**
 * Bypasses the cache: every remote item is simply wrapped as a {@code Source.CLOUD} record.
 * {@code rxCache}, {@code type} and {@code backpressureStrategy} are not used.
 *
 * @param rxCache unused by this strategy
 * @param key key stored in each record
 * @param source the remote stream
 * @param type unused by this strategy
 * @param backpressureStrategy unused by this strategy
 * @return the remote items wrapped as records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    return source.map(item -> new Record<>(Source.CLOUD, key, item));
}
 
源代码27 项目: RxCache   文件: NoCacheStrategy.java
/**
 * Bypasses the cache: the remote item is simply wrapped as a {@code Source.CLOUD} record.
 * {@code rxCache} and {@code type} are not used.
 *
 * @param rxCache unused by this strategy
 * @param key key stored in the record
 * @param source the remote source
 * @param type unused by this strategy
 * @return the remote item wrapped as a record
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    return source.map(item -> new Record<>(Source.CLOUD, key, item));
}
 
源代码28 项目: RxCache   文件: NoCacheStrategy.java
/**
 * Bypasses the cache: every remote item is simply wrapped as a {@code Source.CLOUD} record.
 * {@code rxCache} and {@code type} are not used.
 *
 * @param rxCache unused by this strategy
 * @param key key stored in each record
 * @param source the remote stream
 * @param type unused by this strategy
 * @return the remote items wrapped as records
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    return source.map(item -> new Record<>(Source.CLOUD, key, item));
}
 
源代码29 项目: RxRelay   文件: Relay.java
/**
 * Returns a Relay that serializes the calls to {@link #accept}, making it thread-safe.
 * <p>If this Relay is already a {@code SerializedRelay} it is returned as-is; otherwise it is
 * wrapped in a new {@code SerializedRelay}.
 * <p>The method is thread-safe.
 *
 * @return this instance or a serializing wrapper around it
 */
@NonNull
@CheckReturnValue
public final Relay<T> toSerialized() {
    return (this instanceof SerializedRelay) ? this : new SerializedRelay<T>(this);
}
 
源代码30 项目: RxRelay   文件: Relay.java
/**
 * Returns a Relay that serializes the calls to {@link #accept}, making it thread-safe.
 * <p>If this Relay is already a {@code SerializedRelay} it is returned as-is; otherwise it is
 * wrapped in a new {@code SerializedRelay}.
 * <p>The method is thread-safe.
 *
 * @return this instance or a serializing wrapper around it
 */
@NonNull
@CheckReturnValue
public final Relay<T> toSerialized() {
    return (this instanceof SerializedRelay) ? this : new SerializedRelay<T>(this);
}
 
源代码评论
动弹
沙发等你来抢
 类所在包
 类方法
 同包方法