下面列出了 io.reactivex.ObservableTransformer 类的 API 用法示例代码及写法,您也可以点击链接到 GitHub 查看源代码。
/**
 * With a service matcher that accepts everything, two scan results are fed in 500 ms apart and
 * the test expects exactly one emission: the result with the higher RSSI value (-50 vs. -60).
 */
@Test
public void match_closest() {
  // Service-level matcher that lets every scan result through.
  ObservableTransformer<ScanData, ScanData> acceptAll = stream -> stream.filter(ignored -> true);
  when(serviceScanMatcher.match()).thenReturn(acceptAll);
  when(scanData1.getRssi()).thenReturn(-60);
  when(scanData2.getRssi()).thenReturn(-50);

  scanDataTestObserver = scanDataRelay.compose(rssiScanMatcher.match()).test();

  // First candidate arrives, then the second one half a second later.
  scanDataRelay.accept(scanData1);
  testScheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
  scanDataRelay.accept(scanData2);

  // Move virtual time past the match delay so the delayed filter can run.
  long pastMatchDelayMs = RssiScanMatcher.DEFAULT_MATCH_DELAY_MS + 500;
  testScheduler.advanceTimeBy(pastMatchDelayMs, TimeUnit.MILLISECONDS);

  scanDataTestObserver.assertValue(scanData2);
}
/**
 * Assembles the tasks-list effect handler by registering one sub-handler per effect class on an
 * {@code RxMobius} subtype effect handler builder.
 *
 * @param context used to obtain the {@code TasksLocalDataSource} instance
 * @param view handed to {@code showFeedbackHandler} for the {@code ShowFeedback} effect
 * @param showAddTask action registered for the {@code StartTaskCreationFlow} effect
 * @param showTaskDetails consumer handed to {@code navigateToDetailsHandler}
 * @return the combined transformer produced by the builder
 */
public static ObservableTransformer<TasksListEffect, TasksListEvent> createEffectHandler(
    Context context,
    TasksListViewActions view,
    Action showAddTask,
    Consumer<Task> showTaskDetails) {
  TasksRemoteDataSource remote = TasksRemoteDataSource.getInstance();
  TasksLocalDataSource local =
      TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());

  return RxMobius.<TasksListEffect, TasksListEvent>subtypeEffectHandler()
      // Data-source effects.
      .addTransformer(RefreshTasks.class, refreshTasksHandler(remote, local))
      .addTransformer(LoadTasks.class, loadTasksHandler(local))
      .addConsumer(SaveTask.class, saveTaskHandler(remote, local))
      .addConsumer(DeleteTasks.class, deleteTasksHandler(remote, local))
      // View/navigation effects, registered with the scheduler returned by mainThread().
      .addConsumer(ShowFeedback.class, showFeedbackHandler(view), mainThread())
      .addConsumer(
          NavigateToTaskDetails.class, navigateToDetailsHandler(showTaskDetails), mainThread())
      .addAction(StartTaskCreationFlow.class, showAddTask, mainThread())
      .build();
}
/**
 * Builds the handler for {@code RefreshTasks} effects.
 *
 * <p>For every incoming effect the remote task list is fetched; on success each task is saved to
 * the local source one after another and {@code tasksRefreshed()} is emitted. A failed fetch or a
 * failed save results in {@code tasksLoadingFailed()} instead of an error signal.
 *
 * @param remoteSource source the tasks are fetched from
 * @param localSource source the fetched tasks are saved into
 * @return a transformer mapping each {@code RefreshTasks} effect to a single resulting event
 */
static ObservableTransformer<RefreshTasks, TasksListEvent> refreshTasksHandler(
    TasksDataSource remoteSource, TasksDataSource localSource) {
  Single<TasksListEvent> refreshOnce =
      remoteSource
          .getTasks()
          .singleOrError()
          // Capture success/failure as a value so both cases flow through flatMap below.
          .map(Either::<Throwable, List<Task>>right)
          .onErrorReturn(Either::left)
          .flatMap(
              outcome ->
                  outcome.map(
                      failure -> Single.just(tasksLoadingFailed()),
                      fetched ->
                          Observable.fromIterable(fetched.value())
                              .concatMapCompletable(
                                  task ->
                                      Completable.fromAction(() -> localSource.saveTask(task)))
                              .andThen(Single.just(tasksRefreshed()))
                              .onErrorReturnItem(tasksLoadingFailed())));

  return effects -> effects.flatMapSingle(ignored -> refreshOnce);
}
/**
 * Assembles the task-detail effect handler by registering one sub-handler per effect class on an
 * {@code RxMobius} subtype effect handler builder.
 *
 * @param view target of the notification actions (marked complete/active, deletion/save failed)
 * @param context used to obtain the {@code TasksLocalDataSource} instance
 * @param dismiss action registered for the {@code Exit} effect
 * @param launchEditor consumer handed to {@code openTaskEditorHandler}
 * @return the combined transformer produced by the builder
 */
