类org.springframework.web.servlet.mvc.method.annotation.SseEmitter源码实例Demo

下面列出了怎么用org.springframework.web.servlet.mvc.method.annotation.SseEmitter的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: mirrorgate   文件: ServerSentEventsController.java
/**
 * Opens a server-sent-events stream for the given dashboard.
 *
 * <p>The new emitter is registered with {@code handler} under the dashboard id, and is
 * unregistered again when the stream completes or times out. An initial event carrying
 * only a reconnect time of {@code 0} is sent before the emitter is returned.
 *
 * @param dashboardId identifier of the dashboard, taken from the request path
 * @return the emitter backing the SSE response
 * @throws IOException if sending the initial event fails
 */
@GetMapping(value = "/emitter/{dashboardId}")
public SseEmitter serverSideEmitter(final @PathVariable String dashboardId) throws IOException {

    LOG.info("Creating SseEmitter for dashboard {}", dashboardId);

    final SseEmitter emitter = new NotCachedSseEmitter();

    // Completion and timeout share the same cleanup: unregister, then complete.
    final Runnable unregisterAndComplete = () -> {
        handler.removeFromSessionsMap(emitter, dashboardId);
        emitter.complete();
    };
    emitter.onCompletion(unregisterAndComplete);
    emitter.onTimeout(unregisterAndComplete);

    handler.addToSessionsMap(emitter, dashboardId);

    emitter.send(SseEmitter.event().reconnectTime(0L));

    return emitter;
}
 
/**
 * Event listener that forwards a temperature reading to every registered emitter.
 *
 * <p>Each emitter receives the reading as JSON. Emitters whose send throws are collected
 * and removed from {@code clients} once all emitters have been visited.
 *
 * @param temperature the reading to broadcast
 */
@Async
@EventListener
public void handleMessage(Temperature temperature) {
   String summary = format("Temperature: %4.2f C, active subscribers: %d",
      temperature.getValue(), clients.size());
   log.info(summary);

   List<SseEmitter> unreachable = new ArrayList<>();
   clients.forEach(client -> {
      try {
         Instant sendStarted = Instant.now();
         client.send(temperature, MediaType.APPLICATION_JSON);
         Duration elapsed = Duration.between(sendStarted, Instant.now());
         log.info("Sent to client, took: {}", elapsed);
      } catch (Exception ignore) {
         // Any failure marks the emitter as dead; it is dropped after the loop.
         unreachable.add(client);
      }
   });
   clients.removeAll(unreachable);
}
 
源代码3 项目: mirrorgate   文件: ServerSentEventsHandler.java
/**
 * Unregisters an emitter from the list kept for a dashboard.
 *
 * <p>Nothing happens when the dashboard id is empty or no list exists for it. When the
 * list becomes empty after the removal, the dashboard entry itself is removed.
 *
 * @param session the emitter to unregister
 * @param dashboardId identifier of the dashboard the emitter was registered under
 */
public synchronized void removeFromSessionsMap(final SseEmitter session, final String dashboardId) {

    LOG.debug("Remove SseEmitter {} to sessions map", dashboardId);

    if (StringUtils.isEmpty(dashboardId)) {
        return;
    }

    final List<SseEmitter> registered = emittersPerDashboard.get(dashboardId);
    if (registered == null) {
        return;
    }

    registered.remove(session);
    if (registered.isEmpty()) {
        emittersPerDashboard.remove(dashboardId);
    }
}
 
源代码4 项目: sse-eventbus   文件: SseEventBus.java
/**
 * Builds a {@link SseEmitter} for a client and registers that client.
 * When at least one event name is supplied, the client is subscribed to each of them.
 * The emitter completes itself when it times out.
 *
 * @param clientId unique client identifier
 * @param timeout timeout value in milliseconds
 * @param unsubscribe if true, unsubscribes the client from all events that are not
 * listed in {@code events}; only applied when {@code events} is non-empty
 * @param completeAfterMessage handed unchanged to
 * {@code registerClient(String, SseEmitter, boolean)}
 * @param events events the client wants to subscribe to
 * @return a new SseEmitter instance
 */
