下面列出了 io.reactivex.internal.disposables.DisposableHelper#dispose() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Cancels this stream. Work is done only when {@code cancelled} is still
 * {@code false} on entry; otherwise the call returns immediately.
 *
 * <p>On the effective call the flag is set, the upstream is cancelled, and
 * {@code valueToInsertObserver} is disposed. The queue is then cleared, but
 * only if this call finds the work-in-progress counter at zero.
 */
@Override
public void cancel() {
    if (cancelled) {
        return;
    }
    cancelled = true;
    upstream.cancel();
    DisposableHelper.dispose(valueToInsertObserver);

    // Gate access to the queue with the same counter that drain() uses:
    // `clear` simply polls the queue until it is empty, discarding the items.
    //
    // Best effort only: a race with onNext and drain can still leave items
    // on the queue after cancel.
    if (getAndIncrement() != 0) {
        return;
    }
    queue.clear();
}
/**
 * Handles an error signal.
 *
 * <p>If {@code finished} is already set, or if another error has already been
 * stored in {@code error}, the throwable is forwarded to
 * {@code RxJavaPlugins.onError}. Otherwise the error is stored,
 * {@code valueToInsertObserver} is disposed, {@code done} is set and
 * {@code drain()} is invoked.
 *
 * @param e the error to handle
 */
@Override
public void onError(Throwable e) {
    if (finished) {
        RxJavaPlugins.onError(e);
        return;
    }
    // Mark as finished before attempting to record the error.
    finished = true;

    if (!error.compareAndSet(null, e)) {
        // An error was recorded earlier; this one cannot be delivered.
        RxJavaPlugins.onError(e);
        return;
    }

    DisposableHelper.dispose(valueToInsertObserver);
    done = true;
    drain();
}
/**
 * Handles the completion signal. Has no effect when {@code finished} is
 * already set; otherwise sets {@code finished}, disposes
 * {@code valueToInsertObserver}, sets {@code done} and calls {@code drain()}.
 */
@Override
public void onComplete() {
    if (!finished) {
        finished = true;
        DisposableHelper.dispose(valueToInsertObserver);
        done = true;
        drain();
    }
}
/**
 * Records an error and terminates the stream.
 *
 * <p>If {@code error} already holds a value, the throwable is forwarded to
 * {@code RxJavaPlugins.onError} and nothing else happens. Otherwise the
 * upstream is cancelled, {@code valueToInsertObserver} is disposed,
 * {@code done} is set and {@code drain()} is invoked.
 *
 * @param e the error to record
 */
void insertError(Throwable e) {
    if (!error.compareAndSet(null, e)) {
        // Lost the race to set the error; report it through the plugin hook.
        RxJavaPlugins.onError(e);
        return;
    }
    upstream.cancel();
    DisposableHelper.dispose(valueToInsertObserver);
    done = true;
    drain();
}
/**
 * Disposes this instance by passing {@code this} to
 * {@code DisposableHelper.dispose}.
 *
 * <p>NOTE(review): the enclosing class is not visible here; presumably it is
 * an {@code AtomicReference<Disposable>} as that helper expects - confirm.
 */
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
/**
 * Disposes this instance by passing {@code this} to
 * {@code DisposableHelper.dispose}.
 *
 * <p>NOTE(review): the enclosing class is not visible here; presumably it is
 * an {@code AtomicReference<Disposable>} as that helper expects - confirm.
 */
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
/**
 * Disposes this instance by passing {@code this} to
 * {@code DisposableHelper.dispose}.
 *
 * <p>NOTE(review): the enclosing class is not visible here; presumably it is
 * an {@code AtomicReference<Disposable>} as that helper expects - confirm.
 */
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
/**
 * Disposes this instance by passing {@code this} to
 * {@code DisposableHelper.dispose}.
 *
 * <p>NOTE(review): the enclosing class is not visible here; presumably it is
 * an {@code AtomicReference<Disposable>} as that helper expects - confirm.
 */
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
/**
 * Releases the resources held by this instance: first disposes whatever is
 * held in {@code scheduled} via {@code DisposableHelper.dispose}, then
 * disposes {@code worker}.
 *
 * <p>NOTE(review): {@code scheduled} and {@code worker} are declared outside
 * this snippet; presumably a pending scheduled task and the scheduler worker
 * that runs it - confirm against the enclosing class.
 */
private void dispose() {
DisposableHelper.dispose(scheduled);
worker.dispose();
}
/**
 * Emits queued items to {@code downstream}, bounded by the current value of
 * {@code requested}, and delivers the terminal signal once {@code done} has
 * been observed and the queue is empty.
 *
 * <p>Entry is gated by this object's counter: a caller that finds the counter
 * already non-zero returns immediately, and the caller that is running the
 * loop repeats it until subtracting the passes it has accounted for brings
 * the counter back to zero.
 *
 * <p>The terminal check lives inside the {@code while (e != r)} loop only, so
 * in a pass where {@code requested} is zero neither an item nor a terminal
 * event is emitted.
 */
private void drain() {
// only the caller that moves the counter from 0 proceeds; others just
// register that more work is pending and leave
if (getAndIncrement() != 0) {
return;
}
// note that this drain loop does not shortcut errors
int missed = 1;
while (true) {
// r: demand snapshot for this pass, e: number of items emitted in this pass
long r = requested.get();
long e = 0;
while (e != r) {
if (cancelled) {
// stop emitting: dispose the insert observer, discard queued items
// and leave without resetting the counter
DisposableHelper.dispose(valueToInsertObserver);
queue.clear();
return;
}
// must read `done` before polling queue
boolean d = done;
T t = queue.poll();
if (t == null) {
if (d) {
// queue is empty and `done` was already set: emit the terminal event
Object err = error.get();
if (err != null) {
// clear the exception so can be gc'd
// `this` is not a real error, it just prevents
// it getting set again in a race because the other
// setters which use CAS assume initial value of null
error.set(this);
DisposableHelper.dispose(valueToInsertObserver);
downstream.onError((Throwable) err);
} else {
// don't need to dispose valueToInsertObserver because already done in
// onComplete
downstream.onComplete();
}
return;
} else {
// nothing to emit and not done
break;
}
} else {
downstream.onNext(t);
e++;
}
}
// subtract what was emitted from the outstanding demand, except when the
// snapshot was Long.MAX_VALUE, in which case the demand is left untouched
if (e != 0L && r != Long.MAX_VALUE) {
requested.addAndGet(-e);
}
// remove the passes accounted for so far; a non-zero remainder means
// drain() was called again in the meantime, so loop once more
missed = addAndGet(-missed);
if (missed == 0) {
return;
}
}
}
/**
 * Disposes this instance by passing {@code this} to
 * {@code DisposableHelper.dispose}.
 *
 * <p>NOTE(review): the enclosing class is not visible here; presumably it is
 * an {@code AtomicReference<Disposable>} as that helper expects - confirm.
 */
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
/**
 * Disposes {@code subscription} through {@code DisposableHelper.dispose} and,
 * only when that call returns {@code true}, invokes {@code hookOnCancel()}.
 */
@Override
public void dispose() {
    final boolean disposed = DisposableHelper.dispose(subscription);
    if (!disposed) {
        return;
    }
    hookOnCancel();
}
/**
 * Disposes {@code subscription} through {@code DisposableHelper.dispose} and
 * runs the given action only when that call returns {@code true}.
 *
 * @param runnable the action to run after a successful dispose
 */
void whenNotCompleted(Runnable runnable) {
    final boolean disposed = DisposableHelper.dispose(subscription);
    if (!disposed) {
        return;
    }
    runnable.run();
}