java.util.concurrent.Flow.Subscriber 源码实例 Demo

下面列出了 java.util.concurrent.Flow.Subscriber 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: pgadba   文件: PgRowPublisherOperation.java
/**
 * Attaches {@code subscriber} to this operation's publisher and registers {@code result}
 * as the stage whose value completes the submission.
 *
 * <p>When {@code result} completes normally its value is first added to the group
 * submission, if one is present, and is then used to complete the submission's future.
 *
 * @param subscriber receiver that is subscribed to the publisher; must not be null
 * @param result stage that supplies the result of this operation; must not be null
 * @return this operation
 * @throws IllegalArgumentException if {@code result} or {@code subscriber} is null
 */
@Override
public ParameterizedRowPublisherOperation<R> subscribe(Flow.Subscriber<? super Result.RowColumn> subscriber,
    CompletionStage<? extends R> result) {
  // result is validated first, so it is the argument reported when both are null
  if (result == null) {
    throw new IllegalArgumentException("result is not allowed to be null");
  }
  if (subscriber == null) {
    throw new IllegalArgumentException("subscriber is not allowed to be null");
  }

  publisher.subscribe(subscriber);
  this.result = result;

  result.thenAccept(value -> {
    if (groupSubmission != null) {
      groupSubmission.addGroupResult(value);
    }
    // the group is updated before the submission's own future is completed
    submission.getCompletionStage()
        .toCompletableFuture()
        .complete(value);
  });

  return this;
}
 
源代码2 项目: Fibry   文件: TestReactiveSubscribers.java
/**
 * Hands {@code subscriber} a subscription that emits increasing integers on demand.
 *
 * <p>Each {@code request(n)} records the demand and sends a message to a refill actor. The
 * actor calls {@code onNext} with {@code numSent.incrementAndGet()} while there is outstanding
 * demand and {@code numSent} is below {@code numMax}. It calls {@code onComplete} at most once,
 * when {@code numSent} has reached {@code numMax}. {@code numSent} and {@code numMax} are
 * declared outside this method.
 *
 * @param subscriber the subscriber that receives the subscription and the emitted items
 */
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
    subscriber.onSubscribe(new Flow.Subscription() {
        // Guards onComplete so it is signalled only once.
        private final AtomicBoolean completed = new AtomicBoolean(false);
        // Outstanding demand, as recorded by request().
        private final AtomicInteger numMessagesToSend = new AtomicInteger();
        // The emission loop runs inside the actor's message handler.
        private final Actor<Flow.Subscriber<? super Integer>, Void, Void> actorRefill = ActorSystem.anonymous().newActor(sub -> {
            while (numSent.get() < numMax && numMessagesToSend.get() > 0) {
                subscriber.onNext(numSent.incrementAndGet());
                numMessagesToSend.decrementAndGet();
            }

            if (numSent.get() >= numMax) {
                if (completed.compareAndSet(false, true))
                    subscriber.onComplete();
            }
        });

        @Override
        public void request(long n) {
            if (numSent.get() >= numMax)
                return;

            // Demand is tracked as an int. Clamp before narrowing: a plain (int) cast
            // truncates, so Long.MAX_VALUE (the usual "unbounded" request) would become -1
            // and be silently discarded by Math::max.
            int demand = (int) Math.max(Integer.MIN_VALUE, Math.min(n, Integer.MAX_VALUE));
            numMessagesToSend.accumulateAndGet(demand, Math::max);

            actorRefill.sendMessage(subscriber);
        }

        @Override
        public void cancel() {
            // Marks the stream as exhausted; the refill loop and request() both check this.
            numSent.set(numMax);
        }
    });
}
 
源代码3 项目: openjdk-jdk9   文件: RequestProcessors.java
/**
 * Forwards the subscription request unchanged to {@code delegate}.
 *
 * @param subscriber the subscriber handed to {@code delegate.subscribe}
 */
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    delegate.subscribe(subscriber);
}
 
源代码4 项目: openjdk-jdk9   文件: PseudoPublisher.java
/**
 * Creates a new {@code Subscription} for {@code subscriber} and passes it to the
 * subscriber's {@code onSubscribe}.
 *
 * @param subscriber the subscriber that receives the new subscription
 */
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    Flow.Subscription created = new Subscription(subscriber);
    subscriber.onSubscribe(created);
}
 
