类java.util.concurrent.Flow源码实例Demo

下面列出了怎么用java.util.concurrent.Flow的API类实例代码及写法,或者点击链接到github查看源代码。

public static void main(String[] args) {
	SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new WeatherForecastPublisher();

	weatherForecastPublisher.subscribe(new DatabaseSubscriber());
	weatherForecastPublisher.subscribe(new TwitterSubscriber<WeatherForecast>());

	Flow.Processor<WeatherForecast, MetricWeatherForecast> metricConverter = new UsToMetricProcessor();
	weatherForecastPublisher.subscribe(metricConverter);
	metricConverter.subscribe(new TwitterSubscriber<MetricWeatherForecast>());

	// close the publisher and associated resources after 10 seconds
	try {
		TimeUnit.SECONDS.sleep(10);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	weatherForecastPublisher.close();
}
 
源代码2 项目: pgadba   文件: PgRowPublisherOperation.java
@Override
public ParameterizedRowPublisherOperation<R> subscribe(Flow.Subscriber<? super Result.RowColumn> subscriber,
    CompletionStage<? extends R> result) {
  if (result == null) {
    throw new IllegalArgumentException("result is not allowed to be null");
  }
  if (subscriber == null) {
    throw new IllegalArgumentException("subscriber is not allowed to be null");
  }

  publisher.subscribe(subscriber);
  this.result = result;
  result.thenAccept(r -> {
    if (groupSubmission != null) {
      groupSubmission.addGroupResult(r);
    }
    submission.getCompletionStage().toCompletableFuture().complete(r);
  });
  return this;
}
 
源代码3 项目: spring-reactive-sample   文件: PostController.java
@GetMapping
    public Flow.Publisher<Post> all() {
//        Executor proxyExecutor = (Runnable command)-> ForkJoinPool.commonPool().execute(command);
//        SubmissionPublisher publisher  = new SubmissionPublisher(proxyExecutor, Flow.defaultBufferSize());
//        publisher.submit(new Post(1L, "post one", "content of post one"));
//        publisher.submit(new Post(2L, "post two", "content of post two"));
//
//        return publisher;
        // see: https://stackoverflow.com/questions/46597924/spring-5-supports-java-9-flow-apis-in-its-reactive-feature
        return JdkFlowAdapter.publisherToFlowPublisher(
                Flux.just(
                        new Post(1L, "post one", "content of post one"),
                        new Post(2L, "post two", "content of post two")
                )
        );
    }
 
源代码4 项目: Fibry   文件: TestReactiveSubscribers.java
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
    subscriber.onSubscribe(new Flow.Subscription() {
        private final AtomicBoolean completed = new AtomicBoolean(false);
        private final AtomicInteger numMessagesToSend = new AtomicInteger();
        private final Actor<Flow.Subscriber<? super Integer>, Void, Void> actorRefill = ActorSystem.anonymous().newActor(sub -> {
            while (numSent.get() < numMax && numMessagesToSend.get() > 0) {
                subscriber.onNext(numSent.incrementAndGet());
                numMessagesToSend.decrementAndGet();
            }

            if (numSent.get() >= numMax) {
                if (completed.compareAndSet(false, true))
                    subscriber.onComplete();
            }
        });

        @Override
        public void request(long n) {
            if (numSent.get() >= numMax)
                return;

            numMessagesToSend.accumulateAndGet((int) n, Math::max);

            actorRefill.sendMessage(subscriber);
        }

        @Override
        public void cancel() {
            numSent.set(numMax);
        }
    });
}
 
WeatherForecastPublisher() {
	super(Executors.newFixedThreadPool(2), Flow.defaultBufferSize());
	scheduler = new ScheduledThreadPoolExecutor(1);
	periodicTask = scheduler.scheduleAtFixedRate( //
			// runs submit()
			() -> submit(WeatherForecast.nextRandomWeatherForecast()), //
			500, 500, TimeUnit.MILLISECONDS);
}
 
