类java.util.concurrent.SubmissionPublisher源码实例Demo

下面列出了 java.util.concurrent.SubmissionPublisher 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。

/**
 * Wires a weather-forecast publisher to its consumers, keeps it open for ten
 * seconds, and then closes it.
 *
 * <p>Three consumers are attached: a {@code DatabaseSubscriber}, a
 * {@code TwitterSubscriber} receiving the original forecasts, and a second
 * {@code TwitterSubscriber} fed through a {@code UsToMetricProcessor}.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
	// try-with-resources guarantees close() even when wiring a subscriber throws
	try (SubmissionPublisher<WeatherForecast> weatherForecastPublisher = new WeatherForecastPublisher()) {
		weatherForecastPublisher.subscribe(new DatabaseSubscriber());
		weatherForecastPublisher.subscribe(new TwitterSubscriber<WeatherForecast>());

		Flow.Processor<WeatherForecast, MetricWeatherForecast> metricConverter = new UsToMetricProcessor();
		weatherForecastPublisher.subscribe(metricConverter);
		metricConverter.subscribe(new TwitterSubscriber<MetricWeatherForecast>());

		// close the publisher and associated resources after 10 seconds
		try {
			TimeUnit.SECONDS.sleep(10);
		} catch (InterruptedException e) {
			// restore the interrupt flag instead of swallowing it; the publisher is still closed below
			Thread.currentThread().interrupt();
		}
	}
}
 
/**
 * Submits one order (a single item with amount 10) ten times to a publisher
 * that has a {@code StockMaintain} subscriber, pauses in ten 50 ms steps, and
 * then closes the publisher.
 */
@Test
public void teststockRemoval() throws InterruptedException {
    final Stock stock = new Stock();
    final SubmissionPublisher<Order> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new StockMaintain(stock));

    // stock starts with 40 units of the product
    final Product product = new Product();
    stock.store(product, 40);

    final OrderItem orderItem = new OrderItem();
    orderItem.setProduct(product);
    orderItem.setAmount(10);

    final Order order = new Order();
    final List<OrderItem> orderItems = new LinkedList<>();
    orderItems.add(orderItem);
    order.setItems(orderItems);

    for (int remaining = 10; remaining > 0; remaining--) {
        publisher.submit(order);
    }
    log.info("所有订单已经提交完毕");

    // fixed pauses before the publisher is closed
    for (int pause = 1; pause <= 10; pause++) {
        log.info("Sleeping a bit...");
        Thread.sleep(50);
    }

    publisher.close();
    log.info("Publisher已关闭");
}
 
/**
 * Publishes the number of traffic units to two subscribers, one registered
 * for {@code Process.AVERAGE_SPEED} and one for {@code Process.TRAFFIC_DENSITY},
 * then waits up to one second on the executor.
 *
 * @param trafficUnitsNumber the single item submitted to the publisher
 * @param timeSec            forwarded unchanged to {@code subscribe}
 * @param dateLocation       forwarded unchanged to {@code subscribe}
 * @param speedLimitByLane   forwarded unchanged to {@code subscribe}
 */
public static void dispatch(int trafficUnitsNumber, double timeSec, DateLocation dateLocation, double[] speedLimitByLane) {
    ExecutorService execService = ForkJoinPool.commonPool();
    // the publisher is closed automatically once the item has been submitted
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
        subscribe(publisher, execService, Process.AVERAGE_SPEED, timeSec, dateLocation, speedLimitByLane);
        subscribe(publisher, execService, Process.TRAFFIC_DENSITY, timeSec, dateLocation, speedLimitByLane);
        publisher.submit(trafficUnitsNumber);
    } finally {
        try {
            // NOTE: the ForkJoinPool documentation states that shutdown()/shutdownNow() have no
            // effect on the execution state of the common pool; the calls only matter if
            // execService is replaced by a dedicated executor.
            execService.shutdown();
            execService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // keep the interruption request visible to the caller
                Thread.currentThread().interrupt();
            }
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
        } finally {
            execService.shutdownNow();
        }
    }
}
 
/**
 * Demonstrates {@link SubmissionPublisher}: registers three subscribers via
 * {@code demoSubscribe}, submits the integers 1 through 4, closes the
 * publisher, and then waits up to one second on the executor.
 */
