下面列出了 io.reactivex.rxjava3.annotations.NonNull 的 API 类用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Adapts a Mobius {@link EventSource} into an RxJava {@link Observable}.
 *
 * <p>When the returned observable is subscribed to, it subscribes to {@code eventSource} and
 * forwards every received event to the emitter's {@code onNext}. Cancelling the emitter disposes
 * the subscription that was obtained from the event source.
 *
 * @param eventSource the event source to expose as an observable
 * @param <E> the event type
 * @return an Observable that relays the events of {@code eventSource}
 */
@NonNull
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
    return Observable.create(
        emitter -> {
            // Relay each event from the source straight into the Rx stream.
            final com.spotify.mobius.disposables.Disposable subscription =
                eventSource.subscribe(value -> emitter.onNext(value));
            // Tie the lifetime of the source subscription to the emitter.
            emitter.setCancellable(() -> subscription.dispose());
        });
}
/**
 * Builds an {@link ObservableTransformer} that maps each upstream effect to an event by invoking
 * the given {@link Function}.
 *
 * <p>Every effect is flat-mapped into an {@link Observable} created with
 * {@code Observable.fromSupplier}, so the function is invoked when that inner observable is
 * subscribed to and its return value is emitted. If {@code scheduler} is non-null the inner
 * observable is subscribed on it; if it is null no {@code subscribeOn} is applied.
 *
 * @param function the {@link Function} to be invoked for every effect
 * @param scheduler the {@link Scheduler} to subscribe the function call on, or null for none
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
    return effectStream ->
        effectStream.flatMap(
            effect -> {
                // Defer the function call until the inner observable is subscribed to.
                final Observable<E> result =
                    Observable.fromSupplier(() -> function.apply(effect));
                if (scheduler == null) {
                    return result;
                }
                return result.subscribeOn(scheduler);
            });
}
/**
 * Concatenates the cached record stream with the remote stream, delaying errors until both have
 * terminated, and drops every record whose data is null.
 *
 * <p>Each item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type);
    final Flowable<Record<T>> fetched = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return Flowable
        .concatDelayError(Arrays.asList(cached, fetched))
        .filter(item -> item.getData() != null);
}
/**
 * Concatenates the cached record stream with the remote stream, delaying errors until both have
 * terminated, and drops every record whose data is null.
 *
 * <p>{@code backpressureStrategy} is forwarded to {@code rxCache.load2Flowable}. Each item from
 * {@code source} is saved under {@code key} and wrapped in a {@link Record} tagged
 * {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);
    final Flowable<Record<T>> fetched = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return Flowable
        .concatDelayError(Arrays.asList(cached, fetched))
        .filter(item -> item.getData() != null);
}
/**
 * Concatenates the cached record with the remote record (errors delayed), skips records whose
 * data is null, and returns the first remaining record as a {@link Maybe}.
 *
 * <p>The item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> cached = rxCache.<T>load2Maybe(key, type);
    final Maybe<Record<T>> fetched = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return Maybe
        .concatDelayError(Arrays.asList(cached, fetched))
        .filter(item -> item.getData() != null)
        // Only the first record with data is delivered.
        .firstElement();
}
/**
 * Concatenates the cached record stream with the remote stream, delaying errors until both have
 * terminated, and drops every record whose data is null.
 *
 * <p>Each item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> cached = rxCache.<T>load2Observable(key, type);
    final Observable<Record<T>> fetched = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return Observable
        .concatDelayError(Arrays.asList(cached, fetched))
        .filter(item -> item.getData() != null);
}
/**
 * Emits the remote records and switches to the cached records only if the remote stream
 * completes without emitting anything.
 *
 * <p>Each item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> fallback = rxCache.<T>load2Flowable(key, type);
    final Flowable<Record<T>> fetched = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return fetched.switchIfEmpty(fallback);
}
/**
 * Emits the remote records and switches to the cached records only if the remote stream
 * completes without emitting anything.
 *
 * <p>{@code backpressureStrategy} is forwarded to {@code rxCache.load2Flowable}. Each item from
 * {@code source} is saved under {@code key} and wrapped in a {@link Record} tagged
 * {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> fallback = rxCache.<T>load2Flowable(key, type, backpressureStrategy);
    final Flowable<Record<T>> fetched = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return fetched.switchIfEmpty(fallback);
}
/**
 * Emits the remote record and switches to the cached record only if the remote {@link Maybe}
 * completes empty.
 *
 * <p>The item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> fallback = rxCache.<T>load2Maybe(key, type);
    final Maybe<Record<T>> fetched = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return fetched.switchIfEmpty(fallback);
}
/**
 * Emits the remote records and switches to the cached records only if the remote stream
 * completes without emitting anything.
 *
 * <p>Each item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> fallback = rxCache.<T>load2Observable(key, type);
    final Observable<Record<T>> fetched = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return fetched.switchIfEmpty(fallback);
}
/**
 * Emits the cached records and switches to the remote stream only if the cache stream completes
 * without emitting anything.
 *
 * <p>When the remote stream is used, each item from {@code source} is saved under {@code key}
 * and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type);
    final Flowable<Record<T>> fallback = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return cached.switchIfEmpty(fallback);
}
/**
 * Emits the cached records and switches to the remote stream only if the cache stream completes
 * without emitting anything.
 *
 * <p>{@code backpressureStrategy} is forwarded to {@code rxCache.load2Flowable}. When the remote
 * stream is used, each item from {@code source} is saved under {@code key} and wrapped in a
 * {@link Record} tagged {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);
    final Flowable<Record<T>> fallback = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return cached.switchIfEmpty(fallback);
}
/**
 * Emits the cached record and switches to the remote {@link Maybe} only if the cache completes
 * empty.
 *
 * <p>When the remote value is used, it is saved under {@code key} and wrapped in a
 * {@link Record} tagged {@code Source.CLOUD}.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> cached = rxCache.<T>load2Maybe(key, type);
    final Maybe<Record<T>> fallback = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return cached.switchIfEmpty(fallback);
}
/**
 * Emits the cached records and switches to the remote stream only if the cache stream completes
 * without emitting anything.
 *
 * <p>When the remote stream is used, each item from {@code source} is saved under {@code key}
 * and wrapped in a {@link Record} tagged {@code Source.CLOUD}.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> cached = rxCache.<T>load2Observable(key, type);
    final Observable<Record<T>> fallback = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return cached.switchIfEmpty(fallback);
}
/**
 * Returns the remote stream only; the cache is written to but never read.
 *
 * <p>Each item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}. The {@code type} argument is not used here.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    return source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
}
/**
 * Returns the remote stream only; the cache is written to but never read.
 *
 * <p>Each item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}. Neither {@code type} nor {@code backpressureStrategy} is used
 * here.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    return source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
}
/**
 * Returns the remote {@link Maybe} only; the cache is written to but never read.
 *
 * <p>The item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}. The {@code type} argument is not used here.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    return source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
}
/**
 * Returns the remote stream only; the cache is written to but never read.
 *
 * <p>Each item from {@code source} is saved under {@code key} and wrapped in a {@link Record}
 * tagged {@code Source.CLOUD}. The {@code type} argument is not used here.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    return source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
}
/**
 * Emits cached records that are no older than {@code timestamp} milliseconds, and switches to
 * the remote stream if no cached record passes that check.
 *
 * <p>Age is computed as {@code System.currentTimeMillis() - record.getCreateTime()}. When the
 * remote stream is used, each item from {@code source} is saved under {@code key} and wrapped
 * in a {@link Record} tagged {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> fresh = rxCache.<T>load2Flowable(key, type)
        // Keep only cache entries that are still within the allowed age.
        .filter(entry -> System.currentTimeMillis() - entry.getCreateTime() <= timestamp);
    final Flowable<Record<T>> fallback = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return fresh.switchIfEmpty(fallback);
}
/**
 * Emits cached records that are no older than {@code timestamp} milliseconds, and switches to
 * the remote stream if no cached record passes that check.
 *
 * <p>{@code backpressureStrategy} is forwarded to {@code rxCache.load2Flowable}. Age is computed
 * as {@code System.currentTimeMillis() - record.getCreateTime()}. When the remote stream is
 * used, each item from {@code source} is saved under {@code key} and wrapped in a
 * {@link Record} tagged {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> fresh = rxCache.<T>load2Flowable(key, type, backpressureStrategy)
        // Keep only cache entries that are still within the allowed age.
        .filter(entry -> System.currentTimeMillis() - entry.getCreateTime() <= timestamp);
    final Flowable<Record<T>> fallback = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return fresh.switchIfEmpty(fallback);
}
/**
 * Emits the cached record if it is no older than {@code timestamp} milliseconds, and switches
 * to the remote {@link Maybe} otherwise.
 *
 * <p>Age is computed as {@code System.currentTimeMillis() - record.getCreateTime()}. When the
 * remote value is used, it is saved under {@code key} and wrapped in a {@link Record} tagged
 * {@code Source.CLOUD}.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> fresh = rxCache.<T>load2Maybe(key, type)
        // Keep the cache entry only if it is still within the allowed age.
        .filter(entry -> System.currentTimeMillis() - entry.getCreateTime() <= timestamp);
    final Maybe<Record<T>> fallback = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return fresh.switchIfEmpty(fallback);
}
/**
 * Emits cached records that are no older than {@code timestamp} milliseconds, and switches to
 * the remote stream if no cached record passes that check.
 *
 * <p>Age is computed as {@code System.currentTimeMillis() - record.getCreateTime()}. When the
 * remote stream is used, each item from {@code source} is saved under {@code key} and wrapped
 * in a {@link Record} tagged {@code Source.CLOUD}.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> fresh = rxCache.<T>load2Observable(key, type)
        // Keep only cache entries that are still within the allowed age.
        .filter(entry -> System.currentTimeMillis() - entry.getCreateTime() <= timestamp);
    final Observable<Record<T>> fallback = source.map(data -> {
        // Persist the upstream item before handing it downstream.
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
    return fresh.switchIfEmpty(fallback);
}
/**
 * Builds a Mobius {@link EventSource} backed by the merge of the given RxJava streams.
 *
 * <p>All streams must already emit your event type. Each call to {@code subscribe} on the
 * returned event source subscribes to the merged observable, hands every item to the supplied
 * consumer, and routes stream errors to {@code RxJavaPlugins.onError}. Disposing the returned
 * handle disposes that Rx subscription.
 *
 * @param sources the observables you want to include in this event source
 * @param <E> the event type
 * @return an EventSource based on the provided observables
 */
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
    final Observable<E> merged = Observable.mergeArray(sources);
    return new EventSource<E>() {
        @Nonnull
        @Override
        public com.spotify.mobius.disposables.Disposable subscribe(
            com.spotify.mobius.functions.Consumer<E> eventConsumer) {
            final Disposable subscription =
                merged.subscribe(
                    value -> eventConsumer.accept(value),
                    // Errors are not delivered to the consumer; they go to the global Rx hook.
                    error -> RxJavaPlugins.onError(error));
            // Expose the Rx subscription through the Mobius Disposable type.
            return () -> subscription.dispose();
        }
    };
}
/**
 * Applies every transformer in {@code transformers} to the same upstream and merges their
 * outputs into a single observable.
 *
 * <p>The upstream is passed through {@code publish(selector)} so that all transformers compose
 * onto the one observable handed to the selector rather than onto {@code input} directly.
 */
