类 org.springframework.messaging.simp.SimpMessageType 的源码示例(Demo)

下面列出了 org.springframework.messaging.simp.SimpMessageType 类 API 的使用示例代码及写法,您也可以点击链接到 GitHub 查看完整源代码。

/**
 * Pre-processes a message that may have arrived on the broadcast destination.
 * <p>Messages for any other destination are returned untouched. A broadcast
 * message without a session id is considered our own broadcast and is dropped.
 * Otherwise a new {@code MESSAGE} is created whose destination is the value of
 * the {@code ORIGINAL_DESTINATION} native header; the first value of every
 * native header not listed in {@code NO_COPY_LIST} is copied over.
 * @param message the incoming message
 * @return the original message, the rewritten message, or {@code null} when
 * the message is our own broadcast
 * @throws MessagingException as declared by the signature
 */
public Message<?> preHandle(Message<?> message) throws MessagingException {
	String incomingDestination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
	boolean isBroadcast = getBroadcastDestination().equals(incomingDestination);
	if (!isBroadcast) {
		return message;
	}

	SimpMessageHeaderAccessor source = getAccessor(message, SimpMessageHeaderAccessor.class);
	if (source.getSessionId() == null) {
		// Our own broadcast
		return null;
	}

	String originalDestination = source.getFirstNativeHeader(ORIGINAL_DESTINATION);
	if (logger.isTraceEnabled()) {
		logger.trace("Checking unresolved user destination: " + originalDestination);
	}

	SimpMessageHeaderAccessor target = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
	for (String headerName : source.toNativeHeaderMap().keySet()) {
		if (!NO_COPY_LIST.contains(headerName)) {
			target.setNativeHeader(headerName, source.getFirstNativeHeader(headerName));
		}
	}
	target.setDestination(originalDestination);
	// ensure send doesn't block
	target.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
	return MessageBuilder.createMessage(message.getPayload(), target.getMessageHeaders());
}
 
/**
 * Parses the destination and message type of the given message.
 * <p>Returns {@code null} when there is no destination, when
 * {@code checkDestination} rejects it for the configured prefix, or when the
 * message type is anything other than SUBSCRIBE, UNSUBSCRIBE or MESSAGE.
 * @param message the message to inspect
 * @return the parse result, or {@code null} if the message is not handled here
 */
@Nullable
private ParseResult parse(Message<?> message) {
	MessageHeaders headers = message.getHeaders();
	String destination = SimpMessageHeaderAccessor.getDestination(headers);
	if (destination == null || !checkDestination(destination, this.prefix)) {
		return null;
	}

	SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers);
	if (type == SimpMessageType.SUBSCRIBE || type == SimpMessageType.UNSUBSCRIBE) {
		return parseSubscriptionMessage(message, destination);
	}
	if (type == SimpMessageType.MESSAGE) {
		return parseMessage(headers, destination);
	}
	return null;
}
 
/**
 * Starts the handler with a heartbeat value of {1, 1}, captures the scheduled
 * heartbeat task, connects a session created with {10000, 10000} (presumably
 * the client's requested heartbeat), and runs the task once after a 10 ms
 * pause. The only message sent to the client outbound channel must be the
 * CONNECT_ACK.
 */
@Test
public void readWriteIntervalCalculation() throws Exception {
	this.messageHandler.setHeartbeatValue(new long[] {1, 1});
	this.messageHandler.setTaskScheduler(this.taskScheduler);
	this.messageHandler.start();

	ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
	verify(this.taskScheduler).scheduleWithFixedDelay(captor.capture(), eq(1L));
	Runnable task = captor.getValue();
	assertNotNull(task);

	Message<String> connect =
			createConnectMessage("sess1", new TestPrincipal("joe"), new long[] {10000, 10000});
	this.messageHandler.handleMessage(connect);

	Thread.sleep(10);
	task.run();

	verify(this.clientOutboundChannel, times(1)).send(this.messageCaptor.capture());
	List<Message<?>> sent = this.messageCaptor.getAllValues();
	assertEquals(1, sent.size());
	Object sentType = sent.get(0).getHeaders().get(SimpMessageHeaderAccessor.MESSAGE_TYPE_HEADER);
	assertEquals(SimpMessageType.CONNECT_ACK, sentType);
}
 
