类org.springframework.messaging.simp.stomp.StompCommand源码实例Demo

下面列出了 org.springframework.messaging.simp.stomp.StompCommand 类的 API 用法示例代码，也可以点击链接到 GitHub 查看完整源代码。

/**
 * Sends a {@code DISCONNECT_ACK} message (carrying the original DISCONNECT
 * frame as a header) to the client and expects exactly one STOMP ERROR frame
 * with the message "Session closed." to be written to the session.
 */
@Test
public void handleMessageToClientWithSimpDisconnectAck() {
	// The DISCONNECT frame that the ack refers back to.
	StompHeaderAccessor disconnectAccessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
	Message<?> disconnectMessage =
			MessageBuilder.createMessage(EMPTY_PAYLOAD, disconnectAccessor.getMessageHeaders());

	SimpMessageHeaderAccessor ackHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
	ackHeaders.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, disconnectMessage);
	Message<byte[]> ack = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackHeaders.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, ack);

	String expectedFrame = "ERROR\n" + "message:Session closed.\n" + "content-length:0\n" +
			"\n\u0000";
	assertEquals(1, this.session.getSentMessages().size());
	TextMessage sent = (TextMessage) this.session.getSentMessages().get(0);
	assertEquals(expectedFrame, sent.getPayload());
}
 
/**
 * Dispatches a SUBSCRIBE message for "/foo" to the annotated-method message
 * handler and checks that the first message captured on the
 * "clientOutboundChannel" is a MESSAGE for "/foo" with payload "bar".
 */
