类org.springframework.web.socket.TextMessage源码实例Demo

下面列出了怎么用org.springframework.web.socket.TextMessage的API类实例代码及写法,或者点击链接到github查看源代码。

@Test
public void handleTextMessage() throws Exception {
  Message<String> message = new Message<>(0, 1L, 1, 1L, "a");
  TextMessage textMessage = new TextMessage((new ObjectMapper()).writeValueAsString(message));
  String refUtf8 = new String(textMessage.getPayload().getBytes(), UTF_8);
  String refUtf16 = new String(textMessage.getPayload().getBytes(), UTF_16);

  underTest.afterConnectionEstablished(session);
  underTest.handleTextMessage(session, textMessage);

  ArgumentCaptor<String> valueCapture = ArgumentCaptor.forClass(String.class);
  verify(service).onEvent(valueCapture.capture());

  String out = valueCapture.getValue();
  assertEquals(refUtf8, out);
  assertNotEquals(refUtf16, out);
}
 
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
    logger.error("Error in session {}: {}", session.getId(), exception.getMessage());
    if (exception.getMessage().contains("Connection reset by peer")) {
        afterConnectionClosed(session, CloseStatus.SESSION_NOT_RELIABLE);
        return;
    }

    JsonMessageBuilder builder;
    session = sessionMonitor.getSession(session.getId());

    if (exception instanceof JsonParseException) {
        builder = JsonMessageBuilder
                .createErrorResponseBuilder(HttpServletResponse.SC_BAD_REQUEST, "Incorrect JSON syntax");
    } else {
        builder = JsonMessageBuilder
                .createErrorResponseBuilder(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Internal server error");
    }
    try {
        session.sendMessage(new TextMessage(GsonFactory.createGson().toJson(builder.build())));
    } catch (ClosedChannelException closedChannelException) {
        logger.error("WebSocket error: Channel is closed");
    }
}
 
@Test
public void connectReceiveAndCloseWithStompFrame() throws Exception {
	StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SEND);
	accessor.setDestination("/destination");
	MessageHeaders headers = accessor.getMessageHeaders();
	Message<byte[]> message = MessageBuilder.createMessage("body".getBytes(StandardCharsets.UTF_8), headers);
	byte[] bytes = new StompEncoder().encode(message);
	TextMessage textMessage = new TextMessage(bytes);
	SockJsFrame frame = SockJsFrame.messageFrame(new Jackson2SockJsMessageCodec(), textMessage.getPayload());

	String body = "o\n" + frame.getContent() + "\n" + "c[3000,\"Go away!\"]";
	ClientHttpResponse response = response(HttpStatus.OK, body);
	connect(response);

	verify(this.webSocketHandler).afterConnectionEstablished(any());
	verify(this.webSocketHandler).handleMessage(any(), eq(textMessage));
	verify(this.webSocketHandler).afterConnectionClosed(any(), eq(new CloseStatus(3000, "Go away!")));
	verifyNoMoreInteractions(this.webSocketHandler);
}
 
@Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException {
	if (!(message instanceof TextMessage)) {
		throw new IllegalArgumentException(this + " supports text messages only.");
	}
	if (this.state != State.OPEN) {
		throw new IllegalStateException(this + " is not open: current state " + this.state);
	}

	String payload = ((TextMessage) message).getPayload();
	payload = getMessageCodec().encode(payload);
	payload = payload.substring(1);  // the client-side doesn't need message framing (letter "a")

	TextMessage messageToSend = new TextMessage(payload);
	if (logger.isTraceEnabled()) {
		logger.trace("Sending message " + messageToSend + " in " + this);
	}
	sendInternal(messageToSend);
}
 
@Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException {
	checkNativeSessionInitialized();

	if (logger.isTraceEnabled()) {
		logger.trace("Sending " + message + ", " + this);
	}

	if (message instanceof TextMessage) {
		sendTextMessage((TextMessage) message);
	}
	else if (message instanceof BinaryMessage) {
		sendBinaryMessage((BinaryMessage) message);
	}
	else if (message instanceof PingMessage) {
		sendPingMessage((PingMessage) message);
	}
	else if (message instanceof PongMessage) {
		sendPongMessage((PongMessage) message);
	}
	else {
		throw new IllegalStateException("Unexpected WebSocketMessage type: " + message);
	}
}
 
@Test
public void delegateMessagesWithErrorAndConnectionClosing() throws Exception {
	WebSocketHandler wsHandler = new ExceptionWebSocketHandlerDecorator(this.webSocketHandler);
	TestSockJsSession sockJsSession = new TestSockJsSession(
			"1", this.sockJsConfig, wsHandler, Collections.<String, Object>emptyMap());

	String msg1 = "message 1";
	String msg2 = "message 2";
	String msg3 = "message 3";

	willThrow(new IOException()).given(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2));

	sockJsSession.delegateConnectionEstablished();
	try {
		sockJsSession.delegateMessages(msg1, msg2, msg3);
		fail("expected exception");
	}
	catch (SockJsMessageDeliveryException ex) {
		assertEquals(Collections.singletonList(msg3), ex.getUndeliveredMessages());
		verify(this.webSocketHandler).afterConnectionEstablished(sockJsSession);
		verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg1));
		verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2));
		verify(this.webSocketHandler).afterConnectionClosed(sockJsSession, CloseStatus.SERVER_ERROR);
		verifyNoMoreInteractions(this.webSocketHandler);
	}
}
 
@Override
public void executeSendRequest(URI url, HttpHeaders headers, TextMessage message) {
	if (logger.isTraceEnabled()) {
		logger.trace("Starting XHR send, url=" + url);
	}
	ResponseEntity<String> response = executeSendRequestInternal(url, headers, message);
	if (response.getStatusCode() != HttpStatus.NO_CONTENT) {
		if (logger.isErrorEnabled()) {
			logger.error("XHR send request (url=" + url + ") failed: " + response);
		}
		throw new HttpServerErrorException(response.getStatusCode());
	}
	if (logger.isTraceEnabled()) {
		logger.trace("XHR send request (url=" + url + ") response: " + response);
	}
}
 
@Test  // SPR-11648
public void sendSubscribeToControllerAndReceiveReply() throws Exception {
	TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
	String destHeader = "destination:/app/number";
	TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();

	TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1);
	WebSocketSession session = doHandshake(clientHandler, "/ws").get();

	try {
		assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
		String payload = clientHandler.actual.get(1).getPayload();
		assertTrue("Expected STOMP destination=/app/number, got " + payload, payload.contains(destHeader));
		assertTrue("Expected STOMP Payload=42, got " + payload, payload.contains("42"));
	}
	finally {
		session.close();
	}
}
 
@Test // SPR-17140
public void overflowStrategyDrop() throws IOException, InterruptedException {

	BlockingSession session = new BlockingSession();
	session.setId("123");
	session.setOpen(true);

	final ConcurrentWebSocketSessionDecorator decorator =
			new ConcurrentWebSocketSessionDecorator(session, 10*1000, 1024, OverflowStrategy.DROP);

	sendBlockingMessage(decorator);

	StringBuilder sb = new StringBuilder();
	for (int i = 0 ; i < 1023; i++) {
		sb.append("a");
	}

	for (int i=0; i < 5; i++) {
		TextMessage message = new TextMessage(sb.toString());
		decorator.sendMessage(message);
	}

	assertEquals(1023, decorator.getBufferSize());
	assertTrue(session.isOpen());

}
 
