类java.util.concurrent.Flow.Publisher源码实例Demo

下面列出了 java.util.concurrent.Flow.Publisher 这个 API 类的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: servicetalk   文件: JdkFlowAdaptersTest.java
/**
 * Builds a Mockito-backed {@link Publisher} whose behaviour is driven by the supplied callback.
 * <p>
 * Every {@code subscribe} call hands the subscriber a fresh mock {@link Subscription}; any
 * {@code request(n)} on that subscription invokes {@code subscriberTerminator} with the
 * subscriber and the subscription.
 *
 * @param subscriberTerminator callback run on each {@code request} call
 * @return the mocked publisher
 */
private Publisher<Integer> newMockFlowPublisher(
        BiConsumer<Subscriber<? super Integer>, Subscription> subscriberTerminator) {
    @SuppressWarnings("unchecked")
    Publisher<Integer> mockedPublisher = mock(Publisher.class);
    doAnswer(subscribeCall -> {
        Subscriber<? super Integer> downstream = subscribeCall.getArgument(0);
        Subscription mockedSubscription = mock(Subscription.class);
        // Any demand signal is forwarded to the caller-provided callback.
        doAnswer(requestCall -> {
            subscriberTerminator.accept(downstream, mockedSubscription);
            return null;
        }).when(mockedSubscription).request(anyLong());
        downstream.onSubscribe(mockedSubscription);
        return null;
    }).when(mockedPublisher).subscribe(any());
    return mockedPublisher;
}
 
源代码2 项目: java-async-util   文件: FlowAdapterTest.java
/**
 * Subscribes to a publisher backed by an {@code ExceptionalIterator}, requests twice, cancels,
 * and expects an {@link IOException} to propagate out of the test.
 * NOTE(review): presumably the {@code IOException} argument is the failure raised on close --
 * confirm against {@code ExceptionalIterator}.
 */
@Test(expected = IOException.class)
public void testCancelCloseException() throws Throwable {
  // Subscriber that discards every element it is handed.
  final ConsumingSubscriber<Long> consumer = new ConsumingSubscriber<Long>() {
    @Override
    public void onNext(final Long ignored) {}
  };
  final Publisher<Long> source =
      FlowAdapter.toPublisher(new ExceptionalIterator(10, null, new IOException()));
  source.subscribe(consumer);
  consumer.request();
  consumer.request();
  // Two requests must not be enough to finish the subscriber.
  Assert.assertFalse(consumer.isDone());

  consumer.cancel();
  Assert.assertTrue(consumer.isDone());
  FlowAdapterTest.unwrap(consumer);
}
 
源代码3 项目: java-async-util   文件: FlowAdapterTest.java
/**
 * Subscribes twice, one after the other, to a supplier-backed publisher and checks that each
 * subscription got its own iterator, that both iterators were closed, and that both folds
 * produced the expected sum.
 */
@Test
public void testMultiSubscribeInSeries() {
  // Records every iterator the supplier hands out.
  final List<CloseTrackingIt> created = new ArrayList<>();
  final Publisher<Integer> publisher = FlowAdapter.toPublisher(() -> {
    final CloseTrackingIt fresh = new CloseTrackingIt(5);
    created.add(fresh);
    return fresh;
  });

  // The first subscription is fully drained before the second one starts.
  final int firstSum = FlowAdapter.toAsyncIterator(publisher)
      .fold(0, (acc, next) -> acc + next)
      .toCompletableFuture()
      .join();
  final int secondSum = FlowAdapter.toAsyncIterator(publisher)
      .fold(0, (acc, next) -> acc + next)
      .toCompletableFuture()
      .join();

  Assert.assertEquals(2, created.size());
  Assert.assertTrue(created.stream().allMatch(tracked -> tracked.closed));
  Assert.assertEquals(15, secondSum);
  Assert.assertEquals(15, firstSum);
}
 
源代码4 项目: java-async-util   文件: FlowAdapterTest.java
/**
 * Opens two overlapping subscriptions on a supplier-backed publisher and checks that each one
 * sees its own sequence, that two iterators were created and closed, and that both folds
 * produced the expected sum.
 */
