下面列出了java.util.concurrent.SubmissionPublisher#subscribe ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static void main(String[] args) {
SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new WeatherForecastPublisher();
weatherForecastPublisher.subscribe(new DatabaseSubscriber());
weatherForecastPublisher.subscribe(new TwitterSubscriber<WeatherForecast>());
Flow.Processor<WeatherForecast, MetricWeatherForecast> metricConverter = new UsToMetricProcessor();
weatherForecastPublisher.subscribe(metricConverter);
metricConverter.subscribe(new TwitterSubscriber<MetricWeatherForecast>());
// close the publisher and associated resources after 10 seconds
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
weatherForecastPublisher.close();
}
/**
* offer returns number of lagged items if not saturated
*/
public void testLaggedOffer() {
SubmissionPublisher<Integer> p = basicPublisher();
TestSubscriber s1 = new TestSubscriber();
s1.request = false;
TestSubscriber s2 = new TestSubscriber();
s2.request = false;
p.subscribe(s1);
p.subscribe(s2);
s2.awaitSubscribe();
s1.awaitSubscribe();
assertTrue(p.offer(1, null) >= 1);
assertTrue(p.offer(2, null) >= 2);
s1.sn.request(4);
assertTrue(p.offer(3, null) >= 3);
s2.sn.request(4);
p.offer(4, null);
p.close();
s2.awaitComplete();
assertEquals(4, s2.nexts);
s1.awaitComplete();
assertEquals(4, s2.nexts);
}
@Test
public void givenPublisher_whenRequestForOnlyOneElement_thenShouldConsumeOnlyThatOne() throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
//when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
//then
await().atMost(1000, TimeUnit.MILLISECONDS).untilAsserted(
() -> assertThat(subscriber.consumedElements).containsExactlyElementsOf(expected)
);
}
/**
* Timed offer returns number of lagged items if not saturated
*/
public void testLaggedTimedOffer() {
SubmissionPublisher<Integer> p = basicPublisher();
TestSubscriber s1 = new TestSubscriber();
s1.request = false;
TestSubscriber s2 = new TestSubscriber();
s2.request = false;
p.subscribe(s1);
p.subscribe(s2);
s2.awaitSubscribe();
s1.awaitSubscribe();
long startTime = System.nanoTime();
assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
s1.sn.request(4);
assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
s2.sn.request(4);
p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
p.close();
s2.awaitComplete();
assertEquals(4, s2.nexts);
s1.awaitComplete();
assertEquals(4, s2.nexts);
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
}
/**
* offer reports drops if saturated
*/
public void testDroppedOffer() {
SubmissionPublisher<Integer> p
= new SubmissionPublisher<>(basicExecutor, 4);
TestSubscriber s1 = new TestSubscriber();
s1.request = false;
TestSubscriber s2 = new TestSubscriber();
s2.request = false;
p.subscribe(s1);
p.subscribe(s2);
s2.awaitSubscribe();
s1.awaitSubscribe();
for (int i = 1; i <= 4; ++i)
assertTrue(p.offer(i, null) >= 0);
p.offer(5, null);
assertTrue(p.offer(6, null) < 0);
s1.sn.request(64);
assertTrue(p.offer(7, null) < 0);
s2.sn.request(64);
p.close();
s2.awaitComplete();
assertTrue(s2.nexts >= 4);
s1.awaitComplete();
assertTrue(s1.nexts >= 4);
}
/**
* submit eventually issues requested items when buffer capacity is 1
*/
public void testCap1Submit() {
SubmissionPublisher<Integer> p
= new SubmissionPublisher<>(basicExecutor, 1);
TestSubscriber s1 = new TestSubscriber();
TestSubscriber s2 = new TestSubscriber();
p.subscribe(s1);
p.subscribe(s2);
for (int i = 1; i <= 20; ++i) {
assertTrue(p.estimateMinimumDemand() <= 1);
assertTrue(p.submit(i) >= 0);
}
p.close();
s2.awaitComplete();
s1.awaitComplete();
assertEquals(20, s2.nexts);
assertEquals(1, s2.completes);
assertEquals(20, s1.nexts);
assertEquals(1, s1.completes);
}
@Test
public void givenPublisher_whenSubscribeToIt_thenShouldConsumeAllElements() throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(6);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
//when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
//then
await().atMost(1000, TimeUnit.MILLISECONDS).untilAsserted(
() -> assertThat(subscriber.consumedElements).containsExactlyElementsOf(items)
);
}
/**
* onNext is issued only if requested
*/
public void testRequest1() {
SubmissionPublisher<Integer> p = basicPublisher();
TestSubscriber s1 = new TestSubscriber();
s1.request = false;
p.subscribe(s1);
s1.awaitSubscribe();
assertTrue(p.estimateMinimumDemand() == 0);
TestSubscriber s2 = new TestSubscriber();
p.subscribe(s2);
p.submit(1);
p.submit(2);
s2.awaitNext(1);
assertEquals(0, s1.nexts);
s1.sn.request(3);
p.submit(3);
p.close();
s2.awaitComplete();
assertEquals(3, s2.nexts);
assertEquals(1, s2.completes);
s1.awaitComplete();
assertTrue(s1.nexts > 0);
assertEquals(1, s1.completes);
}
public static void main(String[] args) {
// Try this:
// SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new OnDemandWeatherForecastPublisher();
// Try this:
// SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new SubmissionPublisher<>();
SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new WeatherForecastPublisher();
weatherForecastPublisher.subscribe(new TwitterSubscriber());
weatherForecastPublisher.subscribe(new DatabaseSubscriber());
// Try this in combination with `weatherForecastPublisher = new SubmissionPiblisher<WeatherForecast>();`
// for (int i = 0; i < Long.MAX_VALUE; i++) {
// weatherForecastPublisher.submit(WeatherForecast.nextRandomWeatherForecast());
// }
// close the publisher and associated resources after 10 seconds
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Comment out when using OnDemandWeatherForecastPublisher which is not AutoClosable
weatherForecastPublisher.close();
}
/**
* Non-positive request causes error
*/
public void testRequest3() {
SubmissionPublisher<Integer> p = basicPublisher();
TestSubscriber s1 = new TestSubscriber();
TestSubscriber s2 = new TestSubscriber();
TestSubscriber s3 = new TestSubscriber();
p.subscribe(s1);
p.subscribe(s2);
p.subscribe(s3);
s3.awaitSubscribe();
s2.awaitSubscribe();
s1.awaitSubscribe();
s1.sn.request(-1L);
s3.sn.request(0L);
p.submit(1);
p.submit(2);
p.close();
s2.awaitComplete();
assertEquals(2, s2.nexts);
assertEquals(1, s2.completes);
s1.awaitError();
assertEquals(1, s1.errors);
assertTrue(s1.lastError instanceof IllegalArgumentException);
s3.awaitError();
assertEquals(1, s3.errors);
assertTrue(s3.lastError instanceof IllegalArgumentException);
}
private static void subscribe(SubmissionPublisher<Integer> publisher, ExecutorService execService,
Process process, double timeSec, DateLocation dateLocation, double[] speedLimitByLane){
Processor<Integer> subscriber =
new Processor<>(process, timeSec, dateLocation, speedLimitByLane);
Subscription subscription = new Subscription(subscriber, execService);
subscriber.onSubscribe(subscription);
publisher.subscribe(subscriber);
}
/**
* estimateMinimumDemand reports 0 until request, nonzero after
* request, and zero again after delivery
*/
public void testEstimateMinimumDemand() {
TestSubscriber s = new TestSubscriber();
SubmissionPublisher<Integer> p = basicPublisher();
s.request = false;
p.subscribe(s);
s.awaitSubscribe();
assertEquals(0, p.estimateMinimumDemand());
s.sn.request(1);
assertEquals(1, p.estimateMinimumDemand());
p.submit(1);
s.awaitNext(1);
assertEquals(0, p.estimateMinimumDemand());
}
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<Long> publisher = new SubmissionPublisher();
int count = 5;
WelcomeSubscriber subscriber = new WelcomeSubscriber(count);
publisher.subscribe(subscriber);
LongStream.range(10, 20).forEach(publisher::submit);
Thread.sleep(1000);
}
/**
* If closed, upon subscription, the subscriber's onComplete
* method is invoked
*/
public void testSubscribe2() {
TestSubscriber s = new TestSubscriber();
SubmissionPublisher<Integer> p = basicPublisher();
p.close();
p.subscribe(s);
s.awaitComplete();
assertEquals(0, s.nexts);
assertEquals(0, s.errors);
assertEquals(1, s.completes, 1);
}
public static void main(String[] args) throws InterruptedException {
// Create End Publisher
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
// Create Processor
MyProcessor transformProcessor = new MyProcessor(s -> {
return new Freelancer(s.getId(), s.getId() + 100, s.getName());
});
//Create End Subscriber
MyFreelancerSubscriber subs = new MyFreelancerSubscriber();
//Create chain of publisher, processor and subscriber
publisher.subscribe(transformProcessor); // publisher to processor
transformProcessor.subscribe(subs); // processor to subscriber
List<Employee> emps = EmpHelper.getEmps();
// Publish items
System.out.println("Publishing Items to Subscriber");
emps.stream().forEach(i -> publisher.submit(i));
// Logic to wait for messages processing to finish
while (emps.size() != subs.getCounter()) {
Thread.sleep(10);
}
// Closing publishers
publisher.close();
transformProcessor.close();
System.out.println("Exiting the app");
}
/**
* An exception thrown in onSubscribe causes onError
*/
public void testSubscribe5() {
TestSubscriber s = new TestSubscriber();
SubmissionPublisher<Integer> p = basicPublisher();
s.throwOnCall = true;
try {
p.subscribe(s);
} catch (Exception ok) {}
s.awaitError();
assertEquals(0, s.nexts);
assertEquals(1, s.errors);
assertEquals(0, s.completes);
}
/**
* Timed offer invokes drop handler if saturated
*/
public void testHandledDroppedTimedOffer() {
AtomicInteger calls = new AtomicInteger();
SubmissionPublisher<Integer> p
= new SubmissionPublisher<>(basicExecutor, 4);
TestSubscriber s1 = new TestSubscriber();
s1.request = false;
TestSubscriber s2 = new TestSubscriber();
s2.request = false;
p.subscribe(s1);
p.subscribe(s2);
s2.awaitSubscribe();
s1.awaitSubscribe();
long delay = timeoutMillis();
for (int i = 1; i <= 4; ++i)
assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
long startTime = System.nanoTime();
assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
s1.sn.request(64);
assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
assertTrue(millisElapsedSince(startTime) >= delay);
s2.sn.request(64);
p.close();
s2.awaitComplete();
s1.awaitComplete();
assertTrue(calls.get() >= 2);
}
/**
* Timed offer succeeds if drop handler forces request
*/
public void testRecoveredHandledDroppedTimedOffer() {
AtomicInteger calls = new AtomicInteger();
SubmissionPublisher<Integer> p
= new SubmissionPublisher<>(basicExecutor, 4);
TestSubscriber s1 = new TestSubscriber();
s1.request = false;
TestSubscriber s2 = new TestSubscriber();
s2.request = false;
p.subscribe(s1);
p.subscribe(s2);
s2.awaitSubscribe();
s1.awaitSubscribe();
int n = 0;
long delay = timeoutMillis();
long startTime = System.nanoTime();
for (int i = 1; i <= 6; ++i) {
int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
n = n + 2 + (d < 0 ? d : 0);
}
assertTrue(millisElapsedSince(startTime) >= delay);
p.close();
s2.awaitComplete();
s1.awaitComplete();
assertEquals(n, s1.nexts + s2.nexts);
assertTrue(calls.get() >= 2);
}
private static void demoSubscribe(SubmissionPublisher<Integer> publisher, ExecutorService execService, String subscriberName){
DemoSubscriber<Integer> subscriber = new DemoSubscriber<>(subscriberName);
DemoSubscription subscription = new DemoSubscription(subscriber, execService);
subscriber.onSubscribe(subscription);
publisher.subscribe(subscriber);
}
public static void main(String[] args) throws InterruptedException {
/*
// Sample without transformation
// Create Publisher
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
// Register Subscriber
MySubscriber subs = new MySubscriber();
publisher.subscribe(subs);
List<Employee> emps = EmpHelper.getEmps();
// Publish items
System.out.println("Publishing Items to Subscriber");
emps.stream().forEach(i -> publisher.submit(i));
// logic to wait till processing of all messages are over
while (emps.size() != subs.getCounter()) {
Thread.sleep(10);
}
// close the Publisher
publisher.close();
System.out.println("Exiting the app");
*/
// Sample with transformation
// Create End Publisher
SubmissionPublisher<Employee> publisher
= new SubmissionPublisher<>();
// Create Processor
MyProcessor transformProcessor
= new MyProcessor(s -> {
return new Freelancer(s.getId(), s.getId() + 100, s.getName());
});
//Create End Subscriber
MyFreelancerSubscriber subs
= new MyFreelancerSubscriber();
//Create chain of publisher, processor and subscriber
publisher.subscribe(transformProcessor); // publisher to processor
transformProcessor.subscribe(subs); // processor to subscriber
List<Employee> emps = EmpHelper.getEmps();
// Publish items
System.out.println("Publishing Items to Subscriber");
emps.stream().forEach(i -> publisher.submit(i));
// Logic to wait for messages processing to finish
while (emps.size() != subs.getCounter()) {
Thread.sleep(10);
}
// Closing publishers
publisher.close();
transformProcessor.close();
System.out.println("Exiting the app");
}