org.springframework.web.servlet.mvc.method.annotation.SseEmitter#send ( )源码实例Demo

下面列出了org.springframework.web.servlet.mvc.method.annotation.SseEmitter#send ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: mirrorgate   文件: ServerSentEventsController.java
@GetMapping(value = "/emitter/{dashboardId}")
public SseEmitter serverSideEmitter(final @PathVariable String dashboardId) throws IOException {

    LOG.info("Creating SseEmitter for dashboard {}", dashboardId);

    final SseEmitter sseEmitter = new NotCachedSseEmitter();

    sseEmitter.onCompletion(() -> {
        handler.removeFromSessionsMap(sseEmitter, dashboardId);
        sseEmitter.complete();
    });

    sseEmitter.onTimeout(() -> {
        handler.removeFromSessionsMap(sseEmitter, dashboardId);
        sseEmitter.complete();
    });

    handler.addToSessionsMap(sseEmitter, dashboardId);

    sseEmitter.send(SseEmitter.event().reconnectTime(0L));

    return sseEmitter;
}
 
源代码2 项目: voj   文件: SubmissionController.java
/**
 * 获取实时的评测结果.
 * @param submissionId - 提交记录的唯一标识符
 * @return 包含评测结果信息的StreamingResponseBody对象
 * @throws IOException 
 */
@RequestMapping("/getRealTimeJudgeResult.action")
public SseEmitter getRealTimeJudgeResultAction(
		@RequestParam(value="submissionId") long submissionId,
		@RequestParam(value="csrfToken") String csrfToken,
		HttpServletRequest request, HttpServletResponse response) throws IOException {
	User currentUser = HttpSessionParser.getCurrentUser(request.getSession());
	boolean isCsrfTokenValid = CsrfProtector.isCsrfTokenValid(csrfToken, request.getSession());
	Submission submission = submissionService.getSubmission(submissionId);
	
	if ( !isCsrfTokenValid || submission == null || 
			!submission.getUser().equals(currentUser) ||
			!submission.getJudgeResult().getJudgeResultSlug().equals("PD") ) {
		throw new ResourceNotFoundException();
	}
	
	response.addHeader("X-Accel-Buffering", "no");
	SseEmitter sseEmitter = new SseEmitter();
	submissionEventListener.addSseEmitters(submissionId, sseEmitter);
	sseEmitter.send("Established");
	return sseEmitter;
}
 
源代码3 项目: voj   文件: ApplicationEventListener.java
/**
 * 提交事件的处理器.
 * @param event - 提交记录事件
 * @throws IOException 
 */
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
	long submissionId = event.getSubmissionId();
	String judgeResult = event.getJudgeResult();
	String message = event.getMessage();
	boolean isCompleted = event.isCompleted();
	SseEmitter sseEmitter = sseEmitters.get(submissionId);
	
	if ( sseEmitter == null ) {
		LOGGER.warn(String.format("CANNOT get the SseEmitter for submission #%d.", submissionId));
		return;
	}
	Map<String, String> mapMessage = new HashMap<>(3, 1);
	mapMessage.put("judgeResult", judgeResult);
	mapMessage.put("message", message);
	sseEmitter.send(mapMessage);
	
	if ( isCompleted ) {
		sseEmitter.complete();
		removeSseEmitters(submissionId);
	}
}
 
