类io.reactivex.ObservableTransformer源码实例Demo

下面列出了怎么用io.reactivex.ObservableTransformer的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: RxCentralBle   文件: RssiScanMatcherTest.java
@Test
public void match_closest() {
  ObservableTransformer<ScanData, ScanData> serviceMatch = scanData -> scanData.filter(sd -> true);
  when(serviceScanMatcher.match()).thenReturn(serviceMatch);

  scanDataTestObserver = scanDataRelay
          .compose(rssiScanMatcher.match())
          .test();

  when(scanData1.getRssi()).thenReturn(-60);
  when(scanData2.getRssi()).thenReturn(-50);

  scanDataRelay.accept(scanData1);

  testScheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

  scanDataRelay.accept(scanData2);

  testScheduler.advanceTimeBy(
          RssiScanMatcher.DEFAULT_MATCH_DELAY_MS + 500,
          TimeUnit.MILLISECONDS);

  scanDataTestObserver.assertValue(scanData2);
}
 
public static ObservableTransformer<TasksListEffect, TasksListEvent> createEffectHandler(
    Context context,
    TasksListViewActions view,
    Action showAddTask,
    Consumer<Task> showTaskDetails) {

  TasksRemoteDataSource remoteSource = TasksRemoteDataSource.getInstance();
  TasksLocalDataSource localSource =
      TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());

  return RxMobius.<TasksListEffect, TasksListEvent>subtypeEffectHandler()
      .addTransformer(RefreshTasks.class, refreshTasksHandler(remoteSource, localSource))
      .addTransformer(LoadTasks.class, loadTasksHandler(localSource))
      .addConsumer(SaveTask.class, saveTaskHandler(remoteSource, localSource))
      .addConsumer(DeleteTasks.class, deleteTasksHandler(remoteSource, localSource))
      .addConsumer(ShowFeedback.class, showFeedbackHandler(view), mainThread())
      .addConsumer(
          NavigateToTaskDetails.class, navigateToDetailsHandler(showTaskDetails), mainThread())
      .addAction(StartTaskCreationFlow.class, showAddTask, mainThread())
      .build();
}
 
static ObservableTransformer<RefreshTasks, TasksListEvent> refreshTasksHandler(
    TasksDataSource remoteSource, TasksDataSource localSource) {
  Single<TasksListEvent> refreshTasksOperation =
      remoteSource
          .getTasks()
          .singleOrError()
          .map(Either::<Throwable, List<Task>>right)
          .onErrorReturn(Either::left)
          .flatMap(
              either ->
                  either.map(
                      left -> Single.just(tasksLoadingFailed()),
                      right ->
                          Observable.fromIterable(right.value())
                              .concatMapCompletable(
                                  t -> Completable.fromAction(() -> localSource.saveTask(t)))
                              .andThen(Single.just(tasksRefreshed()))
                              .onErrorReturnItem(tasksLoadingFailed())));

  return refreshTasks -> refreshTasks.flatMapSingle(__ -> refreshTasksOperation);
}
 
public static ObservableTransformer<TaskDetailEffect, TaskDetailEvent> createEffectHandlers(
    TaskDetailViewActions view, Context context, Action dismiss, Consumer<Task> launchEditor) {

  TasksRemoteDataSource remoteSource = TasksRemoteDataSource.getInstance();
  TasksLocalDataSource localSource =
      TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());
  return RxMobius.<TaskDetailEffect, TaskDetailEvent>subtypeEffectHandler()
      .addFunction(DeleteTask.class, deleteTaskHandler(remoteSource, localSource))
      .addFunction(SaveTask.class, saveTaskHandler(remoteSource, localSource))
      .addAction(NotifyTaskMarkedComplete.class, view::showTaskMarkedComplete, mainThread())
      .addAction(NotifyTaskMarkedActive.class, view::showTaskMarkedActive, mainThread())
      .addAction(NotifyTaskDeletionFailed.class, view::showTaskDeletionFailed, mainThread())
      .addAction(NotifyTaskSaveFailed.class, view::showTaskSavingFailed, mainThread())
      .addConsumer(OpenTaskEditor.class, openTaskEditorHandler(launchEditor), mainThread())
      .addAction(Exit.class, dismiss, mainThread())
      .build();
}
 