@Test
public void clientOutboundChannelUsedByAnnotatedMethod() {
	ApplicationContext context = loadConfig(SimpleBrokerConfig.class);

	TestChannel channel = context.getBean("clientOutboundChannel", TestChannel.class);
	SimpAnnotationMethodMessageHandler messageHandler =
			context.getBean(SimpAnnotationMethodMessageHandler.class);

	StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
	headers.setSessionId("sess1");
	headers.setSessionAttributes(new ConcurrentHashMap<>());
	headers.setSubscriptionId("subs1");
	headers.setDestination("/foo");
	Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();

	messageHandler.handleMessage(message);

	// Inspect the reply captured by the test channel.
	message = channel.messages.get(0);
	headers = StompHeaderAccessor.wrap(message);

	assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
	assertEquals("/foo", headers.getDestination());
	// Decode with an explicit charset; new String(byte[]) would use the
	// platform default and make the assertion environment-dependent.
	assertEquals("bar", new String((byte[]) message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
}
 
/**
 * Dispatches a SEND message for "/foo" to the annotated-method message
 * handler and checks that the first message captured on the "brokerChannel"
 * is a MESSAGE for "/bar" with payload "bar".
 */
@Test
public void brokerChannelUsedByAnnotatedMethod() {
	ApplicationContext context = loadConfig(SimpleBrokerConfig.class);

	TestChannel channel = context.getBean("brokerChannel", TestChannel.class);
	SimpAnnotationMethodMessageHandler messageHandler =
			context.getBean(SimpAnnotationMethodMessageHandler.class);

	StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
	headers.setSessionId("sess1");
	headers.setSessionAttributes(new ConcurrentHashMap<>());
	headers.setDestination("/foo");
	Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());

	messageHandler.handleMessage(message);

	// Inspect the message forwarded to the broker channel.
	message = channel.messages.get(0);
	headers = StompHeaderAccessor.wrap(message);

	assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
	assertEquals("/bar", headers.getDestination());
	// Decode with an explicit charset; new String(byte[]) would use the
	// platform default and make the assertion environment-dependent.
	assertEquals("bar", new String((byte[]) message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
}
 
/**
 * Builds a STOMP DISCONNECT message for the given WebSocket session.
 * <p>The headers are first passed to the configured header initializer (if
 * any), then populated with the session id, the session attributes and, when
 * one can be resolved via {@code getUser}, the user.
 * @param session the session to create the DISCONNECT message for
 * @return a message with an empty payload and the DISCONNECT headers
 */
private Message<byte[]> createDisconnectMessage(WebSocketSession session) {
	StompHeaderAccessor disconnectHeaders = StompHeaderAccessor.create(StompCommand.DISCONNECT);
	if (getHeaderInitializer() != null) {
		getHeaderInitializer().initHeaders(disconnectHeaders);
	}
	disconnectHeaders.setSessionId(session.getId());
	disconnectHeaders.setSessionAttributes(session.getAttributes());

	Principal principal = getUser(session);
	if (principal != null) {
		disconnectHeaders.setUser(principal);
	}
	return MessageBuilder.createMessage(EMPTY_PAYLOAD, disconnectHeaders.getMessageHeaders());
}
 
/**
 * Prepares a STOMP ERROR frame for a failure that occurred while processing
 * a client message.
 * <p>The exception message becomes the ERROR frame's message header, the
 * headers are left mutable, and the client's receipt header (when the client
 * message exposes a {@link StompHeaderAccessor} with one) is copied to the
 * receipt-id. Final message creation is delegated to {@code handleInternal}.
 * @param clientMessage the client message related to the error, possibly {@code null}
 * @param ex the cause of the error
 * @return whatever {@code handleInternal} returns
 */
@Override
@Nullable
public Message<byte[]> handleClientMessageProcessingError(@Nullable Message<byte[]> clientMessage, Throwable ex) {
	StompHeaderAccessor errorAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
	errorAccessor.setMessage(ex.getMessage());
	errorAccessor.setLeaveMutable(true);

	StompHeaderAccessor clientAccessor = (clientMessage != null ?
			MessageHeaderAccessor.getAccessor(clientMessage, StompHeaderAccessor.class) : null);
	String receiptId = (clientAccessor != null ? clientAccessor.getReceipt() : null);
	if (receiptId != null) {
		errorAccessor.setReceiptId(receiptId);
	}

	return handleInternal(errorAccessor, EMPTY_PAYLOAD, ex, clientAccessor);
}
 
/**
 * Feeds a response body consisting of an open frame, one message frame that
 * wraps an encoded STOMP SEND frame, and a close frame, then verifies the
 * handler sees exactly: connection established, that message, and the close
 * status 3000 "Go away!".
 */
@Test
public void connectReceiveAndCloseWithStompFrame() throws Exception {
	StompHeaderAccessor sendHeaders = StompHeaderAccessor.create(StompCommand.SEND);
	sendHeaders.setDestination("/destination");
	byte[] stompPayload = "body".getBytes(StandardCharsets.UTF_8);
	Message<byte[]> stompMessage = MessageBuilder.createMessage(stompPayload, sendHeaders.getMessageHeaders());

	TextMessage textMessage = new TextMessage(new StompEncoder().encode(stompMessage));
	SockJsFrame frame = SockJsFrame.messageFrame(new Jackson2SockJsMessageCodec(), textMessage.getPayload());

	// open frame + message frame + close frame, newline separated
	String body = "o\n" + frame.getContent() + "\n" + "c[3000,\"Go away!\"]";
	connect(response(HttpStatus.OK, body));

	verify(this.webSocketHandler).afterConnectionEstablished(any());
	verify(this.webSocketHandler).handleMessage(any(), eq(textMessage));
	verify(this.webSocketHandler).afterConnectionClosed(any(), eq(new CloseStatus(3000, "Go away!")));
	verifyNoMoreInteractions(this.webSocketHandler);
}
 
/**
 * Sends a {@code CONNECT_ACK} that references a CONNECT frame accepting
 * versions 1.0-1.2 and carries a heart-beat header of 15000,15000; expects a
 * single CONNECTED frame with version 1.2, that heart-beat and user-name joe.
 */
@Test
public void handleMessageToClientWithSimpConnectAck() {
	// The CONNECT frame the ack refers back to.
	StompHeaderAccessor connectHeaders = StompHeaderAccessor.create(StompCommand.CONNECT);
	connectHeaders.setHeartbeat(10000, 10000);
	connectHeaders.setAcceptVersion("1.0,1.1,1.2");
	Message<?> connect = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectHeaders.getMessageHeaders());

	SimpMessageHeaderAccessor ackHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
	ackHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connect);
	ackHeaders.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, new long[] {15000, 15000});
	Message<byte[]> ack = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackHeaders.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, ack);

	String expectedFrame = "CONNECTED\n" + "version:1.2\n" + "heart-beat:15000,15000\n" +
			"user-name:joe\n" + "\n" + "\u0000";
	assertEquals(1, this.session.getSentMessages().size());
	TextMessage sent = (TextMessage) this.session.getSentMessages().get(0);
	assertEquals(expectedFrame, sent.getPayload());
}
 
/**
 * Sends a {@code CONNECT_ACK} without a heart-beat header for a CONNECT frame
 * that accepts only version 1.0; expects a single CONNECTED frame with
 * version 1.0, heart-beat 0,0 and user-name joe.
 */
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
	// The CONNECT frame the ack refers back to.
	StompHeaderAccessor connectHeaders = StompHeaderAccessor.create(StompCommand.CONNECT);
	connectHeaders.setHeartbeat(10000, 10000);
	connectHeaders.setAcceptVersion("1.0");
	Message<?> connect = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectHeaders.getMessageHeaders());

	SimpMessageHeaderAccessor ackHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
	ackHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connect);
	Message<byte[]> ack = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackHeaders.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, ack);

	String expectedFrame = "CONNECTED\n" + "version:1.0\n" + "heart-beat:0,0\n" +
			"user-name:joe\n" + "\n" + "\u0000";
	assertEquals(1, this.session.getSentMessages().size());
	TextMessage sent = (TextMessage) this.session.getSentMessages().get(0);
	assertEquals(expectedFrame, sent.getPayload());
}
 