public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubscribe,
		boolean completeAfterMessage, String... events) {
	SseEmitter sseEmitter = new SseEmitter(timeout);
	sseEmitter.onTimeout(() -> sseEmitter.complete());
	registerClient(clientId, sseEmitter, completeAfterMessage);

	boolean nothingToSubscribe = events == null || events.length == 0;
	if (nothingToSubscribe) {
		return sseEmitter;
	}

	if (unsubscribe) {
		unsubscribeFromAllEvents(clientId, events);
	}
	for (String eventName : events) {
		subscribe(clientId, eventName);
	}

	return sseEmitter;
}
 
源代码5 项目: voj   文件: SubmissionController.java
/**
 * Opens a server-sent-events stream for the judge result of a submission.
 *
 * <p>The request is rejected with {@code ResourceNotFoundException} unless the CSRF token
 * is valid, the submission exists, it belongs to the current user, and its judge result
 * slug equals {@code "PD"}.
 *
 * @param submissionId - unique identifier of the submission
 * @param csrfToken - CSRF token supplied with the request
 * @param request - the HTTP request, used to reach the session
 * @param response - the HTTP response, which receives the X-Accel-Buffering header
 * @return the SseEmitter registered for this submission
 * @throws IOException if sending the initial message fails
 */
@RequestMapping("/getRealTimeJudgeResult.action")
public SseEmitter getRealTimeJudgeResultAction(
		@RequestParam(value="submissionId") long submissionId,
		@RequestParam(value="csrfToken") String csrfToken,
		HttpServletRequest request, HttpServletResponse response) throws IOException {
	User currentUser = HttpSessionParser.getCurrentUser(request.getSession());
	boolean isCsrfTokenValid = CsrfProtector.isCsrfTokenValid(csrfToken, request.getSession());
	Submission submission = submissionService.getSubmission(submissionId);

	boolean isAccessible = isCsrfTokenValid
			&& submission != null
			&& submission.getUser().equals(currentUser)
			&& submission.getJudgeResult().getJudgeResultSlug().equals("PD");
	if ( !isAccessible ) {
		throw new ResourceNotFoundException();
	}

	response.addHeader("X-Accel-Buffering", "no");
	SseEmitter emitter = new SseEmitter();
	submissionEventListener.addSseEmitters(submissionId, emitter);
	emitter.send("Established");
	return emitter;
}
 
源代码6 项目: voj   文件: ApplicationEventListener.java
/**
 * Handles a submission event by forwarding its judge result and message to the
 * SseEmitter registered for that submission.
 *
 * <p>When no emitter is registered, a warning is logged and the event is dropped.
 * When the event is marked as completed, the emitter is completed and unregistered.
 * When sending fails, the emitter is unregistered before the exception is rethrown, so
 * later events for the same submission do not keep hitting an unusable emitter.
 *
 * @param event - the submission event
 * @throws IOException if the message cannot be written to the client
 */
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
	long submissionId = event.getSubmissionId();
	String judgeResult = event.getJudgeResult();
	String message = event.getMessage();
	boolean isCompleted = event.isCompleted();
	SseEmitter sseEmitter = sseEmitters.get(submissionId);
	
	if ( sseEmitter == null ) {
		LOGGER.warn(String.format("CANNOT get the SseEmitter for submission #%d.", submissionId));
		return;
	}
	Map<String, String> mapMessage = new HashMap<>(3, 1);
	mapMessage.put("judgeResult", judgeResult);
	mapMessage.put("message", message);
	
	try {
		sseEmitter.send(mapMessage);
	} catch ( IOException | IllegalStateException e ) {
		// IOException: the connection is broken. IllegalStateException: the emitter has
		// already completed (for example after a timeout). Either way it cannot be used
		// again, so drop it from the registry and let the caller see the failure.
		sseEmitters.remove(submissionId);
		throw e;
	}
	
	if ( isCompleted ) {
		sseEmitter.complete();
		removeSseEmitters(submissionId);
	}
}
 
