下面列出了 javax.ws.rs.sse.SseEventSink#send() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Streams three XML-typed server-sent events to the client, then closes the sink.
 *
 * <p>Each event carries the loop counter as its id and embeds it in the XML payload.
 *
 * @param sink the event sink injected for the current client connection
 */
@GET
@Path("/stream-xml")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_XML)
public void sendXmlData(@Context SseEventSink sink) {
    try {
        for (int count = 0; count < 3; count++) {
            final String xmlPayload = "<settings><foo bar=\"" + count + "\"/></settings>";
            sink.send(this.sse.newEventBuilder()
                    .id(String.valueOf(count))
                    .mediaType(MediaType.TEXT_XML_TYPE)
                    .data(xmlPayload)
                    .name("stream of XML data")
                    .build());
        }
    } finally {
        // The sink is closed whether or not every event could be sent.
        sink.close();
    }
}
/**
 * Sends an initial "bar" event, registers the sink with a freshly created broadcaster and
 * passes an emitter backed by that broadcaster to {@code scheduleSseEventSend}.
 *
 * @param eventSink the event sink injected for the current client connection
 * @param sse the SSE entry point used to create events and the broadcaster
 */
@GET
@Path("/sse/broadcast")
@Produces(SERVER_SENT_EVENTS)
public void getSseBroadcast(@Context final SseEventSink eventSink,
                            @Context final Sse sse) {
    eventSink.send(sse.newEvent("bar"));

    final SseBroadcaster eventBroadcaster = sse.newBroadcaster();
    eventBroadcaster.register(eventSink);

    // Emitter that forwards both emission and shutdown to the broadcaster.
    final SseEmitter broadcastingEmitter = new SseEmitter() {
        @Override
        public CompletionStage<?> emit(final OutboundSseEvent event) {
            return eventBroadcaster.broadcast(event);
        }

        @Override
        public void close() {
            eventBroadcaster.close();
        }
    };

    scheduleSseEventSend(broadcastingEmitter, sse, Refs.of(0), ctx.executionContext().executor());
}
/**
 * Sends a single JSON-typed server-sent event carrying a {@code VanillaJavaImmutableData}
 * payload, then closes the sink.
 *
 * @param sink the event sink injected for the current client connection
 */
@GET
@Path("/sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public void serverSentEvents(@Context SseEventSink sink) {
    final VanillaJavaImmutableData payload = new VanillaJavaImmutableData("sse", "ssevalue");
    try {
        sink.send(sse.newEventBuilder()
                .id(String.valueOf(1))
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .data(payload)
                .name("stream of json data")
                .build());
    } finally {
        // The sink is closed whether or not the event could be sent.
        sink.close();
    }
}
/**
 * Streams three plain-text server-sent events whose payload is the loop counter,
 * then closes the sink.
 *
 * @param sink the event sink injected for the current client connection
 */
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendData(@Context SseEventSink sink) {
    try {
        int count = 0;
        while (count < 3) {
            final OutboundSseEvent.Builder eventBuilder = this.sse.newEventBuilder();
            eventBuilder.id(String.valueOf(count))
                    .mediaType(MediaType.TEXT_PLAIN_TYPE)
                    .data(Integer.class, count)
                    .name("stream of numbers");
            sink.send(eventBuilder.build());
            count++;
        }
    } finally {
        // The sink is closed whether or not every event could be sent.
        sink.close();
    }
}
/**
 * Streams three HTML-typed server-sent events, each a minimal page containing the
 * loop counter, then closes the sink.
 *
 * @param sink the event sink injected for the current client connection
 */
@GET
@Path("/stream-html")
@SseElementType("text/html")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendHtmlData(@Context SseEventSink sink) {
    try {
        for (int page = 0; page < 3; page++) {
            final String html = "<html><body>" + page + "</body></html>";
            sink.send(this.sse.newEventBuilder()
                    .id(String.valueOf(page))
                    .mediaType(MediaType.TEXT_HTML_TYPE)
                    .data(html)
                    .name("stream of pages")
                    .build());
        }
    } finally {
        // The sink is closed whether or not every event could be sent.
        sink.close();
    }
}
/**
 * Sends a "foo" event and re-schedules itself on the executor 10 ms later, repeating
 * until sending or scheduling fails.
 *
 * <p>On failure, the latch {@code sseSinkClosedLatch} is counted down if the sink
 * reports itself closed; otherwise the failure is rethrown wrapped in an
 * {@link IllegalStateException}.
 *
 * @param eventSink the sink to send events to
 * @param sse the SSE entry point used to create events
 * @param executor the executor used to schedule the next send
 * @throws IllegalStateException if a failure occurs while the sink is still open
 */
