类javax.ws.rs.sse.SseEventSource源码实例Demo

下面列出了怎么用javax.ws.rs.sse.SseEventSource的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: quarkus-quickstarts   文件: PriceResourceTest.java
@Test
public void shouldGetStreamOfPrices() {
    Client client = ClientBuilder.newClient();
    WebTarget target = client.target(pricesUrl);

    AtomicInteger priceCount = new AtomicInteger();

    try (SseEventSource source = SseEventSource.target(target).build()) {
        source.register(event -> {
            Double value = event.readData(Double.class);
            System.out.println("Received price: " + value);
            priceCount.incrementAndGet();
        });
        source.open();
        Thread.sleep(15 * 1000L);
    } catch (InterruptedException ignored) {
    }

    int count = priceCount.get();
    assertTrue(count > 1, "Expected more than 2 prices read from the source, got " + count);
}
 
源代码2 项目: quarkus   文件: MutinyTest.java
@Test
public void testSSE() {
    Client client = ClientBuilder.newClient();
    WebTarget target = client.target("http://localhost:" + RestAssured.port + "/mutiny/pets");
    SseEventSource source = SseEventSource.target(target).build();
    List<Pet> pets = new CopyOnWriteArrayList<>();
    try (SseEventSource eventSource = source) {
        eventSource.register(event -> {
            Pet pet = event.readData(Pet.class, MediaType.APPLICATION_JSON_TYPE);
            pets.add(pet);
        }, ex -> {
            throw new IllegalStateException("SSE failure", ex);
        });
        eventSource.open();
        await().until(() -> pets.size() == 5);
    }
    Assertions.assertEquals("neo", pets.get(0).getName());
    Assertions.assertEquals("indy", pets.get(1).getName());
    Assertions.assertEquals("plume", pets.get(2).getName());
    Assertions.assertEquals("titi", pets.get(3).getName());
    Assertions.assertEquals("rex", pets.get(4).getName());
}
 
源代码3 项目: quarkus   文件: SseTestCase.java
@Test
public void testSse() throws Exception {

    Client client = ClientBuilder.newBuilder().build();
    WebTarget target = client.target(uri.toString() + "sse");
    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        CompletableFuture<String> res = new CompletableFuture<>();
        eventSource.register(new Consumer<InboundSseEvent>() {
            @Override
            public void accept(InboundSseEvent inboundSseEvent) {
                res.complete(inboundSseEvent.readData());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                res.completeExceptionally(throwable);
            }
        });
        eventSource.open();
        Assertions.assertEquals("hello", res.get(5, TimeUnit.SECONDS));
    }
}
 
