java.util.concurrent.SubmissionPublisher#subscribe() 源码实例 Demo

下面列出了 java.util.concurrent.SubmissionPublisher#subscribe() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Demo entry point: attaches two direct subscribers and one converting
 * processor (with its own downstream subscriber) to a weather forecast
 * publisher, waits 10 seconds, then closes the publisher.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
	SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new WeatherForecastPublisher();

	weatherForecastPublisher.subscribe(new DatabaseSubscriber());
	weatherForecastPublisher.subscribe(new TwitterSubscriber<WeatherForecast>());

	// Chain: publisher -> processor -> subscriber of the processor's output type.
	Flow.Processor<WeatherForecast, MetricWeatherForecast> metricConverter = new UsToMetricProcessor();
	weatherForecastPublisher.subscribe(metricConverter);
	metricConverter.subscribe(new TwitterSubscriber<MetricWeatherForecast>());

	// close the publisher and associated resources after 10 seconds
	try {
		TimeUnit.SECONDS.sleep(10);
	} catch (InterruptedException e) {
		e.printStackTrace();
		// Catching InterruptedException clears the thread's interrupt status;
		// restore it so the interruption is not silently lost.
		Thread.currentThread().interrupt();
	}
	weatherForecastPublisher.close();
}
 
源代码2 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * offer returns number of lagged items if not saturated
 */
public void testLaggedOffer() {
    SubmissionPublisher<Integer> p = basicPublisher();
    // Both subscribers start with automatic requesting switched off, so
    // offered items accumulate as lag until request(...) is called.
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    // A non-negative return from offer is an estimate of the maximum lag
    // across subscribers; s2 consumes nothing until it requests below.
    assertTrue(p.offer(1, null) >= 1);
    assertTrue(p.offer(2, null) >= 2);
    s1.sn.request(4);
    assertTrue(p.offer(3, null) >= 3);
    s2.sn.request(4);
    p.offer(4, null);
    p.close();
    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // Verify s1 here: the check previously repeated s2.nexts, leaving s1's
    // delivery count unverified even though it also requested 4 items.
    assertEquals(4, s1.nexts);
}
 
源代码3 项目: tutorials   文件: ReactiveStreamsUnitTest.java
/**
 * Submits six items to a publisher whose only subscriber was constructed
 * with the argument 1, and expects that subscriber to end up holding just
 * the first item.
 */
@Test
public void givenPublisher_whenRequestForOnlyOneElement_thenShouldConsumeOnlyThatOne() throws InterruptedException {
    // given
    List<String> input = List.of("1", "x", "2", "x", "3", "x");
    List<String> firstOnly = List.of("1");
    SubmissionPublisher<String> source = new SubmissionPublisher<>();
    // NOTE(review): the constructor argument is presumably the number of
    // elements the subscriber will consume - confirm against EndSubscriber.
    EndSubscriber<String> sink = new EndSubscriber<>(1);
    source.subscribe(sink);

    // when
    assertThat(source.getNumberOfSubscribers()).isEqualTo(1);
    for (String item : input) {
        source.submit(item);
    }
    source.close();

    // then: poll for up to one second until the consumed elements match
    await()
            .atMost(1000, TimeUnit.MILLISECONDS)
            .untilAsserted(() -> assertThat(sink.consumedElements).containsExactlyElementsOf(firstOnly));
}
 
源代码4 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * Timed offer returns number of lagged items if not saturated
 */
public void testLaggedTimedOffer() {
    SubmissionPublisher<Integer> p = basicPublisher();
    // Both subscribers start with automatic requesting switched off, so
    // offered items accumulate as lag until request(...) is called.
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    long startTime = System.nanoTime();
    // A non-negative return from offer is an estimate of the maximum lag
    // across subscribers; s2 consumes nothing until it requests below.
    assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
    assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
    s1.sn.request(4);
    assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
    s2.sn.request(4);
    p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
    p.close();
    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // Verify s1 here: the check previously repeated s2.nexts, leaving s1's
    // delivery count unverified even though it also requested 4 items.
    assertEquals(4, s1.nexts);
    // The whole exchange must finish well inside the timeout passed to offer.
    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
}
 
源代码5 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * Once subscriber buffers are full, offer signals drops with a negative result
 */
