下面列出了 java.util.concurrent.Flow API 类的实例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Demo entry point: attaches a database subscriber, a Twitter subscriber and a
 * US-to-metric processor (itself feeding a second Twitter subscriber) to a
 * weather forecast publisher, waits ten seconds, then closes the publisher.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new WeatherForecastPublisher();
    weatherForecastPublisher.subscribe(new DatabaseSubscriber());
    weatherForecastPublisher.subscribe(new TwitterSubscriber<WeatherForecast>());

    Flow.Processor<WeatherForecast, MetricWeatherForecast> metricConverter = new UsToMetricProcessor();
    weatherForecastPublisher.subscribe(metricConverter);
    metricConverter.subscribe(new TwitterSubscriber<MetricWeatherForecast>());

    // close the publisher and associated resources after 10 seconds
    boolean interrupted = false;
    try {
        TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
        interrupted = true;
    }
    weatherForecastPublisher.close();

    // Restore the interrupt flag only after close() so that close() runs exactly
    // as before, while the interruption is no longer silently lost.
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Attaches {@code subscriber} to this operation's publisher and records the
 * stage that will carry the operation's result.
 *
 * <p>When {@code result} completes normally, its value is added to the group
 * submission (if one is set) and used to complete the submission's own
 * completion stage.
 *
 * @param subscriber receiver of the row columns; must not be null
 * @param result stage producing the operation result; must not be null
 * @return this operation
 * @throws IllegalArgumentException if either argument is null ({@code result} is checked first)
 */
@Override
public ParameterizedRowPublisherOperation<R> subscribe(Flow.Subscriber<? super Result.RowColumn> subscriber,
        CompletionStage<? extends R> result) {
    // Argument validation: result first, then subscriber.
    if (result == null) {
        throw new IllegalArgumentException("result is not allowed to be null");
    }
    if (subscriber == null) {
        throw new IllegalArgumentException("subscriber is not allowed to be null");
    }

    publisher.subscribe(subscriber);
    this.result = result;

    result.thenAccept(value -> {
        if (groupSubmission != null) {
            groupSubmission.addGroupResult(value);
        }
        submission.getCompletionStage().toCompletableFuture().complete(value);
    });

    return this;
}
/**
 * Returns two fixed sample posts as a JDK {@link Flow.Publisher}.
 *
 * <p>The posts are emitted by a Reactor {@code Flux} that is bridged to the
 * JDK Flow API through {@code JdkFlowAdapter}. A hand-built
 * {@code SubmissionPublisher} (submitting the same two posts on the common
 * pool) was the earlier, now disabled, alternative.
 *
 * @return a Flow publisher emitting the two sample posts
 */
@GetMapping
public Flow.Publisher<Post> all() {
    // see: https://stackoverflow.com/questions/46597924/spring-5-supports-java-9-flow-apis-in-its-reactive-feature
    Flux<Post> samplePosts = Flux.just(
            new Post(1L, "post one", "content of post one"),
            new Post(2L, "post two", "content of post two"));
    return JdkFlowAdapter.publisherToFlowPublisher(samplePosts);
}
/**
 * Hands {@code subscriber} a subscription that emits increasing integers
 * (the successive values of {@code numSent.incrementAndGet()}) until
 * {@code numSent} reaches {@code numMax}, then signals completion once.
 *
 * <p>{@code numSent} and {@code numMax} are declared outside this block.
 * Emission happens inside an anonymous actor that is woken on every
 * {@code request}.
 *
 * @param subscriber the subscriber that receives the integers
 */
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
    subscriber.onSubscribe(new Flow.Subscription() {
        // Guards onComplete() so it is signalled at most once.
        private final AtomicBoolean completed = new AtomicBoolean(false);
        // Outstanding demand still to be served by the refill actor.
        private final AtomicInteger numMessagesToSend = new AtomicInteger();
        private final Actor<Flow.Subscriber<? super Integer>, Void, Void> actorRefill = ActorSystem.anonymous().newActor(sub -> {
            while (numSent.get() < numMax && numMessagesToSend.get() > 0) {
                subscriber.onNext(numSent.incrementAndGet());
                numMessagesToSend.decrementAndGet();
            }
            if (numSent.get() >= numMax) {
                if (completed.compareAndSet(false, true))
                    subscriber.onComplete();
            }
        });

        @Override
        public void request(long n) {
            if (numSent.get() >= numMax)
                return;
            // Clamp into [0, Integer.MAX_VALUE] before narrowing. A plain (int) cast
            // turns request(Long.MAX_VALUE) into -1, which silently drops an
            // "unbounded" request, and can wrap large negative values to positive demand.
            int demand = (int) Math.max(0L, Math.min(n, (long) Integer.MAX_VALUE));
            // NOTE(review): demand is merged with max rather than added, so overlapping
            // requests are not cumulative - confirm this is intended.
            numMessagesToSend.accumulateAndGet(demand, Math::max);
            actorRefill.sendMessage(subscriber);
        }

        @Override
        public void cancel() {
            // Marks the sequence as exhausted so that no further items are emitted.
            numSent.set(numMax);
        }
    });
}
WeatherForecastPublisher() {
super(Executors.newFixedThreadPool(2), Flow.defaultBufferSize());
scheduler = new ScheduledThreadPoolExecutor(1);
periodicTask = scheduler.scheduleAtFixedRate( //
// runs submit()
() -> submit(WeatherForecast.nextRandomWeatherForecast()), //
500, 500, TimeUnit.MILLISECONDS);
}
/**
 * Stores the subscription, requests {@code bufferSize} items up front, prints
 * a start message and then pauses for 100 ms.
 *
 * @param subscription the subscription handed over by the publisher
 */
