下面列出了怎么用javax.ws.rs.sse.OutboundSseEvent的API类实例代码及写法,或者点击链接到github查看源代码。
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/stream")
@GET
public void getSseStream(@Context final SseEventSink eventSink,
@Context final Sse sse) {
scheduleSseEventSend(new SseEmitter() {
@Override
public CompletionStage<?> emit(final OutboundSseEvent event) {
return eventSink.send(event);
}
@Override
public void close() {
eventSink.close();
}
}, sse, Refs.of(0), ctx.executionContext().executor());
}
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/broadcast")
@GET
public void getSseBroadcast(@Context final SseEventSink eventSink,
@Context final Sse sse) {
eventSink.send(sse.newEvent("bar"));
final SseBroadcaster sseBroadcaster = sse.newBroadcaster();
sseBroadcaster.register(eventSink);
scheduleSseEventSend(new SseEmitter() {
@Override
public CompletionStage<?> emit(final OutboundSseEvent event) {
return sseBroadcaster.broadcast(event);
}
@Override
public void close() {
sseBroadcaster.close();
}
}, sse, Refs.of(0), ctx.executionContext().executor());
}
@GET
@Path("/sse")
@SseElementType(MediaType.APPLICATION_JSON)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void serverSentEvents(@Context SseEventSink sink) {
VanillaJavaImmutableData data = new VanillaJavaImmutableData("sse", "ssevalue");
try {
OutboundSseEvent.Builder builder = sse.newEventBuilder();
builder.id(String.valueOf(1))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(data)
.name("stream of json data");
sink.send(builder.build());
} finally {
sink.close();
}
}
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendData(@Context SseEventSink sink) {
// send a stream of few events
try {
for (int i = 0; i < 3; i++) {
final OutboundSseEvent.Builder builder = this.sse.newEventBuilder();
builder.id(String.valueOf(i)).mediaType(MediaType.TEXT_PLAIN_TYPE)
.data(Integer.class, i)
.name("stream of numbers");
sink.send(builder.build());
}
} finally {
sink.close();
}
}
@GET
@Path("/stream-html")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType("text/html")
public void sendHtmlData(@Context SseEventSink sink) {
// send a stream of few events
try {
for (int i = 0; i < 3; i++) {
final OutboundSseEvent.Builder builder = this.sse.newEventBuilder();
builder.id(String.valueOf(i)).mediaType(MediaType.TEXT_HTML_TYPE)
.data("<html><body>" + i + "</body></html>")
.name("stream of pages");
sink.send(builder.build());
}
} finally {
sink.close();
}
}
@GET
@Path("/stream-xml")
@SseElementType(MediaType.TEXT_XML)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendXmlData(@Context SseEventSink sink) {
// send a stream of few events
try {
for (int i = 0; i < 3; i++) {
final OutboundSseEvent.Builder builder = this.sse.newEventBuilder();
builder.id(String.valueOf(i)).mediaType(MediaType.TEXT_XML_TYPE)
.data("<settings><foo bar=\"" + i + "\"/></settings>")
.name("stream of XML data");
sink.send(builder.build());
}
} finally {
sink.close();
}
}
@Override
public void doHandleEvent(MCREvent evt) throws MCRException {
if (!evt.getObjectType().equals(MCREvent.OBJECT_TYPE)) {
return;
}
MCRObject obj = (MCRObject) evt.get(MCREvent.OBJECT_KEY);
JsonObject jEvent = new JsonObject();
JsonObject newData = getData(obj);
addUserInfo(jEvent);
jEvent.add("current", newData);
MCRObject oldObj = (MCRObject) evt.get(MCREvent.OBJECT_OLD_KEY);
if (oldObj != null) {
JsonObject oldData = getData(oldObj);
jEvent.add("old", oldData);
}
OutboundSseEvent event = sse.newEventBuilder()
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.id(getId(evt))
.name(getName(evt))
.data(jEvent.toString())
.build();
sseBroadcaster.broadcast(event);
}
@Override
public void doHandleEvent(MCREvent evt) throws MCRException {
if (!evt.getObjectType().equals(MCREvent.DERIVATE_TYPE)) {
return;
}
MCRDerivate der = (MCRDerivate) evt.get(MCREvent.DERIVATE_KEY);
JsonObject jEvent = new JsonObject();
addUserInfo(jEvent);
JsonObject newData = getData(der);
jEvent.add("current", newData);
MCRDerivate oldDer = (MCRDerivate) evt.get(MCREvent.DERIVATE_OLD_KEY);
if (oldDer != null) {
JsonObject oldData = getData(oldDer);
jEvent.add("old", oldData);
}
OutboundSseEvent event = sse.newEventBuilder()
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.id(getId(evt))
.name(getName(evt))
.data(jEvent.toString())
.build();
sseBroadcaster.broadcast(event);
}
/**
* Subscribes the connecting client for state updates. It will initially only send a "ready" event with an unique
* connectionId that the client can use to dynamically alter the list of tracked items.
*
* @return {@link EventOutput} object associated with the incoming connection.
*/
@GET
@Path("/states")
@Produces(MediaType.SERVER_SENT_EVENTS)
@ApiOperation(value = "Initiates a new item state tracker connection")
@ApiResponses(value = { @ApiResponse(code = 200, message = "OK") })
public void getStateEvents(@Context final SseEventSink sseEventSink, @Context final HttpServletResponse response) {
final SseSinkItemInfo sinkItemInfo = new SseSinkItemInfo();
itemStatesBroadcaster.add(sseEventSink, sinkItemInfo);
addCommonResponseHeaders(response);
String connectionId = sinkItemInfo.getConnectionId();
OutboundSseEvent readyEvent = sse.newEventBuilder().id("0").name("ready").data(connectionId).build();
itemStatesBroadcaster.sendIf(readyEvent, hasConnectionId(connectionId));
}
/**
* Alters the list of tracked items for a given state update connection
*
* @param connectionId the connection Id to change
* @param itemNames the list of items to track
*/
@POST
@Path("/states/{connectionId}")
@ApiOperation(value = "Changes the list of items a SSE connection will receive state updates to.")
@ApiResponses(value = { @ApiResponse(code = 200, message = "OK"),
@ApiResponse(code = 404, message = "Unknown connectionId") })
public Object updateTrackedItems(@PathParam("connectionId") String connectionId,
@ApiParam("items") Set<String> itemNames) {
Optional<SseSinkItemInfo> itemStateInfo = itemStatesBroadcaster.getInfoIf(hasConnectionId(connectionId))
.findFirst();
if (!itemStateInfo.isPresent()) {
return Response.status(Status.NOT_FOUND).build();
}
itemStateInfo.get().updateTrackedItems(itemNames);
OutboundSseEvent itemStateEvent = itemStatesEventBuilder.buildEvent(sse.newEventBuilder(), itemNames);
if (itemStateEvent != null) {
itemStatesBroadcaster.sendIf(itemStateEvent, hasConnectionId(connectionId));
}
return Response.ok().build();
}
public @Nullable OutboundSseEvent buildEvent(Builder eventBuilder, Set<String> itemNames) {
Map<String, StateDTO> payload = new HashMap<>(itemNames.size());
for (String itemName : itemNames) {
try {
Item item = itemRegistry.getItem(itemName);
StateDTO stateDto = new StateDTO();
stateDto.state = item.getState().toString();
String displayState = getDisplayState(item, Locale.getDefault());
// Only include the display state if it's different than the raw state
if (stateDto.state != null && !stateDto.state.equals(displayState)) {
stateDto.displayState = displayState;
}
payload.put(itemName, stateDto);
} catch (ItemNotFoundException e) {
logger.warn("Attempting to send a state update of an item which doesn't exist: {}", itemName);
}
}
if (!payload.isEmpty()) {
return eventBuilder.mediaType(MediaType.APPLICATION_JSON_TYPE).data(payload).build();
}
return null;
}
@Override
public CompletionStage<?> send(OutboundSseEvent event) {
final CompletableFuture<?> future = new CompletableFuture<>();
if (!closed.get() && writer != null) {
final Throwable ex = throwable.get();
if (ex != null) {
future.completeExceptionally(ex);
} else if (buffer.offer(new QueuedEvent(event, future))) {
if (dispatching.compareAndSet(false, true)) {
ctx.start(this::dequeue);
}
} else {
future.completeExceptionally(new IllegalStateException("The buffer is full ("
+ bufferSize + "), unable to queue SSE event for send. Please use '"
+ BUFFER_SIZE_PROPERTY + "' property to increase the limit."));
}
} else {
future.completeExceptionally(new IllegalStateException(
"The sink is already closed, unable to queue SSE event for send"));
}
return future;
}
@Override
public SseEventSink createContext(Message message) {
final HttpServletRequest request = (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
if (request == null) {
throw new IllegalStateException("Unable to retrieve HTTP request from the context");
}
final MessageBodyWriter<OutboundSseEvent> writer = new OutboundSseEventBodyWriter(
ServerProviderFactory.getInstance(message), message.getExchange());
final AsyncResponse async = new AsyncResponseImpl(message);
final Integer bufferSize = PropertyUtils.getInteger(message, SseEventSinkImpl.BUFFER_SIZE_PROPERTY);
final SseEventSink sink = createSseEventSink(request, writer, async, bufferSize);
message.put(SseEventSink.class, sink);
return sink;
}
/**
* A user should not need to specify a media type when creating an outbound event. The default
* should be <code>MediaType.SERVER_SENT_EVENTS_TYPE</code>.
*/
@Test
public void testDefaultMediaType() {
Sse sse = new SseImpl();
// test newEvent(data)
OutboundSseEvent event = sse.newEvent("myData");
assertNull(event.getName());
assertEquals("myData", event.getData());
assertEquals(MediaType.TEXT_PLAIN_TYPE, event.getMediaType());
// test newEvent(name, data)
event = sse.newEvent("myName", "myData2");
assertEquals("myName", event.getName());
assertEquals("myData2", event.getData());
assertEquals(MediaType.TEXT_PLAIN_TYPE, event.getMediaType());
// test newEventBuilder()...build()
event = sse.newEventBuilder().comment("myComment").data("myData3").build();
assertEquals("myComment", event.getComment());
assertEquals("myData3", event.getData());
assertEquals(MediaType.TEXT_PLAIN_TYPE, event.getMediaType());
}
@GET
@Path("server-sent-events")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getServerSentEvents(@Context SseEventSink eventSink, @Context Sse sse) throws InterruptedException {
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
// ... code that waits 0.1 second
Thread.sleep(100L);
final OutboundSseEvent event = sse.newEventBuilder()
.id(String.valueOf(i))
.name("message-to-rest-sb")
.data(String.class, "Test message " + i + "!")
.build();
eventSink.send(event);
System.out.println("Message " + i + " sent");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
@Lock(WRITE)
public void onEvent(@Observes DomainEvent domainEvent) {
String message = domainEvent.getContents();
messages.add(message);
OutboundSseEvent event = createEvent(message, ++lastEventId);
sseBroadcaster.broadcast(event);
}
public void notifyPosition() {
int position = queuePosition();
OutboundSseEvent event = sse.newEventBuilder()
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(new OutboundMessage.QueuePosition(position))
.build();
sink.send(event);
party.log("Notified queued client " + playerId + " who is currently at position " + position);
}
public void promoteToGame(String roundId) {
OutboundSseEvent event = sse.newEventBuilder()
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(new OutboundMessage.RequeueGame(roundId))
.build();
sink.send(event);
party.log("Promoted queued client " + playerId + " into round " + roundId);
close();
}
public void sendEvent(MCREvent evt, Path path, BasicFileAttributes attrs) {
if (!(path instanceof MCRPath) || !(attrs instanceof MCRFileAttributes)) {
LogManager.getLogger().warn("Cannot handle {} {}", path.getClass(), attrs.getClass());
return;
}
JsonObject file = new JsonObject();
addUserInfo(file);
String derId = ((MCRPath) path).getOwner();
String fPath = ((MCRPath) path).getOwnerRelativePath();
String objId = MCRMetadataManager.getObjectId(MCRObjectID.getInstance(derId), 1, TimeUnit.MINUTES)
.toString();
String relPath = String.format(Locale.ROOT, "objects/%s/derivates/%s/contents/%s", objId, derId, fPath);
String uri = uriResolver.apply(getPathURI(relPath)).toString();
file.addProperty("uri", uri);
file.addProperty("derivate", derId);
file.addProperty("object", objId);
file.addProperty("size", attrs.size());
file.addProperty("modified", attrs.lastModifiedTime().toInstant().toString());
file.addProperty("md5", ((MCRFileAttributes) attrs).md5sum());
file.addProperty("mimeType", context.getMimeType(path.getFileName().toString()));
OutboundSseEvent event = sse.newEventBuilder()
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.id(getId(evt))
.name(getName(evt))
.data(file.toString())
.build();
sseBroadcaster.broadcast(event);
}
public void sendIf(final OutboundSseEvent event, Predicate<I> predicate) {
logger.trace("broadcast to potential {} sinks", sinks.size());
sinks.forEach((sink, info) -> {
if (sink.isClosed()) {
// We are using a concurrent collection, so we are allowed to modify the collection asynchronous (we
// don't know if there is currently an iteration in progress or not, but it does not matter).
handleRemoval(sink);
return;
}
// Check if we should send at all.
if (!predicate.test(info)) {
return;
}
sink.send(event).exceptionally(th -> {
close(sink);
// We are using a concurrent collection, so we are allowed to modify the collection asynchronous (we
// don't know if there is currently an iteration in progress or not, but it does not matter).
handleRemoval(sink);
final String thClass = th.getClass().toString();
final String message = th.getMessage();
if (thClass.equals("class org.eclipse.jetty.io.EofException")) {
// The peer terminates the connection.
} else if (th instanceof IllegalStateException && message != null
&& (message.equals("The sink is already closed, unable to queue SSE event for send")
|| message.equals("AsyncContext completed and/or Request lifecycle recycled"))) {
// java.lang.IllegalStateException: The sink is already closed, unable to queue SSE event for
// send
// java.lang.IllegalStateException: AsyncContext completed and/or Request lifecycle recycled
} else {
logger.warn("failure", th);
}
return null;
});
});
}
/**
* Broadcasts a state event to all currently listening clients, after transforming it to a simple map.
*
* @param stateChangeEvent the {@link ItemStateChangedEvent} containing the new state
*/
public void handleEventBroadcastItemState(final ItemStateChangedEvent stateChangeEvent) {
String itemName = stateChangeEvent.getItemName();
boolean isTracked = itemStatesBroadcaster.getInfoIf(info -> true).anyMatch(tracksItem(itemName));
if (isTracked) {
OutboundSseEvent event = itemStatesEventBuilder.buildEvent(sse.newEventBuilder(), Set.of(itemName));
if (event != null) {
itemStatesBroadcaster.sendIf(event, tracksItem(itemName));
}
}
}
/**
* Creates a new {@link OutboundSseEvent} object containing an {@link EventDTO} created for the given Eclipse
* SmartHome {@link Event}.
*
* @param eventBuilder the builder that should be used
* @param event the event data transfer object
* @return a new OutboundEvent
*/
public static OutboundSseEvent buildEvent(OutboundSseEvent.Builder eventBuilder, EventDTO event) {
final OutboundSseEvent sseEvent = eventBuilder.name("message") //
.mediaType(MediaType.APPLICATION_JSON_TYPE) //
.data(event) //
.build();
return sseEvent;
}
@Override
public void onEvent(SitemapEvent event) {
final Sse sse = this.sse;
if (sse == null) {
logger.trace("broadcast skipped (no one listened since activation)");
return;
}
final OutboundSseEvent outboundSseEvent = sse.newEventBuilder().name("event")
.mediaType(MediaType.APPLICATION_JSON_TYPE).data(event).build();
broadcaster.sendIf(outboundSseEvent, info -> {
String sitemapName = event.sitemapName;
String pageId = event.pageId;
if (sitemapName != null && sitemapName.equals(subscriptions.getSitemapName(info.subscriptionId))
&& pageId != null && pageId.equals(subscriptions.getPageId(info.subscriptionId))) {
if (logger.isDebugEnabled()) {
if (event instanceof SitemapWidgetEvent) {
logger.debug("Sent sitemap event for widget {} to subscription {}.",
((SitemapWidgetEvent) event).widgetId, info.subscriptionId);
} else if (event instanceof ServerAliveEvent) {
logger.debug("Sent alive event to subscription {}.", info.subscriptionId);
}
}
return true;
}
return false;
});
}
@Context
public void setSse(Sse sse) {
this.broadcaster = sse.newBroadcaster();
this.builder = sse.newEventBuilder();
Flowable
.interval(500, TimeUnit.MILLISECONDS)
.zipWith(
Flowable.generate((Emitter<OutboundSseEvent.Builder> emitter) -> emitter.onNext(builder.name("stats"))),
(id, bldr) -> createStatsEvent(bldr, id)
)
.subscribeOn(Schedulers.single())
.subscribe(broadcaster::broadcast);
}
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final long eventId) {
return builder
.id("" + eventId)
.data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.build();
}
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
return builder
.id("" + eventId)
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
.build();
}
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
return builder
.id("" + eventId)
.data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.build();
}
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
return builder
.id("" + eventId)
.data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.build();
}
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
return builder
.id("" + eventId)
.data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.build();
}
protected static OutboundSseEvent createEvent(final OutboundSseEvent.Builder builder, final int eventId) {
return builder
.id(Integer.toString(eventId))
.data(Book.class, new Book("New Book #" + eventId, eventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.build();
}