public void onSubscribe(Flow.Subscription subscription) {
    //count = bufferSize - bufferSize / 2;// 当消费一半的时候重新请求
    (this.subscription = subscription).request(bufferSize);
    System.out.println("开始onSubscribe订阅");
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
public static void main(String[] args) {
    Flow.Publisher jdkPublisher = FlowAdapters.toFlowPublisher(new NewsServicePublisher(smp ->
        Flowable.intervalRange(0, 10, 0, 10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .map(e -> NewsLetter.template()
                                    .title(String.valueOf(e))
                                    .digest(Collections.emptyList())
                                    .build())
                .subscribe(smp)
    ));
    Publisher external = FlowAdapters.toPublisher(jdkPublisher);
    Flow.Publisher jdkPublisher2 = FlowAdapters.toFlowPublisher(
            external
    );

    NewsServiceSubscriber newsServiceSubscriber = new NewsServiceSubscriber(2);
    jdkPublisher2.subscribe(FlowAdapters.toFlowSubscriber(newsServiceSubscriber));



    while (true) {
        Optional<NewsLetter> letterOptional = newsServiceSubscriber.eventuallyReadDigest();

        if (letterOptional.isPresent()) {
            NewsLetter letter = letterOptional.get();
            System.out.println(letter);

            if (letter.getTitle().equals("9")) {
                break;
            }
        }
    }
}
 
源代码8 项目: java-async-util   文件: FlowAdapter.java
@Override
public void subscribe(final Flow.Subscriber<? super T> subscriber) {
  if ((boolean) SUBSCRIBED_HANDLE.getAndSet(this, true)) {
    subscriber.onError(new IllegalStateException(
        "Publisher " + this + " does not support multiple subscribers"));
    return;
  }
  subscriber.onSubscribe(new IteratorBackedSubscription<>(this.asyncIterator, subscriber));
}
 
源代码9 项目: java-async-util   文件: FlowAdapter.java
@Override
public void onSubscribe(final Flow.Subscription subscription) {
  Objects.requireNonNull(subscription);
  if (this.subscription != null) {
    subscription.cancel();
    return;
  }
  this.subscription = subscription;
}
 
源代码10 项目: java-async-util   文件: FlowAdapterTest.java
@Test(expected = IllegalStateException.class)
public void testDoubleSubscription() throws Throwable {
  final Flow.Publisher<Long> publisher = FlowAdapter.toPublisher(AsyncIterator.range(0, 5));
  final ConsumingSubscriber<Long> s = new ConsumingSubscriber<>();
  publisher.subscribe(s);
  s.join();

  final ConsumingSubscriber<Long> s2 = new ConsumingSubscriber<>();
  try {
    publisher.subscribe(s2);
  } catch (final Throwable e) {
    Assert.fail("failure should be notified via onError, got: " + e);
  }
  FlowAdapterTest.unwrap(s2);
}
 
@Override
public Subscriber<Integer> createFlowSubscriber() {
  return new FlowAdapter.SubscribingIterator<Integer>() {
    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
      super.onSubscribe(subscription);
      consume();
    }
  };
}
 
public void onSubscribe(Flow.Subscription subscription) {
    //count = bufferSize - bufferSize / 2;// 当消费一半的时候重新请求
    (this.subscription = subscription).request(bufferSize);
    System.out.println("开始onSubscribe订阅");
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
源代码13 项目: openjdk-jdk9   文件: DefaultPublisher.java
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    try {
        subscriber.onSubscribe(new Subscription(subscriber));
    } catch (RejectedExecutionException e) {
        subscriber.onError(new IllegalStateException(e));
    }
}
 
源代码14 项目: openjdk-jdk9   文件: Stream.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription != null) {
        throw new IllegalStateException();
    }
    this.subscription = subscription;
    subscription.request(1);
}
 
源代码15 项目: openjdk-jdk9   文件: Http1Request.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription != null) {
        throw new IllegalStateException("already subscribed");
    }
    this.subscription = subscription;
    subscription.request(1);
}
 
源代码16 项目: openjdk-jdk9   文件: Http1Request.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription != null) {
        throw new IllegalStateException("already subscribed");
    }
    this.subscription = subscription;
    subscription.request(1);
}
 
源代码17 项目: openjdk-jdk9   文件: ResponseProcessors.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    try {
        out = FileChannel.open(file, options);
    } catch (IOException e) {
        result.completeExceptionally(e);
        subscription.cancel();
        return;
    }
    subscription.request(1);
}
 
源代码18 项目: openjdk-jdk9   文件: ResponseProcessors.java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription != null) {
        subscription.cancel();
        return;
    }
    this.subscription = subscription;
    // We can handle whatever you've got
    subscription.request(Long.MAX_VALUE);
}
 
源代码19 项目: openjdk-jdk9   文件: RequestProcessors.java
@Override
public synchronized void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
    if (!(subscriber instanceof ProcessorBase)) {
        throw new UnsupportedOperationException();
    }
    ProcessorBase base = (ProcessorBase)subscriber;
    HttpClientImpl client = base.getClient();
    InputStream is = streamSupplier.get();
    if (is == null) {
        throw new UncheckedIOException(new IOException("no inputstream supplied"));
    }
    this.delegate = new PullPublisher<>(() -> new StreamIterator(is));
    delegate.subscribe(subscriber);
}
 
