类java.util.concurrent.Flow.Subscriber源码实例Demo

下面列出了怎么用java.util.concurrent.Flow.Subscriber的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: servicetalk   文件: JdkFlowAdaptersTest.java
private Publisher<Integer> newMockFlowPublisher(
        BiConsumer<Subscriber<? super Integer>, Subscription> subscriberTerminator) {
    @SuppressWarnings("unchecked")
    Publisher<Integer> flowPublisher = mock(Publisher.class);
    doAnswer(invocation -> {
        Subscriber<? super Integer> subscriber = invocation.getArgument(0);
        Subscription subscription = mock(Subscription.class);
        doAnswer(invocation1 -> {
            subscriberTerminator.accept(subscriber, subscription);
            return null;
        }).when(subscription).request(anyLong());
        subscriber.onSubscribe(subscription);
        return null;
    }).when(flowPublisher).subscribe(any());
    return flowPublisher;
}
 
源代码2 项目: enmasse   文件: Exec.java
/**
 * read method
 *
 * @return return future string of output
 */
public Future<String> read() {
    return CompletableFuture.supplyAsync(() -> {
        try (Scanner scanner = new Scanner(is)) {
            log.debug("Reading stream {}", is);
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                data.append(line);
                if (appendLineSeparator) {
                    data.append(System.getProperty("line.separator"));
                }
                subscribers.forEach(sub -> sub.onNext(line));
            }
            scanner.close();
            return data.toString();
        } catch (Exception e) {
            subscribers.forEach(sub -> sub.onError(e));
            throw new CompletionException(e);
        } finally {
            subscribers.forEach(Subscriber::onComplete);
        }
    }, runnable -> new Thread(runnable).start());
}
 
源代码3 项目: servicetalk   文件: JdkFlowAdaptersTest.java
@Test
public void toFlowCancel() {
    TestPublisher<Integer> stPublisher = new TestPublisher<>();
    Subscriber<Integer> subscriber = toFlowPublisherAndSubscribe(stPublisher);
    TestSubscription subscription = new TestSubscription();
    stPublisher.onSubscribe(subscription);
    assertThat("Source not subscribed.", stPublisher.isSubscribed(), is(true));
    ArgumentCaptor<Subscription> subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
    verify(subscriber).onSubscribe(subscriptionCaptor.capture());
    subscriptionCaptor.getValue().cancel();
    assertThat("Subscription not cancelled.", subscription.isCancelled(), is(true));
}
 
源代码4 项目: servicetalk   文件: JdkFlowAdaptersTest.java
@Test
public void toFlowFromSourceCancel() {
    PublisherSource.Subscription srcSubscription = mock(PublisherSource.Subscription.class);
    PublisherSource<Integer> source = s -> s.onSubscribe(srcSubscription);
    Subscriber<Integer> subscriber = toFlowPublisherFromSourceAndSubscribe(source);
    ArgumentCaptor<Subscription> flowSubscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
    verify(subscriber).onSubscribe(flowSubscriptionCaptor.capture());
    flowSubscriptionCaptor.getValue().cancel();
    verify(srcSubscription).cancel();
}
 
源代码5 项目: servicetalk   文件: JdkFlowAdaptersTest.java
private Subscriber<Integer> subscribeToFlowPublisher(final Publisher<Integer> flowPublisher) {
    @SuppressWarnings("unchecked")
    Subscriber<Integer> subscriber = mock(Subscriber.class);
    flowPublisher.subscribe(subscriber);
    ArgumentCaptor<Subscription> subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
    verify(subscriber).onSubscribe(subscriptionCaptor.capture());
    subscriptionCaptor.getValue().request(1);
    return subscriber;
}
 
源代码6 项目: java-async-util   文件: FlowAdapter.java
@Override
public void subscribe(final Flow.Subscriber<? super T> subscriber) {
  if ((boolean) SUBSCRIBED_HANDLE.getAndSet(this, true)) {
    subscriber.onError(new IllegalStateException(
        "Publisher " + this + " does not support multiple subscribers"));
    return;
  }
  subscriber.onSubscribe(new IteratorBackedSubscription<>(this.asyncIterator, subscriber));
}
 
@Override
public Subscriber<Integer> createFlowSubscriber() {
  return new FlowAdapter.SubscribingIterator<Integer>() {
    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
      super.onSubscribe(subscription);
      consume();
    }
  };
}
 
public NumberSubscription(ExecutorService executor,Subscriber<? super Long> subscriber,long start_range,long stop_range) {
	// TODO Auto-generated constructor stub
	this.executor = executor;
	this.subscriber=subscriber;
	this.start_range=start_range;
	this.stop_range=stop_range;
}
 
public NumberSubscription(ExecutorService executor,Subscriber<? super Long> subscriber,long start_range,long stop_range) {
	// TODO Auto-generated constructor stub
	this.executor = executor;
	this.subscriber=subscriber;
	this.start_range=start_range;
	this.stop_range=stop_range;
}
 
源代码10 项目: demo-java-x   文件: IncrementingPublisher.java
private Sub createNewSubscriptionFor(Subscriber<? super Integer> subscriber) {
	int startValue = subscriptions.stream()
			.mapToInt(sub -> sub.nextValue.get())
			.min()
			.orElse(0);
	return new Sub(subscriber, startValue);
}
 
源代码11 项目: enmasse   文件: AbstractClient.java
private void setLinkAttachedProbe() {
    var linkAttachedProbe = linkAttachedProbeFactory().get();
    executor.setStdErrProcessor(new Subscriber<String>() {

        @Override
        public void onSubscribe(Subscription subscription) {
            //empty
        }

        @Override
        public void onNext(String item) {
            if (!linkAttached.isDone()) {
                if (linkAttachedProbe.test(item)) {
                    log.info("Client is attached!!");
                    linkAttached.complete(null);
                }
            }
        }

        @Override
        public void onError(Throwable throwable) {
            linkAttached.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            linkAttached.complete(null);
        }
    });
}
 