public static ObservableTransformer<TaskDetailEffect, TaskDetailEvent> createEffectHandlers(
    TaskDetailViewActions view, Context context, Action dismiss, Consumer<Task> launchEditor) {
  TasksRemoteDataSource remote = TasksRemoteDataSource.getInstance();
  TasksLocalDataSource local =
      TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());

  return RxMobius.<TaskDetailEffect, TaskDetailEvent>subtypeEffectHandler()
      // Data-source effects.
      .addFunction(DeleteTask.class, deleteTaskHandler(remote, local))
      .addFunction(SaveTask.class, saveTaskHandler(remote, local))
      // View notifications, registered with the scheduler returned by mainThread().
      .addAction(NotifyTaskMarkedComplete.class, view::showTaskMarkedComplete, mainThread())
      .addAction(NotifyTaskMarkedActive.class, view::showTaskMarkedActive, mainThread())
      .addAction(NotifyTaskDeletionFailed.class, view::showTaskDeletionFailed, mainThread())
      .addAction(NotifyTaskSaveFailed.class, view::showTaskSavingFailed, mainThread())
      // Navigation.
      .addConsumer(OpenTaskEditor.class, openTaskEditorHandler(launchEditor), mainThread())
      .addAction(Exit.class, dismiss, mainThread())
      .build();
}
/**
 * Optionally installs a shared error handler that is consulted when a handler throws an uncaught
 * exception.
 *
 * <p>The default is to use {@link RxJavaPlugins#onError(Throwable)}. An exception thrown by a
 * handler remains a fatal error; this method only makes the crash reporting configurable, it
 * does not enable safe error handling.
 *
 * @param function receives the sub-transformer that failed and returns the consumer that should
 *     be given the thrown exception
 * @return this builder
 */
