下面列出了怎么用org.springframework.web.servlet.mvc.method.annotation.SseEmitter的API类实例代码及写法,或者点击链接到github查看源代码。
@GetMapping(value = "/emitter/{dashboardId}")
public SseEmitter serverSideEmitter(final @PathVariable String dashboardId) throws IOException {
LOG.info("Creating SseEmitter for dashboard {}", dashboardId);
final SseEmitter sseEmitter = new NotCachedSseEmitter();
sseEmitter.onCompletion(() -> {
handler.removeFromSessionsMap(sseEmitter, dashboardId);
sseEmitter.complete();
});
sseEmitter.onTimeout(() -> {
handler.removeFromSessionsMap(sseEmitter, dashboardId);
sseEmitter.complete();
});
handler.addToSessionsMap(sseEmitter, dashboardId);
sseEmitter.send(SseEmitter.event().reconnectTime(0L));
return sseEmitter;
}
@Async
@EventListener
public void handleMessage(Temperature temperature) {
log.info(format("Temperature: %4.2f C, active subscribers: %d",
temperature.getValue(), clients.size()));
List<SseEmitter> deadEmitters = new ArrayList<>();
clients.forEach(emitter -> {
try {
Instant start = Instant.now();
emitter.send(temperature, MediaType.APPLICATION_JSON);
log.info("Sent to client, took: {}", Duration.between(start, Instant.now()));
} catch (Exception ignore) {
deadEmitters.add(emitter);
}
});
clients.removeAll(deadEmitters);
}
public synchronized void removeFromSessionsMap(final SseEmitter session, final String dashboardId) {
LOG.debug("Remove SseEmitter {} to sessions map", dashboardId);
if (! StringUtils.isEmpty(dashboardId)) {
final List<SseEmitter> dashboardEmitters = emittersPerDashboard.get(dashboardId);
if (dashboardEmitters != null) {
dashboardEmitters.remove(session);
if (dashboardEmitters.isEmpty()) {
emittersPerDashboard.remove(dashboardId);
}
}
}
}
/**
* Creates a {@link SseEmitter} and registers the client in the internal database.
* Client will be subscribed to the provided events if specified.
* @param clientId unique client identifier
* @param timeout timeout value in milliseconds
* @param unsubscribe if true unsubscribes from all events that are not provided with
* the next parameter
* @param events events the client wants to subscribe
* @return a new SseEmitter instance
*/
public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubscribe,
boolean completeAfterMessage, String... events) {
SseEmitter emitter = new SseEmitter(timeout);
emitter.onTimeout(emitter::complete);
registerClient(clientId, emitter, completeAfterMessage);
if (events != null && events.length > 0) {
if (unsubscribe) {
unsubscribeFromAllEvents(clientId, events);
}
for (String event : events) {
subscribe(clientId, event);
}
}
return emitter;
}
/**
* 获取实时的评测结果.
* @param submissionId - 提交记录的唯一标识符
* @return 包含评测结果信息的StreamingResponseBody对象
* @throws IOException
*/
@RequestMapping("/getRealTimeJudgeResult.action")
public SseEmitter getRealTimeJudgeResultAction(
@RequestParam(value="submissionId") long submissionId,
@RequestParam(value="csrfToken") String csrfToken,
HttpServletRequest request, HttpServletResponse response) throws IOException {
User currentUser = HttpSessionParser.getCurrentUser(request.getSession());
boolean isCsrfTokenValid = CsrfProtector.isCsrfTokenValid(csrfToken, request.getSession());
Submission submission = submissionService.getSubmission(submissionId);
if ( !isCsrfTokenValid || submission == null ||
!submission.getUser().equals(currentUser) ||
!submission.getJudgeResult().getJudgeResultSlug().equals("PD") ) {
throw new ResourceNotFoundException();
}
response.addHeader("X-Accel-Buffering", "no");
SseEmitter sseEmitter = new SseEmitter();
submissionEventListener.addSseEmitters(submissionId, sseEmitter);
sseEmitter.send("Established");
return sseEmitter;
}
/**
* 提交事件的处理器.
* @param event - 提交记录事件
* @throws IOException
*/
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
long submissionId = event.getSubmissionId();
String judgeResult = event.getJudgeResult();
String message = event.getMessage();
boolean isCompleted = event.isCompleted();
SseEmitter sseEmitter = sseEmitters.get(submissionId);
if ( sseEmitter == null ) {
LOGGER.warn(String.format("CANNOT get the SseEmitter for submission #%d.", submissionId));
return;
}
Map<String, String> mapMessage = new HashMap<>(3, 1);
mapMessage.put("judgeResult", judgeResult);
mapMessage.put("message", message);
sseEmitter.send(mapMessage);
if ( isCompleted ) {
sseEmitter.complete();
removeSseEmitters(submissionId);
}
}
@GetMapping("/sse")
public SseEmitter streamSseMvc() {
final SseEmitter emitter = new SseEmitter();
final ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int eventId = 1; eventId <= 5; ++eventId) {
SseEventBuilder event = SseEmitter.event()
.id(Integer.toString(eventId))
.data(new Book("New Book #" + eventId, "Author #" + eventId), MediaType.APPLICATION_JSON)
.name("book");
emitter.send(event);
Thread.sleep(100);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
@GetMapping(Constants.API_SSE)
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter();
nonBlockingService.execute(() -> {
try {
emitter.send(Constants.API_SSE_MSG + " @ " + new Date());
emitter.complete();
} catch (Exception ex) {
System.out.println(Constants.GENERIC_EXCEPTION);
emitter.completeWithError(ex);
}
});
return emitter;
}
@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int i = 0; true; i++) {
SseEventBuilder event = SseEmitter.event()
.data("SSE MVC - " + LocalTime.now()
.toString())
.id(String.valueOf(i))
.name("sse event - mvc");
emitter.send(event);
Thread.sleep(1000);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
/**
* 发送邀请
* @param userid
* @throws Exception
*/
public static void sendWebIMClients(String sessionid , String userid , String msg) throws Exception{
List<WebIMClient> clients = OnlineUserUtils.webIMClients.getClients(userid) ;
if(clients!=null && clients.size()>0){
for(WebIMClient client : clients){
try{
client.getSse().send(SseEmitter.event().reconnectTime(0).data(msg));
}catch(Exception ex){
OnlineUserUtils.webIMClients.removeClient(sessionid , userid , client.getClient() , true) ;
}finally{
client.getSse().complete();
}
}
}
}
@RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
public SseEmitter events(HttpServletRequest request) {
RxSeeEmitter emitter = new RxSeeEmitter();
log.info("[{}] Rx SSE stream opened for client: {}",
emitter.getSessionId(), request.getRemoteAddr());
temperatureSensor.temperatureStream()
.subscribe(emitter.getSubscriber());
return emitter;
}
@RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
public SseEmitter events(HttpServletRequest request) {
log.info("SSE stream opened for client: " + request.getRemoteAddr());
SseEmitter emitter = new SseEmitter(SSE_SESSION_TIMEOUT);
clients.add(emitter);
// Remove SseEmitter from active clients on error or client disconnect
emitter.onTimeout(() -> clients.remove(emitter));
emitter.onCompletion(() -> clients.remove(emitter));
return emitter;
}
@RequestMapping("/logs")
public SseEmitter mockLogs() {
SseEmitter emitter = new SseEmitter();
Flowable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> "[" + System.nanoTime() + "] [LogServiceApplication] [Thread " + Thread.currentThread() + "] Some loge here " + i + "\n")
.subscribe(emitter::send, emitter::completeWithError, emitter::complete);
return emitter;
}
@Override
public void sendEventUpdateMessage(final EventType event, final String dashboardId) {
final List<SseEmitter> emitters = emittersPerDashboard.get(dashboardId);
if (emitters != null) {
if (event != EventType.PING) {
sendEventUpdateMessage(EventType.PING, dashboardId);
}
LOG.info("Notifying {} dashboards with name {} and event type {}", emitters.size(), dashboardId, event);
for (int i = emitters.size(); i > 0; i--) {
final SseEmitter sseEmitter = emitters.get(i - 1);
try {
final String jsonMessage = objectMapper.writeValueAsString(
ImmutableMap
.<String, String>builder()
.put("type", event.getValue())
.build()
);
sseEmitter.send(jsonMessage, MediaType.APPLICATION_JSON);
} catch (IOException e) {
this.removeFromSessionsMap(sseEmitter, dashboardId);
LOG.error("Exception while sending message to emitter for dashboard {}", dashboardId);
}
}
}
}
@GetMapping("/management/task/sse/{id}")
public SseEmitter realTimeSse(@PathVariable String id) {
SseEmitter emitter = new SseEmitter(TIMEOUT);
RealTimeStatus status = realTimeStatusService.get(id);
Consumer<RealTimeStatus> consumer = statusChange -> {
try {
emitter.send(statusChange);
} catch (IOException e) {
log.error("Error sending event", e);
emitter.complete();
}
};
Runnable completeListener = emitter::complete;
Runnable onComplete = () -> {
status.removeChangeListener(consumer);
status.removeStopListener(completeListener);
};
status.addChangeListener(consumer);
status.onStop(completeListener);
emitter.onCompletion(onComplete);
emitter.onTimeout(onComplete);
consumer.accept(status);
if (status.isInvalid() || status.isFailed() || status.isFinished())
emitter.complete();
return emitter;
}
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
String key = event.getKey();
Object message = event.getMessage();
SseEmitter sseEmitter = sseEmitters.get(key);
if ( sseEmitter == null ) {
return;
}
sseEmitter.send(message, MediaType.APPLICATION_JSON);
}
@RequestMapping("/listener/{id}")
public SseEmitter listen(@PathVariable("id") String id){
if(this.listenerType == null)
throw new ListenerTypeNotFound();
final SseEmitter sseEmitter = new SseEmitter();
applicationEventListener.addSseEmitters(this.listenerType , id, sseEmitter);
return sseEmitter;
}
@RequestMapping("/tweetLocation")
public SseEmitter streamTweets() throws InterruptedException{
SseEmitter sseEmitter = new SseEmitter();
emitters.add(sseEmitter);
sseEmitter.onCompletion(() -> emitters.remove(sseEmitter));
streamTweetEventService.streamTweetEvent(emitters);
return sseEmitter;
}
public void registerClient(String clientId, SseEmitter emitter,
boolean completeAfterMessage) {
Client client = this.clients.get(clientId);
if (client == null) {
this.clients.put(clientId,
new Client(clientId, emitter, completeAfterMessage));
}
else {
client.updateEmitter(emitter);
}
}
@RequestMapping(path = "/", method = RequestMethod.POST, produces = "application/json")
@ResponseBody
Comment jsonCreate(Comment comment) throws IOException {
Comment newComment = this.commentRepository.save(comment);
synchronized (this.sseEmitters) {
for (SseEmitter sseEmitter : this.sseEmitters) {
// Servlet containers don't always detect ghost connection, so we must catch exceptions ...
try {
sseEmitter.send(newComment, MediaType.APPLICATION_JSON);
} catch (Exception e) {}
}
}
return comment;
}
@RequestMapping("/sse/updates")
SseEmitter subscribeUpdates() {
SseEmitter sseEmitter = new SseEmitter();
synchronized (this.sseEmitters) {
this.sseEmitters.add(sseEmitter);
sseEmitter.onCompletion(() -> {
synchronized (this.sseEmitters) {
this.sseEmitters.remove(sseEmitter);
}
});
}
return sseEmitter;
}
/**
* 移除Server Sent Event的发送者对象.
* @param submissionId - 提交记录的唯一标识符
*/
private void removeSseEmitters(long submissionId) {
sseEmitters.remove(submissionId);
for ( Entry<Long, SseEmitter> mapEntry : sseEmitters.entrySet() ) {
long currentSubmissionId = mapEntry.getKey();
if ( currentSubmissionId < submissionId ) {
sseEmitters.remove(currentSubmissionId);
}
}
}
public WebIMClient(String userid , String client , SseEmitter sse){
this.userid = userid ;
this.sse = sse ;
this.client = client ;
}
public SseEmitter getSse() {
return sse;
}
public void setSse(SseEmitter sse) {
this.sse = sse;
}
public void addSseEmitters(ListenerType listenerType ,String id, SseEmitter sseEmitter) {
sseEmitters.put(listenerType.prepareKey(id), sseEmitter);
}
public void streamTweetEvent(List<SseEmitter> emitters) throws InterruptedException{
List<StreamListener> listeners = new ArrayList<StreamListener>();
StreamListener streamListener = new StreamListener() {
@Override
public void onWarning(StreamWarningEvent warningEvent) {
}
@Override
public void onTweet(Tweet tweet) {
//log.info("User '{}', Tweeted : {}, from ; {}", tweet.getUser().getName() , tweet.getText(), tweet.getUser().getLocation());
Integer connectedUsers = emitters.size();
//log.info("Streaming to :" + connectedUsers +" connected Users");
if (connectedUsers!=0) {
for (SseEmitter emiter : emitters) {
try {
emiter.send(SseEmitter.event().name("streamLocation").data(tweet.getUser().getLocation()));
StringBuilder hashTag = new StringBuilder();
List<HashTagEntity> hashTags = tweet.getEntities().getHashTags();
for (HashTagEntity hash : hashTags) {
hashTag.append("#"+hash.getText() + " ");
}
//System.out.println(hashTag);
emiter.send(SseEmitter.event().name("streamHashtags").data(hashTag));
} catch (IOException e) {
System.out.println("User Disconnected from the Stream");
//e.printStackTrace();
}
}
}else{
//Close Stream when all Users are disconnected.
userStream.close();
log.info("Zero Connected Users - Closing Stream");
}
}
@Override
public void onLimit(int numberOfLimitedTweets) {
}
@Override
public void onDelete(StreamDeleteEvent deleteEvent) {
}
};
//Start Stream when a User is connected
if (emitters.size()==1) {
listeners.add(streamListener);
userStream = twitter.streamingOperations().sample(listeners);
}
// Stream from a specific Location:
// Float west=-122.75f;
// Float south=36.8f;
// Float east=-121.75f;
// Float north = 37.8f;
//
// FilterStreamParameters filterStreamParameters = new FilterStreamParameters();
// filterStreamParameters.addLocation(west, south, east, north);
//Stream userStream = twitter.streamingOperations().filter(filterStreamParameters, listeners);
}
public SseEmitter createSseEmitter(String clientId) {
return createSseEmitter(clientId, 180_000L);
}
public SseEmitter createSseEmitter(String clientId, String... events) {
return createSseEmitter(clientId, 180_000L, false, false, events);
}
public SseEmitter createSseEmitter(String clientId, boolean unsubscribe,
String... events) {
return createSseEmitter(clientId, 180_000L, unsubscribe, false, events);
}