下面列出了如何使用 java.util.concurrent.SubmissionPublisher 类的示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Wires a weather-forecast publisher to three consumers (a database
 * subscriber, a Twitter subscriber, and a US-to-metric processor that feeds a
 * second Twitter subscriber), lets the pipeline run for ten seconds, and then
 * closes the publisher.
 *
 * <p>The publisher is managed by try-with-resources so it is closed even if
 * one of the subscribe calls throws.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    try (SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new WeatherForecastPublisher()) {
        weatherForecastPublisher.subscribe(new DatabaseSubscriber());
        weatherForecastPublisher.subscribe(new TwitterSubscriber<WeatherForecast>());

        // The processor is both a subscriber of the raw forecasts and a
        // publisher of the converted ones.
        Flow.Processor<WeatherForecast, MetricWeatherForecast> metricConverter = new UsToMetricProcessor();
        weatherForecastPublisher.subscribe(metricConverter);
        metricConverter.subscribe(new TwitterSubscriber<MetricWeatherForecast>());

        // close the publisher and associated resources after 10 seconds
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so code further up the stack can
            // still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Submits the same single-item order ten times to a publisher that has a
 * {@code StockMaintain} subscriber attached, then idles for ten 50 ms pauses
 * before closing the publisher.
 *
 * NOTE(review): the test contains no assertions on the resulting stock; it
 * only exercises the submit/close path and logs progress.
 */
@Test
public void teststockRemoval() throws InterruptedException {
    Stock stock = new Stock();
    SubmissionPublisher<Order> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new StockMaintain(stock));

    Product product = new Product();
    stock.store(product, 40);

    // Build one order holding a single item of amount 10.
    OrderItem orderItem = new OrderItem();
    orderItem.setProduct(product);
    orderItem.setAmount(10);
    List<OrderItem> orderItems = new LinkedList<>();
    orderItems.add(orderItem);
    Order order = new Order();
    order.setItems(orderItems);

    int remainingSubmissions = 10;
    while (remainingSubmissions-- > 0) {
        publisher.submit(order);
    }
    log.info("所有订单已经提交完毕");

    // Give the asynchronous subscriber some time before closing.
    int remainingPauses = 10;
    while (remainingPauses-- > 0) {
        log.info("Sleeping a bit...");
        Thread.sleep(50);
    }

    publisher.close();
    log.info("Publisher已关闭");
}
/**
 * Publishes {@code trafficUnitsNumber} to two subscriptions — one for
 * {@code Process.AVERAGE_SPEED} and one for {@code Process.TRAFFIC_DENSITY} —
 * and then waits up to one second on the executor before returning.
 *
 * <p>NOTE(review): the executor is {@link ForkJoinPool#commonPool()}; per the
 * ForkJoinPool documentation the common pool is not affected by
 * {@code shutdown()}/{@code shutdownNow()}, so the finally block presumably
 * acts only as a bounded wait — confirm before swapping in another executor.
 *
 * @param trafficUnitsNumber the single value submitted to the publisher
 * @param timeSec            forwarded unchanged to {@code subscribe}
 * @param dateLocation       forwarded unchanged to {@code subscribe}
 * @param speedLimitByLane   forwarded unchanged to {@code subscribe}
 */
public static void dispatch(int trafficUnitsNumber, double timeSec, DateLocation dateLocation, double[] speedLimitByLane) {
    ExecutorService execService = ForkJoinPool.commonPool();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
        subscribe(publisher, execService, Process.AVERAGE_SPEED, timeSec, dateLocation, speedLimitByLane);
        subscribe(publisher, execService, Process.TRAFFIC_DENSITY, timeSec, dateLocation, speedLimitByLane);
        publisher.submit(trafficUnitsNumber);
    } finally {
        try {
            execService.shutdown();
            execService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for callers.
                Thread.currentThread().interrupt();
            }
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
        } finally {
            execService.shutdownNow();
        }
    }
}
/**
 * Demonstrates {@link SubmissionPublisher}: registers three demo
 * subscriptions ("One", "Two", "Three"), submits the integers 1 through 4,
 * closes the publisher, and then goes through a shutdown / await /
 * shutdownNow sequence on the executor, printing progress along the way.
 *
 * <p>NOTE(review): the executor is {@link ForkJoinPool#commonPool()}; per the
 * ForkJoinPool documentation the common pool ignores shutdown requests, so
 * the sequence below presumably serves as a bounded wait. A dedicated pool
 * (e.g. a fixed thread pool) could be substituted here — confirm intent.
 */
