下面列出了怎么用org.springframework.messaging.simp.stomp.StompHeaders的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessage() throws Exception {
String text = "SEND\na:alpha\n\nMessage payload\0";
connect().handleMessage(this.webSocketSession, new TextMessage(text));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessageSplitAcrossTwoMessage() throws Exception {
WebSocketHandler webSocketHandler = connect();
String part1 = "SEND\na:alpha\n\nMessage";
webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(part1));
verifyNoMoreInteractions(this.stompSession);
String part2 = " payload\0";
webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(part2));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessageBinary() throws Exception {
String text = "SEND\na:alpha\n\nMessage payload\0";
connect().handleMessage(this.webSocketSession, new BinaryMessage(text.getBytes(StandardCharsets.UTF_8)));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessage() throws Exception {
String text = "SEND\na:alpha\n\nMessage payload\0";
connect().handleMessage(this.webSocketSession, new TextMessage(text));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessageSplitAcrossTwoMessage() throws Exception {
WebSocketHandler webSocketHandler = connect();
String part1 = "SEND\na:alpha\n\nMessage";
webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(part1));
verifyNoMoreInteractions(this.stompSession);
String part2 = " payload\0";
webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(part2));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void handleWebSocketMessageBinary() throws Exception {
String text = "SEND\na:alpha\n\nMessage payload\0";
connect().handleMessage(this.webSocketSession, new BinaryMessage(text.getBytes(StandardCharsets.UTF_8)));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), StandardCharsets.UTF_8));
}
/**
* 发送消息
*
* @param sessionList
* @return
*/
private static Runnable newSendMessageRunnable(List<StompSession> sessionList) {
return new Runnable() {
@Override
public void run() {
int i = 0;
for (StompSession session : sessionList) {
i++;
StompHeaders headers = new StompHeaders();
headers.setDestination("/app/receiveMessage");
headers.set("my-login-user", "小" + i);
Map<String, Object> payload = new HashMap<>(2);
payload.put("msg", "你好");
session.send(headers, payload);
}
}
};
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
if (headers.getDestination() != null) {
String destination = headers.getDestination();
if (logger.isDebugEnabled()) {
logger.debug("Received " + payload + ", To " + headers.getDestination());
}
Principal principal = userAgentSession.getPrincipal();
String userDestinationPrefix = messagingTemplate.getUserDestinationPrefix();
if (principal != null && destination.startsWith(userDestinationPrefix)) {
destination = destination.substring(userDestinationPrefix.length());
destination = destination.startsWith("/") ? destination
: "/" + destination;
messagingTemplate.convertAndSendToUser(principal.getName(), destination,
payload, copyHeaders(headers.toSingleValueMap()));
} else {
messagingTemplate.convertAndSend(destination, payload,
copyHeaders(headers.toSingleValueMap()));
}
}
}
@Test
public void shouldCallAuthServiceWhenUserTriesToConnect() throws InterruptedException, ExecutionException, TimeoutException {
final WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
final StompHeaders stompHeaders = new StompHeaders();
stompHeaders.add(AuthChannelInterceptorAdapter.USERNAME_HEADER, "john");
stompHeaders.add(AuthChannelInterceptorAdapter.TOKEN_HEADER, TestConstant.UI_SECRET_TOKEN);
stompClient.connect("ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX, new WebSocketHttpHeaders(), stompHeaders, new StompSessionHandlerAdapter() {
}).get(10, TimeUnit.SECONDS);
verify(authenticatorService, times(1)).getAuthenticatedOrFail("john", TestConstant.UI_SECRET_TOKEN);
}
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void handleWebSocketMessage() throws Exception {
String text = "SEND\na:alpha\n\nMessage payload\0";
connect().handleMessage(this.webSocketSession, new TextMessage(text));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), UTF_8));
}
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void handleWebSocketMessageSplitAcrossTwoMessage() throws Exception {
WebSocketHandler webSocketHandler = connect();
String part1 = "SEND\na:alpha\n\nMessage";
webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(part1));
verifyNoMoreInteractions(this.stompSession);
String part2 = " payload\0";
webSocketHandler.handleMessage(this.webSocketSession, new TextMessage(part2));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), UTF_8));
}
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void handleWebSocketMessageBinary() throws Exception {
String text = "SEND\na:alpha\n\nMessage payload\0";
connect().handleMessage(this.webSocketSession, new BinaryMessage(text.getBytes(UTF_8)));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(this.stompSession).handleMessage(captor.capture());
Message<byte[]> message = captor.getValue();
assertNotNull(message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.toNativeHeaderMap());
assertEquals(StompCommand.SEND, accessor.getCommand());
assertEquals("alpha", headers.getFirst("a"));
assertEquals("Message payload", new String(message.getPayload(), UTF_8));
}
@Override
protected StompHeaders processConnectHeaders(@Nullable StompHeaders connectHeaders) {
connectHeaders = super.processConnectHeaders(connectHeaders);
if (connectHeaders.isHeartbeatEnabled()) {
Assert.state(getTaskScheduler() != null, "TaskScheduler must be set if heartbeats are enabled");
}
return connectHeaders;
}
@Test
public void heartbeatDefaultValue() throws Exception {
WebSocketStompClient stompClient = new WebSocketStompClient(mock(WebSocketClient.class));
assertArrayEquals(new long[] {0, 0}, stompClient.getDefaultHeartbeat());
StompHeaders connectHeaders = stompClient.processConnectHeaders(null);
assertArrayEquals(new long[] {0, 0}, connectHeaders.getHeartbeat());
}
@Test
public void heartbeatDefaultValueWithScheduler() throws Exception {
WebSocketStompClient stompClient = new WebSocketStompClient(mock(WebSocketClient.class));
stompClient.setTaskScheduler(mock(TaskScheduler.class));
assertArrayEquals(new long[] {10000, 10000}, stompClient.getDefaultHeartbeat());
StompHeaders connectHeaders = stompClient.processConnectHeaders(null);
assertArrayEquals(new long[] {10000, 10000}, connectHeaders.getHeartbeat());
}
@Override
protected StompHeaders processConnectHeaders(@Nullable StompHeaders connectHeaders) {
connectHeaders = super.processConnectHeaders(connectHeaders);
if (connectHeaders.isHeartbeatEnabled()) {
Assert.state(getTaskScheduler() != null, "TaskScheduler must be set if heartbeats are enabled");
}
return connectHeaders;
}
@Test
public void heartbeatDefaultValue() throws Exception {
WebSocketStompClient stompClient = new WebSocketStompClient(mock(WebSocketClient.class));
assertArrayEquals(new long[] {0, 0}, stompClient.getDefaultHeartbeat());
StompHeaders connectHeaders = stompClient.processConnectHeaders(null);
assertArrayEquals(new long[] {0, 0}, connectHeaders.getHeartbeat());
}
@Test
public void heartbeatDefaultValueWithScheduler() throws Exception {
WebSocketStompClient stompClient = new WebSocketStompClient(mock(WebSocketClient.class));
stompClient.setTaskScheduler(mock(TaskScheduler.class));
assertArrayEquals(new long[] {10000, 10000}, stompClient.getDefaultHeartbeat());
StompHeaders connectHeaders = stompClient.processConnectHeaders(null);
assertArrayEquals(new long[] {10000, 10000}, connectHeaders.getHeartbeat());
}
public static void sendEnter(final Object arg) {
final Tracer tracer = GlobalTracer.get();
final StompHeaders headers = (StompHeaders)arg;
final Span span = tracer.buildSpan(headers.getDestination())
.withTag(Tags.COMPONENT, "stomp-session")
.withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CLIENT)
.start();
tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new StompHeadersInjectAdapter(headers));
spanHolder.set(span);
}
@Test
public void testSend(final MockTracer tracer) {
final StompSession stompSession = new DefaultStompSession(new StompSessionHandlerAdapter() {}, new StompHeaders());
try {
stompSession.send("endpoint", "Hello");
}
catch (final Exception ignore) {
}
assertEquals(1, tracer.finishedSpans().size());
}
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
session.subscribe("/topic/greetings", this);
session.send("/app/hello", "{\"name\":\"Client\"}".getBytes());
log.info("New session: {}", session.getSessionId());
}
/**
* {@inheritDoc}
*/
@Override
public Receiptable send(String destination, Object payload) {
StompHeaders headers = new StompHeaders();
headers.setDestination(destination);
return send(headers, payload);
}
/**
* {@inheritDoc}
*/
@Override
public Receiptable send(StompHeaders headers, Object payload) {
Set<Entry<String, String>> entries = copy(headers);
log.trace("Execution context copied to stomp headers: {}.", entries);
return delegate.send(headers, payload);
}
/**
* {@inheritDoc}
*/
@Override
public void handleFrame(StompHeaders headers, Object payload) {
ExecutionContext context = current();
List<Entry<String, String>> eligibleHeaders = headers.toSingleValueMap().entrySet().stream()
.filter(x -> filter.accept(x.getKey()))
.collect(Collectors.toList());
eligibleHeaders.forEach(x -> context.put(x.getKey(), x.getValue()));
log.trace("Stomp Headers copied to execution context: {}.", eligibleHeaders);
delegate.handleFrame(headers, payload);
remove();
}
@Test
public void testHandleFrame() throws Exception {
StompHeaders headers = new StompHeaders();
asList("1", "2", "3").forEach(x -> headers.set(x, x));
propagator.handleFrame(headers, payload);
verify(delegate).handleFrame(headers, payload);
}
@Test
public void testSendEmptyContext() {
propagator.send(destination, payload);
ArgumentHolder<StompHeaders> headers = new ArgumentHolder<>();
verify(delegate).send(headers.eq(), eq(payload));
keysToCopy.forEach(x -> assertThat(headers.getArgument().get(x), is(nullValue())));
}
@Test
public void testNotEmptyContext() {
asList("1", "3", "2").forEach(x -> current().put(x, x));
propagator.send(destination, payload);
ArgumentHolder<StompHeaders> headers = new ArgumentHolder<>();
verify(delegate).send(headers.eq(), eq(payload));
keysToCopy.forEach(x -> assertThat(headers.getArgument().get(x), equalTo(asList(x))));
assertThat(headers.getArgument().get("3"), is(nullValue()));
}
private void showHeaders(StompHeaders headers)
{
for (Map.Entry<String,List<String>> e:headers.entrySet()) {
System.err.print(" " + e.getKey() + ": ");
boolean first = true;
for (String v : e.getValue()) {
if ( ! first ) System.err.print(", ");
System.err.print(v);
first = false;
}
System.err.println();
}
}
@Override
public void afterConnected(StompSession session,
StompHeaders connectedHeaders)
{
System.err.println("Connected! Headers:");
showHeaders(connectedHeaders);
subscribeTopic("/topic/messages", session);
sendJsonMessage(session);
}
@Override
public void handleException(StompSession session, StompCommand command,
StompHeaders headers, byte[] payload, Throwable ex) {
if (errorHandler != null) {
errorHandler.handleError(new ProxySessionException(this, session, ex));
}
}