io.reactivex.observables.GroupedObservable 类的源码示例

下面列出了 io.reactivex.observables.GroupedObservable 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: RxAndroidBle   文件: ScanSettingsEmulator.java
/**
 * Builds a transformer that partitions the incoming scan results by the address of the
 * Bluetooth device that produced them and applies {@code compose} to each partition
 * separately; the transformed partitions are then merged back into a single stream.
 *
 * @param compose transformer applied independently to every per-address group
 * @return a transformer that groups by device address, composes each group, and flattens
 */
private static ObservableTransformer<RxBleInternalScanResult, RxBleInternalScanResult> splitByAddressAndForEach(
        final ObservableTransformer<RxBleInternalScanResult, RxBleInternalScanResult> compose
) {
    // Key selector: the address of the device that produced the scan result.
    final Function<RxBleInternalScanResult, String> addressOf =
            new Function<RxBleInternalScanResult, String>() {
                @Override
                public String apply(RxBleInternalScanResult scanResult) {
                    return scanResult.getBluetoothDevice().getAddress();
                }
            };

    // Runs the supplied transformer on a single per-address group.
    final Function<GroupedObservable<String, RxBleInternalScanResult>, Observable<RxBleInternalScanResult>> composeGroup =
            new Function<GroupedObservable<String, RxBleInternalScanResult>, Observable<RxBleInternalScanResult>>() {
                @Override
                public Observable<RxBleInternalScanResult> apply(
                        GroupedObservable<String, RxBleInternalScanResult> addressGroup) {
                    return addressGroup.compose(compose);
                }
            };

    return new ObservableTransformer<RxBleInternalScanResult, RxBleInternalScanResult>() {
        @Override
        public Observable<RxBleInternalScanResult> apply(Observable<RxBleInternalScanResult> source) {
            Observable<GroupedObservable<String, RxBleInternalScanResult>> byAddress = source.groupBy(addressOf);
            return byAddress.flatMap(composeGroup);
        }
    };
}
 
源代码2 项目: RxWindowIfChanged   文件: WindowIfChangedTest.java
/**
 * A single message followed by upstream completion: the one window emits its message,
 * then a completion notification, and the outer stream completes.
 */
@Test public void completeCompletesInner() {
  final AtomicInteger seen = new AtomicInteger();

  // Prefixes every message with the 1-based number of the window it arrived in and
  // materializes the window so its terminal event is observable as a value.
  Function<GroupedObservable<String, Message>, Observable<Notification<String>>> numberAndMaterialize =
      new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
        @Override public Observable<Notification<String>> apply(
            GroupedObservable<String, Message> window) {
          final int windowNumber = seen.incrementAndGet();
          Function<Message, String> prefixWithNumber = new Function<Message, String>() {
            @Override public String apply(Message item) throws Exception {
              return windowNumber + " " + item;
            }
          };
          return window.map(prefixWithNumber).materialize();
        }
      };

  Observable<Message> messages = Observable.just(new Message("Bob", "Hello"));

  WindowIfChanged.create(messages, userSelector)
      .switchMap(numberAndMaterialize)
      .test()
      .assertValues( //
          Notification.createOnNext("1 Bob Hello"), //
          Notification.<String>createOnComplete()) //
      .assertComplete();
}
 
源代码3 项目: RxWindowIfChanged   文件: WindowIfChangedTest.java
/**
 * A single message followed by an upstream error: the test expects the window's message,
 * then a completion notification for that window, and finally the error on the outer stream.
 */
@Test public void errorCompletesInner() {
  final AtomicInteger seen = new AtomicInteger();

  // Prefixes every message with the 1-based number of the window it arrived in and
  // materializes the window so its terminal event is observable as a value.
  Function<GroupedObservable<String, Message>, Observable<Notification<String>>> numberAndMaterialize =
      new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
        @Override public Observable<Notification<String>> apply(
            GroupedObservable<String, Message> window) {
          final int windowNumber = seen.incrementAndGet();
          Function<Message, String> prefixWithNumber = new Function<Message, String>() {
            @Override public String apply(Message item) throws Exception {
              return windowNumber + " " + item;
            }
          };
          return window.map(prefixWithNumber).materialize();
        }
      };

  RuntimeException error = new RuntimeException("boom!");
  // Upstream: one onNext followed by onError, built from notifications.
  Observable<Message> messages = Observable.just( //
      Notification.createOnNext(new Message("Bob", "Hello")),
      Notification.createOnError(error)
  ).dematerialize();

  WindowIfChanged.create(messages, userSelector)
      .switchMap(numberAndMaterialize)
      .test()
      .assertValues( //
          Notification.createOnNext("1 Bob Hello"), //
          Notification.<String>createOnComplete()) //
      .assertError(error);
}
 
