下面列出了 org.springframework.messaging.simp.stomp.StompSession.Subscription 类的 API 用法示例代码及写法，您也可以点击链接到 GitHub 查看源代码。
/**
 * Verifies that a MESSAGE frame addressed to an existing subscription is handed to the
 * subscription's frame handler: the handler is asked for the payload type and then
 * receives the frame headers together with the String payload, and nothing else.
 */
@Test
public void handleMessageFrame() {
    this.session.afterConnected(this.connection);

    String topic = "/topic/foo";
    StompFrameHandler handler = mock(StompFrameHandler.class);
    Subscription sub = this.session.subscribe(topic, handler);

    // Headers of the incoming MESSAGE frame, tied to the subscription created above.
    StompHeaderAccessor frameAccessor = StompHeaderAccessor.create(StompCommand.MESSAGE);
    frameAccessor.setDestination(topic);
    frameAccessor.setSubscriptionId(sub.getSubscriptionId());
    frameAccessor.setContentType(MimeTypeUtils.TEXT_PLAIN);
    frameAccessor.setMessageId("1");
    frameAccessor.setLeaveMutable(true);

    String text = "sample payload";
    StompHeaders expectedHeaders = StompHeaders.readOnlyStompHeaders(frameAccessor.getNativeHeaders());
    given(handler.getPayloadType(expectedHeaders)).willReturn(String.class);

    byte[] body = text.getBytes(StandardCharsets.UTF_8);
    this.session.handleMessage(MessageBuilder.createMessage(body, frameAccessor.getMessageHeaders()));

    verify(handler).getPayloadType(expectedHeaders);
    verify(handler).handleFrame(expectedHeaders, text);
    verifyNoMoreInteractions(handler);
}
/**
 * Verifies that subscribing by destination produces a SUBSCRIBE message (as captured by
 * {@code messageCaptor}) whose only two headers are the destination and the id reported
 * by the returned subscription.
 */
@Test
public void subscribe() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    StompFrameHandler handler = mock(StompFrameHandler.class);
    String topic = "/topic/foo";
    Subscription sub = this.session.subscribe(topic, handler);

    // Inspect the most recently captured outbound message.
    Message<byte[]> sent = this.messageCaptor.getValue();
    StompHeaderAccessor sentAccessor = MessageHeaderAccessor.getAccessor(sent, StompHeaderAccessor.class);
    assertEquals(StompCommand.SUBSCRIBE, sentAccessor.getCommand());

    StompHeaders sentHeaders = StompHeaders.readOnlyStompHeaders(sentAccessor.getNativeHeaders());
    assertEquals(sentHeaders.toString(), 2, sentHeaders.size());
    assertEquals(topic, sentHeaders.getDestination());
    assertEquals(sub.getSubscriptionId(), sentHeaders.getId());
}
/**
 * Verifies that subscribing with caller-supplied headers keeps the supplied subscription
 * id: the returned subscription reports it, and the captured SUBSCRIBE message carries
 * exactly that id plus the destination.
 */
@Test
public void subscribeWithHeaders() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    String topic = "/topic/foo";
    String expectedId = "123";

    StompHeaders requestHeaders = new StompHeaders();
    requestHeaders.setId(expectedId);
    requestHeaders.setDestination(topic);

    StompFrameHandler handler = mock(StompFrameHandler.class);
    Subscription sub = this.session.subscribe(requestHeaders, handler);
    assertEquals(expectedId, sub.getSubscriptionId());

    // Inspect the most recently captured outbound message.
    Message<byte[]> sent = this.messageCaptor.getValue();
    StompHeaderAccessor sentAccessor = MessageHeaderAccessor.getAccessor(sent, StompHeaderAccessor.class);
    assertEquals(StompCommand.SUBSCRIBE, sentAccessor.getCommand());

    StompHeaders sentHeaders = StompHeaders.readOnlyStompHeaders(sentAccessor.getNativeHeaders());
    assertEquals(sentHeaders.toString(), 2, sentHeaders.size());
    assertEquals(topic, sentHeaders.getDestination());
    assertEquals(expectedId, sentHeaders.getId());
}
/**
 * Verifies that unsubscribing produces an UNSUBSCRIBE message (as captured by
 * {@code messageCaptor}) whose single header is the subscription id.
 */
