下面列出了怎么用java.util.concurrent.Flow.Publisher的API类实例代码及写法,或者点击链接到github查看源代码。
private Publisher<Integer> newMockFlowPublisher(
BiConsumer<Subscriber<? super Integer>, Subscription> subscriberTerminator) {
@SuppressWarnings("unchecked")
Publisher<Integer> flowPublisher = mock(Publisher.class);
doAnswer(invocation -> {
Subscriber<? super Integer> subscriber = invocation.getArgument(0);
Subscription subscription = mock(Subscription.class);
doAnswer(invocation1 -> {
subscriberTerminator.accept(subscriber, subscription);
return null;
}).when(subscription).request(anyLong());
subscriber.onSubscribe(subscription);
return null;
}).when(flowPublisher).subscribe(any());
return flowPublisher;
}
@Test(expected = IOException.class)
public void testCancelCloseException() throws Throwable {
final ConsumingSubscriber<Long> subscriber = new ConsumingSubscriber<Long>() {
@Override
public void onNext(final Long arg0) {}
};
final Publisher<Long> publisher =
FlowAdapter.toPublisher(new ExceptionalIterator(10, null, new IOException()));
publisher.subscribe(subscriber);
subscriber.request();
subscriber.request();
Assert.assertFalse(subscriber.isDone());
subscriber.cancel();
Assert.assertTrue(subscriber.isDone());
FlowAdapterTest.unwrap(subscriber);
}
@Test
public void testMultiSubscribeInSeries() {
final List<CloseTrackingIt> its = new ArrayList<>();
final Publisher<Integer> p = FlowAdapter.toPublisher(() -> {
final CloseTrackingIt it = new CloseTrackingIt(5);
its.add(it);
return it;
});
// subscribe twice in series
final int firstSum =
FlowAdapter.toAsyncIterator(p).fold(0, (i, j) -> i + j).toCompletableFuture().join();
final int secondSum =
FlowAdapter.toAsyncIterator(p).fold(0, (i, j) -> i + j).toCompletableFuture().join();
Assert.assertEquals(2, its.size());
Assert.assertTrue(its.stream().allMatch(it -> it.closed));
Assert.assertEquals(15, secondSum);
Assert.assertEquals(15, firstSum);
}
@Test
public void testMultiSubscribeInParallel() {
final List<CloseTrackingIt> its = new ArrayList<>();
final Publisher<Integer> p = FlowAdapter.toPublisher(() -> {
final CloseTrackingIt it = new CloseTrackingIt(5);
its.add(it);
return it;
});
// subscribe twice in parallel
final AsyncIterator<Integer> it1 = FlowAdapter.toAsyncIterator(p);
Assert.assertEquals(0, it1.nextStage().toCompletableFuture().join().right().get().intValue());
final AsyncIterator<Integer> it2 = FlowAdapter.toAsyncIterator(p);
Assert.assertEquals(0, it2.nextStage().toCompletableFuture().join().right().get().intValue());
final int firstSum = it1.fold(0, (i, j) -> i + j).toCompletableFuture().join();
final int secondSum = it2.fold(0, (i, j) -> i + j).toCompletableFuture().join();
Assert.assertEquals(2, its.size());
Assert.assertTrue(its.stream().allMatch(it -> it.closed));
Assert.assertEquals(15, secondSum);
Assert.assertEquals(15, firstSum);
}
@Test
public void fromFlowSuccess() throws Exception {
Publisher<Integer> flowPublisher = newMockFlowPublisher((subscriber, __) -> {
subscriber.onNext(1);
subscriber.onComplete();
});
Integer result = fromFlowPublisher(flowPublisher).firstOrElse(() -> null).toFuture().get();
assertThat("Unexpected result", result, is(1));
}
@Test
public void fromFlowError() throws Exception {
Publisher<Integer> flowPublisher = newMockFlowPublisher((subscriber, __) ->
subscriber.onError(DELIBERATE_EXCEPTION));
Future<Integer> future = fromFlowPublisher(flowPublisher).firstOrElse(() -> null).toFuture();
expectedException.expect(instanceOf(ExecutionException.class));
expectedException.expectCause(sameInstance(DELIBERATE_EXCEPTION));
future.get();
}
private Subscriber<Integer> subscribeToFlowPublisher(final Publisher<Integer> flowPublisher) {
@SuppressWarnings("unchecked")
Subscriber<Integer> subscriber = mock(Subscriber.class);
flowPublisher.subscribe(subscriber);
ArgumentCaptor<Subscription> subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
verify(subscriber).onSubscribe(subscriptionCaptor.capture());
subscriptionCaptor.getValue().request(1);
return subscriber;
}
@Override
public Publisher<Integer> createFlowPublisher(final long l) {
AsyncIterator<Integer> it = AsyncIterator.repeat(1);
// infinite on MAX_VALUE
if (l != Long.MAX_VALUE) {
it = it.take(l);
}
return FlowAdapter.toPublisher(it);
}
@Override
public Publisher<Integer> createFailedFlowPublisher() {
// return ReactiveStreamsConverter.toPublisher(AsyncIterator.error(new RuntimeException("test
// error")));
// null ignores these tests. An iterator's error is lazy (requires a request to get an error),
// but there are two tests that test for an error on subscription
return null;
}
@Test(expected = IllegalStateException.class)
public void testDoubleSubscription() throws Throwable {
final Flow.Publisher<Long> publisher = FlowAdapter.toPublisher(AsyncIterator.range(0, 5));
final ConsumingSubscriber<Long> s = new ConsumingSubscriber<>();
publisher.subscribe(s);
s.join();
final ConsumingSubscriber<Long> s2 = new ConsumingSubscriber<>();
try {
publisher.subscribe(s2);
} catch (final Throwable e) {
Assert.fail("failure should be notified via onError, got: " + e);
}
FlowAdapterTest.unwrap(s2);
}
private static void consume(final AsyncIterator<Long> it) throws Throwable {
final Publisher<Long> publisher = FlowAdapter.toPublisher(it);
final ConsumingSubscriber<Long> stage = new ConsumingSubscriber<>();
publisher.subscribe(stage);
FlowAdapterTest.unwrap(stage);
}
private Subscriber<Integer> toFlowPublisherAndSubscribe(
final io.servicetalk.concurrent.api.Publisher<Integer> stPublisher) {
Publisher<Integer> flowPublisher = toFlowPublisher(stPublisher);
return subscribeToFlowPublisher(flowPublisher);
}
private Subscriber<Integer> toFlowPublisherFromSourceAndSubscribe(final PublisherSource<Integer> source) {
Publisher<Integer> flowPublisher = toFlowPublisher(source);
return subscribeToFlowPublisher(flowPublisher);
}
SubscribingIterator(final Flow.Publisher<? extends T> publisher) {
this.publisher = Objects.requireNonNull(publisher);
}
public static void createAndSubscribe(String name, Publisher<?> publisher) {
publisher.subscribe(new LoggingRandomDelaySubscriber(name));
}
public static void main(String[] args) {
long start_range=10, stop_range=22;
Publisher<Long> publisher = new NumberPublisher(start_range,stop_range);
// Register Subscriber
int count=10;
NumberSubscriber subscriber = new NumberSubscriber(count);
publisher.subscribe(subscriber);
}
public static void main(String[] args) {
long start_range=10, stop_range=22;
Publisher<Long> publisher = new NumberPublisher(start_range,stop_range);
// Register Subscriber
int count=5;
NumberSubscriber subscriber = new NumberSubscriber(count);
publisher.subscribe(subscriber);
}
/**
* Convert a {@link Flow.Publisher} into an {@link AsyncIterator}. Calling a terminal method on a
* pipeline including the returned iterator will start a subscription on {@code publisher}.
* Elements will be requested as the returned iterator is consumed, and the returned iterator will
* stop iteration when the {@code publisher} {@link Subscriber#onComplete() finishes} or produces
* an {@link Subscriber#onError(Throwable) error}.
* <p>
* Users of the returned {@link AsyncIterator} must call {@link AsyncIterator#close()} when they
* have finished using the iterator so that {@code publisher} may clean up any associated
* resources.
*
* @param publisher that will be subscribed to in order to yield elements from the returned
* iterator
* @return An {@link AsyncIterator} that will iterate over elements produced via a
* {@link Flow.Subscription} from the given {@code publisher}
*/
public static <T> AsyncIterator<T> toAsyncIterator(final Flow.Publisher<? extends T> publisher) {
return new SubscribingIterator<>(publisher);
}
/**
* Convert an {@link AsyncIterator} into a {@link Flow.Publisher}. Because AsyncIterators are
* single consumer, the returned publisher should only be subscribed to once. When
* {@code asyncIterator} is exhausted or returns an exception the iterator will be
* {@link AsyncIterator#close() closed} and {@link Subscriber} will be notified accordingly. If
* the {@link Subscription} is cancelled before iterator is complete, the iterator be closed as
* well.
* <p>
* Exceptions produced by either iteration or by close will be delivered via
* {@link Subscriber#onError(Throwable)}. If both iteration and close produce exceptions, the
* exception produced by close will be added as a suppressed exception to the iteration exception.
*
*
* @param asyncIterator used to produce elements published by the returned {@link Publisher}
* @return a {@link Publisher} that supports a single subscription that will yield elements from
* {@code asyncIterator}
*/
public static <T> Flow.Publisher<T> toPublisher(final AsyncIterator<? extends T> asyncIterator) {
return new IteratorBackedPublisher<>(asyncIterator);
}
/**
* Convert a {@link Supplier} of {@link AsyncIterator AsyncIterators} into a
* {@link Flow.Publisher}. Because AsyncIterators are single consumer, each subscription of the
* returned publisher will generate a new AsyncIterator from {@code asyncIteratorSupplier}. When a
* generated AsyncIterator is exhausted or returns an exception the iterator will be
* {@link AsyncIterator#close() closed} and {@link Subscriber} will be notified accordingly. If
* the {@link Subscription} is cancelled before iterator is complete, the iterator be closed as
* well.
* <p>
* Exceptions produced by either iteration or by close will be delivered via
* {@link Subscriber#onError(Throwable)}. If both iteration and close produce exceptions, the
* exception produced by close will be added as a suppressed exception to the iteration exception.
*
* @param asyncIteratorSupplier used to produce AsyncIterators of elements published by the
* returned {@link Publisher}
* @return a {@link Publisher} that supports multiple subscriptions that will yield elements from
* AsyncIterators generated from {@code asyncIteratorSupplier}
*/
public static <T> Flow.Publisher<T> toPublisher(
final Supplier<AsyncIterator<? extends T>> asyncIteratorSupplier) {
return new SuppliedIteratorBackedPublisher<>(asyncIteratorSupplier);
}