类io.reactivex.internal.subscriptions.SubscriptionHelper源码实例Demo

下面列出了如何使用 io.reactivex.internal.subscriptions.SubscriptionHelper 的 API 实例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: rxjava2-extras   文件: FlowableInsertTimeout.java
/**
 * Records downstream demand and forwards to upstream only the part of it
 * that is not offset by the {@code inserted} counter.
 *
 * @param n the amount requested by downstream; nothing happens unless
 *          {@code SubscriptionHelper.validate(n)} accepts it
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    // Adjust the upstream request to account for inserted values. request()
    // may be called from any thread, so the counter is updated with a CAS
    // retry loop.
    long current;
    long covered;
    do {
        current = inserted.get();
        covered = Math.min(current, n);
    } while (!inserted.compareAndSet(current, current - covered));
    long remainder = n - covered;
    if (remainder > 0) {
        upstream.request(remainder);
    }
    drain();
}
 
源代码2 项目: rxjava2-extras   文件: FlowableRepeat.java
/**
 * Emits {@code value} to {@code child} in response to downstream demand.
 *
 * Only the call for which {@code BackpressureHelper.add(this, n)} returns 0
 * runs the emission loop; other calls just add to the outstanding amount.
 *
 * @param n the amount requested; ignored unless
 *          {@code SubscriptionHelper.validate(n)} accepts it
 */
@Override
public void request(long n) {
    if (SubscriptionHelper.validate(n)) {
        if (BackpressureHelper.add(this, n) == 0) {
            long requested = n;
            long emitted = 0;
            do {
                // Remember the size of this batch so it can be subtracted
                // from the outstanding amount once the batch is done.
                emitted = requested;
                // Emit while the batch has demand left, the subscription is not
                // cancelled, and either count == -1 or counter is still positive
                // (counter is decremented on each check that reaches it).
                while (requested-- > 0 && !cancelled && (count == -1 || counter-- > 0)) {
                    child.onNext(value);
                }
                // Subtract the batch; loop again if more demand arrived meanwhile.
            } while ((requested = this.addAndGet(-emitted)) > 0);
            // NOTE(review): completion is signalled whenever the loop exits with
            // count >= 0 and not cancelled, regardless of whether counter has
            // reached zero — confirm this is intended.
            if (count >= 0 && !cancelled) {
                child.onComplete();
            }
        }
    }
}
 
/**
 * Applies {@code transform} to {@code source} and subscribes the result to a
 * replay subject that is linked to a chain and a destination for {@code child}.
 *
 * If the transform throws a non-fatal exception, {@code child} receives
 * {@code SubscriptionHelper.CANCELLED} via {@code onSubscribe} followed by
 * {@code onError}, and nothing else is set up.
 *
 * @param child the downstream subscriber
 */
@Override
protected void subscribeActual(Subscriber<? super T> child) {
    Flowable<T> transformed;
    try {
        transformed = transform.apply(source);
    } catch (Exception e) {
        Exceptions.throwIfFatal(e);
        child.onSubscribe(SubscriptionHelper.CANCELLED);
        child.onError(e);
        return;
    }

    AtomicReference<Chain<T>> chainHolder = new AtomicReference<Chain<T>>();
    DestinationSerializedSubject<T> destination =
            new DestinationSerializedSubject<T>(child, chainHolder);
    Chain<T> chain = new Chain<T>(transform, destination, maxIterations, maxChained, tester);
    chainHolder.set(chain);

    // destination is not initially subscribed to the chain but will be when
    // tester function result completes
    destination.subscribe(child);

    ChainedReplaySubject<T> subject = ChainedReplaySubject.create(destination, chain, tester);
    chain.initialize(subject);
    transformed.onTerminateDetach().subscribe(subject);
}
 
/**
 * Records downstream demand and either passes it to the parent subscription
 * or accumulates it in {@code deferredRequests}, then drains.
 *
 * @param n the amount requested; logged via {@code debug} in all cases but
 *          otherwise ignored unless {@code SubscriptionHelper.validate(n)}
 *          accepts it
 */