public synchronized void subscribe(Subscriber<? super WeatherForecast> subscriber) {
	subscriber.onSubscribe(new OnDemandWeatherForecastSubscription(subscriber, executor));
}
 
OnDemandWeatherForecastSubscription(Subscriber<? super WeatherForecast> subscriber,
		ExecutorService executor) {
	this.subscriber = subscriber;
	this.executor = executor;
}
 
源代码14 项目: conga   文件: Session.java
public void subscribeForEvents(Subscriber<? super SessionEvent> subscriber) {
  eventPublisher.subscribe(subscriber);
}
 
源代码15 项目: servicetalk   文件: JdkFlowAdapters.java
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
    source.subscribe(new FlowToStSubscriber<>(subscriber));
}
 
源代码16 项目: servicetalk   文件: JdkFlowAdapters.java
FlowToStSubscriber(final Subscriber<? super T> subscriber) {
    this.subscriber = requireNonNull(subscriber);
}
 
源代码17 项目: servicetalk   文件: JdkFlowAdapters.java
@Override
protected void handleSubscribe(final PublisherSource.Subscriber<? super T> subscriber) {
    source.subscribe(new StToFlowSubscriber<>(subscriber));
}
 
源代码18 项目: servicetalk   文件: JdkFlowAdapters.java
StToFlowSubscriber(final PublisherSource.Subscriber<? super T> subscriber) {
    this.subscriber = subscriber;
}
 
源代码19 项目: servicetalk   文件: JdkFlowAdaptersTest.java
private void verifyFlowSuccess(final Subscriber<Integer> subscriber) {
    verify(subscriber).onSubscribe(any());
    verify(subscriber).onNext(1);
    verify(subscriber).onComplete();
    verifyNoMoreInteractions(subscriber);
}
 
源代码20 项目: servicetalk   文件: JdkFlowAdaptersTest.java
private void verifyFlowError(final Subscriber<Integer> subscriber) {
    verify(subscriber).onSubscribe(any());
    verify(subscriber).onError(DELIBERATE_EXCEPTION);
    verifyNoMoreInteractions(subscriber);
}
 
源代码21 项目: servicetalk   文件: JdkFlowAdaptersTest.java
private Subscriber<Integer> toFlowPublisherAndSubscribe(
        final io.servicetalk.concurrent.api.Publisher<Integer> stPublisher) {
    Publisher<Integer> flowPublisher = toFlowPublisher(stPublisher);
    return subscribeToFlowPublisher(flowPublisher);
}
 
源代码22 项目: servicetalk   文件: JdkFlowAdaptersTest.java
private Subscriber<Integer> toFlowPublisherFromSourceAndSubscribe(final PublisherSource<Integer> source) {
    Publisher<Integer> flowPublisher = toFlowPublisher(source);
    return subscribeToFlowPublisher(flowPublisher);
}
 
源代码23 项目: java-async-util   文件: FlowAdapter.java
@Override
public void subscribe(final Flow.Subscriber<? super T> subscriber) {
  subscriber
      .onSubscribe(
          new IteratorBackedSubscription<>(this.asyncIteratorSupplier.get(), subscriber));
}
 
源代码24 项目: java-async-util   文件: FlowAdapter.java
IteratorBackedSubscription(final AsyncIterator<T> iterator,
    final Flow.Subscriber<? super T> subscriber) {
  this.iterator = Objects.requireNonNull(iterator);
  this.subscriber = Objects.requireNonNull(subscriber);
}
 
@Override
protected Subscriber<Integer> createFlowSubscriber(
    final WhiteboxSubscriberProbe<Integer> probe) {
  final Subscriber<Integer> backing = new FlowAdapter.SubscribingIterator<>();
  return new Subscriber<Integer>() {
    @Override
    public void onSubscribe(final Subscription s) {
      backing.onSubscribe(s);

      probe.registerOnSubscribe(new SubscriberPuppet() {

        @Override
        public void triggerRequest(final long elements) {
          s.request(elements);
        }

        @Override
        public void signalCancel() {
          s.cancel();
        }
      });
    }

    @Override
    public void onNext(final Integer integer) {
      backing.onNext(integer);
      probe.registerOnNext(integer);
    }

    @Override
    public void onError(final Throwable throwable) {
      backing.onError(throwable);
      probe.registerOnError(throwable);
    }

    @Override
    public void onComplete() {
      backing.onComplete();
      probe.registerOnComplete();
    }
  };
}
 
@Override
public void subscribe(Subscriber<? super Long> subscriber) {
	// TODO Auto-generated method stub
	subscriber.onSubscribe(new NumberSubscription(executor,subscriber,start_range,stop_range));

}
 
@Override
public void subscribe(Subscriber<? super Long> subscriber) {
	// TODO Auto-generated method stub
	subscriber.onSubscribe(new NumberSubscription(executor,subscriber,start_range,stop_range));

}
 
源代码28 项目: demo-java-x   文件: ReactivePost.java
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
	log("Subscriber registered: " + subscriber);
	publisher.subscribe(new LoggingSubscriber(subscriber));
}
 
源代码29 项目: demo-java-x   文件: ReactivePost.java
private LoggingSubscriber(Subscriber<? super ByteBuffer> subscriber) {
	this.subscriber = subscriber;
}
 
源代码30 项目: demo-java-x   文件: IncrementingPublisher.java
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
	Sub subscription = createNewSubscriptionFor(subscriber);
	registerSubscription(subscription);
	subscriber.onSubscribe(subscription);
}
 
 类所在包
 类方法
 同包方法