源代码5 项目: mobius   文件: RxMobius.java
/**
 * Optionally set a shared error handler in case a handler throws an uncaught exception.
 *
 * <p>The default is to use {@link RxJavaPlugins#onError(Throwable)}. Note that any exception
 * thrown by a handler is a fatal error and this method doesn't enable safe error handling, only
 * configurable crash reporting.
 *
 * @param function a function that gets told which sub-transformer failed and should return an
 *     appropriate handler for exceptions thrown.
 */
public SubtypeEffectHandlerBuilder<F, E> withFatalErrorHandler(
    final Function<ObservableTransformer<? extends F, E>, Consumer<Throwable>> function) {
  checkNotNull(function);

  this.onErrorFunction =
      new OnErrorFunction<ObservableTransformer<? extends F, E>, Consumer<Throwable>>() {
        @Override
        public Consumer<Throwable> apply(ObservableTransformer<? extends F, E> effectHandler) {
          try {
            return function.apply(effectHandler);
          } catch (Exception e) {
            throw new RuntimeException(
                "FATAL: fatal error handler threw exception for effect handler: "
                    + effectHandler,
                e);
          }
        }
      };

  return this;
}
 
源代码6 项目: mobius   文件: Transformers.java
/**
 * Creates an {@link ObservableTransformer} that will flatten the provided {@link Action} into the
 * stream as a {@link Completable} every time it receives an effect from the upstream effects
 * observable. This Completable will be subscribed on the specified {@link Scheduler}. This will
 * result in calling the provided Action on the specified scheduler every time an effect is
 * dispatched to the created effect transformer.
 *
 * @param doEffect the {@link Action} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} that the action should be run on
 * @param <F> the type of Effect this transformer handles
 * @param <E> these transformers are for effects that do not result in any events; however, they
 *     still need to share the same Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream
          .flatMapCompletable(
              new Function<F, CompletableSource>() {
                @Override
                public CompletableSource apply(F f) throws Exception {
                  return scheduler == null
                      ? Completable.fromAction(doEffect)
                      : Completable.fromAction(doEffect).subscribeOn(scheduler);
                }
              })
          .toObservable();
    }
  };
}
 
源代码7 项目: WanAndroid   文件: RxUtils.java
/**
 * 对结果进行预处理
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult(){
    return new ObservableTransformer<BaseResponse<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
            return upstream.flatMap(new Function<BaseResponse<T>, ObservableSource<T>>() {
                @Override
                public ObservableSource<T> apply(BaseResponse<T> tBaseResponse) throws Exception {
                    if(tBaseResponse.getErrorCode() == 0){
                        return createObservable(tBaseResponse.getData());//创建我们需要的数据
                    }
                    return Observable.error(
                            new ApiException(tBaseResponse.getErrorCode(), tBaseResponse.getErrorMsg())//创建一个异常
                    );
                }
            });
        }
    };
}
 
源代码8 项目: mobius   文件: Transformers.java
/**
 * Creates an {@link ObservableTransformer} that will flatten the provided {@link Function} into
 * the stream as an {@link Observable} every time it receives an effect from the upstream effects
 * observable. This will result in calling the function on the specified scheduler, and passing it
 * the requested effect object then emitting its returned value.
 *
 * @param function the {@link Function} to be invoked every time the effect is requested
 * @param scheduler the {@link Scheduler} to be used when invoking the function
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream.flatMap(
          new Function<F, ObservableSource<E>>() {
            @Override
            public ObservableSource<E> apply(final F f) {
              Observable<E> eventObservable =
                  Observable.fromCallable(
                      new Callable<E>() {
                        @Override
                        public E call() throws Exception {
                          return function.apply(f);
                        }
                      });
              return scheduler == null ? eventObservable : eventObservable.subscribeOn(scheduler);
            }
          });
    }
  };
}
 
源代码9 项目: mobius   文件: RxMobiusLoopTest.java
@Test
public void shouldSupportStartingALoopWithAnInit() throws Exception {
  MobiusLoop.Builder<String, Integer, Boolean> withInit =
      builder.init(
          new Init<String, Boolean>() {
            @Nonnull
            @Override
            public First<String, Boolean> init(String model) {
              return First.first(model + "-init");
            }
          });

  ObservableTransformer<Integer, String> transformer = RxMobius.loopFrom(withInit, "hi");

  final TestObserver<String> observer = Observable.just(10).compose(transformer).test();

  observer.awaitCount(2);
  observer.assertValues("hi-init", "hi-init10");
}
 
源代码10 项目: mobius   文件: RxMobiusLoopTest.java
@Test
public void shouldThrowIfStartingALoopWithInitAndStartEffects() throws Exception {
  MobiusLoop.Builder<String, Integer, Boolean> withInit =
      builder.init(
          new Init<String, Boolean>() {
            @Nonnull
            @Override
            public First<String, Boolean> init(String model) {
              return First.first(model + "-init");
            }
          });

  ObservableTransformer<Integer, String> transformer =
      RxMobius.loopFrom(withInit, "hi", effects(true));

  Observable.just(10)
      .compose(transformer)
      .test()
      .assertError(t -> t.getMessage().contains("cannot pass in start effects"));
}
 
源代码11 项目: mobius   文件: MobiusEffectRouterTest.java
@Before
public void setUp() throws Exception {
  cConsumer = new TestConsumer<>();
  dAction = new TestAction();

  ObservableTransformer<TestEffect, TestEvent> router =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addTransformer(A.class, (Observable<A> as) -> as.map(a -> AEvent.create(a.id())))
          .addTransformer(B.class, (Observable<B> bs) -> bs.map(b -> BEvent.create(b.id())))
          .addConsumer(C.class, cConsumer)
          .addAction(D.class, dAction)
          .addFunction(E.class, e -> AEvent.create(e.id()))
          .build();

  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();

  publishSubject.compose(router).subscribe(testSubscriber);
}
 
源代码12 项目: RxCentralBle   文件: CoreConnectionManagerTest.java
private void prepareConnect(boolean matcherEquals) {
  scanMatcher =
      new ScanMatcher() {
        @Override
        public ObservableTransformer<ScanData, ScanData> match() {
          return scanData -> scanData;
        }

        @Override
        public boolean equals(Object o) {
          return matcherEquals;
        }

        @Override
        public int hashCode() {
          return 1;
        }
      };

  stateTestObserver = coreConnectionManager.state().test();
  connectTestObserver = coreConnectionManager
      .connect(scanMatcher, DEFAULT_SCAN_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT)
      .test();
}
 
源代码13 项目: TikTok   文件: RxUtils.java
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> observable) {
            return observable.subscribeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            view.showLoading();//显示进度条
                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doFinally(new Action() {
                        @Override
                        public void run() {
                            view.hideLoading();//隐藏进度条
                        }
                    }).compose(RxLifecycleUtils.bindToLifecycle(view));
        }
    };
}
 
源代码14 项目: TikTok   文件: RxUtils.java
/**
 * 线程调度器
 */