源代码4 项目: RxWindowIfChanged   文件: WindowIfChangedTest.java
/**
 * Six messages whose key changes three times (Bob, Bob, Alice, Bob, Bob, Eve): the test
 * expects four consecutive windows, each ending with a completion notification, and a
 * returning key ("Bob") to start a new window rather than rejoin the earlier one.
 */
@Test public void splits() {
  final AtomicInteger seen = new AtomicInteger();

  // Prefixes every message with the 1-based number of the window it arrived in and
  // materializes the window so its terminal event is observable as a value.
  Function<GroupedObservable<String, Message>, Observable<Notification<String>>> numberAndMaterialize =
      new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
        @Override public Observable<Notification<String>> apply(
            GroupedObservable<String, Message> window) {
          final int windowNumber = seen.incrementAndGet();
          Function<Message, String> prefixWithNumber = new Function<Message, String>() {
            @Override public String apply(Message item) throws Exception {
              return windowNumber + " " + item;
            }
          };
          return window.map(prefixWithNumber).materialize();
        }
      };

  Observable<Message> messages = Observable.just( //
      new Message("Bob", "Hello"), //
      new Message("Bob", "World"), //
      new Message("Alice", "Hey"), //
      new Message("Bob", "What's"), //
      new Message("Bob", "Up?"), //
      new Message("Eve", "Hey") //
  );

  WindowIfChanged.create(messages, userSelector)
      .switchMap(numberAndMaterialize)
      .test()
      .assertValues( //
          Notification.createOnNext("1 Bob Hello"), //
          Notification.createOnNext("1 Bob World"), //
          Notification.<String>createOnComplete(), //
          Notification.createOnNext("2 Alice Hey"), //
          Notification.<String>createOnComplete(), //
          Notification.createOnNext("3 Bob What's"), //
          Notification.createOnNext("3 Bob Up?"), //
          Notification.<String>createOnComplete(), //
          Notification.createOnNext("4 Eve Hey"), //
          Notification.<String>createOnComplete()); //
}
 
/**
 * Subscribes to {@code upstream} with a new {@code WindowIfChangedObserver} constructed
 * from this instance's {@code keySelector} and the supplied downstream observer.
 *
 * @param observer the downstream observer of the grouped windows; it is passed straight to
 *     the new {@code WindowIfChangedObserver}
 */
@Override protected void subscribeActual(Observer<? super GroupedObservable<K, T>> observer) {
  upstream.subscribe(new WindowIfChangedObserver<>(keySelector, observer));
}
 
/**
 * Stores the key selector and the downstream observer for later use.
 *
 * @param keySelector function that derives a key of type {@code K} from an item of type {@code T}
 * @param observer downstream observer that receives {@code GroupedObservable<K, T>} instances
 */
WindowIfChangedObserver(Function<? super T, ? extends K> keySelector,
    Observer<? super GroupedObservable<K, T>> observer) {
  this.observer = observer;
  this.keySelector = keySelector;
}
 
源代码7 项目: RxWindowIfChanged   文件: WindowIfChanged.java
/**
 * Wraps {@code upstream} in a {@code WindowIfChangedObservable} configured with
 * {@code keySelector}.
 *
 * @param upstream the source of items
 * @param keySelector function that derives a key from each item
 * @param <T> the item type
 * @param <K> the key type
 * @return an observable of {@code GroupedObservable<K, T>} backed by a new
 *     {@code WindowIfChangedObservable}
 */
public static <T, K> Observable<GroupedObservable<K, T>> create(Observable<T> upstream,
    Function<? super T, ? extends K> keySelector) {
  Observable<GroupedObservable<K, T>> windowed =
      new WindowIfChangedObservable<>(upstream, keySelector);
  return windowed;
}
 
 类所在包
 类方法
 同包方法