下面列出了怎么用io.reactivex.functions.Cancellable的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Create an observable from the given event source.
*
* @param eventSource the eventSource you want to convert to an observable
* @param <E> the event type
* @return an Observable based on the provided event source
*/
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
return Observable.create(
new ObservableOnSubscribe<E>() {
@Override
public void subscribe(final ObservableEmitter<E> emitter) throws Exception {
final Disposable disposable =
eventSource.subscribe(
new Consumer<E>() {
@Override
public void accept(E value) {
emitter.onNext(value);
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Exception {
disposable.dispose();
}
});
}
});
}
/**
* Using the returned Observable, you can be notified about data changes.
* Once a transaction is committed, you will get info on classes with changed Objects.
*/
public static <T> Observable<Class> observable(final BoxStore boxStore) {
return Observable.create(new ObservableOnSubscribe<Class>() {
@Override
public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
final DataSubscription dataSubscription = boxStore.subscribe().observer(new DataObserver<Class>() {
@Override
public void onData(Class data) {
if (!emitter.isDisposed()) {
emitter.onNext(data);
}
}
});
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
dataSubscription.cancel();
}
});
}
});
}
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
final DataSubscription dataSubscription = query.subscribe().observer(new DataObserver<List<T>>() {
@Override
public void onData(List<T> data) {
for (T datum : data) {
if (emitter.isCancelled()) {
return;
} else {
emitter.onNext(datum);
}
}
if (!emitter.isCancelled()) {
emitter.onComplete();
}
}
});
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
dataSubscription.cancel();
}
});
}
/**
* The returned Observable emits Query results as Lists.
* Never completes, so you will get updates when underlying data changes.
*/
public static <T> Observable<List<T>> observable(final Query<T> query) {
return Observable.create(new ObservableOnSubscribe<List<T>>() {
@Override
public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
final DataSubscription dataSubscription = query.subscribe().observer(new DataObserver<List<T>>() {
@Override
public void onData(List<T> data) {
if (!emitter.isDisposed()) {
emitter.onNext(data);
}
}
});
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
dataSubscription.cancel();
}
});
}
});
}
@Override
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public synchronized <T> Observable<T> queue(final Operation<T> operation) {
if (!shouldRun) {
return Observable.error(disconnectionException);
}
return Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(ObservableEmitter<T> emitter) {
final FIFORunnableEntry entry = new FIFORunnableEntry<>(operation, emitter);
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() {
if (queue.remove(entry)) {
logOperationRemoved(operation);
}
}
});
logOperationQueued(operation);
queue.add(entry);
}
});
}
@Override
final protected void protectedRun(final ObservableEmitter<SCAN_RESULT_TYPE> emitter, QueueReleaseInterface queueReleaseInterface) {
final SCAN_CALLBACK_TYPE scanCallback = createScanCallback(emitter);
try {
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() {
RxBleLog.i("Scan operation is requested to stop.");
stopScan(rxBleAdapterWrapper, scanCallback);
}
});
RxBleLog.i("Scan operation is requested to start.");
boolean startLeScanStatus = startScan(rxBleAdapterWrapper, scanCallback);
if (!startLeScanStatus) {
emitter.tryOnError(new BleScanException(BleScanException.BLUETOOTH_CANNOT_START));
}
} catch (Throwable throwable) {
RxBleLog.w(throwable, "Error while calling the start scan function");
emitter.tryOnError(new BleScanException(BleScanException.BLUETOOTH_CANNOT_START, throwable));
} finally {
queueReleaseInterface.release();
}
}
public Observable<Boolean> get() {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(final ObservableEmitter<Boolean> emitter) {
final boolean initialValue = locationServicesStatus.isLocationProviderOk();
final BroadcastReceiver broadcastReceiver = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
final boolean newValue = locationServicesStatus.isLocationProviderOk();
emitter.onNext(newValue);
}
};
emitter.onNext(initialValue);
context.registerReceiver(broadcastReceiver, new IntentFilter(LocationManager.MODE_CHANGED_ACTION));
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() {
context.unregisterReceiver(broadcastReceiver);
}
});
}
})
.distinctUntilChanged()
.subscribeOn(Schedulers.trampoline())
.unsubscribeOn(Schedulers.trampoline());
}
@Override
public void subscribe(@NonNull final ObservableEmitter<AppState> appStateEmitter) throws Exception {
final AppStateListener appStateListener = new AppStateListener() {
@Override
public void onAppDidEnterForeground() {
appStateEmitter.onNext(FOREGROUND);
}
@Override
public void onAppDidEnterBackground() {
appStateEmitter.onNext(BACKGROUND);
}
};
appStateEmitter.setCancellable(new Cancellable() {
@Override public void cancel() throws Exception {
recognizer.removeListener(appStateListener);
recognizer.stop();
}
});
recognizer.addListener(appStateListener);
recognizer.start();
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
return Single.create(new SingleOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = listener(emit);
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addListenerForSingleValueEvent(listener);
}
});
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
return Single.create(new SingleOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = listener(emit);
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addListenerForSingleValueEvent(listener);
}
});
}
@Override
@RequiresPermission(USE_FINGERPRINT)
@RequiresApi(Build.VERSION_CODES.M)
public void subscribe(ObservableEmitter<T> emitter) throws Exception {
if (fingerprintApiWrapper.isUnavailable()) {
emitter.onError(new FingerprintUnavailableException("Fingerprint authentication is not available on this device! Ensure that the device has a Fingerprint sensor and enrolled Fingerprints by calling RxFingerprint#isAvailable(Context) first"));
return;
}
AuthenticationCallback callback = createAuthenticationCallback(emitter);
cancellationSignal = fingerprintApiWrapper.createCancellationSignal();
CryptoObject cryptoObject = initCryptoObject(emitter);
//noinspection MissingPermission
fingerprintApiWrapper.getFingerprintManager().authenticate(cryptoObject, cancellationSignal, 0, callback, null);
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
if (cancellationSignal != null && !cancellationSignal.isCanceled()) {
cancellationSignal.cancel();
}
}
});
}
/**
* Converts an ObservableField to an Observable. Note that setting null value inside
* ObservableField (except for initial value) throws a NullPointerException.
* @return Observable that contains the latest value in the ObservableField
*/
@NonNull
public static <T> Observable<T> toObservable(@NonNull final ObservableField<T> field) {
return Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(final ObservableEmitter<T> e) throws Exception {
T initialValue = field.get();
if (initialValue != null) {
e.onNext(initialValue);
}
final OnPropertyChangedCallback callback = new OnPropertyChangedCallback() {
@Override
public void onPropertyChanged(android.databinding.Observable observable, int i) {
e.onNext(field.get());
}
};
field.addOnPropertyChangedCallback(callback);
e.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
field.removeOnPropertyChangedCallback(callback);
}
});
}
});
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Observable<DataSnapshot> changes(@NonNull final Query query) {
return Observable.create(new ObservableOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final ObservableEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
if (!emit.isDisposed()) {
emit.onNext(dataSnapshot);
}
}
@Override
public void onCancelled(DatabaseError e) {
if (!emit.isDisposed()) {
emit.onError(e.toException());
}
}
};
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addValueEventListener(listener);
}
});
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Observable<DataSnapshot> changes(@NonNull final Query query) {
return Observable.create(new ObservableOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final ObservableEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
if (!emit.isDisposed()) {
emit.onNext(dataSnapshot);
}
}
@Override
public void onCancelled(DatabaseError e) {
if (!emit.isDisposed()) {
emit.onError(e.toException());
}
}
};
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addValueEventListener(listener);
}
});
}
private static Observable<Intent> createBroadcastObservable(
final BroadcastRegistrarStrategy broadcastRegistrarStrategy,
final OrderedBroadcastAbortStrategy orderedBroadcastAbortStrategy) {
return Observable.create(new ObservableOnSubscribe<Intent>() {
@Override
public void subscribe(final ObservableEmitter<Intent> intentEmitter) throws Exception {
final BroadcastReceiver broadcastReceiver = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
intentEmitter.onNext(intent);
if (isOrderedBroadcast()) {
orderedBroadcastAbortStrategy.handleOrderedBroadcast(
context,
intent,
BroadcastReceiverAbortProxy.create(this));
}
}
};
intentEmitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
broadcastRegistrarStrategy.unregisterBroadcastReceiver(broadcastReceiver);
}
});
broadcastRegistrarStrategy.registerBroadcastReceiver(broadcastReceiver);
}
});
}
@Override
public ObservableSource<M> apply(final Observable<E> events) {
return Observable.create(
new ObservableOnSubscribe<M>() {
@Override
public void subscribe(final ObservableEmitter<M> emitter) throws Exception {
final MobiusLoop<M, E, ?> loop;
if (startEffects == null) {
loop = loopFactory.startFrom(startModel);
} else {
loop = loopFactory.startFrom(startModel, startEffects);
}
loop.observe(
new com.spotify.mobius.functions.Consumer<M>() {
@Override
public void accept(M newModel) {
emitter.onNext(newModel);
}
});
final Disposable eventsDisposable =
events.subscribe(
new Consumer<E>() {
@Override
public void accept(E event) throws Exception {
loop.dispatchEvent(event);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
emitter.onError(new UnrecoverableIncomingException(throwable));
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Exception {
loop.dispose();
eventsDisposable.dispose();
}
});
}
});
}
public static <I, O> ObservableTransformer<I, O> toTransformer(
final Connectable<I, O> connectable) {
return new ObservableTransformer<I, O>() {
@Override
public ObservableSource<O> apply(final Observable<I> upstream) {
return Observable.create(
new ObservableOnSubscribe<O>() {
@Override
public void subscribe(final ObservableEmitter<O> emitter) throws Exception {
Consumer<O> output =
new Consumer<O>() {
@Override
public void accept(O value) {
emitter.onNext(value);
}
};
final Connection<I> input = connectable.connect(output);
final Disposable disposable =
upstream.subscribe(
new io.reactivex.functions.Consumer<I>() {
@Override
public void accept(I f) {
input.accept(f);
}
},
new io.reactivex.functions.Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
emitter.onError(throwable);
}
},
new Action() {
@Override
public void run() {
emitter.onComplete();
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Exception {
disposable.dispose();
input.dispose();
}
});
}
});
}
};
}
@Override
public void setCancellable(Cancellable c) {
throw new UnsupportedOperationException();
}
@Test
public void setsCancellable() {
verify(mockEmitter).setCancellable(any(Cancellable.class));
}
/**
* Create an observable which will notify subscribers with a {@linkplain Query query} for
* execution. Subscribers are responsible for <b>always</b> closing {@link Cursor} instance
* returned from the {@link Query}.
* <p>
* Subscribers will receive an immediate notification for initial data as well as subsequent
* notifications for when the supplied {@code uri}'s data changes. Unsubscribe when you no longer
* want updates to a query.
* <p>
* Since content resolver triggers are inherently asynchronous, items emitted from the returned
* observable use the {@link Scheduler} supplied to {@link SqlBrite#wrapContentProvider}. For
* consistency, the immediate notification sent on subscribe also uses this scheduler. As such,
* calling {@link Observable#subscribeOn subscribeOn} on the returned observable has no effect.
* <p>
* Note: To skip the immediate notification and only receive subsequent notifications when data
* has changed call {@code skip(1)} on the returned observable.
* <p>
* <b>Warning:</b> this method does not perform the query! Only by subscribing to the returned
* {@link Observable} will the operation occur.
*
* @see ContentResolver#query(Uri, String[], String, String[], String)
* @see ContentResolver#registerContentObserver(Uri, boolean, ContentObserver)
*/
@CheckResult @NonNull
public QueryObservable createQuery(@NonNull final Uri uri, @Nullable final String[] projection,
@Nullable final String selection, @Nullable final String[] selectionArgs, @Nullable
final String sortOrder, final boolean notifyForDescendents) {
final Query query = new Query() {
@Override public Cursor run() {
long startNanos = nanoTime();
Cursor cursor = contentResolver.query(uri, projection, selection, selectionArgs, sortOrder);
if (logging) {
long tookMillis = NANOSECONDS.toMillis(nanoTime() - startNanos);
log("QUERY (%sms)\n uri: %s\n projection: %s\n selection: %s\n selectionArgs: %s\n "
+ "sortOrder: %s\n notifyForDescendents: %s", tookMillis, uri,
Arrays.toString(projection), selection, Arrays.toString(selectionArgs), sortOrder,
notifyForDescendents);
}
return cursor;
}
};
Observable<Query> queries = Observable.create(new ObservableOnSubscribe<Query>() {
@Override public void subscribe(final ObservableEmitter<Query> e) throws Exception {
final ContentObserver observer = new ContentObserver(contentObserverHandler) {
@Override public void onChange(boolean selfChange) {
if (!e.isDisposed()) {
e.onNext(query);
}
}
};
contentResolver.registerContentObserver(uri, notifyForDescendents, observer);
e.setCancellable(new Cancellable() {
@Override public void cancel() throws Exception {
contentResolver.unregisterContentObserver(observer);
}
});
if (!e.isDisposed()) {
e.onNext(query); // Trigger initial query.
}
}
});
return queries //
.observeOn(scheduler) //
.compose(queryTransformer) // Apply the user's query transformer.
.to(QUERY_OBSERVABLE);
}