/**
 * Delivers a STOMP SEND frame as a binary WebSocket message and verifies the
 * message handed to the STOMP session has command SEND, native header
 * a=alpha and the payload "Message payload".
 */
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessageBinary() throws Exception {
	String frameText = "SEND\na:alpha\n\nMessage payload\0";
	BinaryMessage inbound = new BinaryMessage(frameText.getBytes(StandardCharsets.UTF_8));
	connect().handleMessage(this.webSocketSession, inbound);

	ArgumentCaptor<Message> messageCaptor = ArgumentCaptor.forClass(Message.class);
	verify(this.stompSession).handleMessage(messageCaptor.capture());
	Message<byte[]> received = messageCaptor.getValue();
	assertNotNull(received);

	StompHeaderAccessor receivedAccessor = MessageHeaderAccessor.getAccessor(received, StompHeaderAccessor.class);
	StompHeaders nativeHeaders = StompHeaders.readOnlyStompHeaders(receivedAccessor.toNativeHeaderMap());
	assertEquals(StompCommand.SEND, receivedAccessor.getCommand());
	assertEquals("alpha", nativeHeaders.getFirst("a"));
	assertEquals("Message payload", new String(received.getPayload(), StandardCharsets.UTF_8));
}
 
/**
 * Sends a MESSAGE frame whose destination is a resolved user queue and whose
 * original-destination native header is "/user/queue/foo"; verifies the frame
 * written to the client uses the original destination and does not contain
 * the original-destination header name.
 */
@Test
public void handleMessageToClientWithUserDestination() {
	StompHeaderAccessor messageHeaders = StompHeaderAccessor.create(StompCommand.MESSAGE);
	messageHeaders.setMessageId("mess0");
	messageHeaders.setSubscriptionId("sub0");
	messageHeaders.setDestination("/queue/foo-user123");
	messageHeaders.setNativeHeader(StompHeaderAccessor.ORIGINAL_DESTINATION, "/user/queue/foo");
	Message<byte[]> outbound = MessageBuilder.createMessage(EMPTY_PAYLOAD, messageHeaders.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, outbound);

	assertEquals(1, this.session.getSentMessages().size());
	WebSocketMessage<?> sent = this.session.getSentMessages().get(0);
	String frame = (String) sent.getPayload();
	assertTrue(frame.contains("destination:/user/queue/foo\n"));
	assertFalse(frame.contains(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION));
}
 
/**
 * Delivers a STOMP SEND frame as a text WebSocket message and verifies the
 * message handed to the STOMP session has command SEND, native header
 * a=alpha and the payload "Message payload".
 */
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessage() throws Exception {
	String frameText = "SEND\na:alpha\n\nMessage payload\0";
	TextMessage inbound = new TextMessage(frameText);
	connect().handleMessage(this.webSocketSession, inbound);

	ArgumentCaptor<Message> messageCaptor = ArgumentCaptor.forClass(Message.class);
	verify(this.stompSession).handleMessage(messageCaptor.capture());
	Message<byte[]> received = messageCaptor.getValue();
	assertNotNull(received);

	StompHeaderAccessor receivedAccessor = MessageHeaderAccessor.getAccessor(received, StompHeaderAccessor.class);
	StompHeaders nativeHeaders = StompHeaders.readOnlyStompHeaders(receivedAccessor.toNativeHeaderMap());
	assertEquals(StompCommand.SEND, receivedAccessor.getCommand());
	assertEquals("alpha", nativeHeaders.getFirst("a"));
	assertEquals("Message payload", new String(received.getPayload(), StandardCharsets.UTF_8));
}
 
/**
 * Registers an authentication interceptor followed by an immutable-message
 * interceptor on the inbound channel, sends a CONNECT frame through a fresh
 * protocol handler, and verifies the message reaching the subscriber carries
 * a user whose name matches the one given to the authentication interceptor.
 */
