下面列出了 io.reactivex.disposables.Disposables 类的 API 用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Posts {@code run} to this worker's handler after the given delay and returns a handle for it.
 *
 * <p>The delay is converted to milliseconds and clamped so that it is never negative. If this
 * worker is already disposed, nothing is posted and an already-disposed handle is returned.
 *
 * @param run the task to execute; must not be {@code null}
 * @param delay how long to wait before executing, expressed in {@code unit}
 * @param unit the unit of {@code delay}; must not be {@code null}
 * @return the handle of the posted task, or an already-disposed {@link Disposable} when this
 *     worker was found disposed before or right after posting
 * @throws NullPointerException if {@code run} or {@code unit} is {@code null}
 */
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) {
        throw new NullPointerException("run == null");
    }
    if (unit == null) {
        throw new NullPointerException("unit == null");
    }
    if (disposed) {
        return Disposables.disposed();
    }

    // Pass the task through the RxJavaPlugins schedule hook before wrapping it.
    Runnable hooked = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable task = new ScheduledRunnable(handler, hooked);

    Message msg = Message.obtain(handler, task);
    // Used as token for batch disposal of this worker's runnables.
    msg.obj = this;

    long delayMillis = Math.max(0L, unit.toMillis(delay));
    handler.sendMessageDelayed(msg, delayMillis);

    // The disposed flag is read again after posting: a dispose() call may have raced with the
    // send above, in which case the freshly posted callback has to be removed here.
    if (!disposed) {
        return task;
    }
    handler.removeCallbacks(task);
    return Disposables.disposed();
}
/**
 * Wraps {@code operation} in an {@link Observable} that enqueues it when subscribed to.
 *
 * <p>Every subscription creates its own queue entry. Disposing the subscription removes that
 * entry from the queue, and logs the removal if the entry was still queued.
 *
 * @param operation the operation to enqueue on subscription
 * @param <T> the type parameter shared by the operation and the returned observable
 * @return an observable that queues {@code operation} each time it is subscribed to
 */
@Override
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public <T> Observable<T> queue(final Operation<T> operation) {
    return Observable.create(new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(ObservableEmitter<T> emitter) {
            final FIFORunnableEntry queued = new FIFORunnableEntry<>(operation, emitter);
            final Action removeFromQueue = new Action() {
                @Override
                public void run() {
                    boolean wasRemoved = queue.remove(queued);
                    if (wasRemoved) {
                        logOperationRemoved(operation);
                    }
                }
            };

            // The removal action is attached to the emitter before the entry is added.
            emitter.setDisposable(Disposables.fromAction(removeFromQueue));
            logOperationQueued(operation);
            queue.add(queued);
        }
    });
}
/**
 * Registers an auth-state listener on {@code instance} and forwards each callback to the emitter.
 *
 * <p>The listener is removed from {@code instance} again once the emitter is disposed.
 *
 * @param emitter receives the {@link FirebaseAuth} passed to every auth-state callback, as long
 *     as it has not been disposed
 */
@Override
public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) {
    final FirebaseAuth.AuthStateListener authListener = new FirebaseAuth.AuthStateListener() {
        @Override
        public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
            // Callbacks arriving after disposal are dropped.
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(firebaseAuth);
        }
    };
    final Action unregister = new Action() {
        @Override
        public void run() throws Exception {
            instance.removeAuthStateListener(authListener);
        }
    };

    instance.addAuthStateListener(authListener);
    emitter.setDisposable(Disposables.fromAction(unregister));
}
/**
 * Checks whether the calling thread's looper is the main looper.
 *
 * <p>When it is not, the observer is handed an empty disposable and then an
 * {@link IllegalStateException} naming the current thread.
 *
 * @param observer the observer to fail when the call is made off the main thread
 * @return {@code true} when called on the main thread, {@code false} after the observer has been
 *     signalled with the error
 */