/**
 * Performs one heartbeat pass over every tracked session.
 * <p>A session whose read interval is positive and has been exceeded since its
 * last read is passed to {@code handleDisconnect}. Independently, a session
 * whose write interval is positive and has been exceeded since its last write
 * is sent a HEARTBEAT message on its client outbound channel.
 */
@Override
public void run() {
	long currentTime = System.currentTimeMillis();
	for (SessionInfo session : sessions.values()) {
		boolean readExpired = session.getReadInterval() > 0
				&& (currentTime - session.getLastReadTime()) > session.getReadInterval();
		if (readExpired) {
			handleDisconnect(session.getSessionId(), session.getUser(), null);
		}

		// Evaluated only after the disconnect check, as in the original ordering.
		boolean writeDue = session.getWriteInterval() > 0
				&& (currentTime - session.getLastWriteTime()) > session.getWriteInterval();
		if (!writeDue) {
			continue;
		}

		SimpMessageHeaderAccessor heartbeat = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
		heartbeat.setSessionId(session.getSessionId());
		Principal principal = session.getUser();
		if (principal != null) {
			heartbeat.setUser(principal);
		}
		initHeaders(heartbeat);
		heartbeat.setLeaveMutable(true);
		MessageHeaders heartbeatHeaders = heartbeat.getMessageHeaders();
		session.getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, heartbeatHeaders));
	}
}
 
/**
 * Verifies that {@code afterMessageHandled} finishes the span and closes the
 * scope carried in the message headers.
 */
@Test
public void testAfterMessageHandled() {
  Scope mockScope = mock(Scope.class);
  Span mockSpan = mock(Span.class);
  MessageHandler handler = mock(WebSocketAnnotationMethodMessageHandler.class);

  MessageBuilder<String> builder = MessageBuilder.withPayload("Hi");
  builder = builder
      .setHeader(TracingChannelInterceptor.SIMP_MESSAGE_TYPE, SimpMessageType.MESSAGE)
      .setHeader(TracingChannelInterceptor.SIMP_DESTINATION, TEST_DESTINATION)
      .setHeader(TracingChannelInterceptor.OPENTRACING_SCOPE, mockScope)
      .setHeader(TracingChannelInterceptor.OPENTRACING_SPAN, mockSpan);

  TracingChannelInterceptor tracingInterceptor =
      new TracingChannelInterceptor(mockTracer, Tags.SPAN_KIND_CLIENT);
  tracingInterceptor.afterMessageHandled(builder.build(), null, handler, null);

  // Verify span is finished and scope is closed
  verify(mockSpan).finish();
  verify(mockScope).close();
}
 
/**
 * Verifies that a STOMP {@code SEND} frame handled by the WebSocket handler
 * shows up on the {@code clientInboundChannel} as an immutable message of type
 * {@code MESSAGE} with destination {@code /foo}, and that the channel's last
 * interceptor is an {@code ImmutableMessageChannelInterceptor}.
 */
@Test
public void clientInboundChannelSendMessage() throws Exception {
	ApplicationContext context = createConfig(TestChannelConfig.class, TestConfigurer.class);
	TestChannel inboundChannel = context.getBean("clientInboundChannel", TestChannel.class);
	SubProtocolWebSocketHandler handler = context.getBean(SubProtocolWebSocketHandler.class);

	List<ChannelInterceptor> interceptors = inboundChannel.getInterceptors();
	ChannelInterceptor lastInterceptor = interceptors.get(interceptors.size() - 1);
	assertEquals(ImmutableMessageChannelInterceptor.class, lastInterceptor.getClass());

	TestWebSocketSession session = new TestWebSocketSession("s1");
	session.setOpen(true);
	handler.afterConnectionEstablished(session);

	handler.handleMessage(session,
			StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build());

	StompHeaderAccessor accessor =
			StompHeaderAccessor.getAccessor(inboundChannel.messages.get(0), StompHeaderAccessor.class);
	assertNotNull(accessor);
	assertFalse(accessor.isMutable());
	assertEquals(SimpMessageType.MESSAGE, accessor.getMessageType());
	assertEquals("/foo", accessor.getDestination());
}
 
/**
 * Verifies that a user name whose slashes are encoded as {@code %2F} in the
 * destination is resolved against the registry under its unencoded form, and
 * that the single target destination carries that user's session id.
 */