源代码7 项目: cxf   文件: SpringSseEmitterTest.java
/**
 * Streams five "book" events, 100 ms apart, from a background thread.
 *
 * <p>Each event carries an id (1 to 5), a {@code Book} payload serialized as JSON and
 * the event name {@code "book"}. Any failure completes the emitter with that error.
 *
 * @return the emitter the background thread writes to
 */
@GetMapping("/sse")
public SseEmitter streamSseMvc() {
    final SseEmitter emitter = new SseEmitter();
    final ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
    
    sseMvcExecutor.execute(() -> {
        try {
            for (int eventId = 1; eventId <= 5; ++eventId) {
                SseEventBuilder event = SseEmitter.event()
                    .id(Integer.toString(eventId))
                    .data(new Book("New Book #" + eventId, "Author #" + eventId), MediaType.APPLICATION_JSON)
                    .name("book");
                emitter.send(event);
                Thread.sleep(100);
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the worker thread can observe it.
            Thread.currentThread().interrupt();
            emitter.completeWithError(ex);
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    // This executor serves a single request. shutdown() lets the task already submitted
    // run to the end and then releases the worker thread instead of leaking it.
    sseMvcExecutor.shutdown();
    
    return emitter;
}
 
源代码8 项目: tutorials   文件: SseEmitterController.java
/**
 * Sends a single timestamped message over SSE and then completes the stream.
 *
 * <p>The send runs on {@code nonBlockingService}. If it fails, a generic message is
 * printed to standard output and the emitter is completed with the error.
 *
 * @return the emitter that receives the single message
 */
@GetMapping(Constants.API_SSE)
public SseEmitter handleSse() {
    SseEmitter sseEmitter = new SseEmitter();

    Runnable sendOnce = () -> {
        try {
            String payload = Constants.API_SSE_MSG + " @ " + new Date();
            sseEmitter.send(payload);
            sseEmitter.complete();
        } catch (Exception failure) {
            System.out.println(Constants.GENERIC_EXCEPTION);
            sseEmitter.completeWithError(failure);
        }
    };
    nonBlockingService.execute(sendOnce);

    return sseEmitter;
}
 
源代码9 项目: tutorials   文件: SseEmitterController.java
/**
 * Streams one event per second, carrying the current local time, from a background thread.
 *
 * <p>The loop has no exit condition of its own; it ends when sending or sleeping throws,
 * at which point the emitter is completed with that error.
 *
 * @return the emitter the background thread writes to
 */
@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
    SseEmitter emitter = new SseEmitter();
    ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();

    sseMvcExecutor.execute(() -> {
        try {
            for (int i = 0; true; i++) {
                SseEventBuilder event = SseEmitter.event()
                    .data("SSE MVC - " + LocalTime.now()
                        .toString())
                    .id(String.valueOf(i))
                    .name("sse event - mvc");
                emitter.send(event);
                Thread.sleep(1000);
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the worker thread can observe it.
            Thread.currentThread().interrupt();
            emitter.completeWithError(ex);
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    // This executor serves a single request. shutdown() does not cancel the running task;
    // it only lets the worker thread exit once the task ends, instead of leaking it.
    sseMvcExecutor.shutdown();
    return emitter;
}
 
源代码10 项目: youkefu   文件: OnlineUserUtils.java
/**
 * Sends an invitation message to every web IM client registered for a user.
 *
 * <p>Each client's emitter receives one event with a reconnect time of 0 and the message
 * as data. A client whose send throws is removed. Every emitter is completed afterwards,
 * whether or not the send succeeded.
 *
 * @param sessionid session id passed to {@code removeClient} when a send fails
 * @param userid id of the user whose clients are notified
 * @param msg the message to send
 * @throws Exception declared by the original signature
 */
public static void sendWebIMClients(String sessionid , String userid , String msg) throws Exception{
	List<WebIMClient> targets = OnlineUserUtils.webIMClients.getClients(userid) ;
	if(targets==null || targets.isEmpty()){
		return ;
	}
	for(WebIMClient target : targets){
		try{
			target.getSse().send(SseEmitter.event().reconnectTime(0).data(msg));
		}catch(Exception ex){
			OnlineUserUtils.webIMClients.removeClient(sessionid , userid , target.getClient() , true) ;
		}finally{
			target.getSse().complete();
		}
	}
}
 
/**
 * Opens an SSE stream that is fed by the temperature sensor's stream.
 *
 * <p>A new {@code RxSeeEmitter} is created, the connection is logged with the emitter's
 * session id and the client address, and the emitter's subscriber is subscribed to
 * {@code temperatureSensor.temperatureStream()}.
 *
 * @param request the incoming request, used only for the remote address in the log line
 * @return the emitter backing the SSE response
 */
@RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
public SseEmitter events(HttpServletRequest request) {
   RxSeeEmitter rxEmitter = new RxSeeEmitter();
   log.info("[{}] Rx SSE stream opened for client: {}",
      rxEmitter.getSessionId(), request.getRemoteAddr());

   temperatureSensor.temperatureStream().subscribe(rxEmitter.getSubscriber());

   return rxEmitter;
}
 
/**
 * Opens an SSE stream and adds its emitter to the set of active clients.
 *
 * <p>The emitter is created with {@code SSE_SESSION_TIMEOUT} and removes itself from
 * {@code clients} when it times out or completes.
 *
 * @param request the incoming request, used only for the remote address in the log line
 * @return the emitter backing the SSE response
 */
@RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
public SseEmitter events(HttpServletRequest request) {
   log.info("SSE stream opened for client: " + request.getRemoteAddr());
   SseEmitter sseEmitter = new SseEmitter(SSE_SESSION_TIMEOUT);
   clients.add(sseEmitter);

   // Timeout and completion share the same cleanup: drop the emitter from the active clients.
   Runnable forgetClient = () -> clients.remove(sseEmitter);
   sseEmitter.onTimeout(forgetClient);
   sseEmitter.onCompletion(forgetClient);

   return sseEmitter;
}
 
/**
 * Streams a generated log line every 300 ms over SSE.
 *
 * <p>Each line contains {@code System.nanoTime()}, the current thread and the tick
 * counter. Stream errors complete the emitter with that error.
 *
 * @return the emitter that receives the generated lines
 */
@RequestMapping("/logs")
public SseEmitter mockLogs() {
    SseEmitter sseEmitter = new SseEmitter();
    Flowable<String> logLines = Flowable.interval(300, TimeUnit.MILLISECONDS)
            .map(tick -> "[" + System.nanoTime() + "] [LogServiceApplication] [Thread " + Thread.currentThread() + "] Some loge here " + tick + "\n");
    logLines.subscribe(sseEmitter::send, sseEmitter::completeWithError, sseEmitter::complete);
    return sseEmitter;
}
 
源代码14 项目: mirrorgate   文件: ServerSentEventsHandler.java
/**
 * Notifies every emitter registered for a dashboard about an event.
 *
 * <p>For any event other than {@code PING}, a {@code PING} is sent to the same dashboard
 * first. The payload is a JSON object with a single {@code "type"} entry holding the
 * event's value. It is serialized once, so a serialization failure is reported on its own
 * and does not unregister any emitter. An emitter whose send fails is unregistered.
 *
 * @param event the event type to announce
 * @param dashboardId identifier of the dashboard whose emitters are notified
 */
@Override
public void sendEventUpdateMessage(final EventType event, final String dashboardId) {

    final List<SseEmitter> emitters = emittersPerDashboard.get(dashboardId);

    if (emitters == null) {
        return;
    }

    if (event != EventType.PING) {
        sendEventUpdateMessage(EventType.PING, dashboardId);
    }

    LOG.info("Notifying {} dashboards with name {} and event type {}", emitters.size(), dashboardId, event);

    if (emitters.isEmpty()) {
        return;
    }

    // The payload is the same for every emitter, so build it once outside the loop.
    final String jsonMessage;
    try {
        jsonMessage = objectMapper.writeValueAsString(
            ImmutableMap
                .<String, String>builder()
                .put("type", event.getValue())
                .build()
        );
    } catch (IOException e) {
        LOG.error("Exception while serializing event {} for dashboard {}", event, dashboardId, e);
        return;
    }

    // Walk backwards so that removing a failed emitter does not shift the entries still to visit.
    for (int i = emitters.size(); i > 0; i--) {
        final SseEmitter sseEmitter = emitters.get(i - 1);

        try {
            sseEmitter.send(jsonMessage, MediaType.APPLICATION_JSON);
        } catch (IOException e) {
            this.removeFromSessionsMap(sseEmitter, dashboardId);
            LOG.error("Exception while sending message to emitter for dashboard {}", dashboardId, e);
        }
    }
}
 
源代码15 项目: zhcet-web   文件: RealTimeStatusController.java
/**
 * Opens an SSE stream that reports changes of a real-time task status.
 *
 * <p>A change listener pushes every status change to the emitter, and a stop listener
 * completes the emitter. Both listeners are detached when the emitter completes or times
 * out. The current status is pushed once immediately; if it is already invalid, failed
 * or finished, the emitter is completed right away.
 *
 * @param id identifier used to look up the status
 * @return the emitter backing the SSE response
 */
@GetMapping("/management/task/sse/{id}")
public SseEmitter realTimeSse(@PathVariable String id) {
    SseEmitter sseEmitter = new SseEmitter(TIMEOUT);

    RealTimeStatus status = realTimeStatusService.get(id);

    Consumer<RealTimeStatus> pushStatus = changed -> {
        try {
            sseEmitter.send(changed);
        } catch (IOException e) {
            log.error("Error sending event", e);
            sseEmitter.complete();
        }
    };

    Runnable stopListener = () -> sseEmitter.complete();

    Runnable detachListeners = () -> {
        status.removeChangeListener(pushStatus);
        status.removeStopListener(stopListener);
    };

    status.addChangeListener(pushStatus);
    status.onStop(stopListener);

    sseEmitter.onCompletion(detachListeners);
    sseEmitter.onTimeout(detachListeners);
    pushStatus.accept(status);

    boolean alreadyOver = status.isInvalid() || status.isFailed() || status.isFinished();
    if (alreadyOver) {
        sseEmitter.complete();
    }

    return sseEmitter;
}
 
/**
 * Forwards an event's message, as JSON, to the emitter registered under the event's key.
 * Events whose key has no registered emitter are ignored.
 *
 * @param event the event carrying the key and the message
 * @throws IOException if the message cannot be sent
 */
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
    String key = event.getKey();
    Object message = event.getMessage();

    SseEmitter target = sseEmitters.get(key);
    if ( target != null ) {
        target.send(message, MediaType.APPLICATION_JSON);
    }
}
 
/**
 * Opens an SSE stream and registers its emitter with the application event listener
 * under this controller's listener type and the given id.
 *
 * @param id identifier taken from the request path
 * @return the newly registered emitter
 * @throws ListenerTypeNotFound if no listener type is configured
 */
@RequestMapping("/listener/{id}")
public SseEmitter  listen(@PathVariable("id") String id){
	if(this.listenerType == null) {
		throw new ListenerTypeNotFound();
	}

	final SseEmitter emitter = new SseEmitter();
	applicationEventListener.addSseEmitters(this.listenerType , id, emitter);
	return emitter;
}
 
/**
 * Opens an SSE stream for tweet events.
 *
 * <p>The new emitter is added to {@code emitters}, removes itself from that collection
 * on completion, and the whole collection is handed to the tweet streaming service.
 *
 * @return the emitter backing the SSE response
 * @throws InterruptedException declared by the streaming service call
 */
@RequestMapping("/tweetLocation")
public SseEmitter streamTweets() throws InterruptedException{

	SseEmitter emitter = new SseEmitter();
	emitters.add(emitter);

	Runnable deregister = () -> emitters.remove(emitter);
	emitter.onCompletion(deregister);

	streamTweetEventService.streamTweetEvent(emitters);

	return emitter;
}
 
源代码19 项目: sse-eventbus   文件: SseEventBus.java
/**
 * Registers an emitter for a client.
 *
 * <p>A client that is already known only gets its emitter replaced; in that case
 * {@code completeAfterMessage} is not applied. Otherwise a new {@code Client} is stored.
 *
 * @param clientId unique client identifier
 * @param emitter the emitter to associate with the client
 * @param completeAfterMessage passed to the {@code Client} constructor for new clients
 */
public void registerClient(String clientId, SseEmitter emitter,
		boolean completeAfterMessage) {
	Client existing = this.clients.get(clientId);
	if (existing != null) {
		existing.updateEmitter(emitter);
		return;
	}
	this.clients.put(clientId,
			new Client(clientId, emitter, completeAfterMessage));
}
 
/**
 * Saves a comment and broadcasts the saved instance, as JSON, to all SSE subscribers.
 *
 * <p>Send failures are ignored on purpose, and the broadcast runs while holding the lock
 * on {@code sseEmitters}. The method returns the {@code comment} argument, not the
 * instance returned by the repository.
 *
 * @param comment the comment to persist
 * @return the comment that was passed in
 * @throws IOException declared by the signature
 */
@RequestMapping(path = "/", method = RequestMethod.POST, produces = "application/json")
@ResponseBody
Comment jsonCreate(Comment comment) throws IOException {
	Comment saved = this.commentRepository.save(comment);
	synchronized (this.sseEmitters) {
		for (SseEmitter subscriber : this.sseEmitters) {
			try {
				subscriber.send(saved, MediaType.APPLICATION_JSON);
			} catch (Exception ignored) {
				// Servlet containers don't always detect ghost connections,
				// so a failed send is deliberately ignored here.
			}
		}
	}
	return comment;
}
 
/**
 * Opens an SSE stream for comment updates.
 *
 * <p>The emitter is added to {@code sseEmitters} under that collection's lock and removes
 * itself, again under the lock, when it completes.
 *
 * @return the newly subscribed emitter
 */
@RequestMapping("/sse/updates")
SseEmitter subscribeUpdates() {
	SseEmitter subscriber = new SseEmitter();
	Runnable unsubscribe = () -> {
		synchronized (this.sseEmitters) {
			this.sseEmitters.remove(subscriber);
		}
	};
	synchronized (this.sseEmitters) {
		this.sseEmitters.add(subscriber);
		subscriber.onCompletion(unsubscribe);
	}
	return subscriber;
}
 
源代码22 项目: voj   文件: ApplicationEventListener.java
/**
 * Removes the Server Sent Event emitter registered for a submission, together with
 * every emitter registered under a smaller submission id.
 *
 * @param submissionId - unique identifier of the submission
 */
private void removeSseEmitters(long submissionId) {
	// Remove through the key-set view: calling Map.remove while iterating entrySet()
	// can throw ConcurrentModificationException, whereas removeIf deletes safely during traversal.
	sseEmitters.keySet().removeIf(currentSubmissionId -> currentSubmissionId <= submissionId);
}
 
源代码23 项目: youkefu   文件: WebIMClient.java
/**
 * Creates a web IM client record.
 *
 * @param userid id of the user the client belongs to
 * @param client the client identifier
 * @param sse the emitter associated with this client
 */
public WebIMClient(String userid , String client , SseEmitter sse){
	this.userid = userid ;
	this.client = client ;
	this.sse = sse ;
}
 
源代码24 项目: youkefu   文件: WebIMClient.java
/**
 * Returns the emitter associated with this client.
 *
 * @return the current emitter
 */
public SseEmitter getSse() {
	return this.sse;
}
 
源代码25 项目: youkefu   文件: WebIMClient.java
/**
 * Replaces the emitter associated with this client.
 *
 * @param sse the new emitter
 */
public void setSse(final SseEmitter sse) {
	this.sse = sse;
}
 
/**
 * Stores an emitter in {@code sseEmitters} under the key that the listener type
 * derives from the given id.
 *
 * @param listenerType listener type whose {@code prepareKey} builds the map key
 * @param id identifier passed to {@code prepareKey}
 * @param sseEmitter the emitter to store
 */
public void addSseEmitters(ListenerType listenerType ,String id, SseEmitter sseEmitter) {
    sseEmitters.put(listenerType.prepareKey(id), sseEmitter);
}
 
/**
 * Relays sampled tweets to the given emitters.
 *
 * <p>The sample stream is started only when the list holds exactly one emitter. For each
 * tweet, every emitter receives a {@code "streamLocation"} event with the author's
 * location and a {@code "streamHashtags"} event with the tweet's hashtags. When a tweet
 * arrives and the list is empty, the stream is closed.
 *
 * @param emitters the emitters to notify; read again on every tweet
 * @throws InterruptedException declared by the signature
 */
public void streamTweetEvent(List<SseEmitter> emitters) throws InterruptedException{

	StreamListener tweetRelay = new StreamListener() {
		@Override
		public void onWarning(StreamWarningEvent warningEvent) {
		}

		@Override
		public void onTweet(Tweet tweet) {
			if (emitters.isEmpty()) {
				//Close Stream when all Users are disconnected.
				userStream.close();
				log.info("Zero Connected Users - Closing Stream");
				return;
			}

			for (SseEmitter subscriber : emitters) {
				try {
					subscriber.send(SseEmitter.event().name("streamLocation").data(tweet.getUser().getLocation()));

					// Concatenate the hashtags as "#tag " entries.
					StringBuilder hashTag = new StringBuilder();
					for (HashTagEntity hash : tweet.getEntities().getHashTags()) {
						hashTag.append("#").append(hash.getText()).append(" ");
					}
					subscriber.send(SseEmitter.event().name("streamHashtags").data(hashTag));
				} catch (IOException e) {
					System.out.println("User Disconnected from the Stream");
				}
			}
		}

		@Override
		public void onLimit(int numberOfLimitedTweets) {
		}

		@Override
		public void onDelete(StreamDeleteEvent deleteEvent) {
		}
	};

	//Start Stream when a User is connected
	if (emitters.size()==1) {
		List<StreamListener> listeners = new ArrayList<StreamListener>();
		listeners.add(tweetRelay);
		userStream = twitter.streamingOperations().sample(listeners);
	}
}
 
源代码28 项目: sse-eventbus   文件: SseEventBus.java
/**
 * Creates an emitter for a client with a timeout of 180,000 ms.
 *
 * @param clientId unique client identifier
 * @return a new SseEmitter instance
 */
public SseEmitter createSseEmitter(String clientId) {
	final long timeoutMillis = 180_000L;
	return createSseEmitter(clientId, timeoutMillis);
}
 
源代码29 项目: sse-eventbus   文件: SseEventBus.java
/**
 * Creates an emitter for a client with a timeout of 180,000 ms and subscribes the client
 * to the given events. Both {@code unsubscribe} and {@code completeAfterMessage} are
 * passed as {@code false}.
 *
 * @param clientId unique client identifier
 * @param events events the client wants to subscribe to
 * @return a new SseEmitter instance
 */
public SseEmitter createSseEmitter(String clientId, String... events) {
	final long timeoutMillis = 180_000L;
	return createSseEmitter(clientId, timeoutMillis, false, false, events);
}
 
源代码30 项目: sse-eventbus   文件: SseEventBus.java
/**
 * Creates an emitter for a client with a timeout of 180,000 ms and subscribes the client
 * to the given events. {@code completeAfterMessage} is passed as {@code false}.
 *
 * @param clientId unique client identifier
 * @param unsubscribe forwarded unchanged to the five-argument overload
 * @param events events the client wants to subscribe to
 * @return a new SseEmitter instance
 */
public SseEmitter createSseEmitter(String clientId, boolean unsubscribe,
		String... events) {
	final long timeoutMillis = 180_000L;
	return createSseEmitter(clientId, timeoutMillis, unsubscribe, false, events);
}
 
 同包方法