io.reactivex.internal.disposables.DisposableHelper#dispose ( )源码实例Demo

下面列出了io.reactivex.internal.disposables.DisposableHelper#dispose ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: rxjava2-extras   文件: FlowableInsertMaybe.java
@Override
public void cancel() {
    if (!cancelled) {
        cancelled = true;
        upstream.cancel();
        DisposableHelper.dispose(valueToInsertObserver);
        if (getAndIncrement() == 0) {
            // use the same access control to queue as drain method
            // because `clear` just calls `queue.poll()` repeatedly till nothing left on the
            // queue (ignoring the dequeued items).
            //
            // this is best endeavours, there still exists a race with onNext and drain
            // where items could be left on the queue after cancel
            queue.clear();
        }
    }
}
 
源代码2 项目: rxjava2-extras   文件: FlowableInsertMaybe.java
@Override
public void onError(Throwable e) {
    if (finished) {
        RxJavaPlugins.onError(e);
        return;
    }
    finished = true;
    if (error.compareAndSet(null, e)) {
        DisposableHelper.dispose(valueToInsertObserver);
        done = true;
        drain();
    } else {
        RxJavaPlugins.onError(e);
    }
}
 
源代码3 项目: rxjava2-extras   文件: FlowableInsertMaybe.java
@Override
public void onComplete() {
    if (finished) {
        return;
    }
    finished = true;
    DisposableHelper.dispose(valueToInsertObserver);
    done = true;
    drain();
}
 
源代码4 项目: rxjava2-extras   文件: FlowableInsertMaybe.java
void insertError(Throwable e) {
    if (error.compareAndSet(null, e)) {
        upstream.cancel();
        DisposableHelper.dispose(valueToInsertObserver);
        done = true;
        drain();
    } else {
        RxJavaPlugins.onError(e);
    }
}
 
源代码5 项目: rxjava-RxLife   文件: LifeObserver.java
@Override
public void dispose() {
    DisposableHelper.dispose(this);
}
 
源代码6 项目: rxjava-RxLife   文件: LifeMaybeObserver.java
@Override
public void dispose() {
    DisposableHelper.dispose(this);
}
 
源代码7 项目: rxjava-RxLife   文件: LifeCompletableObserver.java
@Override
public void dispose() {
    DisposableHelper.dispose(this);
}
 
源代码8 项目: rxjava-RxLife   文件: LifeSingleObserver.java
@Override
public void dispose() {
    DisposableHelper.dispose(this);
}
 
源代码9 项目: rxjava2-extras   文件: FlowableInsertTimeout.java
private void dispose() {
    DisposableHelper.dispose(scheduled);
    worker.dispose();
}
 
源代码10 项目: rxjava2-extras   文件: FlowableInsertMaybe.java
private void drain() {
    if (getAndIncrement() != 0) {
        return;
    }
    // note that this drain loop does not shortcut errors
    int missed = 1;
    while (true) {
        long r = requested.get();
        long e = 0;
        while (e != r) {
            if (cancelled) {
                DisposableHelper.dispose(valueToInsertObserver);
                queue.clear();
                return;
            }
            // must read `done` before polling queue
            boolean d = done;
            T t = queue.poll();
            if (t == null) {
                if (d) {
                    Object err = error.get();
                    if (err != null) {
                        // clear the exception so can be gc'd
                        // `this` is not a real error, it just prevents
                        // it getting set again in a race because the other
                        // setters which use CAS assume initial value of null
                        error.set(this);
                        DisposableHelper.dispose(valueToInsertObserver);
                        downstream.onError((Throwable) err);
                    } else {
                        // don't need to dispose valueToInsertObserver because already done in
                        // onComplete
                        downstream.onComplete();
                    }
                    return;
                } else {
                    // nothing to emit and not done
                    break;
                }
            } else {
                downstream.onNext(t);
                e++;
            }
        }
        if (e != 0L && r != Long.MAX_VALUE) {
            requested.addAndGet(-e);
        }
        missed = addAndGet(-missed);
        if (missed == 0) {
            return;
        }
    }
}
 
源代码11 项目: rxjava2-extras   文件: FlowableInsertMaybe.java
@Override
public void dispose() {
    DisposableHelper.dispose(this);
}
 
源代码12 项目: resilience4j   文件: AbstractDisposable.java
@Override
public void dispose() {
    if (DisposableHelper.dispose(subscription)) {
        hookOnCancel();
    }
}
 
源代码13 项目: resilience4j   文件: AbstractDisposable.java
void whenNotCompleted(Runnable runnable) {
    if (DisposableHelper.dispose(subscription)) {
        runnable.run();
    }
}