@Override
public void request(long n) {
    debug(this + " request " + n);
    if (SubscriptionHelper.validate(n)) {
        BackpressureHelper.add(requested, n);
        while (true) {
            // The parent is read before the deferred counter on every attempt.
            Subscription p = parent.get();
            long d = deferredRequests.get();
            if (d == -1) {
                // parent exists so can request of it
                debug(this + " requesting from parent " + n);
                p.request(n);
                break;
            } else {
                // Add n to the deferred total, capping at Long.MAX_VALUE
                // if the addition overflows.
                long d2 = d + n;
                if (d2 < 0) {
                    d2 = Long.MAX_VALUE;
                }
                // Retry from the top if another thread changed the counter.
                if (deferredRequests.compareAndSet(d, d2)) {
                    break;
                }
            }
        }
        drain();
    }
}
 
源代码5 项目: rxjava2-extras   文件: FlowableInsertMaybe.java
/**
 * Adds {@code n} to the downstream demand and asks upstream only for the
 * portion not offset by the {@code inserted} counter, then drains.
 *
 * @param n the amount requested by downstream; nothing happens unless
 *          {@code SubscriptionHelper.validate(n)} accepts it
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    // The upstream request is reduced to account for inserted values.
    // Because request() can arrive on any thread, the counter is updated
    // via compare-and-set and retried until it succeeds.
    long available;
    long consumed;
    do {
        available = inserted.get();
        consumed = Math.min(available, n);
    } while (!inserted.compareAndSet(available, available - consumed));
    long toUpstream = n - consumed;
    if (toUpstream > 0) {
        upstream.request(toUpstream);
    }
    drain();
}
 
源代码6 项目: rxjava-RxLife   文件: LifeSubscriber.java
/**
 * Stores the subscription via {@code SubscriptionHelper.setOnce}; when that
 * succeeds, calls {@code addObserver()} and hands {@code s} to downstream.
 *
 * A non-fatal failure in either step cancels {@code s} and is routed to
 * {@link #onError(Throwable)}.
 *
 * @param s the upstream subscription
 */
@Override
public void onSubscribe(Subscription s) {
    if (!SubscriptionHelper.setOnce(this, s)) {
        return;
    }
    try {
        addObserver();
        downstream.onSubscribe(s);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        s.cancel();
        onError(ex);
    }
}
 
源代码7 项目: rxjava-RxLife   文件: LifeSubscriber.java
/**
 * Delivers an error to downstream unless this subscriber is already disposed,
 * in which case the error goes to {@code RxJavaPlugins.onError}.
 *
 * Before delivery the state is set to {@code SubscriptionHelper.CANCELLED}
 * and {@code removeObserver()} is called. A non-fatal failure during delivery
 * is reported together with {@code t} as a {@code CompositeException}.
 *
 * @param t the error signalled by upstream
 */