@Test // SPR-14690
public void handleMessageFromClientWithTokenAuthentication() {
	ExecutorSubscribableChannel inboundChannel = new ExecutorSubscribableChannel();
	inboundChannel.addInterceptor(new AuthenticationInterceptor("[email protected]"));
	inboundChannel.addInterceptor(new ImmutableMessageChannelInterceptor());

	TestMessageHandler subscriber = new TestMessageHandler();
	inboundChannel.subscribe(subscriber);

	StompSubProtocolHandler stompHandler = new StompSubProtocolHandler();
	stompHandler.afterSessionStarted(this.session, inboundChannel);

	TextMessage connectFrame = StompTextMessageBuilder.create(StompCommand.CONNECT).build();
	stompHandler.handleMessageFromClient(this.session, connectFrame, inboundChannel);

	assertEquals(1, subscriber.getMessages().size());
	Message<?> delivered = subscriber.getMessages().get(0);
	Principal authenticated = SimpMessageHeaderAccessor.getUser(delivered.getHeaders());
	assertNotNull(authenticated);
	assertEquals("[email protected]", authenticated.getName());
}
 
/**
 * Writes a MESSAGE frame addressed to "/queue/foo-user123" that also carries
 * the original destination "/user/queue/foo" as a native header, then checks
 * the outgoing frame exposes the original destination and omits the
 * original-destination header name.
 */
@Test
public void handleMessageToClientWithUserDestination() {
	StompHeaderAccessor stompHeaders = StompHeaderAccessor.create(StompCommand.MESSAGE);
	stompHeaders.setMessageId("mess0");
	stompHeaders.setSubscriptionId("sub0");
	stompHeaders.setDestination("/queue/foo-user123");
	stompHeaders.setNativeHeader(StompHeaderAccessor.ORIGINAL_DESTINATION, "/user/queue/foo");
	Message<byte[]> toClient = MessageBuilder.createMessage(EMPTY_PAYLOAD, stompHeaders.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, toClient);

	assertEquals(1, this.session.getSentMessages().size());
	WebSocketMessage<?> written = this.session.getSentMessages().get(0);
	String writtenFrame = (String) written.getPayload();
	assertTrue(writtenFrame.contains("destination:/user/queue/foo\n"));
	assertFalse(writtenFrame.contains(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION));
}
 
/**
 * Connects, subscribes to "/topic/foo" and sends "5" to the same topic, then
 * waits for two frames and checks that the second one received is a STOMP
 * MESSAGE frame.
 */
@Test  // SPR-10930
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
	TextMessage connectFrame = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
	TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
	TextMessage sendFrame = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();

	// Expect two frames back before the latch releases.
	TestClientWebSocketHandler client = new TestClientWebSocketHandler(2, connectFrame, subscribeFrame, sendFrame);
	WebSocketSession wsSession = doHandshake(client, "/ws").get();

	try {
		assertTrue(client.latch.await(TIMEOUT, TimeUnit.SECONDS));

		String received = client.actual.get(1).getPayload();
		assertTrue("Expected STOMP MESSAGE, got " + received, received.startsWith("MESSAGE\n"));
	}
	finally {
		wsSession.close();
	}
}
 
/**
 * Connects and subscribes to "/app/number", waits for two frames, and checks
 * that the second one echoes the subscribed destination header and contains
 * "42".
 */
@Test  // SPR-11648
public void sendSubscribeToControllerAndReceiveReply() throws Exception {
	String destHeader = "destination:/app/number";
	TextMessage connectFrame = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
	TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();

	// Expect two frames back before the latch releases.
	TestClientWebSocketHandler client = new TestClientWebSocketHandler(2, connectFrame, subscribeFrame);
	WebSocketSession wsSession = doHandshake(client, "/ws").get();

	try {
		assertTrue(client.latch.await(TIMEOUT, TimeUnit.SECONDS));
		String received = client.actual.get(1).getPayload();
		assertTrue("Expected STOMP destination=/app/number, got " + received, received.contains(destHeader));
		assertTrue("Expected STOMP Payload=42, got " + received, received.contains("42"));
	}
	finally {
		wsSession.close();
	}
}
 
/**
 * Connects, subscribes to "/topic/scopedBeanValue" and sends to
 * "/app/scopedBeanValue", then checks that the second frame received is a
 * MESSAGE for the topic whose body ends with "55".
 */
@Test
public void webSocketScope() throws Exception {
	TextMessage connectFrame = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
	TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE)
			.headers("id:subs1", "destination:/topic/scopedBeanValue").build();
	TextMessage sendFrame = create(StompCommand.SEND)
			.headers("destination:/app/scopedBeanValue").build();

	// Expect two frames back before the latch releases.
	TestClientWebSocketHandler client = new TestClientWebSocketHandler(2, connectFrame, subscribeFrame, sendFrame);
	WebSocketSession wsSession = doHandshake(client, "/ws").get();

	try {
		assertTrue(client.latch.await(TIMEOUT, TimeUnit.SECONDS));
		String received = client.actual.get(1).getPayload();
		assertTrue(received.startsWith("MESSAGE\n"));
		assertTrue(received.contains("destination:/topic/scopedBeanValue\n"));
		assertTrue(received.endsWith("55\0"));
	}
	finally {
		wsSession.close();
	}
}
 