@Test
public void unsubscribe() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    StompFrameHandler handler = mock(StompFrameHandler.class);
    String topic = "/topic/foo";
    Subscription sub = this.session.subscribe(topic, handler);
    sub.unsubscribe();

    // The captor's latest value is the message sent after the SUBSCRIBE.
    Message<byte[]> sent = this.messageCaptor.getValue();
    StompHeaderAccessor sentAccessor = MessageHeaderAccessor.getAccessor(sent, StompHeaderAccessor.class);
    assertEquals(StompCommand.UNSUBSCRIBE, sentAccessor.getCommand());

    StompHeaders sentHeaders = StompHeaders.readOnlyStompHeaders(sentAccessor.getNativeHeaders());
    assertEquals(sentHeaders.toString(), 1, sentHeaders.size());
    assertEquals(sub.getSubscriptionId(), sentHeaders.getId());
}
/**
 * Verifies that a receipt task registered on a subscription runs once the matching
 * RECEIPT frame is handled, and not before.
 */
@Test
public void receiptReceived() {
    this.session.afterConnected(this.connection);
    this.session.setTaskScheduler(mock(TaskScheduler.class));

    // Stays null until the receipt task has run.
    AtomicReference<Boolean> receiptFlag = new AtomicReference<>();

    StompHeaders subscribeHeaders = new StompHeaders();
    subscribeHeaders.setDestination("/topic/foo");
    subscribeHeaders.setReceipt("my-receipt");

    Subscription sub = this.session.subscribe(subscribeHeaders, mock(StompFrameHandler.class));
    sub.addReceiptTask(() -> receiptFlag.set(true));
    assertNull(receiptFlag.get());

    // Deliver the RECEIPT frame for the receipt id requested above.
    StompHeaderAccessor receiptAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
    receiptAccessor.setReceiptId("my-receipt");
    receiptAccessor.setLeaveMutable(true);
    byte[] emptyBody = new byte[0];
    this.session.handleMessage(MessageBuilder.createMessage(emptyBody, receiptAccessor.getMessageHeaders()));

    assertNotNull(receiptFlag.get());
    assertTrue(receiptFlag.get());
}
/**
 * Verifies that a receipt task added after the matching RECEIPT frame has already been
 * handled still runs: the flag is set by the time {@code addReceiptTask} returns.
 */
@Test
public void receiptReceivedBeforeTaskAdded() {
    this.session.afterConnected(this.connection);
    this.session.setTaskScheduler(mock(TaskScheduler.class));

    // Stays null until the receipt task has run.
    AtomicReference<Boolean> receiptFlag = new AtomicReference<>();

    StompHeaders subscribeHeaders = new StompHeaders();
    subscribeHeaders.setDestination("/topic/foo");
    subscribeHeaders.setReceipt("my-receipt");
    Subscription sub = this.session.subscribe(subscribeHeaders, mock(StompFrameHandler.class));

    // The RECEIPT frame arrives before any task has been registered.
    StompHeaderAccessor receiptAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
    receiptAccessor.setReceiptId("my-receipt");
    receiptAccessor.setLeaveMutable(true);
    byte[] emptyBody = new byte[0];
    this.session.handleMessage(MessageBuilder.createMessage(emptyBody, receiptAccessor.getMessageHeaders()));

    sub.addReceiptTask(() -> receiptFlag.set(true));

    assertNotNull(receiptFlag.get());
    assertTrue(receiptFlag.get());
}
/**
 * Verifies that handling a MESSAGE frame for a known subscription queries the frame
 * handler for the payload type and then calls {@code handleFrame} with the frame headers
 * and the String payload, with no further interactions on the handler.
 */