源代码5 项目: Fibry   文件: TestReactiveSubscribersWhiteBox.java
/**
 * Builds the subscriber under test: an actor-backed reactive subscriber wrapped so that
 * every signal is forwarded to it first and then reported to {@code probe}.
 *
 * @param probe the probe that is told about each signal after it has been forwarded
 * @return a subscriber that forwards to the actor-backed subscriber and then notifies the probe
 */
@Override
public Flow.Subscriber<Integer> createFlowSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
    // The actor's message handler is intentionally empty.
    var actorSubscriber = ActorSystem.anonymous()
            .newActor((Integer n) -> {
            })
            .asReactiveSubscriber(100, null, null);

    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            actorSubscriber.onSubscribe(subscription);

            // The puppet lets the whitebox verification drive request/cancel
            // on the very subscription that was just received.
            SubscriberPuppet puppet = new SubscriberPuppet() {
                @Override
                public void triggerRequest(long elements) {
                    subscription.request(elements);
                }

                @Override
                public void signalCancel() {
                    subscription.cancel();
                }
            };
            probe.registerOnSubscribe(puppet);
        }

        @Override
        public void onNext(Integer item) {
            actorSubscriber.onNext(item);
            probe.registerOnNext(item);
        }

        @Override
        public void onError(Throwable throwable) {
            actorSubscriber.onError(throwable);
            probe.registerOnError(throwable);
        }

        @Override
        public void onComplete() {
            actorSubscriber.onComplete();
            probe.registerOnComplete();
        }
    };
}
 
源代码6 项目: openjdk-jdk9   文件: ShortRequestBody.java
/**
 * Forwards the subscription request unchanged to {@code delegate}.
 *
 * @param subscriber the subscriber handed to {@code delegate.subscribe}
 */
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    delegate.subscribe(subscriber);
}
 
源代码7 项目: openjdk-jdk9   文件: RequestProcessors.java
/**
 * Replaces {@code delegate} with a new {@code PullPublisher} built over this object's
 * {@code iterator()} method, then subscribes {@code subscriber} to it.
 *
 * @param subscriber the subscriber handed to the newly created delegate
 */
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    // Adapt iterator() to an Iterable through a method reference.
    Iterable<ByteBuffer> buffers = this::iterator;
    this.delegate = new PullPublisher<>(buffers);
    this.delegate.subscribe(subscriber);
}
 
源代码8 项目: java-async-util   文件: FlowAdapter.java
/**
 * Creates a subscription that stores the given iterator and subscriber.
 *
 * @param iterator the iterator kept by this subscription; must not be null
 * @param subscriber the subscriber kept by this subscription; must not be null
 * @throws NullPointerException if {@code iterator} or {@code subscriber} is null
 */
IteratorBackedSubscription(final AsyncIterator<T> iterator,
    final Flow.Subscriber<? super T> subscriber) {
  // Each argument is validated immediately before it is stored; iterator goes first.
  Objects.requireNonNull(iterator);
  this.iterator = iterator;

  Objects.requireNonNull(subscriber);
  this.subscriber = subscriber;
}
 
源代码9 项目: openjdk-jdk9   文件: PushPublisher.java
/**
 * Creates a subscription and records {@code subscriber} on the enclosing
 * {@code PushPublisher} instance.
 *
 * @param subscriber the subscriber stored in the enclosing publisher's {@code subscriber} field
 */
Subscription(Flow.Subscriber<? super T> subscriber) {
    PushPublisher.this.subscriber = subscriber;
}
 
源代码10 项目: reactor-core   文件: JdkFlowAdapter.java
/**
 * Creates an adapter that stores the given {@link Flow.Subscriber}.
 *
 * @param subscriber the subscriber stored in this instance
 */
public FlowSubscriber(Flow.Subscriber<? super T> subscriber) {
	this.subscriber = subscriber;
}
 
/**
 * Creates a subscription that stores the subscriber and executor it is given.
 *
 * @param subscriber the subscriber stored in this subscription
 * @param executor the executor service stored in this subscription
 */
public DemoSubscription(Flow.Subscriber subscriber,
                        ExecutorService executor) {
    // The two assignments are independent of each other.
    this.executor = executor;
    this.subscriber = subscriber;
}
 
