下面列出了 javax.ws.rs.sse.SseEventSink 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * SSE endpoint at {@code /sse/stream} that wires the request's sink into
 * {@code scheduleSseEventSend}.
 *
 * <p>The emitter passed on forwards each event to {@code eventSink} and closes that sink when
 * the emitter itself is closed. How and when events are produced is decided by
 * {@code scheduleSseEventSend}, which is defined outside this snippet.
 *
 * @param eventSink sink bound to the current client connection
 * @param sse       factory handed on to {@code scheduleSseEventSend}
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/stream")
@GET
public void getSseStream(@Context final SseEventSink eventSink,
                         @Context final Sse sse) {
    // Adapter that routes emitted events straight to this request's sink.
    final SseEmitter directEmitter = new SseEmitter() {
        @Override
        public CompletionStage<?> emit(final OutboundSseEvent outbound) {
            return eventSink.send(outbound);
        }

        @Override
        public void close() {
            eventSink.close();
        }
    };
    // NOTE(review): Refs.of(0) looks like an initial counter value - confirm against the helper.
    scheduleSseEventSend(directEmitter, sse, Refs.of(0), ctx.executionContext().executor());
}
/**
 * SSE endpoint at {@code /sse/broadcast} that delivers events through an
 * {@link SseBroadcaster} instead of writing to the sink directly.
 *
 * <p>A first event carrying {@code "bar"} is sent straight to the sink. The sink is then
 * registered with a newly created broadcaster, and an emitter that broadcasts through it is
 * handed to {@code scheduleSseEventSend} (defined outside this snippet).
 *
 * @param eventSink sink bound to the current client connection
 * @param sse       factory used for the initial event and for the broadcaster
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/broadcast")
@GET
public void getSseBroadcast(@Context final SseEventSink eventSink,
                            @Context final Sse sse) {
    final OutboundSseEvent greeting = sse.newEvent("bar");
    eventSink.send(greeting);

    final SseBroadcaster fanOut = sse.newBroadcaster();
    fanOut.register(eventSink);

    // Adapter that publishes through the broadcaster and closes it when done.
    final SseEmitter broadcastEmitter = new SseEmitter() {
        @Override
        public CompletionStage<?> emit(final OutboundSseEvent outbound) {
            return fanOut.broadcast(outbound);
        }

        @Override
        public void close() {
            fanOut.close();
        }
    };
    scheduleSseEventSend(broadcastEmitter, sse, Refs.of(0), ctx.executionContext().executor());
}
/**
 * Sends one JSON-typed server-sent event and then closes the stream.
 *
 * <p>The event has id {@code "1"}, name {@code "stream of json data"} and a
 * {@code VanillaJavaImmutableData} payload. The sink is closed in a {@code finally} block, so
 * it is released even if building or sending the event throws.
 *
 * @param sink sink bound to the current client connection
 */
@GET
@Path("/sse")
@SseElementType(MediaType.APPLICATION_JSON)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void serverSentEvents(@Context SseEventSink sink) {
    final VanillaJavaImmutableData payload = new VanillaJavaImmutableData("sse", "ssevalue");
    try {
        // `sse` is not a parameter here; it resolves to a member declared outside this snippet.
        final OutboundSseEvent jsonEvent = sse.newEventBuilder()
                .id(String.valueOf(1))
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .data(payload)
                .name("stream of json data")
                .build();
        sink.send(jsonEvent);
    } finally {
        sink.close();
    }
}
/**
 * Sends three plain-text events whose payloads are the integers 0, 1 and 2, then closes the
 * sink.
 *
 * <p>Each event uses its loop index as both id and data and is named
 * {@code "stream of numbers"}. The sink is closed in a {@code finally} block.
 *
 * @param sink sink bound to the current client connection
 */
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendData(@Context SseEventSink sink) {
    try {
        for (int seq = 0; seq < 3; seq++) {
            final OutboundSseEvent numberEvent = this.sse.newEventBuilder()
                    .id(String.valueOf(seq))
                    .mediaType(MediaType.TEXT_PLAIN_TYPE)
                    .data(Integer.class, seq)
                    .name("stream of numbers")
                    .build();
            sink.send(numberEvent);
        }
    } finally {
        sink.close();
    }
}
/**
 * Sends three HTML events, each a minimal page whose body is the loop index, then closes the
 * sink.
 *
 * <p>Events are named {@code "stream of pages"} and carry the index as their id. The sink is
 * closed in a {@code finally} block.
 *
 * @param sink sink bound to the current client connection
 */