public void testDroppedOffer() {
    // Second constructor argument is the maximum buffer capacity per subscriber.
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 4);
    TestSubscriber first = new TestSubscriber();
    first.request = false;
    TestSubscriber second = new TestSubscriber();
    second.request = false;
    publisher.subscribe(first);
    publisher.subscribe(second);
    second.awaitSubscribe();
    first.awaitSubscribe();
    // Items 1..4 must be accepted without any drop being reported.
    for (int item = 1; item <= 4; ++item) {
        int lag = publisher.offer(item, null);
        assertTrue(lag >= 0);
    }
    // The result for item 5 is deliberately not asserted.
    publisher.offer(5, null);
    int afterSix = publisher.offer(6, null);
    assertTrue(afterSix < 0);
    first.sn.request(64);
    // 'second' still has not requested anything, so a drop is still expected.
    int afterSeven = publisher.offer(7, null);
    assertTrue(afterSeven < 0);
    second.sn.request(64);
    publisher.close();
    second.awaitComplete();
    assertTrue(second.nexts >= 4);
    first.awaitComplete();
    assertTrue(first.nexts >= 4);
}
 
源代码6 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * With a buffer capacity of 1, submit still ends up delivering every
 * requested item to each subscriber
 */
public void testCap1Submit() {
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 1);
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);
    int item = 1;
    while (item <= 20) {
        long minDemand = publisher.estimateMinimumDemand();
        assertTrue(minDemand <= 1);
        int lag = publisher.submit(item);
        assertTrue(lag >= 0);
        ++item;
    }
    publisher.close();
    second.awaitComplete();
    first.awaitComplete();
    // Each subscriber saw all 20 items and exactly one completion signal.
    assertEquals(20, second.nexts);
    assertEquals(1, second.completes);
    assertEquals(20, first.nexts);
    assertEquals(1, first.completes);
}
 
源代码7 项目: tutorials   文件: ReactiveStreamsUnitTest.java
/**
 * Submits six items to a publisher whose only subscriber was constructed
 * with the argument 6, and expects the subscriber to end up holding all six
 * items in submission order.
 */
@Test
public void givenPublisher_whenSubscribeToIt_thenShouldConsumeAllElements() throws InterruptedException {
    // given
    List<String> input = List.of("1", "x", "2", "x", "3", "x");
    SubmissionPublisher<String> source = new SubmissionPublisher<>();
    // NOTE(review): the constructor argument is presumably the number of
    // elements the subscriber will consume - confirm against EndSubscriber.
    EndSubscriber<String> sink = new EndSubscriber<>(6);
    source.subscribe(sink);

    // when
    assertThat(source.getNumberOfSubscribers()).isEqualTo(1);
    for (String item : input) {
        source.submit(item);
    }
    source.close();

    // then: poll for up to one second until the consumed elements match
    await()
            .atMost(1000, TimeUnit.MILLISECONDS)
            .untilAsserted(() -> assertThat(sink.consumedElements).containsExactlyElementsOf(input));
}
 
源代码8 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * A subscriber receives onNext only for items it has requested
 */
public void testRequest1() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber gated = new TestSubscriber();
    gated.request = false;
    publisher.subscribe(gated);
    gated.awaitSubscribe();
    // The only subscriber so far has requested nothing.
    long initialDemand = publisher.estimateMinimumDemand();
    assertTrue(initialDemand == 0);
    TestSubscriber plain = new TestSubscriber();
    publisher.subscribe(plain);
    publisher.submit(1);
    publisher.submit(2);
    plain.awaitNext(1);
    // Nothing may have reached the gated subscriber before it requests.
    assertEquals(0, gated.nexts);
    gated.sn.request(3);
    publisher.submit(3);
    publisher.close();
    plain.awaitComplete();
    assertEquals(3, plain.nexts);
    assertEquals(1, plain.completes);
    gated.awaitComplete();
    assertTrue(gated.nexts > 0);
    assertEquals(1, gated.completes);
}
 