private static void demo4_Flow_submissionPublisher() {
    System.out.println();

    // the common pool is used here; a dedicated pool could be substituted
    ExecutorService execService = ForkJoinPool.commonPool();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
        demoSubscribe(publisher, execService, "One");
        demoSubscribe(publisher, execService, "Two");
        demoSubscribe(publisher, execService, "Three");
        IntStream.range(1, 5).forEach(publisher::submit);
    } finally {
        try {
            // NOTE: the ForkJoinPool documentation states that shutdown()/shutdownNow() have no
            // effect on the execution state of the common pool; the calls only matter if
            // execService is replaced by a dedicated executor.
            execService.shutdown();
            int shutdownDelaySec = 1;
            System.out.println("Waiting for " + shutdownDelaySec + " sec before shutting down service...");
            execService.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // keep the interruption request visible to the caller
                Thread.currentThread().interrupt();
            }
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
        } finally {
            System.out.println("Calling execService.shutdownNow()...");
            List<Runnable> l = execService.shutdownNow();
            System.out.println(l.size() + " tasks were waiting to be executed. Service stopped.");
        }
    }
}
 
源代码5 项目: Java-9-Cookbook   文件: Dispatcher.java
/**
 * Submits {@code trafficUnitsNumber} to a publisher that has two subscribers
 * (registered through {@code subscribe} for {@code Process.AVERAGE_SPEED} and
 * {@code Process.TRAFFIC_DENSITY}) and then waits up to one second on the
 * executor.
 *
 * @param trafficUnitsNumber the single item submitted to the publisher
 * @param timeSec            passed through to {@code subscribe}
 * @param dateLocation       passed through to {@code subscribe}
 * @param speedLimitByLane   passed through to {@code subscribe}
 */
public static void dispatch(int trafficUnitsNumber, double timeSec, DateLocation dateLocation, double[] speedLimitByLane) {
    ExecutorService execService = ForkJoinPool.commonPool();
    // leaving the try block closes the publisher
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
        subscribe(publisher, execService, Process.AVERAGE_SPEED, timeSec, dateLocation, speedLimitByLane);
        subscribe(publisher, execService, Process.TRAFFIC_DENSITY, timeSec, dateLocation, speedLimitByLane);
        publisher.submit(trafficUnitsNumber);
    } finally {
        try {
            // NOTE: per the ForkJoinPool documentation, shutdown()/shutdownNow() do not change
            // the execution state of the common pool; they take effect only if a dedicated
            // executor is used instead.
            execService.shutdown();
            execService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // restore the interrupt flag rather than silently dropping it
                Thread.currentThread().interrupt();
            }
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
        } finally {
            execService.shutdownNow();
        }
    }
}
 
源代码6 项目: Java-9-Cookbook   文件: Chapter07Concurrency04.java
/**
 * Registers three subscribers through {@code demoSubscribe}, publishes the
 * integers 1 through 4 with a {@link SubmissionPublisher}, and then waits up
 * to one second on the executor after the publisher has been closed.
 */
private static void demo4_Flow_submissionPublisher() {
    System.out.println();

    // runs on the common pool; a dedicated pool could be substituted
    ExecutorService execService = ForkJoinPool.commonPool();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
        demoSubscribe(publisher, execService, "One");
        demoSubscribe(publisher, execService, "Two");
        demoSubscribe(publisher, execService, "Three");
        IntStream.range(1, 5).forEach(publisher::submit);
    } finally {
        try {
            // NOTE: per the ForkJoinPool documentation, shutdown()/shutdownNow() do not change
            // the execution state of the common pool; they take effect only if a dedicated
            // executor is used instead.
            execService.shutdown();
            int shutdownDelaySec = 1;
            System.out.println("Waiting for " + shutdownDelaySec + " sec before shutting down service...");
            execService.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // restore the interrupt flag rather than silently dropping it
                Thread.currentThread().interrupt();
            }
            System.out.println("Caught around execService.awaitTermination(): " + ex.getClass().getName());
        } finally {
            System.out.println("Calling execService.shutdownNow()...");
            List<Runnable> l = execService.shutdownNow();
            System.out.println(l.size() + " tasks were waiting to be executed. Service stopped.");
        }
    }
}
 
源代码7 项目: Java-9-Spring-Webflux   文件: TestStockMaintain.java
/**
 * Publishes the same order ten times to a {@code StockMaintain} subscriber,
 * waits through ten 50 ms pauses, and closes the publisher. The order holds a
 * single item of amount 10 and the stock is seeded with 40 units.
 */