public SubtypeEffectHandlerBuilder<F, E> withFatalErrorHandler(
    final Function<ObservableTransformer<? extends F, E>, Consumer<Throwable>> function) {
  checkNotNull(function);

  final OnErrorFunction<ObservableTransformer<? extends F, E>, Consumer<Throwable>> guarded =
      new OnErrorFunction<ObservableTransformer<? extends F, E>, Consumer<Throwable>>() {
        @Override
        public Consumer<Throwable> apply(ObservableTransformer<? extends F, E> failedHandler) {
          final Consumer<Throwable> reporter;
          try {
            reporter = function.apply(failedHandler);
          } catch (Exception cause) {
            // The user-supplied function itself failed; surface that as an unchecked error.
            throw new RuntimeException(
                "FATAL: fatal error handler threw exception for effect handler: " + failedHandler,
                cause);
          }
          return reporter;
        }
      };

  this.onErrorFunction = guarded;
  return this;
}
/**
 * Creates an {@link ObservableTransformer} that runs the given {@link Action} once for every
 * effect received from the upstream effects observable. Each run is wrapped in a {@link
 * Completable}; when a {@link Scheduler} is supplied, that Completable is subscribed on it,
 * otherwise no scheduler is applied.
 *
 * @param doEffect the {@link Action} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} the action should be run on, or {@code null} for none
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type; these transformers never emit events but still need to share the
 *     loop's Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      final Function<F, CompletableSource> runAction =
          new Function<F, CompletableSource>() {
            @Override
            public CompletableSource apply(F ignored) throws Exception {
              final Completable run = Completable.fromAction(doEffect);
              if (scheduler == null) {
                return run;
              }
              return run.subscribeOn(scheduler);
            }
          };
      return effectStream.flatMapCompletable(runAction).toObservable();
    }
  };
}
/**
 * Pre-processes responses: a {@code BaseResponse} whose error code is 0 is unwrapped to its data
 * via {@code createObservable}; any other error code becomes an {@code ApiException} error
 * carrying the response's code and message.
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() {
  return new ObservableTransformer<BaseResponse<T>, T>() {
    @Override
    public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
      final Function<BaseResponse<T>, ObservableSource<T>> unwrap =
          new Function<BaseResponse<T>, ObservableSource<T>>() {
            @Override
            public ObservableSource<T> apply(BaseResponse<T> response) throws Exception {
              if (response.getErrorCode() != 0) {
                // Non-zero code: turn the response into an error signal.
                return Observable.error(
                    new ApiException(response.getErrorCode(), response.getErrorMsg()));
              }
              // Success: emit the payload we actually want.
              return createObservable(response.getData());
            }
          };
      return upstream.flatMap(unwrap);
    }
  };
}
/**
 * Creates an {@link ObservableTransformer} that invokes the given {@link Function} once for
 * every effect received from the upstream effects observable and emits the value it returns.
 * When a {@link Scheduler} is supplied the invocation is subscribed on it; otherwise no
 * scheduler is applied.
 *
 * @param function the {@link Function} to be invoked every time the effect is requested
 * @param scheduler the {@link Scheduler} to be used when invoking the function, or {@code null}
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream.flatMap(
          new Function<F, ObservableSource<E>>() {
            @Override
            public ObservableSource<E> apply(final F effect) {
              // Defer the call so it happens on subscription rather than right here.
              final Callable<E> invocation =
                  new Callable<E>() {
                    @Override
                    public E call() throws Exception {
                      return function.apply(effect);
                    }
                  };
              final Observable<E> result = Observable.fromCallable(invocation);
              if (scheduler == null) {
                return result;
              }
              return result.subscribeOn(scheduler);
            }
          });
    }
  };
}
/**
 * A loop built with an {@code Init} that appends "-init" to the model should emit the
 * initialised model first and then the model produced after the event {@code 10}.
 */
@Test
public void shouldSupportStartingALoopWithAnInit() throws Exception {
  Init<String, Boolean> appendInitSuffix =
      new Init<String, Boolean>() {
        @Nonnull
        @Override
        public First<String, Boolean> init(String model) {
          return First.first(model + "-init");
        }
      };
  MobiusLoop.Builder<String, Integer, Boolean> withInit = builder.init(appendInitSuffix);

  ObservableTransformer<Integer, String> loop = RxMobius.loopFrom(withInit, "hi");
  final TestObserver<String> models = Observable.just(10).compose(loop).test();

  models.awaitCount(2);
  models.assertValues("hi-init", "hi-init10");
}
/**
 * Combining an {@code Init} with explicit start effects is expected to fail: the composed stream
 * must signal an error whose message contains "cannot pass in start effects".
 */
@Test
public void shouldThrowIfStartingALoopWithInitAndStartEffects() throws Exception {
  Init<String, Boolean> appendInitSuffix =
      new Init<String, Boolean>() {
        @Nonnull
        @Override
        public First<String, Boolean> init(String model) {
          return First.first(model + "-init");
        }
      };
  MobiusLoop.Builder<String, Integer, Boolean> withInit = builder.init(appendInitSuffix);

  ObservableTransformer<Integer, String> loop = RxMobius.loopFrom(withInit, "hi", effects(true));

  Observable.just(10)
      .compose(loop)
      .test()
      .assertError(failure -> failure.getMessage().contains("cannot pass in start effects"));
}
/**
 * Builds a router with one handler of each kind (two transformers, a consumer, an action and a
 * function) and subscribes a fresh test observer to a subject composed with it.
 */
@Before
public void setUp() throws Exception {
  cConsumer = new TestConsumer<>();
  dAction = new TestAction();

  ObservableTransformer<TestEffect, TestEvent> effectRouter =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addTransformer(
              A.class, (Observable<A> aEffects) -> aEffects.map(a -> AEvent.create(a.id())))
          .addTransformer(
              B.class, (Observable<B> bEffects) -> bEffects.map(b -> BEvent.create(b.id())))
          .addConsumer(C.class, cConsumer)
          .addAction(D.class, dAction)
          .addFunction(E.class, eEffect -> AEvent.create(eEffect.id()))
          .build();

  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();
  publishSubject.compose(effectRouter).subscribe(testSubscriber);
}
/**
 * Installs a stub {@code ScanMatcher} and starts observing both the connection state and a
 * connect attempt made with that matcher.
 *
 * @param matcherEquals the value the stub's {@code equals} returns for any argument
 */
