下面列出了 java.util.concurrent.SubmissionPublisher#submit() 的实例代码；您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Submits the same order ten times to a publisher that has a
 * {@code StockMaintain} subscriber attached, then closes the publisher.
 *
 * <p>The stock is seeded with 40 units of a single product and the order
 * contains one item asking for 10 units of it, so the ten submissions ask
 * for more than was stored in total. The method makes no assertions; it
 * only logs progress.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@Test
public void teststockRemoval() throws InterruptedException {
    Stock stock = new Stock();
    // try-with-resources guarantees close() even when Thread.sleep is
    // interrupted, so the publisher is never left open.
    try (SubmissionPublisher<Order> p = new SubmissionPublisher<>()) {
        p.subscribe(new StockMaintain(stock));
        Product product = new Product();
        stock.store(product, 40);
        OrderItem item = new OrderItem();
        item.setProduct(product);
        item.setAmount(10);
        Order order = new Order();
        List<OrderItem> items = new LinkedList<>();
        items.add(item);
        order.setItems(items);
        for (int i = 0; i < 10; i++) {
            p.submit(order);
        }
        log.info("所有订单已经提交完毕");
        // Wait roughly half a second before closing; presumably this gives
        // the subscriber time to work through the submitted orders.
        for (int j = 0; j < 10; j++) {
            log.info("Sleeping a bit...");
            Thread.sleep(50);
        }
    }
    log.info("Publisher已关闭");
}
/**
 * Pushes one order through a {@code SubmissionPublisher} ten times while a
 * {@code StockMaintain} subscriber is attached, and closes the publisher
 * afterwards.
 *
 * <p>40 units of one product are stored up front and each submission of the
 * order requests 10 units, so the combined requests exceed the stored
 * amount. Nothing is asserted here; progress is only logged.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@Test
public void teststockRemoval() throws InterruptedException {
    Stock stock = new Stock();
    // The publisher is AutoCloseable: scoping it with try-with-resources
    // closes it on every exit path, including an interrupted sleep.
    try (SubmissionPublisher<Order> p = new SubmissionPublisher<>()) {
        p.subscribe(new StockMaintain(stock));
        Product product = new Product();
        stock.store(product, 40);
        OrderItem item = new OrderItem();
        item.setProduct(product);
        item.setAmount(10);
        Order order = new Order();
        List<OrderItem> items = new LinkedList<>();
        items.add(item);
        order.setItems(items);
        for (int i = 0; i < 10; i++) {
            p.submit(order);
        }
        log.info("所有订单已经提交完毕");
        // Ten 50 ms pauses before the publisher is closed; presumably this
        // leaves time for the subscriber to consume what was submitted.
        for (int j = 0; j < 10; j++) {
            log.info("Sleeping a bit...");
            Thread.sleep(50);
        }
    }
    log.info("Publisher已关闭");
}
/**
 * After a normal close the publisher reports {@code isClosed()} with a
 * null closed exception and rejects {@code submit} with an
 * IllegalStateException; a later {@code closeExceptionally} changes
 * neither of those observations.
 */
public void testClose() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    checkInitialState(publisher);

    publisher.close();
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());

    try {
        publisher.submit(1);
        shouldThrow();
    } catch (IllegalStateException success) {
        // expected: submitting to a closed publisher is rejected
    }

    // A late exceptional close must not replace the normal-close state.
    publisher.closeExceptionally(new SPException());
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());
}
/**
 * A publisher closed via {@code closeExceptionally} reports
 * {@code isClosed()} together with the supplied exception and throws
 * IllegalStateException upon attempted submission; a subsequent
 * {@code close} has no additional effect.
 */