@Test
public void teststockRemoval() throws InterruptedException {
    final Stock warehouse = new Stock();
    final SubmissionPublisher<Order> orderPublisher = new SubmissionPublisher<>();
    orderPublisher.subscribe(new StockMaintain(warehouse));

    final Product product = new Product();
    warehouse.store(product, 40);

    final OrderItem line = new OrderItem();
    line.setProduct(product);
    line.setAmount(10);

    final Order order = new Order();
    final List<OrderItem> lines = new LinkedList<>();
    lines.add(line);
    order.setItems(lines);

    int submitted = 0;
    while (submitted < 10) {
        orderPublisher.submit(order);
        submitted++;
    }
    log.info("所有订单已经提交完毕");

    // fixed pauses before the publisher is closed
    int pauses = 0;
    while (pauses < 10) {
        log.info("Sleeping a bit...");
        Thread.sleep(50);
        pauses++;
    }

    orderPublisher.close();
    log.info("Publisher已关闭");
}
 
源代码8 项目: javase   文件: RegistrationFormController.java
/**
 * FXML action handler: runs the GUI validation, increments the registered-user
 * counter, publishes a new {@code User} built from the form fields to a
 * {@code UsersSubscriberReactStream}, and shows a confirmation alert.
 *
 * @param event the action event that triggered the handler (not read here)
 */
@FXML
protected void handleSubmitReactStreamsButtonAction(ActionEvent event) {
	Window owner = submitButton.getScene().getWindow();
	doValidationGUI(owner);

	objectRegisteredUsersCount++;

	// Security: the password must never be written to the console, so only
	// the non-secret fields are printed.
	System.out.println("\n Registered User react- " + nameField.getText() + ", " + emailField.getText());

	// the publisher is closed automatically when the try block exits
	try (SubmissionPublisher<User> usersPublisher = new SubmissionPublisher<>()) {
		User u = new User(objectRegisteredUsersCount, nameField.getText(), emailField.getText(), passwordField.getText());
		UsersSubscriberReactStream usersSubscriber = new UsersSubscriberReactStream();
		usersPublisher.subscribe(usersSubscriber);
		usersPublisher.submit(u);
		// NOTE(review): submit() hands the item over asynchronously, so cancelling right
		// away may race with delivery - confirm cancelSubscription() accounts for this.
		usersSubscriber.cancelSubscription();
	} catch (Exception reacte) {
		reacte.printStackTrace();
	}

	AlertHelper.showAlert(Alert.AlertType.CONFIRMATION, owner, "Registration Successful react!",
			"Welcome " + nameField.getText());
}
 
源代码9 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * After close(), the publisher reports isClosed with a null
 * closedException and submit throws IllegalStateException. A later
 * closeExceptionally leaves both observations unchanged.
 */
public void testClose() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    checkInitialState(publisher);

    publisher.close();
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());

    try {
        publisher.submit(1);
        shouldThrow();
    } catch (IllegalStateException expected) {
        // submitting to a closed publisher must fail
    }

    // closing exceptionally afterwards must not record the exception
    publisher.closeExceptionally(new SPException());
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());
}
 
源代码10 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * After closeExceptionally(ex), the publisher reports isClosed with
 * that same exception and submit throws IllegalStateException. A later
 * plain close() leaves both observations unchanged.
 */
public void testCloseExceptionally() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    checkInitialState(publisher);

    Throwable cause = new SPException();
    publisher.closeExceptionally(cause);
    assertTrue(publisher.isClosed());
    assertSame(publisher.getClosedException(), cause);

    try {
        publisher.submit(1);
        shouldThrow();
    } catch (IllegalStateException expected) {
        // submitting to a closed publisher must fail
    }

    // a normal close afterwards must keep the recorded exception
    publisher.close();
    assertTrue(publisher.isClosed());
    assertSame(publisher.getClosedException(), cause);
}
 
源代码11 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * Subscribing the same subscriber a second time triggers its onError
 * and leaves it no longer subscribed.
 */