/**
 * Demo entry point: attaches a Twitter subscriber and a database subscriber
 * to a weather forecast publisher, waits 10 seconds, then closes the
 * publisher. The commented-out lines are alternative setups to experiment with.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {

	// Try this:
	// SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new OnDemandWeatherForecastPublisher();

	// Try this:
	// SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new SubmissionPublisher<>();

	SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new WeatherForecastPublisher();

	weatherForecastPublisher.subscribe(new TwitterSubscriber());
	weatherForecastPublisher.subscribe(new DatabaseSubscriber());

	// Try this in combination with `weatherForecastPublisher = new SubmissionPublisher<WeatherForecast>();`
	// for (int i = 0; i < Long.MAX_VALUE; i++) {
	// 	weatherForecastPublisher.submit(WeatherForecast.nextRandomWeatherForecast());
	// }

	// close the publisher and associated resources after 10 seconds
	try {
		TimeUnit.SECONDS.sleep(10);
	} catch (InterruptedException e) {
		e.printStackTrace();
		// Catching InterruptedException clears the thread's interrupt status;
		// restore it so the interruption is not silently lost.
		Thread.currentThread().interrupt();
	}
	// Comment out when using OnDemandWeatherForecastPublisher which is not AutoCloseable
	weatherForecastPublisher.close();
}
 
源代码10 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * Requesting a zero or negative count signals an error to that subscriber
 */
public void testRequest3() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber negative = new TestSubscriber();
    TestSubscriber normal = new TestSubscriber();
    TestSubscriber zero = new TestSubscriber();
    publisher.subscribe(negative);
    publisher.subscribe(normal);
    publisher.subscribe(zero);
    zero.awaitSubscribe();
    normal.awaitSubscribe();
    negative.awaitSubscribe();
    // Two invalid requests: one negative, one zero.
    negative.sn.request(-1L);
    zero.sn.request(0L);
    publisher.submit(1);
    publisher.submit(2);
    publisher.close();

    // The well-behaved subscriber is unaffected.
    normal.awaitComplete();
    assertEquals(2, normal.nexts);
    assertEquals(1, normal.completes);

    // Each misbehaving subscriber gets exactly one IllegalArgumentException.
    negative.awaitError();
    assertEquals(1, negative.errors);
    assertTrue(negative.lastError instanceof IllegalArgumentException);
    zero.awaitError();
    assertEquals(1, zero.errors);
    assertTrue(zero.lastError instanceof IllegalArgumentException);
}
 
源代码11 项目: Java-9-Cookbook   文件: Dispatcher.java
/**
 * Builds a processor from the given settings, hands it a subscription backed
 * by {@code execService}, and then registers it with {@code publisher}.
 *
 * NOTE(review): onSubscribe is invoked here manually and the processor is
 * then also passed to publisher.subscribe(...) - confirm the processor is
 * meant to be handed a subscription from both places.
 *
 * @param publisher        publisher the new processor is registered with
 * @param execService      executor passed to the subscription
 * @param process          passed through to the processor
 * @param timeSec          passed through to the processor
 * @param dateLocation     passed through to the processor
 * @param speedLimitByLane passed through to the processor
 */
private static void subscribe(SubmissionPublisher<Integer> publisher, ExecutorService execService,
                              Process process, double timeSec, DateLocation dateLocation, double[] speedLimitByLane){
    Processor<Integer> processor =
            new Processor<>(process, timeSec, dateLocation, speedLimitByLane);
    processor.onSubscribe(new Subscription(processor, execService));
    publisher.subscribe(processor);
}
 
源代码12 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * estimateMinimumDemand is 0 before any request, reflects an outstanding
 * request, and drops back to 0 once the requested item has been delivered
 */
public void testEstimateMinimumDemand() {
    TestSubscriber subscriber = new TestSubscriber();
    SubmissionPublisher<Integer> publisher = basicPublisher();
    subscriber.request = false;
    publisher.subscribe(subscriber);
    subscriber.awaitSubscribe();

    // Nothing requested yet.
    assertEquals(0, publisher.estimateMinimumDemand());

    // One item requested and still outstanding.
    subscriber.sn.request(1);
    assertEquals(1, publisher.estimateMinimumDemand());

    // Demand is used up once the item arrives.
    publisher.submit(1);
    subscriber.awaitNext(1);
    assertEquals(0, publisher.estimateMinimumDemand());
}
 