/**
 * Subscribes to "/user/queue/error" and sends to "/app/exception", then
 * checks that the first frame received is a MESSAGE for the user error queue
 * whose body ends with "Got error: Bad input".
 */
@Test
public void handleExceptionAndSendToUser() throws Exception {
	String destHeader = "destination:/user/queue/error";
	TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE).headers("id:subs1", destHeader).build();
	TextMessage sendFrame = create(StompCommand.SEND).headers("destination:/app/exception").build();

	// Expect a single frame back before the latch releases.
	TestClientWebSocketHandler client = new TestClientWebSocketHandler(1, subscribeFrame, sendFrame);
	WebSocketSession wsSession = doHandshake(client, "/ws").get();

	try {
		assertTrue(client.latch.await(TIMEOUT, TimeUnit.SECONDS));
		String received = client.actual.get(0).getPayload();
		assertTrue(received.startsWith("MESSAGE\n"));
		assertTrue(received.contains("destination:/user/queue/error\n"));
		assertTrue(received.endsWith("Got error: Bad input\0"));
	}
	finally {
		wsSession.close();
	}
}
 
/**
 * Sends a CONNECTED frame to a mocked SockJS session twice: once with
 * heart-beat 0,10, where {@code disableHeartbeat()} is expected to be called
 * on the session, and once with heart-beat 0,0, where it is not.
 */
@Test
public void handleMessageToClientWithHeartbeatSuppressingSockJsHeartbeat() throws IOException {
	// Case 1: heart-beat 0,10 -> the SockJS heartbeat gets disabled.
	SockJsSession heartbeatSession = Mockito.mock(SockJsSession.class);
	StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED);
	connectedHeaders.setHeartbeat(0, 10);
	Message<byte[]> connected = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectedHeaders.getMessageHeaders());
	this.protocolHandler.handleMessageToClient(heartbeatSession, connected);

	verify(heartbeatSession).getPrincipal();
	verify(heartbeatSession).disableHeartbeat();
	verify(heartbeatSession).sendMessage(any(WebSocketMessage.class));
	verifyNoMoreInteractions(heartbeatSession);

	// Case 2: heart-beat 0,0 -> no call to disableHeartbeat().
	SockJsSession plainSession = Mockito.mock(SockJsSession.class);
	StompHeaderAccessor noHeartbeatHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED);
	noHeartbeatHeaders.setHeartbeat(0, 0);
	Message<byte[]> connectedNoHeartbeat =
			MessageBuilder.createMessage(EMPTY_PAYLOAD, noHeartbeatHeaders.getMessageHeaders());
	this.protocolHandler.handleMessageToClient(plainSession, connectedNoHeartbeat);

	verify(plainSession).getPrincipal();
	verify(plainSession).sendMessage(any(WebSocketMessage.class));
	verifyNoMoreInteractions(plainSession);
}
 
/**
 * Passes a UTF-8 encoded STOMP SEND frame to the connected handler as a
 * binary message and checks the resulting message's command, its "a" native
 * header and its payload text.
 */
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessageBinary() throws Exception {
	String stompFrame = "SEND\na:alpha\n\nMessage payload\0";
	byte[] frameBytes = stompFrame.getBytes(StandardCharsets.UTF_8);
	connect().handleMessage(this.webSocketSession, new BinaryMessage(frameBytes));

	ArgumentCaptor<Message> handled = ArgumentCaptor.forClass(Message.class);
	verify(this.stompSession).handleMessage(handled.capture());
	Message<byte[]> decoded = handled.getValue();
	assertNotNull(decoded);

	StompHeaderAccessor decodedAccessor = MessageHeaderAccessor.getAccessor(decoded, StompHeaderAccessor.class);
	StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(decodedAccessor.toNativeHeaderMap());
	assertEquals(StompCommand.SEND, decodedAccessor.getCommand());
	assertEquals("alpha", stompHeaders.getFirst("a"));
	assertEquals("Message payload", new String(decoded.getPayload(), StandardCharsets.UTF_8));
}
 
/**
 * Sends a SEND message with content type application/octet-stream over the
 * TCP connection and verifies it is written to the WebSocket session as a
 * binary message containing the expected encoded STOMP frame.
 */