源代码4 项目: javaee8-cookbook   文件: ClientConsumer.java
private static void consume() {

        try (final SseEventSource sseSource =
                     SseEventSource
                             .target(WEB_TARGET)
                             .build()) {

            sseSource.register(System.out::println);
            sseSource.open();

            for (int counter=0; counter < 5; counter++) {
                System.out.println(" ");
                for (int innerCounter=0; innerCounter < 5; innerCounter++) {
                    WEB_TARGET.request().post(Entity.json("event " + innerCounter));
                }
                Thread.sleep(1000);
            }
            
            CLIENT.close();
            System.out.println("\nAll messages consumed");
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
    }
 
源代码5 项目: Java-EE-8-Sampler   文件: SseClient.java
private static void consumeEventsViaSubscription() {
    try (final SseEventSource eventSource =
                 SseEventSource.target(target)
                         .build()) {

        eventSource.register(System.out::println);
        eventSource.open();

        for (int counter = 0; counter < 5; counter++) {
            target.request().post(Entity.text("message " + counter));
        }

        Thread.sleep(1000); // make sure all the events have time to arrive
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
源代码6 项目: redpipe   文件: ApiTest.java
private void checkHelloObservableSSE(String prefix, TestContext context) {
	CountDownLatch latch = new CountDownLatch(2);
	Client client = ClientBuilder.newClient();
	WebTarget target = client.target("http://localhost:9000"+prefix+"/hello-observable-sse");
	SseEventSource msgEventSource = SseEventSource.target(target).build();
	try (SseEventSource eventSource = msgEventSource){
		eventSource.register(event -> {
			if(latch.getCount() == 2)
				context.assertEquals("one", event.readData(String.class));
			else
				context.assertEquals("two", event.readData(String.class));
			latch.countDown();
		}, ex -> {
			context.fail(ex);
		});
		eventSource.open();
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
 
源代码7 项目: ee8-sandbox   文件: SseClient.java
public final static void main(String[] args) {

        WebTarget target = ClientBuilder.newClient().target("http://localhost:8080/jaxrs-sse/rest/events");

        try (SseEventSource eventSource = SseEventSource.target(target).build()) {

            // EventSource#register(Consumer<InboundSseEvent>)
            // Registered event handler will print the received message.
            eventSource.register(System.out::println);

            // Subscribe to the event stream.
            eventSource.open();
            
            // Consume events for just 500 ms
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
源代码8 项目: openwebbeans-meecrowave   文件: SSETest.java
@Test
public void sse() throws MalformedURLException, InterruptedException {
    final Client client = ClientBuilder.newBuilder().build();
    WebTarget base = client.target(String.format("http://localhost:%d", CONTAINER.getConfiguration().getHttpPort()));
    //the /META-INF/services/javax.ws.rs.sse.SseEventSource.Builder file is only needed until this is fixed:
    //https://issues.apache.org/jira/browse/CXF-7633
    //An exception is not thrown on a 404 response but that is not a Meecrowave issue.
    try (final SseEventSource eventSource = SseEventSource.target(base.path("/rs/news/update")).build()) {
        CountDownLatch cdl = new CountDownLatch(5);
        eventSource.register(sse -> {
            JsonObject data = sse.readData(JsonObject.class, MediaType.APPLICATION_JSON_TYPE);
            assertNotNull(data);
            cdl.countDown();
        }, e -> {
            e.printStackTrace();
            fail(e.getMessage());

        });
        eventSource.open();
        assertTrue(cdl.await(20, TimeUnit.SECONDS));
        assertTrue(eventSource.close(5, TimeUnit.SECONDS));
    }
    client.close();
}
 
源代码9 项目: mangooio   文件: ServerSentEventServiceTest.java
@Test
public void testCloseConnection() throws InterruptedException {
	//given
	ServerSentEventService ServerSentEventService = Application.getInstance(ServerSentEventService.class);
	Config config = Application.getInstance(Config.class);
	Client client = ClientBuilder.newClient();

	//when
	WebTarget webTarget = client.target("http://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/sse");
	SseEventSource sseEventSource = SseEventSource.target(webTarget).build();
	sseEventSource.register((sseEvent) -> {eventData = sseEvent.readData();}, (e) -> e.printStackTrace());
	sseEventSource.open();
	ServerSentEventService.close("/sse");
	sseEventSource.close();
	client.close();

	//then
	assertThat(ServerSentEventService.getConnections("/sse"), not(nullValue()));
	assertThat(ServerSentEventService.getConnections("/sse").size(), equalTo(0));
}
 
源代码10 项目: mangooio   文件: ServerSentEventServiceTest.java
@Test
public void testSendData() throws InterruptedException {
	//given
	Config config = Application.getInstance(Config.class);
	Client client = ClientBuilder.newClient();
	String data = UUID.randomUUID().toString();
	
	//when
	WebTarget webTarget = client.target("http://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/sse");
	SseEventSource sseEventSource = SseEventSource.target(webTarget).build();
	sseEventSource.register((sseEvent) -> {eventData = sseEvent.readData();}, (e) -> e.printStackTrace());
	sseEventSource.open();
       
       //then
       Application.getInstance(ServerSentEventService.class).send("/sse", data);
       await().atMost(2,  TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, equalTo(data)));
       sseEventSource.close();
       client.close();
}
 
源代码11 项目: cxf   文件: SpringSseEmitterTest.java
@Test
public void testSseEvents() throws InterruptedException {
    final WebTarget target = createWebTarget();
    final Collection<Book> books = new ArrayList<>();

    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        eventSource.register(collect(books), System.out::println);
        eventSource.open();
        // Give the SSE stream some time to collect all events
        awaitEvents(5000, books, 5);
    }

    assertThat(books,
        hasItems(
            new Book("New Book #1", "Author #1"),
            new Book("New Book #2", "Author #2"),
            new Book("New Book #3", "Author #3"),
            new Book("New Book #4", "Author #4"),
            new Book("New Book #5", "Author #5")
        )
    );
}
 
源代码12 项目: cxf   文件: AbstractSseTest.java
@Test
public void testBooksStreamIsReturnedFromLastEventId() throws InterruptedException {
    final WebTarget target = createWebTarget("/rest/api/bookstore/sse/" + UUID.randomUUID())
        .property(HttpHeaders.LAST_EVENT_ID_HEADER, 150);
    final Collection<Book> books = new ArrayList<>();

    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        eventSource.register(collect(books), System.out::println);
        eventSource.open();
        // Give the SSE stream some time to collect all events
        awaitEvents(5000, books, 4);
    }

    // Easing the test verification here, it does not work well for Atm + Jetty
    assertThat(books,
        hasItems(
            new Book("New Book #151", 151),
            new Book("New Book #152", 152),
            new Book("New Book #153", 153),
            new Book("New Book #154", 154)
        )
    );
}
 
源代码13 项目: cxf   文件: AbstractSseTest.java
@Test
public void testBooksStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
    final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0");
    final Collection<Book> books = new ArrayList<>();

    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        eventSource.register(collect(books), System.out::println);
        eventSource.open();
        // Give the SSE stream some time to collect all events
        awaitEvents(5000, books, 4);
    }
    // Easing the test verification here, it does not work well for Atm + Jetty
    assertThat(books,
        hasItems(
            new Book("New Book #1", 1),
            new Book("New Book #2", 2),
            new Book("New Book #3", 3),
            new Book("New Book #4", 4)
        )
    );
}
 
源代码14 项目: cxf   文件: AbstractSseTest.java
@Test
public void testBookTitlesStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
    final WebTarget target = createWebTarget("/rest/api/bookstore/titles/sse");
    final Collection<String> titles = new ArrayList<>();

    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        eventSource.register(collectRaw(titles), System.out::println);
        eventSource.open();
        // Give the SSE stream some time to collect all events
        awaitEvents(5000, titles, 4);
    }
    // Easing the test verification here, it does not work well for Atm + Jetty
    assertThat(titles,
        hasItems(
            "New Book #1",
            "New Book #2",
            "New Book #3",
            "New Book #4"
        )
    );
}
 
