下面列出了怎么用org.springframework.messaging.simp.SimpMessageType的API类实例代码及写法,或者点击链接到github查看源代码。
public Message<?> preHandle(Message<?> message) throws MessagingException {
String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
if (!getBroadcastDestination().equals(destination)) {
return message;
}
SimpMessageHeaderAccessor accessor = getAccessor(message, SimpMessageHeaderAccessor.class);
if (accessor.getSessionId() == null) {
// Our own broadcast
return null;
}
destination = accessor.getFirstNativeHeader(ORIGINAL_DESTINATION);
if (logger.isTraceEnabled()) {
logger.trace("Checking unresolved user destination: " + destination);
}
SimpMessageHeaderAccessor newAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
for (String name : accessor.toNativeHeaderMap().keySet()) {
if (NO_COPY_LIST.contains(name)) {
continue;
}
newAccessor.setNativeHeader(name, accessor.getFirstNativeHeader(name));
}
newAccessor.setDestination(destination);
newAccessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true); // ensure send doesn't block
return MessageBuilder.createMessage(message.getPayload(), newAccessor.getMessageHeaders());
}
@Nullable
private ParseResult parse(Message<?> message) {
MessageHeaders headers = message.getHeaders();
String sourceDestination = SimpMessageHeaderAccessor.getDestination(headers);
if (sourceDestination == null || !checkDestination(sourceDestination, this.prefix)) {
return null;
}
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
if (messageType != null) {
switch (messageType) {
case SUBSCRIBE:
case UNSUBSCRIBE:
return parseSubscriptionMessage(message, sourceDestination);
case MESSAGE:
return parseMessage(headers, sourceDestination);
}
}
return null;
}
@Test
public void readWriteIntervalCalculation() throws Exception {
this.messageHandler.setHeartbeatValue(new long[] {1, 1});
this.messageHandler.setTaskScheduler(this.taskScheduler);
this.messageHandler.start();
ArgumentCaptor<Runnable> taskCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(this.taskScheduler).scheduleWithFixedDelay(taskCaptor.capture(), eq(1L));
Runnable heartbeatTask = taskCaptor.getValue();
assertNotNull(heartbeatTask);
String id = "sess1";
TestPrincipal user = new TestPrincipal("joe");
Message<String> connectMessage = createConnectMessage(id, user, new long[] {10000, 10000});
this.messageHandler.handleMessage(connectMessage);
Thread.sleep(10);
heartbeatTask.run();
verify(this.clientOutboundChannel, times(1)).send(this.messageCaptor.capture());
List<Message<?>> messages = this.messageCaptor.getAllValues();
assertEquals(1, messages.size());
assertEquals(SimpMessageType.CONNECT_ACK,
messages.get(0).getHeaders().get(SimpMessageHeaderAccessor.MESSAGE_TYPE_HEADER));
}
@Override
public void run() {
long now = System.currentTimeMillis();
for (SessionInfo info : sessions.values()) {
if (info.getReadInterval() > 0 && (now - info.getLastReadTime()) > info.getReadInterval()) {
handleDisconnect(info.getSessionId(), info.getUser(), null);
}
if (info.getWriteInterval() > 0 && (now - info.getLastWriteTime()) > info.getWriteInterval()) {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
accessor.setSessionId(info.getSessionId());
Principal user = info.getUser();
if (user != null) {
accessor.setUser(user);
}
initHeaders(accessor);
accessor.setLeaveMutable(true);
MessageHeaders headers = accessor.getMessageHeaders();
info.getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
}
}
}
@Test
public void testAfterMessageHandled() {
Span span = mock(Span.class);
Scope scope = mock(Scope.class);
MessageHandler messageHandler = mock(WebSocketAnnotationMethodMessageHandler.class);
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload("Hi")
.setHeader(TracingChannelInterceptor.SIMP_MESSAGE_TYPE, SimpMessageType.MESSAGE)
.setHeader(TracingChannelInterceptor.SIMP_DESTINATION, TEST_DESTINATION)
.setHeader(TracingChannelInterceptor.OPENTRACING_SCOPE, scope)
.setHeader(TracingChannelInterceptor.OPENTRACING_SPAN, span);
TracingChannelInterceptor interceptor = new TracingChannelInterceptor(mockTracer,
Tags.SPAN_KIND_CLIENT);
interceptor.afterMessageHandled(messageBuilder.build(), null, messageHandler, null);
// Verify span is finished and scope is closed
verify(span).finish();
verify(scope).close();
}
@Test
public void clientInboundChannelSendMessage() throws Exception {
ApplicationContext config = createConfig(TestChannelConfig.class, TestConfigurer.class);
TestChannel channel = config.getBean("clientInboundChannel", TestChannel.class);
SubProtocolWebSocketHandler webSocketHandler = config.getBean(SubProtocolWebSocketHandler.class);
List<ChannelInterceptor> interceptors = channel.getInterceptors();
assertEquals(ImmutableMessageChannelInterceptor.class, interceptors.get(interceptors.size()-1).getClass());
TestWebSocketSession session = new TestWebSocketSession("s1");
session.setOpen(true);
webSocketHandler.afterConnectionEstablished(session);
TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build();
webSocketHandler.handleMessage(session, textMessage);
Message<?> message = channel.messages.get(0);
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertNotNull(accessor);
assertFalse(accessor.isMutable());
assertEquals(SimpMessageType.MESSAGE, accessor.getMessageType());
assertEquals("/foo", accessor.getDestination());
}
@Test
public void handleMessageEncodedUserName() {
String userName = "https://joe.openid.example.org/";
TestSimpUser simpUser = new TestSimpUser(userName);
simpUser.addSessions(new TestSimpSession("openid123"));
given(this.registry.getUser(userName)).willReturn(simpUser);
String destination = "/user/" + StringUtils.replace(userName, "/", "%2F") + "/queue/foo";
Message<?> message = createMessage(SimpMessageType.MESSAGE, new TestPrincipal("joe"), null, destination);
UserDestinationResult actual = this.resolver.resolveDestination(message);
assertEquals(1, actual.getTargetDestinations().size());
assertEquals("/queue/foo-useropenid123", actual.getTargetDestinations().iterator().next());
}
@Test
public void handleMessageToClientWithSimpDisconnectAck() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
ackAccessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, connectMessage);
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
assertEquals(1, this.session.getSentMessages().size());
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
assertEquals("ERROR\n" + "message:Session closed.\n" + "content-length:0\n" +
"\n\u0000", actual.getPayload());
}
@Test
public void addOneSessionId() {
TestPrincipal user = new TestPrincipal("joe");
Message<byte[]> message = createMessage(SimpMessageType.CONNECT_ACK, "123");
SessionConnectedEvent event = new SessionConnectedEvent(this, message, user);
DefaultSimpUserRegistry registry = new DefaultSimpUserRegistry();
registry.onApplicationEvent(event);
SimpUser simpUser = registry.getUser("joe");
assertNotNull(simpUser);
assertEquals(1, registry.getUserCount());
assertEquals(1, simpUser.getSessions().size());
assertNotNull(simpUser.getSession("123"));
}
@Override
public final MultiValueMap<String, String> findSubscriptions(Message<?> message) {
MessageHeaders headers = message.getHeaders();
SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers);
if (!SimpMessageType.MESSAGE.equals(type)) {
throw new IllegalArgumentException("Unexpected message type: " + type);
}
String destination = SimpMessageHeaderAccessor.getDestination(headers);
if (destination == null) {
logger.error("No destination in " + message);
return EMPTY_MAP;
}
return findSubscriptionsInternal(destination, message);
}
@Test // SPR-12444
public void handleMessageToOtherUser() {
TestSimpUser otherSimpUser = new TestSimpUser("anna");
otherSimpUser.addSessions(new TestSimpSession("456"));
when(this.registry.getUser("anna")).thenReturn(otherSimpUser);
TestPrincipal user = new TestPrincipal("joe");
TestPrincipal otherUser = new TestPrincipal("anna");
String sourceDestination = "/user/anna/queue/foo";
Message<?> message = createMessage(SimpMessageType.MESSAGE, user, "456", sourceDestination);
UserDestinationResult actual = this.resolver.resolveDestination(message);
assertEquals(sourceDestination, actual.getSourceDestination());
assertEquals(1, actual.getTargetDestinations().size());
assertEquals("/queue/foo-user456", actual.getTargetDestinations().iterator().next());
assertEquals("/user/queue/foo", actual.getSubscribeDestination());
assertEquals(otherUser.getName(), actual.getUser());
}
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
accessor.setHeartbeat(10000, 10000);
accessor.setAcceptVersion("1.0,1.1");
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
ackAccessor.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connectMessage);
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
assertEquals(1, this.session.getSentMessages().size());
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "version:1.1\n" + "heart-beat:0,0\n" +
"user-name:joe\n" + "\n" + "\u0000", actual.getPayload());
}
@Test
public void clientInboundChannelSendMessage() throws Exception {
ApplicationContext config = createConfig(TestChannelConfig.class, TestConfigurer.class);
TestChannel channel = config.getBean("clientInboundChannel", TestChannel.class);
SubProtocolWebSocketHandler webSocketHandler = config.getBean(SubProtocolWebSocketHandler.class);
List<ChannelInterceptor> interceptors = channel.getInterceptors();
assertEquals(ImmutableMessageChannelInterceptor.class, interceptors.get(interceptors.size()-1).getClass());
TestWebSocketSession session = new TestWebSocketSession("s1");
session.setOpen(true);
webSocketHandler.afterConnectionEstablished(session);
webSocketHandler.handleMessage(session,
StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build());
Message<?> message = channel.messages.get(0);
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertNotNull(accessor);
assertFalse(accessor.isMutable());
assertEquals(SimpMessageType.MESSAGE, accessor.getMessageType());
assertEquals("/foo", accessor.getDestination());
}
@Test
public void addMultipleSessionIds() {
DefaultSimpUserRegistry registry = new DefaultSimpUserRegistry();
TestPrincipal user = new TestPrincipal("joe");
Message<byte[]> message = createMessage(SimpMessageType.CONNECT_ACK, "123");
SessionConnectedEvent event = new SessionConnectedEvent(this, message, user);
registry.onApplicationEvent(event);
message = createMessage(SimpMessageType.CONNECT_ACK, "456");
event = new SessionConnectedEvent(this, message, user);
registry.onApplicationEvent(event);
message = createMessage(SimpMessageType.CONNECT_ACK, "789");
event = new SessionConnectedEvent(this, message, user);
registry.onApplicationEvent(event);
SimpUser simpUser = registry.getUser("joe");
assertNotNull(simpUser);
assertEquals(1, registry.getUserCount());
assertEquals(3, simpUser.getSessions().size());
assertNotNull(simpUser.getSession("123"));
assertNotNull(simpUser.getSession("456"));
assertNotNull(simpUser.getSession("789"));
}
@Test
public void brokerChannelUsedByAnnotatedMethod() {
ApplicationContext context = loadConfig(SimpleBrokerConfig.class);
TestChannel channel = context.getBean("brokerChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler =
context.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setSessionId("sess1");
headers.setSessionAttributes(new ConcurrentHashMap<>());
headers.setDestination("/foo");
Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
messageHandler.handleMessage(message);
message = channel.messages.get(0);
headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("/bar", headers.getDestination());
assertEquals("bar", new String((byte[]) message.getPayload()));
}
@Test
public void clientOutboundChannelUsedByAnnotatedMethod() {
TestChannel channel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler = this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId("sess1");
headers.setSessionAttributes(new ConcurrentHashMap<>());
headers.setSubscriptionId("subs1");
headers.setDestination("/foo");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
messageHandler.handleMessage(message);
message = channel.messages.get(0);
headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("/foo", headers.getDestination());
assertEquals("bar", new String((byte[]) message.getPayload()));
}
@Test
public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
accessor.setHeartbeat(10000, 10000);
accessor.setAcceptVersion("1.0");
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
ackAccessor.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, connectMessage);
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
assertEquals(1, this.session.getSentMessages().size());
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "version:1.0\n" + "heart-beat:0,0\n" +
"user-name:joe\n" + "\n" + "\u0000", actual.getPayload());
}
@Test
public void handleMessageToClientWithSimpDisconnectAck() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
ackAccessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, connectMessage);
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
assertEquals(1, this.session.getSentMessages().size());
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
assertEquals("ERROR\n" + "message:Session closed.\n" + "content-length:0\n" +
"\n\u0000", actual.getPayload());
}
private void postBroadcast(PlayQueueInfo info, Player player, String sessionId) {
if (info.getStartPlayerAt() != -1) {
if (player.isWeb() && sessionId != null) {
// trigger the web player to start playing at this location
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
brokerTemplate.convertAndSendToUser(player.getUsername(),
"/queue/playqueues/" + player.getId() + "/skip",
ImmutableMap.of("index", info.getStartPlayerAt(), "offset", info.getStartPlayerAtPosition()),
headerAccessor.getMessageHeaders());
} else if (!player.isExternalWithPlaylist()) {
skip(player, info.getStartPlayerAt(), info.getStartPlayerAtPosition());
}
}
}
protected void sendMessageToSubscribers(String destination, Message<?> message) {
MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message);
if (!subscriptions.isEmpty() && logger.isDebugEnabled()) {
logger.debug("Broadcasting to " + subscriptions.size() + " sessions.");
}
long now = System.currentTimeMillis();
for (String sessionId : subscriptions.keySet()) {
for (String subscriptionId : subscriptions.get(sessionId)) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
initHeaders(headerAccessor);
headerAccessor.setSessionId(sessionId);
headerAccessor.setSubscriptionId(subscriptionId);
headerAccessor.copyHeadersIfAbsent(message.getHeaders());
Object payload = message.getPayload();
Message<?> reply = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders());
try {
getClientOutboundChannel().send(reply);
}
catch (Throwable ex) {
logger.error("Failed to send " + message, ex);
}
finally {
SessionInfo info = this.sessions.get(sessionId);
if (info != null) {
info.setLastWriteTime(now);
}
}
}
}
}
public static MessageExchangeBuilder send(String destination, String payload) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headers.setDestination(destination);
Message<?> message = MessageBuilder.createMessage(payload.getBytes(StandardCharsets.UTF_8),
headers.getMessageHeaders());
return new MessageExchangeBuilder(message);
}
public StompCommand updateStompCommandAsClientMessage() {
SimpMessageType messageType = getMessageType();
if (messageType != SimpMessageType.MESSAGE) {
throw new IllegalStateException("Unexpected message type " + messageType);
}
StompCommand command = getCommand();
if (command == null) {
command = StompCommand.SEND;
setHeader(COMMAND_HEADER, command);
}
else if (!command.equals(StompCommand.SEND)) {
throw new IllegalStateException("Unexpected STOMP command " + command);
}
return command;
}
@Test
public void createWithMessageFrameNativeHeaders() {
MultiValueMap<String, String> extHeaders = new LinkedMultiValueMap<>();
extHeaders.add(StompHeaderAccessor.DESTINATION_HEADER, "/d");
extHeaders.add(StompHeaderAccessor.STOMP_SUBSCRIPTION_HEADER, "s1");
extHeaders.add(StompHeaderAccessor.STOMP_CONTENT_TYPE_HEADER, "application/json");
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE, extHeaders);
assertEquals(StompCommand.MESSAGE, headers.getCommand());
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("s1", headers.getSubscriptionId());
}
@Test
public void handleMessageWithNoUser() {
String sourceDestination = "/user/" + "123" + "/queue/foo";
Message<?> message = createMessage(SimpMessageType.MESSAGE, null, "123", sourceDestination);
UserDestinationResult actual = this.resolver.resolveDestination(message);
assertEquals(sourceDestination, actual.getSourceDestination());
assertEquals(1, actual.getTargetDestinations().size());
assertEquals("/queue/foo-user123", actual.getTargetDestinations().iterator().next());
assertEquals("/user/queue/foo", actual.getSubscribeDestination());
assertNull(actual.getUser());
}
/**
* Encodes the given payload and headers into a {@code byte[]}.
* @param headers the headers
* @param payload the payload
* @return the encoded message
*/
public byte[] encode(Map<String, Object> headers, byte[] payload) {
Assert.notNull(headers, "'headers' is required");
Assert.notNull(payload, "'payload' is required");
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(128 + payload.length);
DataOutputStream output = new DataOutputStream(baos);
if (SimpMessageType.HEARTBEAT.equals(SimpMessageHeaderAccessor.getMessageType(headers))) {
logger.trace("Encoding heartbeat");
output.write(StompDecoder.HEARTBEAT_PAYLOAD);
}
else {
StompCommand command = StompHeaderAccessor.getCommand(headers);
if (command == null) {
throw new IllegalStateException("Missing STOMP command: " + headers);
}
output.write(command.toString().getBytes(StandardCharsets.UTF_8));
output.write(LF);
writeHeaders(command, headers, payload, output);
output.write(LF);
writeBody(payload, output);
output.write((byte) 0);
}
return baos.toByteArray();
}
catch (IOException ex) {
throw new StompConversionException("Failed to encode STOMP frame, headers=" + headers, ex);
}
}
@Test
public void decodeHeartbeat() {
String frame = "\n";
Buffer buffer = Buffer.wrap(frame);
final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
new Reactor2StompCodec().decoder(messages::add).apply(buffer);
assertEquals(1, messages.size());
assertEquals(SimpMessageType.HEARTBEAT, StompHeaderAccessor.wrap(messages.get(0)).getMessageType());
}
@Test
void preSendConnectAckDoesNotInvokeSessionRepository() {
setMessageType(SimpMessageType.CONNECT_ACK);
assertThat(this.interceptor.preSend(createMessage(), this.channel)).isSameAs(this.createMessage);
verifyZeroInteractions(this.sessionRepository);
}
@Override
public final void registerSubscription(Message<?> message) {
MessageHeaders headers = message.getHeaders();
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
if (!SimpMessageType.SUBSCRIBE.equals(messageType)) {
throw new IllegalArgumentException("Expected SUBSCRIBE: " + message);
}
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
if (sessionId == null) {
if (logger.isErrorEnabled()) {
logger.error("No sessionId in " + message);
}
return;
}
String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
if (subscriptionId == null) {
if (logger.isErrorEnabled()) {
logger.error("No subscriptionId in " + message);
}
return;
}
String destination = SimpMessageHeaderAccessor.getDestination(headers);
if (destination == null) {
if (logger.isErrorEnabled()) {
logger.error("No destination in " + message);
}
return;
}
addSubscriptionInternal(sessionId, subscriptionId, destination, message);
}
@Override
public final void unregisterSubscription(Message<?> message) {
MessageHeaders headers = message.getHeaders();
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
if (!SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
throw new IllegalArgumentException("Expected UNSUBSCRIBE: " + message);
}
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
if (sessionId == null) {
if (logger.isErrorEnabled()) {
logger.error("No sessionId in " + message);
}
return;
}
String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
if (subscriptionId == null) {
if (logger.isErrorEnabled()) {
logger.error("No subscriptionId " + message);
}
return;
}
removeSubscriptionInternal(sessionId, subscriptionId, message);
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
if (SimpMessageType.HEARTBEAT == SimpMessageHeaderAccessor.getMessageType(message.getHeaders())) {
return;
}
this.queue.add(message);
}