下面列出了 org.springframework.web.socket.TextMessage API 类的用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Sends a JSON-serialized message through the handler under test and checks that the
 * value passed to {@code service.onEvent} equals the payload decoded as UTF-8 and
 * differs from the same bytes decoded as UTF-16.
 */
@Test
public void handleTextMessage() throws Exception {
    Message<String> message = new Message<>(0, 1L, 1, 1L, "a");
    TextMessage textMessage = new TextMessage((new ObjectMapper()).writeValueAsString(message));

    // Encode with an explicit charset: the no-arg getBytes() uses the platform default,
    // which would make both reference strings platform-dependent.
    byte[] payloadBytes = textMessage.getPayload().getBytes(UTF_8);
    String refUtf8 = new String(payloadBytes, UTF_8);
    String refUtf16 = new String(payloadBytes, UTF_16);

    underTest.afterConnectionEstablished(session);
    underTest.handleTextMessage(session, textMessage);

    ArgumentCaptor<String> valueCapture = ArgumentCaptor.forClass(String.class);
    verify(service).onEvent(valueCapture.capture());
    String out = valueCapture.getValue();

    assertEquals(refUtf8, out);
    assertNotEquals(refUtf16, out);
}
/**
 * Handles a transport-level error for the given session.
 *
 * <p>When the error message mentions "Connection reset by peer" the session is treated
 * as closed with {@link CloseStatus#SESSION_NOT_RELIABLE}. Otherwise an error response
 * is sent back: 400 for a {@link JsonParseException}, 500 for anything else.
 *
 * @param session the session on which the error occurred
 * @param exception the reported error
 * @throws Exception if closing the session or sending the error response fails
 */
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
    String errorText = exception.getMessage();
    logger.error("Error in session {}: {}", session.getId(), errorText);
    // Throwable.getMessage() may be null; guard before inspecting it.
    if (errorText != null && errorText.contains("Connection reset by peer")) {
        afterConnectionClosed(session, CloseStatus.SESSION_NOT_RELIABLE);
        return;
    }
    JsonMessageBuilder builder;
    // NOTE(review): the response is sent through the session returned by sessionMonitor;
    // confirm getSession never returns null for a session that is reporting an error.
    session = sessionMonitor.getSession(session.getId());
    if (exception instanceof JsonParseException) {
        builder = JsonMessageBuilder
                .createErrorResponseBuilder(HttpServletResponse.SC_BAD_REQUEST, "Incorrect JSON syntax");
    } else {
        builder = JsonMessageBuilder
                .createErrorResponseBuilder(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Internal server error");
    }
    try {
        session.sendMessage(new TextMessage(GsonFactory.createGson().toJson(builder.build())));
    } catch (ClosedChannelException closedChannelException) {
        logger.error("WebSocket error: Channel is closed");
    }
}
/**
 * Feeds a response body made of an open frame, a message frame wrapping an encoded
 * STOMP SEND, and a close frame, then verifies the handler saw the connection being
 * established, the text message, and the close with status 3000.
 */
@Test
public void connectReceiveAndCloseWithStompFrame() throws Exception {
    StompHeaderAccessor sendAccessor = StompHeaderAccessor.create(StompCommand.SEND);
    sendAccessor.setDestination("/destination");
    Message<byte[]> stompMessage = MessageBuilder.createMessage(
            "body".getBytes(StandardCharsets.UTF_8), sendAccessor.getMessageHeaders());
    TextMessage textMessage = new TextMessage(new StompEncoder().encode(stompMessage));

    SockJsFrame messageFrame =
            SockJsFrame.messageFrame(new Jackson2SockJsMessageCodec(), textMessage.getPayload());
    // Three newline-separated frames: open, message, close.
    String body = "o\n" + messageFrame.getContent() + "\n" + "c[3000,\"Go away!\"]";
    connect(response(HttpStatus.OK, body));

    verify(this.webSocketHandler).afterConnectionEstablished(any());
    verify(this.webSocketHandler).handleMessage(any(), eq(textMessage));
    verify(this.webSocketHandler).afterConnectionClosed(any(), eq(new CloseStatus(3000, "Go away!")));
    verifyNoMoreInteractions(this.webSocketHandler);
}
/**
 * Encodes the payload of a text message with the configured message codec, strips the
 * leading framing character and hands the result to {@code sendInternal}.
 *
 * @param message the message to send; must be a {@link TextMessage}
 * @throws IllegalArgumentException if the message is not a {@link TextMessage}
 * @throws IllegalStateException if this session is not in the OPEN state
 * @throws IOException declared for failures while sending
 */
@Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException {
    boolean isText = message instanceof TextMessage;
    if (!isText) {
        throw new IllegalArgumentException(this + " supports text messages only.");
    }
    if (State.OPEN != this.state) {
        throw new IllegalStateException(this + " is not open: current state " + this.state);
    }

    String encoded = getMessageCodec().encode(((TextMessage) message).getPayload());
    // Drop the first character: the client side doesn't need message framing (letter "a").
    TextMessage messageToSend = new TextMessage(encoded.substring(1));

    if (logger.isTraceEnabled()) {
        logger.trace("Sending message " + messageToSend + " in " + this);
    }
    sendInternal(messageToSend);
}
/**
 * Dispatches the message to the type-specific send method (text, binary, ping or pong)
 * after checking that the native session has been initialized.
 *
 * @param message the message to send
 * @throws IllegalStateException if the message is of none of the four handled types
 * @throws IOException declared for failures while sending
 */
@Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException {
    checkNativeSessionInitialized();

    if (logger.isTraceEnabled()) {
        logger.trace("Sending " + message + ", " + this);
    }

    // Type checks run in the same order as before: text, binary, ping, pong.
    if (message instanceof TextMessage) {
        sendTextMessage((TextMessage) message);
        return;
    }
    if (message instanceof BinaryMessage) {
        sendBinaryMessage((BinaryMessage) message);
        return;
    }
    if (message instanceof PingMessage) {
        sendPingMessage((PingMessage) message);
        return;
    }
    if (message instanceof PongMessage) {
        sendPongMessage((PongMessage) message);
        return;
    }
    throw new IllegalStateException("Unexpected WebSocketMessage type: " + message);
}
/**
 * Delegates three messages where handling of the second one throws. Expects a
 * {@code SockJsMessageDeliveryException} listing only the third message as undelivered,
 * and the connection being closed with {@code CloseStatus.SERVER_ERROR}.
 */
@Test
public void delegateMessagesWithErrorAndConnectionClosing() throws Exception {
    WebSocketHandler decoratedHandler = new ExceptionWebSocketHandlerDecorator(this.webSocketHandler);
    TestSockJsSession sockJsSession = new TestSockJsSession(
            "1", this.sockJsConfig, decoratedHandler, Collections.<String, Object>emptyMap());

    String first = "message 1";
    String failing = "message 2";
    String undelivered = "message 3";

    // The handler fails on the second message only.
    willThrow(new IOException()).given(this.webSocketHandler)
            .handleMessage(sockJsSession, new TextMessage(failing));

    sockJsSession.delegateConnectionEstablished();
    try {
        sockJsSession.delegateMessages(first, failing, undelivered);
        fail("expected exception");
    }
    catch (SockJsMessageDeliveryException deliveryFailure) {
        assertEquals(Collections.singletonList(undelivered), deliveryFailure.getUndeliveredMessages());
        verify(this.webSocketHandler).afterConnectionEstablished(sockJsSession);
        verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(first));
        verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(failing));
        verify(this.webSocketHandler).afterConnectionClosed(sockJsSession, CloseStatus.SERVER_ERROR);
        verifyNoMoreInteractions(this.webSocketHandler);
    }
}
/**
 * Executes an XHR send request and requires a 204 (No Content) response.
 *
 * @param url the URL to send to
 * @param headers the request headers
 * @param message the message to send
 * @throws HttpServerErrorException if the response status is anything other than
 *         {@code HttpStatus.NO_CONTENT}
 */
@Override
public void executeSendRequest(URI url, HttpHeaders headers, TextMessage message) {
    if (logger.isTraceEnabled()) {
        logger.trace("Starting XHR send, url=" + url);
    }

    ResponseEntity<String> response = executeSendRequestInternal(url, headers, message);

    // Success path first: log at trace level and finish.
    if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
        if (logger.isTraceEnabled()) {
            logger.trace("XHR send request (url=" + url + ") response: " + response);
        }
        return;
    }

    if (logger.isErrorEnabled()) {
        logger.error("XHR send request (url=" + url + ") failed: " + response);
    }
    throw new HttpServerErrorException(response.getStatusCode());
}
/**
 * SPR-11648: sends CONNECT followed by SUBSCRIBE to /app/number and checks that the
 * second frame received by the client carries the destination header and "42".
 */
