下面列出了怎么用io.reactivex.functions.Action的API类实例代码及写法,或者点击链接到github查看源代码。
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
void reset() {
hasCachedResults = false;
this.deviceServicesObservable = getListOfServicesFromGatt()
.map(wrapIntoRxBleDeviceServices())
.switchIfEmpty(getTimeoutConfiguration().flatMap(scheduleActualDiscoveryWithTimeout()))
.doOnSuccess(Functions.actionConsumer(new Action() {
@Override
public void run() {
hasCachedResults = true;
}
}))
.doOnError(Functions.actionConsumer(new Action() {
@Override
public void run() {
reset();
}
}))
.cache();
}
private <T> FlowableTransformer<T, T> autoClose(final QueryResultIterable<T> iterable) {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
// Stream terminates (completed or on error): close the cursor
iterable.close();
}
}).doOnCancel(new Action() {
@Override
public void run() throws Exception {
// Cancelled subscription (manual unsubscribe or via some operator such as take()): close the cursor
iterable.close();
}
});
}
};
}
private Observable<List<Story>> getAndSaveStoryListFromRemote(String date) {
return this.mStoriesRemoteDataSource.getStories(date)
.doOnNext(new Consumer<List<Story>>() {
@Override
public void accept(@io.reactivex.annotations.NonNull List<Story> storyList) throws Exception {
mStoriesLocalDataSource.saveStories(storyList);
for (Story story : storyList) {
mCachedStories.put(story.getId(), story);
}
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
mCacheIsDirty = true;
}
});
}
@Override
public ObservableSource<RxBleConnection> apply(Observable<RxBleConnection> upstream) {
synchronized (connectionObservable) {
final Observable<RxBleConnection> rxBleConnectionObservable = connectionObservable.get();
if (rxBleConnectionObservable != null) {
return rxBleConnectionObservable;
}
final Observable<RxBleConnection> newConnectionObservable = upstream
.doFinally(new Action() {
@Override
public void run() {
connectionObservable.set(null);
}
})
.replay(1)
.refCount();
connectionObservable.set(newConnectionObservable);
return newConnectionObservable;
}
}
public static ObservableTransformer<TaskDetailEffect, TaskDetailEvent> createEffectHandlers(
TaskDetailViewActions view, Context context, Action dismiss, Consumer<Task> launchEditor) {
TasksRemoteDataSource remoteSource = TasksRemoteDataSource.getInstance();
TasksLocalDataSource localSource =
TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());
return RxMobius.<TaskDetailEffect, TaskDetailEvent>subtypeEffectHandler()
.addFunction(DeleteTask.class, deleteTaskHandler(remoteSource, localSource))
.addFunction(SaveTask.class, saveTaskHandler(remoteSource, localSource))
.addAction(NotifyTaskMarkedComplete.class, view::showTaskMarkedComplete, mainThread())
.addAction(NotifyTaskMarkedActive.class, view::showTaskMarkedActive, mainThread())
.addAction(NotifyTaskDeletionFailed.class, view::showTaskDeletionFailed, mainThread())
.addAction(NotifyTaskSaveFailed.class, view::showTaskSavingFailed, mainThread())
.addConsumer(OpenTaskEditor.class, openTaskEditorHandler(launchEditor), mainThread())
.addAction(Exit.class, dismiss, mainThread())
.build();
}
public static Completable completableExec(final RealmConfiguration configuration,
final Consumer<Realm> transaction) {
return Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
try (Realm realm = Realm.getInstance(configuration)) {
realm.executeTransaction(new Transaction() {
@Override
public void execute(Realm r) {
try {
transaction.accept(r);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
});
}
/**
* Creates an {@link ObservableTransformer} that will flatten the provided {@link Action} into the
* stream as a {@link Completable} every time it receives an effect from the upstream effects
* observable. This Completable will be subscribed on the specified {@link Scheduler}. This will
* result in calling the provided Action on the specified scheduler every time an effect is
* dispatched to the created effect transformer.
*
* @param doEffect the {@link Action} to be run every time the effect is requested
* @param scheduler the {@link Scheduler} that the action should be run on
* @param <F> the type of Effect this transformer handles
* @param <E> these transformers are for effects that do not result in any events; however, they
* still need to share the same Event type
* @return an {@link ObservableTransformer} that can be used with a {@link
* SubtypeEffectHandlerBuilder}.
*/
static <F, E> ObservableTransformer<F, E> fromAction(
final Action doEffect, @Nullable final Scheduler scheduler) {
return new ObservableTransformer<F, E>() {
@Override
public ObservableSource<E> apply(Observable<F> effectStream) {
return effectStream
.flatMapCompletable(
new Function<F, CompletableSource>() {
@Override
public CompletableSource apply(F f) throws Exception {
return scheduler == null
? Completable.fromAction(doEffect)
: Completable.fromAction(doEffect).subscribeOn(scheduler);
}
})
.toObservable();
}
};
}
/**
* Creates an {@link ObservableTransformer} that will flatten the provided {@link Consumer} into
* the stream as a {@link Completable} every time it receives an effect from the upstream effects
* observable. This will result in calling the consumer on the specified scheduler, and passing it
* the requested effect object.
*
* @param doEffect the {@link Consumer} to be run every time the effect is requested
* @param scheduler the {@link Scheduler} to be used when invoking the consumer
* @param <F> the type of Effect this transformer handles
* @param <E> these transformers are for effects that do not result in any events; however, they
* still need to share the same Event type
* @return an {@link ObservableTransformer} that can be used with a {@link
* SubtypeEffectHandlerBuilder}.
*/
static <F, E> ObservableTransformer<F, E> fromConsumer(
final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
return new ObservableTransformer<F, E>() {
@Override
public ObservableSource<E> apply(Observable<F> effectStream) {
return effectStream
.flatMapCompletable(
new Function<F, CompletableSource>() {
@Override
public CompletableSource apply(final F effect) throws Exception {
Completable completable =
Completable.fromAction(
new Action() {
@Override
public void run() throws Exception {
doEffect.accept(effect);
}
});
return scheduler == null ? completable : completable.subscribeOn(scheduler);
}
})
.toObservable();
}
};
}
/**
* Save image with RxJava2
*
* @param bitmap Bitmap for saving
* @param saveUri Uri for saving the cropped image
*
* @return Single of saving image
*/
public Single<Uri> saveAsSingle(final Bitmap bitmap, final Uri saveUri) {
return Single.fromCallable(new Callable<Uri>() {
@Override public Uri call() throws Exception {
return saveImage(bitmap, saveUri);
}
}).doOnSubscribe(new Consumer<Disposable>() {
@Override public void accept(@NonNull Disposable disposable) throws Exception {
mIsSaving.set(true);
}
}).doFinally(new Action() {
@Override public void run() throws Exception {
mIsSaving.set(false);
}
});
}
/**
* 倒计时
*
* @param view 倒计时所用到的view
* @param second 倒计时时长 单位 秒
* @param listener 倒计时回调
* @param <T>
* @return Disposable 返回 Disposable 在Activity的onDestroy方法中
* disposable.dispose() 取消掉 防止内存泄漏
* @see CountDownListener 回调接口
*/
public static <T extends View> Disposable countDown(final T view, @IntRange(from = 1) final int second, final CountDownListener<T> listener) {
if (listener == null || second <= 0) return null;
return Flowable.intervalRange(0, second + 1, 0, 1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
listener.onCountDownProgress(view, (int) (second - aLong));
}
}).doOnComplete(new Action() {
@Override
public void run() throws Exception {
listener.onCountDownComplete(view);
}
}).doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
listener.onBindCountDown(view);
}
}).subscribe();
}
/**
* 让在前台的 {@link Activity}, 使用 {@link Snackbar} 显示文本内容
*
* @param message
* @param isLong
*/
public void showSnackbar(String message, boolean isLong) {
if (getCurrentActivity() == null && getTopActivity() == null) {
Log.d(TAG, "mCurrentActivity == null when showSnackbar(String,boolean)");
return;
}
Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
if (DEPENDENCY_SUPPORT_DESIGN) {
Activity activity = getCurrentActivity() == null ? getTopActivity() : getCurrentActivity();
View view = activity.getWindow().getDecorView().findViewById(android.R.id.content);
Snackbar.make(view, message, isLong ? Snackbar.LENGTH_LONG : Snackbar.LENGTH_SHORT).show();
} else {
Toast.makeText(mApplication, message, Toast.LENGTH_SHORT).show();
}
}
}).subscribeOn(AndroidSchedulers.mainThread()).subscribe();
}
public void doIt(
Completable flowable,
Action complete,
Consumer<? super Throwable> error
) {
long current = disposableAddress.size();
Disposable subscription = flowable.
timeout(DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.doOnError(throwable -> {
remove(current);
})
.doOnComplete(() -> {
remove(current);
})
.subscribe(complete, error);
disposableAddress.append(current, subscription);
}
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
/**
* Crop image with RxJava2
*
* @param sourceUri Uri for cropping(If null, the Uri set in loadAsSingle() is used)
*
* @return Single of cropping image
*/
public Single<Bitmap> cropAsSingle(final Uri sourceUri) {
return Single.fromCallable(new Callable<Bitmap>() {
@Override public Bitmap call() throws Exception {
if (sourceUri != null) mSourceUri = sourceUri;
return cropImage();
}
}).doOnSubscribe(new Consumer<Disposable>() {
@Override public void accept(@NonNull Disposable disposable) throws Exception {
mIsCropping.set(true);
}
}).doFinally(new Action() {
@Override public void run() throws Exception {
mIsCropping.set(false);
}
});
}
@Override public Completable write(@NonNull final String name, @Nullable final ByteString value) {
return Completable.fromAction(new Action() {
@Override public void run() throws Exception {
checkCanStoreSecurely();
synchronized (dataLock) {
if (value == null) {
storage.remove(name);
return;
}
prepareKeyStore();
Cipher cipher = createCipher();
cipher.init(Cipher.ENCRYPT_MODE, getPublicKey());
storage.put(name, ByteString.of(cipher.doFinal(value.toByteArray())));
}
}
});
}
/**
* @param emitter
*/
@Override
public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) {
final FirebaseAuth.AuthStateListener listener = new FirebaseAuth.AuthStateListener() {
@Override
public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
if (!emitter.isDisposed()) {
emitter.onNext(firebaseAuth);
}
}
};
instance.addAuthStateListener(listener);
emitter.setDisposable(Disposables.fromAction(new Action() {
@Override
public void run() throws Exception {
instance.removeAuthStateListener(listener);
}
}));
}
public static <T> ObservableTransformer<T, T> applySchedulersWithLifeCycle(IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doAfterTerminate(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
})
.compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
/**
* 语音合成后伴随后续动作
**/
private void executeAfterSpeak(String preText, final String afterText, final int type, SpeechMsgBuilder msgBuilder) {
msgBuilder.setText(preText).setForceLocalEngine(true);
SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
if (EventBus.getDefault().hasSubscriberForEvent(NavigateEvent.class)) {
EventBus.getDefault().post(new NavigateEvent(type));
} else {
SynthesizerBase.get().startSpeakAbsolute(SpeechMsgBuilder.create(afterText).build())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(afterText), null, null, null));
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(preText), null, null, null));
}
/**
* 合成文本并提前返回
**/
private void speakAndAheadReturn(String text, SpeechMsgBuilder msgBuilder) {
/* 将回复文本发送到聊天列表 */
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(text), null, null, null));
/* 合成回复文本 */
msgBuilder.setText(text).setForceLocalEngine(true);
SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
EventBus.getDefault().post(new NavigateEvent(NavigateEvent.START_NAVI));
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
}
@Override
protected void protectedRun(final ObservableEmitter<BluetoothGatt> emitter, final QueueReleaseInterface queueReleaseInterface) {
final Action queueReleaseAction = new Action() {
@Override
public void run() {
queueReleaseInterface.release();
}
};
final DisposableSingleObserver<BluetoothGatt> disposableGattObserver = getConnectedBluetoothGatt()
.compose(wrapWithTimeoutWhenNotAutoconnecting())
// when there are no subscribers there is no point of continuing work -> next will be disconnect operation
.doFinally(queueReleaseAction)
.subscribeWith(disposableSingleObserverFromEmitter(emitter));
emitter.setDisposable(disposableGattObserver);
if (autoConnect) {
// with autoConnect the connection may be established after a really long time
queueReleaseInterface.release();
}
}
/**
* 发送回复文本文本视图并合成声音
**/
private void showAndSpeak(SpeechMsgBuilder builder) {
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(builder.getText()), null, null, null));
IflySynthesizer.getInstance().startSpeakAbsolute(builder.build())
.doOnNext(new Consumer<SpeechMsg>() {
@Override
public void accept(SpeechMsg speechMsg) throws Exception {
if (speechMsg.state() == SpeechMsg.State.OnBegin)
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
}
public void test3(){
start3(new Vo(10)).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
}
}).doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnCompleted......."+Thread.currentThread());
}
}).doOnTerminate(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnTerminate......."+Thread.currentThread());
}
}).subscribe();
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Observable<RxBleConnection> establishConnection(final ConnectionSetup options) {
return Observable.defer(new Callable<ObservableSource<RxBleConnection>>() {
@Override
public ObservableSource<RxBleConnection> call() {
if (isConnected.compareAndSet(false, true)) {
return connector.prepareConnection(options)
.doFinally(new Action() {
@Override
public void run() {
isConnected.set(false);
}
});
} else {
return Observable.error(new BleAlreadyConnectedException(bluetoothDevice.getAddress()));
}
}
});
}
public static <V> Flowable<V> takeElements(Supplier<RFuture<V>> callable) {
ReplayProcessor<V> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long n) throws Exception {
AtomicLong counter = new AtomicLong(n);
AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(callable, p, counter, futureRef);
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
futureRef.get().cancel(true);
}
});
}
});
}
public static <T> ObservableTransformer<T, T> io_main() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
HttpLog.i("+++doFinally+++");
}
})
.observeOn(AndroidSchedulers.mainThread());
}
};
}
public static <T> ObservableTransformer<ApiResult<T>, T> _io_main() {
return new ObservableTransformer<ApiResult<T>, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<ApiResult<T>> upstream) {
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new HandleFuc<T>())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
HttpLog.i("+++doFinally+++");
}
})
.onErrorResumeNext(new HttpResponseFunc<T>());
}
};
}
NonBlockingPool(Callable<? extends T> factory, Predicate<? super T> healthCheck, Consumer<? super T> disposer,
int maxSize, long idleTimeBeforeHealthCheckMs, long maxIdleTimeMs, long createRetryIntervalMs,
BiFunction<? super T, ? super Checkin, ? extends T> checkinDecorator, Scheduler scheduler,
Action closeAction) {
Preconditions.checkNotNull(factory);
Preconditions.checkNotNull(healthCheck);
Preconditions.checkNotNull(disposer);
Preconditions.checkArgument(maxSize > 0);
Preconditions.checkNotNull(checkinDecorator);
Preconditions.checkNotNull(scheduler);
Preconditions.checkArgument(createRetryIntervalMs >= 0, "createRetryIntervalMs must be >=0");
Preconditions.checkNotNull(closeAction);
Preconditions.checkArgument(maxIdleTimeMs >= 0, "maxIdleTime must be >=0");
this.factory = factory;
this.healthCheck = healthCheck;
this.disposer = disposer;
this.maxSize = maxSize;
this.idleTimeBeforeHealthCheckMs = idleTimeBeforeHealthCheckMs;
this.maxIdleTimeMs = maxIdleTimeMs;
this.createRetryIntervalMs = createRetryIntervalMs;
this.checkinDecorator = checkinDecorator;
this.scheduler = scheduler;// schedules retries
this.closeAction = closeAction;
}
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view,final boolean isLoadMore) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
if (!isLoadMore){
view.showLoading();
}
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doAfterTerminate(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}