/**
 * Demo entry point: subscribes a {@code WelcomeSubscriber} to a publisher of
 * {@code Long} values, submits the values 10 through 19, then sleeps for one
 * second before returning.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
public static void main(String[] args) throws InterruptedException {

	// Diamond operator instead of the raw type, so the assignment no longer
	// relies on an unchecked conversion.
	SubmissionPublisher<Long> publisher = new SubmissionPublisher<>();

	int count = 5;
	WelcomeSubscriber subscriber = new WelcomeSubscriber(count);
	publisher.subscribe(subscriber);
	LongStream.range(10, 20).forEach(publisher::submit);
	// Presumably gives the asynchronously delivered items time to reach the
	// subscriber before main returns.
	Thread.sleep(1000);

}
 
源代码14 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * If closed, upon subscription, the subscriber's onComplete
 * method is invoked
 */
public void testSubscribe2() {
    TestSubscriber s = new TestSubscriber();
    SubmissionPublisher<Integer> p = basicPublisher();
    // Close before subscribing: the subscriber should only ever see onComplete.
    p.close();
    p.subscribe(s);
    s.awaitComplete();
    assertEquals(0, s.nexts);
    assertEquals(0, s.errors);
    // Exact comparison. The former three-argument call resolved to the
    // (expected, actual, delta) overload with a delta of 1, which would also
    // have accepted 0 or 2 completions.
    assertEquals(1, s.completes);
}
 
源代码15 项目: journaldev   文件: MyReactiveAppWithProcessor.java
/**
 * Demo entry point: wires publisher -> transforming processor -> subscriber,
 * publishes every employee returned by {@code EmpHelper.getEmps()}, waits
 * until the subscriber's counter equals the number of employees, then closes
 * the publisher and the processor.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public static void main(String[] args) throws InterruptedException {
	// Source of Employee items
	SubmissionPublisher<Employee> source = new SubmissionPublisher<>();

	// Processor that maps each item to a Freelancer (second argument is id + 100)
	MyProcessor toFreelancer = new MyProcessor(
			emp -> new Freelancer(emp.getId(), emp.getId() + 100, emp.getName()));

	// Final consumer
	MyFreelancerSubscriber sink = new MyFreelancerSubscriber();

	// Wire the chain: source -> processor -> sink
	source.subscribe(toFreelancer);
	toFreelancer.subscribe(sink);

	List<Employee> employees = EmpHelper.getEmps();

	System.out.println("Publishing Items to Subscriber");
	employees.forEach(source::submit);

	// Poll every 10 ms until the sink's counter matches the number published
	while (employees.size() != sink.getCounter()) {
		Thread.sleep(10);
	}

	// Shut down both publishing stages
	source.close();
	toFreelancer.close();

	System.out.println("Exiting the app");
}
 
源代码16 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * When onSubscribe throws, the subscriber is notified through onError
 */
public void testSubscribe5() {
    TestSubscriber subscriber = new TestSubscriber();
    SubmissionPublisher<Integer> publisher = basicPublisher();
    subscriber.throwOnCall = true;
    try {
        publisher.subscribe(subscriber);
    } catch (Exception tolerated) {
        // Deliberately ignored: the test only cares about the signals
        // recorded on the subscriber below.
    }
    subscriber.awaitError();
    // Exactly one error, and neither items nor a completion signal.
    assertEquals(0, subscriber.nexts);
    assertEquals(1, subscriber.errors);
    assertEquals(0, subscriber.completes);
}
 
源代码17 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * Timed offer invokes drop handler if saturated
 */
public void testHandledDroppedTimedOffer() {
    // Counter handed to the drop handler; checked at the end of the test.
    AtomicInteger calls = new AtomicInteger();
    // Second constructor argument is the maximum buffer capacity per subscriber.
    SubmissionPublisher<Integer> p
        = new SubmissionPublisher<>(basicExecutor, 4);
    // Neither subscriber requests items on its own.
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    long delay = timeoutMillis();
    // The first four offers must be accepted without reporting a drop.
    for (int i = 1; i <= 4; ++i)
        assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
    long startTime = System.nanoTime();
    // A negative result from offer reports drops.
    // NOTE(review): noopHandle is defined elsewhere; presumably it counts the
    // call and declines a retry - confirm.
    assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
    s1.sn.request(64);
    // s2 still has not requested anything, so a drop is expected again.
    assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
    // The dropping offers must have taken at least the timeout in total.
    assertTrue(millisElapsedSince(startTime) >= delay);
    s2.sn.request(64);
    p.close();
    s2.awaitComplete();
    s1.awaitComplete();
    // The handler must have been recorded at least twice.
    assertTrue(calls.get() >= 2);
}
 
