下面列出了怎么用java.util.concurrent.Flow.Subscriber的API类实例代码及写法,或者点击链接到github查看源代码。
private Publisher<Integer> newMockFlowPublisher(
BiConsumer<Subscriber<? super Integer>, Subscription> subscriberTerminator) {
@SuppressWarnings("unchecked")
Publisher<Integer> flowPublisher = mock(Publisher.class);
doAnswer(invocation -> {
Subscriber<? super Integer> subscriber = invocation.getArgument(0);
Subscription subscription = mock(Subscription.class);
doAnswer(invocation1 -> {
subscriberTerminator.accept(subscriber, subscription);
return null;
}).when(subscription).request(anyLong());
subscriber.onSubscribe(subscription);
return null;
}).when(flowPublisher).subscribe(any());
return flowPublisher;
}
/**
* read method
*
* @return return future string of output
*/
public Future<String> read() {
return CompletableFuture.supplyAsync(() -> {
try (Scanner scanner = new Scanner(is)) {
log.debug("Reading stream {}", is);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
data.append(line);
if (appendLineSeparator) {
data.append(System.getProperty("line.separator"));
}
subscribers.forEach(sub -> sub.onNext(line));
}
scanner.close();
return data.toString();
} catch (Exception e) {
subscribers.forEach(sub -> sub.onError(e));
throw new CompletionException(e);
} finally {
subscribers.forEach(Subscriber::onComplete);
}
}, runnable -> new Thread(runnable).start());
}
@Test
public void toFlowCancel() {
TestPublisher<Integer> stPublisher = new TestPublisher<>();
Subscriber<Integer> subscriber = toFlowPublisherAndSubscribe(stPublisher);
TestSubscription subscription = new TestSubscription();
stPublisher.onSubscribe(subscription);
assertThat("Source not subscribed.", stPublisher.isSubscribed(), is(true));
ArgumentCaptor<Subscription> subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
verify(subscriber).onSubscribe(subscriptionCaptor.capture());
subscriptionCaptor.getValue().cancel();
assertThat("Subscription not cancelled.", subscription.isCancelled(), is(true));
}
@Test
public void toFlowFromSourceCancel() {
PublisherSource.Subscription srcSubscription = mock(PublisherSource.Subscription.class);
PublisherSource<Integer> source = s -> s.onSubscribe(srcSubscription);
Subscriber<Integer> subscriber = toFlowPublisherFromSourceAndSubscribe(source);
ArgumentCaptor<Subscription> flowSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
verify(subscriber).onSubscribe(flowSubscriptionCaptor.capture());
flowSubscriptionCaptor.getValue().cancel();
verify(srcSubscription).cancel();
}
private Subscriber<Integer> subscribeToFlowPublisher(final Publisher<Integer> flowPublisher) {
@SuppressWarnings("unchecked")
Subscriber<Integer> subscriber = mock(Subscriber.class);
flowPublisher.subscribe(subscriber);
ArgumentCaptor<Subscription> subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
verify(subscriber).onSubscribe(subscriptionCaptor.capture());
subscriptionCaptor.getValue().request(1);
return subscriber;
}
@Override
public void subscribe(final Flow.Subscriber<? super T> subscriber) {
if ((boolean) SUBSCRIBED_HANDLE.getAndSet(this, true)) {
subscriber.onError(new IllegalStateException(
"Publisher " + this + " does not support multiple subscribers"));
return;
}
subscriber.onSubscribe(new IteratorBackedSubscription<>(this.asyncIterator, subscriber));
}
@Override
public Subscriber<Integer> createFlowSubscriber() {
return new FlowAdapter.SubscribingIterator<Integer>() {
@Override
public void onSubscribe(final Flow.Subscription subscription) {
super.onSubscribe(subscription);
consume();
}
};
}
public NumberSubscription(ExecutorService executor,Subscriber<? super Long> subscriber,long start_range,long stop_range) {
// TODO Auto-generated constructor stub
this.executor = executor;
this.subscriber=subscriber;
this.start_range=start_range;
this.stop_range=stop_range;
}
public NumberSubscription(ExecutorService executor,Subscriber<? super Long> subscriber,long start_range,long stop_range) {
// TODO Auto-generated constructor stub
this.executor = executor;
this.subscriber=subscriber;
this.start_range=start_range;
this.stop_range=stop_range;
}
private Sub createNewSubscriptionFor(Subscriber<? super Integer> subscriber) {
int startValue = subscriptions.stream()
.mapToInt(sub -> sub.nextValue.get())
.min()
.orElse(0);
return new Sub(subscriber, startValue);
}
private void setLinkAttachedProbe() {
var linkAttachedProbe = linkAttachedProbeFactory().get();
executor.setStdErrProcessor(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription subscription) {
//empty
}
@Override
public void onNext(String item) {
if (!linkAttached.isDone()) {
if (linkAttachedProbe.test(item)) {
log.info("Client is attached!!");
linkAttached.complete(null);
}
}
}
@Override
public void onError(Throwable throwable) {
linkAttached.completeExceptionally(throwable);
}
@Override
public void onComplete() {
linkAttached.complete(null);
}
});
}
public synchronized void subscribe(Subscriber<? super WeatherForecast> subscriber) {
subscriber.onSubscribe(new OnDemandWeatherForecastSubscription(subscriber, executor));
}
OnDemandWeatherForecastSubscription(Subscriber<? super WeatherForecast> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public void subscribeForEvents(Subscriber<? super SessionEvent> subscriber) {
eventPublisher.subscribe(subscriber);
}
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
source.subscribe(new FlowToStSubscriber<>(subscriber));
}
FlowToStSubscriber(final Subscriber<? super T> subscriber) {
this.subscriber = requireNonNull(subscriber);
}
@Override
protected void handleSubscribe(final PublisherSource.Subscriber<? super T> subscriber) {
source.subscribe(new StToFlowSubscriber<>(subscriber));
}
StToFlowSubscriber(final PublisherSource.Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}
private void verifyFlowSuccess(final Subscriber<Integer> subscriber) {
verify(subscriber).onSubscribe(any());
verify(subscriber).onNext(1);
verify(subscriber).onComplete();
verifyNoMoreInteractions(subscriber);
}
private void verifyFlowError(final Subscriber<Integer> subscriber) {
verify(subscriber).onSubscribe(any());
verify(subscriber).onError(DELIBERATE_EXCEPTION);
verifyNoMoreInteractions(subscriber);
}
private Subscriber<Integer> toFlowPublisherAndSubscribe(
final io.servicetalk.concurrent.api.Publisher<Integer> stPublisher) {
Publisher<Integer> flowPublisher = toFlowPublisher(stPublisher);
return subscribeToFlowPublisher(flowPublisher);
}
private Subscriber<Integer> toFlowPublisherFromSourceAndSubscribe(final PublisherSource<Integer> source) {
Publisher<Integer> flowPublisher = toFlowPublisher(source);
return subscribeToFlowPublisher(flowPublisher);
}
@Override
public void subscribe(final Flow.Subscriber<? super T> subscriber) {
subscriber
.onSubscribe(
new IteratorBackedSubscription<>(this.asyncIteratorSupplier.get(), subscriber));
}
IteratorBackedSubscription(final AsyncIterator<T> iterator,
final Flow.Subscriber<? super T> subscriber) {
this.iterator = Objects.requireNonNull(iterator);
this.subscriber = Objects.requireNonNull(subscriber);
}
@Override
protected Subscriber<Integer> createFlowSubscriber(
final WhiteboxSubscriberProbe<Integer> probe) {
final Subscriber<Integer> backing = new FlowAdapter.SubscribingIterator<>();
return new Subscriber<Integer>() {
@Override
public void onSubscribe(final Subscription s) {
backing.onSubscribe(s);
probe.registerOnSubscribe(new SubscriberPuppet() {
@Override
public void triggerRequest(final long elements) {
s.request(elements);
}
@Override
public void signalCancel() {
s.cancel();
}
});
}
@Override
public void onNext(final Integer integer) {
backing.onNext(integer);
probe.registerOnNext(integer);
}
@Override
public void onError(final Throwable throwable) {
backing.onError(throwable);
probe.registerOnError(throwable);
}
@Override
public void onComplete() {
backing.onComplete();
probe.registerOnComplete();
}
};
}
@Override
public void subscribe(Subscriber<? super Long> subscriber) {
// TODO Auto-generated method stub
subscriber.onSubscribe(new NumberSubscription(executor,subscriber,start_range,stop_range));
}
@Override
public void subscribe(Subscriber<? super Long> subscriber) {
// TODO Auto-generated method stub
subscriber.onSubscribe(new NumberSubscription(executor,subscriber,start_range,stop_range));
}
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
log("Subscriber registered: " + subscriber);
publisher.subscribe(new LoggingSubscriber(subscriber));
}
private LoggingSubscriber(Subscriber<? super ByteBuffer> subscriber) {
this.subscriber = subscriber;
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
Sub subscription = createNewSubscriptionFor(subscriber);
registerSubscription(subscription);
subscriber.onSubscribe(subscription);
}