@Test
public void sendWebSocketBinary() throws Exception {
	StompHeaderAccessor sendHeaders = StompHeaderAccessor.create(StompCommand.SEND);
	sendHeaders.setDestination("/b");
	sendHeaders.setContentType(MimeTypeUtils.APPLICATION_OCTET_STREAM);
	byte[] body = "payload".getBytes(StandardCharsets.UTF_8);
	Message<byte[]> outbound = MessageBuilder.createMessage(body, sendHeaders.getMessageHeaders());

	getTcpConnection().send(outbound);

	ArgumentCaptor<BinaryMessage> sentCaptor = ArgumentCaptor.forClass(BinaryMessage.class);
	verify(this.webSocketSession).sendMessage(sentCaptor.capture());
	BinaryMessage sent = sentCaptor.getValue();
	assertNotNull(sent);

	String sentFrame = new String(sent.getPayload().array(), StandardCharsets.UTF_8);
	assertEquals("SEND\ndestination:/b\ncontent-type:application/octet-stream\ncontent-length:7\n\npayload\0",
			sentFrame);
}
 
/**
 * Checks that the last interceptor on "clientInboundChannel" is an
 * {@code ImmutableMessageChannelInterceptor}, then sends a STOMP SEND frame
 * through the WebSocket handler and verifies the message captured on the
 * channel is immutable, of type MESSAGE and destined for "/foo".
 */
@Test
public void clientInboundChannelSendMessage() throws Exception {
	ApplicationContext config = createConfig(TestChannelConfig.class, TestConfigurer.class);
	TestChannel inboundChannel = config.getBean("clientInboundChannel", TestChannel.class);
	SubProtocolWebSocketHandler wsHandler = config.getBean(SubProtocolWebSocketHandler.class);

	List<ChannelInterceptor> registered = inboundChannel.getInterceptors();
	ChannelInterceptor lastInterceptor = registered.get(registered.size() - 1);
	assertEquals(ImmutableMessageChannelInterceptor.class, lastInterceptor.getClass());

	TestWebSocketSession wsSession = new TestWebSocketSession("s1");
	wsSession.setOpen(true);
	wsHandler.afterConnectionEstablished(wsSession);

	TextMessage sendFrame = StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build();
	wsHandler.handleMessage(wsSession, sendFrame);

	Message<?> captured = inboundChannel.messages.get(0);
	StompHeaderAccessor capturedAccessor = StompHeaderAccessor.getAccessor(captured, StompHeaderAccessor.class);
	assertNotNull(capturedAccessor);
	assertFalse(capturedAccessor.isMutable());
	assertEquals(SimpMessageType.MESSAGE, capturedAccessor.getMessageType());
	assertEquals("/foo", capturedAccessor.getDestination());
}
 
/**
 * Sends a {@code CONNECT_ACK} without a heart-beat header for a CONNECT frame
 * that accepts versions 1.0 and 1.1; expects a single CONNECTED frame with
 * version 1.1, heart-beat 0,0 and user-name joe.
 */
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
	// The CONNECT frame the ack refers back to.
	StompHeaderAccessor connectHeaders = StompHeaderAccessor.create(StompCommand.CONNECT);
	connectHeaders.setHeartbeat(10000, 10000);
	connectHeaders.setAcceptVersion("1.0,1.1");
	Message<?> connect = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectHeaders.getMessageHeaders());

	SimpMessageHeaderAccessor ackHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
	ackHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connect);
	Message<byte[]> ack = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackHeaders.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, ack);

	String expectedFrame = "CONNECTED\n" + "version:1.1\n" + "heart-beat:0,0\n" +
			"user-name:joe\n" + "\n" + "\u0000";
	assertEquals(1, this.session.getSentMessages().size());
	TextMessage sent = (TextMessage) this.session.getSentMessages().get(0);
	assertEquals(expectedFrame, sent.getPayload());
}
 
源代码23 项目: WeEvent   文件: WeEventStompCommand.java
/**
 * Encodes a STOMP SUBSCRIBE frame for the given topic.
 * <p>The frame carries the topic name as destination, the topic offset as
 * the "eventId" native header, the given id as the STOMP id header, the
 * group id when it is not blank, and any of the subscription-id, tag and
 * ephemeral entries present in the topic's extension map.
 *
 * @param topic the topic to subscribe to
 * @param id the value written to the STOMP id header
 * @return the result of {@code encodeRaw} for the assembled headers
 */
