下面列出了 java.util.concurrent.Flow.Subscriber 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Subscribes {@code subscriber} to this operation's row publisher and registers
 * {@code result} as the stage whose outcome completes this operation's submission.
 *
 * <p>When {@code result} completes normally, its value is added to the group
 * submission (if one is set) and used to complete the submission's future. When
 * {@code result} completes exceptionally, the failure is forwarded to the
 * submission's future instead of being dropped, so callers waiting on the
 * submission are not left hanging.
 *
 * @param subscriber receives the published rows; must not be null
 * @param result stage supplying the result of this operation; must not be null
 * @return this operation
 * @throws IllegalArgumentException if {@code result} or {@code subscriber} is null
 */
@Override
public ParameterizedRowPublisherOperation<R> subscribe(Flow.Subscriber<? super Result.RowColumn> subscriber,
        CompletionStage<? extends R> result) {
    if (result == null) {
        throw new IllegalArgumentException("result is not allowed to be null");
    }
    if (subscriber == null) {
        throw new IllegalArgumentException("subscriber is not allowed to be null");
    }
    publisher.subscribe(subscriber);
    this.result = result;
    // whenComplete (rather than thenAccept) so that an exceptional completion of
    // 'result' is observed here as well.
    result.whenComplete((r, failure) -> {
        if (failure != null) {
            // No effect if the future has already been completed elsewhere.
            submission.getCompletionStage().toCompletableFuture().completeExceptionally(failure);
            return;
        }
        if (groupSubmission != null) {
            groupSubmission.addGroupResult(r);
        }
        submission.getCompletionStage().toCompletableFuture().complete(r);
    });
    return this;
}
/**
 * Hands {@code subscriber} a subscription that emits the integers produced by
 * {@code numSent.incrementAndGet()} until {@code numSent} reaches {@code numMax},
 * then signals {@code onComplete} once.
 *
 * <p>Demand is cumulative: each {@code request(n)} adds {@code n} to the outstanding
 * demand, saturating at {@code Integer.MAX_VALUE} (so {@code Long.MAX_VALUE} no longer
 * wraps to a negative value). A non-positive request terminates the subscription
 * with an {@link IllegalArgumentException} delivered through {@code onError}.
 *
 * @param subscriber the subscriber that receives the items and the terminal signal
 */
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
    subscriber.onSubscribe(new Flow.Subscription() {
        // Guards the terminal signal so onComplete/onError is delivered at most once.
        private final AtomicBoolean completed = new AtomicBoolean(false);
        // Items requested by the subscriber that have not been emitted yet.
        private final AtomicInteger numMessagesToSend = new AtomicInteger();
        // Set by request() on a non-positive argument; the error itself is raised from
        // the actor handler, the same place that emits onNext/onComplete.
        private volatile boolean invalidRequest;
        private final Actor<Flow.Subscriber<? super Integer>, Void, Void> actorRefill = ActorSystem.anonymous().newActor(sub -> {
            if (invalidRequest) {
                // Stop any further emission, then report the illegal request once.
                numSent.set(numMax);
                if (completed.compareAndSet(false, true))
                    subscriber.onError(new IllegalArgumentException("Subscription.request requires a positive number of items"));
                return;
            }
            while (numSent.get() < numMax && numMessagesToSend.get() > 0) {
                subscriber.onNext(numSent.incrementAndGet());
                numMessagesToSend.decrementAndGet();
            }
            if (numSent.get() >= numMax) {
                if (completed.compareAndSet(false, true))
                    subscriber.onComplete();
            }
        });

        @Override
        public void request(long n) {
            // Nothing left to send (finished or cancelled): the request is a no-op.
            if (numSent.get() >= numMax)
                return;
            if (n <= 0) {
                invalidRequest = true;
                actorRefill.sendMessage(subscriber);
                return;
            }
            // Clamp before narrowing so large long values cannot wrap to a negative int.
            int added = (int) Math.min(n, (long) Integer.MAX_VALUE);
            // Saturating add: demand accumulates across calls instead of keeping the maximum.
            numMessagesToSend.accumulateAndGet(added,
                    (pending, extra) -> (int) Math.min((long) pending + extra, Integer.MAX_VALUE));
            actorRefill.sendMessage(subscriber);
        }

        @Override
        public void cancel() {
            // Marks the stream as exhausted, which ends the emission loop.
            numSent.set(numMax);
        }
    });
}
/**
 * Passes the subscriber straight on to {@code delegate}.
 *
 * @param subscriber the subscriber to register with the delegate
 */
