Subscription ( )源码实例Demo

java.util.concurrent.Flow#Subscription ( )源码实例Demo

下面列出了java.util.concurrent.Flow#Subscription ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

public void onSubscribe(Flow.Subscription subscription) {
    //count = bufferSize - bufferSize / 2;// 当消费一半的时候重新请求
    (this.subscription = subscription).request(bufferSize);
    System.out.println("开始onSubscribe订阅");
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
public void onSubscribe(Flow.Subscription subscription) {
    //count = bufferSize - bufferSize / 2;// 当消费一半的时候重新请求
    (this.subscription = subscription).request(bufferSize);
    System.out.println("开始onSubscribe订阅");
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
源代码3 项目: openjdk-jdk9   文件: HttpInputStreamTest.java
@Override
public void close() throws IOException {
    synchronized (this) {
        closed = true;
        Flow.Subscription s = subscription;
        if (s != null) {
            s.cancel();
        }
        subscription = null;
    }
    super.close();
}
 
源代码4 项目: openjdk-jdk9   文件: Http1Request.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription != null) {
        throw new IllegalStateException("already subscribed");
    }
    this.subscription = subscription;
    subscription.request(1);
}
 
源代码5 项目: Fibry   文件: Actor.java
Flow.Subscriber<T> asReactiveSubscriber(int optimalQueueLength, Consumer<Throwable> onErrorHandler, Consumer<PartialActor<T, S>> onCompleteHandler) {
    AtomicReference<Flow.Subscription> sub = new AtomicReference<>();
    return new Flow.Subscriber<T>() {
        private void askRefill() {
            int messagesRequired = optimalQueueLength - queue.size();

            if (messagesRequired > 0)
                sub.get().request(messagesRequired);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (sub.get() != null)
                subscription.cancel();
            else {
                sub.set(subscription);
                askRefill();
            }
        }

        @Override
        public void onNext(T item) {
            Objects.requireNonNull(item);

            execAsync(() -> {
                actorLogic.accept(item);
                askRefill();
            });
        }

        @Override
        public void onError(Throwable throwable) {
            Objects.requireNonNull(throwable);
            execAsync(() -> {
                if (onErrorHandler != null)
                    onErrorHandler.accept(throwable);
            });

            sendPoisonPill();
        }

        @Override
        public void onComplete() {
            execAsync(() -> {
                if (onCompleteHandler != null)
                    onCompleteHandler.accept(Actor.this);
            });
            sendPoisonPill();
        }
    };
}
 
@Override
public void onSubscribe(Flow.Subscription subscription) {
    log.info("onSubscribe was called");
    subscription.request(3);
    this.subscription = subscription;
}
 
@Override
public void onSubscribe(Flow.Subscription subscription) {
	this.subscription = subscription;
	subscription.request(1);
}
 
@Override
public void onSubscribe(Flow.Subscription subscription) {
    subscription.request(INFINITE);
}
 
public Flow.Subscription getSubscription() {
    return subscription;
}
 
源代码10 项目: openjdk-jdk9   文件: HttpInputStreamTest.java
@Override
public void onSubscribe(Flow.Subscription s) {
    this.subscription = s;
    s.request(Math.max(2, buffers.remainingCapacity() + 1));
}
 
源代码11 项目: openjdk-jdk9   文件: ResponseProcessors.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(Long.MAX_VALUE);
}
 
源代码12 项目: pgadba   文件: SimpleRowSubscriber.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
  this.subscription = subscription;
  this.subscription.request(10);
  demand += 10;
}
 
@Override
public void subscribe(Flow.Subscriber<? super FlowData> subscriber) {
    subscribers.add(subscriber);
    Flow.Subscription subscription = new SubscriptionImpl(this);
    subscriber.onSubscribe(subscription);
}
 
源代码14 项目: tutorials   文件: FlowApiLiveVideo.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
}
 
源代码15 项目: openjdk-jdk9   文件: ResponseProcessors.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
}
 
源代码16 项目: mycore   文件: MCRCommandListProcessor.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.upstreamSubscription = subscription;
    upstreamSubscription.request(1);
}
 
源代码17 项目: Java-9-Cookbook   文件: Processor.java
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    this.subscription.request(0);
}
 
源代码18 项目: Java-9-Cookbook   文件: DemoSubscriber.java
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    this.subscription.request(0);
}
 
源代码19 项目: reactor-core   文件: JdkFlowAdapter.java
@Override
public void onSubscribe(final Flow.Subscription subscription) {
    this.subscription = subscription;
	s.onSubscribe(this);
}
 
public Flow.Subscription getSubscription() {
    return subscription;
}
 
如果文章对您有帮助,请点击页面广告。以鼓励作者做出的努力。
 方法所在类