public String encodeSubscribe(TopicContent topic, Long id) {
    StompHeaderAccessor subscribeHeaders = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
    subscribeHeaders.setDestination(topic.getTopicName());
    subscribeHeaders.setNativeHeader("eventId", topic.getOffset());
    subscribeHeaders.setNativeHeader(StompHeaderAccessor.STOMP_ID_HEADER, Long.toString(id));

    boolean hasGroupId = !StringUtils.isBlank(topic.getGroupId());
    if (hasGroupId) {
        subscribeHeaders.setNativeHeader("groupId", topic.getGroupId());
    }

    // Forward the optional extension entries (the first one keeps the
    // original SubscriptionId), in this fixed order, only when present.
    String[] forwardedKeys = {WeEvent.WeEvent_SubscriptionId, WeEvent.WeEvent_TAG, WeEvent.WeEvent_EPHEMERAL};
    for (String key : forwardedKeys) {
        if (topic.getExtension().containsKey(key)) {
            subscribeHeaders.setNativeHeader(key, topic.getExtension().get(key));
        }
    }

    return encodeRaw(subscribeHeaders);
}
 
/**
 * Sends CONNECT, SUBSCRIBE ("/topic/foo") and SEND ("/topic/foo", body "5")
 * frames, waits until two frames have come back, and asserts the second one
 * starts with the STOMP MESSAGE command.
 */
@Test  // SPR-10930
public void sendMessageToBrokerAndReceiveReplyViaTopic() throws Exception {
	TextMessage connect = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
	TextMessage subscribe = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
	TextMessage send = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();

	TestClientWebSocketHandler testClient = new TestClientWebSocketHandler(2, connect, subscribe, send);
	WebSocketSession clientSession = doHandshake(testClient, "/ws").get();

	try {
		boolean allFramesArrived = testClient.latch.await(TIMEOUT, TimeUnit.SECONDS);
		assertTrue(allFramesArrived);

		String secondFrame = testClient.actual.get(1).getPayload();
		assertTrue("Expected STOMP MESSAGE, got " + secondFrame, secondFrame.startsWith("MESSAGE\n"));
	}
	finally {
		clientSession.close();
	}
}
 
/**
 * Dispatches a SUBSCRIBE message for "/foo" to the annotated-method message
 * handler from the shared simple-broker context and checks that the first
 * message captured on the "clientOutboundChannel" is a MESSAGE for "/foo"
 * with payload "bar".
 */