private static void demo4_Flow_submissionPublisher() {
    System.out.println();
    ExecutorService execService = ForkJoinPool.commonPool();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
        demoSubscribe(publisher, execService, "One");
        demoSubscribe(publisher, execService, "Two");
        demoSubscribe(publisher, execService, "Three");
        IntStream.range(1, 5).forEach(publisher::submit);
    } finally {
        try {
            execService.shutdown();
            int shutdownDelaySec = 1;
            System.out.println("Waiting for " + shutdownDelaySec + " sec before shutting down service...");
            execService.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for callers.
                Thread.currentThread().interrupt();
            }
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
        } finally {
            System.out.println("Calling execService.shutdownNow()...");
            List<Runnable> l = execService.shutdownNow();
            System.out.println(l.size() + " tasks were waiting to be executed. Service stopped.");
        }
    }
}
/**
 * Submits {@code trafficUnitsNumber} through a {@link SubmissionPublisher}
 * that has two subscriptions registered via {@code subscribe}
 * ({@code Process.AVERAGE_SPEED} and {@code Process.TRAFFIC_DENSITY}), then
 * waits at most one second on the executor.
 *
 * <p>NOTE(review): {@link ForkJoinPool#commonPool()} is documented as being
 * unaffected by {@code shutdown()}/{@code shutdownNow()}; the finally block
 * is presumably just a bounded wait — confirm.
 *
 * @param trafficUnitsNumber value submitted to the publisher
 * @param timeSec            passed through to {@code subscribe}
 * @param dateLocation       passed through to {@code subscribe}
 * @param speedLimitByLane   passed through to {@code subscribe}
 */
public static void dispatch(int trafficUnitsNumber, double timeSec, DateLocation dateLocation, double[] speedLimitByLane) {
    ExecutorService execService = ForkJoinPool.commonPool();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
        subscribe(publisher, execService, Process.AVERAGE_SPEED, timeSec, dateLocation, speedLimitByLane);
        subscribe(publisher, execService, Process.TRAFFIC_DENSITY, timeSec, dateLocation, speedLimitByLane);
        publisher.submit(trafficUnitsNumber);
    } finally {
        try {
            execService.shutdown();
            execService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Keep the interrupt visible to the caller instead of dropping it.
                Thread.currentThread().interrupt();
            }
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
        } finally {
            execService.shutdownNow();
        }
    }
}
/**
 * {@link SubmissionPublisher} demo: attaches three demo subscriptions
 * ("One", "Two", "Three"), submits 1..4, lets try-with-resources close the
 * publisher, and finally runs shutdown / awaitTermination / shutdownNow on
 * the executor while printing what it is doing.
 *
 * <p>NOTE(review): the executor is {@link ForkJoinPool#commonPool()}, which
 * is documented as ignoring shutdown requests; the shutdown sequence is
 * presumably kept so a dedicated pool can be dropped in — confirm.
 */
private static void demo4_Flow_submissionPublisher() {
    System.out.println();
    ExecutorService execService = ForkJoinPool.commonPool();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
        demoSubscribe(publisher, execService, "One");
        demoSubscribe(publisher, execService, "Two");
        demoSubscribe(publisher, execService, "Three");
        IntStream.range(1, 5).forEach(publisher::submit);
    } finally {
        try {
            execService.shutdown();
            int shutdownDelaySec = 1;
            System.out.println("Waiting for " + shutdownDelaySec + " sec before shutting down service...");
            execService.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Keep the interrupt visible to the caller instead of dropping it.
                Thread.currentThread().interrupt();
            }
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
        } finally {
            System.out.println("Calling execService.shutdownNow()...");
            List<Runnable> l = execService.shutdownNow();
            System.out.println(l.size() + " tasks were waiting to be executed. Service stopped.");
        }
    }
}
/**
 * Pushes one order (a single item of amount 10) through a publisher ten
 * times while a {@code StockMaintain} subscriber is attached, waits through
 * ten 50 ms sleeps, and closes the publisher.
 *
 * NOTE(review): nothing is asserted about the stock afterwards; the test
 * only drives the pipeline and logs.
 */