@GET
@Path("/stream-html")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType("text/html")
public void sendHtmlData(@Context SseEventSink sink) {
    try {
        for (int page = 0; page < 3; page++) {
            final String markup = "<html><body>" + page + "</body></html>";
            final OutboundSseEvent pageEvent = this.sse.newEventBuilder()
                    .id(String.valueOf(page))
                    .mediaType(MediaType.TEXT_HTML_TYPE)
                    .data(markup)
                    .name("stream of pages")
                    .build();
            sink.send(pageEvent);
        }
    } finally {
        sink.close();
    }
}
/**
 * Sends three XML events, each a small {@code <settings>} document whose {@code bar}
 * attribute is the loop index, then closes the sink.
 *
 * <p>Events are named {@code "stream of XML data"} and carry the index as their id. The sink
 * is closed in a {@code finally} block.
 *
 * @param sink sink bound to the current client connection
 */
@GET
@Path("/stream-xml")
@SseElementType(MediaType.TEXT_XML)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendXmlData(@Context SseEventSink sink) {
    try {
        for (int idx = 0; idx < 3; idx++) {
            final String xml = "<settings><foo bar=\"" + idx + "\"/></settings>";
            final OutboundSseEvent xmlEvent = this.sse.newEventBuilder()
                    .id(String.valueOf(idx))
                    .mediaType(MediaType.TEXT_XML_TYPE)
                    .data(xml)
                    .name("stream of XML data")
                    .build();
            sink.send(xmlEvent);
        }
    } finally {
        sink.close();
    }
}
/**
 * Simulates starting a domain and reports its progress as server-sent events.
 *
 * <p>The work runs on {@code executorService}: a "starting" event is sent first, followed by
 * five {@code domain-progress} events ({@code 50%}, {@code 60%}, {@code 70%}, {@code 99%},
 * {@code Done.}), each preceded by a 200 ms pause.
 *
 * <p>The sink is closed on every exit path. Previously an interrupt or a failure in
 * {@code send} skipped {@code close()} and left the client connection open.
 *
 * @param id        identifier of the domain, echoed in the first event
 * @param eventSink sink bound to the current client connection
 */
@POST
@Path("domains/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void startDomain(@PathParam("id") final String id, @Context SseEventSink eventSink) {
    executorService.submit(() -> {
        try {
            eventSink.send(sse.newEventBuilder()
                    .name("domain-progress")
                    .data(String.class, "starting domain " + id + " ...")
                    .build());
            for (final String progress : new String[] {"50%", "60%", "70%", "99%", "Done."}) {
                Thread.sleep(200);
                eventSink.send(sse.newEvent("domain-progress", progress));
            }
        } catch (final InterruptedException e) {
            // Restore the flag so the executor can observe the interruption; the remaining
            // progress events are abandoned.
            Thread.currentThread().interrupt();
        } finally {
            // Always release the connection, including on interrupt or a failed send.
            eventSink.close();
        }
    });
}
/**
 * Reports the progress of a report run as server-sent events.
 *
 * <p>The work runs on {@code executorService}: an initial {@code report-progress} event is
 * followed by {@code Progress} events for 25%, 50% and 75%, with 500 ms pauses between them.
 *
 * <p>Every event goes to the injected sink {@code es}. The first send previously used the
 * name {@code eventSink}, which is not a parameter of this method. The sink is now also closed
 * once the sequence finishes or is interrupted, so the connection is not left open.
 *
 * @param id  identifier of the report, echoed in the first event
 * @param es  sink bound to the current client connection
 * @param sse factory used to build the events
 */
