下面列出了javax.ws.rs.core.MediaType#SERVER_SENT_EVENTS 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@GET
@Path("/stream-html")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType("text/html")
public void sendHtmlData(@Context SseEventSink sink) {
// send a stream of few events
try {
for (int i = 0; i < 3; i++) {
final OutboundSseEvent.Builder builder = this.sse.newEventBuilder();
builder.id(String.valueOf(i)).mediaType(MediaType.TEXT_HTML_TYPE)
.data("<html><body>" + i + "</body></html>")
.name("stream of pages");
sink.send(builder.build());
}
} finally {
sink.close();
}
}
/**
* Subscribes the connecting client for state updates. It will initially only send a "ready" event with an unique
* connectionId that the client can use to dynamically alter the list of tracked items.
*
* @return {@link EventOutput} object associated with the incoming connection.
*/
@GET
@Path("/states")
@Produces(MediaType.SERVER_SENT_EVENTS)
@ApiOperation(value = "Initiates a new item state tracker connection")
@ApiResponses(value = { @ApiResponse(code = 200, message = "OK") })
public void getStateEvents(@Context final SseEventSink sseEventSink, @Context final HttpServletResponse response) {
final SseSinkItemInfo sinkItemInfo = new SseSinkItemInfo();
itemStatesBroadcaster.add(sseEventSink, sinkItemInfo);
addCommonResponseHeaders(response);
String connectionId = sinkItemInfo.getConnectionId();
OutboundSseEvent readyEvent = sse.newEventBuilder().id("0").name("ready").data(connectionId).build();
itemStatesBroadcaster.sendIf(readyEvent, hasConnectionId(connectionId));
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Publisher<String> getQueue() {
return Multi.createBy().merging()
.streams(
queue.map(b -> json.toJson(b)),
getPingStream()
);
}
@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context SseEventSink eventSink) {
eventSink.send(sse.newEvent("You are subscribed"));
broadcaster.register(eventSink);
}
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Publisher<Book> streamBooks(@QueryParam("sort") String sort) {
if (sort != null) {
return reactiveBookRepository.streamAll(Sort.ascending(sort));
}
return reactiveBookRepository.streamAll();
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context SseEventSink sink) throws IOException {
if (sink == null) {
throw new IllegalStateException("No client connected.");
}
SseBroadcaster sseBroadcaster = sse.newBroadcaster();
sseBroadcaster.register(sink);
sseBroadcaster.broadcast(sse.newEventBuilder().data("hello").build());
}
@GET
@Path("sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void stats(@Context SseEventSink sink) {
new Thread() {
public void run() {
try {
final Builder builder = sse.newEventBuilder();
sink.send(createStatsEvent(builder.name("stats"), 1));
Thread.sleep(500);
sink.send(createStatsEvent(builder.name("stats"), 2));
Thread.sleep(500);
sink.send(createStatsEvent(builder.name("stats"), 3));
Thread.sleep(500);
sink.send(createStatsEvent(builder.name("stats"), 4));
Thread.sleep(500);
sink.send(createStatsEvent(builder.name("stats"), 5));
Thread.sleep(500);
sink.send(createStatsEvent(builder.name("stats"), 6));
Thread.sleep(500);
sink.send(createStatsEvent(builder.name("stats"), 7));
Thread.sleep(500);
sink.send(createStatsEvent(builder.name("stats"), 8));
sink.close();
} catch (final Exception e) {
e.printStackTrace();
}
}
}.start();
}
@GET
@Path("/update")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void newsUpdate(@Context SseEventSink eventSink, @Context Sse sse) {
CompletableFuture.runAsync(() -> {
IntStream.range(1, 6).forEach(c -> {
JsonObject newsEvent = Json.createObjectBuilder().add("news", String.format("Updated Event %d", newsCounter.incrementAndGet())).build();
eventSink.send(sse.newEventBuilder().mediaType(MediaType.APPLICATION_JSON_TYPE).data(newsEvent).build());
});
//closing only on the client is generating a chunked connection exception that can be troubleshooted at a later date.
eventSink.close();
});
}
@GET
@Lock(READ)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void itemEvents(@HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER)
@DefaultValue("-1") int lastEventId,
@Context SseEventSink eventSink) {
if (lastEventId >= 0)
replayLastMessages(lastEventId, eventSink);
sseBroadcaster.register(eventSink);
}
@Path("register/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
@GET
public void register(@PathParam("id") Long id,
@Context SseEventSink sseEventSink) {
final UserEvent event = POOL.get(id);
if (event != null) {
event.getSseBroadcaster().register(sseEventSink);
} else {
throw new NotFoundException();
}
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context Sse sse, @Context SseEventSink eventSink) {
this.sse = sse;
if(this.broadcaster == null) {
this.broadcaster = sse.newBroadcaster();
}
this.broadcaster.register(eventSink);
}
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Publisher<ReactiveBookEntity> streamBooks(@QueryParam("sort") String sort) {
if (sort != null) {
return ReactiveBookEntity.streamAll(Sort.ascending(sort));
}
return ReactiveBookEntity.streamAll();
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("/stream/{count}/{name}")
public Multi<String> greetingsAsStream(@PathParam int count, @PathParam String name) {
return service.greetings(count, name);
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Publisher<String> get() {
return Multi.createFrom().resource(
driver::rxSession,
session -> session.readTransaction(tx -> {
RxResult result = tx.run("MATCH (f:Fruit) RETURN f.name as name ORDER BY f.name");
return Multi.createFrom().publisher(result.records())
.map(record -> record.get("name").asString());
})
).withFinalizer(session -> {
return Uni.createFrom().publisher(session.close());
});
}
@Produces(MediaType.SERVER_SENT_EVENTS)
@GET
@Path("/subscribe")
public void subscribe(@Context SseEventSink sink) {
sink.send(_sse.newEvent("welcome"));
_sseBroadcaster.register(sink);
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Response event() {
return Response.status(Status.UNAUTHORIZED).build();
}
@GET
@Path("/objects")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void registerObjectEvents(@Context SseEventSink sseEventSink) {
objectBroadcaster.register(sseEventSink);
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getMessageQueue(@Context SseEventSink sink) {
SseResource.SINK = sink;
}
@GET
@Path("/derivates")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void registerDerivateEvents(@Context SseEventSink sseEventSink) {
derivateBroadcaster.register(sseEventSink);
}
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public Publisher<String> stream() {
return names;
}