private void prepareConnect(boolean matcherEquals) {
  // Stub matcher: passes scan data through untouched; equality is dictated by the flag.
  ScanMatcher stubMatcher =
      new ScanMatcher() {
        @Override
        public int hashCode() {
          return 1;
        }

        @Override
        public boolean equals(Object other) {
          return matcherEquals;
        }

        @Override
        public ObservableTransformer<ScanData, ScanData> match() {
          return stream -> stream;
        }
      };
  scanMatcher = stubMatcher;

  stateTestObserver = coreConnectionManager.state().test();
  connectTestObserver =
      coreConnectionManager
          .connect(scanMatcher, DEFAULT_SCAN_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT)
          .test();
}
/**
 * Transformer that subscribes upstream on the IO scheduler, observes on the Android main-thread
 * scheduler, shows the view's loading indicator on subscribe, hides it when the stream finishes,
 * and binds the stream to the view's lifecycle.
 *
 * @param view the view whose loading indicator and lifecycle are used
 */
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
  // Show the progress indicator as soon as the stream is subscribed.
  final Consumer<Disposable> showProgress =
      new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
          view.showLoading();
        }
      };
  // Hide the progress indicator once the stream finishes.
  final Action hideProgress =
      new Action() {
        @Override
        public void run() {
          view.hideLoading();
        }
      };

  return new ObservableTransformer<T, T>() {
    @Override
    public Observable<T> apply(Observable<T> observable) {
      return observable
          .subscribeOn(Schedulers.io())
          .doOnSubscribe(showProgress)
          // NOTE(review): presumably here so the doOnSubscribe callback above runs on the main
          // thread — confirm against RxJava's subscribeOn/doOnSubscribe semantics.
          .subscribeOn(AndroidSchedulers.mainThread())
          .observeOn(AndroidSchedulers.mainThread())
          .doFinally(hideProgress)
          .compose(RxLifecycleUtils.bindToLifecycle(view));
    }
  };
}
/**
 * Thread-scheduling transformer: subscribes to and disposes the upstream on the IO scheduler and
 * delivers emissions on the Android main-thread scheduler.
 *
 * @param <T> the element type, passed through unchanged
 * @return a transformer applying the scheduling described above
 */
public static <T> ObservableTransformer<T, T> schedulersTransformer() {
  return new ObservableTransformer<T, T>() {
    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
      return upstream
          .subscribeOn(Schedulers.io())
          .unsubscribeOn(Schedulers.io())
          // NOTE(review): this second subscribeOn is kept only to preserve the existing
          // subscription timing; the subscribeOn nearest the source (io) presumably still
          // decides where upstream work runs — confirm before removing it.
          .subscribeOn(AndroidSchedulers.mainThread())
          .observeOn(AndroidSchedulers.mainThread());
    }
  };
}
/**
 * Wraps the usual thread scheduling: subscribe on the IO scheduler, observe on the Android
 * main-thread scheduler.
 *
 * @param <T> the element type, passed through unchanged
 * @return a transformer applying that scheduling
 */
public static <T> ObservableTransformer<T, T> rxScheduers() {
  return new ObservableTransformer<T, T>() {
    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
      final Observable<T> subscribedOnIo = upstream.subscribeOn(Schedulers.io());
      return subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
    }
  };
}
/**
 * Pre-processes responses: a {@code BaseResponse} whose error code is 0 is unwrapped to its data
 * via {@code createObservable}; any other error code becomes an {@code ApiException} error
 * carrying the response's code and message.
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() {
  return upstream ->
      upstream.flatMap(
          (Function<BaseResponse<T>, Observable<T>>)
              response -> {
                if (response.getErrorCode() != 0) {
                  // Non-zero code: turn the response into an error signal.
                  return Observable.error(
                      new ApiException(response.getErrorCode(), response.getErrorMsg()));
                }
                // Success: emit the payload we actually want.
                return createObservable(response.getData());
              });
}
/**
 * Take a stream of scan data, match by the service scan matcher, wait for the configured match
 * delay ({@code matchDelayMs} milliseconds), then check whether this is the closest (gauged by
 * RSSI) peripheral discovered so far.
 *
 * <p>Every access to {@code matches}, including the reset on subscribe, is performed while
 * holding {@code syncRoot}.
 *
 * @return transformed stream of matches.
 */
