下面列出了 java.util.concurrent.Flow.Subscription 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds a Mockito mock {@link Publisher} whose {@code subscribe} hands each subscriber a
 * mock {@link Subscription}.
 *
 * <p>Every {@code request(long)} call on that mock subscription invokes
 * {@code subscriberTerminator} with the subscriber and the subscription.
 *
 * @param subscriberTerminator callback run whenever the mock subscription receives a request
 * @return the stubbed mock publisher
 */
private Publisher<Integer> newMockFlowPublisher(
        BiConsumer<Subscriber<? super Integer>, Subscription> subscriberTerminator) {
    @SuppressWarnings("unchecked")
    Publisher<Integer> mockPublisher = mock(Publisher.class);
    doAnswer(subscribeCall -> {
        Subscriber<? super Integer> target = subscribeCall.getArgument(0);
        Subscription mockSubscription = mock(Subscription.class);
        // Stub the subscription first so that demand signalled from inside onSubscribe
        // already reaches the terminator.
        doAnswer(requestCall -> {
            subscriberTerminator.accept(target, mockSubscription);
            return null;
        }).when(mockSubscription).request(anyLong());
        target.onSubscribe(mockSubscription);
        return null;
    }).when(mockPublisher).subscribe(any());
    return mockPublisher;
}
/**
 * Stores the subscription, logs that subscribing succeeded and requests one item.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    LOGGER.info("Successfully subscribed to publisher.");
    // Signal an initial demand of a single item.
    this.subscription.request(1L);
}
/**
 * Adapts the incoming {@code PublisherSource.Subscription} to a {@link Subscription} and
 * passes the adapter to the wrapped subscriber.
 *
 * @param subscription the source subscription that {@code request} and {@code cancel} are
 *     forwarded to
 */
@Override
public void onSubscribe(final PublisherSource.Subscription subscription) {
    final Subscription flowSubscription = new Subscription() {
        @Override
        public void request(final long demand) {
            // Forward demand unchanged to the source subscription.
            subscription.request(demand);
        }

        @Override
        public void cancel() {
            subscription.cancel();
        }
    };
    subscriber.onSubscribe(flowSubscription);
}
/**
 * Checks that cancelling the {@link Subscription} delivered to the Flow subscriber also
 * cancels the {@code TestSubscription} given to the source publisher.
 */
@Test
public void toFlowCancel() {
    TestPublisher<Integer> sourcePublisher = new TestPublisher<>();
    Subscriber<Integer> flowSubscriber = toFlowPublisherAndSubscribe(sourcePublisher);
    TestSubscription sourceSubscription = new TestSubscription();
    sourcePublisher.onSubscribe(sourceSubscription);
    assertThat("Source not subscribed.", sourcePublisher.isSubscribed(), is(true));

    // Capture the subscription the Flow subscriber received, then cancel through it.
    ArgumentCaptor<Subscription> captor = ArgumentCaptor.forClass(Subscription.class);
    verify(flowSubscriber).onSubscribe(captor.capture());
    Subscription flowSubscription = captor.getValue();
    flowSubscription.cancel();

    assertThat("Subscription not cancelled.", sourceSubscription.isCancelled(), is(true));
}
/**
 * Checks that cancelling the {@link Subscription} delivered to the Flow subscriber calls
 * {@code cancel()} on the mocked {@code PublisherSource.Subscription}.
 */
@Test
public void toFlowFromSourceCancel() {
    PublisherSource.Subscription sourceSubscription = mock(PublisherSource.Subscription.class);
    // A source that hands the mock subscription to whoever subscribes.
    PublisherSource<Integer> source = downstream -> downstream.onSubscribe(sourceSubscription);
    Subscriber<Integer> flowSubscriber = toFlowPublisherFromSourceAndSubscribe(source);

    ArgumentCaptor<Subscription> captor = ArgumentCaptor.forClass(Subscription.class);
    verify(flowSubscriber).onSubscribe(captor.capture());
    Subscription flowSubscription = captor.getValue();
    flowSubscription.cancel();

    verify(sourceSubscription).cancel();
}
/**
 * Subscribes a Mockito mock {@link Subscriber} to {@code flowPublisher}, verifies that it
 * received a {@link Subscription} and requests one item through that subscription.
 *
 * @param flowPublisher the publisher to subscribe to
 * @return the mock subscriber that was subscribed
 */
private Subscriber<Integer> subscribeToFlowPublisher(final Publisher<Integer> flowPublisher) {
    @SuppressWarnings("unchecked")
    Subscriber<Integer> mockSubscriber = mock(Subscriber.class);
    flowPublisher.subscribe(mockSubscriber);

    ArgumentCaptor<Subscription> captor = ArgumentCaptor.forClass(Subscription.class);
    verify(mockSubscriber).onSubscribe(captor.capture());
    Subscription received = captor.getValue();
    received.request(1L);
    return mockSubscriber;
}
/**
 * Accepts the first subscription; any subscription arriving while one is already stored
 * is cancelled.
 *
 * @param subscription the subscription supplied by the publisher
 * @throws NullPointerException if {@code subscription} is {@code null}
 */