@Test
public void teststockRemoval() throws InterruptedException {
    Stock stock = new Stock();
    SubmissionPublisher<Order> orders = new SubmissionPublisher<>();
    orders.subscribe(new StockMaintain(stock));

    Product product = new Product();
    stock.store(product, 40);

    OrderItem line = new OrderItem();
    line.setProduct(product);
    line.setAmount(10);

    List<OrderItem> lines = new LinkedList<>();
    lines.add(line);

    Order order = new Order();
    order.setItems(lines);

    for (int submitted = 0; submitted < 10; submitted++) {
        orders.submit(order);
    }
    log.info("所有订单已经提交完毕");

    // Leave time for asynchronous delivery before the publisher is closed.
    for (int pause = 0; pause < 10; pause++) {
        log.info("Sleeping a bit...");
        Thread.sleep(50);
    }

    orders.close();
    log.info("Publisher已关闭");
}
/**
 * Handles the reactive-streams submit button: runs the GUI validation,
 * increments the registered-user counter, publishes a new {@code User} built
 * from the form fields to a freshly subscribed
 * {@code UsersSubscriberReactStream}, and shows a confirmation alert.
 *
 * <p>The console trace prints only the name and e-mail; the password is
 * intentionally not written to standard output.
 *
 * @param event the action event that triggered the handler (unused)
 */
@FXML
protected void handleSubmitReactStreamsButtonAction(ActionEvent event) {
    Window owner = submitButton.getScene().getWindow();
    doValidationGUI(owner);
    objectRegisteredUsersCount++;
    // Never echo credentials: the password field is deliberately omitted here.
    System.out.println("\n Registered User react- " + nameField.getText() + ", " + emailField.getText());
    try (SubmissionPublisher<User> usersPublisher = new SubmissionPublisher<User>()) {
        User u = new User(objectRegisteredUsersCount, nameField.getText(), emailField.getText(), passwordField.getText());
        UsersSubscriberReactStream usersSubscriber = new UsersSubscriberReactStream();
        usersPublisher.subscribe(usersSubscriber);
        usersPublisher.submit(u);
        // NOTE(review): delivery is asynchronous, so cancelling right after
        // submit may race with the subscriber receiving the user — confirm
        // against UsersSubscriberReactStream.
        usersSubscriber.cancelSubscription();
    } catch (Exception reacte) {
        reacte.printStackTrace();
    }
    AlertHelper.showAlert(Alert.AlertType.CONFIRMATION, owner, "Registration Successful react!",
            "Welcome " + nameField.getText());
}
/**
 * After a normal close the publisher reports isClosed with a null
 * closedException and rejects submit with IllegalStateException; a
 * later closeExceptionally leaves that state untouched.
 */
public void testClose() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    checkInitialState(publisher);

    publisher.close();
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());

    // Submission to a closed publisher must be rejected.
    try {
        publisher.submit(1);
        shouldThrow();
    } catch (IllegalStateException expected) {}

    // An exceptional close arriving afterwards must not record its cause.
    Throwable lateCause = new SPException();
    publisher.closeExceptionally(lateCause);
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());
}
/**
 * After closeExceptionally the publisher reports isClosed, exposes the
 * supplied exception as closedException and rejects submit with
 * IllegalStateException; a later plain close changes nothing.
 */
public void testCloseExceptionally() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    checkInitialState(publisher);

    Throwable cause = new SPException();
    publisher.closeExceptionally(cause);
    assertTrue(publisher.isClosed());
    assertSame(publisher.getClosedException(), cause);

    // Submission to a closed publisher must be rejected.
    try {
        publisher.submit(1);
        shouldThrow();
    } catch (IllegalStateException expected) {}

    // A subsequent normal close keeps the originally recorded cause.
    publisher.close();
    assertTrue(publisher.isClosed());
    assertSame(publisher.getClosedException(), cause);
}
/**
 * Subscribing the same subscriber a second time results in onError
 * being delivered to it and in the subscriber no longer being
 * subscribed.
 */