源代码15 项目: cxf   文件: AbstractSseTest.java
@Test
public void testBooksSseContainerResponseFilterIsCalled() throws InterruptedException {
    final WebTarget target = createWebTarget("/rest/api/bookstore/filtered/sse");
    final Collection<Book> books = new ArrayList<>();

    assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
        .request()
        .get(Integer.class), equalTo(0));

    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        eventSource.register(collect(books), System.out::println);
        eventSource.open();
        // Give the SSE stream some time to collect all events
        Thread.sleep(1000);
    }
    // Easing the test verification here, it does not work well for Atm + Jetty
    assertTrue(books.isEmpty());

    assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
        .request()
        .get(Integer.class), equalTo(1));
}
 
源代码16 项目: cxf   文件: AbstractSseTest.java
@Test
public void testBooksStreamIsReturnedFromInboundSseEventsNoDelay() throws InterruptedException {
    final WebTarget target = createWebTarget("/rest/api/bookstore/nodelay/sse/0");
    final Collection<Book> books = new ArrayList<>();

    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        eventSource.register(collect(books), System.out::println);
        eventSource.open();
        // Give the SSE stream some time to collect all events
        awaitEvents(5000, books, 5);
    }
    // Easing the test verification here, it does not work well for Atm + Jetty
    assertThat(books,
        hasItems(
            new Book("New Book #1", 1),
            new Book("New Book #2", 2),
            new Book("New Book #3", 3),
            new Book("New Book #4", 4),
            new Book("New Book #5", 5)
        )
    );
}
 
源代码17 项目: cxf   文件: SseEventSourceImplTest.java
@Test
public void testNoReconnectAndOneEventReceived() throws InterruptedException, IOException {
    try (SseEventSource eventSource = withNoReconnect(Type.EVENT)) {
        eventSource.open();

        assertThat(eventSource.isOpen(), equalTo(true));

        // Allow the event processor to pull for events (150ms)
        Thread.sleep(150L);
    }

    assertThat(events.size(), equalTo(1));
    assertThat(events.get(0).getId(), equalTo("1"));
    assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
    assertThat(events.get(0).getComment(), equalTo("test comment"));
    assertThat(events.get(0).readData(), equalTo("test data"));
}
 
源代码18 项目: cxf   文件: SseEventSourceImplTest.java
@Test
public void testNoReconnectAndMixedEventsAreReceived() throws InterruptedException, IOException {
    try (SseEventSource eventSource = withNoReconnect(Type.EVENT_MIXED)) {
        eventSource.open();

        assertThat(eventSource.isOpen(), equalTo(true));

        // Allow the event processor to pull for events (150ms)
        Thread.sleep(150L);
    }

    assertThat(events.size(), equalTo(2));
    assertThat(events.get(0).getName(), nullValue());
    assertThat(events.get(0).readData(), equalTo("just test data"));
    assertThat(events.get(1).getId(), equalTo("1"));
    assertThat(events.get(1).getReconnectDelay(), equalTo(10000L));
    assertThat(events.get(1).getComment(), equalTo("test comment"));
    assertThat(events.get(1).readData(), equalTo("test data"));
}
 
