下面列出了 org.springframework.web.servlet.mvc.method.annotation.SseEmitter#send() 的实例代码。您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Opens a server-sent-events stream for the given dashboard.
 *
 * <p>The new emitter is registered with {@code handler} under the dashboard id and is
 * unregistered again when the stream completes or times out. A first event carrying a
 * reconnect time of {@code 0} is sent before the emitter is handed back.
 *
 * @param dashboardId id of the dashboard the client subscribes to
 * @return the emitter backing the event stream
 * @throws IOException if the initial reconnect-time event cannot be sent
 */
@GetMapping(value = "/emitter/{dashboardId}")
public SseEmitter serverSideEmitter(final @PathVariable String dashboardId) throws IOException {
    LOG.info("Creating SseEmitter for dashboard {}", dashboardId);

    final SseEmitter emitter = new NotCachedSseEmitter();

    // Completion and timeout need exactly the same teardown, so both share one callback.
    final Runnable teardown = () -> {
        handler.removeFromSessionsMap(emitter, dashboardId);
        emitter.complete();
    };
    emitter.onCompletion(teardown);
    emitter.onTimeout(teardown);

    handler.addToSessionsMap(emitter, dashboardId);
    emitter.send(SseEmitter.event().reconnectTime(0L));
    return emitter;
}
/**
 * Opens a server-sent-events stream that delivers the real-time judge result of a submission.
 *
 * <p>The request is rejected with {@link ResourceNotFoundException} when the CSRF token is
 * invalid, the submission does not exist, the submission does not belong to the user of the
 * current session, or its judge result slug is not {@code "PD"}.
 *
 * @param submissionId unique identifier of the submission
 * @param csrfToken CSRF token validated against the current session
 * @param request the HTTP request, used to reach the session
 * @param response the HTTP response; receives the {@code X-Accel-Buffering: no} header
 * @return the emitter registered with the submission event listener for this submission
 * @throws IOException if the initial {@code "Established"} message cannot be sent
 */
@RequestMapping("/getRealTimeJudgeResult.action")
public SseEmitter getRealTimeJudgeResultAction(
        @RequestParam(value="submissionId") long submissionId,
        @RequestParam(value="csrfToken") String csrfToken,
        HttpServletRequest request, HttpServletResponse response) throws IOException {
    User sessionUser = HttpSessionParser.getCurrentUser(request.getSession());
    boolean tokenAccepted = CsrfProtector.isCsrfTokenValid(csrfToken, request.getSession());
    Submission requested = submissionService.getSubmission(submissionId);

    // Guard clauses, checked in order; every rejection looks like "not found" to the client.
    if (!tokenAccepted) {
        throw new ResourceNotFoundException();
    }
    if (requested == null) {
        throw new ResourceNotFoundException();
    }
    if (!requested.getUser().equals(sessionUser)) {
        throw new ResourceNotFoundException();
    }
    if (!requested.getJudgeResult().getJudgeResultSlug().equals("PD")) {
        throw new ResourceNotFoundException();
    }

    response.addHeader("X-Accel-Buffering", "no");

    SseEmitter emitter = new SseEmitter();
    submissionEventListener.addSseEmitters(submissionId, emitter);
    emitter.send("Established");
    return emitter;
}
/**
 * Handles a submission event by forwarding it to the emitter registered for the submission.
 *
 * <p>The event's judge result and message are sent as a two-entry map. When the event is
 * marked completed, the emitter is completed and deregistered. When sending fails, the
 * emitter is deregistered as well, so that later events for the same submission do not keep
 * hitting a dead connection, and the failure is rethrown to the caller.
 *
 * @param event the submission event to forward
 * @throws IOException if the message cannot be written to the client
 */
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
    long submissionId = event.getSubmissionId();
    String judgeResult = event.getJudgeResult();
    String message = event.getMessage();
    boolean isCompleted = event.isCompleted();

    SseEmitter sseEmitter = sseEmitters.get(submissionId);
    if ( sseEmitter == null ) {
        LOGGER.warn(String.format("CANNOT get the SseEmitter for submission #%d.", submissionId));
        return;
    }

    Map<String, String> mapMessage = new HashMap<>(3, 1);
    mapMessage.put("judgeResult", judgeResult);
    mapMessage.put("message", message);

    try {
        sseEmitter.send(mapMessage);
    } catch (IOException | IllegalStateException e) {
        // The connection is gone (IOException) or the emitter was already completed
        // (IllegalStateException): drop the stale registration, then let the caller see
        // the original failure. No complete() call here; the container cleans up after
        // a failed send.
        removeSseEmitters(submissionId);
        throw e;
    }

    if ( isCompleted ) {
        sseEmitter.complete();
        removeSseEmitters(submissionId);
    }
}
/**
 * Notifies every emitter registered for a dashboard about an event.
 *
 * <p>For any event other than {@code PING}, a {@code PING} is sent first. The JSON payload
 * {@code {"type": <event value>}} is serialized once and then sent to each emitter. An
 * emitter whose send fails with an {@link IOException} is removed from the sessions map.
 * If the payload itself cannot be serialized, nothing is sent and no emitter is removed.
 *
 * @param event the event type to broadcast
 * @param dashboardId id of the dashboard whose emitters are notified
 */