public void onSubscribe(Flow.Subscription subscription) {
    //count = bufferSize - bufferSize / 2; // re-request once half of the items have been consumed
    this.subscription = subscription;
    subscription.request(bufferSize);
    System.out.println("开始onSubscribe订阅");
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt flag so the thread that runs this callback
        // can still observe the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Demo entry point: converts a {@code NewsServicePublisher} to a JDK Flow
 * publisher, back to a Reactive Streams publisher and to a Flow publisher
 * again, subscribes a {@code NewsServiceSubscriber} to the result and prints
 * every letter it reads until one titled {@code "9"} arrives.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Source: ten interval ticks (0..9, 10 ms apart), each mapped to a news letter
    // whose title is the tick value.
    Flow.Publisher jdkPublisher = FlowAdapters.toFlowPublisher(new NewsServicePublisher(smp ->
            Flowable.intervalRange(0, 10, 0, 10, TimeUnit.MILLISECONDS, Schedulers.computation())
                    .map(e -> NewsLetter.template()
                            .title(String.valueOf(e))
                            .digest(Collections.emptyList())
                            .build())
                    .subscribe(smp)
    ));

    // Round trip through both adapter directions.
    Publisher external = FlowAdapters.toPublisher(jdkPublisher);
    Flow.Publisher jdkPublisher2 = FlowAdapters.toFlowPublisher(external);

    NewsServiceSubscriber newsServiceSubscriber = new NewsServiceSubscriber(2);
    jdkPublisher2.subscribe(FlowAdapters.toFlowSubscriber(newsServiceSubscriber));

    // Poll until the last letter (title "9") has been printed.
    boolean lastLetterSeen = false;
    while (!lastLetterSeen) {
        Optional<NewsLetter> polled = newsServiceSubscriber.eventuallyReadDigest();
        if (!polled.isPresent()) {
            continue;
        }
        NewsLetter letter = polled.get();
        System.out.println(letter);
        lastLetterSeen = letter.getTitle().equals("9");
    }
}
/**
 * Subscribes the one permitted subscriber to this publisher.
 *
 * <p>The first caller receives an {@code IteratorBackedSubscription} over the
 * wrapped async iterator. Any later caller is notified through
 * {@code onError} with an {@link IllegalStateException}.
 *
 * @param subscriber the subscriber to attach
 * @throws NullPointerException if {@code subscriber} is null
 */
@Override
public void subscribe(final Flow.Subscriber<? super T> subscriber) {
    // Reject null before touching the subscribed flag; otherwise a null argument
    // would mark the publisher as used and lock out every later valid subscriber.
    if (subscriber == null) {
        throw new NullPointerException("subscriber");
    }
    if ((boolean) SUBSCRIBED_HANDLE.getAndSet(this, true)) {
        subscriber.onError(new IllegalStateException(
                "Publisher " + this + " does not support multiple subscribers"));
        return;
    }
    subscriber.onSubscribe(new IteratorBackedSubscription<>(this.asyncIterator, subscriber));
}
/**
 * Accepts the first subscription offered and cancels any later one.
 *
 * @param subscription the offered subscription; must not be null
 * @throws NullPointerException if {@code subscription} is null
 */
