下面列出了怎么用io.reactivex.internal.disposables.DisposableHelper的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void onNext(final T t) {
if (finished) {
return;
}
queue.offer(t);
final long waitTime;
try {
waitTime = timeout.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// we cancel upstream ourselves because the
// error did not originate from source
upstream.cancel();
onError(e);
return;
}
TimeoutAction<T> action = new TimeoutAction<T>(this, t);
Disposable d = worker.schedule(action, waitTime, unit);
DisposableHelper.set(scheduled, d);
drain();
}
@Override
public void onNext(T t) {
if (finished) {
return;
}
queue.offer(t);
Maybe<? extends T> maybe;
try {
maybe = valueToInsert.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// we cancel upstream ourselves because the
// error did not originate from source
upstream.cancel();
onError(e);
return;
}
ValueToInsertObserver<T> o = new ValueToInsertObserver<T>(this);
if (DisposableHelper.set(valueToInsertObserver, o)) {
// note that at this point we have to cover o being disposed
// from another thread so the Observer class needs
// to handle dispose being called before/during onSubscribe
maybe.subscribe(o);
}
drain();
}
@Override
public void cancel() {
if (!cancelled) {
cancelled = true;
upstream.cancel();
DisposableHelper.dispose(valueToInsertObserver);
if (getAndIncrement() == 0) {
// use the same access control to queue as drain method
// because `clear` just calls `queue.poll()` repeatedly till nothing left on the
// queue (ignoring the dequeued items).
//
// this is best endeavours, there still exists a race with onNext and drain
// where items could be left on the queue after cancel
queue.clear();
}
}
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
try {
addObserver();
downstream.onSubscribe(d);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
}
@Override
public void onError(Throwable t) {
if (isDisposed()) {
RxJavaPlugins.onError(t);
return;
}
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onError(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
@Override
public void onComplete() {
if (isDisposed()) return;
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onComplete();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
try {
addObserver();
downstream.onSubscribe(d);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
}
@Override
public void onSuccess(T t) {
if (isDisposed()) return;
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onSuccess(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
@Override
public void onError(Throwable t) {
if (isDisposed()) {
RxJavaPlugins.onError(t);
return;
}
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onError(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
@Override
public void onComplete() {
if (isDisposed()) return;
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onComplete();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
try {
addObserver();
downstream.onSubscribe(d);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
}
@Override
public void onError(Throwable t) {
if (isDisposed()) return;
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onError(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
@Override
public void onComplete() {
if (isDisposed()) return;
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onComplete();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
try {
addObserver();
downstream.onSubscribe(d);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
}
@Override
public void onSuccess(T t) {
if (isDisposed()) return;
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onSuccess(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
@Override
public void onError(Throwable t) {
if (isDisposed()) {
RxJavaPlugins.onError(t);
return;
}
lazySet(DisposableHelper.DISPOSED);
try {
removeObserver();
downstream.onError(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;
actual.onSubscribe(this);
}
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;
actual.onSubscribe(this);
}
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;
actual.onSubscribe(this);
}
}
@Override
public void onError(Throwable e) {
if (finished) {
RxJavaPlugins.onError(e);
return;
}
finished = true;
if (error.compareAndSet(null, e)) {
DisposableHelper.dispose(valueToInsertObserver);
done = true;
drain();
} else {
RxJavaPlugins.onError(e);
}
}
@Override
public void onComplete() {
if (finished) {
return;
}
finished = true;
DisposableHelper.dispose(valueToInsertObserver);
done = true;
drain();
}
void insertError(Throwable e) {
if (error.compareAndSet(null, e)) {
upstream.cancel();
DisposableHelper.dispose(valueToInsertObserver);
done = true;
drain();
} else {
RxJavaPlugins.onError(e);
}
}
@Override
public void onSubscribe(Disposable upstream) {
// an AtomicReference is used to hold the upstream Disposable
// because this Observer can be disposed before onSubscribe
// is called (contrary to the normal contract).
DisposableHelper.setOnce(this, upstream);
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;
actual.onSubscribe(this);
}
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;
actual.onSubscribe(this);
}
}
@Override
public void onSubscribe(Disposable d) {
if (!DisposableHelper.validate(disposable, d)) {
return;
}
disposable = d;
try (SafeCloseable ignored = assemblyContext.push()) {
actual.onSubscribe(this);
}
}
@Override
public void onSubscribe(Disposable d) {
if (!DisposableHelper.validate(disposable, d)) {
return;
}
disposable = d;
try (SafeCloseable ignored = assemblyContext.push()) {
actual.onSubscribe(this);
}
}
@Override
public void onSubscribe(Disposable d) {
if (!DisposableHelper.validate(disposable, d)) {
return;
}
disposable = d;
try (SafeCloseable ignored = assemblyContext.push()) {
actual.onSubscribe(this);
}
}
@Override
public void onSubscribe(Disposable disposable) {
Objects.requireNonNull(disposable, "disposable");
if (!setDisposable(disposable)) {
disposable.dispose();
DisposableHelper.reportDisposableSet();
return;
}
writeStream.exceptionHandler(t -> {
if (!setDone()) {
RxJavaPlugins.onError(t);
return;
}
getDisposable().dispose();
Consumer<? super Throwable> c;
synchronized (this) {
c = this.writeStreamExceptionHandler;
}
if (c != null) {
try {
c.accept(t);
} catch (Exception e) {
RxJavaPlugins.onError(t);
}
}
});
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}