下面列出了怎么用javax.ws.rs.sse.SseEventSource的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void shouldGetStreamOfPrices() {
Client client = ClientBuilder.newClient();
WebTarget target = client.target(pricesUrl);
AtomicInteger priceCount = new AtomicInteger();
try (SseEventSource source = SseEventSource.target(target).build()) {
source.register(event -> {
Double value = event.readData(Double.class);
System.out.println("Received price: " + value);
priceCount.incrementAndGet();
});
source.open();
Thread.sleep(15 * 1000L);
} catch (InterruptedException ignored) {
}
int count = priceCount.get();
assertTrue(count > 1, "Expected more than 2 prices read from the source, got " + count);
}
@Test
public void testSSE() {
Client client = ClientBuilder.newClient();
WebTarget target = client.target("http://localhost:" + RestAssured.port + "/mutiny/pets");
SseEventSource source = SseEventSource.target(target).build();
List<Pet> pets = new CopyOnWriteArrayList<>();
try (SseEventSource eventSource = source) {
eventSource.register(event -> {
Pet pet = event.readData(Pet.class, MediaType.APPLICATION_JSON_TYPE);
pets.add(pet);
}, ex -> {
throw new IllegalStateException("SSE failure", ex);
});
eventSource.open();
await().until(() -> pets.size() == 5);
}
Assertions.assertEquals("neo", pets.get(0).getName());
Assertions.assertEquals("indy", pets.get(1).getName());
Assertions.assertEquals("plume", pets.get(2).getName());
Assertions.assertEquals("titi", pets.get(3).getName());
Assertions.assertEquals("rex", pets.get(4).getName());
}
@Test
public void testSse() throws Exception {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + "sse");
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
CompletableFuture<String> res = new CompletableFuture<>();
eventSource.register(new Consumer<InboundSseEvent>() {
@Override
public void accept(InboundSseEvent inboundSseEvent) {
res.complete(inboundSseEvent.readData());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
res.completeExceptionally(throwable);
}
});
eventSource.open();
Assertions.assertEquals("hello", res.get(5, TimeUnit.SECONDS));
}
}
private static void consume() {
try (final SseEventSource sseSource =
SseEventSource
.target(WEB_TARGET)
.build()) {
sseSource.register(System.out::println);
sseSource.open();
for (int counter=0; counter < 5; counter++) {
System.out.println(" ");
for (int innerCounter=0; innerCounter < 5; innerCounter++) {
WEB_TARGET.request().post(Entity.json("event " + innerCounter));
}
Thread.sleep(1000);
}
CLIENT.close();
System.out.println("\nAll messages consumed");
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
private static void consumeEventsViaSubscription() {
try (final SseEventSource eventSource =
SseEventSource.target(target)
.build()) {
eventSource.register(System.out::println);
eventSource.open();
for (int counter = 0; counter < 5; counter++) {
target.request().post(Entity.text("message " + counter));
}
Thread.sleep(1000); // make sure all the events have time to arrive
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void checkHelloObservableSSE(String prefix, TestContext context) {
CountDownLatch latch = new CountDownLatch(2);
Client client = ClientBuilder.newClient();
WebTarget target = client.target("http://localhost:9000"+prefix+"/hello-observable-sse");
SseEventSource msgEventSource = SseEventSource.target(target).build();
try (SseEventSource eventSource = msgEventSource){
eventSource.register(event -> {
if(latch.getCount() == 2)
context.assertEquals("one", event.readData(String.class));
else
context.assertEquals("two", event.readData(String.class));
latch.countDown();
}, ex -> {
context.fail(ex);
});
eventSource.open();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public final static void main(String[] args) {
WebTarget target = ClientBuilder.newClient().target("http://localhost:8080/jaxrs-sse/rest/events");
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
// EventSource#register(Consumer<InboundSseEvent>)
// Registered event handler will print the received message.
eventSource.register(System.out::println);
// Subscribe to the event stream.
eventSource.open();
// Consume events for just 500 ms
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void sse() throws MalformedURLException, InterruptedException {
final Client client = ClientBuilder.newBuilder().build();
WebTarget base = client.target(String.format("http://localhost:%d", CONTAINER.getConfiguration().getHttpPort()));
//the /META-INF/services/javax.ws.rs.sse.SseEventSource.Builder file is only needed until this is fixed:
//https://issues.apache.org/jira/browse/CXF-7633
//An exception is not thrown on a 404 response but that is not a Meecrowave issue.
try (final SseEventSource eventSource = SseEventSource.target(base.path("/rs/news/update")).build()) {
CountDownLatch cdl = new CountDownLatch(5);
eventSource.register(sse -> {
JsonObject data = sse.readData(JsonObject.class, MediaType.APPLICATION_JSON_TYPE);
assertNotNull(data);
cdl.countDown();
}, e -> {
e.printStackTrace();
fail(e.getMessage());
});
eventSource.open();
assertTrue(cdl.await(20, TimeUnit.SECONDS));
assertTrue(eventSource.close(5, TimeUnit.SECONDS));
}
client.close();
}
@Test
public void testCloseConnection() throws InterruptedException {
//given
ServerSentEventService ServerSentEventService = Application.getInstance(ServerSentEventService.class);
Config config = Application.getInstance(Config.class);
Client client = ClientBuilder.newClient();
//when
WebTarget webTarget = client.target("http://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/sse");
SseEventSource sseEventSource = SseEventSource.target(webTarget).build();
sseEventSource.register((sseEvent) -> {eventData = sseEvent.readData();}, (e) -> e.printStackTrace());
sseEventSource.open();
ServerSentEventService.close("/sse");
sseEventSource.close();
client.close();
//then
assertThat(ServerSentEventService.getConnections("/sse"), not(nullValue()));
assertThat(ServerSentEventService.getConnections("/sse").size(), equalTo(0));
}
@Test
public void testSendData() throws InterruptedException {
//given
Config config = Application.getInstance(Config.class);
Client client = ClientBuilder.newClient();
String data = UUID.randomUUID().toString();
//when
WebTarget webTarget = client.target("http://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/sse");
SseEventSource sseEventSource = SseEventSource.target(webTarget).build();
sseEventSource.register((sseEvent) -> {eventData = sseEvent.readData();}, (e) -> e.printStackTrace());
sseEventSource.open();
//then
Application.getInstance(ServerSentEventService.class).send("/sse", data);
await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, equalTo(data)));
sseEventSource.close();
client.close();
}
@Test
public void testSseEvents() throws InterruptedException {
final WebTarget target = createWebTarget();
final Collection<Book> books = new ArrayList<>();
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(collect(books), System.out::println);
eventSource.open();
// Give the SSE stream some time to collect all events
awaitEvents(5000, books, 5);
}
assertThat(books,
hasItems(
new Book("New Book #1", "Author #1"),
new Book("New Book #2", "Author #2"),
new Book("New Book #3", "Author #3"),
new Book("New Book #4", "Author #4"),
new Book("New Book #5", "Author #5")
)
);
}
@Test
public void testBooksStreamIsReturnedFromLastEventId() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/sse/" + UUID.randomUUID())
.property(HttpHeaders.LAST_EVENT_ID_HEADER, 150);
final Collection<Book> books = new ArrayList<>();
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(collect(books), System.out::println);
eventSource.open();
// Give the SSE stream some time to collect all events
awaitEvents(5000, books, 4);
}
// Easing the test verification here, it does not work well for Atm + Jetty
assertThat(books,
hasItems(
new Book("New Book #151", 151),
new Book("New Book #152", 152),
new Book("New Book #153", 153),
new Book("New Book #154", 154)
)
);
}
@Test
public void testBooksStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0");
final Collection<Book> books = new ArrayList<>();
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(collect(books), System.out::println);
eventSource.open();
// Give the SSE stream some time to collect all events
awaitEvents(5000, books, 4);
}
// Easing the test verification here, it does not work well for Atm + Jetty
assertThat(books,
hasItems(
new Book("New Book #1", 1),
new Book("New Book #2", 2),
new Book("New Book #3", 3),
new Book("New Book #4", 4)
)
);
}
@Test
public void testBookTitlesStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/titles/sse");
final Collection<String> titles = new ArrayList<>();
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(collectRaw(titles), System.out::println);
eventSource.open();
// Give the SSE stream some time to collect all events
awaitEvents(5000, titles, 4);
}
// Easing the test verification here, it does not work well for Atm + Jetty
assertThat(titles,
hasItems(
"New Book #1",
"New Book #2",
"New Book #3",
"New Book #4"
)
);
}
@Test
public void testBooksSseContainerResponseFilterIsCalled() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/filtered/sse");
final Collection<Book> books = new ArrayList<>();
assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
.request()
.get(Integer.class), equalTo(0));
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(collect(books), System.out::println);
eventSource.open();
// Give the SSE stream some time to collect all events
Thread.sleep(1000);
}
// Easing the test verification here, it does not work well for Atm + Jetty
assertTrue(books.isEmpty());
assertThat(createWebTarget("/rest/api/bookstore/filtered/stats")
.request()
.get(Integer.class), equalTo(1));
}
@Test
public void testBooksStreamIsReturnedFromInboundSseEventsNoDelay() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/nodelay/sse/0");
final Collection<Book> books = new ArrayList<>();
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(collect(books), System.out::println);
eventSource.open();
// Give the SSE stream some time to collect all events
awaitEvents(5000, books, 5);
}
// Easing the test verification here, it does not work well for Atm + Jetty
assertThat(books,
hasItems(
new Book("New Book #1", 1),
new Book("New Book #2", 2),
new Book("New Book #3", 3),
new Book("New Book #4", 4),
new Book("New Book #5", 5)
)
);
}
@Test
public void testNoReconnectAndOneEventReceived() throws InterruptedException, IOException {
try (SseEventSource eventSource = withNoReconnect(Type.EVENT)) {
eventSource.open();
assertThat(eventSource.isOpen(), equalTo(true));
// Allow the event processor to pull for events (150ms)
Thread.sleep(150L);
}
assertThat(events.size(), equalTo(1));
assertThat(events.get(0).getId(), equalTo("1"));
assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
assertThat(events.get(0).getComment(), equalTo("test comment"));
assertThat(events.get(0).readData(), equalTo("test data"));
}
@Test
public void testNoReconnectAndMixedEventsAreReceived() throws InterruptedException, IOException {
try (SseEventSource eventSource = withNoReconnect(Type.EVENT_MIXED)) {
eventSource.open();
assertThat(eventSource.isOpen(), equalTo(true));
// Allow the event processor to pull for events (150ms)
Thread.sleep(150L);
}
assertThat(events.size(), equalTo(2));
assertThat(events.get(0).getName(), nullValue());
assertThat(events.get(0).readData(), equalTo("just test data"));
assertThat(events.get(1).getId(), equalTo("1"));
assertThat(events.get(1).getReconnectDelay(), equalTo(10000L));
assertThat(events.get(1).getComment(), equalTo("test comment"));
assertThat(events.get(1).readData(), equalTo("test data"));
}
@Test
public void testReconnectAndTwoEventsReceived() throws InterruptedException, IOException {
try (SseEventSource eventSource = withReconnect(Type.EVENT_NO_RETRY)) {
eventSource.open();
assertThat(eventSource.isOpen(), equalTo(true));
// Allow the event processor to pull for events (200ms)
Thread.sleep(150L);
}
assertThat(events.size(), equalTo(2));
assertThat(events.get(0).getId(), equalTo("1"));
assertThat(events.get(0).getComment(), equalTo("test comment"));
assertThat(events.get(0).readData(), equalTo("test data"));
assertThat(events.get(1).getId(), equalTo("1"));
assertThat(events.get(1).getComment(), equalTo("test comment"));
assertThat(events.get(1).readData(), equalTo("test data"));
}
@Test
public void testInvalidReconnectDelayInTheEvent() throws InterruptedException, IOException {
try (SseEventSource eventSource = withNoReconnect(Type.EVENT_BAD_RETRY)) {
eventSource.open();
assertThat(eventSource.isOpen(), equalTo(true));
// Allow the event processor to pull for events (150ms)
Thread.sleep(150L);
}
assertThat(events.size(), equalTo(1));
assertThat(events.get(0).getId(), equalTo("1"));
assertThat(events.get(0).getReconnectDelay(), equalTo(-1L));
assertThat(events.get(0).getComment(), equalTo("test comment"));
assertThat(events.get(0).readData(), equalTo("test data"));
}
@Test
public void testConnectWithLastEventId() throws InterruptedException, IOException {
try (SseEventSource eventSource = withNoReconnect(Type.EVENT_LAST_EVENT_ID, "10")) {
eventSource.open();
assertThat(eventSource.isOpen(), equalTo(true));
// Allow the event processor to pull for events (150ms)
Thread.sleep(150L);
}
assertThat(events.size(), equalTo(1));
assertThat(events.get(0).getId(), equalTo("10"));
assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
assertThat(events.get(0).getComment(), equalTo("test comment"));
assertThat(events.get(0).readData(), equalTo("test data"));
}
@Test
public void testReconnectWithLastEventId() throws InterruptedException, IOException {
try (SseEventSource eventSource = withReconnect(Type.EVENT_RETRY_LAST_EVENT_ID, "10")) {
eventSource.open();
assertThat(eventSource.isOpen(), equalTo(false));
assertThat(errors.size(), equalTo(1));
// Allow the event processor to pull for events (150ms)
Thread.sleep(150L);
}
assertThat(events.size(), equalTo(1));
assertThat(events.get(0).getId(), equalTo("10"));
assertThat(events.get(0).getReconnectDelay(), equalTo(10000L));
assertThat(events.get(0).getComment(), equalTo("test comment"));
assertThat(events.get(0).readData(), equalTo("test data"));
}
@Test
public void shouldBeAbleToRetrieveRestEndpoint() throws Exception {
WebTarget target = ClientBuilder.newClient().register(JohnzonExtension.class).target(uri+"/sse/{uuid}").resolveTemplate("uuid", UUID.randomUUID().toString());
List<SseModel> receivedModels = new ArrayList<>();
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(event -> {
SseModel body = event.readData(SseModel.class, MediaType.APPLICATION_JSON_TYPE);
System.out.println("Received "+body.getName());
receivedModels.add(body);
}, System.out::println);
eventSource.open();
// Give the SSE stream some time to collect all events
Thread.sleep(1000);
}
assertFalse(receivedModels.isEmpty());
}
@Test
public void testFireSseEventsAsync() throws InterruptedException {
final WebTarget target = createWebTarget("/sse/" + UUID.randomUUID());
final Collection<String> messages = new ArrayList<>();
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(e -> {
System.out.println("New event...");
messages.add(e.readData());
}, System.out::println);
eventSource.open();
// wait for messages to come in
if(messages.size() <= 4) {
Thread.sleep(1000);
}
}
messages.forEach(System.out::println);
assertThat(messages).hasSize(4);
}
public static void main(String... args) throws Exception {
Client client = ClientBuilder.newClient();
WebTarget target = client.target(url);
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(onEvent, onError, onComplete);
eventSource.open();
//Consuming events for one hour
Thread.sleep(60 * 60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
client.close();
System.out.println("End");
}
public static void main(String... args) throws Exception {
Client client = ClientBuilder.newClient();
WebTarget target = client.target(subscribeUrl);
try (final SseEventSource eventSource = SseEventSource.target(target)
.reconnectingEvery(5, TimeUnit.SECONDS)
.build()) {
eventSource.register(onEvent, onError, onComplete);
eventSource.open();
System.out.println("Wainting for incoming event ...");
//Consuming events for one hour
Thread.sleep(60 * 60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
client.close();
System.out.println("End");
}
@Override
public Integer call() throws Exception { // your business logic goes here...
if(target==null) {
target = String.format("http://127.0.0.1:%s%s", port, path);
}
if(url==null) {
url = createChannel();
}
URI uri = new URI(url);
Client client = ClientBuilder.newBuilder()
.build();
//.register(HTTPLoggingFilter.class);
final var events = SseEventSource.target(client.target(uri))
// Reconnect immediately
.reconnectingEvery(0, TimeUnit.MILLISECONDS).build();
events.register(this::onMessage, this::onError);
log.info("Forwarding " + url + " to " + target);
events.open();
if(events.isOpen()) {
log.info("Connected " + url);
}
Thread.currentThread().join();
return 0;
}
@Test
public void test() {
// Check we don't have any prices
List<Price> prices = RestAssured.get("/prices/all").as(new TypeRef<List<Price>>() {});
Assertions.assertTrue(prices.isEmpty());
// Stream the prices
Client client = ClientBuilder.newClient();
WebTarget target = client.target(PRICES_SSE_ENDPOINT);
List<Double> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(Double.valueOf(inboundSseEvent.readData())));
source.open();
// Send the prices
Price p1 = new Price();
p1.value = 1.0;
Price p2 = new Price();
p2.value = 4.0;
Price p3 = new Price();
p3.value = 2.0;
RestAssured.given().header("Content-Type", "application/json").body(p1).post("/").then().statusCode(204);
RestAssured.given().header("Content-Type", "application/json").body(p2).post("/").then().statusCode(204);
RestAssured.given().header("Content-Type", "application/json").body(p3).post("/").then().statusCode(204);
await().atMost(100000, MILLISECONDS).until(() -> received.size() == 3);
source.close();
Assertions.assertTrue(received.contains(p1.value));
Assertions.assertTrue(received.contains(p2.value));
Assertions.assertTrue(received.contains(p3.value));
prices = RestAssured.get("/prices/all").as(new TypeRef<List<Price>>() {});
Assertions.assertEquals(prices.size(), 3);
}
@Test
void testPricesEventStream() {
Client client = ClientBuilder.newClient();
WebTarget target = client.target(PRICES_SSE_ENDPOINT);
List<Double> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(Double.valueOf(inboundSseEvent.readData())));
source.open();
await().atMost(100000, MILLISECONDS).until(() -> received.size() == 3);
source.close();
}
private SseEventSourceImpl getEventSource() {
SseEventSourceImpl.SourceBuilder builder = (SseEventSourceImpl.SourceBuilder) SseEventSource
.target(syncInvoker.getTarget());
if (executorService != null) {
builder.executor(executorService);
}
SseEventSourceImpl sseEventSource = (SseEventSourceImpl) builder.build();
sseEventSource.setAlwaysReconnect(false);
return sseEventSource;
}