public void testSubscribe4() {
    TestSubscriber subscriber = new TestSubscriber();
    SubmissionPublisher<Integer> publisher = basicPublisher();

    // First subscription: registered, no signals other than onSubscribe.
    publisher.subscribe(subscriber);
    assertTrue(publisher.hasSubscribers());
    assertEquals(1, publisher.getNumberOfSubscribers());
    assertTrue(publisher.getSubscribers().contains(subscriber));
    assertTrue(publisher.isSubscribed(subscriber));
    subscriber.awaitSubscribe();
    assertNotNull(subscriber.sn);
    assertEquals(0, subscriber.nexts);
    assertEquals(0, subscriber.errors);
    assertEquals(0, subscriber.completes);

    // Second subscription attempt: exactly one error, no items, unsubscribed.
    publisher.subscribe(subscriber);
    subscriber.awaitError();
    assertEquals(0, subscriber.nexts);
    assertEquals(1, subscriber.errors);
    assertFalse(publisher.isSubscribed(subscriber));
}
/**
 * A normal close delivers onComplete to every subscriber after the
 * previously submitted item.
 */
public void testCloseCompletes() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    publisher.submit(1);
    publisher.close();
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());

    // Each subscriber sees exactly one item followed by one completion.
    first.awaitComplete();
    assertEquals(1, first.nexts);
    assertEquals(1, first.completes);

    second.awaitComplete();
    assertEquals(1, second.nexts);
    assertEquals(1, second.completes);
}
/**
 * An exceptional close delivers onError to every subscriber once it
 * has been subscribed; at most the one submitted item may have been
 * seen before the error.
 */
public void testCloseExceptionallyError() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    publisher.submit(1);
    publisher.closeExceptionally(new SPException());
    assertTrue(publisher.isClosed());

    first.awaitSubscribe();
    first.awaitError();
    assertTrue(first.nexts <= 1);
    assertEquals(1, first.errors);

    second.awaitSubscribe();
    second.awaitError();
    assertTrue(second.nexts <= 1);
    assertEquals(1, second.errors);
}
/**
 * Once a subscription is cancelled, its subscriber eventually stops
 * receiving onNext, while the other subscriber still gets every item.
 */
public void testCancel() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber cancelled = new TestSubscriber();
    TestSubscriber active = new TestSubscriber();
    publisher.subscribe(cancelled);
    publisher.subscribe(active);
    cancelled.awaitSubscribe();

    publisher.submit(1);
    cancelled.sn.cancel();
    for (int item = 2; item <= 20; ++item) {
        publisher.submit(item);
    }
    publisher.close();

    // The untouched subscriber receives all 20 items and completes.
    active.awaitComplete();
    assertEquals(20, active.nexts);
    assertEquals(1, active.completes);

    // The cancelled one misses at least one item and is no longer subscribed.
    assertTrue(cancelled.nexts < 20);
    assertFalse(publisher.isSubscribed(cancelled));
}
/**
 * A subscriber that throws from onNext receives onError; the other
 * subscriber is unaffected and still receives both items.
 */
public void testThrowOnNext() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber thrower = new TestSubscriber();
    TestSubscriber healthy = new TestSubscriber();
    publisher.subscribe(thrower);
    publisher.subscribe(healthy);
    thrower.awaitSubscribe();

    publisher.submit(1);
    // Switch the first subscriber into throwing mode before the next item.
    thrower.throwOnCall = true;
    publisher.submit(2);
    publisher.close();

    healthy.awaitComplete();
    assertEquals(2, healthy.nexts);

    thrower.awaitComplete();
    assertEquals(1, thrower.errors);
}
/**
 * A handler passed to the constructor is invoked once when a
 * subscriber throws from onNext.
 */
public void testThrowOnNextHandler() {
    AtomicInteger handlerCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(
        basicExecutor, 8, (subscriber, error) -> handlerCalls.getAndIncrement());
    TestSubscriber thrower = new TestSubscriber();
    TestSubscriber healthy = new TestSubscriber();
    publisher.subscribe(thrower);
    publisher.subscribe(healthy);
    thrower.awaitSubscribe();

    publisher.submit(1);
    // Switch the first subscriber into throwing mode before the next item.
    thrower.throwOnCall = true;
    publisher.submit(2);
    publisher.close();

    healthy.awaitComplete();
    assertEquals(2, healthy.nexts);
    assertEquals(1, healthy.completes);

    thrower.awaitError();
    assertEquals(1, thrower.errors);
    assertEquals(1, handlerCalls.get());
}
/**
 * Twenty ascending items are submitted; both subscribers receive all
 * twenty followed by a single onComplete.
 */