源代码18 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * Timed offer succeeds if drop handler forces request
 */
public void testRecoveredHandledDroppedTimedOffer() {
    // Counter handed to the drop handler; checked at the end of the test.
    AtomicInteger calls = new AtomicInteger();
    // Second constructor argument is the maximum buffer capacity per subscriber.
    SubmissionPublisher<Integer> p
        = new SubmissionPublisher<>(basicExecutor, 4);
    // Neither subscriber requests items on its own.
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    // n accumulates the total number of deliveries expected across both subscribers.
    int n = 0;
    long delay = timeoutMillis();
    long startTime = System.nanoTime();
    for (int i = 1; i <= 6; ++i) {
        // NOTE(review): reqHandle is defined elsewhere; presumably it counts
        // the call and makes the subscriber request more items - confirm.
        int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
        // Each offer targets 2 subscribers; a negative d is the negated
        // number of drops, so it reduces the expected delivery count.
        n = n + 2 + (d < 0 ? d : 0);
    }
    // At least one offer must have waited for the full timeout.
    assertTrue(millisElapsedSince(startTime) >= delay);
    p.close();
    s2.awaitComplete();
    s1.awaitComplete();
    // Deliveries actually observed must match the running expectation.
    assertEquals(n, s1.nexts + s2.nexts);
    assertTrue(calls.get() >= 2);
}
 
/**
 * Creates a named demo subscriber, hands it a subscription backed by
 * {@code execService}, and then registers it with {@code publisher}.
 *
 * NOTE(review): onSubscribe is invoked here manually and the subscriber is
 * then also passed to publisher.subscribe(...) - confirm the subscriber is
 * meant to be handed a subscription from both places.
 *
 * @param publisher      publisher the new subscriber is registered with
 * @param execService    executor passed to the subscription
 * @param subscriberName name given to the new subscriber
 */
private static void demoSubscribe(SubmissionPublisher<Integer> publisher, ExecutorService execService, String subscriberName){
    DemoSubscriber<Integer> named = new DemoSubscriber<>(subscriberName);
    named.onSubscribe(new DemoSubscription(named, execService));
    publisher.subscribe(named);
}
 
/**
 * Demo entry point for the "with transformation" sample: wires
 * publisher -> transforming processor -> subscriber, publishes every employee
 * returned by {@code EmpHelper.getEmps()}, waits until the subscriber's
 * counter equals the number of employees, then closes the publisher and the
 * processor. A simpler sample without a processor is kept below as a comment.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public static void main(String[] args) throws InterruptedException {
	/*
	 * Sample without transformation (disabled, kept for reference):
	 *
	 * // Create Publisher
	 * SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
	 *
	 * // Register Subscriber
	 * MySubscriber subs = new MySubscriber();
	 * publisher.subscribe(subs);
	 *
	 * List<Employee> emps = EmpHelper.getEmps();
	 *
	 * // Publish items
	 * System.out.println("Publishing Items to Subscriber");
	 * emps.stream().forEach(i -> publisher.submit(i));
	 *
	 * // Wait until all messages have been processed
	 * while (emps.size() != subs.getCounter()) {
	 *     Thread.sleep(10);
	 * }
	 * // Close the Publisher
	 * publisher.close();
	 * System.out.println("Exiting the app");
	 */

	// Sample with transformation

	// Source of Employee items
	SubmissionPublisher<Employee> source = new SubmissionPublisher<>();

	// Processor that maps each item to a Freelancer (second argument is id + 100)
	MyProcessor toFreelancer = new MyProcessor(
			emp -> new Freelancer(emp.getId(), emp.getId() + 100, emp.getName()));

	// Final consumer
	MyFreelancerSubscriber sink = new MyFreelancerSubscriber();

	// Wire the chain: source -> processor -> sink
	source.subscribe(toFreelancer);
	toFreelancer.subscribe(sink);

	List<Employee> employees = EmpHelper.getEmps();

	System.out.println("Publishing Items to Subscriber");
	employees.forEach(source::submit);

	// Poll every 10 ms until the sink's counter matches the number published
	while (employees.size() != sink.getCounter()) {
		Thread.sleep(10);
	}

	// Shut down both publishing stages
	source.close();
	toFreelancer.close();

	System.out.println("Exiting the app");
}