源代码19 项目: cxf   文件: SseEventSourceImplTest.java
@Test
public void testReconnectAndTwoEventsReceived() throws InterruptedException, IOException {
    try (SseEventSource eventSource = withReconnect(Type.EVENT_NO_RETRY)) {
        eventSource.open();

        assertThat(eventSource.isOpen(), equalTo(true));

        // Allow the event processor to pull for events (200ms)
        Thread.sleep(150L);
    }

    assertThat(events.size(), equalTo(2));
    assertThat(events.get(0).getId(), equalTo("1"));
    assertThat(events.get(0).getComment(), equalTo("test comment"));
    assertThat(events.get(0).readData(), equalTo("test data"));
    assertThat(events.get(1).getId(), equalTo("1"));
    assertThat(events.get(1).getComment(), equalTo("test comment"));
    assertThat(events.get(1).readData(), equalTo("test data"));
}
 
源代码20 项目: cxf   文件: SseEventSourceImplTest.java
@Test
public void testInvalidReconnectDelayInTheEvent() throws InterruptedException, IOException {
    try (SseEventSource eventSource = withNoReconnect(Type.EVENT_BAD_RETRY)) {
        eventSource.open();

        assertThat(eventSource.isOpen(), equalTo(true));

        // Allow the event processor to pull for events (150ms)
        Thread.sleep(150L);
    }

    assertThat(events.size(), equalTo(1));
    assertThat(events.get(0).getId(), equalTo("1"));
    assertThat(events.get(0).getReconnectDelay(), equalTo(-1L));
    assertThat(events.get(0).getComment(), equalTo("test comment"));
    assertThat(events.get(0).readData(), equalTo("test data"));
}
 
源代码21 项目: cxf   文件: SseEventSourceImplTest.java
@Test
public void testConnectWithLastEventId() throws InterruptedException, IOException {
    try (SseEventSource eventSource = withNoReconnect(Type.EVENT_LAST_EVENT_ID, "10")) {
        eventSource.open();
        assertThat(eventSource.isOpen(), equalTo(true));

        // Allow the event processor to pull for events (150ms)
        Thread.sleep(150L);
    }

    assertThat(events.size(), equalTo(1));
    assertThat(events.get(0).getId(), equalTo("10"));
    assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
    assertThat(events.get(0).getComment(), equalTo("test comment"));
    assertThat(events.get(0).readData(), equalTo("test data"));
}
 
源代码22 项目: cxf   文件: SseEventSourceImplTest.java
@Test
public void testReconnectWithLastEventId() throws InterruptedException, IOException {
    try (SseEventSource eventSource = withReconnect(Type.EVENT_RETRY_LAST_EVENT_ID, "10")) {
        eventSource.open();
        assertThat(eventSource.isOpen(), equalTo(false));
        assertThat(errors.size(), equalTo(1));

        // Allow the event processor to pull for events (150ms)
        Thread.sleep(150L);
    }

    assertThat(events.size(), equalTo(1));
    assertThat(events.get(0).getId(), equalTo("10"));
    assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
    assertThat(events.get(0).getComment(), equalTo("test comment"));
    assertThat(events.get(0).readData(), equalTo("test data"));
}
 
源代码23 项目: hammock   文件: SseTest.java
@Test
public void shouldBeAbleToRetrieveRestEndpoint() throws Exception {
    WebTarget target = ClientBuilder.newClient().register(JohnzonExtension.class).target(uri+"/sse/{uuid}").resolveTemplate("uuid", UUID.randomUUID().toString());
    List<SseModel> receivedModels = new ArrayList<>();
    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        eventSource.register(event -> {
            SseModel body = event.readData(SseModel.class, MediaType.APPLICATION_JSON_TYPE);
            System.out.println("Received "+body.getName());
            receivedModels.add(body);
        }, System.out::println);
        eventSource.open();
        // Give the SSE stream some time to collect all events
        Thread.sleep(1000);
    }
    assertFalse(receivedModels.isEmpty());
}
 
源代码24 项目: hammock   文件: CXFSseTest.java
@Test
public void testFireSseEventsAsync() throws InterruptedException {
    final WebTarget target = createWebTarget("/sse/" + UUID.randomUUID());
    final Collection<String> messages = new ArrayList<>();
    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        eventSource.register(e -> {
            System.out.println("New event...");
            messages.add(e.readData());
        }, System.out::println);
        eventSource.open();
        // wait for messages to come in

        if(messages.size() <= 4) {
            Thread.sleep(1000);
        }
    }

    messages.forEach(System.out::println);
    assertThat(messages).hasSize(4);
}
 