@Override
public void onSubscribe(final Flow.Subscription subscription) {
    Objects.requireNonNull(subscription);
    if (this.subscription == null) {
        this.subscription = subscription;
    } else {
        // Already subscribed: reject the extra subscription without touching the stored one.
        subscription.cancel();
    }
}
/**
 * Creates and submits an independent operation group that sums its {@code Long} results,
 * then registers {@code subscription::request} as the session's request hook.
 *
 * @param subscription the subscription whose {@code request} method becomes the hook
 */
@Override
public void onSubscribe(Subscription subscription) {
    // The identity mapping unboxes each Long result for the summing collector.
    group = session.<Long, Long>operationGroup()
            .independent()
            .collect(Collectors.summingLong(value -> value));
    group.submit();
    session.requestHook(subscription::request);
}
/**
 * Builds the {@code ds} field from {@code factory.builder()} with a fixed URL, user name
 * and password, registering {@code subscription::request} as the request hook.
 *
 * @param subscription the subscription whose {@code request} method becomes the hook
 */
@Override
public void onSubscribe(Subscription subscription) {
// NOTE(review): URL and credentials are hard-coded literals; presumably sample values,
// confirm they are never used against a real system.
ds = factory.builder()
.url("//host.oracle.com:5521/example")
.username("scott")
.password("tiger")
.requestHook(subscription::request)
.build();
}
/**
 * Logs the subscription event, stores the subscription and calls
 * {@code requestItems(demand)}.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
log("Subscribed");
// Store the subscription before requestItems runs; presumably requestItems reads it — confirm.
this.subscription = subscription;
requestItems(demand);
}
/**
 * Prints a confirmation, stores the subscription, requests one item and prints a second
 * confirmation.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    System.out.println("Subscribed");
    this.subscription = subscription;
    // Ask the publisher for the first item.
    this.subscription.request(1L);
    System.out.println("onSubscribe requested 1 item");
}
/**
 * Prints a confirmation, stores the subscription, requests one item and prints a second
 * confirmation.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    System.out.println("Subscribed for Freelancer");
    this.subscription = subscription;
    // Ask the publisher for the first item.
    this.subscription.request(1L);
    System.out.println("onSubscribe requested 1 item for Freelancer");
}
/**
 * Accepts the first subscription and requests one item from it.
 *
 * <p>If a subscription is already stored, the newly offered one is cancelled and the
 * method returns normally. Throwing from {@code onSubscribe} leaves the publisher's
 * behaviour unspecified, and Reactive Streams rule 2.5 requires the extra subscription
 * to be cancelled instead.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription != null) {
        // Duplicate onSubscribe: keep the active subscription, release the new one.
        subscription.cancel();
        return;
    }
    this.subscription = subscription;
    subscription.request(1);
}
/**
 * Logs the subscription event, stores the subscription, resets {@code buffer} to a fresh
 * {@link AtomicInteger} and calls {@code requestItems()}.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
log("Subscribed...");
this.subscription = subscription;
// A new AtomicInteger starts at zero.
this.buffer = new AtomicInteger();
// Both fields are set before this call; presumably requestItems relies on them — confirm.
requestItems();
}
/**
 * Prints a confirmation, stores the subscription, requests one item and prints a second
 * confirmation.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    System.out.println("Subscribed for Freelancer");
    this.subscription = subscription;
    // Ask the publisher for the first item.
    this.subscription.request(1L);
    System.out.println("onSubscribe requested 1 item for Freelancer");
}
/**
 * Prints a confirmation, stores the subscription, requests one item and prints a second
 * confirmation.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    System.out.println("Subscribed");
    this.subscription = subscription;
    // Ask the publisher for the first item.
    this.subscription.request(1L);
    System.out.println("onSubscribe requested 1 item");
}
/**
 * Installs a stderr processor on {@code executor} that completes {@code linkAttached}
 * once a line matches the probe obtained from {@code linkAttachedProbeFactory()}.
 *
 * <p>{@code linkAttached} is also completed normally when the stream completes and
 * completed exceptionally when the stream reports an error.
 */