@Override
public void subscribe(final Flow.Subscriber<? super ByteBuffer> subscriber) {
    delegate.subscribe(subscriber);
}
/**
 * Creates a new {@code Subscription} for the subscriber and hands it over
 * through {@code onSubscribe}.
 *
 * @param subscriber the subscriber that receives the new subscription
 */
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    final Flow.Subscription handle = new Subscription(subscriber);
    subscriber.onSubscribe(handle);
}
/**
 * Builds the subscriber handed to the whitebox verification.
 *
 * <p>The returned subscriber wraps an actor-backed reactive subscriber: every
 * signal is first forwarded to that subscriber and then reported to
 * {@code probe}.
 *
 * @param probe the probe that records the signals and receives the puppet
 * @return a subscriber that forwards each signal and then notifies the probe
 */
@Override
public Flow.Subscriber<Integer> createFlowSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
    var actorSubscriber = ActorSystem.anonymous().newActor((Integer ignored) -> {
    }).asReactiveSubscriber(100, null, null);

    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription upstream) {
            actorSubscriber.onSubscribe(upstream);

            // The puppet lets the verification drive demand and cancellation
            // on the subscription that was just received.
            SubscriberPuppet puppet = new SubscriberPuppet() {
                @Override
                public void triggerRequest(long elements) {
                    upstream.request(elements);
                }

                @Override
                public void signalCancel() {
                    upstream.cancel();
                }
            };
            probe.registerOnSubscribe(puppet);
        }

        @Override
        public void onNext(Integer element) {
            actorSubscriber.onNext(element);
            probe.registerOnNext(element);
        }

        @Override
        public void onError(Throwable failure) {
            actorSubscriber.onError(failure);
            probe.registerOnError(failure);
        }

        @Override
        public void onComplete() {
            actorSubscriber.onComplete();
            probe.registerOnComplete();
        }
    };
}
/**
 * Registers the subscriber with {@code delegate}; no other work is done here.
 *
 * @param subscriber the subscriber forwarded to the delegate
 */
@Override
public void subscribe(final Flow.Subscriber<? super ByteBuffer> subscriber) {
    delegate.subscribe(subscriber);
}
/**
 * Wraps this object's {@code iterator()} method in a new {@code PullPublisher},
 * stores it as the delegate and subscribes the subscriber to it.
 *
 * @param subscriber the subscriber registered with the newly created delegate
 */
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    // Each iteration of this Iterable calls this::iterator again.
    final Iterable<ByteBuffer> buffers = this::iterator;
    this.delegate = new PullPublisher<>(buffers);
    this.delegate.subscribe(subscriber);
}
/**
 * Creates a subscription over the given iterator for the given subscriber.
 *
 * @param iterator the source of items; must not be null
 * @param subscriber the subscriber to serve; must not be null
 * @throws NullPointerException if either argument is null
 */
IteratorBackedSubscription(AsyncIterator<T> iterator,
        Flow.Subscriber<? super T> subscriber) {
    // Validate in declaration order, then store.
    Objects.requireNonNull(iterator);
    Objects.requireNonNull(subscriber);
    this.iterator = iterator;
    this.subscriber = subscriber;
}
/**
 * Creates a subscription and records the subscriber in the enclosing
 * {@code PushPublisher}'s {@code subscriber} field.
 *
 * @param subscriber the subscriber stored on the enclosing publisher
 */
Subscription(final Flow.Subscriber<? super T> subscriber) {
    PushPublisher.this.subscriber = subscriber;
}
/**
 * Creates an instance that keeps a reference to the given subscriber.
 *
 * @param subscriber the subscriber stored in this instance
 */
