下面列出了怎么用javax.ws.rs.sse.Sse的API类实例代码及写法,或者点击链接到github查看源代码。
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/stream")
@GET
public void getSseStream(@Context final SseEventSink eventSink,
@Context final Sse sse) {
scheduleSseEventSend(new SseEmitter() {
@Override
public CompletionStage<?> emit(final OutboundSseEvent event) {
return eventSink.send(event);
}
@Override
public void close() {
eventSink.close();
}
}, sse, Refs.of(0), ctx.executionContext().executor());
}
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/broadcast")
@GET
public void getSseBroadcast(@Context final SseEventSink eventSink,
@Context final Sse sse) {
eventSink.send(sse.newEvent("bar"));
final SseBroadcaster sseBroadcaster = sse.newBroadcaster();
sseBroadcaster.register(eventSink);
scheduleSseEventSend(new SseEmitter() {
@Override
public CompletionStage<?> emit(final OutboundSseEvent event) {
return sseBroadcaster.broadcast(event);
}
@Override
public void close() {
sseBroadcaster.close();
}
}, sse, Refs.of(0), ctx.executionContext().executor());
}
@POST
@Path("progress/{report_id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@PathParam("report_id") final String id,
@Context SseEventSink es,
@Context Sse sse) {
executorService.execute(() -> {
try {
eventSink.send(sse.newEventBuilder().name("report-progress")
.data(String.class, "Commencing process for report " + id + " ...").build());
es.send(sse.newEvent("Progress", "25%"));
Thread.sleep(500);
es.send(sse.newEvent("Progress", "50%"));
Thread.sleep(500);
es.send(sse.newEvent("Progress", "75%"));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
public void add(String playerId, SseEventSink sink, Sse sse) {
QueuedClient client = new QueuedClient(playerId, sink, sse);
// If this client was already in the queue, remove them and add them at the end
if (waitingPlayers.removeFirstOccurrence(client)) {
party.log("Removed client " + playerId + " from queue before adding at end");
GameMetrics.counterDec(GameMetrics.currentQueuedPlayersCounter);
}
party.log("Adding client " + playerId + " into the queue in position " + client.queuePosition());
waitingPlayers.add(client);
GameMetrics.counterInc(GameMetrics.currentQueuedPlayersCounter);
if (party.getCurrentRound().isOpen())
promoteClients();
else
client.notifyPosition();
}
/**
* A user should not need to specify a media type when creating an outbound event. The default
* should be <code>MediaType.SERVER_SENT_EVENTS_TYPE</code>.
*/
@Test
public void testDefaultMediaType() {
Sse sse = new SseImpl();
// test newEvent(data)
OutboundSseEvent event = sse.newEvent("myData");
assertNull(event.getName());
assertEquals("myData", event.getData());
assertEquals(MediaType.TEXT_PLAIN_TYPE, event.getMediaType());
// test newEvent(name, data)
event = sse.newEvent("myName", "myData2");
assertEquals("myName", event.getName());
assertEquals("myData2", event.getData());
assertEquals(MediaType.TEXT_PLAIN_TYPE, event.getMediaType());
// test newEventBuilder()...build()
event = sse.newEventBuilder().comment("myComment").data("myData3").build();
assertEquals("myComment", event.getComment());
assertEquals("myData3", event.getData());
assertEquals(MediaType.TEXT_PLAIN_TYPE, event.getMediaType());
}
@GET
@Path("server-sent-events")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getServerSentEvents(@Context SseEventSink eventSink, @Context Sse sse) throws InterruptedException {
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
// ... code that waits 0.1 second
Thread.sleep(100L);
final OutboundSseEvent event = sse.newEventBuilder()
.id(String.valueOf(i))
.name("message-to-rest-sb")
.data(String.class, "Test message " + i + "!")
.build();
eventSink.send(event);
System.out.println("Message " + i + " sent");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
@Produces(SERVER_SENT_EVENTS)
@Path("/sse")
@GET
public void getSseStream(@Context final SseEventSink eventSink,
@Context final Sse sse,
@Context final ConnectionContext ctx) {
sendSseUntilFailure(eventSink, sse, ctx.executionContext().executor());
}
private void sendSseUntilFailure(final SseEventSink eventSink, final Sse sse, final Executor executor) {
try {
eventSink.send(sse.newEvent("foo"));
executor.schedule(() -> sendSseUntilFailure(eventSink, sse, executor), 10, MILLISECONDS);
} catch (final Throwable t) {
if (eventSink.isClosed()) {
sseSinkClosedLatch.countDown();
} else {
throw new IllegalStateException("SseEventSink should be closed", t);
}
}
}
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/unsupported")
@GET
public void getSseUnsupportedType(@Context final SseEventSink eventSink,
@Context final Sse sse) {
eventSink.send(sse.newEventBuilder()
.data(Buffer.class, ctx.executionContext().bufferAllocator().fromAscii("foo"))
.mediaType(MediaType.TEXT_PLAIN_TYPE)
.build());
}
private void scheduleSseEventSend(final SseEmitter emmitter, final Sse sse, final Ref<Integer> iRef,
final Executor executor) {
executor.schedule(() -> {
final int i = iRef.get();
emmitter.emit(sse.newEvent("foo" + i)).whenComplete((r, t) -> {
if (t == null && i < 9) {
iRef.set(i + 1);
scheduleSseEventSend(emmitter, sse, iRef, executor);
} else {
emmitter.close();
}
});
}, 10, MILLISECONDS);
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context Sse sse, @Context SseEventSink eventSink) {
this.sse = sse;
if(this.broadcaster == null) {
this.broadcaster = sse.newBroadcaster();
}
this.broadcaster.register(eventSink);
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context Sse sse, @Context SseEventSink eventSink) {
this.sse = sse;
if(this.broadcaster == null) {
this.broadcaster = sse.newBroadcaster();
}
this.broadcaster.register(eventSink);
}
@POST
public void addMessage(final String message, @Context Sse sse) throws IOException {
if (SINK != null) {
SINK.send(sse.newEventBuilder()
.name("sse-message")
.id(String.valueOf(System.currentTimeMillis()))
.data(String.class, message)
.comment("")
.build());
}
}
@Path("start")
@POST
public Response start(@Context Sse sse) {
final UserEvent process = new UserEvent(sse);
POOL.put(process.getId(), process);
executor.submit(process);
final URI uri = UriBuilder.fromResource(ServerSentService.class).path("register/{id}").build(process.getId());
return Response.created(uri).build();
}
@GET
@Path("/{partyId}/queue")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void joinQueue(@PathParam("partyId") String partyId,
@QueryParam("playerId") String playerId,
@Context SseEventSink sink, @Context Sse sse) {
Objects.requireNonNull(playerId, "Client attemted to queue for a party without providing playerId");
Party p = getParty(partyId);
if (p != null)
p.enqueueClient(playerId, sink, sse);
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@Context Sse sse, @Context SseEventSink eventSink) {
// Resource method is invoked when a client subscribes to an event stream.
// That implies that sending events will most likely happen from different
// context - thread / event handler / etc, so common implementation of the
// resource method will store the eventSink instance and the application
// logic will retrieve it when an event should be emitted to the client.
// sending events:
eventSink.send(sse.newEvent("event1"));
}
@GET
@Path("/update")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void newsUpdate(@Context SseEventSink eventSink, @Context Sse sse) {
CompletableFuture.runAsync(() -> {
IntStream.range(1, 6).forEach(c -> {
JsonObject newsEvent = Json.createObjectBuilder().add("news", String.format("Updated Event %d", newsCounter.incrementAndGet())).build();
eventSink.send(sse.newEventBuilder().mediaType(MediaType.APPLICATION_JSON_TYPE).data(newsEvent).build());
});
//closing only on the client is generating a chunked connection exception that can be troubleshooted at a later date.
eventSink.close();
});
}
@Override
public void onEvent(SitemapEvent event) {
final Sse sse = this.sse;
if (sse == null) {
logger.trace("broadcast skipped (no one listened since activation)");
return;
}
final OutboundSseEvent outboundSseEvent = sse.newEventBuilder().name("event")
.mediaType(MediaType.APPLICATION_JSON_TYPE).data(event).build();
broadcaster.sendIf(outboundSseEvent, info -> {
String sitemapName = event.sitemapName;
String pageId = event.pageId;
if (sitemapName != null && sitemapName.equals(subscriptions.getSitemapName(info.subscriptionId))
&& pageId != null && pageId.equals(subscriptions.getPageId(info.subscriptionId))) {
if (logger.isDebugEnabled()) {
if (event instanceof SitemapWidgetEvent) {
logger.debug("Sent sitemap event for widget {} to subscription {}.",
((SitemapWidgetEvent) event).widgetId, info.subscriptionId);
} else if (event instanceof ServerAliveEvent) {
logger.debug("Sent alive event to subscription {}.", info.subscriptionId);
}
}
return true;
}
return false;
});
}
@Context
public void setSse(Sse sse) {
this.broadcaster = sse.newBroadcaster();
this.builder = sse.newEventBuilder();
Flowable
.interval(500, TimeUnit.MILLISECONDS)
.zipWith(
Flowable.generate((Emitter<OutboundSseEvent.Builder> emitter) -> emitter.onNext(builder.name("stats"))),
(id, bldr) -> createStatsEvent(bldr, id)
)
.subscribeOn(Schedulers.single())
.subscribe(broadcaster::broadcast);
}
/**
* A user should not need to specify the type of data being sent in an outbound
* event. In that case the OutboundSseEvent should use the data object's type. Other
* types may be specified, but the default (if not specified by the user) should be
* the return value from the object's <code>getClass()</code> method.
*/
@Test
public void testDefaultClass() {
Sse sse = new SseImpl();
// test newEvent(string)
OutboundSseEvent event = sse.newEvent("myData");
assertNull(event.getName());
assertEquals("myData", event.getData());
assertEquals(String.class, event.getType());
// test newEvent(name, data)
event = sse.newEvent("myName", "myData2");
assertEquals("myName", event.getName());
assertEquals("myData2", event.getData());
assertEquals(String.class, event.getType());
// test newEventBuilder()...build()
event = sse.newEventBuilder().comment("myComment").data("myData3").build();
assertEquals("myComment", event.getComment());
assertEquals("myData3", event.getData());
assertEquals(String.class, event.getType());
// test that object's class is re-enabled when calling different signatures of the data method
OutboundSseEvent.Builder builder = sse.newEventBuilder();
builder.data(TestData.class, new TestDataImpl("1", "2"));
event = builder.build();
assertEquals(TestData.class, event.getType());
builder.data("myString");
event = builder.build();
assertEquals(String.class, event.getType());
// same thing, but don't build in between calls to data
event = sse.newEventBuilder().data(TestDataImpl.class, new TestDataImpl("3")).data("anotherString").build();
assertEquals(String.class, event.getType());
assertEquals("anotherString", event.getData());
}
@GET
@Path("/{uuid}")
@Produces(SERVER_SENT_EVENTS)
public void doSseCall(@PathParam("uuid") String uuid, @Context SseEventSink sink, @Context Sse sse) {
final OutboundSseEvent.Builder builder = sse.newEventBuilder();
OutboundSseEvent event = builder.id(uuid)
.data(SseModel.class, new SseModel("some model "+uuid))
.build();
sink.send(event);
sink.close();
}
@GET
@Path("/events")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getServerSentEvents(@Context SseEventSink eventSink, @Context Sse sse) {
OutboundSseEvent event = sse.newEventBuilder()
.name("echo-headers")
.data(String.class, headers.getHeaderString(AddHeaderOnRequestFilter.FILTER_HEADER_KEY))
.build();
eventSink.send(event);
}
@Context
public void setSse(final Sse sse) {
this.sse = sse;
}
UserEvent(Sse sse) {
this.sse = sse;
this.sseBroadcaster = sse.newBroadcaster();
id = System.currentTimeMillis();
}
public QueuedClient(String playerId, SseEventSink sink, Sse sse) {
this.playerId = playerId;
this.sink = sink;
this.sse = sse;
}
public void enqueueClient(String playerId, SseEventSink sink, Sse sse) {
queue.add(playerId, sink, sse);
}
MCRObjectHandler(SseBroadcaster sseBroadcaster, Sse sse,
Function<URI, URI> uriResolver) {
this.sseBroadcaster = sseBroadcaster;
this.sse = sse;
this.uriResolver = uriResolver;
}
MCRDerivateHandler(SseBroadcaster sseBroadcaster, Sse sse, Function<URI, URI> uriResolver) {
this.sseBroadcaster = sseBroadcaster;
this.sse = sse;
this.uriResolver = uriResolver;
}
MCRPathHandler(SseBroadcaster sseBroadcaster, Sse sse, Function<URI, URI> uriResolver, ServletContext context) {
this.sseBroadcaster = sseBroadcaster;
this.sse = sse;
this.context = context;
this.uriResolver = uriResolver;
}
public SseRequest(Sse sse, SseEventSink eventSink) {
this.sse = sse;
this.eventSink = eventSink;
}