public void testOrder() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    for (int item = 1; item <= 20; ++item) {
        publisher.submit(item);
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();

    assertEquals(20, second.nexts);
    assertEquals(1, second.completes);
    assertEquals(20, first.nexts);
    assertEquals(1, first.completes);
}
/**
 * A subscriber that has not requested anything receives no onNext
 * until it issues a request.
 */
public void testRequest1() {
    SubmissionPublisher<Integer> publisher = basicPublisher();

    // The lazy subscriber is told not to request on its own.
    TestSubscriber lazy = new TestSubscriber();
    lazy.request = false;
    publisher.subscribe(lazy);
    lazy.awaitSubscribe();
    assertTrue(publisher.estimateMinimumDemand() == 0);

    TestSubscriber eager = new TestSubscriber();
    publisher.subscribe(eager);
    publisher.submit(1);
    publisher.submit(2);
    eager.awaitNext(1);
    assertEquals(0, lazy.nexts);

    // Demand is now signalled explicitly, after which items may flow.
    lazy.sn.request(3);
    publisher.submit(3);
    publisher.close();

    eager.awaitComplete();
    assertEquals(3, eager.nexts);
    assertEquals(1, eager.completes);

    lazy.awaitComplete();
    assertTrue(lazy.nexts > 0);
    assertEquals(1, lazy.completes);
}
/**
 * A subscriber whose outstanding requests drop to zero stops
 * receiving onNext: it sees only the first of two items.
 */
public void testRequest2() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber limited = new TestSubscriber();
    TestSubscriber unlimited = new TestSubscriber();
    publisher.subscribe(limited);
    publisher.subscribe(unlimited);
    unlimited.awaitSubscribe();
    limited.awaitSubscribe();

    // Disable further requests only after subscription has happened.
    limited.request = false;
    publisher.submit(1);
    publisher.submit(2);
    publisher.close();

    unlimited.awaitComplete();
    assertEquals(2, unlimited.nexts);
    assertEquals(1, unlimited.completes);

    limited.awaitNext(1);
    assertEquals(1, limited.nexts);
}
/**
 * submit returns number of lagged items, compatible with result
 * of estimateMaximumLag.
 *
 * Both subscribers start without demand, so every submitted item
 * accumulates as lag until each subscriber explicitly requests 4;
 * after close, each subscriber must have received all 4 items.
 */
public void testLaggedSubmit() {
    SubmissionPublisher<Integer> p = basicPublisher();
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    assertEquals(1, p.submit(1));
    assertTrue(p.estimateMaximumLag() >= 1);
    assertTrue(p.submit(2) >= 2);
    assertTrue(p.estimateMaximumLag() >= 2);
    s1.sn.request(4);
    // s2 has still requested nothing, so the maximum lag keeps growing.
    assertTrue(p.submit(3) >= 3);
    assertTrue(p.estimateMaximumLag() >= 3);
    s2.sn.request(4);
    p.submit(4);
    p.close();
    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // Verify s1's own count here, not s2's a second time.
    assertEquals(4, s1.nexts);
}
/**
 * With a buffer capacity of 1, submit still eventually delivers all
 * twenty items to both subscribers.
 */
public void testCap1Submit() {
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 1);
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    for (int item = 1; item <= 20; ++item) {
        assertTrue(publisher.estimateMinimumDemand() <= 1);
        assertTrue(publisher.submit(item) >= 0);
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();

    assertEquals(20, second.nexts);
    assertEquals(1, second.completes);
    assertEquals(20, first.nexts);
    assertEquals(1, first.completes);
}
/**
 * offer returns number of lagged items if not saturated
 *
 * Both subscribers start without demand; each later requests 4, and
 * after close each must have received all 4 offered items.
 */
