下面列出了怎么用io.reactivex.rxjava3.functions.Function的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Installs an optional, shared handler for uncaught exceptions thrown by effect handlers.
 *
 * <p>When none is set, {@link RxJavaPlugins#onError(Throwable)} is used. An exception escaping a
 * handler is always fatal: this hook only lets callers configure how the crash is reported, it
 * does not make the failure recoverable.
 *
 * @param function receives the sub-transformer that failed and returns the consumer that should
 *     be given the thrown exception.
 * @return this builder, to allow call chaining
 */
public RxMobius.SubtypeEffectHandlerBuilder<F, E> withFatalErrorHandler(
    final Function<ObservableTransformer<? extends F, E>, Consumer<Throwable>> function) {
  checkNotNull(function);

  final OnErrorFunction<ObservableTransformer<? extends F, E>, Consumer<Throwable>> guarded =
      new OnErrorFunction<ObservableTransformer<? extends F, E>, Consumer<Throwable>>() {
        @Override
        public Consumer<Throwable> apply(ObservableTransformer<? extends F, E> failedHandler) {
          final Consumer<Throwable> reporter;
          try {
            reporter = function.apply(failedHandler);
          } catch (Throwable cause) {
            // The user-supplied factory itself failed; surface that as an unchecked exception
            // that names the effect handler being processed.
            throw new RuntimeException(
                "FATAL: fatal error handler threw exception for effect handler: " + failedHandler,
                cause);
          }
          return reporter;
        }
      };

  this.onErrorFunction = guarded;
  return this;
}
/**
 * Builds an {@link ObservableTransformer} that runs the given {@link Action} once for every
 * effect arriving from upstream. Each run is wrapped in a {@link Completable} that is flattened
 * into the stream; when a {@link Scheduler} is supplied the Completable is subscribed on it,
 * otherwise no scheduler is applied.
 *
 * @param doEffect the {@link Action} to run for each incoming effect
 * @param scheduler the {@link Scheduler} to subscribe each run on, or {@code null} for none
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type; no events are emitted by the action, but the transformer still has
 *     to share the common Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  // The effect value itself is ignored; it only triggers another run of the action.
  final Function<F, CompletableSource> runAction =
      effect -> {
        final Completable execution = Completable.fromAction(doEffect);
        return scheduler == null ? execution : execution.subscribeOn(scheduler);
      };

  return effects -> effects.flatMapCompletable(runAction).toObservable();
}
/**
 * Builds an {@link ObservableTransformer} that hands every effect arriving from upstream to the
 * given {@link Consumer}. Each invocation is wrapped in a {@link Completable} that is flattened
 * into the stream; when a {@link Scheduler} is supplied the Completable is subscribed on it.
 *
 * @param doEffect the {@link Consumer} that receives each requested effect
 * @param scheduler the {@link Scheduler} to invoke the consumer on, or {@code null} for none
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type; no events are emitted by the consumer, but the transformer still has
 *     to share the common Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  final Function<F, CompletableSource> consumeEffect =
      effect -> {
        final Completable invocation = Completable.fromAction(() -> doEffect.accept(effect));
        if (scheduler == null) {
          return invocation;
        }
        return invocation.subscribeOn(scheduler);
      };

  return effects -> effects.flatMapCompletable(consumeEffect).toObservable();
}
/**
 * Builds an {@link ObservableTransformer} that applies the given {@link Function} to every effect
 * arriving from upstream and emits the value it returns. Each call is wrapped in its own
 * {@link Observable} that is flattened into the stream; when a {@link Scheduler} is supplied that
 * Observable is subscribed on it.
 *
 * @param function the {@link Function} invoked for each requested effect
 * @param scheduler the {@link Scheduler} to invoke the function on, or {@code null} for none
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return effects ->
      effects.flatMap(
          effect -> {
            // The function call is deferred until the inner observable is subscribed to.
            final Observable<E> event = Observable.fromSupplier(() -> function.apply(effect));
            if (scheduler == null) {
              return event;
            }
            return event.subscribeOn(scheduler);
          });
}
/**
 * Feeds two effects through a transformer built from a sleeping function on the io scheduler and
 * waits until the collected results equal the expected list for those effects.
 */
@Test
public void processingLongEffectsDoesNotBlockProcessingShorterEffects() {
  final List<String> effects = Arrays.asList("Hello", "Rx");

  PublishSubject<String> upstream = PublishSubject.create();
  // Sleeps for a per-effect duration before returning the effect's length.
  Function<String, Integer> sleepyFunction =
      s -> {
        try {
          Thread.sleep(duration(s));
        } catch (InterruptedException ie) {
          // Do not swallow the interruption: restore the flag so the worker thread that ran
          // this function can still observe it. The length is returned regardless.
          Thread.currentThread().interrupt();
        }
        return s.length();
      };

  final List<Integer> results = new ArrayList<>();
  upstream
      .compose(Transformers.fromFunction(sleepyFunction, Schedulers.io()))
      .subscribe(results::add);

  Observable.fromIterable(effects).subscribe(upstream);

  await().atMost(durationForEffects(effects)).until(() -> results.equals(expected(effects)));
}
/**
 * Builds a publisher that lists the installed packages as {@code App} entries.
 *
 * <p>The context is held through a {@link WeakReference}. When the stream runs, every
 * {@code PackageInfo} returned by the package manager is wrapped in an {@code App}, marked as not
 * coming from a file, and stamped with its formatted last-update time. The finished list is
 * stored via {@code setApps} before it is emitted.
 *
 * @param context    context used to obtain the package manager
 * @param dateFormat formatter applied to each package's last-update time
 * @return a publisher emitting the list of installed apps
 */
private Publisher<List<App>> getApps(Context context, final DateFormat dateFormat) {
    final Function<WeakReference<Context>, List<App>> listInstalled = contextRef -> {
        final PackageManager packageManager = contextRef.get().getPackageManager();
        final List<App> installed = new ArrayList<>();
        for (PackageInfo packageInfo : packageManager.getInstalledPackages(0)) {
            final App entry = new App(packageInfo, packageManager);
            entry.isFormFile = false;
            entry.time = dateFormat.format(new Date(packageInfo.lastUpdateTime));
            installed.add(entry);
        }
        setApps(installed);
        return installed;
    };
    return Flowable.just(new WeakReference<>(context)).map(listInstalled);
}
/**
 * Obtains the app list, runs it through a {@code SortFunction} and delivers the result to the
 * subscriber.
 *
 * <p>If {@code getApps()} already returns a list it is reused; otherwise the list is loaded
 * through {@code getApps(Context, DateFormat)}. Work is subscribed on the io scheduler and
 * results are observed on the Android main thread.
 *
 * @param context    context used when the app list has to be loaded
 * @param sortByTime flag forwarded to {@code SortFunction}
 * @param dateFormat formatter forwarded to the loader and to {@code SortFunction}
 * @param subscriber receiver of the sorted result
 * @return the subscriber, as a {@link Disposable}
 */
public Disposable getAndSort(Context context, final boolean sortByTime, final DateFormat dateFormat, KWSubscriber<ItemArray> subscriber) {
    final Function<WeakReference<Context>, Publisher<List<App>>> resolveApps = contextRef -> {
        final List<App> known = getApps();
        if (known != null) {
            return Flowable.just(known);
        }
        return getApps(contextRef.get(), dateFormat);
    };

    return Flowable.just(new WeakReference<>(context))
            .flatMap(resolveApps)
            .map(new SortFunction(dateFormat, sortByTime))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
/**
 * Copies an app's APK into a destination given as a path string and emits the absolute path of
 * the copy.
 *
 * <p>The target file name is {@code <name>_<versionName>.apk}. Copy progress is posted through
 * {@code mHandler} to {@code subscriber.inProgress}. The copy is subscribed on the io scheduler
 * and the result is observed on the Android main thread.
 *
 * @param app        app whose APK is copied
 * @param dest       destination passed to {@code FileUtil.copy}
 * @param subscriber receiver of progress updates and of the resulting path
 * @return the subscriber, as a {@link Disposable}
 */
@Deprecated
public Disposable saveApk(App app, final String dest, final KWSubscriber<String> subscriber) {
    final Function<App, String> copyApk = source -> {
        final String fileName = source.name + "_" + source.versionName + ".apk";
        final OnCopyListener progressRelay = new OnCopyListener() {
            @Override
            public void inProgress(final float progress) {
                mHandler.post(() -> subscriber.inProgress(progress));
            }
        };
        return FileUtil.copy(source.apkPath, dest, fileName, progressRelay).getAbsolutePath();
    };

    return Flowable.just(app)
            .map(copyApk)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
/**
 * Copies an app's APK to a destination {@code Uri} through a {@code ContentResolver} and emits
 * that destination once the copy call returns.
 *
 * <p>Copy progress is posted through {@code mHandler} to {@code subscriber.inProgress}. The copy
 * is subscribed on the io scheduler and the result is observed on the Android main thread.
 *
 * @param resolver   resolver passed to {@code FileUtil.copy}
 * @param app        app whose APK is copied
 * @param dest       destination of the copy; also the emitted value
 * @param subscriber receiver of progress updates and of the destination
 * @return the subscriber, as a {@link Disposable}
 */
public Disposable saveApk(ContentResolver resolver, App app, final Uri dest, final KWSubscriber<Uri> subscriber) {
    final Function<App, Uri> copyApk = source -> {
        final Uri origin = Uri.fromFile(new File(source.apkPath));
        final OnCopyListener progressRelay = new OnCopyListener() {
            @Override
            public void inProgress(final float progress) {
                mHandler.post(() -> subscriber.inProgress(progress));
            }
        };
        FileUtil.copy(resolver, origin, dest, progressRelay);
        return dest;
    };

    return Flowable.just(app)
            .map(copyApk)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
/**
 * Concatenates the cached stream and the source stream, in that order, using
 * {@code concatDelayError}, and drops records whose data is {@code null}.
 *
 * <p>Each source item is saved under {@code key} and wrapped in a {@code Source.CLOUD} record.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type);
    final Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });
    return Flowable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
/**
 * Backpressure-aware variant: concatenates the cached stream (loaded with the given strategy) and
 * the source stream, in that order, using {@code concatDelayError}, and drops records whose data
 * is {@code null}.
 *
 * <p>Each source item is saved under {@code key} and wrapped in a {@code Source.CLOUD} record.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type, backpressureStrategy);
    final Flowable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });
    return Flowable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
/**
 * Concatenates the cached {@code Maybe} and the source {@code Maybe}, in that order, using
 * {@code concatDelayError}, and emits the first record whose data is not {@code null}.
 *
 * <p>The source item, if any, is saved under {@code key} and wrapped in a {@code Source.CLOUD}
 * record.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> cached = rxCache.<T>load2Maybe(key, type);
    final Maybe<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });
    return Maybe.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null)
            .firstElement();
}
/**
 * Concatenates the cached {@code Observable} and the source {@code Observable}, in that order,
 * using {@code concatDelayError}, and drops records whose data is {@code null}.
 *
 * <p>Each source item is saved under {@code key} and wrapped in a {@code Source.CLOUD} record.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> cached = rxCache.<T>load2Observable(key, type);
    final Observable<Record<T>> fetched = source.map(item -> {
        rxCache.save(key, item);
        return new Record<>(Source.CLOUD, key, item);
    });
    return Observable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
/**
 * Emits the source items as {@code Source.CLOUD} records, switching to the cached stream when the
 * source turns out to be empty.
 *
 * <p>Each source item is saved under {@code key} before it is wrapped.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> fallback = rxCache.<T>load2Flowable(key, type);
    final Flowable<Record<T>> primary = source.map(value -> {
        rxCache.save(key, value);
        return new Record<>(Source.CLOUD, key, value);
    });
    return primary.switchIfEmpty(fallback);
}
/**
 * Backpressure-aware variant: emits the source items as {@code Source.CLOUD} records, switching
 * to the cached stream (loaded with the given strategy) when the source turns out to be empty.
 *
 * <p>Each source item is saved under {@code key} before it is wrapped.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> fallback = rxCache.<T>load2Flowable(key, type, backpressureStrategy);
    final Flowable<Record<T>> primary = source.map(value -> {
        rxCache.save(key, value);
        return new Record<>(Source.CLOUD, key, value);
    });
    return primary.switchIfEmpty(fallback);
}
/**
 * Emits the source item as a {@code Source.CLOUD} record, switching to the cached {@code Maybe}
 * when the source turns out to be empty.
 *
 * <p>The source item is saved under {@code key} before it is wrapped.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> fallback = rxCache.<T>load2Maybe(key, type);
    final Maybe<Record<T>> primary = source.map(value -> {
        rxCache.save(key, value);
        return new Record<>(Source.CLOUD, key, value);
    });
    return primary.switchIfEmpty(fallback);
}
/**
 * Emits the source items as {@code Source.CLOUD} records, switching to the cached
 * {@code Observable} when the source turns out to be empty.
 *
 * <p>Each source item is saved under {@code key} before it is wrapped.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> fallback = rxCache.<T>load2Observable(key, type);
    final Observable<Record<T>> primary = source.map(value -> {
        rxCache.save(key, value);
        return new Record<>(Source.CLOUD, key, value);
    });
    return primary.switchIfEmpty(fallback);
}
/**
 * Emits the cached records, switching to the source stream when the cache turns out to be empty.
 *
 * <p>When the source is used, each of its items is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> stored = rxCache.<T>load2Flowable(key, type);
    final Flowable<Record<T>> fresh = source.map(payload -> {
        rxCache.save(key, payload);
        return new Record<>(Source.CLOUD, key, payload);
    });
    return stored.switchIfEmpty(fresh);
}
/**
 * Backpressure-aware variant: emits the cached records (loaded with the given strategy),
 * switching to the source stream when the cache turns out to be empty.
 *
 * <p>When the source is used, each of its items is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> stored = rxCache.<T>load2Flowable(key, type, backpressureStrategy);
    final Flowable<Record<T>> fresh = source.map(payload -> {
        rxCache.save(key, payload);
        return new Record<>(Source.CLOUD, key, payload);
    });
    return stored.switchIfEmpty(fresh);
}
/**
 * Emits the cached record, switching to the source {@code Maybe} when the cache turns out to be
 * empty.
 *
 * <p>When the source is used, its item is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> stored = rxCache.<T>load2Maybe(key, type);
    final Maybe<Record<T>> fresh = source.map(payload -> {
        rxCache.save(key, payload);
        return new Record<>(Source.CLOUD, key, payload);
    });
    return stored.switchIfEmpty(fresh);
}
/**
 * Emits the cached records, switching to the source {@code Observable} when the cache turns out
 * to be empty.
 *
 * <p>When the source is used, each of its items is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> stored = rxCache.<T>load2Observable(key, type);
    final Observable<Record<T>> fresh = source.map(payload -> {
        rxCache.save(key, payload);
        return new Record<>(Source.CLOUD, key, payload);
    });
    return stored.switchIfEmpty(fresh);
}
/**
 * Emits only the source items: each one is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record. The cache is written but never read; {@code type} is unused.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    return source.map(data -> {
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
}
/**
 * Emits only the source items: each one is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record. The cache is written but never read; {@code type} and
 * {@code backpressureStrategy} are unused.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    return source.map(data -> {
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
}
/**
 * Emits only the source item: it is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record. The cache is written but never read; {@code type} is unused.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    return source.map(data -> {
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
}
/**
 * Emits only the source items: each one is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record. The cache is written but never read; {@code type} is unused.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    return source.map(data -> {
        rxCache.save(key, data);
        return new Record<>(Source.CLOUD, key, data);
    });
}
/**
 * Emits cached records whose age (current time minus the record's create time) does not exceed
 * {@code timestamp}, switching to the source stream when no cached record passes that check.
 *
 * <p>When the source is used, each of its items is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> unexpired = rxCache.<T>load2Flowable(key, type)
            .filter(entry -> System.currentTimeMillis() - entry.getCreateTime() <= timestamp);
    final Flowable<Record<T>> refreshed = source.map(body -> {
        rxCache.save(key, body);
        return new Record<>(Source.CLOUD, key, body);
    });
    return unexpired.switchIfEmpty(refreshed);
}
/**
 * Backpressure-aware variant: emits cached records (loaded with the given strategy) whose age
 * (current time minus the record's create time) does not exceed {@code timestamp}, switching to
 * the source stream when no cached record passes that check.
 *
 * <p>When the source is used, each of its items is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> unexpired = rxCache.<T>load2Flowable(key, type, backpressureStrategy)
            .filter(entry -> System.currentTimeMillis() - entry.getCreateTime() <= timestamp);
    final Flowable<Record<T>> refreshed = source.map(body -> {
        rxCache.save(key, body);
        return new Record<>(Source.CLOUD, key, body);
    });
    return unexpired.switchIfEmpty(refreshed);
}
/**
 * Emits the cached record if its age (current time minus the record's create time) does not
 * exceed {@code timestamp}, switching to the source {@code Maybe} otherwise.
 *
 * <p>When the source is used, its item is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record.
 */
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
    final Maybe<Record<T>> unexpired = rxCache.<T>load2Maybe(key, type)
            .filter(entry -> System.currentTimeMillis() - entry.getCreateTime() <= timestamp);
    final Maybe<Record<T>> refreshed = source.map(body -> {
        rxCache.save(key, body);
        return new Record<>(Source.CLOUD, key, body);
    });
    return unexpired.switchIfEmpty(refreshed);
}
/**
 * Emits cached records whose age (current time minus the record's create time) does not exceed
 * {@code timestamp}, switching to the source {@code Observable} when no cached record passes
 * that check.
 *
 * <p>When the source is used, each of its items is saved under {@code key} and wrapped in a
 * {@code Source.CLOUD} record.
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> unexpired = rxCache.<T>load2Observable(key, type)
            .filter(entry -> System.currentTimeMillis() - entry.getCreateTime() <= timestamp);
    final Observable<Record<T>> refreshed = source.map(body -> {
        rxCache.save(key, body);
        return new Record<>(Source.CLOUD, key, body);
    });
    return unexpired.switchIfEmpty(refreshed);
}
/**
 * Runs the input through every transformer in {@code transformers} and merges their outputs.
 *
 * <p>The input is shared via {@code publish(selector)}, and each transformer is composed onto
 * that shared observable.
 *
 * @param input the upstream observable to fan out
 * @return the merged output of all transformers
 */
@Override
public Observable<R> apply(@NonNull Observable<T> input) {
  return input.publish(
      shared -> {
        final List<Observable<R>> branches = new ArrayList<>();
        for (ObservableTransformer<T, R> transformer : transformers) {
          branches.add(shared.compose(transformer));
        }
        return Observable.merge(branches);
      });
}