@Test // SPR-11648
public void sendSubscribeToControllerAndReceiveReply() throws Exception {
    String destHeader = "destination:/app/number";
    TextMessage connectFrame = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
    TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();

    // The handler's latch is created with a count of 2.
    TestClientWebSocketHandler clientHandler =
            new TestClientWebSocketHandler(2, connectFrame, subscribeFrame);
    WebSocketSession session = doHandshake(clientHandler, "/ws").get();

    try {
        boolean completed = clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS);
        assertTrue(completed);

        String payload = clientHandler.actual.get(1).getPayload();
        assertTrue("Expected STOMP destination=/app/number, got " + payload, payload.contains(destHeader));
        assertTrue("Expected STOMP Payload=42, got " + payload, payload.contains("42"));
    }
    finally {
        session.close();
    }
}
/**
 * SPR-17140: with the DROP overflow strategy and a 1024 buffer-size limit, sending five
 * 1023-character messages behind a blocked send must leave 1023 in the buffer and keep
 * the session open.
 */
@Test // SPR-17140
public void overflowStrategyDrop() throws IOException, InterruptedException {
    BlockingSession session = new BlockingSession();
    session.setId("123");
    session.setOpen(true);

    final ConcurrentWebSocketSessionDecorator decorator =
            new ConcurrentWebSocketSessionDecorator(session, 10*1000, 1024, OverflowStrategy.DROP);

    sendBlockingMessage(decorator);

    // Build a 1023-character payload once; each send wraps it in a fresh TextMessage.
    StringBuilder filler = new StringBuilder();
    int remaining = 1023;
    while (remaining-- > 0) {
        filler.append("a");
    }
    String payload = filler.toString();

    for (int sent = 0; sent < 5; sent++) {
        decorator.sendMessage(new TextMessage(payload));
    }

    assertEquals(1023, decorator.getBufferSize());
    assertTrue(session.isOpen());
}
/**
 * Parses the incoming text into a {@code GameMessage} and dispatches on its action.
 * Any exception raised while parsing or dispatching is logged and not rethrown.
 *
 * @param session the session the message arrived on
 * @param textMessage the raw incoming message
 */
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) throws Exception {
    try {
        GameMessage request = getMessage(textMessage);
        // Actions without a case below are ignored.
        switch (request.getAction()) {
            case INITIALIZE:
                initialize(request, session);
                break;
            case JOIN:
                join(request.getGameId(), request.getPlayerName(), session);
                break;
            case LEAVE:
                leave(session.getId());
                break;
            case START:
                startGame(request);
                break;
            case ANSWER:
                answer(request, session.getId());
                break;
            case JOIN_RANDOM:
                joinRandomGame(request.getPlayerName(), session);
                break;
        }
    } catch (Exception ex) {
        logger.error("Exception occurred while handling message", ex);
    }
}
/**
 * Sends CONNECT, a SUBSCRIBE to /topic/increment and a SEND to /app/increment, then
 * waits for the client handler's latch (count 2) to reach zero within the timeout.
 */
@Test
public void sendMessageToControllerAndReceiveReplyViaTopic() throws Exception {
    TextMessage connectFrame = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
    TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE)
            .headers("id:subs1", "destination:/topic/increment")
            .build();
    TextMessage sendFrame = create(StompCommand.SEND)
            .headers("destination:/app/increment")
            .body("5")
            .build();

    TestClientWebSocketHandler clientHandler =
            new TestClientWebSocketHandler(2, connectFrame, subscribeFrame, sendFrame);
    WebSocketSession session = doHandshake(clientHandler, "/ws").get();

    try {
        boolean completed = clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS);
        assertTrue(completed);
    }
    finally {
        session.close();
    }
}
/**
 * Delivers one STOMP SEND frame split across two WebSocket text messages. Nothing may
 * reach the STOMP session after the first part; after the second part exactly one
 * message with the expected command, header and payload must be delivered.
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void handleWebSocketMessageSplitAcrossTwoMessage() throws Exception {
    WebSocketHandler webSocketHandler = connect();

    // First half: command, header and the start of the body, with no terminating NUL.
    String firstHalf = "SEND\na:alpha\n\nMessage";
    webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(firstHalf));
    verifyNoMoreInteractions(this.stompSession);

    // Second half: the rest of the body plus the terminating NUL.
    String secondHalf = " payload\0";
    webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(secondHalf));

    ArgumentCaptor<Message> delivered = ArgumentCaptor.forClass(Message.class);
    verify(this.stompSession).handleMessage(delivered.capture());
    Message<byte[]> message = delivered.getValue();
    assertNotNull(message);

    StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
    StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
    assertEquals(StompCommand.SEND, accessor.getCommand());
    assertEquals("alpha", headers.getFirst("a"));

    String body = new String(message.getPayload(), UTF_8);
    assertEquals("Message payload", body);
}
/**
 * SPR-17140: checks the DROP overflow strategy. After a blocked send, five messages of
 * 1023 characters are submitted against a buffer-size limit of 1024; the buffer must
 * report 1023 and the session must still be open.
 */