源代码10 项目: computoser   文件: GameHandler.java
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) throws Exception {
    try {
        GameMessage message = getMessage(textMessage);
        switch(message.getAction()) {
            case INITIALIZE: initialize(message, session); break;
            case JOIN: join(message.getGameId(), message.getPlayerName(), session); break;
            case LEAVE: leave(session.getId()); break;
            case START: startGame(message); break;
            case ANSWER: answer(message, session.getId()); break;
            case JOIN_RANDOM: joinRandomGame(message.getPlayerName(), session); break;
        }
    } catch (Exception ex) {
        logger.error("Exception occurred while handling message", ex);
    }
}
 
@Test
public void sendMessageToControllerAndReceiveReplyViaTopic() throws Exception {
	TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
	TextMessage m1 = create(StompCommand.SUBSCRIBE)
			.headers("id:subs1", "destination:/topic/increment").build();
	TextMessage m2 = create(StompCommand.SEND)
			.headers("destination:/app/increment").body("5").build();

	TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
	WebSocketSession session = doHandshake(clientHandler, "/ws").get();

	try {
		assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
	}
	finally {
		session.close();
	}
}
 
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void handleWebSocketMessageSplitAcrossTwoMessage() throws Exception {
	WebSocketHandler webSocketHandler = connect();

	String part1 = "SEND\na:alpha\n\nMessage";
	webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(part1));

	verifyNoMoreInteractions(this.stompSession);

	String part2 = " payload\0";
	webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(part2));

	ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
	verify(this.stompSession).handleMessage(captor.capture());
	Message<byte[]> message = captor.getValue();
	assertNotNull(message);

	StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
	StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
	assertEquals(StompCommand.SEND, accessor.getCommand());
	assertEquals("alpha", headers.getFirst("a"));
	assertEquals("Message payload", new String(message.getPayload(), UTF_8));
}
 
@Test // SPR-17140
public void overflowStrategyDrop() throws IOException, InterruptedException {

	BlockingSession session = new BlockingSession();
	session.setId("123");
	session.setOpen(true);

	final ConcurrentWebSocketSessionDecorator decorator =
			new ConcurrentWebSocketSessionDecorator(session, 10*1000, 1024, OverflowStrategy.DROP);

	sendBlockingMessage(decorator);

	StringBuilder sb = new StringBuilder();
	for (int i = 0 ; i < 1023; i++) {
		sb.append("a");
	}

	for (int i=0; i < 5; i++) {
		TextMessage message = new TextMessage(sb.toString());
		decorator.sendMessage(message);
	}

	assertEquals(1023, decorator.getBufferSize());
	assertTrue(session.isOpen());

}
 
源代码14 项目: artemis   文件: InstanceRegistry.java
protected void sendHeartbeat() {
    try {
        if (_sessionContext.get() == null) {
            return;
        }

        long heartbeatPrepareStartTime = System.currentTimeMillis();
        final TextMessage message = _instanceRepository.getHeartbeatMessage();
        _heartbeatPrepareLatency.addValue(System.currentTimeMillis() - heartbeatPrepareStartTime);
        if (message == null) {
            _logger.info("heartbeat message is null");
            _lastHeartbeatTime = System.currentTimeMillis();
            return;
        }

        long heartbeatSendStartTime = System.currentTimeMillis();
        _sessionContext.get().sendMessage(message);
        _heartbeatSendLatency.addValue(System.currentTimeMillis() - heartbeatSendStartTime);
        _lastHeartbeatTime = System.currentTimeMillis();
        _heartbeatAcceptStartTime = System.currentTimeMillis();
    } catch (Throwable e) {
        _logger.warn("send heartbeat failed.", e);
    }
}
 
@Test
public void sendMessageToControllerAndReceiveReplyViaTopic() throws Exception {
	TextMessage message1 = create(StompCommand.SUBSCRIBE)
			.headers("id:subs1", "destination:/topic/increment").build();
	TextMessage message2 = create(StompCommand.SEND)
			.headers("destination:/app/increment").body("5").build();

	TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
	WebSocketSession session = doHandshake(clientHandler, "/ws").get();

	try {
		assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
	}
	finally {
		session.close();
	}
}
 