@Test
public void handleMessageFrame() {
    this.session.afterConnected(this.connection);

    String topic = "/topic/foo";
    StompFrameHandler handlerMock = mock(StompFrameHandler.class);
    Subscription registered = this.session.subscribe(topic, handlerMock);

    // Headers of the inbound MESSAGE frame, linked to the subscription above.
    StompHeaderAccessor inbound = StompHeaderAccessor.create(StompCommand.MESSAGE);
    inbound.setDestination(topic);
    inbound.setSubscriptionId(registered.getSubscriptionId());
    inbound.setContentType(MimeTypeUtils.TEXT_PLAIN);
    inbound.setMessageId("1");
    inbound.setLeaveMutable(true);

    String text = "sample payload";
    StompHeaders frameHeaders = StompHeaders.readOnlyStompHeaders(inbound.getNativeHeaders());
    when(handlerMock.getPayloadType(frameHeaders)).thenReturn(String.class);

    byte[] rawBody = text.getBytes(StandardCharsets.UTF_8);
    this.session.handleMessage(MessageBuilder.createMessage(rawBody, inbound.getMessageHeaders()));

    verify(handlerMock).getPayloadType(frameHeaders);
    verify(handlerMock).handleFrame(frameHeaders, text);
    verifyNoMoreInteractions(handlerMock);
}
/**
 * Verifies that a destination-only subscribe results in a captured SUBSCRIBE message
 * with exactly two headers: the destination and the returned subscription's id.
 */
@Test
public void subscribe() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    StompFrameHandler handlerMock = mock(StompFrameHandler.class);
    String topic = "/topic/foo";
    Subscription registered = this.session.subscribe(topic, handlerMock);

    // Latest message recorded by the captor.
    Message<byte[]> outbound = this.messageCaptor.getValue();
    StompHeaderAccessor outboundAccessor =
            MessageHeaderAccessor.getAccessor(outbound, StompHeaderAccessor.class);
    assertEquals(StompCommand.SUBSCRIBE, outboundAccessor.getCommand());

    StompHeaders outboundHeaders = StompHeaders.readOnlyStompHeaders(outboundAccessor.getNativeHeaders());
    assertEquals(outboundHeaders.toString(), 2, outboundHeaders.size());
    assertEquals(topic, outboundHeaders.getDestination());
    assertEquals(registered.getSubscriptionId(), outboundHeaders.getId());
}
/**
 * Verifies that an explicit subscription id passed in the subscribe headers is used both
 * by the returned subscription and in the captured SUBSCRIBE message, which carries only
 * that id and the destination.
 */
@Test
public void subscribeWithHeaders() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    String topic = "/topic/foo";
    String chosenId = "123";

    StompHeaders suppliedHeaders = new StompHeaders();
    suppliedHeaders.setId(chosenId);
    suppliedHeaders.setDestination(topic);

    StompFrameHandler handlerMock = mock(StompFrameHandler.class);
    Subscription registered = this.session.subscribe(suppliedHeaders, handlerMock);
    assertEquals(chosenId, registered.getSubscriptionId());

    // Latest message recorded by the captor.
    Message<byte[]> outbound = this.messageCaptor.getValue();
    StompHeaderAccessor outboundAccessor =
            MessageHeaderAccessor.getAccessor(outbound, StompHeaderAccessor.class);
    assertEquals(StompCommand.SUBSCRIBE, outboundAccessor.getCommand());

    StompHeaders outboundHeaders = StompHeaders.readOnlyStompHeaders(outboundAccessor.getNativeHeaders());
    assertEquals(outboundHeaders.toString(), 2, outboundHeaders.size());
    assertEquals(topic, outboundHeaders.getDestination());
    assertEquals(chosenId, outboundHeaders.getId());
}
/**
 * Verifies that calling {@code unsubscribe()} on a subscription results in a captured
 * UNSUBSCRIBE message containing only the subscription id header.
 */