@Override
public ObservableTransformer<ScanData, ScanData> match() {
  return scanDataStream ->
      scanDataStream
          .doOnSubscribe(disposable -> {
            // Reset under the same lock that guards the put and the iteration below.
            synchronized (syncRoot) {
              matches.clear();
            }
          })
          .compose(serviceScanMatcher.match())
          .doOnNext(scanData -> {
            // Record (or refresh) the latest scan result per device address.
            synchronized (syncRoot) {
              matches.put(scanData.getBluetoothDevice().getAddress(), scanData);
            }
          })
          .delay(matchDelayMs, TimeUnit.MILLISECONDS)
          .filter(match -> {
            synchronized (syncRoot) {
              for (ScanData other : matches.values()) {
                // Skip our self.
                if (other.getBluetoothDevice().getAddress().contentEquals(
                    match.getBluetoothDevice().getAddress())) {
                  continue;
                }
                // If the other match is closer then do not match.
                // RSSI is measured in dB; negative values, closer to zero indicates
                // stronger signal.
                if (other.getRssi() > match.getRssi()) {
                  return false;
                }
              }
              // This is our match.
              return true;
            }
          });
}
/**
 * Unwraps the response envelope: maps each element through a {@code TransToResult}, subscribing
 * on the IO scheduler and observing on the Android main-thread scheduler.
 *
 * @param <T> the payload type carried by {@code RespData}
 * @return a transformer applying the mapping and scheduling described above
 */
public static <T> ObservableTransformer<RespData<T>, RespData<T>> handleObservableToResult() {
  return new ObservableTransformer<RespData<T>, RespData<T>>() {
    @Override
    public ObservableSource<RespData<T>> apply(Observable<RespData<T>> upstream) {
      final TransToResult<T> unwrapper = new TransToResult<T>();
      return upstream
          .map(unwrapper)
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());
    }
  };
}
/**
 * Builds the scanning stage: for every upstream emission the state is set to {@code SCANNING}
 * and a scan is (re)started; the first result accepted by the matcher is emitted. If nothing is
 * emitted within {@code scanTimeoutMs} milliseconds the stream fails with a
 * {@code ConnectionError(SCAN_TIMEOUT)}.
 *
 * @param scanner source of scan results
 * @param scanMatcher matcher applied to the scan results
 */
private ObservableTransformer<Boolean, ScanData> scan(Scanner scanner, ScanMatcher scanMatcher) {
  return enabledSignals ->
      enabledSignals
          .doOnNext(ignored -> stateRelay.accept(State.SCANNING))
          .switchMap(ignored -> scanner.scan())
          .compose(scanMatcher.match())
          // Only the first match matters; an empty stream becomes an error here.
          .firstOrError()
          .timeout(
              scanTimeoutMs,
              TimeUnit.MILLISECONDS,
              Single.error(new ConnectionError(SCAN_TIMEOUT)))
          .toObservable();
}
/**
 * Creates the tasks-list controller by wrapping the loop built by {@code createLoop} in a
 * {@code MobiusAndroid} controller.
 *
 * @param effectHandler effect handler passed on to {@code createLoop}
 * @param eventSource event source passed on to {@code createLoop}
 * @param defaultModel model handed to {@code MobiusAndroid.controller} alongside the loop
 * @return the controller returned by {@code MobiusAndroid.controller}
 */
public static MobiusLoop.Controller<TasksListModel, TasksListEvent> createController(
ObservableTransformer<TasksListEffect, TasksListEvent> effectHandler,
EventSource<TasksListEvent> eventSource,
TasksListModel defaultModel) {
return MobiusAndroid.controller(createLoop(eventSource, effectHandler), defaultModel);
}
/**
 * With a service matcher that rejects everything, feeding scan results must leave the observer
 * empty.
 */