@Test
public void handleMessageEncodedUserName() {
	String userName = "https://joe.openid.example.org/";

	TestSimpUser registeredUser = new TestSimpUser(userName);
	registeredUser.addSessions(new TestSimpSession("openid123"));
	given(this.registry.getUser(userName)).willReturn(registeredUser);

	String encodedName = StringUtils.replace(userName, "/", "%2F");
	String destination = "/user/" + encodedName + "/queue/foo";
	Message<?> message = createMessage(SimpMessageType.MESSAGE, new TestPrincipal("joe"), null, destination);

	UserDestinationResult result = this.resolver.resolveDestination(message);

	assertEquals(1, result.getTargetDestinations().size());
	assertEquals("/queue/foo-useropenid123", result.getTargetDestinations().iterator().next());
}
 
/**
 * Verifies that a DISCONNECT_ACK carrying the original DISCONNECT message is
 * written to the client as a STOMP ERROR frame with the message
 * "Session closed.".
 */
@Test
public void handleMessageToClientWithSimpDisconnectAck() {
	StompHeaderAccessor disconnectHeaders = StompHeaderAccessor.create(StompCommand.DISCONNECT);
	Message<?> disconnectMessage =
			MessageBuilder.createMessage(EMPTY_PAYLOAD, disconnectHeaders.getMessageHeaders());

	SimpMessageHeaderAccessor ackHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
	ackHeaders.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, disconnectMessage);
	Message<byte[]> ack = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackHeaders.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, ack);

	assertEquals(1, this.session.getSentMessages().size());
	TextMessage sent = (TextMessage) this.session.getSentMessages().get(0);
	String expectedFrame = "ERROR\n" + "message:Session closed.\n" + "content-length:0\n" +
			"\n\u0000";
	assertEquals(expectedFrame, sent.getPayload());
}
 
/**
 * Verifies that a single {@code SessionConnectedEvent} registers one user with
 * exactly one session, retrievable by its id.
 */
@Test
public void addOneSessionId() {
	TestPrincipal principal = new TestPrincipal("joe");
	Message<byte[]> connectAck = createMessage(SimpMessageType.CONNECT_ACK, "123");

	DefaultSimpUserRegistry userRegistry = new DefaultSimpUserRegistry();
	userRegistry.onApplicationEvent(new SessionConnectedEvent(this, connectAck, principal));

	SimpUser registered = userRegistry.getUser("joe");
	assertNotNull(registered);

	assertEquals(1, userRegistry.getUserCount());
	assertEquals(1, registered.getSessions().size());
	assertNotNull(registered.getSession("123"));
}
 
/**
 * Finds the subscriptions matching the destination of the given message.
 * <p>If the message has no destination, an error is logged and the shared
 * {@code EMPTY_MAP} is returned; otherwise the lookup is delegated to
 * {@code findSubscriptionsInternal}.
 * @param message the message whose destination is looked up; its type must be
 * {@code MESSAGE}
 * @return the map produced by {@code findSubscriptionsInternal}, or
 * {@code EMPTY_MAP} when there is no destination
 * @throws IllegalArgumentException if the message type is not {@code MESSAGE}
 */
@Override
public final MultiValueMap<String, String> findSubscriptions(Message<?> message) {
	MessageHeaders headers = message.getHeaders();

	SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers);
	if (!SimpMessageType.MESSAGE.equals(type)) {
		throw new IllegalArgumentException("Unexpected message type: " + type);
	}

	String destination = SimpMessageHeaderAccessor.getDestination(headers);
	if (destination == null) {
		// Guarded like the register/unregister methods so the message is only
		// rendered to a string when error logging is actually enabled.
		if (logger.isErrorEnabled()) {
			logger.error("No destination in " + message);
		}
		return EMPTY_MAP;
	}

	return findSubscriptionsInternal(destination, message);
}
 
/**
 * Verifies that a message sent by one user ("joe") to another user's
 * destination ("anna") resolves to the recipient's session and reports the
 * recipient as the user of the result.
 */