@Test
public void unsubscribe() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    StompFrameHandler handlerMock = mock(StompFrameHandler.class);
    String topic = "/topic/foo";
    Subscription registered = this.session.subscribe(topic, handlerMock);
    registered.unsubscribe();

    // Latest message recorded by the captor, i.e. the one following the SUBSCRIBE.
    Message<byte[]> outbound = this.messageCaptor.getValue();
    StompHeaderAccessor outboundAccessor =
            MessageHeaderAccessor.getAccessor(outbound, StompHeaderAccessor.class);
    assertEquals(StompCommand.UNSUBSCRIBE, outboundAccessor.getCommand());

    StompHeaders outboundHeaders = StompHeaders.readOnlyStompHeaders(outboundAccessor.getNativeHeaders());
    assertEquals(outboundHeaders.toString(), 1, outboundHeaders.size());
    assertEquals(registered.getSubscriptionId(), outboundHeaders.getId());
}
/**
 * Verifies that a receipt task added to a subscription is not run on registration and is
 * run once a RECEIPT frame with the matching receipt id is handled.
 */
@Test
public void receiptReceived() {
    this.session.afterConnected(this.connection);
    this.session.setTaskScheduler(mock(TaskScheduler.class));

    // Null means "task has not run yet".
    AtomicReference<Boolean> taskRan = new AtomicReference<>();

    StompHeaders requestHeaders = new StompHeaders();
    requestHeaders.setDestination("/topic/foo");
    requestHeaders.setReceipt("my-receipt");

    Subscription registered = this.session.subscribe(requestHeaders, mock(StompFrameHandler.class));
    registered.addReceiptTask(() -> taskRan.set(true));
    assertNull(taskRan.get());

    // Simulate the RECEIPT frame for the requested receipt id.
    StompHeaderAccessor receipt = StompHeaderAccessor.create(StompCommand.RECEIPT);
    receipt.setReceiptId("my-receipt");
    receipt.setLeaveMutable(true);
    byte[] noBody = new byte[0];
    this.session.handleMessage(MessageBuilder.createMessage(noBody, receipt.getMessageHeaders()));

    assertNotNull(taskRan.get());
    assertTrue(taskRan.get());
}
/**
 * Verifies that a receipt task registered only after the matching RECEIPT frame was
 * handled has run by the time registration returns.
 */
@Test
public void receiptReceivedBeforeTaskAdded() {
    this.session.afterConnected(this.connection);
    this.session.setTaskScheduler(mock(TaskScheduler.class));

    // Null means "task has not run yet".
    AtomicReference<Boolean> taskRan = new AtomicReference<>();

    StompHeaders requestHeaders = new StompHeaders();
    requestHeaders.setDestination("/topic/foo");
    requestHeaders.setReceipt("my-receipt");
    Subscription registered = this.session.subscribe(requestHeaders, mock(StompFrameHandler.class));

    // Simulate the RECEIPT frame arriving first.
    StompHeaderAccessor receipt = StompHeaderAccessor.create(StompCommand.RECEIPT);
    receipt.setReceiptId("my-receipt");
    receipt.setLeaveMutable(true);
    byte[] noBody = new byte[0];
    this.session.handleMessage(MessageBuilder.createMessage(noBody, receipt.getMessageHeaders()));

    registered.addReceiptTask(() -> taskRan.set(true));

    assertNotNull(taskRan.get());
    assertTrue(taskRan.get());
}
/**
 * Verifies that a MESSAGE frame for an existing subscription reaches the subscription's
 * frame handler: the payload type is looked up, then {@code handleFrame} receives the
 * frame headers and the String payload, and the handler sees no other calls.
 */
@Test
public void handleMessageFrame() throws Exception {
    this.session.afterConnected(this.connection);

    String topic = "/topic/foo";
    StompFrameHandler mockHandler = mock(StompFrameHandler.class);
    Subscription created = this.session.subscribe(topic, mockHandler);

    // Headers of the MESSAGE frame delivered to the subscription above.
    StompHeaderAccessor messageAccessor = StompHeaderAccessor.create(StompCommand.MESSAGE);
    messageAccessor.setDestination(topic);
    messageAccessor.setSubscriptionId(created.getSubscriptionId());
    messageAccessor.setContentType(MimeTypeUtils.TEXT_PLAIN);
    messageAccessor.setMessageId("1");
    messageAccessor.setLeaveMutable(true);

    String text = "sample payload";
    StompHeaders messageHeaders = StompHeaders.readOnlyStompHeaders(messageAccessor.getNativeHeaders());
    when(mockHandler.getPayloadType(messageHeaders)).thenReturn(String.class);

    byte[] encoded = text.getBytes(UTF_8);
    this.session.handleMessage(MessageBuilder.createMessage(encoded, messageAccessor.getMessageHeaders()));

    verify(mockHandler).getPayloadType(messageHeaders);
    verify(mockHandler).handleFrame(messageHeaders, text);
    verifyNoMoreInteractions(mockHandler);
}
/**
 * Verifies that subscribing to a destination yields a captured SUBSCRIBE message whose
 * two headers are the destination and the id of the returned subscription.
 */