public static boolean checkMainThread(Observer<?> observer) {
    if (Looper.myLooper() == Looper.getMainLooper()) {
        return true;
    }
    observer.onSubscribe(Disposables.empty());
    String threadName = Thread.currentThread().getName();
    observer.onError(new IllegalStateException(
            "Expected to be called on the main thread but was " + threadName));
    return false;
}
/**
 * Reports whether the caller runs on the main looper's thread, erroring the observer otherwise.
 *
 * @param observer receives an empty disposable followed by an {@link IllegalStateException}
 *     when the current looper is not the main looper
 * @return {@code true} if the current looper is the main looper, {@code false} otherwise
 */
static boolean checkMainThread(Observer<?> observer) {
    final boolean onMainThread = Looper.myLooper() == Looper.getMainLooper();
    if (!onMainThread) {
        observer.onSubscribe(Disposables.empty());
        final String message = "Expected to be called on the main thread but was "
                + Thread.currentThread().getName();
        observer.onError(new IllegalStateException(message));
    }
    return onMainThread;
}
/**
 * Cancels by marking the {@code disposable} reference as disposed.
 *
 * <p>If the reference is still {@code null}, the disposed marker is stored and nothing else
 * happens. Otherwise the currently held disposable is disposed and then replaced by the marker.
 */
@Override
public void cancel() {
    boolean wasEmpty = disposable.compareAndSet(null, Disposables.disposed());
    if (!wasEmpty) {
        disposable.get().dispose();
        // Replace the held reference with the disposed marker so the old one can be collected.
        disposable.set(Disposables.disposed());
    }
}
/**
 * Stores the upstream disposable, or disposes it immediately if a value is already held.
 *
 * @param d the disposable handed over by the upstream
 */
@Override
public void onSubscribe(Disposable d) {
    if (disposable.compareAndSet(null, d)) {
        return;
    }
    // The reference was no longer null, i.e. already cancelled: release the newcomer right away
    // and leave the disposed marker in place.
    d.dispose();
    disposable.set(Disposables.disposed());
}
/**
 * Subscribes the observer to state changes, or completes it at once when there is no adapter.
 *
 * <p>When {@code rxBleAdapterWrapper} reports no Bluetooth adapter, the observer gets an empty
 * disposable followed by completion. Otherwise the result of
 * {@code checkPermissionUntilGranted} is mapped to a distinct-until-changed state observable.
 *
 * @param observer the observer to receive {@link RxBleClient.State} values
 */
@Override
protected void subscribeActual(Observer<? super RxBleClient.State> observer) {
    if (!rxBleAdapterWrapper.hasBluetoothAdapter()) {
        observer.onSubscribe(Disposables.empty());
        observer.onComplete();
        return;
    }

    final Function<Boolean, Observable<RxBleClient.State>> toStateChanges =
            new Function<Boolean, Observable<RxBleClient.State>>() {
                @Override
                public Observable<RxBleClient.State> apply(Boolean permissionWasInitiallyGranted) {
                    Observable<RxBleClient.State> states = checkAdapterAndServicesState(
                            rxBleAdapterWrapper,
                            bleAdapterStateObservable,
                            locationServicesOkObservable
                    ).distinctUntilChanged();

                    /*
                     * If permission was granted from the beginning then the first value is not
                     * a change. The above Observable does emit value at the moment of
                     * subscription, so that first emission is skipped.
                     */
                    if (permissionWasInitiallyGranted) {
                        return states.skip(1);
                    }
                    return states;
                }
            };

    checkPermissionUntilGranted(locationServicesStatus, timerScheduler)
            .flatMapObservable(toStateChanges)
            .subscribe(observer);
}
/**
 * Hands the observer an empty disposable and then stores it in the {@code observer} field.
 *
 * <p>No items, error or completion are signalled from this method itself.
 * NOTE(review): the stored reference is presumably used elsewhere in the class to emit
 * later; confirm against the rest of the file.
 *
 * @param observer the observer being subscribed; replaces any previously stored observer
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
// onSubscribe is invoked before the field is assigned; keep this order unless callers are checked.
observer.onSubscribe(Disposables.empty());
this.observer = observer;
}
/**
 * Adds the observer to {@code observers} and gives it a disposable that prints on disposal.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
    // Runs when the handed-out disposable is disposed; it only prints a marker line.
    Runnable announceDisposal = () -> System.out.println("Disposed");
    observers.add(observer);
    observer.onSubscribe(Disposables.fromRunnable(announceDisposal));
}