@Override
public void onError(Throwable t) {
    if (!isDisposed()) {
        lazySet(SubscriptionHelper.CANCELLED);
        try {
            removeObserver();
            downstream.onError(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(new CompositeException(t, e));
        }
    } else {
        RxJavaPlugins.onError(t);
    }
}
 
源代码8 项目: rxjava-RxLife   文件: LifeSubscriber.java
/**
 * Forwards completion to downstream unless this subscriber is already
 * disposed.
 *
 * The state is set to {@code SubscriptionHelper.CANCELLED} and
 * {@code removeObserver()} is called first; a non-fatal failure is reported
 * to {@code RxJavaPlugins.onError}.
 */
@Override
public void onComplete() {
    if (!isDisposed()) {
        lazySet(SubscriptionHelper.CANCELLED);
        try {
            removeObserver();
            downstream.onComplete();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
        }
    }
}
 
源代码9 项目: rxjava-RxLife   文件: LifeConditionalSubscriber.java
/**
 * Accepts the upstream subscription once (via
 * {@code SubscriptionHelper.setOnce}), then calls {@code addObserver()} and
 * passes {@code s} on to downstream.
 *
 * If either step throws a non-fatal error, {@code s} is cancelled and the
 * error is forwarded to {@link #onError(Throwable)}.
 *
 * @param s the upstream subscription
 */
@Override
public void onSubscribe(Subscription s) {
    boolean accepted = SubscriptionHelper.setOnce(this, s);
    if (!accepted) {
        return;
    }
    try {
        addObserver();
        downstream.onSubscribe(s);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        s.cancel();
        onError(ex);
    }
}
 
源代码10 项目: rxjava-RxLife   文件: LifeConditionalSubscriber.java
/**
 * Passes an upstream error to downstream, or to {@code RxJavaPlugins.onError}
 * when this subscriber is already disposed.
 *
 * On the live path the state becomes {@code SubscriptionHelper.CANCELLED} and
 * {@code removeObserver()} runs before delivery. If that path throws a
 * non-fatal error, it is combined with {@code t} in a
 * {@code CompositeException} and reported to the plugin handler.
 *
 * @param t the error signalled by upstream
 */
@Override
public void onError(Throwable t) {
    if (!isDisposed()) {
        lazySet(SubscriptionHelper.CANCELLED);
        try {
            removeObserver();
            downstream.onError(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(new CompositeException(t, e));
        }
    } else {
        RxJavaPlugins.onError(t);
    }
}
 
源代码11 项目: rxjava-RxLife   文件: LifeConditionalSubscriber.java
/**
 * Signals completion to downstream if this subscriber has not been disposed.
 *
 * Marks the state as {@code SubscriptionHelper.CANCELLED}, calls
 * {@code removeObserver()}, then completes downstream. Non-fatal failures go
 * to {@code RxJavaPlugins.onError}.
 */
@Override
public void onComplete() {
    if (!isDisposed()) {
        lazySet(SubscriptionHelper.CANCELLED);
        try {
            removeObserver();
            downstream.onComplete();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
        }
    }
}
 
源代码12 项目: RxBus   文件: MyLambdaSubscriber.java
/**
 * Stores the subscription via {@code SubscriptionHelper.setOnce} and, when
 * that succeeds, invokes the {@code onSubscribe} callback with this instance.
 *
 * A non-fatal failure in the callback cancels {@code s} and is forwarded to
 * {@link #onError(Throwable)}.
 *
 * @param s the upstream subscription
 */
@Override
public void onSubscribe(Subscription s) {
    if (!SubscriptionHelper.setOnce(this, s)) {
        return;
    }
    try {
        onSubscribe.accept(this);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        s.cancel();
        onError(ex);
    }
}
 
源代码13 项目: RxBus   文件: MyLambdaSubscriber.java
/**
 * Passes an upstream error to the {@code onError} callback, or to
 * {@code RxJavaPlugins.onError} when the state is already
 * {@code SubscriptionHelper.CANCELLED}.
 *
 * Unlike {@code onComplete} in the same class, this method does not set the
 * state to CANCELLED before invoking the callback (the call is commented out).
 * NOTE(review): presumably intentional so the subscriber stays usable after
 * an error — confirm with the author.
 *
 * @param t the error signalled by upstream
 */
@Override
    public void onError(Throwable t) {
        if (get() != SubscriptionHelper.CANCELLED) {
            // State is deliberately left unchanged here (see method comment).
//            lazySet(SubscriptionHelper.CANCELLED);
            try {
                onError.accept(t);
            } catch (Throwable e) {
                // A non-fatal callback failure is reported together with the original error.
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }
 
源代码14 项目: RxBus   文件: MyLambdaSubscriber.java
/**
 * Runs the {@code onComplete} callback unless the state is already
 * {@code SubscriptionHelper.CANCELLED}.
 *
 * The state is set to CANCELLED before the callback runs; a non-fatal
 * callback failure is reported to {@code RxJavaPlugins.onError}.
 */
@Override
public void onComplete() {
    if (get() == SubscriptionHelper.CANCELLED) {
        return;
    }
    lazySet(SubscriptionHelper.CANCELLED);
    try {
        onComplete.run();
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
    }
}
 
/**
 * Stores the upstream subscription, passes this instance to downstream as its
 * subscription, and then requests {@code Long.MAX_VALUE} from upstream.
 *
 * Nothing happens unless {@code SubscriptionHelper.validate} accepts the new
 * subscription against the currently stored one.
 *
 * @param s the upstream subscription
 */
@Override
public void onSubscribe(Subscription s) {
    if (!SubscriptionHelper.validate(this.subscription, s)) {
        return;
    }
    this.subscription = s;
    downstream.onSubscribe(this);
    s.request(Long.MAX_VALUE);
}
 
/**
 * Queues the requested amount and, when {@code add(this, n)} returns 0,
 * schedules {@code slowPath(n)} on {@code executorService}.
 *
 * @param n the amount requested; ignored unless
 *          {@code SubscriptionHelper.validate(n)} accepts it
 */
@Override
public void request(final int n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    requestsQueue.add(n);
    if (add(this, n) != 0L) {
        return;
    }
    Runnable emission = new Runnable() {
        @Override
        public void run() {
            slowPath(n);
        }
    };
    executorService.execute(emission);
}
 
源代码17 项目: RxShell   文件: Harvester.java
/**
 * Stores the upstream subscription and passes this instance to
 * {@code customer} as its subscription.
 *
 * Nothing happens unless {@code SubscriptionHelper.validate} accepts the new
 * subscription against the currently stored one.
 *
 * @param subscription the upstream subscription
 */
@Override
public void onSubscribe(Subscription subscription) {
    if (!SubscriptionHelper.validate(this.subscription, subscription)) {
        return;
    }
    this.subscription = subscription;
    customer.onSubscribe(this);
}
 
源代码18 项目: rxjava2-extras   文件: FlowableStringSplitSimple.java
/**
 * Records downstream demand, issues the initial request to {@code parent} on
 * the first accepted call, and drains.
 *
 * The first call (guarded by {@code once}) requests {@code Long.MAX_VALUE}
 * from the parent and sets {@code unbounded} when {@code n} is
 * {@code Long.MAX_VALUE}; otherwise it requests a single item.
 *
 * @param n the amount requested; ignored unless
 *          {@code SubscriptionHelper.validate(n)} accepts it
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(this, n);
    boolean firstCall = once.compareAndSet(false, true);
    if (firstCall) {
        if (n != Long.MAX_VALUE) {
            parent.request(1);
        } else {
            parent.request(Long.MAX_VALUE);
            unbounded = true;
        }
    }
    drain();
}
 
源代码19 项目: rxjava2-extras   文件: FlowableMinRequest.java
/**
 * Stores the parent subscription and passes this instance to {@code child}
 * as its subscription.
 *
 * Nothing happens unless {@code SubscriptionHelper.validate} accepts the new
 * subscription against the currently stored one.
 *
 * @param parent the upstream subscription
 */
@Override
public void onSubscribe(Subscription parent) {
    if (!SubscriptionHelper.validate(this.parent, parent)) {
        return;
    }
    this.parent = parent;
    child.onSubscribe(this);
}
 
源代码20 项目: rxjava2-extras   文件: FlowableMinRequest.java
/**
 * Adds {@code n} to the {@code requested} total and runs {@code drain()}.
 *
 * @param n the amount requested; ignored unless
 *          {@code SubscriptionHelper.validate(n)} accepts it
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
 
源代码21 项目: rxjava2-extras   文件: FlowableInsertTimeout.java
/**
 * Stores the upstream subscription and passes this instance to
 * {@code downstream} as its subscription.
 *
 * Nothing happens unless {@code SubscriptionHelper.validate} accepts the new
 * subscription against the currently stored one.
 *
 * @param upstream the upstream subscription
 */
@Override
public void onSubscribe(Subscription upstream) {
    if (!SubscriptionHelper.validate(this.upstream, upstream)) {
        return;
    }
    this.upstream = upstream;
    downstream.onSubscribe(this);
}
 
源代码22 项目: rxjava2-extras   文件: FlowableMaxRequest.java
/**
 * Keeps a reference to the parent subscription and then subscribes
 * {@code child} to this instance.
 *
 * The call is a no-op when {@code SubscriptionHelper.validate} rejects the
 * new subscription against the one already held.
 *
 * @param parent the upstream subscription
 */
@Override
public void onSubscribe(Subscription parent) {
    boolean valid = SubscriptionHelper.validate(this.parent, parent);
    if (!valid) {
        return;
    }
    this.parent = parent;
    child.onSubscribe(this);
}
 
源代码23 项目: rxjava2-extras   文件: FlowableMaxRequest.java
/**
 * Adds {@code n} to the {@code requested} total and calls
 * {@code requestMore()}.
 *
 * @param n the amount requested; ignored unless
 *          {@code SubscriptionHelper.validate(n)} accepts it
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    BackpressureHelper.add(requested, n);
    requestMore();
}
 
源代码24 项目: rxjava2-extras   文件: FlowableDoOnEmpty.java
/**
 * Records the parent subscription and hands this instance to {@code child}
 * through {@code onSubscribe}.
 *
 * Does nothing when {@code SubscriptionHelper.validate} rejects the new
 * subscription against the stored one.
 *
 * @param parent the upstream subscription
 */
@Override
public void onSubscribe(Subscription parent) {
    if (!SubscriptionHelper.validate(this.parent, parent)) {
        return;
    }
    this.parent = parent;
    child.onSubscribe(this);
}
 
源代码25 项目: rxjava2-extras   文件: FlowableMapLast.java
/**
 * Stores the upstream subscription in {@code parent} and passes this instance
 * to {@code actual} as its subscription.
 *
 * Nothing happens unless {@code SubscriptionHelper.validate} accepts the new
 * subscription against the currently stored one.
 *
 * @param subscription the upstream subscription
 */
@Override
public void onSubscribe(Subscription subscription) {
    if (!SubscriptionHelper.validate(this.parent, subscription)) {
        return;
    }
    this.parent = subscription;
    actual.onSubscribe(this);
}
 
源代码26 项目: rxjava2-extras   文件: FlowableMapLast.java
/**
 * Forwards demand to {@code parent}, asking for one extra item on the first
 * accepted request.
 *
 * The first request (tracked by {@code firstRequest}) is forwarded as
 * {@code n + 1}, capped at {@code Long.MAX_VALUE} if the addition overflows.
 * Later requests are forwarded unchanged.
 *
 * @param n the amount requested; ignored unless
 *          {@code SubscriptionHelper.validate(n)} accepts it
 */
@Override
public void request(long n) {
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    long amount = n;
    if (firstRequest.compareAndSet(true, false)) {
        long plusOne = n + 1;
        // A negative sum means n + 1 overflowed.
        amount = plusOne < 0 ? Long.MAX_VALUE : plusOne;
    }
    parent.request(amount);
}
 
源代码27 项目: rxjava2-extras   文件: FlowableWindowMinMax.java
/**
 * Stores the parent subscription, passes this instance to {@code child} as
 * its subscription, and then requests {@code windowSize - 1} items from the
 * parent.
 *
 * Nothing happens unless {@code SubscriptionHelper.validate} accepts the new
 * subscription against the currently stored one.
 *
 * @param parent the upstream subscription
 */
@Override
public void onSubscribe(Subscription parent) {
    if (!SubscriptionHelper.validate(this.parent, parent)) {
        return;
    }
    this.parent = parent;
    child.onSubscribe(this);
    parent.request(windowSize - 1);
}
 
源代码28 项目: rxjava2-extras   文件: FlowableMatch.java
/**
 * Accumulates {@code n} into {@code requested} and then invokes
 * {@code drain()}.
 *
 * @param n the amount requested; ignored unless
 *          {@code SubscriptionHelper.validate(n)} accepts it
 */
@Override
public void request(long n) {
    boolean valid = SubscriptionHelper.validate(n);
    if (!valid) {
        return;
    }
    BackpressureHelper.add(requested, n);
    drain();
}
 
源代码29 项目: rxjava2-extras   文件: FlowableStringInputStream.java
/**
 * Returns the current {@code bytes} buffer, blocking on this object's monitor
 * until one is available, {@code done} is set, or {@code upstream} holds
 * {@code SubscriptionHelper.CANCELLED}.
 *
 * @return the buffer, or {@code null} if waiting ended without one
 * @throws IOException an {@code InterruptedIOException} (with the
 *         {@code InterruptedException} as its cause) if the wait is
 *         interrupted while upstream is not cancelled
 */
byte[] awaitBufferIfNecessary() throws IOException {
    byte[] a = bytes;
    if (a == null) {
        synchronized (this) {
            for (;;) {
                // Read done before bytes, then decide on the buffer first so an
                // available buffer is returned even when done is already set.
                boolean d = done;
                a = bytes;
                if (a != null) {
                    break;
                }
                if (d || upstream.get() == SubscriptionHelper.CANCELLED) {
                    break;
                }
                try {
                    wait();
                } catch (InterruptedException ex) {
                    // An interrupt while cancelled just ends the wait; otherwise
                    // it is surfaced to the caller as an I/O failure.
                    // NOTE(review): the thread's interrupt flag is not restored on
                    // either path — confirm whether that is intended.
                    if (upstream.get() != SubscriptionHelper.CANCELLED) {
                        InterruptedIOException exc = new InterruptedIOException();
                        exc.initCause(ex);
                        throw exc;
                    }
                    break;
                }
            } 
        }
    }
    return a;
}
 
源代码30 项目: rxjava2-extras   文件: FlowableStringInputStream.java
/**
 * Cancels the upstream subscription and wakes every thread waiting on this
 * object's monitor.
 *
 * @throws IOException declared by the signature; the visible body does not
 *         throw it
 */
@Override
public void close() throws IOException {
    // Cancel first so that woken waiters observe the cancelled state.
    SubscriptionHelper.cancel(upstream);
    synchronized (this) {
        this.notifyAll();
    }
}
 
 类所在包
 同包方法