@Test
public void subscribe() throws Exception {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    StompFrameHandler mockHandler = mock(StompFrameHandler.class);
    String topic = "/topic/foo";
    Subscription created = this.session.subscribe(topic, mockHandler);

    // Most recent message seen by the captor.
    Message<byte[]> captured = this.messageCaptor.getValue();
    StompHeaderAccessor capturedAccessor =
            MessageHeaderAccessor.getAccessor(captured, StompHeaderAccessor.class);
    assertEquals(StompCommand.SUBSCRIBE, capturedAccessor.getCommand());

    StompHeaders capturedHeaders = StompHeaders.readOnlyStompHeaders(capturedAccessor.getNativeHeaders());
    assertEquals(capturedHeaders.toString(), 2, capturedHeaders.size());
    assertEquals(topic, capturedHeaders.getDestination());
    assertEquals(created.getSubscriptionId(), capturedHeaders.getId());
}
/**
 * Verifies that a subscription id supplied through the subscribe headers is reported by
 * the returned subscription and appears, together with the destination, as the only
 * headers of the captured SUBSCRIBE message.
 */
@Test
public void subscribeWithHeaders() throws Exception {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    String topic = "/topic/foo";
    String presetId = "123";

    StompHeaders inputHeaders = new StompHeaders();
    inputHeaders.setId(presetId);
    inputHeaders.setDestination(topic);

    StompFrameHandler mockHandler = mock(StompFrameHandler.class);
    Subscription created = this.session.subscribe(inputHeaders, mockHandler);
    assertEquals(presetId, created.getSubscriptionId());

    // Most recent message seen by the captor.
    Message<byte[]> captured = this.messageCaptor.getValue();
    StompHeaderAccessor capturedAccessor =
            MessageHeaderAccessor.getAccessor(captured, StompHeaderAccessor.class);
    assertEquals(StompCommand.SUBSCRIBE, capturedAccessor.getCommand());

    StompHeaders capturedHeaders = StompHeaders.readOnlyStompHeaders(capturedAccessor.getNativeHeaders());
    assertEquals(capturedHeaders.toString(), 2, capturedHeaders.size());
    assertEquals(topic, capturedHeaders.getDestination());
    assertEquals(presetId, capturedHeaders.getId());
}
/**
 * Verifies that unsubscribing yields a captured UNSUBSCRIBE message with the
 * subscription id as its only header.
 */
@Test
public void unsubscribe() throws Exception {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    StompFrameHandler mockHandler = mock(StompFrameHandler.class);
    String topic = "/topic/foo";
    Subscription created = this.session.subscribe(topic, mockHandler);
    created.unsubscribe();

    // Most recent message seen by the captor, i.e. the one after the SUBSCRIBE.
    Message<byte[]> captured = this.messageCaptor.getValue();
    StompHeaderAccessor capturedAccessor =
            MessageHeaderAccessor.getAccessor(captured, StompHeaderAccessor.class);
    assertEquals(StompCommand.UNSUBSCRIBE, capturedAccessor.getCommand());

    StompHeaders capturedHeaders = StompHeaders.readOnlyStompHeaders(capturedAccessor.getNativeHeaders());
    assertEquals(capturedHeaders.toString(), 1, capturedHeaders.size());
    assertEquals(created.getSubscriptionId(), capturedHeaders.getId());
}
/**
 * Verifies that a receipt task attached to a subscription stays idle until a RECEIPT
 * frame with the matching receipt id is handled, after which it has run.
 */