@Test // SPR-12444
public void handleMessageToOtherUser() {
	TestSimpUser anna = new TestSimpUser("anna");
	anna.addSessions(new TestSimpSession("456"));
	when(this.registry.getUser("anna")).thenReturn(anna);

	TestPrincipal sender = new TestPrincipal("joe");
	TestPrincipal recipient = new TestPrincipal("anna");
	String destination = "/user/anna/queue/foo";
	Message<?> message = createMessage(SimpMessageType.MESSAGE, sender, "456", destination);

	UserDestinationResult result = this.resolver.resolveDestination(message);

	assertEquals(destination, result.getSourceDestination());
	assertEquals(1, result.getTargetDestinations().size());
	assertEquals("/queue/foo-user456", result.getTargetDestinations().iterator().next());
	assertEquals("/user/queue/foo", result.getSubscribeDestination());
	assertEquals(recipient.getName(), result.getUser());
}
 
/**
 * Verifies that a CONNECT_ACK wrapping a CONNECT message (heartbeat
 * 10000,10000; accept-version "1.0,1.1") is written to the client as a
 * CONNECTED frame with version 1.1, heart-beat 0,0 and the user name.
 */
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
	StompHeaderAccessor connectHeaders = StompHeaderAccessor.create(StompCommand.CONNECT);
	connectHeaders.setHeartbeat(10000, 10000);
	connectHeaders.setAcceptVersion("1.0,1.1");
	Message<?> connect = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectHeaders.getMessageHeaders());

	SimpMessageHeaderAccessor ackHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
	ackHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connect);
	Message<byte[]> ack = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackHeaders.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, ack);

	assertEquals(1, this.session.getSentMessages().size());
	TextMessage sent = (TextMessage) this.session.getSentMessages().get(0);
	String expectedFrame = "CONNECTED\n" + "version:1.1\n" + "heart-beat:0,0\n" +
			"user-name:joe\n" + "\n" + "\u0000";
	assertEquals(expectedFrame, sent.getPayload());
}
 
/**
 * Sends a STOMP {@code SEND} frame through the WebSocket handler and checks
 * that the message captured on the {@code clientInboundChannel} is immutable,
 * has type {@code MESSAGE} and keeps destination {@code /foo}. Also checks that
 * the last channel interceptor is an {@code ImmutableMessageChannelInterceptor}.
 */
@Test
public void clientInboundChannelSendMessage() throws Exception {
	ApplicationContext appContext = createConfig(TestChannelConfig.class, TestConfigurer.class);
	TestChannel testChannel = appContext.getBean("clientInboundChannel", TestChannel.class);
	SubProtocolWebSocketHandler subProtocolHandler = appContext.getBean(SubProtocolWebSocketHandler.class);

	List<ChannelInterceptor> channelInterceptors = testChannel.getInterceptors();
	int lastIndex = channelInterceptors.size() - 1;
	assertEquals(ImmutableMessageChannelInterceptor.class, channelInterceptors.get(lastIndex).getClass());

	TestWebSocketSession webSocketSession = new TestWebSocketSession("s1");
	webSocketSession.setOpen(true);
	subProtocolHandler.afterConnectionEstablished(webSocketSession);

	subProtocolHandler.handleMessage(webSocketSession,
			StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build());

	Message<?> received = testChannel.messages.get(0);
	StompHeaderAccessor receivedHeaders = StompHeaderAccessor.getAccessor(received, StompHeaderAccessor.class);
	assertNotNull(receivedHeaders);
	assertFalse(receivedHeaders.isMutable());
	assertEquals(SimpMessageType.MESSAGE, receivedHeaders.getMessageType());
	assertEquals("/foo", receivedHeaders.getDestination());
}
 
/**
 * Verifies that three {@code SessionConnectedEvent}s for the same principal
 * yield one registered user holding all three sessions.
 */
@Test
public void addMultipleSessionIds() {
	DefaultSimpUserRegistry userRegistry = new DefaultSimpUserRegistry();
	TestPrincipal principal = new TestPrincipal("joe");
	String[] sessionIds = {"123", "456", "789"};

	// Register the sessions one after another, in the listed order.
	for (String sessionId : sessionIds) {
		Message<byte[]> connectAck = createMessage(SimpMessageType.CONNECT_ACK, sessionId);
		userRegistry.onApplicationEvent(new SessionConnectedEvent(this, connectAck, principal));
	}

	SimpUser registered = userRegistry.getUser("joe");
	assertNotNull(registered);

	assertEquals(1, userRegistry.getUserCount());
	assertEquals(3, registered.getSessions().size());
	for (String sessionId : sessionIds) {
		assertNotNull(registered.getSession(sessionId));
	}
}
 