public void testSubscribe4() {
    TestSubscriber subscriber = new TestSubscriber();
    SubmissionPublisher<Integer> publisher = basicPublisher();

    // first subscription: registered, with no signals other than onSubscribe
    publisher.subscribe(subscriber);
    assertTrue(publisher.hasSubscribers());
    assertEquals(1, publisher.getNumberOfSubscribers());
    assertTrue(publisher.getSubscribers().contains(subscriber));
    assertTrue(publisher.isSubscribed(subscriber));
    subscriber.awaitSubscribe();
    assertNotNull(subscriber.sn);
    assertEquals(0, subscriber.nexts);
    assertEquals(0, subscriber.errors);
    assertEquals(0, subscriber.completes);

    // second subscription: exactly one error and the subscriber is dropped
    publisher.subscribe(subscriber);
    subscriber.awaitError();
    assertEquals(0, subscriber.nexts);
    assertEquals(1, subscriber.errors);
    assertFalse(publisher.isSubscribed(subscriber));
}
 
源代码12 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * close() delivers onComplete to every subscriber after the item
 * submitted before the close.
 */
public void testCloseCompletes() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    publisher.submit(1);
    publisher.close();
    assertTrue(publisher.isClosed());
    assertNull(publisher.getClosedException());

    // each subscriber sees one item and one completion
    first.awaitComplete();
    assertEquals(1, first.nexts);
    assertEquals(1, first.completes);
    second.awaitComplete();
    assertEquals(1, second.nexts);
    assertEquals(1, second.completes);
}
 
源代码13 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * closeExceptionally() delivers onError to every subscriber once it
 * has been subscribed; at most one item may have arrived before it.
 */
public void testCloseExceptionallyError() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    publisher.submit(1);
    publisher.closeExceptionally(new SPException());
    assertTrue(publisher.isClosed());

    first.awaitSubscribe();
    first.awaitError();
    assertTrue(first.nexts <= 1);
    assertEquals(1, first.errors);

    second.awaitSubscribe();
    second.awaitError();
    assertTrue(second.nexts <= 1);
    assertEquals(1, second.errors);
}
 
源代码14 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * A cancelled subscription stops receiving onNext eventually, while
 * the other subscriber still receives all twenty items.
 */
public void testCancel() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber cancelled = new TestSubscriber();
    TestSubscriber active = new TestSubscriber();
    publisher.subscribe(cancelled);
    publisher.subscribe(active);
    cancelled.awaitSubscribe();

    publisher.submit(1);
    cancelled.sn.cancel();
    for (int item = 2; item <= 20; ++item) {
        publisher.submit(item);
    }
    publisher.close();

    active.awaitComplete();
    assertEquals(20, active.nexts);
    assertEquals(1, active.completes);
    assertTrue(cancelled.nexts < 20);
    assertFalse(publisher.isSubscribed(cancelled));
}
 
源代码15 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * A subscriber that throws from onNext receives onError, while the
 * other subscriber still gets both items.
 */
public void testThrowOnNext() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber thrower = new TestSubscriber();
    TestSubscriber healthy = new TestSubscriber();
    publisher.subscribe(thrower);
    publisher.subscribe(healthy);
    thrower.awaitSubscribe();

    publisher.submit(1);
    // the flag is switched on between the two submissions
    thrower.throwOnCall = true;
    publisher.submit(2);
    publisher.close();

    healthy.awaitComplete();
    assertEquals(2, healthy.nexts);
    thrower.awaitComplete();
    assertEquals(1, thrower.errors);
}
 
源代码16 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * A handler passed to the constructor is invoked once when a
 * subscriber throws from onNext.
 */
public void testThrowOnNextHandler() {
    AtomicInteger handlerCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(
        basicExecutor, 8, (s, e) -> handlerCalls.getAndIncrement());
    TestSubscriber thrower = new TestSubscriber();
    TestSubscriber healthy = new TestSubscriber();
    publisher.subscribe(thrower);
    publisher.subscribe(healthy);
    thrower.awaitSubscribe();

    publisher.submit(1);
    // the flag is switched on between the two submissions
    thrower.throwOnCall = true;
    publisher.submit(2);
    publisher.close();

    healthy.awaitComplete();
    assertEquals(2, healthy.nexts);
    assertEquals(1, healthy.completes);
    thrower.awaitError();
    assertEquals(1, thrower.errors);
    assertEquals(1, handlerCalls.get());
}
 
源代码17 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * Twenty submitted items reach both subscribers, followed by a single
 * completion each.
 */