@Test // SPR-17140
public void overflowStrategyDrop() throws IOException, InterruptedException {
    BlockingSession blockingSession = new BlockingSession();
    blockingSession.setId("123");
    blockingSession.setOpen(true);

    final ConcurrentWebSocketSessionDecorator decorator = new ConcurrentWebSocketSessionDecorator(
            blockingSession, 10*1000, 1024, OverflowStrategy.DROP);

    sendBlockingMessage(decorator);

    // One character short of the configured buffer-size limit.
    StringBuilder text = new StringBuilder();
    for (int length = 0; length < 1023; length++) {
        text.append("a");
    }

    int attempts = 5;
    while (attempts-- > 0) {
        decorator.sendMessage(new TextMessage(text.toString()));
    }

    assertEquals(1023, decorator.getBufferSize());
    assertTrue(blockingSession.isOpen());
}
/**
 * Sends one heartbeat message over the current session context, recording how long it
 * took to prepare and to send the message. Does nothing when no session context is set.
 * Any failure is logged as a warning and not propagated.
 */
protected void sendHeartbeat() {
    try {
        if (_sessionContext.get() == null) {
            return;
        }

        // Time how long the repository takes to produce the heartbeat message.
        long prepareStartedAt = System.currentTimeMillis();
        final TextMessage heartbeat = _instanceRepository.getHeartbeatMessage();
        _heartbeatPrepareLatency.addValue(System.currentTimeMillis() - prepareStartedAt);

        if (heartbeat == null) {
            // Nothing to send; the heartbeat time is still refreshed.
            _logger.info("heartbeat message is null");
            _lastHeartbeatTime = System.currentTimeMillis();
            return;
        }

        // Time the send itself.
        long sendStartedAt = System.currentTimeMillis();
        _sessionContext.get().sendMessage(heartbeat);
        _heartbeatSendLatency.addValue(System.currentTimeMillis() - sendStartedAt);

        _lastHeartbeatTime = System.currentTimeMillis();
        _heartbeatAcceptStartTime = System.currentTimeMillis();
    } catch (Throwable e) {
        _logger.warn("send heartbeat failed.", e);
    }
}
/**
 * Sends a SUBSCRIBE to /topic/increment followed by a SEND to /app/increment and waits
 * for the client handler's latch (count 1) to reach zero within the timeout.
 */
@Test
public void sendMessageToControllerAndReceiveReplyViaTopic() throws Exception {
    TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE)
            .headers("id:subs1", "destination:/topic/increment")
            .build();
    TextMessage sendFrame = create(StompCommand.SEND)
            .headers("destination:/app/increment")
            .body("5")
            .build();

    TestClientWebSocketHandler clientHandler =
            new TestClientWebSocketHandler(1, subscribeFrame, sendFrame);
    WebSocketSession session = doHandshake(clientHandler, "/ws").get();

    try {
        boolean completed = clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS);
        assertTrue(completed);
    }
    finally {
        session.close();
    }
}
/**
 * Sends a message to the specified user.
 *
 * <p>Every session in {@code users} whose "accountId" attribute equals {@code userId}
 * receives the message, provided the session is open. Sessions without an "accountId"
 * attribute are skipped.
 *
 * @param userId the account id to match against each session's "accountId" attribute
 * @param message the message to send
 */
