java.util.concurrent.Flow.Publisher 源码实例Demo

下面列出了 java.util.concurrent.Flow.Publisher 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: spring-reactive-sample   文件: PostController.java
@GetMapping
    public Flow.Publisher<Post> all() {
        // Emits the two sample posts from a Flux and bridges that Flux to the JDK Flow API.
        // A SubmissionPublisher-based variant (submit both posts, return the publisher itself)
        // was left disabled in favour of the adapter approach.
        // see: https://stackoverflow.com/questions/46597924/spring-5-supports-java-9-flow-apis-in-its-reactive-feature
        Flux<Post> samplePosts = Flux.just(
                new Post(1L, "post one", "content of post one"),
                new Post(2L, "post two", "content of post two")
        );

        return JdkFlowAdapter.publisherToFlowPublisher(samplePosts);
    }
 
/**
 * Demo entry point: wraps a news publisher with the Flow adapters several times over, subscribes
 * to the outermost wrapper, and prints every newsletter until the one titled {@code "9"} arrives.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Each newsletter's title is the string form of the interval value that produced it.
    NewsServicePublisher newsSource = new NewsServicePublisher(smp ->
        Flowable.intervalRange(0, 10, 0, 10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .map(tick -> NewsLetter.template()
                                       .title(String.valueOf(tick))
                                       .digest(Collections.emptyList())
                                       .build())
                .subscribe(smp)
    );

    // Convert to a Flow.Publisher, back to a Publisher, and to a Flow.Publisher once more.
    Flow.Publisher flowView = FlowAdapters.toFlowPublisher(newsSource);
    Publisher roundTripped = FlowAdapters.toPublisher(flowView);
    Flow.Publisher flowViewAgain = FlowAdapters.toFlowPublisher(roundTripped);

    NewsServiceSubscriber newsServiceSubscriber = new NewsServiceSubscriber(2);
    flowViewAgain.subscribe(FlowAdapters.toFlowSubscriber(newsServiceSubscriber));

    // Keep polling the subscriber; an empty Optional simply means "try again".
    boolean sawLastLetter = false;
    while (!sawLastLetter) {
        Optional<NewsLetter> polled = newsServiceSubscriber.eventuallyReadDigest();
        if (!polled.isPresent()) {
            continue;
        }

        NewsLetter current = polled.get();
        System.out.println(current);

        // The letter titled "9" is the stop signal for this demo.
        sawLastLetter = current.getTitle().equals("9");
    }
}
 
源代码3 项目: java-async-util   文件: FlowAdapterTest.java
/**
 * A publisher backed by a single iterator is subscribed to twice. The second
 * {@code subscribe} call itself must not throw; the test expects the
 * {@link IllegalStateException} to surface when the second subscriber is unwrapped.
 */
@Test(expected = IllegalStateException.class)
public void testDoubleSubscription() throws Throwable {
  final Flow.Publisher<Long> singleUsePublisher =
      FlowAdapter.toPublisher(AsyncIterator.range(0, 5));

  // First subscription: attach and wait for it to finish.
  final ConsumingSubscriber<Long> firstSubscriber = new ConsumingSubscriber<>();
  singleUsePublisher.subscribe(firstSubscriber);
  firstSubscriber.join();

  // Second subscription: anything thrown directly from subscribe() is a test failure.
  final ConsumingSubscriber<Long> secondSubscriber = new ConsumingSubscriber<>();
  try {
    singleUsePublisher.subscribe(secondSubscriber);
  } catch (final Throwable thrown) {
    Assert.fail("failure should be notified via onError, got: " + thrown);
  }

  FlowAdapterTest.unwrap(secondSubscriber);
}
 
源代码4 项目: java-async-util   文件: FlowAdapter.java
/**
 * Creates an iterator bound to the given publisher.
 *
 * @param publisher the publisher to store; must not be {@code null}
 * @throws NullPointerException if {@code publisher} is {@code null}
 */
SubscribingIterator(final Flow.Publisher<? extends T> publisher) {
  // Reject a null publisher at construction time.
  Objects.requireNonNull(publisher);
  this.publisher = publisher;
}
 
