下面列出了 java.util.concurrent.Flow.Subscription 的实例代码，您也可以点击链接到 GitHub 查看源代码，或者在右侧发表评论。
/**
 * Stores the subscription, requests {@code bufferSize} items from it, prints a start
 * message and then pauses the calling thread for 100 ms.
 *
 * <p>If the pause is interrupted, the interrupt flag is restored so that code further up
 * the call stack can still observe the interruption.
 *
 * @param subscription the subscription to store and request from
 */
public void onSubscribe(Flow.Subscription subscription) {
    // Disabled idea: count = bufferSize - bufferSize / 2; (re-request once half has been consumed)
    this.subscription = subscription;
    subscription.request(bufferSize);
    System.out.println("开始onSubscribe订阅");
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the flag; set it again instead of losing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}
/**
 * Stores the subscription, requests {@code bufferSize} items from it, prints a start
 * message and then pauses the calling thread for 100 ms.
 *
 * <p>If the pause is interrupted, the interrupt flag is restored so that code further up
 * the call stack can still observe the interruption.
 *
 * @param subscription the subscription to store and request from
 */
public void onSubscribe(Flow.Subscription subscription) {
    // Disabled idea: count = bufferSize - bufferSize / 2; (re-request once half has been consumed)
    this.subscription = subscription;
    subscription.request(bufferSize);
    System.out.println("开始onSubscribe订阅");
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the flag; set it again instead of losing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}
/**
 * Flags this object as closed, cancels the stored subscription when one is present,
 * clears the stored reference, and finally delegates to {@code super.close()}.
 *
 * <p>The flag update, the cancellation and the clearing of the reference all happen
 * while holding this object's monitor; {@code super.close()} is called after the
 * monitor has been released.
 *
 * @throws IOException propagated from {@code super.close()}
 */
@Override
public void close() throws IOException {
    synchronized (this) {
        this.closed = true;
        final Flow.Subscription pending = this.subscription;
        if (pending != null) {
            pending.cancel();
        }
        this.subscription = null;
    }
    super.close();
}
/**
 * Accepts a subscription only when none has been stored yet, then requests one item.
 *
 * @param subscription the subscription to store and request from
 * @throws IllegalStateException if a subscription is already stored
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    final boolean alreadySubscribed = this.subscription != null;
    if (alreadySubscribed) {
        throw new IllegalStateException("already subscribed");
    }
    (this.subscription = subscription).request(1);
}
/**
 * Builds a {@link Flow.Subscriber} that hands every received item to {@code actorLogic}
 * through {@code execAsync} and tops up demand so that roughly {@code optimalQueueLength}
 * items are outstanding relative to the current {@code queue} size.
 *
 * <p>Only the first subscription is accepted; any later one is cancelled. The acceptance
 * is decided with a single atomic compare-and-set so two concurrent {@code onSubscribe}
 * calls cannot both be accepted.
 *
 * @param optimalQueueLength target number of items; demand is requested whenever
 *                           {@code queue.size()} is below this value
 * @param onErrorHandler     invoked (via {@code execAsync}) with the error received in
 *                           {@code onError}; may be {@code null}, in which case nothing is invoked
 * @param onCompleteHandler  invoked (via {@code execAsync}) with the enclosing actor when
 *                           {@code onComplete} is received; may be {@code null}
 * @return the subscriber described above
 */
Flow.Subscriber<T> asReactiveSubscriber(int optimalQueueLength, Consumer<Throwable> onErrorHandler, Consumer<PartialActor<T, S>> onCompleteHandler) {
    AtomicReference<Flow.Subscription> sub = new AtomicReference<>();

    return new Flow.Subscriber<T>() {
        /** Requests as many items as are missing to reach {@code optimalQueueLength}. */
        private void askRefill() {
            int messagesRequired = optimalQueueLength - queue.size();

            if (messagesRequired > 0) {
                sub.get().request(messagesRequired);
            }
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Reject null up front, mirroring the checks in onNext and onError.
            Objects.requireNonNull(subscription);

            // Atomic "first one wins": a separate get() followed by set() would let two
            // concurrent callers both see null and both install their subscription.
            if (sub.compareAndSet(null, subscription)) {
                askRefill();
            } else {
                subscription.cancel();
            }
        }

        @Override
        public void onNext(T item) {
            Objects.requireNonNull(item);

            execAsync(() -> {
                actorLogic.accept(item);
                // Ask for more only after this item has been handled.
                askRefill();
            });
        }

        @Override
        public void onError(Throwable throwable) {
            Objects.requireNonNull(throwable);

            execAsync(() -> {
                if (onErrorHandler != null) {
                    onErrorHandler.accept(throwable);
                }
            });
            sendPoisonPill();
        }

        @Override
        public void onComplete() {
            execAsync(() -> {
                if (onCompleteHandler != null) {
                    onCompleteHandler.accept(Actor.this);
                }
            });
            sendPoisonPill();
        }
    };
}
/**
 * Logs that the callback ran, stores the subscription and requests three items.
 *
 * @param subscription the subscription to store and request from
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    log.info("onSubscribe was called");
    // Store the subscription BEFORE requesting: a publisher is allowed to deliver items
    // synchronously from inside request(), and any callback that reads this.subscription
    // at that point must not find it unset.
    this.subscription = subscription;
    subscription.request(3);
}
/**
 * Keeps the given subscription in {@code this.subscription} and requests a single item.
 *
 * @param subscription the subscription to store and request from
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    (this.subscription = subscription).request(1);
}
/**
 * Requests {@code INFINITE} items from the subscription as soon as it is received.
 * The subscription is not stored by this method.
 *
 * @param subscription the subscription to request from
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    final Flow.Subscription upstream = subscription;
    upstream.request(INFINITE);
}
/**
 * Exposes the value currently held in the {@code subscription} field.
 *
 * @return the {@code subscription} field, unchanged
 */