/**
 * Sends a STOMP {@code SEND} to "/foo" through the annotation message handler
 * and checks that the first message on the {@code brokerChannel} has type
 * {@code MESSAGE}, destination "/bar" and payload "bar".
 */
@Test
public void brokerChannelUsedByAnnotatedMethod() {
	ApplicationContext appContext = loadConfig(SimpleBrokerConfig.class);

	TestChannel brokerChannel = appContext.getBean("brokerChannel", TestChannel.class);
	SimpAnnotationMethodMessageHandler handler =
			appContext.getBean(SimpAnnotationMethodMessageHandler.class);

	StompHeaderAccessor sendHeaders = StompHeaderAccessor.create(StompCommand.SEND);
	sendHeaders.setSessionId("sess1");
	sendHeaders.setSessionAttributes(new ConcurrentHashMap<>());
	sendHeaders.setDestination("/foo");
	Message<?> inbound = MessageBuilder.createMessage(new byte[0], sendHeaders.getMessageHeaders());

	handler.handleMessage(inbound);

	Message<?> outbound = brokerChannel.messages.get(0);
	StompHeaderAccessor outboundHeaders = StompHeaderAccessor.wrap(outbound);

	assertEquals(SimpMessageType.MESSAGE, outboundHeaders.getMessageType());
	assertEquals("/bar", outboundHeaders.getDestination());
	assertEquals("bar", new String((byte[]) outbound.getPayload()));
}
 
/**
 * Sends a STOMP {@code SUBSCRIBE} for "/foo" through the annotation message
 * handler and checks that the first message on the {@code clientOutboundChannel}
 * has type {@code MESSAGE}, destination "/foo" and payload "bar".
 */
@Test
public void clientOutboundChannelUsedByAnnotatedMethod() {
	TestChannel outboundChannel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class);
	SimpAnnotationMethodMessageHandler handler =
			this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class);

	StompHeaderAccessor subscribeHeaders = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
	subscribeHeaders.setSessionId("sess1");
	subscribeHeaders.setSessionAttributes(new ConcurrentHashMap<>());
	subscribeHeaders.setSubscriptionId("subs1");
	subscribeHeaders.setDestination("/foo");
	Message<?> inbound = MessageBuilder.withPayload(new byte[0]).setHeaders(subscribeHeaders).build();

	handler.handleMessage(inbound);

	Message<?> outbound = outboundChannel.messages.get(0);
	StompHeaderAccessor outboundHeaders = StompHeaderAccessor.wrap(outbound);

	assertEquals(SimpMessageType.MESSAGE, outboundHeaders.getMessageType());
	assertEquals("/foo", outboundHeaders.getDestination());
	assertEquals("bar", new String((byte[]) outbound.getPayload()));
}
 
/**
 * Verifies that a CONNECT_ACK wrapping a CONNECT message (heartbeat
 * 10000,10000; accept-version "1.0") is written to the client as a CONNECTED
 * frame with version 1.0, heart-beat 0,0 and the user name.
 */
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
	StompHeaderAccessor connectHeaders = StompHeaderAccessor.create(StompCommand.CONNECT);
	connectHeaders.setHeartbeat(10000, 10000);
	connectHeaders.setAcceptVersion("1.0");
	Message<?> connect = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectHeaders.getMessageHeaders());

	SimpMessageHeaderAccessor ackHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
	ackHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connect);
	Message<byte[]> ack = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackHeaders.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, ack);

	assertEquals(1, this.session.getSentMessages().size());
	TextMessage sent = (TextMessage) this.session.getSentMessages().get(0);
	String expectedFrame = "CONNECTED\n" + "version:1.0\n" + "heart-beat:0,0\n" +
			"user-name:joe\n" + "\n" + "\u0000";
	assertEquals(expectedFrame, sent.getPayload());
}
 
/**
 * Passes a DISCONNECT_ACK that carries the original DISCONNECT message to the
 * protocol handler and expects a single STOMP ERROR frame with the message
 * "Session closed." to be sent on the session.
 */