源代码16 项目: FlyCms   文件: WebSocketService.java
/**
 * 发送消息给指定的用户
 */
public void sendMessageToUser(String userId, TextMessage message) {
	for (WebSocketSession user : users) {
		System.out.println(user.getAttributes().get("accountId")+"========发送消息给指定的用户========="+userId);
		if (user.getAttributes().get("accountId").equals(userId)) {
            try {
                // isOpen()在线就发送
                if (user.isOpen()) {
                    user.sendMessage(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
 
@Test  // SPR-10930
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
	TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
	TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
	TextMessage m2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();

	TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
	WebSocketSession session = doHandshake(clientHandler, "/ws").get();

	try {
		assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));

		String payload = clientHandler.actual.get(1).getPayload();
		assertTrue("Expected STOMP MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
	}
	finally {
		session.close();
	}
}
 
@Test
public void handleMessageToClientWithSimpDisconnectAck() {

	StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
	Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());

	SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
	ackAccessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, connectMessage);
	Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
	this.protocolHandler.handleMessageToClient(this.session, ackMessage);

	assertEquals(1, this.session.getSentMessages().size());
	TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
	assertEquals("ERROR\n" + "message:Session closed.\n" + "content-length:0\n" +
			"\n\u0000", actual.getPayload());
}
 
public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception {
	String payload = message.getPayload();
	if (StringUtils.isEmpty(payload)) {
		return;
	}
	String[] messages;
	try {
		messages = getSockJsServiceConfig().getMessageCodec().decode(payload);
	}
	catch (Throwable ex) {
		logger.error("Broken data received. Terminating WebSocket connection abruptly", ex);
		tryCloseWithSockJsTransportError(ex, CloseStatus.BAD_DATA);
		return;
	}
	delegateMessages(messages);
}
 
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessage() throws Exception {
	String text = "SEND\na:alpha\n\nMessage payload\0";
	connect().handleMessage(this.webSocketSession, new TextMessage(text));

	ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
	verify(this.stompSession).handleMessage(captor.capture());
	Message<byte[]> message = captor.getValue();
	assertNotNull(message);

	StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
	StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
	assertEquals(StompCommand.SEND, accessor.getCommand());
	assertEquals("alpha", headers.getFirst("a"));
	assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
 
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {

	StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
	accessor.setHeartbeat(10000, 10000);
	accessor.setAcceptVersion("1.0");
	Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());

	SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
	ackAccessor.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connectMessage);
	Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
	this.protocolHandler.handleMessageToClient(this.session, ackMessage);

	assertEquals(1, this.session.getSentMessages().size());
	TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
	assertEquals("CONNECTED\n" + "version:1.0\n" + "heart-beat:0,0\n" +
			"user-name:joe\n" + "\n" + "\u0000", actual.getPayload());
}
 
@Test
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
	TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
	TextMessage m2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();

	TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, m1, m2);
	WebSocketSession session = doHandshake(clientHandler, "/ws").get();

	try {
		assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));

		String payload = clientHandler.actual.get(0).getPayload();
		assertTrue("Expected STOMP MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
	}
	finally {
		session.close();
	}
}
 
源代码23 项目: JobX   文件: TerminalHandler.java
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    super.handleTextMessage(session, message);
    try {
        getClient(session, null);
        if (this.terminalClient != null) {
            if (!terminalClient.isClosed()) {
                terminalClient.write(message.getPayload());
            } else {
                session.close();
            }
        }
    } catch (Exception e) {
        session.sendMessage(new TextMessage("Sorry! jobx Terminal was closed, please try again. "));
        terminalClient.disconnect();
        session.close();
    }
}
 
源代码24 项目: full-teaching   文件: WebSocketChatUser.java
private void send(ObjectNode msg) {
	try {
		session.sendMessage(new TextMessage(msg.toString()));
	} catch (IOException e) {
		e.printStackTrace();
	}
}
 