public Flow.Subscription getSubscription() {
    return this.subscription;
}
/**
 * Stores the subscription and requests an initial batch of items: one more than the
 * remaining capacity reported by {@code buffers}, but never fewer than two.
 *
 * @param s the subscription to store and request from
 */
@Override
public void onSubscribe(Flow.Subscription s) {
    subscription = s;
    final long initialDemand = Math.max(2, buffers.remainingCapacity() + 1);
    s.request(initialDemand);
}
/**
 * Stores the subscription and requests {@link Long#MAX_VALUE} items from it.
 *
 * @param subscription the subscription to store and request from
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    (this.subscription = subscription).request(Long.MAX_VALUE);
}
/**
 * Stores the subscription, requests ten items from it and then adds the same amount
 * to the {@code demand} counter.
 *
 * @param subscription the subscription to store and request from
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    final int initialBatch = 10;
    this.subscription = subscription;
    subscription.request(initialBatch);
    // The counter is updated only after the request call has returned.
    demand += initialBatch;
}
/**
 * Registers the subscriber in {@code subscribers} and hands it a new
 * {@code SubscriptionImpl} created for this publisher.
 *
 * @param subscriber the subscriber to register; must not be {@code null}
 * @throws NullPointerException if {@code subscriber} is {@code null}
 */
@Override
public void subscribe(Flow.Subscriber<? super FlowData> subscriber) {
    // Fail before touching any state: otherwise a null could be stored in
    // `subscribers` and only blow up later on the onSubscribe call.
    if (subscriber == null) {
        throw new NullPointerException("subscriber");
    }
    subscribers.add(subscriber);
    Flow.Subscription subscription = new SubscriptionImpl(this);
    subscriber.onSubscribe(subscription);
}
/**
 * Records the subscription in {@code this.subscription} and asks it for one item.
 *
 * @param subscription the subscription to store and request from
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    final Flow.Subscription upstream = subscription;
    this.subscription = upstream;
    upstream.request(1);
}
/**
 * Saves the incoming subscription and immediately requests the first item.
 *
 * @param subscription the subscription to store and request from
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    (this.subscription = subscription).request(1);
}
/**
 * Stores the subscription in {@code upstreamSubscription} and requests one item
 * through that field.
 *
 * @param subscription the subscription to store
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    upstreamSubscription = subscription;
    this.upstreamSubscription.request(1);
}
/**
 * Stores the subscription and immediately issues a request for zero items.
 *
 * @param subscription the subscription to store and request from
 */
public void onSubscribe(Flow.Subscription subscription) {
    // NOTE(review): Flow.Subscription#request documents that a non-positive count makes
    // the subscriber receive onError with an IllegalArgumentException - confirm that the
    // zero request is intentional (e.g. a test fixture) rather than a typo.
    (this.subscription = subscription).request(0);
}
/**
 * Records the subscription and then calls {@code request(0)} on it.
 *
 * @param subscription the subscription to store and request from
 */
public void onSubscribe(Flow.Subscription subscription) {
    final Flow.Subscription upstream = subscription;
    this.subscription = upstream;
    // NOTE(review): Flow.Subscription#request documents that a non-positive count makes
    // the subscriber receive onError with an IllegalArgumentException - confirm that the
    // zero request is intentional (e.g. a test fixture) rather than a typo.
    upstream.request(0);
}
/**
 * Stores the received subscription and then calls {@code s.onSubscribe(this)}, passing
 * this object (not the received subscription) on to {@code s}.
 *
 * @param subscription the subscription to store
 */
@Override
public void onSubscribe(final Flow.Subscription subscription) {
    final Flow.Subscription upstream = subscription;
    // The field is assigned first, so it is already set when `s` is notified.
    this.subscription = upstream;
    this.s.onSubscribe(this);
}
/**
 * Returns whatever the {@code subscription} field holds at the time of the call.
 *
 * @return the current value of the {@code subscription} field
 */
public Flow.Subscription getSubscription() {
    final Flow.Subscription current = subscription;
    return current;
}