@POST
@Path("progress/{report_id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@PathParam("report_id") final String id,
                        @Context SseEventSink es,
                        @Context Sse sse) {
    executorService.execute(() -> {
        try {
            es.send(sse.newEventBuilder().name("report-progress")
                    .data(String.class, "Commencing process for report " + id + " ...").build());
            es.send(sse.newEvent("Progress", "25%"));
            Thread.sleep(500);
            es.send(sse.newEvent("Progress", "50%"));
            Thread.sleep(500);
            es.send(sse.newEvent("Progress", "75%"));
        } catch (InterruptedException e) {
            // Restore the flag so the executor can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // Release the connection whether the sequence completed or was cut short.
            es.close();
        }
    });
}
/**
 * Puts a player at the end of the waiting queue, replacing any existing entry for them.
 *
 * <p>If an equal client was already queued it is removed first and the queued-players counter
 * is decremented, so the increment after re-adding leaves the count unchanged. Afterwards the
 * queue is either promoted (when the current round is open) or the new client is told its
 * position.
 *
 * @param playerId identifier of the player joining the queue
 * @param sink     SSE sink used to reach that player
 * @param sse      factory passed on to the {@code QueuedClient}
 */
public void add(String playerId, SseEventSink sink, Sse sse) {
    final QueuedClient newcomer = new QueuedClient(playerId, sink, sse);

    // A re-joining player loses their old slot and goes to the back of the line.
    final boolean wasQueued = waitingPlayers.removeFirstOccurrence(newcomer);
    if (wasQueued) {
        party.log("Removed client " + playerId + " from queue before adding at end");
        GameMetrics.counterDec(GameMetrics.currentQueuedPlayersCounter);
    }

    // The position is logged before the client is actually appended.
    party.log("Adding client " + playerId + " into the queue in position " + newcomer.queuePosition());
    waitingPlayers.add(newcomer);
    GameMetrics.counterInc(GameMetrics.currentQueuedPlayersCounter);

    if (!party.getCurrentRound().isOpen()) {
        newcomer.notifyPosition();
        return;
    }
    promoteClients();
}
/**
 * Subscribes the caller to the event stream, optionally restricted by a topic filter.
 *
 * <p>When {@code SseUtil.isValidTopicFilter} rejects the filter, the response status is set
 * to 400 and nothing else happens. Otherwise the sink is added to {@code topicBroadcaster}
 * together with the filter, and the common response headers are applied.
 *
 * @param sseEventSink sink bound to the current client connection
 * @param response     servlet response used for the status code and headers
 * @param eventFilter  value of the {@code topics} query parameter
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@ApiOperation(value = "Get all events.")
@ApiResponses(value = { @ApiResponse(code = 200, message = "OK"),
        @ApiResponse(code = 400, message = "Topic is empty or contains invalid characters") })
public void listen(@Context final SseEventSink sseEventSink, @Context final HttpServletResponse response,
        @QueryParam("topics") @ApiParam(value = "topics") String eventFilter) {
    final boolean filterAccepted = SseUtil.isValidTopicFilter(eventFilter);
    if (filterAccepted) {
        topicBroadcaster.add(sseEventSink, new SseSinkTopicInfo(eventFilter));
        addCommonResponseHeaders(response);
    } else {
        response.setStatus(Status.BAD_REQUEST.getStatusCode());
    }
}
/**
 * Subscribes the connecting client for state updates. Initially only a "ready" event is sent,
 * carrying a unique connectionId that the client can use to dynamically alter the list of
 * tracked items.
 *
 * <p>The method returns nothing; all output goes through the injected sink, which is added to
 * {@code itemStatesBroadcaster} before the "ready" event is dispatched.
 *
 * @param sseEventSink sink bound to the current client connection
 * @param response     servlet response that receives the common response headers
 */