@Test
public void handleMessageBadData() throws Exception {
	TextMessage message = new TextMessage("[\"x]");
	this.session.handleMessage(message, this.webSocketSession);

	this.session.isClosed();
	verify(this.webSocketHandler).handleTransportError(same(this.session), any(IOException.class));
	verifyNoMoreInteractions(this.webSocketHandler);
}
 
源代码26 项目: WeEvent   文件: BrokerStomp.java
private void handleOnEvent(String headerIdStr,
                           String subscriptionId,
                           WeEvent event,
                           WebSocketSession session) {
    StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.MESSAGE);
    // package the return frame
    accessor.setSubscriptionId(headerIdStr);
    accessor.setNativeHeader("subscription-id", subscriptionId);
    accessor.setMessageId(headerIdStr);
    accessor.setDestination(event.getTopic());
    accessor.setContentType(new MimeType("text", "plain", StandardCharsets.UTF_8));

    // set custom properties in header
    for (Map.Entry<String, String> custom : event.getExtensions().entrySet()) {
        accessor.setNativeHeader(custom.getKey(), custom.getValue());
    }

    // set eventId in header
    accessor.setNativeHeader(WeEventConstants.EXTENSIONS_EVENT_ID, event.getEventId());

    // payload == content
    MessageHeaders headers = accessor.getMessageHeaders();
    Message<byte[]> message = MessageBuilder.createMessage(event.getContent(), headers);
    byte[] bytes = new StompEncoder().encode(message);

    // send to remote
    send2Remote(session, new TextMessage(bytes));
}
 
源代码27 项目: openvidu   文件: InfoHandler.java
public void sendInfo(String info){
	for (WebSocketSession session : this.sessions.values()) {
		try {
			this.semaphore.acquire();
			session.sendMessage(new TextMessage(info));
			this.semaphore.release();
		} catch (IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
}
 
@Test
public void handleFrameMessageWithWebSocketHandlerException() throws Exception {
	this.session.handleFrame(SockJsFrame.openFrame().getContent());
	willThrow(new IllegalStateException("Fake error")).given(this.handler)
			.handleMessage(this.session, new TextMessage("foo"));
	willThrow(new IllegalStateException("Fake error")).given(this.handler)
			.handleMessage(this.session, new TextMessage("bar"));
	this.session.handleFrame(SockJsFrame.messageFrame(CODEC, "foo", "bar").getContent());
	assertThat(this.session.isOpen(), equalTo(true));
	verify(this.handler).afterConnectionEstablished(this.session);
	verify(this.handler).handleMessage(this.session, new TextMessage("foo"));
	verify(this.handler).handleMessage(this.session, new TextMessage("bar"));
	verifyNoMoreInteractions(this.handler);
}
 
源代码29 项目: api-layer   文件: WebSocketServerHandlerTest.java
@Test
public void handleByeMessage() throws Exception {
    WebSocketServerHandler handler = new WebSocketServerHandler();
    WebSocketSession session = mock(WebSocketSession.class);

    handler.handleMessage(session, new TextMessage("BYE"));

    ArgumentCaptor<WebSocketMessage<?>> messageCaptor = ArgumentCaptor.forClass(WebSocketMessage.class);
    verify(session).sendMessage(messageCaptor.capture());

    assertEquals("BYE", messageCaptor.getValue().getPayload().toString());
    verify(session, times(1)).close();
}
 
源代码30 项目: paas   文件: ContainerExecWSHandler.java
/**
 * 处理输入内容
 * @author jitwxs
 * @since 2018/7/1 14:22
 */
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    String containerId = session.getAttributes().get("containerId").toString();
    ExecSession execSession = execSessionMap.get(containerId);
    OutputStream out = execSession.getSocket().getOutputStream();
    out.write(message.asBytes());
    out.flush();
}
 
 类方法
 同包方法