@Test
public void testMultiSubscribeInParallel() {
  // Records every iterator the supplier hands out.
  final List<CloseTrackingIt> created = new ArrayList<>();
  final Publisher<Integer> publisher = FlowAdapter.toPublisher(() -> {
    final CloseTrackingIt fresh = new CloseTrackingIt(5);
    created.add(fresh);
    return fresh;
  });

  // Pull one element from the first subscription before the second one is created.
  final AsyncIterator<Integer> first = FlowAdapter.toAsyncIterator(publisher);
  final int firstHead =
      first.nextStage().toCompletableFuture().join().right().get().intValue();
  Assert.assertEquals(0, firstHead);

  final AsyncIterator<Integer> second = FlowAdapter.toAsyncIterator(publisher);
  final int secondHead =
      second.nextStage().toCompletableFuture().join().right().get().intValue();
  Assert.assertEquals(0, secondHead);

  // Drain what is left of each subscription.
  final int firstSum = first.fold(0, (acc, next) -> acc + next).toCompletableFuture().join();
  final int secondSum = second.fold(0, (acc, next) -> acc + next).toCompletableFuture().join();

  Assert.assertEquals(2, created.size());
  Assert.assertTrue(created.stream().allMatch(tracked -> tracked.closed));
  Assert.assertEquals(15, secondSum);
  Assert.assertEquals(15, firstSum);
}
 
源代码5 项目: servicetalk   文件: JdkFlowAdaptersTest.java
/**
 * A mocked Flow publisher that emits {@code 1} and then completes must yield {@code 1} as the
 * first item after conversion with {@code fromFlowPublisher}.
 */
@Test
public void fromFlowSuccess() throws Exception {
    Publisher<Integer> singleItemSource = newMockFlowPublisher((downstream, ignoredSubscription) -> {
        downstream.onNext(1);
        downstream.onComplete();
    });
    Integer firstItem = fromFlowPublisher(singleItemSource)
            .firstOrElse(() -> null)
            .toFuture()
            .get();
    assertThat("Unexpected result", firstItem, is(1));
}
 
源代码6 项目: servicetalk   文件: JdkFlowAdaptersTest.java
/**
 * A mocked Flow publisher that signals {@code DELIBERATE_EXCEPTION} must make the converted
 * future fail with an {@link ExecutionException} caused by that same exception instance.
 */
@Test
public void fromFlowError() throws Exception {
    Publisher<Integer> failingSource = newMockFlowPublisher((downstream, ignoredSubscription) -> {
        downstream.onError(DELIBERATE_EXCEPTION);
    });
    Future<Integer> pendingFirst = fromFlowPublisher(failingSource)
            .firstOrElse(() -> null)
            .toFuture();
    // Expectations must be registered before the call that is expected to throw.
    expectedException.expect(instanceOf(ExecutionException.class));
    expectedException.expectCause(sameInstance(DELIBERATE_EXCEPTION));
    pendingFirst.get();
}
 
源代码7 项目: servicetalk   文件: JdkFlowAdaptersTest.java
/**
 * Subscribes a mock {@link Subscriber} to {@code flowPublisher}, verifies that it received a
 * {@link Subscription}, and requests one item through that subscription.
 *
 * @param flowPublisher the publisher to subscribe to
 * @return the mock subscriber, for further verification by the caller
 */
private Subscriber<Integer> subscribeToFlowPublisher(final Publisher<Integer> flowPublisher) {
    @SuppressWarnings("unchecked")
    Subscriber<Integer> mockedSubscriber = mock(Subscriber.class);
    flowPublisher.subscribe(mockedSubscriber);
    ArgumentCaptor<Subscription> captor = ArgumentCaptor.forClass(Subscription.class);
    verify(mockedSubscriber).onSubscribe(captor.capture());
    Subscription received = captor.getValue();
    received.request(1);
    return mockedSubscriber;
}
 
/**
 * Creates a publisher that emits the constant {@code 1}.
 *
 * @param l number of elements to emit; {@link Long#MAX_VALUE} leaves the sequence unbounded
 * @return a publisher backed by the (possibly truncated) repeating iterator
 */
@Override
public Publisher<Integer> createFlowPublisher(final long l) {
  // Long.MAX_VALUE means "infinite", so the repeating iterator is only truncated otherwise.
  final AsyncIterator<Integer> elements = (l == Long.MAX_VALUE)
      ? AsyncIterator.repeat(1)
      : AsyncIterator.repeat(1).take(l);
  return FlowAdapter.toPublisher(elements);
}
 
/**
 * Deliberately supplies no failed publisher.
 *
 * @return always {@code null}
 */
@Override
public Publisher<Integer> createFailedFlowPublisher() {
  // Returning null makes the harness ignore the failed-publisher tests. A publisher built from
  // AsyncIterator.error(new RuntimeException("test error")) is not usable here: an iterator's
  // error is lazy (a request is needed before it surfaces), whereas two of those tests expect
  // the error at subscription time.
  return null;
}
 