public void sendMessageToUser(String userId, TextMessage message) {
    for (WebSocketSession user : users) {
        Object accountId = user.getAttributes().get("accountId");
        System.out.println(accountId+"========发送消息给指定的用户========="+userId);
        // A session with no "accountId" attribute cannot match; skip it rather than
        // dereferencing null.
        if (accountId == null || !accountId.equals(userId)) {
            continue;
        }
        try {
            // Only send while the session is open (user online).
            if (user.isOpen()) {
                user.sendMessage(message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/**
 * SPR-10930: sends CONNECT, a SUBSCRIBE to /topic/foo and a SEND to the same topic,
 * then checks that the second frame received by the client is a STOMP MESSAGE.
 */
@Test // SPR-10930
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
    TextMessage connectFrame = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
    TextMessage subscribeFrame =
            create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
    TextMessage sendFrame =
            create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();

    TestClientWebSocketHandler clientHandler =
            new TestClientWebSocketHandler(2, connectFrame, subscribeFrame, sendFrame);
    WebSocketSession session = doHandshake(clientHandler, "/ws").get();

    try {
        boolean completed = clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS);
        assertTrue(completed);

        String payload = clientHandler.actual.get(1).getPayload();
        assertTrue("Expected STOMP MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
    }
    finally {
        session.close();
    }
}
/**
 * Passes a DISCONNECT_ACK message (carrying the originating DISCONNECT message as a
 * header) to the protocol handler and expects exactly one frame to be sent to the
 * client: an ERROR frame with the message "Session closed.".
 */
@Test
public void handleMessageToClientWithSimpDisconnectAck() {
    StompHeaderAccessor disconnectAccessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
    Message<?> disconnectMessage =
            MessageBuilder.createMessage(EMPTY_PAYLOAD, disconnectAccessor.getMessageHeaders());

    SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
    ackAccessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, disconnectMessage);
    Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());

    this.protocolHandler.handleMessageToClient(this.session, ackMessage);

    assertEquals(1, this.session.getSentMessages().size());
    TextMessage sentFrame = (TextMessage) this.session.getSentMessages().get(0);
    String expectedFrame = "ERROR\n" + "message:Session closed.\n" + "content-length:0\n" +
            "\n\u0000";
    assertEquals(expectedFrame, sentFrame.getPayload());
}
/**
 * Decodes the payload of an incoming text message with the configured SockJS message
 * codec and delegates the decoded messages. An empty payload is ignored. If decoding
 * fails, the error is logged, the session is closed with {@code CloseStatus.BAD_DATA}
 * via {@code tryCloseWithSockJsTransportError}, and nothing is delegated.
 *
 * @param message the incoming text message
 * @param wsSession the WebSocket session (not used by this method)
 * @throws Exception propagated from message delegation
 */
public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception {
    String payload = message.getPayload();
    if (!StringUtils.isEmpty(payload)) {
        String[] decoded;
        try {
            decoded = getSockJsServiceConfig().getMessageCodec().decode(payload);
        }
        catch (Throwable ex) {
            logger.error("Broken data received. Terminating WebSocket connection abruptly", ex);
            tryCloseWithSockJsTransportError(ex, CloseStatus.BAD_DATA);
            return;
        }
        delegateMessages(decoded);
    }
}
/**
 * Delivers one complete STOMP SEND frame as a WebSocket text message and checks the
 * command, the "a" header and the payload of the message passed to the STOMP session.
 */
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessage() throws Exception {
    String frameText = "SEND\na:alpha\n\nMessage payload\0";
    WebSocketHandler handler = connect();
    handler.handleMessage(this.webSocketSession, new TextMessage(frameText));

    ArgumentCaptor<Message> delivered = ArgumentCaptor.forClass(Message.class);
    verify(this.stompSession).handleMessage(delivered.capture());
    Message<byte[]> message = delivered.getValue();
    assertNotNull(message);

    StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
    StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
    assertEquals(StompCommand.SEND, accessor.getCommand());
    assertEquals("alpha", headers.getFirst("a"));

    String body = new String(message.getPayload(), StandardCharsets.UTF_8);
    assertEquals("Message payload", body);
}
/**
 * Passes a CONNECT_ACK message (carrying the originating CONNECT message, which asks
 * for a 10000,10000 heartbeat and version 1.0) to the protocol handler and expects a
 * single CONNECTED frame reporting heart-beat 0,0.
 */
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
    StompHeaderAccessor connectAccessor = StompHeaderAccessor.create(StompCommand.CONNECT);
    connectAccessor.setHeartbeat(10000, 10000);
    connectAccessor.setAcceptVersion("1.0");
    Message<?> connectMessage =
            MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAccessor.getMessageHeaders());

    SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
    ackAccessor.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connectMessage);
    Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());

    this.protocolHandler.handleMessageToClient(this.session, ackMessage);

    assertEquals(1, this.session.getSentMessages().size());
    TextMessage sentFrame = (TextMessage) this.session.getSentMessages().get(0);
    // NOTE(review): the user-name value presumably comes from the test session's
    // principal, which is set up outside this method — confirm.
    String expectedFrame = "CONNECTED\n" + "version:1.0\n" + "heart-beat:0,0\n" +
            "user-name:joe\n" + "\n" + "\u0000";
    assertEquals(expectedFrame, sentFrame.getPayload());
}
/**
 * Sends a SUBSCRIBE to /topic/foo followed by a SEND to the same topic and checks that
 * the first frame received by the client is a STOMP MESSAGE.
 */
