下面列出了怎么用org.springframework.messaging.simp.stomp.StompCommand的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void handleMessageToClientWithSimpDisconnectAck() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
ackAccessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, connectMessage);
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
assertEquals(1, this.session.getSentMessages().size());
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
assertEquals("ERROR\n" + "message:Session closed.\n" + "content-length:0\n" +
"\n\u0000", actual.getPayload());
}
@Test
public void clientOutboundChannelUsedByAnnotatedMethod() {
ApplicationContext context = loadConfig(SimpleBrokerConfig.class);
TestChannel channel = context.getBean("clientOutboundChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler =
context.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId("sess1");
headers.setSessionAttributes(new ConcurrentHashMap<>());
headers.setSubscriptionId("subs1");
headers.setDestination("/foo");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
messageHandler.handleMessage(message);
message = channel.messages.get(0);
headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("/foo", headers.getDestination());
assertEquals("bar", new String((byte[]) message.getPayload()));
}
@Test
public void brokerChannelUsedByAnnotatedMethod() {
ApplicationContext context = loadConfig(SimpleBrokerConfig.class);
TestChannel channel = context.getBean("brokerChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler =
context.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setSessionId("sess1");
headers.setSessionAttributes(new ConcurrentHashMap<>());
headers.setDestination("/foo");
Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
messageHandler.handleMessage(message);
message = channel.messages.get(0);
headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("/bar", headers.getDestination());
assertEquals("bar", new String((byte[]) message.getPayload()));
}
private Message<byte[]> createDisconnectMessage(WebSocketSession session) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
if (getHeaderInitializer() != null) {
getHeaderInitializer().initHeaders(headerAccessor);
}
headerAccessor.setSessionId(session.getId());
headerAccessor.setSessionAttributes(session.getAttributes());
Principal user = getUser(session);
if (user != null) {
headerAccessor.setUser(user);
}
return MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
}
@Override
@Nullable
public Message<byte[]> handleClientMessageProcessingError(@Nullable Message<byte[]> clientMessage, Throwable ex) {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.setMessage(ex.getMessage());
accessor.setLeaveMutable(true);
StompHeaderAccessor clientHeaderAccessor = null;
if (clientMessage != null) {
clientHeaderAccessor = MessageHeaderAccessor.getAccessor(clientMessage, StompHeaderAccessor.class);
if (clientHeaderAccessor != null) {
String receiptId = clientHeaderAccessor.getReceipt();
if (receiptId != null) {
accessor.setReceiptId(receiptId);
}
}
}
return handleInternal(accessor, EMPTY_PAYLOAD, ex, clientHeaderAccessor);
}
@Test
public void connectReceiveAndCloseWithStompFrame() throws Exception {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SEND);
accessor.setDestination("/destination");
MessageHeaders headers = accessor.getMessageHeaders();
Message<byte[]> message = MessageBuilder.createMessage("body".getBytes(StandardCharsets.UTF_8), headers);
byte[] bytes = new StompEncoder().encode(message);
TextMessage textMessage = new TextMessage(bytes);
SockJsFrame frame = SockJsFrame.messageFrame(new Jackson2SockJsMessageCodec(), textMessage.getPayload());
String body = "o\n" + frame.getContent() + "\n" + "c[3000,\"Go away!\"]";
ClientHttpResponse response = response(HttpStatus.OK, body);
connect(response);
verify(this.webSocketHandler).afterConnectionEstablished(any());
verify(this.webSocketHandler).handleMessage(any(), eq(textMessage));
verify(this.webSocketHandler).afterConnectionClosed(any(), eq(new CloseStatus(3000, "Go away!")));
verifyNoMoreInteractions(this.webSocketHandler);
}
@Test
public void handleMessageToClientWithSimpConnectAck() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
accessor.setHeartbeat(10000, 10000);
accessor.setAcceptVersion("1.0,1.1,1.2");
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
ackAccessor.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connectMessage);
ackAccessor.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, new long[] {15000, 15000});
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
assertEquals(1, this.session.getSentMessages().size());
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "version:1.2\n" + "heart-beat:15000,15000\n" +
"user-name:joe\n" + "\n" + "\u0000", actual.getPayload());
}
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
accessor.setHeartbeat(10000, 10000);
accessor.setAcceptVersion("1.0");
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
ackAccessor.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connectMessage);
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
assertEquals(1, this.session.getSentMessages().size());
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "version:1.0\n" + "heart-beat:0,0\n" +
"user-name:joe\n" + "\n" + "\u0000", actual.getPayload());
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessageBinary() throws Exception {
String text = "SEND\na:alpha\n\nMessage payload\0";
connect().handleMessage(this.webSocketSession, new BinaryMessage(text.getBytes(StandardCharsets.UTF_8)));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Test
public void handleMessageToClientWithUserDestination() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE);
headers.setMessageId("mess0");
headers.setSubscriptionId("sub0");
headers.setDestination("/queue/foo-user123");
headers.setNativeHeader(StompHeaderAccessor.ORIGINAL_DESTINATION, "/user/queue/foo");
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(1, this.session.getSentMessages().size());
WebSocketMessage<?> textMessage = this.session.getSentMessages().get(0);
assertTrue(((String) textMessage.getPayload()).contains("destination:/user/queue/foo\n"));
assertFalse(((String) textMessage.getPayload()).contains(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessage() throws Exception {
String text = "SEND\na:alpha\n\nMessage payload\0";
connect().handleMessage(this.webSocketSession, new TextMessage(text));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Test // SPR-14690
public void handleMessageFromClientWithTokenAuthentication() {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
channel.addInterceptor(new AuthenticationInterceptor("[email protected]"));
channel.addInterceptor(new ImmutableMessageChannelInterceptor());
TestMessageHandler messageHandler = new TestMessageHandler();
channel.subscribe(messageHandler);
StompSubProtocolHandler handler = new StompSubProtocolHandler();
handler.afterSessionStarted(this.session, channel);
TextMessage wsMessage = StompTextMessageBuilder.create(StompCommand.CONNECT).build();
handler.handleMessageFromClient(this.session, wsMessage, channel);
assertEquals(1, messageHandler.getMessages().size());
Message<?> message = messageHandler.getMessages().get(0);
Principal user = SimpMessageHeaderAccessor.getUser(message.getHeaders());
assertNotNull(user);
assertEquals("[email protected]", user.getName());
}
@Test
public void handleMessageToClientWithUserDestination() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE);
headers.setMessageId("mess0");
headers.setSubscriptionId("sub0");
headers.setDestination("/queue/foo-user123");
headers.setNativeHeader(StompHeaderAccessor.ORIGINAL_DESTINATION, "/user/queue/foo");
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(1, this.session.getSentMessages().size());
WebSocketMessage<?> textMessage = this.session.getSentMessages().get(0);
assertTrue(((String) textMessage.getPayload()).contains("destination:/user/queue/foo\n"));
assertFalse(((String) textMessage.getPayload()).contains(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION));
}
@Test // SPR-10930
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
TextMessage m2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(1).getPayload();
assertTrue("Expected STOMP MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
}
finally {
session.close();
}
}
@Test // SPR-11648
public void sendSubscribeToControllerAndReceiveReply() throws Exception {
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
String destHeader = "destination:/app/number";
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(1).getPayload();
assertTrue("Expected STOMP destination=/app/number, got " + payload, payload.contains(destHeader));
assertTrue("Expected STOMP Payload=42, got " + payload, payload.contains("42"));
}
finally {
session.close();
}
}
@Test
public void webSocketScope() throws Exception {
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
TextMessage m1 = create(StompCommand.SUBSCRIBE)
.headers("id:subs1", "destination:/topic/scopedBeanValue").build();
TextMessage m2 = create(StompCommand.SEND)
.headers("destination:/app/scopedBeanValue").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(1).getPayload();
assertTrue(payload.startsWith("MESSAGE\n"));
assertTrue(payload.contains("destination:/topic/scopedBeanValue\n"));
assertTrue(payload.endsWith("55\0"));
}
finally {
session.close();
}
}
@Test
public void handleExceptionAndSendToUser() throws Exception {
String destHeader = "destination:/user/queue/error";
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
TextMessage m2 = create(StompCommand.SEND).headers("destination:/app/exception").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(0).getPayload();
assertTrue(payload.startsWith("MESSAGE\n"));
assertTrue(payload.contains("destination:/user/queue/error\n"));
assertTrue(payload.endsWith("Got error: Bad input\0"));
}
finally {
session.close();
}
}
@Test
public void handleMessageToClientWithHeartbeatSuppressingSockJsHeartbeat() throws IOException {
SockJsSession sockJsSession = Mockito.mock(SockJsSession.class);
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECTED);
accessor.setHeartbeat(0, 10);
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(sockJsSession, message);
verify(sockJsSession).getPrincipal();
verify(sockJsSession).disableHeartbeat();
verify(sockJsSession).sendMessage(any(WebSocketMessage.class));
verifyNoMoreInteractions(sockJsSession);
sockJsSession = Mockito.mock(SockJsSession.class);
accessor = StompHeaderAccessor.create(StompCommand.CONNECTED);
accessor.setHeartbeat(0, 0);
message = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(sockJsSession, message);
verify(sockJsSession).getPrincipal();
verify(sockJsSession).sendMessage(any(WebSocketMessage.class));
verifyNoMoreInteractions(sockJsSession);
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessageBinary() throws Exception {
String text = "SEND\na:alpha\n\nMessage payload\0";
connect().handleMessage(this.webSocketSession, new BinaryMessage(text.getBytes(StandardCharsets.UTF_8)));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Test
public void sendWebSocketBinary() throws Exception {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SEND);
accessor.setDestination("/b");
accessor.setContentType(MimeTypeUtils.APPLICATION_OCTET_STREAM);
byte[] payload = "payload".getBytes(StandardCharsets.UTF_8);
getTcpConnection().send(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));
ArgumentCaptor<BinaryMessage> binaryMessageCaptor = ArgumentCaptor.forClass(BinaryMessage.class);
verify(this.webSocketSession).sendMessage(binaryMessageCaptor.capture());
BinaryMessage binaryMessage = binaryMessageCaptor.getValue();
assertNotNull(binaryMessage);
assertEquals("SEND\ndestination:/b\ncontent-type:application/octet-stream\ncontent-length:7\n\npayload\0",
new String(binaryMessage.getPayload().array(), StandardCharsets.UTF_8));
}
@Test
public void clientInboundChannelSendMessage() throws Exception {
ApplicationContext config = createConfig(TestChannelConfig.class, TestConfigurer.class);
TestChannel channel = config.getBean("clientInboundChannel", TestChannel.class);
SubProtocolWebSocketHandler webSocketHandler = config.getBean(SubProtocolWebSocketHandler.class);
List<ChannelInterceptor> interceptors = channel.getInterceptors();
assertEquals(ImmutableMessageChannelInterceptor.class, interceptors.get(interceptors.size()-1).getClass());
TestWebSocketSession session = new TestWebSocketSession("s1");
session.setOpen(true);
webSocketHandler.afterConnectionEstablished(session);
TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build();
webSocketHandler.handleMessage(session, textMessage);
Message<?> message = channel.messages.get(0);
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertNotNull(accessor);
assertFalse(accessor.isMutable());
assertEquals(SimpMessageType.MESSAGE, accessor.getMessageType());
assertEquals("/foo", accessor.getDestination());
}
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
accessor.setHeartbeat(10000, 10000);
accessor.setAcceptVersion("1.0,1.1");
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
ackAccessor.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connectMessage);
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
assertEquals(1, this.session.getSentMessages().size());
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "version:1.1\n" + "heart-beat:0,0\n" +
"user-name:joe\n" + "\n" + "\u0000", actual.getPayload());
}
public String encodeSubscribe(TopicContent topic, Long id) {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
accessor.setDestination(topic.getTopicName());
accessor.setNativeHeader("eventId", topic.getOffset());
accessor.setNativeHeader(StompHeaderAccessor.STOMP_ID_HEADER, Long.toString(id));
if (!StringUtils.isBlank(topic.getGroupId())) {
accessor.setNativeHeader("groupId", topic.getGroupId());
}
// keep the original SubscriptionId
if (topic.getExtension().containsKey(WeEvent.WeEvent_SubscriptionId)) {
accessor.setNativeHeader(WeEvent.WeEvent_SubscriptionId, topic.getExtension().get(WeEvent.WeEvent_SubscriptionId));
}
if (topic.getExtension().containsKey(WeEvent.WeEvent_TAG)) {
accessor.setNativeHeader(WeEvent.WeEvent_TAG, topic.getExtension().get(WeEvent.WeEvent_TAG));
}
if (topic.getExtension().containsKey(WeEvent.WeEvent_EPHEMERAL)) {
accessor.setNativeHeader(WeEvent.WeEvent_EPHEMERAL, topic.getExtension().get(WeEvent.WeEvent_EPHEMERAL));
}
return encodeRaw(accessor);
}
@Test // SPR-10930
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
TextMessage m2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(1).getPayload();
assertTrue("Expected STOMP MESSAGE, got " + payload, payload.startsWith("MESSAGE\n"));
}
finally {
session.close();
}
}
@Test
public void clientOutboundChannelUsedByAnnotatedMethod() {
TestChannel channel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler = this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId("sess1");
headers.setSessionAttributes(new ConcurrentHashMap<>());
headers.setSubscriptionId("subs1");
headers.setDestination("/foo");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
messageHandler.handleMessage(message);
message = channel.messages.get(0);
headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("/foo", headers.getDestination());
assertEquals("bar", new String((byte[]) message.getPayload()));
}
@Test
public void webSocketScope() throws Exception {
TextMessage message1 = create(StompCommand.SUBSCRIBE)
.headers("id:subs1", "destination:/topic/scopedBeanValue").build();
TextMessage message2 = create(StompCommand.SEND)
.headers("destination:/app/scopedBeanValue").build();
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(1, message1, message2);
WebSocketSession session = doHandshake(clientHandler, "/ws").get();
try {
assertTrue(clientHandler.latch.await(TIMEOUT, TimeUnit.SECONDS));
String payload = clientHandler.actual.get(0).getPayload();
assertTrue(payload.startsWith("MESSAGE\n"));
assertTrue(payload.contains("destination:/topic/scopedBeanValue\n"));
assertTrue(payload.endsWith("55\0"));
}
finally {
session.close();
}
}
private void handleUnsubscribeMessage(StompHeaderAccessor stompHeaderAccessor, WebSocketSession session) {
String simpDestination = stompHeaderAccessor.getDestination();
String headerIdStr = stompHeaderAccessor.getFirstNativeHeader(StompHeaderAccessor.STOMP_ID_HEADER);
try {
boolean result = handleUnSubscribe(session, headerIdStr);
// send response
StompHeaderAccessor accessor;
if (result) {
accessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
accessor.setDestination(simpDestination);
} else {
accessor = StompHeaderAccessor.create(StompCommand.ERROR);
}
// a unique identifier for that message and a subscription header matching the identifier of the subscription that is receiving the message.
accessor.setReceiptId(headerIdStr);
sendSimpleMessage(session, accessor);
} catch (BrokerException e) {
handleErrorMessage(session, e, headerIdStr);
}
}
@Test
public void sendWebSocketBinary() throws Exception {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SEND);
accessor.setDestination("/b");
accessor.setContentType(MimeTypeUtils.APPLICATION_OCTET_STREAM);
byte[] payload = "payload".getBytes(StandardCharsets.UTF_8);
getTcpConnection().send(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));
ArgumentCaptor<BinaryMessage> binaryMessageCaptor = ArgumentCaptor.forClass(BinaryMessage.class);
verify(this.webSocketSession).sendMessage(binaryMessageCaptor.capture());
BinaryMessage binaryMessage = binaryMessageCaptor.getValue();
assertNotNull(binaryMessage);
assertEquals("SEND\ndestination:/b\ncontent-type:application/octet-stream\ncontent-length:7\n\npayload\0",
new String(binaryMessage.getPayload().array(), StandardCharsets.UTF_8));
}
/**
* 绑定用户信息
*
* @param message
* @param channel
* @return
*/
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
log.debug("进入拦截器 -> preSend");
StompHeaderAccessor stompHeaderAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(stompHeaderAccessor.getCommand())) {
User user = new User();
user.setUserId(UUIDUtils.create());
user.setUsername(SensitiveWordUtils.loveChina(stompHeaderAccessor.getFirstNativeHeader("username")));
user.setAvatar(stompHeaderAccessor.getFirstNativeHeader("avatar"));
user.setAddress(stompHeaderAccessor.getFirstNativeHeader("address"));
user.setStatus(UserStatusConstant.ONLINE);
stompHeaderAccessor.setUser(user);
log.debug("绑定用户信息 -> {}", user);
}
return message;
}
@Test
public void handleMessageFromBrokerWithoutActiveSession() {
this.handler.setBroadcastDestination("/topic/unresolved");
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.MESSAGE);
accessor.setSessionId("system123");
accessor.setDestination("/topic/unresolved");
accessor.setNativeHeader(ORIGINAL_DESTINATION, "/user/joe/queue/foo");
accessor.setLeaveMutable(true);
byte[] payload = "payload".getBytes(StandardCharsets.UTF_8);
this.handler.handleMessage(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));
// No re-broadcast
verifyNoMoreInteractions(this.brokerChannel);
}