源代码12 项目: Java-9-Cookbook   文件: Subscription.java
/**
 * Creates a subscription that stores the subscriber and executor it is given.
 *
 * @param subscriber the subscriber stored in this subscription
 * @param executor the executor service stored in this subscription
 */
public Subscription(Flow.Subscriber subscriber,
                    ExecutorService executor) {
    // The two assignments are independent of each other.
    this.executor = executor;
    this.subscriber = subscriber;
}
 
源代码13 项目: openjdk-jdk9   文件: PushPublisher.java
/**
 * Creates a new {@code Subscription} for {@code subscriber}, stores it in
 * {@code subscription}, and passes it to the subscriber's {@code onSubscribe}.
 *
 * @param subscriber the subscriber that receives the new subscription
 */
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    // The subscription is stored before the subscriber is told about it.
    subscription = new Subscription(subscriber);
    subscriber.onSubscribe(subscription);
}
 
源代码14 项目: openjdk-jdk9   文件: RequestProcessors.java
/**
 * Replaces {@code delegate} with a new {@code PullPublisher} over the list returned by
 * {@code copy(content, offset, length)}, then subscribes {@code subscriber} to it.
 *
 * @param subscriber the subscriber handed to the newly created delegate
 */
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    List<ByteBuffer> buffers = copy(content, offset, length);
    this.delegate = new PullPublisher<>(buffers);
    this.delegate.subscribe(subscriber);
}
 
源代码15 项目: openjdk-jdk9   文件: BlockingPushPublisher.java
/**
 * Sets {@code state} to {@code SubscriptionState.OPENED}, creates a new {@code Subscription}
 * for {@code subscriber}, stores it in {@code subscription}, and passes it to the
 * subscriber's {@code onSubscribe}.
 *
 * @param subscriber the subscriber that receives the new subscription
 */
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    // State is set to OPENED before the subscription is created and handed out.
    state = SubscriptionState.OPENED;
    subscription = new Subscription(subscriber);
    subscriber.onSubscribe(subscription);
}
 
源代码16 项目: openjdk-jdk9   文件: BlockingPushPublisher.java
/**
 * Creates a subscription and records {@code subscriber} on the enclosing
 * {@code BlockingPushPublisher} instance.
 *
 * @param subscriber the subscriber stored in the enclosing publisher's {@code subscriber} field
 */
Subscription(Flow.Subscriber<? super T> subscriber) {
    BlockingPushPublisher.this.subscriber = subscriber;
}
 
源代码17 项目: openjdk-jdk9   文件: PullPublisher.java
/**
 * Creates a {@code Subscription} over a new iterator obtained from {@code iterable}
 * and passes it to the subscriber's {@code onSubscribe}.
 *
 * @param subscriber the subscriber that receives the new subscription
 */
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    // Each call obtains its own iterator from the iterable.
    Flow.Subscription created = new Subscription(subscriber, iterable.iterator());
    subscriber.onSubscribe(created);
}
 
源代码18 项目: openjdk-jdk9   文件: PullPublisher.java
/**
 * Creates a subscription that stores the subscriber and the iterator it is given.
 *
 * @param subscriber the subscriber stored in this subscription
 * @param iter the iterator stored in this subscription
 */
Subscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iter) {
    // The two assignments are independent of each other.
    this.iter = iter;
    this.subscriber = subscriber;
}
 
源代码19 项目: pgadba   文件: RowPublisherOperation.java
/**
 * Subscribe to the stream of rows returned by this {@link Operation}. The
 * value of the {@code result} parameter is the result of this {@link Operation}.
 *
 * @param subscriber the subscriber that receives the rows. Not null.
 * @param result the stage whose value is the result of this {@link Operation}. Not null.
 * @return this RowPublisherOperation
 */
public RowPublisherOperation<T> subscribe(Flow.Subscriber<? super Result.RowColumn> subscriber,
                                          CompletionStage<? extends T> result);
 
/**
 * {@inheritDoc}
 *
 * <p>This override narrows the declared return type to
 * {@code ParameterizedRowPublisherOperation}.
 *
 * @return this {@code ParameterizedRowPublisherOperation}
 */
@Override
public ParameterizedRowPublisherOperation<T> subscribe(Flow.Subscriber<? super Result.RowColumn> subscriber,
                                                        CompletionStage<? extends T> result);