@Test
public void handleMessageToClientWithSimpDisconnectAck() {
	StompHeaderAccessor disconnectAccessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
	Message<?> originalDisconnect =
			MessageBuilder.createMessage(EMPTY_PAYLOAD, disconnectAccessor.getMessageHeaders());

	SimpMessageHeaderAccessor disconnectAck = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
	disconnectAck.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, originalDisconnect);
	Message<byte[]> outgoing = MessageBuilder.createMessage(EMPTY_PAYLOAD, disconnectAck.getMessageHeaders());

	this.protocolHandler.handleMessageToClient(this.session, outgoing);

	assertEquals(1, this.session.getSentMessages().size());
	TextMessage frame = (TextMessage) this.session.getSentMessages().get(0);
	String expectedPayload = "ERROR\n" + "message:Session closed.\n" + "content-length:0\n" +
			"\n\u0000";
	assertEquals(expectedPayload, frame.getPayload());
}
 
源代码19 项目: airsonic-advanced   文件: PlayQueueService.java
/**
 * Follows up a play queue broadcast by starting playback at the position
 * recorded in {@code info}, if any.
 * <p>Nothing happens when the start index is {@code -1}. For a web player with
 * a known session, a "skip" message is sent to that user's session; otherwise,
 * unless the player is external with playlist, {@code skip} is invoked directly.
 *
 * @param info      play queue info holding the start index and position
 * @param player    the player the queue belongs to
 * @param sessionId the session to address, may be {@code null}
 */
private void postBroadcast(PlayQueueInfo info, Player player, String sessionId) {
    if (info.getStartPlayerAt() == -1) {
        return;
    }

    boolean webPlayerWithSession = player.isWeb() && sessionId != null;
    if (webPlayerWithSession) {
        // trigger the web player to start playing at this location
        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        accessor.setSessionId(sessionId);
        String skipDestination = "/queue/playqueues/" + player.getId() + "/skip";
        brokerTemplate.convertAndSendToUser(player.getUsername(),
                skipDestination,
                ImmutableMap.of("index", info.getStartPlayerAt(), "offset", info.getStartPlayerAtPosition()),
                accessor.getMessageHeaders());
        return;
    }

    if (!player.isExternalWithPlaylist()) {
        skip(player, info.getStartPlayerAt(), info.getStartPlayerAtPosition());
    }
}
 
/**
 * Sends a copy of the given message to every subscription the registry finds
 * for it.
 * <p>Each copy is a {@code MESSAGE} addressed with the subscriber's session id
 * and subscription id; remaining headers are copied from the original if
 * absent. Send failures are logged and do not stop the loop. After each
 * attempt, successful or not, the last write time of the session (if still
 * tracked) is set to the timestamp taken before the loop.
 * @param destination the destination (not used by this method body)
 * @param message the message to broadcast
 */
protected void sendMessageToSubscribers(String destination, Message<?> message) {
	MultiValueMap<String,String> matches = this.subscriptionRegistry.findSubscriptions(message);
	if (logger.isDebugEnabled() && !matches.isEmpty()) {
		logger.debug("Broadcasting to " + matches.size() + " sessions.");
	}

	long timestamp = System.currentTimeMillis();
	for (String sessionId : matches.keySet()) {
		for (String subscriptionId : matches.get(sessionId)) {
			SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
			initHeaders(accessor);
			accessor.setSessionId(sessionId);
			accessor.setSubscriptionId(subscriptionId);
			accessor.copyHeadersIfAbsent(message.getHeaders());
			Message<?> outgoing = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
			try {
				getClientOutboundChannel().send(outgoing);
			}
			catch (Throwable ex) {
				logger.error("Failed to send " + message, ex);
			}
			finally {
				SessionInfo sessionInfo = this.sessions.get(sessionId);
				if (sessionInfo != null) {
					sessionInfo.setLastWriteTime(timestamp);
				}
			}
		}
	}
}
 
/**
 * Creates a builder for an exchange that starts with a {@code MESSAGE} sent to
 * the given destination.
 * @param destination the destination header value
 * @param payload the text payload, encoded as UTF-8 bytes
 * @return a new {@code MessageExchangeBuilder} wrapping the message
 */
public static MessageExchangeBuilder send(String destination, String payload) {
	SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
	accessor.setDestination(destination);
	byte[] body = payload.getBytes(StandardCharsets.UTF_8);
	return new MessageExchangeBuilder(MessageBuilder.createMessage(body, accessor.getMessageHeaders()));
}
 