@Test
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
    TextMessage subscribeFrame =
            create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
    TextMessage sendFrame =
            create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();

    TestClientWebSocketHandler clientHandler =
            new TestClientWebSocketHandler(1, subscribeFrame, sendFrame);
    WebSocketSession session = doHandshake(clientHandler, "/ws").get();

    try {
        boolean completed = clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS);
        assertTrue(completed);

        String payload = clientHandler.actual.get(0).getPayload();
        assertTrue("Expected STOMP MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
    }
    finally {
        session.close();
    }
}
/**
 * Forwards the payload of an incoming text message to the terminal client.
 *
 * <p>If the terminal client is already closed, the WebSocket session is closed instead.
 * If anything fails, the user is told the terminal was closed, the terminal client (if
 * any) is disconnected and the session is closed.
 *
 * @param session the session the message arrived on
 * @param message the incoming text message
 * @throws Exception if notifying the user or cleaning up fails
 */
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    super.handleTextMessage(session, message);
    try {
        getClient(session, null);
        if (this.terminalClient != null) {
            if (!this.terminalClient.isClosed()) {
                this.terminalClient.write(message.getPayload());
            } else {
                session.close();
            }
        }
    } catch (Exception e) {
        try {
            session.sendMessage(new TextMessage("Sorry! jobx Terminal was closed, please try again. "));
        } finally {
            // Clean up even when the notification cannot be delivered.
            try {
                // The client may never have been created (e.g. getClient failed), so
                // guard against null before disconnecting.
                if (this.terminalClient != null) {
                    this.terminalClient.disconnect();
                }
            } finally {
                session.close();
            }
        }
    }
}
/**
 * Serializes the given JSON node with {@code toString()} and sends it as a text message
 * over {@code session}. An {@link IOException} is printed and not rethrown.
 *
 * @param msg the JSON object to send
 */
private void send(ObjectNode msg) {
    try {
        TextMessage outgoing = new TextMessage(msg.toString());
        session.sendMessage(outgoing);
    } catch (IOException sendFailure) {
        sendFailure.printStackTrace();
    }
}
/**
 * Passes a payload that is not valid JSON to the session and expects the handler to be
 * notified of a transport error caused by an {@link IOException}, and nothing else.
 */
@Test
public void handleMessageBadData() throws Exception {
    // Unterminated string inside a JSON array.
    TextMessage malformed = new TextMessage("[\"x]");

    this.session.handleMessage(malformed, this.webSocketSession);
    // NOTE(review): the result of isClosed() is discarded; possibly meant to be asserted.
    this.session.isClosed();

    verify(this.webSocketHandler).handleTransportError(same(this.session), any(IOException.class));
    verifyNoMoreInteractions(this.webSocketHandler);
}
/**
 * Wraps an event in a STOMP MESSAGE frame and sends it to the remote session.
 *
 * <p>The frame carries the subscription id, message id, destination (the event topic),
 * a text/plain UTF-8 content type, every event extension as a native header, and the
 * event id. The event content becomes the frame payload.
 *
 * @param headerIdStr value used for both the subscription id and the message id
 * @param subscriptionId value of the "subscription-id" native header
 * @param event the event to deliver
 * @param session the session to send the encoded frame to
 */