public FlowSubscriber(final Flow.Subscriber<? super T> subscriber) {
    this.subscriber = subscriber;
}
/**
 * Creates a subscription that keeps the given subscriber and executor.
 *
 * @param subscriber the subscriber stored in this subscription
 * @param executor the executor service stored in this subscription
 */
public DemoSubscription(final Flow.Subscriber subscriber,
        final ExecutorService executor) {
    this.executor = executor;
    this.subscriber = subscriber;
}
/**
 * Creates a subscription that keeps the given subscriber and executor.
 *
 * @param subscriber the subscriber stored in this subscription
 * @param executor the executor service stored in this subscription
 */
public Subscription(final Flow.Subscriber subscriber,
        final ExecutorService executor) {
    this.executor = executor;
    this.subscriber = subscriber;
}
/**
 * Creates a new {@code Subscription} for the subscriber, stores it in the
 * {@code subscription} field and passes it to {@code onSubscribe}.
 *
 * @param subscriber the subscriber that receives the new subscription
 */
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    final Subscription created = new Subscription(subscriber);
    subscription = created;
    subscriber.onSubscribe(created);
}
/**
 * Copies the {@code content} range given by {@code offset} and {@code length}
 * into a list of buffers, wraps that list in a new {@code PullPublisher} stored
 * as the delegate, and subscribes the subscriber to it.
 *
 * @param subscriber the subscriber registered with the newly created delegate
 */
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    final List<ByteBuffer> buffers = copy(content, offset, length);
    this.delegate = new PullPublisher<>(buffers);
    this.delegate.subscribe(subscriber);
}
/**
 * Marks the publisher state as {@code SubscriptionState.OPENED}, then creates a
 * new {@code Subscription} for the subscriber, stores it in the
 * {@code subscription} field and passes it to {@code onSubscribe}.
 *
 * @param subscriber the subscriber that receives the new subscription
 */
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    // The state is switched before the subscriber sees the subscription.
    state = SubscriptionState.OPENED;
    final Subscription created = new Subscription(subscriber);
    subscription = created;
    subscriber.onSubscribe(created);
}
/**
 * Creates a subscription and records the subscriber in the enclosing
 * {@code BlockingPushPublisher}'s {@code subscriber} field.
 *
 * @param subscriber the subscriber stored on the enclosing publisher
 */
Subscription(final Flow.Subscriber<? super T> subscriber) {
    BlockingPushPublisher.this.subscriber = subscriber;
}
/**
 * Obtains a fresh iterator from {@code iterable}, wraps it together with the
 * subscriber in a new {@code Subscription} and passes that to {@code onSubscribe}.
 *
 * @param subscriber the subscriber that receives the new subscription
 */
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    final Flow.Subscription handle = new Subscription(subscriber, iterable.iterator());
    subscriber.onSubscribe(handle);
}
/**
 * Creates a subscription that keeps the given subscriber and iterator.
 *
 * @param subscriber the subscriber stored in this subscription
 * @param iter the iterator stored in this subscription
 */
Subscription(final Flow.Subscriber<? super T> subscriber, final Iterator<T> iter) {
    this.iter = iter;
    this.subscriber = subscriber;
}
/**
 * Subscribes the given subscriber to the stream of rows returned by this
 * {@link Operation}. Rows are delivered to the subscriber as
 * {@code Result.RowColumn} items. The value of the {@code result} parameter is
 * the result of this {@link Operation}.
 *
 * @param subscriber the subscriber that receives the rows. Not null.
 * @param result the stage whose value is the result of this {@link Operation}.
 * Not null.
 * @return this RowPublisherOperation
 */
public RowPublisherOperation<T> subscribe(Flow.Subscriber<? super Result.RowColumn> subscriber,
CompletionStage<? extends T> result);
/**
 * {@inheritDoc}
 *
 * <p>Redeclares the inherited method with
 * {@code ParameterizedRowPublisherOperation<T>} as the return type; the
 * parameters are unchanged.
 *
 * @return this {@code ParameterizedRowPublisherOperation}
 */
@Override
public ParameterizedRowPublisherOperation<T> subscribe(Flow.Subscriber<? super Result.RowColumn> subscriber,
CompletionStage<? extends T> result);