private void setLinkAttachedProbe() {
    var attachProbe = linkAttachedProbeFactory().get();
    executor.setStdErrProcessor(new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            // Intentionally empty.
            // NOTE(review): no demand is ever requested here; presumably the executor
            // pushes lines regardless of demand — confirm.
        }

        @Override
        public void onNext(String item) {
            if (linkAttached.isDone()) {
                return;
            }
            if (!attachProbe.test(item)) {
                return;
            }
            log.info("Client is attached!!");
            linkAttached.complete(null);
        }

        @Override
        public void onError(Throwable throwable) {
            linkAttached.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            linkAttached.complete(null);
        }
    });
}
/**
 * Stores the subscription and requests one item.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    // Signal an initial demand of a single item.
    subscription.request(1L);
}
/**
 * Prints that this subscriber (identified by {@code name}) subscribed, stores the
 * subscription and requests one item.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    final String announcement = name + " subscribed!";
    System.out.println(announcement);
    this.subscription = subscription;
    // Signal an initial demand of a single item.
    subscription.request(1L);
}
/**
 * Prints that this subscriber (identified by {@code name}) subscribed, stores the
 * subscription and requests one item.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    final String announcement = name + " subscribed!";
    System.out.println(announcement);
    this.subscription = subscription;
    // Signal an initial demand of a single item.
    subscription.request(1L);
}
/**
 * Prints that this subscriber (identified by {@code name}) subscribed, stores the
 * subscription and requests one item.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    final String announcement = name + " subscribed!";
    System.out.println(announcement);
    this.subscription = subscription;
    // Signal an initial demand of a single item.
    subscription.request(1L);
}
/**
 * Prints that this subscriber (identified by {@code name}) subscribed, stores the
 * subscription and requests one item.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    final String announcement = name + " subscribed!";
    System.out.println(announcement);
    this.subscription = subscription;
    // Signal an initial demand of a single item.
    subscription.request(1L);
}
/**
 * Stores the subscription on the enclosing {@code Trader} instance and then calls
 * {@code request(1)}.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
// Written to the outer Trader's field rather than a field of this subscriber.
Trader.this.subscription = subscription;
// NOTE(review): request(...) is a helper defined outside this snippet; presumably it
// forwards to the stored subscription — confirm.
request(1);
}
/**
 * Stores the subscription without requesting any items.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
// No demand is signalled here; presumably items are requested elsewhere — confirm.
this.subscription = subscription;
}
/**
 * Creates a wrapper around the given {@code Subscription}.
 *
 * @param s the subscription stored in this wrapper
 */
FlowToSTSubscription(final Subscription s) {
this.s = s;
}
/**
 * Wraps the incoming subscription in a {@code FlowToSTSubscription} and passes the wrapper
 * to the wrapped subscriber.
 *
 * @param s the subscription supplied by the publisher
 */
@Override
public void onSubscribe(final Subscription s) {
    final FlowToSTSubscription adapted = new FlowToSTSubscription(s);
    subscriber.onSubscribe(adapted);
}
/**
 * Stores the subscription and requests one item.
 *
 * @param arg0 the subscription supplied by the publisher
 */
@Override
public void onSubscribe(final Subscription arg0) {
    this.subscription = arg0;
    // Signal an initial demand of a single item.
    this.subscription.request(1L);
}
/**
 * Creates a subscriber that forwards every signal to a new
 * {@code FlowAdapter.SubscribingIterator} and then reports the same signal to the probe.
 *
 * <p>On subscription the probe receives a {@code SubscriberPuppet} whose request and cancel
 * triggers act directly on the subscription.
 *
 * @param probe the whitebox probe that records the signals
 * @return the probing subscriber
 */
@Override
protected Subscriber<Integer> createFlowSubscriber(
        final WhiteboxSubscriberProbe<Integer> probe) {
    final Subscriber<Integer> delegate = new FlowAdapter.SubscribingIterator<>();
    return new Subscriber<Integer>() {
        @Override
        public void onSubscribe(final Subscription subscription) {
            // The delegate sees each signal before the probe does.
            delegate.onSubscribe(subscription);
            final SubscriberPuppet puppet = new SubscriberPuppet() {
                @Override
                public void triggerRequest(final long count) {
                    subscription.request(count);
                }

                @Override
                public void signalCancel() {
                    subscription.cancel();
                }
            };
            probe.registerOnSubscribe(puppet);
        }

        @Override
        public void onNext(final Integer item) {
            delegate.onNext(item);
            probe.registerOnNext(item);
        }

        @Override
        public void onError(final Throwable failure) {
            delegate.onError(failure);
            probe.registerOnError(failure);
        }

        @Override
        public void onComplete() {
            delegate.onComplete();
            probe.registerOnComplete();
        }
    };
}
/**
 * Stores the subscription and requests one item.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    // Signal an initial demand of a single item.
    this.subscription.request(1L);
}
/**
 * Prints the new subscription, stores it and requests one item.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Subscription subscription) {
    // PrintStream.format is documented to behave exactly like printf.
    System.out.format("new subscription %s\n", subscription);
    this.subscription = subscription;
    subscription.request(1L);
}