@Test
public void clientOutboundChannelUsedByAnnotatedMethod() {
	TestChannel channel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class);
	SimpAnnotationMethodMessageHandler messageHandler = this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class);

	StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
	headers.setSessionId("sess1");
	headers.setSessionAttributes(new ConcurrentHashMap<>());
	headers.setSubscriptionId("subs1");
	headers.setDestination("/foo");
	Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();

	messageHandler.handleMessage(message);

	// Inspect the reply captured by the test channel.
	message = channel.messages.get(0);
	headers = StompHeaderAccessor.wrap(message);

	assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
	assertEquals("/foo", headers.getDestination());
	// Decode with an explicit charset; new String(byte[]) would use the
	// platform default and make the assertion environment-dependent.
	assertEquals("bar", new String((byte[]) message.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
}
 
/**
 * Subscribes to "/topic/scopedBeanValue" and sends to "/app/scopedBeanValue"
 * (no CONNECT frame in this variant), then checks that the first frame
 * received is a MESSAGE for the topic whose body ends with "55".
 */
@Test
public void webSocketScope() throws Exception {
	TextMessage subscribeFrame = create(StompCommand.SUBSCRIBE)
			.headers("id:subs1", "destination:/topic/scopedBeanValue").build();
	TextMessage sendFrame = create(StompCommand.SEND)
			.headers("destination:/app/scopedBeanValue").build();

	// Expect a single frame back before the latch releases.
	TestClientWebSocketHandler client = new TestClientWebSocketHandler(1, subscribeFrame, sendFrame);
	WebSocketSession wsSession = doHandshake(client, "/ws").get();

	try {
		assertTrue(client.latch.await(TIMEOUT, TimeUnit.SECONDS));
		String received = client.actual.get(0).getPayload();
		assertTrue(received.startsWith("MESSAGE\n"));
		assertTrue(received.contains("destination:/topic/scopedBeanValue\n"));
		assertTrue(received.endsWith("55\0"));
	}
	finally {
		wsSession.close();
	}
}
 
源代码27 项目: WeEvent   文件: BrokerStomp.java
/**
 * Handles a STOMP UNSUBSCRIBE request and replies to the client.
 * <p>When {@code handleUnSubscribe} reports success a RECEIPT frame (with the
 * request's destination) is sent, otherwise an ERROR frame. In both cases the
 * receipt-id is set to the value of the request's STOMP id header. A
 * {@code BrokerException} is forwarded to {@code handleErrorMessage}.
 *
 * @param stompHeaderAccessor headers of the incoming UNSUBSCRIBE frame
 * @param session the WebSocket session the frame arrived on
 */
private void handleUnsubscribeMessage(StompHeaderAccessor stompHeaderAccessor, WebSocketSession session) {
    String destination = stompHeaderAccessor.getDestination();
    String subscriptionHeaderId = stompHeaderAccessor.getFirstNativeHeader(StompHeaderAccessor.STOMP_ID_HEADER);

    try {
        StompHeaderAccessor reply;
        if (handleUnSubscribe(session, subscriptionHeaderId)) {
            reply = StompHeaderAccessor.create(StompCommand.RECEIPT);
            reply.setDestination(destination);
        } else {
            reply = StompHeaderAccessor.create(StompCommand.ERROR);
        }

        // The reply is correlated with the request through the STOMP id header value.
        reply.setReceiptId(subscriptionHeaderId);
        sendSimpleMessage(session, reply);
    } catch (BrokerException e) {
        handleErrorMessage(session, e, subscriptionHeaderId);
    }
}
 
/**
 * Sends an octet-stream SEND message to "/b" through the TCP connection and
 * asserts that the WebSocket session receives one binary message whose bytes
 * decode to the expected STOMP frame.
 */
@Test
public void sendWebSocketBinary() throws Exception {
	byte[] payloadBytes = "payload".getBytes(StandardCharsets.UTF_8);
	StompHeaderAccessor stompHeaders = StompHeaderAccessor.create(StompCommand.SEND);
	stompHeaders.setDestination("/b");
	stompHeaders.setContentType(MimeTypeUtils.APPLICATION_OCTET_STREAM);

	getTcpConnection().send(MessageBuilder.createMessage(payloadBytes, stompHeaders.getMessageHeaders()));

	ArgumentCaptor<BinaryMessage> captor = ArgumentCaptor.forClass(BinaryMessage.class);
	verify(this.webSocketSession).sendMessage(captor.capture());
	BinaryMessage written = captor.getValue();
	assertNotNull(written);

	String expectedFrame =
			"SEND\ndestination:/b\ncontent-type:application/octet-stream\ncontent-length:7\n\npayload\0";
	assertEquals(expectedFrame, new String(written.getPayload().array(), StandardCharsets.UTF_8));
}
 
源代码29 项目: xechat   文件: WebSocketInterceptor.java
/**
 * Binds user information to the connection when a STOMP CONNECT frame
 * passes through the channel.
 * <p>For a CONNECT frame a new {@code User} is created with a generated id,
 * the "username" native header (passed through the sensitive-word filter),
 * the "avatar" and "address" native headers, and the online status; it is
 * then set as the user on the message's STOMP header accessor. All other
 * messages, including those that expose no STOMP header accessor, pass
 * through untouched.
 *
 * @param message the message about to be sent
 * @param channel the channel the message is being sent to
 * @return the same {@code message} instance
 */
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
    log.debug("进入拦截器 -> preSend");
    StompHeaderAccessor stompHeaderAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

    // getAccessor returns null when the message does not carry an accessor of
    // the requested type; guard so such messages do not trigger an NPE here.
    if (stompHeaderAccessor != null && StompCommand.CONNECT.equals(stompHeaderAccessor.getCommand())) {
        User user = new User();
        user.setUserId(UUIDUtils.create());
        user.setUsername(SensitiveWordUtils.loveChina(stompHeaderAccessor.getFirstNativeHeader("username")));
        user.setAvatar(stompHeaderAccessor.getFirstNativeHeader("avatar"));
        user.setAddress(stompHeaderAccessor.getFirstNativeHeader("address"));
        user.setStatus(UserStatusConstant.ONLINE);

        stompHeaderAccessor.setUser(user);
        log.debug("绑定用户信息 -> {}", user);
    }

    return message;
}
 
/**
 * Handles a MESSAGE arriving on the broadcast destination
 * "/topic/unresolved" whose original destination is "/user/joe/queue/foo",
 * and verifies that nothing is sent to the broker channel.
 */
@Test
public void handleMessageFromBrokerWithoutActiveSession() {
	this.handler.setBroadcastDestination("/topic/unresolved");
	given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);

	StompHeaderAccessor broadcastHeaders = StompHeaderAccessor.create(StompCommand.MESSAGE);
	broadcastHeaders.setSessionId("system123");
	broadcastHeaders.setDestination("/topic/unresolved");
	broadcastHeaders.setNativeHeader(ORIGINAL_DESTINATION, "/user/joe/queue/foo");
	broadcastHeaders.setLeaveMutable(true);

	byte[] body = "payload".getBytes(StandardCharsets.UTF_8);
	Message<byte[]> broadcast = MessageBuilder.createMessage(body, broadcastHeaders.getMessageHeaders());
	this.handler.handleMessage(broadcast);

	// No re-broadcast
	verifyNoMoreInteractions(this.brokerChannel);
}
 
 类方法
 同包方法