源代码22 项目: spring-analysis-note   文件: StompHeaderAccessor.java
/**
 * Ensures the STOMP command of a {@code MESSAGE}-type message is {@code SEND}.
 * <p>If no command is set yet, {@code SEND} is stored under the command header.
 * @return the command, always {@code SEND}
 * @throws IllegalStateException if the message type is not {@code MESSAGE}, or
 * if a command other than {@code SEND} is already set
 */
public StompCommand updateStompCommandAsClientMessage() {
	SimpMessageType type = getMessageType();
	if (type != SimpMessageType.MESSAGE) {
		throw new IllegalStateException("Unexpected message type " + type);
	}

	StompCommand existing = getCommand();
	if (existing == null) {
		setHeader(COMMAND_HEADER, StompCommand.SEND);
		return StompCommand.SEND;
	}
	if (!existing.equals(StompCommand.SEND)) {
		throw new IllegalStateException("Unexpected STOMP command " + existing);
	}
	return existing;
}
 
/**
 * Verifies that creating an accessor for a STOMP {@code MESSAGE} frame from
 * native headers yields the matching command, message type and subscription id.
 */
@Test
public void createWithMessageFrameNativeHeaders() {
	MultiValueMap<String, String> nativeHeaders = new LinkedMultiValueMap<>();
	nativeHeaders.add(StompHeaderAccessor.DESTINATION_HEADER, "/d");
	nativeHeaders.add(StompHeaderAccessor.STOMP_SUBSCRIPTION_HEADER, "s1");
	nativeHeaders.add(StompHeaderAccessor.STOMP_CONTENT_TYPE_HEADER, "application/json");

	StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.MESSAGE, nativeHeaders);

	assertEquals(StompCommand.MESSAGE, accessor.getCommand());
	assertEquals(SimpMessageType.MESSAGE, accessor.getMessageType());
	assertEquals("s1", accessor.getSubscriptionId());
}
 
/**
 * Verifies that a message without a principal, addressed to
 * "/user/123/queue/foo" from session "123", resolves to that session and
 * reports no user.
 */
@Test
public void handleMessageWithNoUser() {
	String sessionId = "123";
	String destination = "/user/" + sessionId + "/queue/foo";
	Message<?> message = createMessage(SimpMessageType.MESSAGE, null, sessionId, destination);

	UserDestinationResult result = this.resolver.resolveDestination(message);

	assertEquals(destination, result.getSourceDestination());
	assertEquals(1, result.getTargetDestinations().size());
	assertEquals("/queue/foo-user123", result.getTargetDestinations().iterator().next());
	assertEquals("/user/queue/foo", result.getSubscribeDestination());
	assertNull(result.getUser());
}
 
源代码25 项目: spring-analysis-note   文件: StompEncoder.java
/**
 * Serializes headers and payload into the bytes of a STOMP frame.
 * <p>A message of type {@code HEARTBEAT} is encoded as the heartbeat payload
 * only. Any other message is written as: command, LF, headers, LF, body, and a
 * terminating zero byte.
 * @param headers the headers, must not be {@code null}
 * @param payload the payload, must not be {@code null}
 * @return the encoded message
 * @throws IllegalStateException if a non-heartbeat message has no STOMP command
 * @throws StompConversionException if writing the frame fails with an I/O error
 */
public byte[] encode(Map<String, Object> headers, byte[] payload) {
	Assert.notNull(headers, "'headers' is required");
	Assert.notNull(payload, "'payload' is required");

	try {
		ByteArrayOutputStream buffer = new ByteArrayOutputStream(128 + payload.length);
		DataOutputStream out = new DataOutputStream(buffer);

		boolean heartbeat =
				SimpMessageType.HEARTBEAT.equals(SimpMessageHeaderAccessor.getMessageType(headers));
		if (heartbeat) {
			logger.trace("Encoding heartbeat");
			out.write(StompDecoder.HEARTBEAT_PAYLOAD);
			return buffer.toByteArray();
		}

		StompCommand command = StompHeaderAccessor.getCommand(headers);
		if (command == null) {
			throw new IllegalStateException("Missing STOMP command: " + headers);
		}

		byte[] commandBytes = command.toString().getBytes(StandardCharsets.UTF_8);
		out.write(commandBytes);
		out.write(LF);
		writeHeaders(command, headers, payload, out);
		out.write(LF);
		writeBody(payload, out);
		out.write((byte) 0);

		return buffer.toByteArray();
	}
	catch (IOException ex) {
		throw new StompConversionException("Failed to encode STOMP frame, headers=" + headers, ex);
	}
}
 