@Override
public void onSubscribe(final Flow.Subscription subscription) {
    Objects.requireNonNull(subscription);
    if (this.subscription == null) {
        this.subscription = subscription;
    } else {
        // Already subscribed: refuse the newcomer and keep the existing subscription.
        subscription.cancel();
    }
}
/**
 * Subscribing a second time must not throw from {@code subscribe}; the test
 * fails if it does. The test is expected to end with an
 * {@link IllegalStateException}, presumably rethrown by
 * {@code FlowAdapterTest.unwrap} from the second subscriber's recorded error.
 */
@Test(expected = IllegalStateException.class)
public void testDoubleSubscription() throws Throwable {
    final Flow.Publisher<Long> publisher = FlowAdapter.toPublisher(AsyncIterator.range(0, 5));

    // First subscription: consume and wait for it to finish.
    final ConsumingSubscriber<Long> first = new ConsumingSubscriber<>();
    publisher.subscribe(first);
    first.join();

    // Second subscription: subscribe() itself must return normally.
    final ConsumingSubscriber<Long> second = new ConsumingSubscriber<>();
    try {
        publisher.subscribe(second);
    } catch (final Throwable thrown) {
        Assert.fail("failure should be notified via onError, got: " + thrown);
    }

    FlowAdapterTest.unwrap(second);
}
/**
 * Creates the subscriber under test: a {@code FlowAdapter.SubscribingIterator}
 * that calls {@code consume()} right after the inherited {@code onSubscribe}
 * handling.
 *
 * @return a new subscriber instance
 */
@Override
public Subscriber<Integer> createFlowSubscriber() {
    return new FlowAdapter.SubscribingIterator<Integer>() {
        @Override
        public void onSubscribe(final Flow.Subscription s) {
            // Let the base class handle the subscription first, then start consuming.
            super.onSubscribe(s);
            consume();
        }
    };
}
/**
 * Keeps the subscription, asks for {@code bufferSize} items immediately,
 * prints a start message and then sleeps for 100 ms.
 *
 * @param subscription the subscription supplied by the publisher
 */
public void onSubscribe(Flow.Subscription subscription) {
    //count = bufferSize - bufferSize / 2; // request again when half of the items are consumed
    this.subscription = subscription;
    subscription.request(bufferSize);
    System.out.println("开始onSubscribe订阅");
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Re-assert the interrupt so callers further up the stack can react to it.
        Thread.currentThread().interrupt();
    }
}
/**
 * Creates a new {@code Subscription} for {@code subscriber} and passes it to
 * the subscriber's {@code onSubscribe}.
 *
 * <p>A {@link RejectedExecutionException} raised while doing so is reported to
 * the subscriber through {@code onError}, wrapped in an
 * {@link IllegalStateException}.
 *
 * @param subscriber the subscriber to attach
 */
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    try {
        final Subscription created = new Subscription(subscriber);
        subscriber.onSubscribe(created);
    } catch (RejectedExecutionException rejected) {
        subscriber.onError(new IllegalStateException(rejected));
    }
}
/**
 * Records the subscription and requests the first item.
 *
 * @param subscription the subscription supplied by the publisher
 * @throws IllegalStateException if a subscription has already been recorded
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    // NOTE(review): Reactive Streams rule 2.5 asks a subscriber to cancel a second
    // subscription instead of throwing - confirm whether callers rely on the exception.
    if (this.subscription == null) {
        this.subscription = subscription;
        subscription.request(1);
        return;
    }
    throw new IllegalStateException();
}
/**
 * Stores the given subscription and asks for one item.
 *
 * @param subscription the subscription supplied by the publisher
 * @throws IllegalStateException with message "already subscribed" if a
 *         subscription is already stored
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    final boolean alreadySubscribed = this.subscription != null;
    if (alreadySubscribed) {
        throw new IllegalStateException("already subscribed");
    }
    // First subscription: keep it, then pull a single item.
    this.subscription = subscription;
    subscription.request(1);
}
/**
 * Takes ownership of the subscription and requests a single item to start
 * the flow.
 *
 * @param subscription the subscription supplied by the publisher
 * @throws IllegalStateException if this subscriber already holds a subscription
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription == null) {
        this.subscription = subscription;
        subscription.request(1);
    } else {
        throw new IllegalStateException("already subscribed");
    }
}
/**
 * Stores the subscription, opens the target file channel and, on success,
 * requests the first item.
 *
 * <p>If the channel cannot be opened, the failure completes {@code result}
 * exceptionally, the subscription is cancelled and nothing is requested.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;

    final FileChannel opened;
    try {
        opened = FileChannel.open(file, options);
    } catch (IOException openFailure) {
        result.completeExceptionally(openFailure);
        subscription.cancel();
        return;
    }

    out = opened;
    subscription.request(1);
}
/**
 * Accepts the first subscription and requests an unbounded number of items;
 * any further subscription is cancelled immediately.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription == null) {
        this.subscription = subscription;
        // We can handle whatever you've got
        subscription.request(Long.MAX_VALUE);
    } else {
        // A subscription is already active: turn the new one away.
        subscription.cancel();
    }
}
/**
 * Subscribes {@code subscriber} to a new {@code PullPublisher} that reads
 * from the input stream obtained from {@code streamSupplier}.
 *
 * @param subscriber the subscriber; must be a {@code ProcessorBase}
 * @throws UnsupportedOperationException if {@code subscriber} is not a {@code ProcessorBase}
 * @throws UncheckedIOException if the supplier returns a null stream
 */
