下面列出了 java.util.concurrent.Flow.Publisher 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * GET handler that exposes two hard-coded {@code Post} instances as a JDK
 * {@code Flow.Publisher}.
 *
 * <p>The posts are produced by a Reactor {@code Flux} and bridged to the JDK Flow API through
 * {@code JdkFlowAdapter}.
 *
 * @return a {@code Flow.Publisher} over the two sample posts
 */
@GetMapping
public Flow.Publisher<Post> all() {
    // An earlier variant (left disabled) built a SubmissionPublisher on the common ForkJoinPool
    // and pushed the same two posts through submit(...).
    // see: https://stackoverflow.com/questions/46597924/spring-5-supports-java-9-flow-apis-in-its-reactive-feature
    final Flux<Post> samplePosts = Flux.just(
        new Post(1L, "post one", "content of post one"),
        new Post(2L, "post two", "content of post two"));
    return JdkFlowAdapter.publisherToFlowPublisher(samplePosts);
}
/**
 * Demo entry point: round-trips a publisher through the Reactive Streams / JDK Flow adapters
 * and prints every newsletter received until the one titled {@code "9"} shows up.
 *
 * <p>The source is an interval range starting at 0 with a count of 10 and a 10 ms period; each
 * tick is mapped to a {@code NewsLetter} whose title is the tick value and whose digest is empty.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // NOTE(review): the publisher variables are raw types, as in the original; the element
    // type is presumably NewsLetter - confirm against NewsServicePublisher before adding generics.
    Flow.Publisher flowSource = FlowAdapters.toFlowPublisher(
        new NewsServicePublisher(smp ->
            Flowable.intervalRange(0, 10, 0, 10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .map(e -> NewsLetter.template()
                    .title(String.valueOf(e))
                    .digest(Collections.emptyList())
                    .build())
                .subscribe(smp)));

    // Flow -> Reactive Streams -> Flow again, to exercise both adapter directions.
    Publisher reactiveStreamsView = FlowAdapters.toPublisher(flowSource);
    Flow.Publisher flowRoundTrip = FlowAdapters.toFlowPublisher(reactiveStreamsView);

    // NOTE(review): the meaning of the constructor argument 2 is not visible here - confirm.
    NewsServiceSubscriber subscriber = new NewsServiceSubscriber(2);
    flowRoundTrip.subscribe(FlowAdapters.toFlowSubscriber(subscriber));

    // Poll until the final newsletter (title "9") has been printed.
    boolean lastLetterSeen = false;
    while (!lastLetterSeen) {
        Optional<NewsLetter> maybeLetter = subscriber.eventuallyReadDigest();
        if (!maybeLetter.isPresent()) {
            continue;
        }
        NewsLetter letter = maybeLetter.get();
        System.out.println(letter);
        lastLetterSeen = letter.getTitle().equals("9");
    }
}
/**
 * Subscribes twice to a publisher built from a single {@code AsyncIterator} and expects an
 * {@link IllegalStateException} to surface for the second subscriber.
 *
 * <p>The second {@code subscribe} call itself must not throw; any throwable escaping it fails
 * the test, because the failure is expected to be delivered through {@code onError}.
 * The expected exception is presumably rethrown by {@code FlowAdapterTest.unwrap} - confirm
 * against that helper.
 */
@Test(expected = IllegalStateException.class)
public void testDoubleSubscription() throws Throwable {
    final Flow.Publisher<Long> singleUse = FlowAdapter.toPublisher(AsyncIterator.range(0, 5));

    // First subscriber: subscribe, then join() before attempting the second subscription.
    final ConsumingSubscriber<Long> first = new ConsumingSubscriber<>();
    singleUse.subscribe(first);
    first.join();

    // Second subscriber: subscribe(...) must return normally.
    final ConsumingSubscriber<Long> second = new ConsumingSubscriber<>();
    try {
        singleUse.subscribe(second);
    } catch (final Throwable thrown) {
        Assert.fail("failure should be notified via onError, got: " + thrown);
    }

    FlowAdapterTest.unwrap(second);
}
/**
 * Creates an iterator backed by {@code publisher}. The constructor only stores the reference.
 *
 * @param publisher the source publisher; must not be {@code null}
 * @throws NullPointerException if {@code publisher} is {@code null}
 */