源代码26 项目: spring4-understanding   文件: StompCodecTests.java
/**
 * Verifies that a frame consisting of a single newline is decoded into exactly
 * one message of type {@code HEARTBEAT}.
 */
@Test
public void decodeHeartbeat() {
	Buffer input = Buffer.wrap("\n");

	List<Message<byte[]>> decoded = new ArrayList<>();
	new Reactor2StompCodec().decoder(decoded::add).apply(input);

	assertEquals(1, decoded.size());
	StompHeaderAccessor accessor = StompHeaderAccessor.wrap(decoded.get(0));
	assertEquals(SimpMessageType.HEARTBEAT, accessor.getMessageType());
}
 
/**
 * Verifies that {@code preSend} returns a CONNECT_ACK message unchanged and
 * never touches the session repository.
 */
@Test
void preSendConnectAckDoesNotInvokeSessionRepository() {
	setMessageType(SimpMessageType.CONNECT_ACK);

	Message<?> result = this.interceptor.preSend(createMessage(), this.channel);

	assertThat(result).isSameAs(this.createMessage);
	verifyZeroInteractions(this.sessionRepository);
}
 
/**
 * Registers the subscription described by the given SUBSCRIBE message.
 * <p>The session id, subscription id and destination are read from the
 * headers. If any of them is missing, an error is logged and the message is
 * ignored; otherwise registration is delegated to
 * {@code addSubscriptionInternal}.
 * @param message the SUBSCRIBE message
 * @throws IllegalArgumentException if the message type is not SUBSCRIBE
 */
@Override
public final void registerSubscription(Message<?> message) {
	MessageHeaders headers = message.getHeaders();

	SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
	if (!SimpMessageType.SUBSCRIBE.equals(messageType)) {
		throw new IllegalArgumentException("Expected SUBSCRIBE: " + message);
	}

	String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
	if (sessionId == null) {
		if (logger.isErrorEnabled()) {
			// Single space after "in", matching the other log messages here.
			logger.error("No sessionId in " + message);
		}
		return;
	}

	String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
	if (subscriptionId == null) {
		if (logger.isErrorEnabled()) {
			logger.error("No subscriptionId in " + message);
		}
		return;
	}

	String destination = SimpMessageHeaderAccessor.getDestination(headers);
	if (destination == null) {
		if (logger.isErrorEnabled()) {
			logger.error("No destination in " + message);
		}
		return;
	}

	addSubscriptionInternal(sessionId, subscriptionId, destination, message);
}
 
/**
 * Removes the subscription described by the given UNSUBSCRIBE message.
 * <p>The session id and subscription id are read from the headers. If either
 * is missing, an error is logged and the message is ignored; otherwise removal
 * is delegated to {@code removeSubscriptionInternal}.
 * @param message the UNSUBSCRIBE message
 * @throws IllegalArgumentException if the message type is not UNSUBSCRIBE
 */
@Override
public final void unregisterSubscription(Message<?> message) {
	MessageHeaders headers = message.getHeaders();

	SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
	if (!SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
		throw new IllegalArgumentException("Expected UNSUBSCRIBE: " + message);
	}

	String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
	if (sessionId == null) {
		if (logger.isErrorEnabled()) {
			logger.error("No sessionId in " + message);
		}
		return;
	}

	String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
	if (subscriptionId == null) {
		if (logger.isErrorEnabled()) {
			// "in" added so the text reads like the sessionId message above.
			logger.error("No subscriptionId in " + message);
		}
		return;
	}

	removeSubscriptionInternal(sessionId, subscriptionId, message);
}
 
/**
 * Adds the given message to the queue, ignoring messages of type
 * {@code HEARTBEAT}.
 * @param message the message to record
 * @throws MessagingException as declared by the signature
 */
@Override
public void handleMessage(Message<?> message) throws MessagingException {
	SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
	if (type != SimpMessageType.HEARTBEAT) {
		this.queue.add(message);
	}
}
 
 类方法
 同包方法