private void sendSseUntilFailure(final SseEventSink eventSink, final Sse sse, final Executor executor) {
    try {
        eventSink.send(sse.newEvent("foo"));
        executor.schedule(() -> sendSseUntilFailure(eventSink, sse, executor), 10, MILLISECONDS);
    } catch (final Throwable cause) {
        // A failure is only expected once the sink has been closed.
        if (!eventSink.isClosed()) {
            throw new IllegalStateException("SseEventSink should be closed", cause);
        }
        sseSinkClosedLatch.countDown();
    }
}
/**
 * Sends a single plain-text event whose payload is declared as {@code Buffer} and
 * allocated from the execution context's buffer allocator.
 *
 * @param eventSink the event sink injected for the current client connection
 * @param sse the SSE entry point used to build the event
 */
@GET
@Path("/sse/unsupported")
@Produces(SERVER_SENT_EVENTS)
public void getSseUnsupportedType(@Context final SseEventSink eventSink,
                                  @Context final Sse sse) {
    final Buffer payload = ctx.executionContext().bufferAllocator().fromAscii("foo");
    eventSink.send(
            sse.newEventBuilder()
                    .data(Buffer.class, payload)
                    .mediaType(MediaType.TEXT_PLAIN_TYPE)
                    .build());
}
/**
 * Sends a confirmation event to the new client and registers its sink with the
 * broadcaster.
 *
 * @param eventSink the event sink injected for the current client connection
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("subscribe")
public void subscribe(@Context final SseEventSink eventSink) {
    // Greet first, then register with the broadcaster.
    eventSink.send(
            sse.newEvent("You are subscribed"));
    broadcaster.register(eventSink);
}
/**
 * Re-sends every stored message from index {@code lastEventId} onwards to the sink.
 * Each event is created with an id one greater than the message's index.
 *
 * @param lastEventId index of the first message to replay
 * @param eventSink the sink to replay the messages to
 * @throws InternalServerErrorException if any exception occurs during the replay
 */
private void replayLastMessages(int lastEventId, SseEventSink eventSink) {
    try {
        int index = lastEventId;
        while (index < messages.size()) {
            final int eventId = index + 1;
            eventSink.send(createEvent(messages.get(index), eventId));
            index++;
        }
    } catch (Exception e) {
        throw new InternalServerErrorException("Could not replay messages ", e);
    }
}
/**
 * Invoked when a client subscribes to the event stream; sends a single "event1" event.
 *
 * <p>Because events are usually emitted later from another context (a different thread
 * or event handler), a typical implementation stores the {@code eventSink} so that
 * application logic can look it up when an event has to be pushed to the client.
 *
 * @param sse the SSE entry point used to create events
 * @param eventSink the event sink injected for the current client connection
 */
@Produces(MediaType.SERVER_SENT_EVENTS)
@GET
public void eventStream(@Context final Sse sse, @Context final SseEventSink eventSink) {
    // Sending an event:
    eventSink.send(
            sse.newEvent("event1"));
}
/**
 * Sends a welcome event to the new client and registers its sink with the broadcaster.
 *
 * @param eventSink the event sink injected for the current client connection
 */
@Produces(MediaType.SERVER_SENT_EVENTS)
//@Path("register")
@GET
public void register(@Context final SseEventSink eventSink) {
    // Greet first, then register with the broadcaster.
    eventSink.send(
            sse.newEvent("welcome!"));
    sseBroadcaster.register(eventSink);
}
/**
 * Sends a welcome event to the new client and registers its sink with the broadcaster.
 *
 * @param sink the event sink injected for the current client connection
 */