源代码10 项目: java-async-util   文件: FlowAdapterTest.java
/**
 * Subscribes twice to a publisher created from a single iterator. The second {@code subscribe}
 * call must not throw; the {@link IllegalStateException} is expected to come out of
 * {@code unwrap} on the second subscriber instead.
 */
@Test(expected = IllegalStateException.class)
public void testDoubleSubscription() throws Throwable {
  final Flow.Publisher<Long> source = FlowAdapter.toPublisher(AsyncIterator.range(0, 5));
  final ConsumingSubscriber<Long> firstSubscriber = new ConsumingSubscriber<>();
  source.subscribe(firstSubscriber);
  firstSubscriber.join();

  final ConsumingSubscriber<Long> secondSubscriber = new ConsumingSubscriber<>();
  try {
    source.subscribe(secondSubscriber);
  } catch (final Throwable thrown) {
    // subscribe itself must not throw; the failure has to arrive through onError.
    Assert.fail("failure should be notified via onError, got: " + thrown);
  }
  FlowAdapterTest.unwrap(secondSubscriber);
}
 
源代码11 项目: java-async-util   文件: FlowAdapterTest.java
/**
 * Wraps {@code it} in a publisher, subscribes a {@code ConsumingSubscriber} to it, and passes
 * that subscriber to {@code FlowAdapterTest.unwrap}.
 *
 * @param it the iterator to publish
 * @throws Throwable whatever {@code unwrap} propagates
 */
private static void consume(final AsyncIterator<Long> it) throws Throwable {
  final ConsumingSubscriber<Long> sink = new ConsumingSubscriber<>();
  final Publisher<Long> source = FlowAdapter.toPublisher(it);
  source.subscribe(sink);
  FlowAdapterTest.unwrap(sink);
}
 
源代码12 项目: servicetalk   文件: JdkFlowAdaptersTest.java
/**
 * Converts a ServiceTalk publisher to a Flow publisher and subscribes to it via
 * {@code subscribeToFlowPublisher}.
 *
 * @param stPublisher the ServiceTalk publisher to convert
 * @return the subscriber returned by {@code subscribeToFlowPublisher}
 */
private Subscriber<Integer> toFlowPublisherAndSubscribe(
        final io.servicetalk.concurrent.api.Publisher<Integer> stPublisher) {
    return subscribeToFlowPublisher(toFlowPublisher(stPublisher));
}
 
源代码13 项目: servicetalk   文件: JdkFlowAdaptersTest.java
/**
 * Converts a {@code PublisherSource} to a Flow publisher and subscribes to it via
 * {@code subscribeToFlowPublisher}.
 *
 * @param source the source to convert
 * @return the subscriber returned by {@code subscribeToFlowPublisher}
 */
private Subscriber<Integer> toFlowPublisherFromSourceAndSubscribe(final PublisherSource<Integer> source) {
    return subscribeToFlowPublisher(toFlowPublisher(source));
}
 
源代码14 项目: java-async-util   文件: FlowAdapter.java
/**
 * Creates an iterator that holds on to the given publisher.
 *
 * @param publisher the publisher stored by this iterator; must not be {@code null}
 * @throws NullPointerException if {@code publisher} is {@code null}
 */
SubscribingIterator(final Flow.Publisher<? extends T> publisher) {
  // Name the argument so a null publisher yields a self-explanatory NullPointerException.
  this.publisher = Objects.requireNonNull(publisher, "publisher");
}
 
源代码15 项目: demo-java-x   文件: LoggingRandomDelaySubscriber.java
/**
 * Creates a new {@code LoggingRandomDelaySubscriber} with the given name and subscribes it to
 * the publisher.
 *
 * @param name passed to the subscriber's constructor
 * @param publisher the publisher to subscribe to
 */
public static void createAndSubscribe(String name, Publisher<?> publisher) {
	LoggingRandomDelaySubscriber namedSubscriber = new LoggingRandomDelaySubscriber(name);
	publisher.subscribe(namedSubscriber);
}
 