SubscribingIterator(final Flow.Publisher<? extends T> publisher) {
    Objects.requireNonNull(publisher);
    this.publisher = publisher;
}
/**
 * Adapts a {@link Flow.Publisher} to an {@link AsyncIterator}.
 * <p>
 * A subscription on {@code publisher} is started once a terminal method is invoked on a pipeline
 * that includes the returned iterator. Elements are requested as the iterator is consumed.
 * Iteration ends when {@code publisher} {@link Subscriber#onComplete() completes} or signals an
 * {@link Subscriber#onError(Throwable) error}.
 * <p>
 * Callers must invoke {@link AsyncIterator#close()} when they are done with the returned
 * iterator so that {@code publisher} can release any associated resources.
 *
 * @param publisher the publisher to subscribe to in order to supply the returned iterator's
 *        elements
 * @return an {@link AsyncIterator} over the elements delivered through a
 *         {@link Flow.Subscription} on {@code publisher}
 */
public static <T> AsyncIterator<T> toAsyncIterator(final Flow.Publisher<? extends T> publisher) {
    final AsyncIterator<T> subscribing = new SubscribingIterator<>(publisher);
    return subscribing;
}
/**
 * Adapts an {@link AsyncIterator} to a {@link Flow.Publisher}.
 * <p>
 * AsyncIterators are single consumer, so the returned publisher should be subscribed to only
 * once. When {@code asyncIterator} is exhausted or yields an exception, the iterator is
 * {@link AsyncIterator#close() closed} and the {@link Subscriber} is notified accordingly. If the
 * {@link Subscription} is cancelled before the iterator completes, the iterator will be closed
 * as well.
 * <p>
 * Exceptions raised by iteration or by close are delivered via
 * {@link Subscriber#onError(Throwable)}. If both iteration and close fail, the close exception
 * is attached to the iteration exception as a suppressed exception.
 *
 * @param asyncIterator source of the elements published by the returned {@link Publisher}
 * @return a {@link Publisher} supporting a single subscription that yields the elements of
 *         {@code asyncIterator}
 */
public static <T> Flow.Publisher<T> toPublisher(final AsyncIterator<? extends T> asyncIterator) {
    final Flow.Publisher<T> singleSubscription = new IteratorBackedPublisher<>(asyncIterator);
    return singleSubscription;
}
/**
 * Adapts a {@link Supplier} of {@link AsyncIterator AsyncIterators} to a
 * {@link Flow.Publisher}.
 * <p>
 * AsyncIterators are single consumer, so every subscription to the returned publisher obtains a
 * fresh AsyncIterator from {@code asyncIteratorSupplier}. When a generated AsyncIterator is
 * exhausted or yields an exception, it is {@link AsyncIterator#close() closed} and the
 * {@link Subscriber} is notified accordingly. If the {@link Subscription} is cancelled before
 * the iterator completes, the iterator will be closed as well.
 * <p>
 * Exceptions raised by iteration or by close are delivered via
 * {@link Subscriber#onError(Throwable)}. If both iteration and close fail, the close exception
 * is attached to the iteration exception as a suppressed exception.
 *
 * @param asyncIteratorSupplier produces the AsyncIterators whose elements are published by the
 *        returned {@link Publisher}
 * @return a {@link Publisher} supporting multiple subscriptions, each yielding the elements of an
 *         AsyncIterator obtained from {@code asyncIteratorSupplier}
 */
public static <T> Flow.Publisher<T> toPublisher(
        final Supplier<AsyncIterator<? extends T>> asyncIteratorSupplier) {
    final Flow.Publisher<T> multiSubscription =
        new SuppliedIteratorBackedPublisher<>(asyncIteratorSupplier);
    return multiSubscription;
}
/**
 * Wraps a {@link Publisher} as a java {@code Flow.Publisher}.
 *
 * @param publisher the source {@link Publisher} to convert
 * @param <T> the element type of the publisher
 * @return a java {@code Flow.Publisher} backed by the given {@link Publisher}
 */
public static <T> Flow.Publisher<T> publisherToFlowPublisher(final Publisher<T> publisher) {
    final Flow.Publisher<T> flowView = new PublisherAsFlowPublisher<>(publisher);
    return flowView;
}
/**
 * Wraps a java {@code Flow.Publisher} as a {@link Flux}.
 *
 * @param publisher the source {@code Flow.Publisher} to convert
 * @param <T> the element type of the publisher
 * @return a {@link Flux} backed by the given java {@code Flow.Publisher}
 */
public static <T> Flux<T> flowPublisherToFlux(Flow.Publisher<T> publisher) {
    final Flux<T> fluxView = new FlowPublisherAsFlux<>(publisher);
    return fluxView;
}