@Test
public void receiptReceived() throws Exception {
    this.session.afterConnected(this.connection);
    this.session.setTaskScheduler(mock(TaskScheduler.class));

    // Remains null until the receipt callback fires.
    AtomicReference<Boolean> callbackFired = new AtomicReference<>();

    StompHeaders inputHeaders = new StompHeaders();
    inputHeaders.setDestination("/topic/foo");
    inputHeaders.setReceipt("my-receipt");

    Subscription created = this.session.subscribe(inputHeaders, mock(StompFrameHandler.class));
    created.addReceiptTask(() -> callbackFired.set(true));
    assertNull(callbackFired.get());

    // Feed the RECEIPT frame for the requested receipt id into the session.
    StompHeaderAccessor receiptFrame = StompHeaderAccessor.create(StompCommand.RECEIPT);
    receiptFrame.setReceiptId("my-receipt");
    receiptFrame.setLeaveMutable(true);
    byte[] empty = new byte[0];
    this.session.handleMessage(MessageBuilder.createMessage(empty, receiptFrame.getMessageHeaders()));

    assertNotNull(callbackFired.get());
    assertTrue(callbackFired.get());
}
/**
 * Verifies that when the RECEIPT frame is handled before any receipt task exists, a task
 * added afterwards has run by the time {@code addReceiptTask} returns.
 */
@Test
public void receiptReceivedBeforeTaskAdded() throws Exception {
    this.session.afterConnected(this.connection);
    this.session.setTaskScheduler(mock(TaskScheduler.class));

    // Remains null until the receipt callback fires.
    AtomicReference<Boolean> callbackFired = new AtomicReference<>();

    StompHeaders inputHeaders = new StompHeaders();
    inputHeaders.setDestination("/topic/foo");
    inputHeaders.setReceipt("my-receipt");
    Subscription created = this.session.subscribe(inputHeaders, mock(StompFrameHandler.class));

    // Feed the RECEIPT frame into the session before registering the task.
    StompHeaderAccessor receiptFrame = StompHeaderAccessor.create(StompCommand.RECEIPT);
    receiptFrame.setReceiptId("my-receipt");
    receiptFrame.setLeaveMutable(true);
    byte[] empty = new byte[0];
    this.session.handleMessage(MessageBuilder.createMessage(empty, receiptFrame.getMessageHeaders()));

    created.addReceiptTask(() -> callbackFired.set(true));

    assertNotNull(callbackFired.get());
    assertTrue(callbackFired.get());
}
/**
 * Verifies the failure path of MESSAGE handling: when the frame handler asks for a
 * {@code Map} payload for a JSON frame, the handler is never given the frame and the
 * session handler instead receives {@code handleException} with the original byte
 * payload and a {@code MessageConversionException}.
 */
@Test
public void handleMessageFrameWithConversionException() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    String topic = "/topic/foo";
    StompFrameHandler handler = mock(StompFrameHandler.class);
    Subscription sub = this.session.subscribe(topic, handler);

    // Headers of the incoming JSON MESSAGE frame for that subscription.
    StompHeaderAccessor frameAccessor = StompHeaderAccessor.create(StompCommand.MESSAGE);
    frameAccessor.setDestination(topic);
    frameAccessor.setSubscriptionId(sub.getSubscriptionId());
    frameAccessor.setContentType(MimeTypeUtils.APPLICATION_JSON);
    frameAccessor.setMessageId("1");
    frameAccessor.setLeaveMutable(true);

    byte[] body = "{'foo':'bar'}".getBytes(StandardCharsets.UTF_8);
    StompHeaders expectedHeaders = StompHeaders.readOnlyStompHeaders(frameAccessor.getNativeHeaders());
    given(handler.getPayloadType(expectedHeaders)).willReturn(Map.class);

    this.session.handleMessage(MessageBuilder.createMessage(body, frameAccessor.getMessageHeaders()));

    // The frame handler is only consulted for the payload type.
    verify(handler).getPayloadType(expectedHeaders);
    verifyNoMoreInteractions(handler);

    // The failure is reported to the session handler with the very same payload array.
    verify(this.sessionHandler).handleException(
            same(this.session),
            same(StompCommand.MESSAGE),
            eq(expectedHeaders),
            same(body),
            any(MessageConversionException.class));
    verifyNoMoreInteractions(this.sessionHandler);
}
/**
 * Verifies that custom headers passed to {@code unsubscribe(StompHeaders)} are included
 * in the captured UNSUBSCRIBE message alongside the subscription id.
 */