@Override
public synchronized void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    final boolean supported = subscriber instanceof ProcessorBase;
    if (!supported) {
        throw new UnsupportedOperationException();
    }

    ProcessorBase processor = (ProcessorBase) subscriber;
    // The client is fetched but not used further in this method.
    HttpClientImpl client = processor.getClient();

    InputStream stream = streamSupplier.get();
    if (stream == null) {
        throw new UncheckedIOException(new IOException("no inputstream supplied"));
    }

    this.delegate = new PullPublisher<>(() -> new StreamIterator(stream));
    delegate.subscribe(subscriber);
}
/**
 * Verifies a default-constructed SubmissionPublisher: it passes
 * {@code checkInitialState}, its maximum buffer capacity equals
 * {@code Flow.defaultBufferSize()}, and its executor is the common pool
 * exactly when the common pool parallelism is greater than one.
 */
public void testConstructor1() {
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
    checkInitialState(publisher);
    assertEquals(publisher.getMaxBufferCapacity(), Flow.defaultBufferSize());

    Executor actualExecutor = publisher.getExecutor();
    Executor commonPool = ForkJoinPool.commonPool();
    if (ForkJoinPool.getCommonPoolParallelism() <= 1) {
        assertNotSame(actualExecutor, commonPool);
    } else {
        assertSame(actualExecutor, commonPool);
    }
}
/**
 * Marks this object as closed, cancels and forgets the current subscription
 * (if any) while holding this object's monitor, then delegates to
 * {@code super.close()}.
 *
 * @throws IOException if {@code super.close()} fails
 */
@Override
public void close() throws IOException {
    synchronized (this) {
        closed = true;
        final Flow.Subscription active = subscription;
        if (active != null) {
            active.cancel();
        }
        // Drop the reference only after the cancel call above.
        subscription = null;
    }
    super.close();
}
/**
 * Exposes this actor as a {@link Flow.Subscriber}.
 *
 * <p>Each received item is handed to {@code actorLogic} through
 * {@code execAsync}. After subscribing and after every processed item the
 * subscriber requests {@code optimalQueueLength - queue.size()} more items
 * whenever that number is positive. On error or completion the matching
 * handler (if not null) is scheduled through {@code execAsync} and a poison
 * pill is sent.
 *
 * @param optimalQueueLength queue length the refill logic aims for
 * @param onErrorHandler receives the error passed to {@code onError}; may be null
 * @param onCompleteHandler receives this actor on {@code onComplete}; may be null
 * @return a subscriber that feeds this actor
 */
Flow.Subscriber<T> asReactiveSubscriber(int optimalQueueLength, Consumer<Throwable> onErrorHandler, Consumer<PartialActor<T, S>> onCompleteHandler) {
    final AtomicReference<Flow.Subscription> activeSubscription = new AtomicReference<>();

    return new Flow.Subscriber<T>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (activeSubscription.get() == null) {
                activeSubscription.set(subscription);
                askRefill();
                return;
            }
            // Only the first subscription is kept; later ones are cancelled.
            subscription.cancel();
        }

        @Override
        public void onNext(T item) {
            Objects.requireNonNull(item);
            execAsync(() -> {
                actorLogic.accept(item);
                askRefill();
            });
        }

        @Override
        public void onError(Throwable throwable) {
            Objects.requireNonNull(throwable);
            execAsync(() -> {
                if (onErrorHandler != null) {
                    onErrorHandler.accept(throwable);
                }
            });
            sendPoisonPill();
        }

        @Override
        public void onComplete() {
            execAsync(() -> {
                if (onCompleteHandler != null) {
                    onCompleteHandler.accept(Actor.this);
                }
            });
            sendPoisonPill();
        }

        /** Requests as many items as are missing to reach the optimal queue length. */
        private void askRefill() {
            final int missing = optimalQueueLength - queue.size();
            if (missing <= 0) {
                return;
            }
            activeSubscription.get().request(missing);
        }
    };
}
/**
 * Creates the subscriber under test: an anonymous actor with an empty message
 * handler, exposed as a reactive subscriber with an optimal queue length of
 * 100 and no error or completion handlers.
 *
 * @return the actor-backed subscriber
 */