public void testLaggedOffer() {
    SubmissionPublisher<Integer> p = basicPublisher();
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    assertTrue(p.offer(1, null) >= 1);
    assertTrue(p.offer(2, null) >= 2);
    s1.sn.request(4);
    // s2 has still requested nothing, so the reported lag keeps growing.
    assertTrue(p.offer(3, null) >= 3);
    s2.sn.request(4);
    p.offer(4, null);
    p.close();
    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // Verify s1's own count here, not s2's a second time.
    assertEquals(4, s1.nexts);
}
/**
 * With a capacity-4 buffer and no demand, offer without a drop
 * handler returns a negative value once the buffer is saturated.
 */
public void testDroppedOffer() {
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 4);
    TestSubscriber first = new TestSubscriber();
    first.request = false;
    TestSubscriber second = new TestSubscriber();
    second.request = false;
    publisher.subscribe(first);
    publisher.subscribe(second);
    second.awaitSubscribe();
    first.awaitSubscribe();

    // The first four offers are accepted.
    for (int item = 1; item <= 4; ++item) {
        assertTrue(publisher.offer(item, null) >= 0);
    }
    // The result of the fifth offer is not checked.
    publisher.offer(5, null);
    assertTrue(publisher.offer(6, null) < 0);

    first.sn.request(64);
    // The second subscriber has still requested nothing, so a drop is reported.
    assertTrue(publisher.offer(7, null) < 0);
    second.sn.request(64);
    publisher.close();

    second.awaitComplete();
    assertTrue(second.nexts >= 4);
    first.awaitComplete();
    assertTrue(first.nexts >= 4);
}
/**
 * When the buffer is saturated, offer invokes the supplied drop
 * handler; the handler here is counted via noopHandle.
 */
public void testHandledDroppedOffer() {
    AtomicInteger handlerCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 4);
    TestSubscriber first = new TestSubscriber();
    first.request = false;
    TestSubscriber second = new TestSubscriber();
    second.request = false;
    publisher.subscribe(first);
    publisher.subscribe(second);
    second.awaitSubscribe();
    first.awaitSubscribe();

    // The first four offers are accepted.
    for (int item = 1; item <= 4; ++item) {
        assertTrue(publisher.offer(item, (sub, dropped) -> noopHandle(handlerCalls)) >= 0);
    }
    // The result of this extra offer is not checked.
    publisher.offer(4, (sub, dropped) -> noopHandle(handlerCalls));
    assertTrue(publisher.offer(6, (sub, dropped) -> noopHandle(handlerCalls)) < 0);

    first.sn.request(64);
    assertTrue(publisher.offer(7, (sub, dropped) -> noopHandle(handlerCalls)) < 0);
    second.sn.request(64);
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertTrue(handlerCalls.get() >= 4);
}
/**
 * offer succeeds when the drop handler (reqHandle) forces a request;
 * the total number of delivered items must match the count derived
 * from the offer results.
 */
public void testRecoveredHandledDroppedOffer() {
    AtomicInteger handlerCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 4);
    TestSubscriber first = new TestSubscriber();
    first.request = false;
    TestSubscriber second = new TestSubscriber();
    second.request = false;
    publisher.subscribe(first);
    publisher.subscribe(second);
    second.awaitSubscribe();
    first.awaitSubscribe();

    // Each offer targets two subscribers; a negative result is added to the
    // running total, i.e. it reduces the expected number of deliveries.
    int expectedDeliveries = 0;
    for (int item = 1; item <= 8; ++item) {
        int result = publisher.offer(item, (sub, dropped) -> reqHandle(handlerCalls, sub));
        expectedDeliveries += 2 + Math.min(result, 0);
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertEquals(expectedDeliveries, first.nexts + second.nexts);
    assertTrue(handlerCalls.get() >= 2);
}
/**
 * Timed offer returns number of lagged items if not saturated
 *
 * Both subscribers start without demand; each later requests 4, and
 * after close each must have received all 4 offered items. The
 * whole exchange must finish well before the offer timeout.
 */
public void testLaggedTimedOffer() {
    SubmissionPublisher<Integer> p = basicPublisher();
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    long startTime = System.nanoTime();
    assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
    assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
    s1.sn.request(4);
    // s2 has still requested nothing, so the reported lag keeps growing.
    assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
    s2.sn.request(4);
    p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
    p.close();
    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // Verify s1's own count here, not s2's a second time.
    assertEquals(4, s1.nexts);
    // None of the offers should have come close to timing out.
    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
}
/**
 * Submits one order (a single item of amount 10) ten times to a publisher
 * with an {@code InventoryKeeper} subscriber attached, pauses through ten
 * 50 ms sleeps, and then closes the publisher.
 *
 * NOTE(review): the test makes no assertions about the inventory afterwards;
 * it only drives the pipeline and logs progress.
 */