@Override
public void sendEventUpdateMessage(final EventType event, final String dashboardId) {
    final List<SseEmitter> emitters = emittersPerDashboard.get(dashboardId);
    if (emitters == null) {
        return;
    }

    if (event != EventType.PING) {
        sendEventUpdateMessage(EventType.PING, dashboardId);
    }

    LOG.info("Notifying {} dashboards with name {} and event type {}", emitters.size(), dashboardId, event);

    // The payload does not depend on the emitter, so build it once. A serialization
    // failure is not the fault of any emitter and must not evict them.
    final String jsonMessage;
    try {
        jsonMessage = objectMapper.writeValueAsString(
            ImmutableMap
                .<String, String>builder()
                .put("type", event.getValue())
                .build()
        );
    } catch (IOException e) {
        LOG.error("Exception while serializing event {} for dashboard {}", event, dashboardId, e);
        return;
    }

    // Walk backwards: removeFromSessionsMap is called while iterating, and removing the
    // current element must not shift the ones still to be visited.
    for (int i = emitters.size(); i > 0; i--) {
        final SseEmitter sseEmitter = emitters.get(i - 1);
        try {
            sseEmitter.send(jsonMessage, MediaType.APPLICATION_JSON);
        } catch (IOException e) {
            this.removeFromSessionsMap(sseEmitter, dashboardId);
            LOG.error("Exception while sending message to emitter for dashboard {}", dashboardId, e);
        }
    }
}
/**
 * Opens a server-sent-events stream that follows the real-time status of a task.
 *
 * <p>The current status is sent immediately and every subsequent change is pushed to the
 * client. The stream is completed when the status signals a stop, and right away when the
 * status is already invalid, failed or finished. Both listeners are detached from the
 * status once the emitter completes or times out.
 *
 * @param id identifier used to look up the real-time status
 * @return the emitter backing the event stream
 */
@GetMapping("/management/task/sse/{id}")
public SseEmitter realTimeSse(@PathVariable String id) {
    SseEmitter sse = new SseEmitter(TIMEOUT);
    RealTimeStatus tracked = realTimeStatusService.get(id);

    // Pushes one status snapshot to the client; a broken connection ends the stream.
    Consumer<RealTimeStatus> pushStatus = snapshot -> {
        try {
            sse.send(snapshot);
        } catch (IOException e) {
            log.error("Error sending event", e);
            sse.complete();
        }
    };
    // Kept in a variable because the very same instance has to be removed later.
    Runnable finishStream = () -> sse.complete();
    Runnable detachListeners = () -> {
        tracked.removeChangeListener(pushStatus);
        tracked.removeStopListener(finishStream);
    };

    tracked.addChangeListener(pushStatus);
    tracked.onStop(finishStream);
    sse.onCompletion(detachListeners);
    sse.onTimeout(detachListeners);

    // Send the current state straight away so the client does not wait for the next change.
    pushStatus.accept(tracked);

    if (tracked.isInvalid() || tracked.isFailed() || tracked.isFinished()) {
        sse.complete();
    }
    return sse;
}
/**
 * Forwards the message of a submission event, as JSON, to the emitter registered under the
 * event's key. Events without a registered emitter are ignored.
 *
 * @param event the event carrying the lookup key and the message to send
 * @throws IOException if the message cannot be written to the client
 */
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
    String emitterKey = event.getKey();
    Object payload = event.getMessage();

    SseEmitter target = sseEmitters.get(emitterKey);
    if ( target != null ) {
        target.send(payload, MediaType.APPLICATION_JSON);
    }
}
/**
 * Persists a comment and broadcasts the saved comment, as JSON, to all registered emitters.
 *
 * <p>Broadcasting is best effort: a failure on one emitter does not affect the others or
 * the response.
 *
 * @param comment the comment to persist
 * @return the comment as returned by the repository's {@code save} call, i.e. the same
 *         object that was broadcast
 * @throws IOException declared for interface compatibility; send failures are not propagated
 */