public void testOrder() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    for (int item = 1; item <= 20; ++item) {
        publisher.submit(item);
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertEquals(20, second.nexts);
    assertEquals(1, second.completes);
    assertEquals(20, first.nexts);
    assertEquals(1, first.completes);
}
 
源代码18 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * A subscriber receives no onNext until it has requested items.
 */
public void testRequest1() {
    SubmissionPublisher<Integer> publisher = basicPublisher();

    // the lazy subscriber has its request flag cleared before subscribing
    TestSubscriber lazy = new TestSubscriber();
    lazy.request = false;
    publisher.subscribe(lazy);
    lazy.awaitSubscribe();
    assertTrue(publisher.estimateMinimumDemand() == 0);

    TestSubscriber eager = new TestSubscriber();
    publisher.subscribe(eager);
    publisher.submit(1);
    publisher.submit(2);
    eager.awaitNext(1);
    assertEquals(0, lazy.nexts);

    // once demand is signalled, the lazy subscriber gets items too
    lazy.sn.request(3);
    publisher.submit(3);
    publisher.close();

    eager.awaitComplete();
    assertEquals(3, eager.nexts);
    assertEquals(1, eager.completes);
    lazy.awaitComplete();
    assertTrue(lazy.nexts > 0);
    assertEquals(1, lazy.completes);
}
 
源代码19 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * A subscriber whose request flag is cleared after subscribing
 * receives only one of the two submitted items.
 */
public void testRequest2() {
    SubmissionPublisher<Integer> publisher = basicPublisher();
    TestSubscriber limited = new TestSubscriber();
    TestSubscriber unlimited = new TestSubscriber();
    publisher.subscribe(limited);
    publisher.subscribe(unlimited);
    unlimited.awaitSubscribe();
    limited.awaitSubscribe();

    // flag is cleared only after the subscription is established
    limited.request = false;
    publisher.submit(1);
    publisher.submit(2);
    publisher.close();

    unlimited.awaitComplete();
    assertEquals(2, unlimited.nexts);
    assertEquals(1, unlimited.completes);
    limited.awaitNext(1);
    assertEquals(1, limited.nexts);
}
 
源代码20 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * submit returns number of lagged items, compatible with result
 * of estimateMaximumLag.
 */
public void testLaggedSubmit() {
    SubmissionPublisher<Integer> p = basicPublisher();
    // both subscribers have their request flag cleared before subscribing
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    assertEquals(1, p.submit(1));
    assertTrue(p.estimateMaximumLag() >= 1);
    assertTrue(p.submit(2) >= 2);
    assertTrue(p.estimateMaximumLag() >= 2);
    s1.sn.request(4);
    assertTrue(p.submit(3) >= 3);
    assertTrue(p.estimateMaximumLag() >= 3);
    s2.sn.request(4);
    p.submit(4);
    p.close();
    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // fixed copy-paste slip: this used to re-check s2, leaving s1 unverified
    assertEquals(4, s1.nexts);
}
 
源代码21 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * With a buffer capacity of 1, all twenty submitted items are still
 * delivered to both subscribers.
 */
public void testCap1Submit() {
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 1);
    TestSubscriber first = new TestSubscriber();
    TestSubscriber second = new TestSubscriber();
    publisher.subscribe(first);
    publisher.subscribe(second);

    for (int item = 1; item <= 20; ++item) {
        assertTrue(publisher.estimateMinimumDemand() <= 1);
        assertTrue(publisher.submit(item) >= 0);
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertEquals(20, second.nexts);
    assertEquals(1, second.completes);
    assertEquals(20, first.nexts);
    assertEquals(1, first.completes);
}
 
源代码22 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * offer returns number of lagged items if not saturated
 */
public void testLaggedOffer() {
    SubmissionPublisher<Integer> p = basicPublisher();
    // both subscribers have their request flag cleared before subscribing
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    assertTrue(p.offer(1, null) >= 1);
    assertTrue(p.offer(2, null) >= 2);
    s1.sn.request(4);
    assertTrue(p.offer(3, null) >= 3);
    s2.sn.request(4);
    p.offer(4, null);
    p.close();
    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // fixed copy-paste slip: this used to re-check s2, leaving s1 unverified
    assertEquals(4, s1.nexts);
}
 
源代码23 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * offer returns a negative value once the capacity-4 buffers are
 * saturated.
 */