源代码5 项目: java-async-util   文件: FlowAdapter.java
/**
 * Adapts a {@link Flow.Publisher} to an {@link AsyncIterator}.
 * <p>
 * The subscription on {@code publisher} is started when a terminal method is called on a pipeline
 * that includes the returned iterator. Elements are requested as the iterator is consumed.
 * Iteration stops once {@code publisher} {@link Subscriber#onComplete() completes} or signals an
 * {@link Subscriber#onError(Throwable) error}.
 * <p>
 * Callers must invoke {@link AsyncIterator#close()} when they are done with the returned iterator
 * so that {@code publisher} may clean up any associated resources.
 *
 * @param publisher the publisher that will be subscribed to in order to supply the returned
 *        iterator's elements
 * @return an {@link AsyncIterator} over the elements delivered through a
 *         {@link Flow.Subscription} on {@code publisher}
 */
public static <T> AsyncIterator<T> toAsyncIterator(final Flow.Publisher<? extends T> publisher) {
  final AsyncIterator<T> subscribing = new SubscribingIterator<>(publisher);
  return subscribing;
}
 
源代码6 项目: java-async-util   文件: FlowAdapter.java
/**
 * Adapts an {@link AsyncIterator} to a {@link Flow.Publisher}.
 * <p>
 * AsyncIterators are single consumer, so the returned publisher should only be subscribed to
 * once. Once {@code asyncIterator} is exhausted or returns an exception, the iterator is
 * {@link AsyncIterator#close() closed} and the {@link Subscriber} is notified accordingly.
 * Cancelling the {@link Subscription} before the iterator is complete closes the iterator as
 * well.
 * <p>
 * Exceptions from iteration or from close are delivered via
 * {@link Subscriber#onError(Throwable)}. When both iteration and close produce exceptions, the
 * close exception is added as a suppressed exception to the iteration exception.
 *
 * @param asyncIterator the source of the elements published by the returned {@link Publisher}
 * @return a {@link Publisher} supporting a single subscription that yields the elements of
 *         {@code asyncIterator}
 */
public static <T> Flow.Publisher<T> toPublisher(final AsyncIterator<? extends T> asyncIterator) {
  final Flow.Publisher<T> singleUse = new IteratorBackedPublisher<>(asyncIterator);
  return singleUse;
}
 
源代码7 项目: java-async-util   文件: FlowAdapter.java
/**
 * Adapts a {@link Supplier} of {@link AsyncIterator AsyncIterators} to a {@link Flow.Publisher}.
 * <p>
 * AsyncIterators are single consumer, so every subscription to the returned publisher obtains a
 * new AsyncIterator from {@code asyncIteratorSupplier}. Once a generated AsyncIterator is
 * exhausted or returns an exception, it is {@link AsyncIterator#close() closed} and the
 * {@link Subscriber} is notified accordingly. Cancelling the {@link Subscription} before the
 * iterator is complete closes the iterator as well.
 * <p>
 * Exceptions from iteration or from close are delivered via
 * {@link Subscriber#onError(Throwable)}. When both iteration and close produce exceptions, the
 * close exception is added as a suppressed exception to the iteration exception.
 *
 * @param asyncIteratorSupplier supplies the AsyncIterators whose elements are published by the
 *        returned {@link Publisher}
 * @return a {@link Publisher} supporting multiple subscriptions, each yielding the elements of an
 *         AsyncIterator generated from {@code asyncIteratorSupplier}
 */
public static <T> Flow.Publisher<T> toPublisher(
    final Supplier<AsyncIterator<? extends T>> asyncIteratorSupplier) {
  final Flow.Publisher<T> multiUse = new SuppliedIteratorBackedPublisher<>(asyncIteratorSupplier);
  return multiUse;
}
 
源代码8 项目: reactor-core   文件: JdkFlowAdapter.java
/**
 * Wrap a {@link Publisher} (for example a {@link Flux}) as a java {@code Flow.Publisher}.
 *
 * @param publisher the source Publisher to convert
 * @param <T> the type of the publisher
 * @return a java {@code Flow.Publisher} backed by the given {@link Publisher}
 */
public static <T> Flow.Publisher<T> publisherToFlowPublisher(final Publisher<T> publisher) {
	final Flow.Publisher<T> flowPublisher = new PublisherAsFlowPublisher<>(publisher);
	return flowPublisher;
}
 
源代码9 项目: reactor-core   文件: JdkFlowAdapter.java
/**
 * Wrap a java {@code Flow.Publisher} as a {@link Flux}.
 *
 * @param publisher the source {@code Flow.Publisher} to convert
 * @param <T> the type of the publisher
 * @return a {@link Flux} backed by the given java {@code Flow.Publisher}
 */
public static <T> Flux<T> flowPublisherToFlux(Flow.Publisher<T> publisher) {
	final Flux<T> flux = new FlowPublisherAsFlux<>(publisher);
	return flux;
}