public static <T> ObservableTransformer<T, T> schedulersTransformer() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .doOnNext(new Consumer<T>() {
                        @Override
                        public void accept(T t) throws Exception {

                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码15 项目: FlowHelper   文件: RxUtils.java
/**
 * 封装线程调度
 * @param <T>
 * @return
 */
public static <T> ObservableTransformer<T,T> rxScheduers(){
    return new ObservableTransformer<T,T>(){

        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码16 项目: Yuan-WanAndroid   文件: RxUtil.java
/**
 * 对结果进行预处理
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() {
    return upstream ->
            upstream.flatMap((Function<BaseResponse<T>, Observable<T>>) baseResponse -> {
                if (baseResponse.getErrorCode() == 0) {
                    return createObservable(baseResponse.getData());//创建我们需要的数据
                }
                return Observable.error(
                        new ApiException(baseResponse.getErrorCode(), baseResponse.getErrorMsg())//创建一个异常
                );
            });
}
 
源代码17 项目: RxCentralBle   文件: RssiScanMatcher.java
/**
 * Take a stream of scan data, match by service uuid, wait 1500ms, then check to see if this is
 * the closest (gauged by RSSI) discovered peripheral with that service uuid.
 *
 * @return transformed stream of matches.
 */
@Override
public ObservableTransformer<ScanData, ScanData> match() {
  return scanDataStream ->
          scanDataStream
                  .doOnSubscribe(disposable -> matches.clear())
                  .compose(serviceScanMatcher.match())
                  .doOnNext(scanData -> {
                    synchronized (syncRoot) {
                      matches.put(scanData.getBluetoothDevice().getAddress(), scanData);
                    }
                  })
                  .delay(matchDelayMs, TimeUnit.MILLISECONDS)
                  .filter(match -> {
                    synchronized (syncRoot) {
                      for (ScanData other : matches.values()) {
                        // Skip our self.
                        if (other.getBluetoothDevice().getAddress().contentEquals(
                                match.getBluetoothDevice().getAddress())) {
                          continue;
                        }

                        // If the other match is closer then do not match.
                        // RSSI is measured in dB; negative values, closer to zero indicates
                        // stronger signal.
                        if (other.getRssi() > match.getRssi()) {
                          return false;
                        }
                      }

                      // This is our match.
                      return true;
                    }
                  });
}
 
源代码18 项目: tysq-android   文件: RxParser.java
/**
 * 拆壳
 *
 * @param <T>
 * @return
 */
public static <T> ObservableTransformer<RespData<T>, RespData<T>> handleObservableToResult() {
    return new ObservableTransformer<RespData<T>, RespData<T>>() {
        @Override
        public ObservableSource<RespData<T>> apply(Observable<RespData<T>> upstream) {
            return upstream
                    .map(new TransToResult<T>())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码19 项目: RxCentralBle   文件: CoreConnectionManager.java
private ObservableTransformer<Boolean, ScanData> scan(Scanner scanner, ScanMatcher scanMatcher) {
  return bluetoothEnabled ->
          bluetoothEnabled
                  .doOnNext(enabled -> stateRelay.accept(State.SCANNING))
                  .switchMap(enabled -> scanner.scan())
                  .compose(scanMatcher.match())
                  .firstOrError()
                  .timeout(
                          scanTimeoutMs,
                          TimeUnit.MILLISECONDS,
                          Single.error(new ConnectionError(SCAN_TIMEOUT)))
                  .toObservable();
}
 
源代码20 项目: mobius-android-sample   文件: TasksInjector.java
public static MobiusLoop.Controller<TasksListModel, TasksListEvent> createController(
    ObservableTransformer<TasksListEffect, TasksListEvent> effectHandler,
    EventSource<TasksListEvent> eventSource,
    TasksListModel defaultModel) {

  return MobiusAndroid.controller(createLoop(eventSource, effectHandler), defaultModel);
}
 
源代码21 项目: RxCentralBle   文件: RssiScanMatcherTest.java
@Test
public void match_none() {
  ObservableTransformer<ScanData, ScanData> serviceMatch = scanData -> scanData.filter(sd -> false);
  when(serviceScanMatcher.match()).thenReturn(serviceMatch);

  scanDataTestObserver = scanDataRelay
          .compose(rssiScanMatcher.match())
          .test();

  scanDataRelay.accept(scanData1);
  scanDataRelay.accept(scanData2);
  scanDataTestObserver.assertEmpty();
}
 
源代码22 项目: RxCentralBle   文件: CoreConnectionManager.java
private ObservableTransformer<Peripheral, Peripheral> shareConnection() {
  return peripheral -> peripheral
          .doOnNext(connectablePeripheral -> stateRelay.accept(State.CONNECTED))
          .doOnDispose(() -> stateRelay.accept(State.DISCONNECTED))
          .doOnError(error -> stateRelay.accept(State.DISCONNECTED_WITH_ERROR))
          .doFinally(() -> {
            this.sharedPeripheralObservable = null;
            this.scanMatcher = null;
          })
          .replay(1)
          .refCount();
}
 
public static ObservableTransformer<AddEditTaskEffect, AddEditTaskEvent> createEffectHandlers(
    Context context, Action showTasksList, Action showEmptyTaskError) {
  TasksRemoteDataSource remoteSource = TasksRemoteDataSource.getInstance();
  TasksLocalDataSource localSource =
      TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());

  return RxMobius.<AddEditTaskEffect, AddEditTaskEvent>subtypeEffectHandler()
      .addAction(NotifyEmptyTaskNotAllowed.class, showEmptyTaskError, mainThread())
      .addAction(Exit.class, showTasksList, mainThread())
      .addFunction(CreateTask.class, createTaskHandler(remoteSource, localSource))
      .addFunction(SaveTask.class, saveTaskHandler(remoteSource, localSource))
      .build();
}
 
public static ObservableTransformer<StatisticsEffect, StatisticsEvent> createEffectHandler(
    Context context) {
  TasksLocalDataSource localSource =
      TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());
  return RxMobius.<StatisticsEffect, StatisticsEvent>subtypeEffectHandler()
      .addTransformer(LoadTasks.class, loadTasksHandler(localSource))
      .build();
}
 
private static ObservableTransformer<LoadTasks, StatisticsEvent> loadTasksHandler(
    TasksLocalDataSource localSource) {
  return effects ->
      effects.flatMap(
          loadTasks ->
              localSource
                  .getTasks()
                  .toObservable()
                  .take(1)
                  .map(ImmutableList::copyOf)
                  .map(StatisticsEvent::tasksLoaded)
                  .onErrorReturnItem(tasksLoadingFailed()));
}
 
@Test
public void callsBackendThenSavesThingsToLocalDataSource() {
  FakeTasksRemoteDataSource remoteSource = FakeTasksRemoteDataSource.getInstance();
  FakeDataSource localSource = new FakeDataSource();

  ObservableTransformer<RefreshTasks, TasksListEvent> underTest =
      refreshTasksHandler(remoteSource, localSource);

  TestCase<RefreshTasks, TasksListEvent> testCase = new TestCase<>(underTest);
  testCase.dispatchEffect(refreshTasks().asRefreshTasks());

  assertThat(localSource.tasks, contains(TASK_1));
  testCase.assertEvents(TasksListEvent.tasksRefreshed());
}
 
@Test
public void failureToStoreTasksResultsInErrorEvent() {
  FakeDataSource localSource = new FakeDataSource();
  localSource.fail = true;
  ObservableTransformer<RefreshTasks, TasksListEvent> underTest =
      refreshTasksHandler(FakeTasksRemoteDataSource.getInstance(), localSource);

  TestCase<RefreshTasks, TasksListEvent> testCase = new TestCase<>(underTest);

  testCase.dispatchEffect(refreshTasks().asRefreshTasks());
  assertThat(localSource.tasks.isEmpty(), is(true));
  testCase.assertEvents(tasksLoadingFailed());
}
 
源代码28 项目: DanDanPlayForAndroid   文件: ShooterNetworkUtils.java
/**
 * 网络请求转换器
 *
 * 执行在IO线程
 * 回调到Main线程
 * 添加错误处理器
 */
public static <T> ObservableTransformer<T, T> network() {
    return observable ->
            observable.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .onErrorResumeNext((Function<Throwable, Observable<T>>) t -> {
                        return Observable.error(ResponseErrorHandle.handleError(t));
                    });
}
 
源代码29 项目: mobius   文件: MergedTransformer.java
@Override
public Observable<R> apply(Observable<T> input) {
  return input.publish(
      new Function<Observable<T>, Observable<R>>() {
        @Override
        public Observable<R> apply(Observable<T> innerInput) {
          final List<Observable<R>> transformed = new ArrayList<>();
          for (ObservableTransformer<T, R> transformer : transformers) {
            transformed.add(innerInput.compose(transformer));
          }
          return Observable.merge(transformed);
        }
      });
}
 
源代码30 项目: mobius   文件: RxMobius.java
/**
 * Add an {@link ObservableTransformer} for handling effects of a given type. The handler will
 * receive all effect objects that extend the given class.
 *
 * <p>Adding handlers for two effect classes where one is a super-class of the other is
 * considered a collision and is not allowed. Registering the same class twice is also
 * considered a collision.
 *
 * @param effectClass the class to handle
 * @param effectHandler the effect handler for the given effect class
 * @param <G> the effect class as a type parameter
 * @return this builder
 * @throws IllegalArgumentException if there is a handler collision
 */
public <G extends F> SubtypeEffectHandlerBuilder<F, E> addTransformer(
    final Class<G> effectClass, final ObservableTransformer<G, E> effectHandler) {
  //noinspection ResultOfMethodCallIgnored
  checkNotNull(effectClass);
  //noinspection ResultOfMethodCallIgnored
  checkNotNull(effectHandler);

  for (Class<?> cls : effectPerformerMap.keySet()) {
    if (cls.isAssignableFrom(effectClass) || effectClass.isAssignableFrom(cls)) {
      throw new IllegalArgumentException(
          "Effect classes may not be assignable to each other, collision found: "
              + effectClass.getSimpleName()
              + " <-> "
              + cls.getSimpleName());
    }
  }

  effectPerformerMap.put(
      effectClass,
      new ObservableTransformer<F, E>() {
        @Override
        public Observable<E> apply(Observable<F> effects) {
          return effects
              .ofType(effectClass)
              .compose(effectHandler)
              .doOnError(onErrorFunction.apply(effectHandler));
        }
      });

  return this;
}
 
 类所在包
 类方法
 同包方法