public void testDroppedOffer() {
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 4);
    // both subscribers have their request flag cleared before subscribing
    TestSubscriber first = new TestSubscriber();
    first.request = false;
    TestSubscriber second = new TestSubscriber();
    second.request = false;
    publisher.subscribe(first);
    publisher.subscribe(second);
    second.awaitSubscribe();
    first.awaitSubscribe();

    // the first four offers must be accepted
    for (int item = 1; item <= 4; ++item) {
        assertTrue(publisher.offer(item, null) >= 0);
    }
    publisher.offer(5, null);
    assertTrue(publisher.offer(6, null) < 0);
    first.sn.request(64);
    assertTrue(publisher.offer(7, null) < 0);
    second.sn.request(64);
    publisher.close();

    second.awaitComplete();
    assertTrue(second.nexts >= 4);
    first.awaitComplete();
    assertTrue(first.nexts >= 4);
}
 
源代码24 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * When saturated, offer calls the supplied drop handler; the handler
 * call count ends up at four or more.
 */
public void testHandledDroppedOffer() {
    AtomicInteger dropCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 4);
    // both subscribers have their request flag cleared before subscribing
    TestSubscriber first = new TestSubscriber();
    first.request = false;
    TestSubscriber second = new TestSubscriber();
    second.request = false;
    publisher.subscribe(first);
    publisher.subscribe(second);
    second.awaitSubscribe();
    first.awaitSubscribe();

    // the first four offers must be accepted
    for (int item = 1; item <= 4; ++item) {
        assertTrue(publisher.offer(item, (s, x) -> noopHandle(dropCalls)) >= 0);
    }
    publisher.offer(4, (s, x) -> noopHandle(dropCalls));
    assertTrue(publisher.offer(6, (s, x) -> noopHandle(dropCalls)) < 0);
    first.sn.request(64);
    assertTrue(publisher.offer(7, (s, x) -> noopHandle(dropCalls)) < 0);
    second.sn.request(64);
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertTrue(dropCalls.get() >= 4);
}
 
源代码25 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * offer can succeed when the drop handler ({@code reqHandle}) is
 * involved; the total of delivered items matches the count derived
 * from the offer results.
 */
public void testRecoveredHandledDroppedOffer() {
    AtomicInteger handlerCalls = new AtomicInteger();
    SubmissionPublisher<Integer> publisher =
        new SubmissionPublisher<>(basicExecutor, 4);
    // both subscribers have their request flag cleared before subscribing
    TestSubscriber first = new TestSubscriber();
    first.request = false;
    TestSubscriber second = new TestSubscriber();
    second.request = false;
    publisher.subscribe(first);
    publisher.subscribe(second);
    second.awaitSubscribe();
    first.awaitSubscribe();

    // each offer counts two deliveries, reduced by a negative (drop) result
    int expectedNexts = 0;
    for (int item = 1; item <= 8; ++item) {
        int result = publisher.offer(item, (s, x) -> reqHandle(handlerCalls, s));
        expectedNexts += 2 + Math.min(result, 0);
    }
    publisher.close();

    second.awaitComplete();
    first.awaitComplete();
    assertEquals(expectedNexts, first.nexts + second.nexts);
    assertTrue(handlerCalls.get() >= 2);
}
 
源代码26 项目: openjdk-jdk9   文件: SubmissionPublisherTest.java
/**
 * Timed offer returns number of lagged items if not saturated
 */
public void testLaggedTimedOffer() {
    SubmissionPublisher<Integer> p = basicPublisher();
    // both subscribers have their request flag cleared before subscribing
    TestSubscriber s1 = new TestSubscriber();
    s1.request = false;
    TestSubscriber s2 = new TestSubscriber();
    s2.request = false;
    p.subscribe(s1);
    p.subscribe(s2);
    s2.awaitSubscribe();
    s1.awaitSubscribe();
    long startTime = System.nanoTime();
    assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
    assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
    s1.sn.request(4);
    assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
    s2.sn.request(4);
    p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
    p.close();
    s2.awaitComplete();
    assertEquals(4, s2.nexts);
    s1.awaitComplete();
    // fixed copy-paste slip: this used to re-check s2, leaving s1 unverified
    assertEquals(4, s1.nexts);
    // the whole exchange must finish in under half of LONG_DELAY_MS
    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
}
 