private void handleOnEvent(String headerIdStr,
                           String subscriptionId,
                           WeEvent event,
                           WebSocketSession session) {
    StompHeaderAccessor frameHeaders = StompHeaderAccessor.create(StompCommand.MESSAGE);

    // Standard headers of the return frame.
    frameHeaders.setSubscriptionId(headerIdStr);
    frameHeaders.setNativeHeader("subscription-id", subscriptionId);
    frameHeaders.setMessageId(headerIdStr);
    frameHeaders.setDestination(event.getTopic());
    frameHeaders.setContentType(new MimeType("text", "plain", StandardCharsets.UTF_8));

    // Custom properties from the event go into the header as-is.
    for (Map.Entry<String, String> extension : event.getExtensions().entrySet()) {
        String headerName = extension.getKey();
        String headerValue = extension.getValue();
        frameHeaders.setNativeHeader(headerName, headerValue);
    }

    // The event id is set last.
    frameHeaders.setNativeHeader(WeEventConstants.EXTENSIONS_EVENT_ID, event.getEventId());

    // Payload is the event content.
    Message<byte[]> frame = MessageBuilder.createMessage(event.getContent(), frameHeaders.getMessageHeaders());
    byte[] encodedFrame = new StompEncoder().encode(frame);

    send2Remote(session, new TextMessage(encodedFrame));
}
/**
 * Sends the given text to every registered session, acquiring one semaphore permit
 * around each send.
 *
 * <p>A failed send is printed and the loop continues with the next session. If the
 * thread is interrupted while waiting for a permit, the interrupt status is restored
 * and the remaining sessions are skipped.
 *
 * @param info the text to send
 */
public void sendInfo(String info){
    for (WebSocketSession session : this.sessions.values()) {
        try {
            this.semaphore.acquire();
        } catch (InterruptedException e) {
            // Restore the interrupt status for callers and stop: further acquire()
            // calls would fail immediately while the thread is interrupted.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }
        try {
            session.sendMessage(new TextMessage(info));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Always return the permit, even when the send fails; otherwise every
            // failed send would permanently consume one permit.
            this.semaphore.release();
        }
    }
}
/**
 * Makes the handler throw for both messages of a two-message frame and checks that the
 * session stays open and that the handler was still invoked for each message.
 */
@Test
public void handleFrameMessageWithWebSocketHandlerException() throws Exception {
    this.session.handleFrame(SockJsFrame.openFrame().getContent());

    // Handling of either message fails with an IllegalStateException.
    willThrow(new IllegalStateException("Fake error"))
            .given(this.handler).handleMessage(this.session, new TextMessage("foo"));
    willThrow(new IllegalStateException("Fake error"))
            .given(this.handler).handleMessage(this.session, new TextMessage("bar"));

    String messageFrame = SockJsFrame.messageFrame(CODEC, "foo", "bar").getContent();
    this.session.handleFrame(messageFrame);

    assertThat(this.session.isOpen(), equalTo(true));
    verify(this.handler).afterConnectionEstablished(this.session);
    verify(this.handler).handleMessage(this.session, new TextMessage("foo"));
    verify(this.handler).handleMessage(this.session, new TextMessage("bar"));
    verifyNoMoreInteractions(this.handler);
}
/**
 * Sends "BYE" to a fresh handler and expects it to send back a message whose payload
 * reads "BYE" and to close the session exactly once.
 */
@Test
public void handleByeMessage() throws Exception {
    WebSocketSession mockSession = mock(WebSocketSession.class);
    WebSocketServerHandler serverHandler = new WebSocketServerHandler();

    serverHandler.handleMessage(mockSession, new TextMessage("BYE"));

    ArgumentCaptor<WebSocketMessage<?>> replyCaptor = ArgumentCaptor.forClass(WebSocketMessage.class);
    verify(mockSession).sendMessage(replyCaptor.capture());
    String replyText = replyCaptor.getValue().getPayload().toString();
    assertEquals("BYE", replyText);

    verify(mockSession, times(1)).close();
}
/**
 * Handles input content: writes the bytes of the incoming message to the output stream
 * of the exec session registered under the session's "containerId" attribute, then
 * flushes the stream.
 * @author jitwxs
 * @since 2018/7/1 14:22
 */
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    Object containerAttribute = session.getAttributes().get("containerId");
    ExecSession execSession = execSessionMap.get(containerAttribute.toString());

    OutputStream socketOut = execSession.getSocket().getOutputStream();
    socketOut.write(message.asBytes());
    socketOut.flush();
}