@GET
@Path("/states")
@Produces(MediaType.SERVER_SENT_EVENTS)
@ApiOperation(value = "Initiates a new item state tracker connection")
@ApiResponses(value = { @ApiResponse(code = 200, message = "OK") })
public void getStateEvents(@Context final SseEventSink sseEventSink, @Context final HttpServletResponse response) {
    final SseSinkItemInfo trackerInfo = new SseSinkItemInfo();
    itemStatesBroadcaster.add(sseEventSink, trackerInfo);
    addCommonResponseHeaders(response);

    // Address the "ready" event to this connection only, via its connection id.
    final String ownConnectionId = trackerInfo.getConnectionId();
    itemStatesBroadcaster.sendIf(
            sse.newEventBuilder().id("0").name("ready").data(ownConnectionId).build(),
            hasConnectionId(ownConnectionId));
}
/**
 * Emits five {@code "book"} events back to back on an asynchronous task and closes the sink
 * once that task finishes, whether normally or exceptionally.
 *
 * @param sink sink bound to the current client connection
 * @param id   path parameter; not used by this method
 */
@GET
@Path("nodelay/sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
    final Builder builder = sse.newEventBuilder();
    final Runnable emitBooks = () -> {
        // Events are numbered 1 through 5; the builder is shared across all of them.
        for (int bookNo = 1; bookNo <= 5; bookNo++) {
            sink.send(createEvent(builder.name("book"), bookNo));
        }
    };
    CompletableFuture
        .runAsync(emitBooks)
        .whenComplete((ignoredResult, failure) -> sink.close());
}
/**
 * Emits five {@code "book"} events built by {@code createRawEvent} on an asynchronous task
 * and closes the sink once that task finishes, whether normally or exceptionally.
 *
 * @param sink sink bound to the current client connection
 */
@GET
@Path("/titles/sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void forBookTitlesOnly(@Context SseEventSink sink) {
    final Builder builder = sse.newEventBuilder();
    final Runnable emitTitles = () -> {
        // Events are numbered 1 through 5; the builder is shared across all of them.
        for (int bookNo = 1; bookNo <= 5; bookNo++) {
            sink.send(createRawEvent(builder.name("book"), bookNo));
        }
    };
    CompletableFuture
        .runAsync(emitTitles)
        .whenComplete((ignoredResult, failure) -> sink.close());
}
/**
 * Streams five {@code "book"} events without any pause between them.
 *
 * <p>The sends run inside {@link CompletableFuture#runAsync(Runnable)}; the sink is closed in
 * the completion callback, so it is released on success and on failure alike.
 *
 * @param sink sink bound to the current client connection
 * @param id   path parameter; not used by this method
 */
@GET
@Path("nodelay/sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
    final Builder builder = sse.newEventBuilder();
    CompletableFuture
        .runAsync(() -> {
            int sequence = 1;
            while (sequence <= 5) {
                sink.send(createEvent(builder.name("book"), sequence));
                sequence++;
            }
        })
        .whenComplete((unused, error) -> sink.close());
}
/**
 * Streams five {@code "book"} events produced by {@code createRawEvent}.
 *
 * <p>The sends run inside {@link CompletableFuture#runAsync(Runnable)}; the sink is closed in
 * the completion callback, so it is released on success and on failure alike.
 *
 * @param sink sink bound to the current client connection
 */
@GET
@Path("/titles/sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void forBookTitlesOnly(@Context SseEventSink sink) {
    final Builder builder = sse.newEventBuilder();
    CompletableFuture
        .runAsync(() -> {
            int sequence = 1;
            while (sequence <= 5) {
                sink.send(createRawEvent(builder.name("book"), sequence));
                sequence++;
            }
        })
        .whenComplete((unused, error) -> sink.close());
}
/**
 * Builds the {@link SseEventSink} for the given message and stores it on the message under
 * {@code SseEventSink.class}.
 *
 * <p>The sink is assembled from the message's HTTP request, an event body writer, an
 * {@code AsyncResponseImpl} wrapping the message, and the buffer size read from the
 * {@code SseEventSinkImpl.BUFFER_SIZE_PROPERTY} message property.
 *
 * @param message message being processed
 * @return the newly created sink
 * @throws IllegalStateException if the message carries no HTTP request
 */