/**
 * Submits one order (a single item with amount 10) ten times to a publisher
 * that has an {@code InventoryKeeper} subscriber, pauses in ten 50 ms steps,
 * and closes the publisher. The inventory is seeded with 20 units.
 */
@Test
public void testInventoryRemoval() throws InterruptedException {
    final Inventory inventory = new Inventory();
    // no-argument constructor: no explicit executor or buffer size is supplied
    final SubmissionPublisher<Order> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new InventoryKeeper(inventory));

    final Product product = new Product();
    inventory.store(product, 20);

    final OrderItem orderItem = new OrderItem();
    orderItem.setProduct(product);
    orderItem.setAmount(10);

    final Order order = new Order();
    final List<OrderItem> orderItems = new LinkedList<>();
    orderItems.add(orderItem);
    order.setItems(orderItems);

    for (int remaining = 10; remaining > 0; remaining--) {
        publisher.submit(order);
    }
    log.info("All orders were submitted");

    // fixed pauses before the publisher is closed
    for (int pause = 1; pause <= 10; pause++) {
        log.info("Sleeping a bit...");
        Thread.sleep(50);
    }

    publisher.close();
    log.info("Publisher was closed");
}
 
源代码28 项目: journaldev   文件: MyReactiveApp.java
/**
 * Publishes every employee returned by {@code EmpHelper.getEmps()} to a
 * {@code MySubscriber}, pauses for one second, and closes the publisher.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the one-second pause is interrupted
 */
public static void main(String args[]) throws InterruptedException {
    // publisher with a single registered subscriber
    final SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
    final MySubscriber subs = new MySubscriber();
    publisher.subscribe(subs);

    final List<Employee> emps = EmpHelper.getEmps();

    System.out.println("Publishing Items to Subscriber");
    for (Employee emp : emps) {
        publisher.submit(emp);
    }

    // Fixed one-second pause before closing; a polling loop on the
    // subscriber's counter was previously sketched here but is disabled.
    Thread.sleep(1000);

    publisher.close();

    System.out.println("Exiting the app");
}
 
源代码29 项目: tutorials   文件: ReactiveStreamsUnitTest.java
/**
 * Submits six strings to a publisher with one {@code EndSubscriber} and
 * checks, within one second, that the subscriber consumed exactly those
 * elements.
 */
@Test
public void givenPublisher_whenSubscribeToIt_thenShouldConsumeAllElements() throws InterruptedException {
    // given: one subscriber attached to a fresh publisher
    final SubmissionPublisher<String> source = new SubmissionPublisher<>();
    final EndSubscriber<String> sink = new EndSubscriber<>(6);
    source.subscribe(sink);
    final List<String> payload = List.of("1", "x", "2", "x", "3", "x");

    // when: every element is submitted and the publisher is closed
    assertThat(source.getNumberOfSubscribers()).isEqualTo(1);
    for (String element : payload) {
        source.submit(element);
    }
    source.close();

    // then: the subscriber eventually holds the elements as submitted
    await()
            .atMost(1000, TimeUnit.MILLISECONDS)
            .untilAsserted(() -> assertThat(sink.consumedElements).containsExactlyElementsOf(payload));
}
 
源代码30 项目: tutorials   文件: ReactiveStreamsUnitTest.java
/**
 * Routes three numeric strings through a {@code TransformProcessor} built
 * from {@code Integer::parseInt} and checks, within one second, that the
 * downstream {@code EndSubscriber} consumed the integers 1, 2, 3.
 */
@Test
public void givenPublisher_whenSubscribeAndTransformElements_thenShouldConsumeAllElements() throws InterruptedException {
    // given
    final SubmissionPublisher<String> source = new SubmissionPublisher<>();
    final TransformProcessor<String, Integer> parser = new TransformProcessor<>(Integer::parseInt);
    final EndSubscriber<Integer> sink = new EndSubscriber<>(3);
    final List<String> payload = List.of("1", "2", "3");
    final List<Integer> parsed = List.of(1, 2, 3);

    // when: publisher -> processor -> subscriber, then submit and close
    source.subscribe(parser);
    parser.subscribe(sink);
    for (String element : payload) {
        source.submit(element);
    }
    source.close();

    // then
    await()
            .atMost(1000, TimeUnit.MILLISECONDS)
            .untilAsserted(() -> assertThat(sink.consumedElements).containsExactlyElementsOf(parsed));
}
 
 类所在包
 同包方法