public void testCloseExceptionally() {
    SubmissionPublisher<Integer> p = basicPublisher();
    checkInitialState(p);
    Throwable ex = new SPException();
    p.closeExceptionally(ex);
    assertTrue(p.isClosed());
    // assertSame takes (expected, actual); keeping that order makes a
    // failure message name the right value as "expected".
    assertSame(ex, p.getClosedException());
    try {
        p.submit(1);
        shouldThrow();
    } catch (IllegalStateException success) {
        // expected: submitting to a closed publisher is rejected
    }
    // A later normal close must not clear the recorded exception.
    p.close();
    assertTrue(p.isClosed());
    assertSame(ex, p.getClosedException());
}
/**
 * Closing a publisher delivers onComplete to each of its subscribers:
 * both subscribers end up with one onNext and one completion.
 */
public void testCloseCompletes() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    publisher.submit(1);
    publisher.close();
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());

    first.awaitComplete();
    assertEquals(1, first.nexts);
    assertEquals(1, first.completes);

    second.awaitComplete();
    assertEquals(1, second.nexts);
    assertEquals(1, second.completes);
}
/**
 * Closing a publisher exceptionally delivers onError to its subscribers
 * once they are subscribed. At most one onNext may have been seen by
 * each subscriber, and exactly one error is recorded.
 */
public void testCloseExceptionallyError() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    publisher.submit(1);
    publisher.closeExceptionally(new SPException());
    assertTrue(publisher.isClosed());

    first.awaitSubscribe();
    first.awaitError();
    assertTrue(first.nexts <= 1);
    assertEquals(1, first.errors);

    second.awaitSubscribe();
    second.awaitError();
    assertTrue(second.nexts <= 1);
    assertEquals(1, second.errors);
}
/**
 * Once a subscription is cancelled, onNext calls to that subscriber
 * eventually stop, while the other subscriber still receives all 20
 * items and a completion.
 */
public void testCancel() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber cancelled = new TestSubscriber();
    TestSubscriber active = new TestSubscriber();
    publisher.subscribe(cancelled);
    publisher.subscribe(active);
    cancelled.awaitSubscribe();

    publisher.submit(1);
    cancelled.sn.cancel();
    for (int value = 2; value <= 20; ++value) {
        publisher.submit(value);
    }
    publisher.close();

    active.awaitComplete();
    assertEquals(20, active.nexts);
    assertEquals(1, active.completes);
    assertTrue(cancelled.nexts < 20);
    assertFalse(publisher.isSubscribed(cancelled));
}
/**
 * A subscriber that throws from onNext receives onError, while the
 * other subscriber still receives both submitted items.
 */
public void testThrowOnNext() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber thrower = new TestSubscriber();
    TestSubscriber healthy = new TestSubscriber();
    publisher.subscribe(thrower);
    publisher.subscribe(healthy);
    thrower.awaitSubscribe();

    publisher.submit(1);
    // Enabled between the two submissions.
    thrower.throwOnCall = true;
    publisher.submit(2);
    publisher.close();

    healthy.awaitComplete();
    assertEquals(2, healthy.nexts);
    thrower.awaitComplete();
    assertEquals(1, thrower.errors);
}
/**
 * When a handler is passed to the constructor, it is invoked once for a
 * subscriber that throws from onNext; the other subscriber still
 * receives both items and completes.
 */
public void testThrowOnNextHandler() {
    AtomicInteger handlerCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(
        basicExecutor,
        8,
        (subscriber, error) -> handlerCalls.getAndIncrement());
    TestSubscriber thrower = new TestSubscriber();
    TestSubscriber healthy = new TestSubscriber();
    publisher.subscribe(thrower);
    publisher.subscribe(healthy);
    thrower.awaitSubscribe();

    publisher.submit(1);
    // Enabled between the two submissions.
    thrower.throwOnCall = true;
    publisher.submit(2);
    publisher.close();

    healthy.awaitComplete();
    assertEquals(2, healthy.nexts);
    assertEquals(1, healthy.completes);

    thrower.awaitError();
    assertEquals(1, thrower.errors);
    assertEquals(1, handlerCalls.get());
}
/**
 * onNext items are issued in the same order to each subscriber.
 * This method itself only checks the delivery and completion counts;
 * NOTE(review): the ordering check presumably lives in TestSubscriber —
 * confirm.
 */