@Override
public SseEventSink createContext(Message message) {
    final Object rawRequest = message.get(AbstractHTTPDestination.HTTP_REQUEST);
    final HttpServletRequest httpRequest = (HttpServletRequest) rawRequest;
    if (httpRequest == null) {
        throw new IllegalStateException("Unable to retrieve HTTP request from the context");
    }

    final MessageBodyWriter<OutboundSseEvent> eventWriter = new OutboundSseEventBodyWriter(
            ServerProviderFactory.getInstance(message), message.getExchange());
    final AsyncResponse asyncResponse = new AsyncResponseImpl(message);
    final Integer configuredBufferSize =
            PropertyUtils.getInteger(message, SseEventSinkImpl.BUFFER_SIZE_PROPERTY);

    final SseEventSink created =
            createSseEventSink(httpRequest, eventWriter, asyncResponse, configuredBufferSize);
    message.put(SseEventSink.class, created);
    return created;
}
/**
 * Checks that a sink created without an explicit buffer size accepts 10000 queued events and
 * rejects the next one with a "buffer is full" {@link CompletionException}.
 */
@Test
public void testCreateSseEventSinkWithDefaultBufferSize() {
    final SseEventSink sink = provider.createContext(message);

    // Queue events up to the expected capacity; none of them should have been delivered yet.
    for (int queued = 0; queued < 10000; queued++) {
        final CompletableFuture<?> pending = sink.send(EVENT).toCompletableFuture();
        assertThat(pending.isDone(), equalTo(false));
    }

    // One more event overflows the buffer and must complete exceptionally.
    final CompletableFuture<?> rejected = sink.send(EVENT).toCompletableFuture();
    assertThat(rejected.isCompletedExceptionally(), equalTo(true));

    exception.expect(CompletionException.class);
    exception.expectMessage("The buffer is full (10000), unable to queue SSE event for send.");
    rejected.join();
}
/**
 * Checks that a buffer size of 20000 set through {@code SseEventSinkImpl.BUFFER_SIZE_PROPERTY}
 * is honoured: 20000 events queue up and the next one is rejected with a "buffer is full"
 * {@link CompletionException}.
 */
@Test
public void testCreateSseEventSinkWithCustomBufferSize() {
    message.put(SseEventSinkImpl.BUFFER_SIZE_PROPERTY, 20000);
    final SseEventSink sink = provider.createContext(message);

    // Queue events up to the configured capacity; none of them should have been delivered yet.
    for (int queued = 0; queued < 20000; queued++) {
        final CompletableFuture<?> pending = sink.send(EVENT).toCompletableFuture();
        assertThat(pending.isDone(), equalTo(false));
    }

    // One more event overflows the buffer and must complete exceptionally.
    final CompletableFuture<?> rejected = sink.send(EVENT).toCompletableFuture();
    assertThat(rejected.isCompletedExceptionally(), equalTo(true));

    exception.expect(CompletionException.class);
    exception.expectMessage("The buffer is full (20000), unable to queue SSE event for send.");
    rejected.join();
}
/**
 * Streams ten test messages to the client, one every 100 ms, from a dedicated thread.
 *
 * <p>Each event is named {@code "message-to-rest-sb"}, carries its index as id and a
 * {@code "Test message N!"} payload. The sink is closed when the loop ends or the thread is
 * interrupted; previously it was never closed, which left the connection open after the last
 * message.
 *
 * @param eventSink sink bound to the current client connection
 * @param sse       factory used to build the events
 * @throws InterruptedException declared for interface compatibility; the method body itself
 *                              only starts the worker thread
 */