@Test
public void testInventoryRemoval() throws InterruptedException {
    Inventory inventory = new Inventory();
    // Default-constructed publisher; a custom executor and buffer size could be supplied instead.
    SubmissionPublisher<Order> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new InventoryKeeper(inventory));

    Product product = new Product();
    inventory.store(product, 20);

    OrderItem orderItem = new OrderItem();
    orderItem.setProduct(product);
    orderItem.setAmount(10);
    List<OrderItem> orderItems = new LinkedList<>();
    orderItems.add(orderItem);
    Order order = new Order();
    order.setItems(orderItems);

    int remainingSubmissions = 10;
    while (remainingSubmissions-- > 0) {
        publisher.submit(order);
    }
    log.info("All orders were submitted");

    // Leave time for asynchronous delivery before the publisher is closed.
    int remainingPauses = 10;
    while (remainingPauses-- > 0) {
        log.info("Sleeping a bit...");
        Thread.sleep(50);
    }

    publisher.close();
    log.info("Publisher was closed");
}
/**
 * Publishes every employee returned by {@code EmpHelper.getEmps()} to a
 * single {@code MySubscriber}, waits one second, and closes the publisher.
 *
 * <p>The publisher is managed by try-with-resources so it is closed even if
 * submission or the wait throws.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the thread is interrupted during the wait
 */
public static void main(String[] args) throws InterruptedException {
    // Create Publisher
    try (SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>()) {
        // Register Subscriber
        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);
        List<Employee> emps = EmpHelper.getEmps();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        emps.forEach(publisher::submit);

        // Fixed pause to let the asynchronous subscriber process the items.
        // NOTE(review): this is a time-based wait, not a completion check;
        // polling the subscriber's processed count would be more reliable.
        Thread.sleep(1000);
    } // the publisher is closed here
    System.out.println("Exiting the app");
}
/**
 * Submits six strings to a publisher with one {@code EndSubscriber}
 * attached, closes the publisher, and verifies within one second that the
 * subscriber consumed exactly those elements in order.
 */
@Test
public void givenPublisher_whenSubscribeToIt_thenShouldConsumeAllElements() throws InterruptedException {
    // given
    List<String> expected = List.of("1", "x", "2", "x", "3", "x");
    SubmissionPublisher<String> source = new SubmissionPublisher<>();
    EndSubscriber<String> sink = new EndSubscriber<>(6);
    source.subscribe(sink);

    // when
    assertThat(source.getNumberOfSubscribers()).isEqualTo(1);
    for (String element : expected) {
        source.submit(element);
    }
    source.close();

    // then
    await()
        .atMost(1000, TimeUnit.MILLISECONDS)
        .untilAsserted(() -> assertThat(sink.consumedElements).containsExactlyElementsOf(expected));
}
/**
 * Chains publisher -> {@code TransformProcessor} (using
 * {@code Integer::parseInt}) -> {@code EndSubscriber}, submits three numeric
 * strings, closes the publisher, and verifies within one second that the
 * subscriber consumed the corresponding integers in order.
 */
@Test
public void givenPublisher_whenSubscribeAndTransformElements_thenShouldConsumeAllElements() throws InterruptedException {
    // given
    List<String> inputs = List.of("1", "2", "3");
    List<Integer> expected = List.of(1, 2, 3);
    SubmissionPublisher<String> source = new SubmissionPublisher<>();
    TransformProcessor<String, Integer> parser = new TransformProcessor<>(Integer::parseInt);
    EndSubscriber<Integer> sink = new EndSubscriber<>(3);

    // when
    source.subscribe(parser);
    parser.subscribe(sink);
    for (String input : inputs) {
        source.submit(input);
    }
    source.close();

    // then
    await()
        .atMost(1000, TimeUnit.MILLISECONDS)
        .untilAsserted(() -> assertThat(sink.consumedElements).containsExactlyElementsOf(expected));
}