下面列出了怎么用rx.functions.Cancellable的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void call(final Emitter<AppState> appStateEmitter) {
final AppStateListener appStateListener = new AppStateListener() {
@Override
public void onAppDidEnterForeground() {
appStateEmitter.onNext(FOREGROUND);
}
@Override
public void onAppDidEnterBackground() {
appStateEmitter.onNext(BACKGROUND);
}
};
appStateEmitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
recognizer.removeListener(appStateListener);
recognizer.stop();
}
});
recognizer.addListener(appStateListener);
recognizer.start();
}
/**
* Create an observable from the given event source.
*
* @param eventSource the eventSource you want to convert to an observable
* @param <E> the event type
* @return an Observable based on the provided event source
*/
public static <E> Observable<E> toObservable(
final EventSource<E> eventSource, BackpressureMode backpressureMode) {
checkNotNull(eventSource);
checkNotNull(backpressureMode);
return Observable.create(
new Action1<Emitter<E>>() {
@Override
public void call(final Emitter<E> emitter) {
final Disposable disposable =
eventSource.subscribe(
new Consumer<E>() {
@Override
public void accept(E value) {
emitter.onNext(value);
}
});
emitter.setCancellation(
new Cancellable() {
@Override
public void cancel() throws Exception {
disposable.dispose();
}
});
}
},
backpressureMode);
}
/**
* Add a new cancellation while keeping the previous one(s). Note that this is different from the original contract
* defined by {@link Emitter#setCancellation(Cancellable)}.
*
* @param c will reset all cancellations and subscriptions when <tt>null</tt>
*/
@Override
public final void setCancellation(@Nullable Cancellable c) {
if (c == null) {
subscriptions.clear();
return;
}
subscriptions.add(new CancellableSubscription(c));
}
@Override
public Observable<M> call(final Observable<E> events) {
return Observable.create(
new Action1<Emitter<M>>() {
@Override
public void call(final Emitter<M> emitter) {
final MobiusLoop<M, E, ?> loop;
if (startEffects == null) {
loop = loopFactory.startFrom(startModel);
} else {
loop = loopFactory.startFrom(startModel, startEffects);
}
loop.observe(
new Consumer<M>() {
@Override
public void accept(M newModel) {
emitter.onNext(newModel);
}
});
final Subscription eventSubscription =
events.subscribe(
new Observer<E>() {
@Override
public void onCompleted() {
// TODO: complain loudly! shouldn't ever complete
}
@Override
public void onError(Throwable e) {
emitter.onError(new UnrecoverableIncomingException(e));
}
@Override
public void onNext(E event) {
loop.dispatchEvent(event);
}
});
emitter.setCancellation(
new Cancellable() {
@Override
public void cancel() throws Exception {
loop.dispose();
eventSubscription.unsubscribe();
}
});
}
},
Emitter.BackpressureMode.NONE);
}
public static <I, O> Observable.Transformer<I, O> toTransformer(
final Connectable<I, O> connectable, final BackpressureMode backpressureMode) {
return new Observable.Transformer<I, O>() {
@Override
public Observable<O> call(final Observable<I> upstream) {
return Observable.create(
new Action1<Emitter<O>>() {
@Override
public void call(final Emitter<O> emitter) {
Consumer<O> output =
new Consumer<O>() {
@Override
public void accept(O value) {
emitter.onNext(value);
}
};
final Connection<I> input = connectable.connect(output);
final Subscription subscription =
upstream.subscribe(
new Action1<I>() {
@Override
public void call(I f) {
input.accept(f);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
emitter.onError(throwable);
}
},
new Action0() {
@Override
public void call() {
emitter.onCompleted();
}
});
emitter.setCancellation(
new Cancellable() {
@Override
public void cancel() throws Exception {
subscription.unsubscribe();
input.dispose();
}
});
}
},
backpressureMode);
}
};
}
@Test
public void setsCancellation() {
verify(mockEmitter).setCancellation(any(Cancellable.class));
}