@GET
@Path("server-sent-events")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getServerSentEvents(@Context SseEventSink eventSink, @Context Sse sse) throws InterruptedException {
    new Thread(() -> {
        try {
            for (int i = 0; i < 10; i++) {
                // ... code that waits 0.1 second
                Thread.sleep(100L);
                final OutboundSseEvent event = sse.newEventBuilder()
                        .id(String.valueOf(i))
                        .name("message-to-rest-sb")
                        .data(String.class, "Test message " + i + "!")
                        .build();
                eventSink.send(event);
                System.out.println("Message " + i + " sent");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Release the connection once streaming is over, however it ended.
            eventSink.close();
        }
    }).start();
}
/**
 * SSE endpoint at {@code /sse} that starts the {@code sendSseUntilFailure} loop on the
 * executor taken from the connection's execution context.
 *
 * @param eventSink sink bound to the current client connection
 * @param sse       factory used to build the events
 * @param ctx       connection context supplying the executor
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse")
@GET
public void getSseStream(@Context final SseEventSink eventSink,
                         @Context final Sse sse,
                         @Context final ConnectionContext ctx) {
    final Executor connectionExecutor = ctx.executionContext().executor();
    sendSseUntilFailure(eventSink, sse, connectionExecutor);
}
/**
 * Sends a {@code "foo"} event and schedules itself to run again 10 ms later.
 *
 * <p>If sending or scheduling throws, the sink is expected to be closed already: in that case
 * {@code sseSinkClosedLatch} is counted down. A failure while the sink is still open is
 * rethrown wrapped in an {@link IllegalStateException}.
 *
 * @param eventSink sink to write to
 * @param sse       factory used to build the event
 * @param executor  executor on which the next round is scheduled
 */
private void sendSseUntilFailure(final SseEventSink eventSink, final Sse sse, final Executor executor) {
    try {
        final OutboundSseEvent tick = sse.newEvent("foo");
        eventSink.send(tick);
        executor.schedule(() -> sendSseUntilFailure(eventSink, sse, executor), 10, MILLISECONDS);
    } catch (final Throwable failure) {
        if (!eventSink.isClosed()) {
            throw new IllegalStateException("SseEventSink should be closed", failure);
        }
        sseSinkClosedLatch.countDown();
    }
}
/**
 * SSE endpoint at {@code /sse/unsupported} that sends a single {@code text/plain} event whose
 * payload is a {@code Buffer} containing {@code "foo"}.
 *
 * <p>NOTE(review): the path name suggests this exercises a payload type the SSE writer cannot
 * serialise - confirm against the accompanying test.
 *
 * @param eventSink sink bound to the current client connection
 * @param sse       factory used to build the event
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/unsupported")
@GET
public void getSseUnsupportedType(@Context final SseEventSink eventSink,
                                  @Context final Sse sse) {
    final OutboundSseEvent.Builder builder = sse.newEventBuilder();
    // `ctx` is not a parameter here; it resolves to a member declared outside this snippet.
    final Buffer payload = ctx.executionContext().bufferAllocator().fromAscii("foo");
    final OutboundSseEvent bufferEvent = builder
            .data(Buffer.class, payload)
            .mediaType(MediaType.TEXT_PLAIN_TYPE)
            .build();
    eventSink.send(bufferEvent);
}
/**
 * Registers the caller's sink with the shared broadcaster.
 *
 * <p>The injected {@link Sse} is stored on the resource on every call, and the broadcaster is
 * created from it the first time it is found to be {@code null}.
 *
 * @param sse       factory stored for later use and used to create the broadcaster
 * @param eventSink sink bound to the current client connection
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context Sse sse, @Context SseEventSink eventSink) {
    this.sse = sse;

    // Create the broadcaster lazily, on the first subscription.
    final boolean broadcasterMissing = (null == this.broadcaster);
    if (broadcasterMissing) {
        this.broadcaster = sse.newBroadcaster();
    }

    this.broadcaster.register(eventSink);
}
/**
 * Adds the caller's sink to the resource's broadcaster, creating the broadcaster on demand.
 *
 * <p>The {@link Sse} instance injected for this request is stored on the resource before the
 * broadcaster is looked at.
 *
 * @param sse       factory stored for later use and used to create the broadcaster
 * @param eventSink sink bound to the current client connection
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context Sse sse, @Context SseEventSink eventSink) {
    this.sse = sse;

    // Only the first caller finds the field empty and creates the broadcaster.
    if (this.broadcaster != null) {
        this.broadcaster.register(eventSink);
        return;
    }
    this.broadcaster = sse.newBroadcaster();
    this.broadcaster.register(eventSink);
}
/**
 * Registers the sink with a freshly created broadcaster and immediately broadcasts a
 * {@code "hello"} event through it.
 *
 * @param sink sink bound to the current client connection
 * @throws IllegalStateException if no sink was injected
 * @throws IOException           declared by the signature; nothing in the visible body throws it
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context SseEventSink sink) throws IOException {
    if (null == sink) {
        throw new IllegalStateException("No client connected.");
    }

    // A new broadcaster is created per call; it is not stored anywhere in this method.
    final SseBroadcaster perCallBroadcaster = sse.newBroadcaster();
    perCallBroadcaster.register(sink);

    final OutboundSseEvent hello = sse.newEventBuilder().data("hello").build();
    perCallBroadcaster.broadcast(hello);
}
/**
 * Registers the caller's sink with the broadcaster of the {@code UserEvent} stored in
 * {@code POOL} under the given id.
 *
 * @param id           key of the user event to subscribe to
 * @param sseEventSink sink bound to the current client connection
 * @throws NotFoundException if {@code POOL} holds no entry for {@code id}
 */
@Path("register/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
@GET
public void register(@PathParam("id") Long id,
                     @Context SseEventSink sseEventSink) {
    final UserEvent userEvent = POOL.get(id);
    if (userEvent == null) {
        throw new NotFoundException();
    }
    userEvent.getSseBroadcaster().register(sseEventSink);
}
/**
 * Confirms the subscription to the caller with a {@code "You are subscribed"} event, then
 * registers the sink with {@code broadcaster}.
 *
 * @param eventSink sink bound to the current client connection
 */
@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context SseEventSink eventSink) {
    // The confirmation goes directly to this sink, not through the broadcaster.
    final OutboundSseEvent confirmation = sse.newEvent("You are subscribed");
    eventSink.send(confirmation);

    broadcaster.register(eventSink);
}
/**
 * Stores the caller's sink as the resource's single event sink.
 *
 * <p>The check and the assignment both happen while holding {@code outputLock}. A second
 * caller, arriving when a sink is already stored, is rejected.
 *
 * @param eventSink sink bound to the current client connection
 * @throws IllegalStateException if a sink has already been stored
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getMessageQueue(@Context SseEventSink eventSink) {
    synchronized (outputLock) {
        final boolean slotFree = (this.eventSink == null);
        if (!slotFree) {
            throw new IllegalStateException("Server sink already served.");
        }
        this.eventSink = eventSink;
    }
}
/**
 * Subscribes the caller to item events, first replaying stored messages when the client
 * supplies a non-negative last-event id.
 *
 * <p>The id comes from the {@code Last-Event-ID} header and defaults to {@code -1}, in which
 * case no replay takes place. The sink is registered with {@code sseBroadcaster} in both
 * cases.
 *
 * @param lastEventId id of the last event the client reports having seen, or {@code -1}
 * @param eventSink   sink bound to the current client connection
 */
@GET
@Lock(READ)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void itemEvents(@HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER)
                       @DefaultValue("-1") int lastEventId,
                       @Context SseEventSink eventSink) {
    final boolean clientIsResuming = lastEventId >= 0;
    if (clientIsResuming) {
        replayLastMessages(lastEventId, eventSink);
    }
    sseBroadcaster.register(eventSink);
}
/**
 * Sends every stored message from index {@code lastEventId} onwards to the given sink.
 *
 * <p>The message at index {@code i} is sent with event number {@code i + 1}.
 *
 * @param lastEventId index in {@code messages} at which the replay starts
 * @param eventSink   sink that receives the replayed events
 * @throws InternalServerErrorException if anything fails while replaying
 */
private void replayLastMessages(int lastEventId, SseEventSink eventSink) {
    try {
        int position = lastEventId;
        while (position < messages.size()) {
            final int eventNumber = position + 1;
            eventSink.send(createEvent(messages.get(position), eventNumber));
            position++;
        }
    } catch (Exception cause) {
        throw new InternalServerErrorException("Could not replay messages ", cause);
    }
}