@Override
public Flow.Subscriber<Integer> createFlowSubscriber() {
    return ActorSystem.anonymous()
            .newActor((Integer ignored) -> {
                // Items are deliberately discarded.
            })
            .asReactiveSubscriber(100, null, null);
}
/**
 * Creates a subscriber for whitebox verification: every signal is first
 * forwarded to an actor-backed subscriber (empty handler, optimal queue
 * length 100, no error or completion handlers) and then registered with
 * {@code probe}.
 *
 * @param probe the probe that is told about each received signal
 * @return the wrapping subscriber
 */
@Override
public Flow.Subscriber<Integer> createFlowSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
    final Flow.Subscriber<Integer> delegate = ActorSystem.anonymous()
            .newActor((Integer ignored) -> {
            })
            .asReactiveSubscriber(100, null, null);

    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            delegate.onSubscribe(subscription);

            // register a successful Subscription, and create a Puppet,
            // for the WhiteboxVerification to be able to drive its tests:
            final SubscriberPuppet puppet = new SubscriberPuppet() {
                @Override
                public void triggerRequest(long elements) {
                    subscription.request(elements);
                }

                @Override
                public void signalCancel() {
                    subscription.cancel();
                }
            };
            probe.registerOnSubscribe(puppet);
        }

        @Override
        public void onNext(Integer item) {
            delegate.onNext(item);
            probe.registerOnNext(item);
        }

        @Override
        public void onError(Throwable throwable) {
            delegate.onError(throwable);
            probe.registerOnError(throwable);
        }

        @Override
        public void onComplete() {
            delegate.onComplete();
            probe.registerOnComplete();
        }
    };
}
/**
 * Remembers the subscription and requests the first item.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    // Store first, then pull one item through the same reference.
    (this.subscription = subscription).request(1);
}
/**
 * Logs the call, stores the subscription and requests the first three items.
 *
 * @param subscription the subscription supplied by the publisher
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    log.info("******调用 onSubscribe******");
    // Store the subscription BEFORE requesting: a publisher may deliver onNext
    // synchronously from inside request(), and any callback that reads
    // this.subscription would otherwise still see the old (null) value.
    this.subscription = subscription;
    subscription.request(3);
}
/**
 * Returns the subscription currently held by this object.
 *
 * @return the value of the {@code subscription} field
 */
public Flow.Subscription getSubscription() {
    return this.subscription;
}
/**
 * Obtains an async iterator from the supplier and gives {@code subscriber} an
 * {@code IteratorBackedSubscription} over it.
 *
 * @param subscriber the subscriber to attach
 */
@Override
public void subscribe(final Flow.Subscriber<? super T> subscriber) {
    final Flow.Subscription subscription =
            new IteratorBackedSubscription<>(this.asyncIteratorSupplier.get(), subscriber);
    subscriber.onSubscribe(subscription);
}
/**
 * Creates an iterator bound to the given publisher.
 *
 * @param publisher the publisher to store; must not be null
 * @throws NullPointerException if {@code publisher} is null
 */
SubscribingIterator(final Flow.Publisher<? extends T> publisher) {
    Objects.requireNonNull(publisher);
    this.publisher = publisher;
}
/**
 * Creates a subscription that pairs an async iterator with a subscriber.
 *
 * @param iterator the iterator to store; must not be null
 * @param subscriber the subscriber to store; must not be null
 * @throws NullPointerException if either argument is null
 */
IteratorBackedSubscription(final AsyncIterator<T> iterator,
        final Flow.Subscriber<? super T> subscriber) {
    // Validate in declaration order (iterator, then subscriber) before storing.
    Objects.requireNonNull(iterator);
    Objects.requireNonNull(subscriber);
    this.iterator = iterator;
    this.subscriber = subscriber;
}