@Test // SPR-15131
public void unsubscribeWithCustomHeader() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    String customName = "durable-subscription-name";
    String customValue = "123";

    StompHeaders headersForSubscribe = new StompHeaders();
    headersForSubscribe.setDestination("/topic/foo");
    headersForSubscribe.set(customName, customValue);

    StompFrameHandler handler = mock(StompFrameHandler.class);
    Subscription sub = this.session.subscribe(headersForSubscribe, handler);

    // Echo the custom header back, reading it from the subscription's own headers.
    String echoedValue = sub.getSubscriptionHeaders().getFirst(customName);
    StompHeaders headersForUnsubscribe = new StompHeaders();
    headersForUnsubscribe.set(customName, echoedValue);
    sub.unsubscribe(headersForUnsubscribe);

    Message<byte[]> sent = this.messageCaptor.getValue();
    StompHeaderAccessor sentAccessor = MessageHeaderAccessor.getAccessor(sent, StompHeaderAccessor.class);
    assertEquals(StompCommand.UNSUBSCRIBE, sentAccessor.getCommand());

    StompHeaders sentHeaders = StompHeaders.readOnlyStompHeaders(sentAccessor.getNativeHeaders());
    assertEquals(sentHeaders.toString(), 2, sentHeaders.size());
    assertEquals(sub.getSubscriptionId(), sentHeaders.getId());
    assertEquals(customValue, sentHeaders.getFirst(customName));
}
/**
 * Connects to {@code WEBSOCKET_URI}, waiting up to five seconds for the session, then
 * subscribes to {@code WEBSOCKET_TOPIC} and checks that the resulting subscription
 * reports a non-null id.
 */
@Test
public void shouldCreateASubscription() throws Exception {
    // No-op session handler: only the connection itself matters for this test.
    StompSessionHandlerAdapter connectHandler = new StompSessionHandlerAdapter() {
    };
    StompSession session = stompClient.connect(WEBSOCKET_URI, connectHandler).get(5, SECONDS);

    DefaultStompFrameHandler frameHandler = new DefaultStompFrameHandler();
    Subscription subscription = session.subscribe(WEBSOCKET_TOPIC, frameHandler);

    String subscriptionId = subscription.getSubscriptionId();
    Assert.assertNotNull(subscriptionId);
}
/**
 * Verifies that when a JSON MESSAGE frame cannot be converted to the {@code Map} type
 * requested by the frame handler, the handler is not given the frame and the session
 * handler's {@code handleException} is called with the original payload bytes and a
 * {@code MessageConversionException}.
 */
@Test
public void handleMessageFrameWithConversionException() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    String topic = "/topic/foo";
    StompFrameHandler handlerMock = mock(StompFrameHandler.class);
    Subscription registered = this.session.subscribe(topic, handlerMock);

    // Headers of the inbound JSON MESSAGE frame.
    StompHeaderAccessor inbound = StompHeaderAccessor.create(StompCommand.MESSAGE);
    inbound.setDestination(topic);
    inbound.setSubscriptionId(registered.getSubscriptionId());
    inbound.setContentType(MimeTypeUtils.APPLICATION_JSON);
    inbound.setMessageId("1");
    inbound.setLeaveMutable(true);

    byte[] rawBody = "{'foo':'bar'}".getBytes(StandardCharsets.UTF_8);
    StompHeaders frameHeaders = StompHeaders.readOnlyStompHeaders(inbound.getNativeHeaders());
    when(handlerMock.getPayloadType(frameHeaders)).thenReturn(Map.class);

    this.session.handleMessage(MessageBuilder.createMessage(rawBody, inbound.getMessageHeaders()));

    // Only the payload-type lookup reaches the frame handler.
    verify(handlerMock).getPayloadType(frameHeaders);
    verifyNoMoreInteractions(handlerMock);

    // The session handler is told about the failure, with the identical payload array.
    verify(this.sessionHandler).handleException(
            same(this.session),
            same(StompCommand.MESSAGE),
            eq(frameHeaders),
            same(rawBody),
            any(MessageConversionException.class));
    verifyNoMoreInteractions(this.sessionHandler);
}
/**
 * Verifies that headers handed to {@code unsubscribe(StompHeaders)} end up in the
 * captured UNSUBSCRIBE message, which then holds the subscription id and the custom
 * header.
 */
