下面列出了 io.reactivex.observables.GroupedObservable 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a transformer that groups scan results by the address of their Bluetooth device,
 * applies the given transformer to each group independently, and flattens the transformed
 * groups back into a single stream.
 *
 * @param compose transformer applied to every per-address group
 * @return a transformer emitting the combined output of all transformed groups
 */
private static ObservableTransformer<RxBleInternalScanResult, RxBleInternalScanResult> splitByAddressAndForEach(
        final ObservableTransformer<RxBleInternalScanResult, RxBleInternalScanResult> compose
) {
    // Grouping key: the address reported by the scan result's Bluetooth device.
    final Function<RxBleInternalScanResult, String> addressOf =
            new Function<RxBleInternalScanResult, String>() {
                @Override
                public String apply(RxBleInternalScanResult scanResult) {
                    return scanResult.getBluetoothDevice().getAddress();
                }
            };

    // Runs the caller-supplied transformer on one address group at a time.
    final Function<GroupedObservable<String, RxBleInternalScanResult>, Observable<RxBleInternalScanResult>>
            transformGroup =
            new Function<GroupedObservable<String, RxBleInternalScanResult>, Observable<RxBleInternalScanResult>>() {
                @Override
                public Observable<RxBleInternalScanResult> apply(
                        GroupedObservable<String, RxBleInternalScanResult> addressGroup) {
                    return addressGroup.compose(compose);
                }
            };

    return new ObservableTransformer<RxBleInternalScanResult, RxBleInternalScanResult>() {
        @Override
        public Observable<RxBleInternalScanResult> apply(Observable<RxBleInternalScanResult> observable) {
            return observable.groupBy(addressOf).flatMap(transformGroup);
        }
    };
}
/**
 * Checks that when the upstream completes, the open window completes too: the only message
 * is observed as a value of window 1, followed by that window's completion notification,
 * and the outer stream itself completes.
 */
@Test public void completeCompletesInner() {
    Observable<Message> messages = Observable.just(new Message("Bob", "Hello"));
    final AtomicInteger seen = new AtomicInteger();

    // Prefixes every message with the 1-based number of the window it arrived in and
    // materializes the window so its terminal event is visible as a value.
    Function<GroupedObservable<String, Message>, Observable<Notification<String>>> tagAndMaterialize =
            new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
                @Override public Observable<Notification<String>> apply(
                        GroupedObservable<String, Message> window) {
                    final int windowNumber = seen.incrementAndGet();
                    Observable<String> tagged = window.map(new Function<Message, String>() {
                        @Override public String apply(Message message) throws Exception {
                            return windowNumber + " " + message;
                        }
                    });
                    return tagged.materialize();
                }
            };

    WindowIfChanged.create(messages, userSelector)
            .switchMap(tagAndMaterialize)
            .test()
            .assertValues(
                    Notification.createOnNext("1 Bob Hello"),
                    Notification.<String>createOnComplete())
            .assertComplete();
}
/**
 * Checks that when the upstream fails, the open window is completed (not errored) while the
 * outer stream receives the error: the expected values are the message from window 1 and
 * that window's completion notification, and the outer stream terminates with the error.
 */
@Test public void errorCompletesInner() {
    RuntimeException error = new RuntimeException("boom!");
    // One message followed by an error, built from notifications and turned back into events.
    Observable<Message> messages = Observable.just( //
            Notification.createOnNext(new Message("Bob", "Hello")),
            Notification.createOnError(error)
    ).dematerialize();
    final AtomicInteger seen = new AtomicInteger();

    // Prefixes every message with the 1-based number of the window it arrived in and
    // materializes the window so its terminal event is visible as a value.
    Function<GroupedObservable<String, Message>, Observable<Notification<String>>> tagAndMaterialize =
            new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
                @Override public Observable<Notification<String>> apply(
                        GroupedObservable<String, Message> window) {
                    final int windowNumber = seen.incrementAndGet();
                    Observable<String> tagged = window.map(new Function<Message, String>() {
                        @Override public String apply(Message message) throws Exception {
                            return windowNumber + " " + message;
                        }
                    });
                    return tagged.materialize();
                }
            };

    WindowIfChanged.create(messages, userSelector)
            .switchMap(tagAndMaterialize)
            .test()
            .assertValues(
                    Notification.createOnNext("1 Bob Hello"),
                    Notification.<String>createOnComplete())
            .assertError(error);
}
/**
 * Checks that a new window is opened every time the selected key differs from the previous
 * message's key. The Bob, Bob, Alice, Bob, Bob, Eve sequence is expected to produce four
 * windows, each completed before the next one emits; the second run of Bob messages lands in
 * window 3 rather than back in window 1.
 */