public void testOrder() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    for (int value = 1; value <= 20; ++value) {
        publisher.submit(value);
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertEquals(20, second.nexts);
    assertEquals(1, second.completes);
    assertEquals(20, first.nexts);
    assertEquals(1, first.completes);
}
/**
 * onNext is issued only if requested: a subscriber created with
 * {@code request = false} sees no items until it explicitly requests
 * some, while a second subscriber receives all three submissions.
 */
public void testRequest1() {
    SubmissionPublisher<Integer> p = basicPublisher();
    TestSubscriber s1 = new TestSubscriber();
    // NOTE(review): presumably stops TestSubscriber from requesting
    // items on its own — confirm against its definition.
    s1.request = false;
    p.subscribe(s1);
    s1.awaitSubscribe();
    // assertEquals (rather than assertTrue(x == 0)) reports the actual
    // demand on failure and matches testEstimateMinimumDemand.
    assertEquals(0, p.estimateMinimumDemand());
    TestSubscriber s2 = new TestSubscriber();
    p.subscribe(s2);
    p.submit(1);
    p.submit(2);
    s2.awaitNext(1);
    // s1 has not requested anything yet, so nothing reached it.
    assertEquals(0, s1.nexts);
    s1.sn.request(3);
    p.submit(3);
    p.close();
    s2.awaitComplete();
    assertEquals(3, s2.nexts);
    assertEquals(1, s2.completes);
    s1.awaitComplete();
    assertTrue(s1.nexts > 0);
    assertEquals(1, s1.completes);
}
/**
 * onNext is no longer issued once outstanding requests drop to zero:
 * the subscriber whose {@code request} flag is cleared after
 * subscribing ends up with exactly one item, while the other receives
 * both and completes.
 */
public void testRequest2() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber limited = new TestSubscriber();
    TestSubscriber unlimited = new TestSubscriber();
    publisher.subscribe(limited);
    publisher.subscribe(unlimited);
    unlimited.awaitSubscribe();
    limited.awaitSubscribe();

    // Cleared only after the subscription has been established.
    limited.request = false;
    publisher.submit(1);
    publisher.submit(2);
    publisher.close();

    unlimited.awaitComplete();
    assertEquals(2, unlimited.nexts);
    assertEquals(1, unlimited.completes);

    limited.awaitNext(1);
    assertEquals(1, limited.nexts);
}
/**
 * submit returns number of lagged items, compatible with result
 * of estimateMaximumLag. After each subscriber has requested four
 * items and four items have been submitted, both subscribers must have
 * received all four.
 */
public void testLaggedSubmit() {
    SubmissionPublisher<Integer> p = basicPublisher();
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    // With no demand from either subscriber every submission lags.
    assertEquals(1, p.submit(1));
    assertTrue(p.estimateMaximumLag() >= 1);
    assertTrue(p.submit(2) >= 2);
    assertTrue(p.estimateMaximumLag() >= 2);
    s1.sn.request(4);
    // s2 still has not requested anything, so the maximum lag keeps growing.
    assertTrue(p.submit(3) >= 3);
    assertTrue(p.estimateMaximumLag() >= 3);
    s2.sn.request(4);
    p.submit(4);
    p.close();
    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // Previously this re-checked s2.nexts, leaving s1's delivery count
    // unverified.
    assertEquals(4, s1.nexts);
}
/**
 * Submits the same order ten times to a publisher that has an
 * {@code InventoryKeeper} subscriber attached, then closes the publisher.
 *
 * <p>The inventory is seeded with 20 units of a single product and the
 * order contains one item asking for 10 units of it, so the ten
 * submissions ask for more than was stored in total. The method makes no
 * assertions; it only logs progress.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@Test
public void testInventoryRemoval() throws InterruptedException {
    Inventory inventory = new Inventory();
    // try-with-resources guarantees close() even when Thread.sleep is
    // interrupted, so the publisher is never left open.
    try (SubmissionPublisher<Order> p = new SubmissionPublisher<>()) {
        p.subscribe(new InventoryKeeper(inventory));
        Product product = new Product();
        inventory.store(product, 20);
        OrderItem item = new OrderItem();
        item.setProduct(product);
        item.setAmount(10);
        Order order = new Order();
        List<OrderItem> items = new LinkedList<>();
        items.add(item);
        order.setItems(items);
        for (int i = 0; i < 10; i++) {
            p.submit(order);
        }
        log.info("All orders were submitted");
        // Wait roughly half a second before closing; presumably this gives
        // the subscriber time to work through the submitted orders.
        for (int j = 0; j < 10; j++) {
            log.info("Sleeping a bit...");
            Thread.sleep(50);
        }
    }
    log.info("Publisher was closed");
}
/**
 * A non-positive request (negative or zero) results in onError with an
 * IllegalArgumentException for the offending subscriber; a well-behaved
 * subscriber is unaffected.
 */