@Test
public void match_none() {
  // Service-level matcher that drops every scan result.
  ObservableTransformer<ScanData, ScanData> rejectAll = stream -> stream.filter(ignored -> false);
  when(serviceScanMatcher.match()).thenReturn(rejectAll);

  scanDataTestObserver = scanDataRelay.compose(rssiScanMatcher.match()).test();

  scanDataRelay.accept(scanData1);
  scanDataRelay.accept(scanData2);

  scanDataTestObserver.assertEmpty();
}
/**
 * Builds the transformer that shares one peripheral connection among subscribers.
 *
 * <p>State updates pushed to {@code stateRelay}: {@code CONNECTED} on each emitted peripheral,
 * {@code DISCONNECTED} on dispose, {@code DISCONNECTED_WITH_ERROR} on error. When the stream
 * finishes, the cached {@code sharedPeripheralObservable} and {@code scanMatcher} are cleared.
 * The result replays the latest peripheral and is reference-counted.
 */
private ObservableTransformer<Peripheral, Peripheral> shareConnection() {
  return connection ->
      connection
          .doOnNext(connected -> stateRelay.accept(State.CONNECTED))
          .doOnDispose(() -> stateRelay.accept(State.DISCONNECTED))
          .doOnError(failure -> stateRelay.accept(State.DISCONNECTED_WITH_ERROR))
          .doFinally(
              () -> {
                // Drop the cached connection so a later connect starts from scratch.
                this.sharedPeripheralObservable = null;
                this.scanMatcher = null;
              })
          .replay(1)
          .refCount();
}
/**
 * Assembles the add/edit-task effect handler by registering one sub-handler per effect class on
 * an {@code RxMobius} subtype effect handler builder.
 *
 * @param context used to obtain the {@code TasksLocalDataSource} instance
 * @param showTasksList action registered for the {@code Exit} effect
 * @param showEmptyTaskError action registered for the {@code NotifyEmptyTaskNotAllowed} effect
 * @return the combined transformer produced by the builder
 */
public static ObservableTransformer<AddEditTaskEffect, AddEditTaskEvent> createEffectHandlers(
    Context context, Action showTasksList, Action showEmptyTaskError) {
  TasksRemoteDataSource remote = TasksRemoteDataSource.getInstance();
  TasksLocalDataSource local =
      TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());

  return RxMobius.<AddEditTaskEffect, AddEditTaskEvent>subtypeEffectHandler()
      // View actions, registered with the scheduler returned by mainThread().
      .addAction(NotifyEmptyTaskNotAllowed.class, showEmptyTaskError, mainThread())
      .addAction(Exit.class, showTasksList, mainThread())
      // Data-source effects.
      .addFunction(CreateTask.class, createTaskHandler(remote, local))
      .addFunction(SaveTask.class, saveTaskHandler(remote, local))
      .build();
}
/**
 * Assembles the statistics effect handler; its only registration is the {@code LoadTasks}
 * transformer backed by the local data source.
 *
 * @param context used to obtain the {@code TasksLocalDataSource} instance
 * @return the combined transformer produced by the builder
 */
public static ObservableTransformer<StatisticsEffect, StatisticsEvent> createEffectHandler(
    Context context) {
  TasksLocalDataSource tasksStore =
      TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());

  return RxMobius.<StatisticsEffect, StatisticsEvent>subtypeEffectHandler()
      .addTransformer(LoadTasks.class, loadTasksHandler(tasksStore))
      .build();
}
/**
 * Builds the handler for {@code LoadTasks} effects: for each effect, takes the first emission of
 * the local source's tasks, copies it into an {@code ImmutableList} and wraps it in a
 * {@code tasksLoaded} event. An error is replaced by {@code tasksLoadingFailed()}.
 *
 * @param localSource the data source the tasks are read from
 */
private static ObservableTransformer<LoadTasks, StatisticsEvent> loadTasksHandler(
    TasksLocalDataSource localSource) {
  return requests ->
      requests.flatMap(
          ignored ->
              localSource
                  .getTasks()
                  .toObservable()
                  // Only the first emission is used.
                  .take(1)
                  .map(ImmutableList::copyOf)
                  .map(StatisticsEvent::tasksLoaded)
                  .onErrorReturnItem(tasksLoadingFailed()));
}
/**
 * Dispatching a {@code RefreshTasks} effect should store {@code TASK_1} in the local source and
 * emit {@code tasksRefreshed()}.
 */