源代码25 项目: tutorials   文件: SseClientApp.java
public static void main(String... args) throws Exception {

        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(url);
        try (SseEventSource eventSource = SseEventSource.target(target).build()) {

            eventSource.register(onEvent, onError, onComplete);
            eventSource.open();

            //Consuming events for one hour
            Thread.sleep(60 * 60 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        client.close();
        System.out.println("End");
    }
 
源代码26 项目: tutorials   文件: SseClientBroadcastApp.java
public static void main(String... args) throws Exception {

        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(subscribeUrl);
        try (final SseEventSource eventSource = SseEventSource.target(target)
                .reconnectingEvery(5, TimeUnit.SECONDS)
                .build()) {
            eventSource.register(onEvent, onError, onComplete);
            eventSource.open();
            System.out.println("Wainting for incoming event ...");

            //Consuming events for one hour
            Thread.sleep(60 * 60 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        client.close();
        System.out.println("End");
    }
 
源代码27 项目: jbang   文件: smee.java
@Override
public Integer call() throws Exception { // your business logic goes here...

    if(target==null) {
        target = String.format("http://127.0.0.1:%s%s", port, path);
    }

    if(url==null) {
        url = createChannel();
    }

    URI uri = new URI(url);

    Client client = ClientBuilder.newBuilder()
                                 .build();
                                 //.register(HTTPLoggingFilter.class);

    final var events = SseEventSource.target(client.target(uri))
                                      // Reconnect immediately
                                     .reconnectingEvery(0, TimeUnit.MILLISECONDS).build();

    events.register(this::onMessage, this::onError);

    log.info("Forwarding " + url + " to " + target);
    events.open();

    if(events.isOpen()) {
        log.info("Connected " + url);
    }

    Thread.currentThread().join();

    return 0;
}
 
源代码28 项目: quarkus-quickstarts   文件: PriceTest.java
@Test
public void test() {
    // Check we don't have any prices
    List<Price> prices = RestAssured.get("/prices/all").as(new TypeRef<List<Price>>() {});
    Assertions.assertTrue(prices.isEmpty());

    // Stream the prices
    Client client = ClientBuilder.newClient();
    WebTarget target = client.target(PRICES_SSE_ENDPOINT);
    List<Double> received = new CopyOnWriteArrayList<>();
    SseEventSource source = SseEventSource.target(target).build();
    source.register(inboundSseEvent -> received.add(Double.valueOf(inboundSseEvent.readData())));
    source.open();

    // Send the prices
    Price p1 = new Price();
    p1.value = 1.0;
    Price p2 = new Price();
    p2.value = 4.0;
    Price p3 = new Price();
    p3.value = 2.0;
    RestAssured.given().header("Content-Type", "application/json").body(p1).post("/").then().statusCode(204);
    RestAssured.given().header("Content-Type", "application/json").body(p2).post("/").then().statusCode(204);
    RestAssured.given().header("Content-Type", "application/json").body(p3).post("/").then().statusCode(204);

    await().atMost(100000, MILLISECONDS).until(() -> received.size() == 3);
    source.close();

    Assertions.assertTrue(received.contains(p1.value));
    Assertions.assertTrue(received.contains(p2.value));
    Assertions.assertTrue(received.contains(p3.value));

    prices = RestAssured.get("/prices/all").as(new TypeRef<List<Price>>() {});
    Assertions.assertEquals(prices.size(), 3);
}
 
源代码29 项目: quarkus-quickstarts   文件: PriceResourceTest.java
@Test
void testPricesEventStream() {
    Client client = ClientBuilder.newClient();
    WebTarget target = client.target(PRICES_SSE_ENDPOINT);

    List<Double> received = new CopyOnWriteArrayList<>();

    SseEventSource source = SseEventSource.target(target).build();
    source.register(inboundSseEvent -> received.add(Double.valueOf(inboundSseEvent.readData())));
    source.open();
    await().atMost(100000, MILLISECONDS).until(() -> received.size() == 3);
    source.close();
}
 
源代码30 项目: quarkus   文件: MultiRxInvokerImpl.java
private SseEventSourceImpl getEventSource() {
    SseEventSourceImpl.SourceBuilder builder = (SseEventSourceImpl.SourceBuilder) SseEventSource
            .target(syncInvoker.getTarget());
    if (executorService != null) {
        builder.executor(executorService);
    }
    SseEventSourceImpl sseEventSource = (SseEventSourceImpl) builder.build();
    sseEventSource.setAlwaysReconnect(false);
    return sseEventSource;
}
 
 类所在包
 类方法
 同包方法