下面列出了 org.springframework.web.servlet.mvc.method.annotation.SseEmitter#onTimeout() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Opens a server-sent-event stream for the given dashboard.
 *
 * <p>The emitter is registered with {@code handler} under {@code dashboardId} and is
 * unregistered again when the stream completes, times out, or when the initial event
 * cannot be written.
 *
 * @param dashboardId identifier of the dashboard, taken from the request path
 * @return the emitter through which events for this dashboard are pushed
 * @throws IOException if the initial event cannot be sent to the client
 */
@GetMapping(value = "/emitter/{dashboardId}")
public SseEmitter serverSideEmitter(final @PathVariable String dashboardId) throws IOException {
    LOG.info("Creating SseEmitter for dashboard {}", dashboardId);
    final SseEmitter sseEmitter = new NotCachedSseEmitter();

    // The emitter is already complete when this callback runs, so only bookkeeping is needed.
    sseEmitter.onCompletion(() -> handler.removeFromSessionsMap(sseEmitter, dashboardId));

    // On timeout the stream is still open: drop the registration and close it explicitly.
    sseEmitter.onTimeout(() -> {
        handler.removeFromSessionsMap(sseEmitter, dashboardId);
        sseEmitter.complete();
    });

    handler.addToSessionsMap(sseEmitter, dashboardId);
    try {
        // First event carries only a reconnect time of 0 ms.
        sseEmitter.send(SseEmitter.event().reconnectTime(0L));
    } catch (IOException | RuntimeException e) {
        // The emitter is never handed to the framework in this case, so neither callback
        // above would fire; remove the registration here to avoid leaking the entry.
        handler.removeFromSessionsMap(sseEmitter, dashboardId);
        throw e;
    }
    return sseEmitter;
}
/**
 * Builds a new {@link SseEmitter} for a client, registers it, and optionally subscribes
 * the client to a set of events.
 *
 * <p>The emitter completes itself when its timeout elapses. Subscriptions are only touched
 * when at least one event name is supplied.
 *
 * @param clientId unique client identifier
 * @param timeout timeout value in milliseconds, passed to the {@link SseEmitter} constructor
 * @param unsubscribe if {@code true}, {@code unsubscribeFromAllEvents} is invoked with the
 *        given events before subscribing (per the original contract: drops every
 *        subscription that is not among {@code events})
 * @param completeAfterMessage forwarded unchanged to {@code registerClient}; presumably
 *        controls whether the emitter is completed after each message - confirm against
 *        {@code registerClient}
 * @param events events the client wants to subscribe to; may be {@code null} or empty
 * @return a new SseEmitter instance
 */
public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubscribe,
        boolean completeAfterMessage, String... events) {
    SseEmitter sseEmitter = new SseEmitter(timeout);
    sseEmitter.onTimeout(() -> sseEmitter.complete());
    registerClient(clientId, sseEmitter, completeAfterMessage);

    // Nothing to subscribe to: leave existing subscriptions untouched.
    if (events == null || events.length == 0) {
        return sseEmitter;
    }

    if (unsubscribe) {
        unsubscribeFromAllEvents(clientId, events);
    }
    for (String eventName : events) {
        subscribe(clientId, eventName);
    }
    return sseEmitter;
}
/**
 * Opens the temperature SSE stream for the calling client.
 *
 * <p>The new emitter is tracked in {@code clients} and removed from it again when the
 * emitter times out or completes.
 *
 * @param request the incoming HTTP request; only its remote address is read, for logging
 * @return the emitter bound to this client's stream
 */
@RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
public SseEmitter events(HttpServletRequest request) {
    log.info("SSE stream opened for client: " + request.getRemoteAddr());

    final SseEmitter stream = new SseEmitter(SSE_SESSION_TIMEOUT);

    // Single cleanup action shared by both the timeout and the completion callback.
    final Runnable dropFromClients = () -> clients.remove(stream);
    stream.onTimeout(dropFromClients);
    stream.onCompletion(dropFromClients);

    clients.add(stream);
    return stream;
}
/**
 * Streams status changes of a task to the client as server-sent events.
 *
 * <p>The current status is pushed immediately; later changes are pushed through a change
 * listener registered on the status object. The emitter is completed when the status
 * signals stop, when sending fails, or right away if the status is already invalid,
 * failed or finished. Both listeners are detached when the emitter completes or times out.
 *
 * @param id task identifier from the request path, used to look up the status
 * @return the emitter delivering status updates for the task
 */
@GetMapping("/management/task/sse/{id}")
public SseEmitter realTimeSse(@PathVariable String id) {
    final SseEmitter sse = new SseEmitter(TIMEOUT);
    final RealTimeStatus current = realTimeStatusService.get(id);

    // Pushes one status snapshot to the client; a failed write closes the stream.
    final Consumer<RealTimeStatus> pushStatus = snapshot -> {
        try {
            sse.send(snapshot);
        } catch (IOException e) {
            log.error("Error sending event", e);
            sse.complete();
        }
    };
    // Same instance is used for registration and removal below.
    final Runnable closeStream = () -> sse.complete();
    final Runnable detachListeners = () -> {
        current.removeChangeListener(pushStatus);
        current.removeStopListener(closeStream);
    };

    current.addChangeListener(pushStatus);
    current.onStop(closeStream);
    sse.onCompletion(detachListeners);
    sse.onTimeout(detachListeners);

    // Send the present state right away so the client does not wait for the next change.
    pushStatus.accept(current);

    final boolean alreadyTerminal =
            current.isInvalid() || current.isFailed() || current.isFinished();
    if (alreadyTerminal) {
        sse.complete();
    }
    return sse;
}