@Test public void splits() {
    Observable<Message> messages = Observable.just(
            new Message("Bob", "Hello"),
            new Message("Bob", "World"),
            new Message("Alice", "Hey"),
            new Message("Bob", "What's"),
            new Message("Bob", "Up?"),
            new Message("Eve", "Hey")
    );
    final AtomicInteger seen = new AtomicInteger();

    // Prefixes every message with the 1-based number of the window it arrived in and
    // materializes the window so its terminal event is visible as a value.
    Function<GroupedObservable<String, Message>, Observable<Notification<String>>> tagAndMaterialize =
            new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
                @Override public Observable<Notification<String>> apply(
                        GroupedObservable<String, Message> window) {
                    final int windowNumber = seen.incrementAndGet();
                    Observable<String> tagged = window.map(new Function<Message, String>() {
                        @Override public String apply(Message message) throws Exception {
                            return windowNumber + " " + message;
                        }
                    });
                    return tagged.materialize();
                }
            };

    WindowIfChanged.create(messages, userSelector)
            .switchMap(tagAndMaterialize)
            .test()
            .assertValues(
                    // Window 1: first run of Bob.
                    Notification.createOnNext("1 Bob Hello"),
                    Notification.createOnNext("1 Bob World"),
                    Notification.<String>createOnComplete(),
                    // Window 2: Alice.
                    Notification.createOnNext("2 Alice Hey"),
                    Notification.<String>createOnComplete(),
                    // Window 3: second run of Bob.
                    Notification.createOnNext("3 Bob What's"),
                    Notification.createOnNext("3 Bob Up?"),
                    Notification.<String>createOnComplete(),
                    // Window 4: Eve.
                    Notification.createOnNext("4 Eve Hey"),
                    Notification.<String>createOnComplete());
}
/**
 * Subscribes to {@code upstream} with a new {@code WindowIfChangedObserver} constructed from
 * {@code keySelector} and the given downstream observer.
 *
 * @param observer downstream observer that is handed to the new WindowIfChangedObserver
 */
@Override protected void subscribeActual(Observer<? super GroupedObservable<K, T>> observer) {
upstream.subscribe(new WindowIfChangedObserver<>(keySelector, observer));
}
/**
 * Creates an observer that keeps the key selector and the downstream observer for later use.
 *
 * @param keySelector function that maps an item to its key
 * @param observer downstream observer of {@code GroupedObservable} values
 */
WindowIfChangedObserver(
        Function<? super T, ? extends K> keySelector,
        Observer<? super GroupedObservable<K, T>> observer) {
    this.observer = observer;
    this.keySelector = keySelector;
}
/**
 * Wraps {@code upstream} in a {@code WindowIfChangedObservable} that uses {@code keySelector}.
 *
 * @param upstream source of items
 * @param keySelector function that maps an item to its key
 * @param <T> item type
 * @param <K> key type
 * @return an observable of {@code GroupedObservable} values backed by a new
 *     WindowIfChangedObservable
 */
public static <T, K> Observable<GroupedObservable<K, T>> create(Observable<T> upstream,
        Function<? super T, ? extends K> keySelector) {
    Observable<GroupedObservable<K, T>> windowed =
            new WindowIfChangedObservable<>(upstream, keySelector);
    return windowed;
}