@RequestMapping(path = "/", method = RequestMethod.POST, produces = "application/json")
@ResponseBody
Comment jsonCreate(Comment comment) throws IOException {
    Comment savedComment = this.commentRepository.save(comment);
    synchronized (this.sseEmitters) {
        for (SseEmitter sseEmitter : this.sseEmitters) {
            try {
                sseEmitter.send(savedComment, MediaType.APPLICATION_JSON);
            } catch (Exception ignored) {
                // Deliberately ignored: servlet containers don't always detect ghost
                // connections, so a send to a dead client may fail here. One broken
                // emitter must not prevent delivery to the remaining ones.
            }
        }
    }
    // Return what was actually persisted (and broadcast), not the raw request object.
    return savedComment;
}
/**
 * Relays sampled tweets to the connected emitters.
 *
 * <p>The sample stream is started only when {@code emitters} holds exactly one element,
 * i.e. when the first user connects. For every tweet, each emitter receives a
 * {@code streamLocation} event with the tweeting user's location and a
 * {@code streamHashtags} event with the tweet's hashtags. When a tweet arrives and no
 * emitter is left, the stream is closed.
 *
 * @param emitters the emitters of the currently connected users; read on every tweet
 * @throws InterruptedException declared for interface compatibility
 */
public void streamTweetEvent(List<SseEmitter> emitters) throws InterruptedException{
    StreamListener streamListener = new StreamListener() {
        @Override
        public void onWarning(StreamWarningEvent warningEvent) {
        }

        @Override
        public void onTweet(Tweet tweet) {
            if (emitters.isEmpty()) {
                // Close Stream when all Users are disconnected.
                userStream.close();
                log.info("Zero Connected Users - Closing Stream");
                return;
            }

            // The hashtag text is the same for every emitter, so build it once per tweet.
            StringBuilder hashTag = new StringBuilder();
            List<HashTagEntity> hashTags = tweet.getEntities().getHashTags();
            for (HashTagEntity hash : hashTags) {
                hashTag.append('#').append(hash.getText()).append(' ');
            }

            for (SseEmitter emitter : emitters) {
                try {
                    emitter.send(SseEmitter.event().name("streamLocation").data(tweet.getUser().getLocation()));
                    // NOTE(review): the StringBuilder itself (not its toString()) is passed
                    // as event data, as before; confirm the client expects that encoding.
                    emitter.send(SseEmitter.event().name("streamHashtags").data(hashTag));
                } catch (IOException e) {
                    // A failed send means this user is gone; keep serving the others.
                    log.info("User Disconnected from the Stream", e);
                }
            }
        }

        @Override
        public void onLimit(int numberOfLimitedTweets) {
        }

        @Override
        public void onDelete(StreamDeleteEvent deleteEvent) {
        }
    };

    // Start Stream when a User is connected
    if (emitters.size() == 1) {
        List<StreamListener> listeners = new ArrayList<>();
        listeners.add(streamListener);
        userStream = twitter.streamingOperations().sample(listeners);
    }

    // Stream from a specific Location:
    // Float west=-122.75f;
    // Float south=36.8f;
    // Float east=-121.75f;
    // Float north = 37.8f;
    //
    // FilterStreamParameters filterStreamParameters = new FilterStreamParameters();
    // filterStreamParameters.addLocation(west, south, east, north);
    // Stream userStream = twitter.streamingOperations().filter(filterStreamParameters, listeners);
}