@Test // SPR-15131
public void unsubscribeWithCustomHeader() {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    String extraName = "durable-subscription-name";
    String extraValue = "123";

    StompHeaders onSubscribe = new StompHeaders();
    onSubscribe.setDestination("/topic/foo");
    onSubscribe.set(extraName, extraValue);

    StompFrameHandler handlerMock = mock(StompFrameHandler.class);
    Subscription registered = this.session.subscribe(onSubscribe, handlerMock);

    // Take the custom header value from the subscription's headers and send it back.
    String valueFromSubscription = registered.getSubscriptionHeaders().getFirst(extraName);
    StompHeaders onUnsubscribe = new StompHeaders();
    onUnsubscribe.set(extraName, valueFromSubscription);
    registered.unsubscribe(onUnsubscribe);

    Message<byte[]> outbound = this.messageCaptor.getValue();
    StompHeaderAccessor outboundAccessor =
            MessageHeaderAccessor.getAccessor(outbound, StompHeaderAccessor.class);
    assertEquals(StompCommand.UNSUBSCRIBE, outboundAccessor.getCommand());

    StompHeaders outboundHeaders = StompHeaders.readOnlyStompHeaders(outboundAccessor.getNativeHeaders());
    assertEquals(outboundHeaders.toString(), 2, outboundHeaders.size());
    assertEquals(registered.getSubscriptionId(), outboundHeaders.getId());
    assertEquals(extraValue, outboundHeaders.getFirst(extraName));
}
/**
 * Verifies that a JSON MESSAGE frame which cannot be converted to the {@code Map} type
 * requested by the frame handler is not delivered to it; the session handler's
 * {@code handleException} is invoked instead with the original payload bytes and a
 * {@code MessageConversionException}.
 */
@Test
public void handleMessageFrameWithConversionException() throws Exception {
    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    String topic = "/topic/foo";
    StompFrameHandler mockHandler = mock(StompFrameHandler.class);
    Subscription created = this.session.subscribe(topic, mockHandler);

    // Headers of the JSON MESSAGE frame delivered to the subscription above.
    StompHeaderAccessor messageAccessor = StompHeaderAccessor.create(StompCommand.MESSAGE);
    messageAccessor.setDestination(topic);
    messageAccessor.setSubscriptionId(created.getSubscriptionId());
    messageAccessor.setContentType(MimeTypeUtils.APPLICATION_JSON);
    messageAccessor.setMessageId("1");
    messageAccessor.setLeaveMutable(true);

    byte[] encoded = "{'foo':'bar'}".getBytes(UTF_8);
    StompHeaders messageHeaders = StompHeaders.readOnlyStompHeaders(messageAccessor.getNativeHeaders());
    when(mockHandler.getPayloadType(messageHeaders)).thenReturn(Map.class);

    this.session.handleMessage(MessageBuilder.createMessage(encoded, messageAccessor.getMessageHeaders()));

    // The frame handler sees the payload-type lookup and nothing more.
    verify(mockHandler).getPayloadType(messageHeaders);
    verifyNoMoreInteractions(mockHandler);

    // The conversion failure goes to the session handler with the same payload array.
    verify(this.sessionHandler).handleException(
            same(this.session),
            same(StompCommand.MESSAGE),
            eq(messageHeaders),
            same(encoded),
            any(MessageConversionException.class));
    verifyNoMoreInteractions(this.sessionHandler);
}