@GET
@Path("/subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context final SseEventSink sink) {
    // Greet first, then register with the broadcaster.
    sink.send(
            _sse.newEvent("welcome"));
    _sseBroadcaster.register(sink);
}
/**
 * Sends a single event identified by {@code uuid} that carries an {@code SseModel}
 * payload, then closes the sink.
 *
 * @param uuid the path parameter used as the event id and embedded in the payload text
 * @param sink the event sink injected for the current client connection
 * @param sse the SSE entry point used to build the event
 */
@GET
@Path("/{uuid}")
@Produces(SERVER_SENT_EVENTS)
public void doSseCall(@PathParam("uuid") String uuid, @Context SseEventSink sink, @Context Sse sse) {
    // try/finally guarantees the sink is released even if building or sending the
    // event throws; previously close() was skipped on any exception.
    try {
        final OutboundSseEvent event = sse.newEventBuilder()
                .id(uuid)
                .data(SseModel.class, new SseModel("some model " + uuid))
                .build();
        sink.send(event);
    } finally {
        sink.close();
    }
}
/**
 * Sends a single "echo-headers" event whose payload is the value of the request header
 * named by {@code AddHeaderOnRequestFilter.FILTER_HEADER_KEY}.
 *
 * @param eventSink the event sink injected for the current client connection
 * @param sse the SSE entry point used to build the event
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("/events")
public void getServerSentEvents(@Context SseEventSink eventSink, @Context Sse sse) {
    eventSink.send(
            sse.newEventBuilder()
                    .name("echo-headers")
                    .data(String.class, headers.getHeaderString(AddHeaderOnRequestFilter.FILTER_HEADER_KEY))
                    .build());
}
/**
 * Greets the new client, registers its sink with the broadcaster and then sends a
 * confirmation event.
 *
 * @param sseEventSink the event sink injected for the current client connection
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("subscribe")
public void listen(@Context final SseEventSink sseEventSink) {
    // 1. Greeting, sent before the sink is known to the broadcaster.
    sseEventSink.send(
            sse.newEvent("Welcome !"));

    // 2. Registration with the broadcaster.
    this.sseBroadcaster.register(sseEventSink);

    // 3. Confirmation, sent directly to this client.
    sseEventSink.send(
            sse.newEvent("You are registred !"));
}
/**
 * Streams stock transactions to the client as JSON "stock" events, polling the stock
 * service every five seconds.
 *
 * <p>Streaming resumes at the id following {@code lastReceivedId}, or at 1 when the
 * client sent no Last-Event-ID header (default {@code -1}). The loop ends when the
 * next event id is a multiple of 5 (simulated disconnect), when it exceeds 2000, or
 * when the handling thread is interrupted. The sink is always closed on exit,
 * including when the stock service or the sink throws.
 *
 * @param sseEventSink the event sink injected for the current client connection
 * @param lastReceivedId id of the last event the client received, or {@code -1} if none
 */
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink,
                           @HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER) @DefaultValue("-1") int lastReceivedId) {
    // Resume right after the last event the client saw; a fresh connection starts at 1.
    int lastEventId = (lastReceivedId != -1) ? lastReceivedId + 1 : 1;

    try {
        // do/while: the first poll always happens, whatever the starting id is.
        do {
            Stock stock = stockService.getNextTransaction(lastEventId);
            if (stock != null) {
                // NOTE(review): eventBuilder is a field reused for every event; if this
                // resource instance serves concurrent requests the builder state could
                // interleave -- confirm the resource scope.
                OutboundSseEvent sseEvent = this.eventBuilder
                        .name("stock")
                        .id(String.valueOf(lastEventId))
                        .mediaType(MediaType.APPLICATION_JSON_TYPE)
                        .data(Stock.class, stock)
                        .reconnectDelay(3000)
                        .comment("price change")
                        .build();
                sseEventSink.send(sseEvent);
                lastEventId++;
            }

            // Simulate a connection close; the finally block closes the sink.
            if (lastEventId % 5 == 0) {
                break;
            }

            try {
                // Wait 5 seconds before polling for the next transaction.
                Thread.sleep(5 * 1000);
            } catch (InterruptedException ex) {
                // Restore the interrupt flag and stop streaming instead of swallowing
                // the interruption and continuing to block this thread.
                Thread.currentThread().interrupt();
                break;
            }
        } while (lastEventId <= 2000); // Simulated upper bound on the stream length.
    } finally {
        sseEventSink.close();
    }
}