public void testRequest3() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber negative = new TestSubscriber();
    TestSubscriber wellBehaved = new TestSubscriber();
    TestSubscriber zero = new TestSubscriber();
    publisher.subscribe(negative);
    publisher.subscribe(wellBehaved);
    publisher.subscribe(zero);
    zero.awaitSubscribe();
    wellBehaved.awaitSubscribe();
    negative.awaitSubscribe();

    negative.sn.request(-1L);
    zero.sn.request(0L);
    publisher.submit(1);
    publisher.submit(2);
    publisher.close();

    wellBehaved.awaitComplete();
    assertEquals(2, wellBehaved.nexts);
    assertEquals(1, wellBehaved.completes);

    negative.awaitError();
    assertEquals(1, negative.errors);
    assertTrue(negative.lastError instanceof IllegalArgumentException);

    zero.awaitError();
    assertEquals(1, zero.errors);
    assertTrue(zero.lastError instanceof IllegalArgumentException);
}
/**
 * estimateMinimumDemand is 0 before any request, 1 after a request for
 * one item, and 0 again once that item has been delivered.
 */
public void testEstimateMinimumDemand() {
    TestSubscriber subscriber = new TestSubscriber();
    SubmissionPublisher<Integer> publisher = basicPublisher();
    subscriber.request = false;
    publisher.subscribe(subscriber);
    subscriber.awaitSubscribe();

    // Nothing requested yet.
    assertEquals(0, publisher.estimateMinimumDemand());

    subscriber.sn.request(1);
    assertEquals(1, publisher.estimateMinimumDemand());

    // Delivering the single requested item uses up the demand.
    publisher.submit(1);
    subscriber.awaitNext(1);
    assertEquals(0, publisher.estimateMinimumDemand());
}
/**
 * Submitting {@code null} is rejected with a NullPointerException.
 */
public void testNullSubmit() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    try {
        publisher.submit(null);
        shouldThrow();
    } catch (NullPointerException success) {
        // expected: null items are not accepted
    }
}
/**
 * The CompletableFuture returned by consume is done once the publisher
 * completes; by then the consumer has seen every submitted item, so the
 * accumulated sum equals 1 + 2 + ... + n.
 */
public void testConsume() {
    AtomicInteger total = new AtomicInteger();
    SubmissionPublisher<Integer> publisher = basicPublisher();
    CompletableFuture<Void> done =
        publisher.consume((Integer value) -> total.getAndAdd(value));

    int n = 20;
    for (int value = 1; value <= n; ++value) {
        publisher.submit(value);
    }
    publisher.close();
    done.join();

    int expectedSum = (n * (n + 1)) / 2;
    assertEquals(expectedSum, total.get());
}
/**
 * After the future returned by consume is cancelled, the consumer
 * eventually stops processing published items: fewer than the number
 * of submitted items are counted.
 */
public void testCancelledConsume() {
    AtomicInteger processed = new AtomicInteger();
    SubmissionPublisher<Integer> publisher = basicPublisher();
    CompletableFuture<Void> consumption =
        publisher.consume(item -> processed.getAndIncrement());
    consumption.cancel(true);

    int n = 1000000; // arbitrary limit
    for (int value = 1; value <= n; ++value) {
        publisher.submit(value);
    }
    assertTrue(processed.get() < n);
}