源代码4 项目: mirrorgate   文件: ServerSentEventsHandler.java
@Override
public void sendEventUpdateMessage(final EventType event, final String dashboardId) {

    final List<SseEmitter> emitters = emittersPerDashboard.get(dashboardId);

    if (emitters != null) {

        if (event != EventType.PING) {
            sendEventUpdateMessage(EventType.PING, dashboardId);
        }

        LOG.info("Notifying {} dashboards with name {} and event type {}", emitters.size(), dashboardId, event);

        for (int i = emitters.size(); i > 0; i--) {
            final SseEmitter sseEmitter = emitters.get(i - 1);

            try {
                final String jsonMessage = objectMapper.writeValueAsString(
                    ImmutableMap
                        .<String, String>builder()
                        .put("type", event.getValue())
                        .build()
                );
                sseEmitter.send(jsonMessage, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                this.removeFromSessionsMap(sseEmitter, dashboardId);
                LOG.error("Exception while sending message to emitter for dashboard {}", dashboardId);
            }
        }
    }
}
 
源代码5 项目: zhcet-web   文件: RealTimeStatusController.java
@GetMapping("/management/task/sse/{id}")
public SseEmitter realTimeSse(@PathVariable String id) {
    SseEmitter emitter = new SseEmitter(TIMEOUT);

    RealTimeStatus status = realTimeStatusService.get(id);

    Consumer<RealTimeStatus> consumer = statusChange -> {
        try {
            emitter.send(statusChange);
        } catch (IOException e) {
            log.error("Error sending event", e);
            emitter.complete();
        }
    };

    Runnable completeListener = emitter::complete;

    Runnable onComplete = () -> {
        status.removeChangeListener(consumer);
        status.removeStopListener(completeListener);
    };

    status.addChangeListener(consumer);
    status.onStop(completeListener);

    emitter.onCompletion(onComplete);
    emitter.onTimeout(onComplete);
    consumer.accept(status);

    if (status.isInvalid() || status.isFailed() || status.isFinished())
        emitter.complete();

    return emitter;
}
 
@EventListener
  public void submissionEventHandler(SubmissionEvent event) throws IOException {
      
String key = event.getKey();
      Object message = event.getMessage();
      
      SseEmitter sseEmitter = sseEmitters.get(key);
 
      if ( sseEmitter == null ) {
          return;
      }
      
      sseEmitter.send(message, MediaType.APPLICATION_JSON);
  }
 
@RequestMapping(path = "/", method = RequestMethod.POST, produces = "application/json")
@ResponseBody
Comment jsonCreate(Comment comment) throws IOException {
	Comment newComment = this.commentRepository.save(comment);
	synchronized (this.sseEmitters) {
		for (SseEmitter sseEmitter : this.sseEmitters) {
			// Servlet containers don't always detect ghost connection, so we must catch exceptions ...
			try {
				sseEmitter.send(newComment, MediaType.APPLICATION_JSON);
			} catch (Exception e) {}
		}
	}
	return comment;
}
 
public void streamTweetEvent(List<SseEmitter> emitters) throws InterruptedException{

    	List<StreamListener> listeners = new ArrayList<StreamListener>();
    	
    	StreamListener streamListener = new StreamListener() {
			@Override
			public void onWarning(StreamWarningEvent warningEvent) {
			}

			@Override
			public void onTweet(Tweet tweet) {
				//log.info("User '{}', Tweeted : {}, from ; {}", tweet.getUser().getName() , tweet.getText(), tweet.getUser().getLocation());
				Integer connectedUsers =  emitters.size();
				
				//log.info("Streaming to :" + connectedUsers +" connected Users");
				
				if (connectedUsers!=0) {
					for (SseEmitter emiter : emitters) {
						try {
							emiter.send(SseEmitter.event().name("streamLocation").data(tweet.getUser().getLocation()));
							
							StringBuilder hashTag = new StringBuilder();
							
							List<HashTagEntity> hashTags = tweet.getEntities().getHashTags();
							for (HashTagEntity hash : hashTags) {
								hashTag.append("#"+hash.getText() + " ");
							}
							//System.out.println(hashTag);
							emiter.send(SseEmitter.event().name("streamHashtags").data(hashTag));
						} catch (IOException e) {
							System.out.println("User Disconnected from the Stream");
							//e.printStackTrace();
						}
					}
				}else{
					//Close Stream when all Users are disconnected.
					userStream.close();
					log.info("Zero Connected Users - Closing Stream");
				}
				
			}

			@Override
			public void onLimit(int numberOfLimitedTweets) {
			}

			@Override
			public void onDelete(StreamDeleteEvent deleteEvent) {
			}
		};
		//Start Stream when a User is connected
		if (emitters.size()==1) {
			listeners.add(streamListener);
			userStream = twitter.streamingOperations().sample(listeners);
		}
		
//		Stream from a specific Location:
//		Float west=-122.75f;
//		Float south=36.8f;
//		Float east=-121.75f;
//		Float north = 37.8f;
//
//		FilterStreamParameters filterStreamParameters = new FilterStreamParameters();
//		filterStreamParameters.addLocation(west, south, east, north);
		//Stream userStream = twitter.streamingOperations().filter(filterStreamParameters, listeners);
	
	}