@Override
public Observable<R> apply(@NonNull Observable<T> input) {
    return input.publish(
        shared -> {
            final List<Observable<R>> branches = new ArrayList<>();
            for (ObservableTransformer<T, R> transformer : transformers) {
                final Observable<R> branch = shared.compose(transformer);
                branches.add(branch);
            }
            return Observable.merge(branches);
        });
}
/**
 * Wraps each item of {@code source} in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * <p>The cache is neither read nor written: {@code rxCache} and {@code type} are not used.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    return source.map(data -> new Record<>(Source.CLOUD, key, data));
}
/**
 * Wraps each item of {@code source} in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * <p>The cache is neither read nor written: {@code rxCache}, {@code type} and
 * {@code backpressureStrategy} are not used.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    return source.map(data -> new Record<>(Source.CLOUD, key, data));
}
/**
 * Wraps the item of {@code source} in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * <p>The cache is neither read nor written: {@code rxCache} and {@code type} are not used.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    return source.map(data -> new Record<>(Source.CLOUD, key, data));
}
/**
 * Wraps each item of {@code source} in a {@link Record} tagged {@code Source.CLOUD}.
 *
 * <p>The cache is neither read nor written: {@code rxCache} and {@code type} are not used.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    return source.map(data -> new Record<>(Source.CLOUD, key, data));
}
/**
 * Returns a Relay that serializes the calls to {@link #accept}, making it thread-safe.
 *
 * <p>If this instance is already a {@code SerializedRelay} it is returned as-is; otherwise it
 * is wrapped in a new {@code SerializedRelay}.
 * <p>The method is thread-safe.
 */
@NonNull
@CheckReturnValue
public final Relay<T> toSerialized() {
    // Avoid double-wrapping an already serialized relay.
    return (this instanceof SerializedRelay) ? this : new SerializedRelay<T>(this);
}
/**
 * Returns a Relay that serializes the calls to {@link #accept}, making it thread-safe.
 *
 * <p>A new {@code SerializedRelay} wrapper is created unless this instance already is one, in
 * which case this instance itself is returned.
 * <p>The method is thread-safe.
 */
@NonNull
@CheckReturnValue
public final Relay<T> toSerialized() {
    if (!(this instanceof SerializedRelay)) {
        return new SerializedRelay<T>(this);
    }
    // Already serialized: no additional wrapper is needed.
    return this;
}