/**
 * Demo entry point: wires a {@code NumberSubscriber} to a {@code NumberPublisher}.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
	// Bounds handed to the NumberPublisher constructor.
	final long rangeStart = 10;
	final long rangeStop = 22;
	Publisher<Long> numbers = new NumberPublisher(rangeStart, rangeStop);

	// Register the subscriber, constructed with a count of 10.
	final int subscriberCount = 10;
	NumberSubscriber numberSubscriber = new NumberSubscriber(subscriberCount);
	numbers.subscribe(numberSubscriber);
}
 
/**
 * Demo entry point: wires a {@code NumberSubscriber} to a {@code NumberPublisher}.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
	// Bounds handed to the NumberPublisher constructor.
	final long rangeStart = 10;
	final long rangeStop = 22;
	Publisher<Long> numbers = new NumberPublisher(rangeStart, rangeStop);

	// Register the subscriber, constructed with a count of 5.
	final int subscriberCount = 5;
	NumberSubscriber numberSubscriber = new NumberSubscriber(subscriberCount);
	numbers.subscribe(numberSubscriber);
}
 
源代码18 项目: java-async-util   文件: FlowAdapter.java
/**
 * Adapts a {@link Flow.Publisher} to an {@link AsyncIterator}.
 * <p>
 * A subscription on {@code publisher} is started when a terminal method is called on a pipeline
 * that includes the returned iterator. Elements are requested as the iterator is consumed.
 * Iteration stops once {@code publisher} {@link Subscriber#onComplete() completes} or signals
 * an {@link Subscriber#onError(Throwable) error}.
 * <p>
 * Callers must {@link AsyncIterator#close() close} the returned iterator when they are done
 * with it, so that {@code publisher} can release any associated resources.
 *
 * @param <T> the element type of the returned iterator
 * @param publisher the publisher that will be subscribed to in order to supply the returned
 *        iterator's elements
 * @return an {@link AsyncIterator} over the elements delivered through a
 *         {@link Flow.Subscription} on {@code publisher}
 */
public static <T> AsyncIterator<T> toAsyncIterator(final Flow.Publisher<? extends T> publisher) {
  final AsyncIterator<T> subscribing = new SubscribingIterator<>(publisher);
  return subscribing;
}
 
源代码19 项目: java-async-util   文件: FlowAdapter.java
/**
 * Adapts an {@link AsyncIterator} to a {@link Flow.Publisher}.
 * <p>
 * AsyncIterators are single consumer, so the returned publisher should be subscribed to only
 * once. When {@code asyncIterator} is exhausted or fails, it is
 * {@link AsyncIterator#close() closed} and the {@link Subscriber} is notified accordingly.
 * Cancelling the {@link Subscription} before the iterator completes closes the iterator as
 * well.
 * <p>
 * Exceptions from iteration or from close are delivered through
 * {@link Subscriber#onError(Throwable)}. If both fail, the close exception is attached to the
 * iteration exception as a suppressed exception.
 *
 * @param <T> the element type of the returned publisher
 * @param asyncIterator the source of the elements published by the returned publisher
 * @return a {@link Flow.Publisher} supporting a single subscription that yields the elements
 *         of {@code asyncIterator}
 */
public static <T> Flow.Publisher<T> toPublisher(final AsyncIterator<? extends T> asyncIterator) {
  final Flow.Publisher<T> singleUse = new IteratorBackedPublisher<>(asyncIterator);
  return singleUse;
}
 
源代码20 项目: java-async-util   文件: FlowAdapter.java
/**
 * Adapts a {@link Supplier} of {@link AsyncIterator AsyncIterators} to a
 * {@link Flow.Publisher}.
 * <p>
 * AsyncIterators are single consumer, so every subscription to the returned publisher obtains
 * a new AsyncIterator from {@code asyncIteratorSupplier}. When a generated iterator is
 * exhausted or fails, it is {@link AsyncIterator#close() closed} and the {@link Subscriber} is
 * notified accordingly. Cancelling the {@link Subscription} before the iterator completes
 * closes the iterator as well.
 * <p>
 * Exceptions from iteration or from close are delivered through
 * {@link Subscriber#onError(Throwable)}. If both fail, the close exception is attached to the
 * iteration exception as a suppressed exception.
 *
 * @param <T> the element type of the returned publisher
 * @param asyncIteratorSupplier produces the AsyncIterators whose elements the returned
 *        publisher emits
 * @return a {@link Flow.Publisher} supporting multiple subscriptions, each yielding the
 *         elements of an AsyncIterator generated by {@code asyncIteratorSupplier}
 */
public static <T> Flow.Publisher<T> toPublisher(
    final Supplier<AsyncIterator<? extends T>> asyncIteratorSupplier) {
  final Flow.Publisher<T> reusable = new SuppliedIteratorBackedPublisher<>(asyncIteratorSupplier);
  return reusable;
}
 
 类所在包
 同包方法