@Test
public void callsBackendThenSavesThingsToLocalDataSource() {
  FakeTasksRemoteDataSource fakeRemote = FakeTasksRemoteDataSource.getInstance();
  FakeDataSource fakeLocal = new FakeDataSource();
  ObservableTransformer<RefreshTasks, TasksListEvent> handler =
      refreshTasksHandler(fakeRemote, fakeLocal);
  TestCase<RefreshTasks, TasksListEvent> harness = new TestCase<>(handler);

  harness.dispatchEffect(refreshTasks().asRefreshTasks());

  assertThat(fakeLocal.tasks, contains(TASK_1));
  harness.assertEvents(TasksListEvent.tasksRefreshed());
}
/**
 * When the local source is flagged to fail, dispatching a {@code RefreshTasks} effect should
 * leave it empty and emit {@code tasksLoadingFailed()}.
 */
@Test
public void failureToStoreTasksResultsInErrorEvent() {
  FakeDataSource failingLocal = new FakeDataSource();
  failingLocal.fail = true;
  ObservableTransformer<RefreshTasks, TasksListEvent> handler =
      refreshTasksHandler(FakeTasksRemoteDataSource.getInstance(), failingLocal);
  TestCase<RefreshTasks, TasksListEvent> harness = new TestCase<>(handler);

  harness.dispatchEffect(refreshTasks().asRefreshTasks());

  assertThat(failingLocal.tasks.isEmpty(), is(true));
  harness.assertEvents(tasksLoadingFailed());
}
/**
 * Network request transformer.
 *
 * <p>Subscribes on the IO scheduler, observes on the Android main-thread scheduler, and routes
 * every upstream error through {@code ResponseErrorHandle.handleError} before re-emitting it as
 * an error.
 */
public static <T> ObservableTransformer<T, T> network() {
  return upstream ->
      upstream
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .onErrorResumeNext(
              (Function<Throwable, Observable<T>>)
                  failure -> Observable.error(ResponseErrorHandle.handleError(failure)));
}
/**
 * Publishes the input so it can be shared, composes the shared stream with every transformer in
 * {@code transformers}, and merges the results into one observable.
 */
@Override
public Observable<R> apply(Observable<T> input) {
  final Function<Observable<T>, Observable<R>> fanOutAndMerge =
      new Function<Observable<T>, Observable<R>>() {
        @Override
        public Observable<R> apply(Observable<T> shared) {
          final List<Observable<R>> branches = new ArrayList<>();
          for (ObservableTransformer<T, R> transformer : transformers) {
            final Observable<R> branch = shared.compose(transformer);
            branches.add(branch);
          }
          return Observable.merge(branches);
        }
      };
  return input.publish(fanOutAndMerge);
}
/**
 * Registers an {@link ObservableTransformer} as the handler for effects of the given class. The
 * handler receives every effect object that is an instance of that class.
 *
 * <p>Registering handlers for two effect classes where one is assignable to the other (which
 * includes registering the same class twice) is a collision and is rejected.
 *
 * @param effectClass the class to handle
 * @param effectHandler the effect handler for the given effect class
 * @param <G> the effect class as a type parameter
 * @return this builder
 * @throws IllegalArgumentException if there is a handler collision
 */
public <G extends F> SubtypeEffectHandlerBuilder<F, E> addTransformer(
    final Class<G> effectClass, final ObservableTransformer<G, E> effectHandler) {
  //noinspection ResultOfMethodCallIgnored
  checkNotNull(effectClass);
  //noinspection ResultOfMethodCallIgnored
  checkNotNull(effectHandler);

  for (Class<?> registered : effectPerformerMap.keySet()) {
    final boolean collides =
        registered.isAssignableFrom(effectClass) || effectClass.isAssignableFrom(registered);
    if (!collides) {
      continue;
    }
    throw new IllegalArgumentException(
        "Effect classes may not be assignable to each other, collision found: "
            + effectClass.getSimpleName()
            + " <-> "
            + registered.getSimpleName());
  }

  final ObservableTransformer<F, E> routed =
      new ObservableTransformer<F, E>() {
        @Override
        public Observable<E> apply(Observable<F> effects) {
          // Narrow the shared effect stream to this handler's effect type.
          final Observable<G> matching = effects.ofType(effectClass);
          // onErrorFunction is read here, when the transformer is applied, not at registration.
          return matching.compose(effectHandler).doOnError(onErrorFunction.apply(effectHandler));
        }
      };
  effectPerformerMap.put(effectClass, routed);
  return this;
}