源代码20 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * A default-constructed SubmissionPublisher has no subscribers,
 * is not closed, has default buffer size, and uses the
 * defaultExecutor
 */
public void testConstructor1() {
    SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
    checkInitialState(p);
    assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
    Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
    if (ForkJoinPool.getCommonPoolParallelism() > 1)
        assertSame(e, c);
    else
        assertNotSame(e, c);
}
 
源代码21 项目: openjdk-jdk9   文件: HttpInputStreamTest.java
@Override
public void close() throws IOException {
    synchronized (this) {
        closed = true;
        Flow.Subscription s = subscription;
        if (s != null) {
            s.cancel();
        }
        subscription = null;
    }
    super.close();
}
 
源代码22 项目: Fibry   文件: Actor.java
Flow.Subscriber<T> asReactiveSubscriber(int optimalQueueLength, Consumer<Throwable> onErrorHandler, Consumer<PartialActor<T, S>> onCompleteHandler) {
    AtomicReference<Flow.Subscription> sub = new AtomicReference<>();
    return new Flow.Subscriber<T>() {
        private void askRefill() {
            int messagesRequired = optimalQueueLength - queue.size();

            if (messagesRequired > 0)
                sub.get().request(messagesRequired);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (sub.get() != null)
                subscription.cancel();
            else {
                sub.set(subscription);
                askRefill();
            }
        }

        @Override
        public void onNext(T item) {
            Objects.requireNonNull(item);

            execAsync(() -> {
                actorLogic.accept(item);
                askRefill();
            });
        }

        @Override
        public void onError(Throwable throwable) {
            Objects.requireNonNull(throwable);
            execAsync(() -> {
                if (onErrorHandler != null)
                    onErrorHandler.accept(throwable);
            });

            sendPoisonPill();
        }

        @Override
        public void onComplete() {
            execAsync(() -> {
                if (onCompleteHandler != null)
                    onCompleteHandler.accept(Actor.this);
            });
            sendPoisonPill();
        }
    };
}
 
源代码23 项目: Fibry   文件: TestReactiveSubscribersBlackBox.java
@Override
public Flow.Subscriber<Integer> createFlowSubscriber() {
    return ActorSystem.anonymous().newActor((Integer n) -> {
    }).asReactiveSubscriber(100, null, null);
}
 
源代码24 项目: Fibry   文件: TestReactiveSubscribersWhiteBox.java
@Override
public Flow.Subscriber<Integer> createFlowSubscriber(final WhiteboxSubscriberProbe<Integer> probe) {
    var realSubscriber = ActorSystem.anonymous().newActor((Integer n) -> {
    }).asReactiveSubscriber(100, null, null);

    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            realSubscriber.onSubscribe(subscription);

            // register a successful Subscription, and create a Puppet,
            // for the WhiteboxVerification to be able to drive its tests:
            probe.registerOnSubscribe(new SubscriberPuppet() {
                @Override
                public void triggerRequest(long elements) {
                    subscription.request(elements);
                }

                @Override
                public void signalCancel() {
                    subscription.cancel();
                }
            });
        }

        @Override
        public void onNext(Integer item) {
            realSubscriber.onNext(item);
            probe.registerOnNext(item);
        }

        @Override
        public void onError(Throwable throwable) {
            realSubscriber.onError(throwable);
            probe.registerOnError(throwable);
        }

        @Override
        public void onComplete() {
            realSubscriber.onComplete();
            probe.registerOnComplete();
        }
    };
}
 
@Override
public void onSubscribe(Flow.Subscription subscription) {
	this.subscription = subscription;
	subscription.request(1);
}
 
@Override
public void onSubscribe(Flow.Subscription subscription) {
    log.info("******调用 onSubscribe******");
    subscription.request(3);
    this.subscription = subscription;
}
 
public Flow.Subscription getSubscription() {
    return subscription;
}
 
源代码28 项目: java-async-util   文件: FlowAdapter.java
@Override
public void subscribe(final Flow.Subscriber<? super T> subscriber) {
  subscriber
      .onSubscribe(
          new IteratorBackedSubscription<>(this.asyncIteratorSupplier.get(), subscriber));
}
 
源代码29 项目: java-async-util   文件: FlowAdapter.java
SubscribingIterator(final Flow.Publisher<? extends T> publisher) {
  this.publisher = Objects.requireNonNull(publisher);
}
 
源代码30 项目: java-async-util   文件: FlowAdapter.java
IteratorBackedSubscription(final AsyncIterator<T> iterator,
    final Flow.Subscriber<? super T> subscriber) {
  this.iterator = Objects.requireNonNull(iterator);
  this.subscriber = Objects.requireNonNull(subscriber);
}
 
 类所在包
 同包方法