下面列出了 org.springframework.messaging.simp.stomp.StompSession 类 API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Provides a {@link CommandLineRunner} that opens a STOMP session over a SockJS client,
 * subscribes to {@code SUBSCRIBE_GREETINGS_ENDPOINT} and sends the text "Hello" to
 * {@code SEND_HELLO_MESSAGE_ENDPOINT}.
 *
 * @return the runner performing the connect / subscribe / send sequence
 */
@Bean
public CommandLineRunner commandLineRunner() {
    return args -> {
        final WebSocketStompClient client =
                new WebSocketStompClient(new SockJsClient(createTransportClient()));
        client.setMessageConverter(new MappingJackson2MessageConverter());

        // Wait at most 10 seconds for the connect future to complete.
        final StompSession session = client
                .connect("ws://localhost:8080/test-websocket", new StompSessionHandlerAdapter() {
                })
                .get(10, TimeUnit.SECONDS);

        session.subscribe(SUBSCRIBE_GREETINGS_ENDPOINT, new GreetingStompFrameHandler());
        session.send(SEND_HELLO_MESSAGE_ENDPOINT, "Hello");
    };
}
/**
 * Builds the task that sends one message on every session of the given list.
 *
 * <p>For each session the task sends a map payload {@code {"msg": "你好"}} to
 * {@code /app/receiveMessage}, with a {@code my-login-user} header derived from the
 * session's 1-based position in the list.
 *
 * <p>The task is meant to be scheduled repeatedly. Because a scheduled executor suppresses
 * all further runs once a task throws, sessions that are no longer connected are skipped and
 * a failure on one session never escapes the loop.
 *
 * @param sessionList the sessions to send on; read again on every run
 * @return a runnable that performs one send pass over {@code sessionList}
 */
private static Runnable newSendMessageRunnable(List<StompSession> sessionList) {
    return () -> {
        int i = 0;
        for (StompSession session : sessionList) {
            // Count every session, connected or not, so the user suffix stays tied to list position.
            i++;
            if (!session.isConnected()) {
                continue;
            }
            StompHeaders headers = new StompHeaders();
            headers.setDestination("/app/receiveMessage");
            headers.set("my-login-user", "小" + i);
            Map<String, Object> payload = new HashMap<>(2);
            payload.put("msg", "你好");
            try {
                session.send(headers, payload);
            } catch (RuntimeException ignored) {
                // The session may have closed between the isConnected() check and the send.
                // Swallow it so the remaining sessions are served and the periodic task survives.
            }
        }
    };
}
/**
 * Sends a null-payload message to each "/joal"-prefixed destination and verifies that the
 * corresponding {@code messagingCallback} method is invoked exactly once within 1500 ms.
 */
@Test
public void shouldMapDestinationToMessageMappingWithDestinationPrefix() throws InterruptedException, ExecutionException, TimeoutException {
    final WebSocketStompClient client = new WebSocketStompClient(new StandardWebSocketClient());
    final String endpoint = "ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX;
    final StompSession session = client
            .connect(endpoint, new StompSessionHandlerAdapter() {
            })
            .get(10, TimeUnit.SECONDS);

    session.send("/joal/global", null);
    verify(messagingCallback, timeout(1500).times(1)).global();

    session.send("/joal/announce", null);
    verify(messagingCallback, timeout(1500).times(1)).announce();

    session.send("/joal/config", null);
    verify(messagingCallback, timeout(1500).times(1)).config();

    session.send("/joal/torrents", null);
    verify(messagingCallback, timeout(1500).times(1)).torrents();

    session.send("/joal/speed", null);
    verify(messagingCallback, timeout(1500).times(1)).speed();
}
/**
 * Connects to {@code WEBSOCKET_URI}, subscribes to {@code WEBSOCKET_TOPIC} and checks that
 * the returned subscription carries a non-null id.
 */
@Test
public void shouldCreateASubscription() throws Exception {
    StompSessionHandlerAdapter sessionHandler = new StompSessionHandlerAdapter() {
    };
    // Wait at most 5 seconds for the connection.
    StompSession connected = stompClient.connect(WEBSOCKET_URI, sessionHandler).get(5, SECONDS);

    Subscription created = connected.subscribe(WEBSOCKET_TOPIC, new DefaultStompFrameHandler());

    Assert.assertNotNull(created.getSubscriptionId());
}
/**
 * Calls {@code send} on a freshly constructed {@link DefaultStompSession} and asserts that
 * the tracer recorded exactly one finished span, whether or not the send itself succeeds.
 *
 * @param tracer the mock tracer whose finished spans are inspected
 */
@Test
public void testSend(final MockTracer tracer) {
    final StompSessionHandlerAdapter handler = new StompSessionHandlerAdapter() {};
    final StompSession session = new DefaultStompSession(handler, new StompHeaders());

    try {
        session.send("endpoint", "Hello");
    } catch (final Exception ignored) {
        // The outcome of the send is irrelevant here; only the recorded span is asserted below.
    }

    assertEquals(1, tracer.finishedSpans().size());
}
/**
 * Load-test entry point. Schedules three periodic tasks on a 3-thread scheduler:
 * a connect-and-subscribe task and a send task (each every 1000 ms, starting immediately),
 * and a task that logs the counters every 5 seconds.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);

    // Number of connection attempts started.
    final AtomicInteger connectCount = new AtomicInteger();
    // Number of successful connections.
    final AtomicInteger successCount = new AtomicInteger();
    // Number of failed connections.
    final AtomicInteger errorCount = new AtomicInteger();

    // Sessions and subscriptions collected by the connect task.
    final List<StompSession> sessionList = new CopyOnWriteArrayList<>();
    final List<StompSession.Subscription> subscriptionList = new CopyOnWriteArrayList<>();

    final String url = "ws://localhost:8080/my-websocket?access_token=b90b0e77-63cf-4b05-8d8b-43ebefc71a6a";

    // Connect and subscribe: one new connection per second.
    scheduler.scheduleAtFixedRate(
            newConnectAndSubscribeRunnable(url, connectCount, successCount, errorCount, sessionList, subscriptionList),
            0, 1000, TimeUnit.MILLISECONDS);

    // Send messages: once per second, on every session.
    scheduler.scheduleAtFixedRate(newSendMessageRunnable(sessionList), 0, 1000, TimeUnit.MILLISECONDS);

    // Print the counters every 5 seconds.
    final Runnable statsReporter = () ->
            logger.info(" 连接数:" + connectCount + " 成功数:" + successCount + " 失败数:" + errorCount);
    scheduler.scheduleAtFixedRate(statsReporter, 5, 5, TimeUnit.SECONDS);
}
/**
 * Subscribes this handler to {@code /topic/greetings}, sends a small JSON document as raw
 * bytes to {@code /app/hello} and logs the session id.
 *
 * @param session          the session to subscribe and send on
 * @param connectedHeaders headers passed to the callback; not used here
 */
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
    session.subscribe("/topic/greetings", this);
    // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset,
    // which makes the bytes on the wire depend on the machine running the client.
    session.send("/app/hello", "{\"name\":\"Client\"}".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    log.info("New session: {}", session.getSessionId());
}
/**
 * {@inheritDoc}
 *
 * <p>Wraps {@link StompSession} beans that are not already a
 * {@code PreservesHeadersStompSessionAdapter} into such an adapter when the stomp
 * propagation settings accept the bean name; every other bean is returned unchanged.
 */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    final boolean wrappable =
            bean instanceof StompSession && !(bean instanceof PreservesHeadersStompSessionAdapter);
    if (!wrappable) {
        return bean;
    }

    if (!propagationProperties.getStomp().accept(beanName)) {
        log.debug("Context propagation disabled for stomp session [{}]", beanName);
        return bean;
    }

    log.info("Context propagation enabled for stomp session [{}] on keys={}.", beanName, propagationProperties.getKeys());
    return new PreservesHeadersStompSessionAdapter(
            (StompSession) bean,
            propagationProperties.buildEntriesFilter(),
            propagationProperties.buildExtraStaticEntries(eurekaInstanceProperties));
}
/**
 * With the bean name added to the stomp excludes, the processor must not return a
 * {@code PreservesHeadersStompSessionAdapter} for a mocked {@link StompSession}.
 */
@Test
public void should_skip_stomp_session() {
    PropagationProperties excludingProperties = new PropagationProperties();
    excludingProperties.getStomp().getExcludes().add(compile(beanName));
    processor.setPropagationProperties(excludingProperties);
    processor.setEurekaInstanceProperties(new EurekaInstanceProperties());

    Object processed = processor.postProcessAfterInitialization(mock(StompSession.class), beanName);

    assertThat(processed.getClass(), not(equalTo(PreservesHeadersStompSessionAdapter.class)));
}
/**
 * Prints a "Connected!" banner to standard error, shows the received headers, then
 * subscribes to {@code /topic/messages} and sends a JSON message on the session.
 *
 * @param session          the session handed on to {@code subscribeTopic} and {@code sendJsonMessage}
 * @param connectedHeaders the headers handed on to {@code showHeaders}
 */
@Override
public void afterConnected(StompSession session,
StompHeaders connectedHeaders)
{
System.err.println("Connected! Headers:");
showHeaders(connectedHeaders);
subscribeTopic("/topic/messages", session);
sendJsonMessage(session);
}
/**
 * Console chat client. Connects to {@code ws://localhost:9090/chat} through a SockJS client
 * with a single WebSocket transport, then reads lines from standard input and sends each
 * non-empty line as a {@code ClientMessage} to {@code /app/chat/java}.
 *
 * <p>The loop ends when standard input reaches end-of-file. The session is disconnected on
 * the way out, including when reading or sending fails, so the connection is not left open.
 *
 * @param args unused
 * @throws Exception if connecting, reading from standard input or sending fails
 */
public static void main(String[] args) throws Exception {
    WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
    List<Transport> transports = new ArrayList<>(1);
    transports.add(new WebSocketTransport(simpleWebSocketClient));
    SockJsClient sockJsClient = new SockJsClient(transports);

    WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
    stompClient.setMessageConverter(new MappingJackson2MessageConverter());

    String url = "ws://localhost:9090/chat";
    // User id is "spring-" plus a random number from nextInt(1, 99).
    String userId = "spring-" + ThreadLocalRandom.current().nextInt(1, 99);
    StompSessionHandler sessionHandler = new MyStompSessionHandler(userId);
    StompSession session = stompClient.connect(url, sessionHandler).get();

    try {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        for (;;) {
            System.out.print(userId + " >> ");
            System.out.flush();
            String line = in.readLine();
            if (line == null) {
                break; // end of input
            }
            if (line.length() == 0) {
                continue; // nothing to send for an empty line
            }
            ClientMessage msg = new ClientMessage(userId, line);
            session.send("/app/chat/java", msg);
        }
    } finally {
        // Only disconnect a live session; the connection may already have been lost.
        if (session.isConnected()) {
            session.disconnect();
        }
    }
}
/**
 * Sends a consume request with action "head" and partitions "0" to the consume
 * destination of the given view.
 *
 * @param stompSession the session to send on
 * @param viewId       id appended to {@code /websocket/consume/} to form the destination
 * @throws InterruptedException declared by the signature; nothing in this body throws it directly
 */
private void requestNewStream(final StompSession stompSession, long viewId) throws InterruptedException {
    final ConsumeRequest request = new ConsumeRequest();
    request.setAction("head");
    request.setPartitions("0");

    final String destination = "/websocket/consume/" + viewId;
    stompSession.send(destination, request);
}
/**
 * Sends one hello message over a traced STOMP session and checks the recorded spans:
 * three finished spans sharing a single trace id, exactly one span per expected operation
 * name, and the send-hello span as parent of the other two.
 */
@Test
public void testTracedWebsocketSession()
        throws URISyntaxException, InterruptedException, ExecutionException, TimeoutException {
    WebSocketStompClient stompClient = new WebSocketStompClient(
            new SockJsClient(createTransportClient()));
    stompClient.setMessageConverter(new MappingJackson2MessageConverter());

    StompSession stompSession = stompClient.connect(url, new StompSessionHandlerAdapter() {
    }).get(1, TimeUnit.SECONDS);

    stompSession.subscribe(SUBSCRIBE_GREETINGS_ENDPOINT, new GreetingStompFrameHandler());
    stompSession.send(SEND_HELLO_MESSAGE_ENDPOINT, new HelloMessage("Hi"));

    // Spans finish asynchronously; wait until all three have been reported.
    await().until(() -> mockTracer.finishedSpans().size() == 3);
    List<MockSpan> mockSpans = mockTracer.finishedSpans();

    // All spans must belong to the same trace.
    assertEquals(mockSpans.get(0).context().traceId(), mockSpans.get(1).context().traceId());
    assertEquals(mockSpans.get(0).context().traceId(), mockSpans.get(2).context().traceId());

    List<MockSpan> sendHelloSpans = mockSpans.stream()
            .filter(s -> s.operationName().equals(SEND_HELLO_MESSAGE_ENDPOINT))
            .collect(Collectors.toList());
    List<MockSpan> subscribeGreetingsEndpointSpans = mockSpans.stream()
            .filter(s -> s.operationName().equals(SUBSCRIBE_GREETINGS_ENDPOINT))
            .collect(Collectors.toList());
    List<MockSpan> greetingControllerSpans = mockSpans.stream()
            .filter(s -> s.operationName().equals(GreetingController.DOING_WORK))
            .collect(Collectors.toList());

    // Expected value goes first so that a failure message reports expected/actual correctly.
    assertEquals(1, sendHelloSpans.size());
    assertEquals(1, subscribeGreetingsEndpointSpans.size());
    assertEquals(1, greetingControllerSpans.size());

    // The send-hello span is the parent of both downstream spans.
    assertEquals(sendHelloSpans.get(0).context().spanId(), subscribeGreetingsEndpointSpans.get(0).parentId());
    assertEquals(sendHelloSpans.get(0).context().spanId(), greetingControllerSpans.get(0).parentId());
}
/**
 * Forwards the exception, wrapped in a {@code ProxySessionException}, to the configured
 * error handler. Does nothing when no error handler is set.
 */
@Override
public void handleException(StompSession session, StompCommand command,
        StompHeaders headers, byte[] payload, Throwable ex) {
    if (errorHandler == null) {
        return;
    }
    errorHandler.handleError(new ProxySessionException(this, session, ex));
}
/**
 * Removes the subscription registered for the destination and, if there was one,
 * ensures a connection and unsubscribes it. Unknown destinations are ignored.
 *
 * @param destination key of the subscription to remove
 */
public void unsubscribe(String destination) {
    final StompSession.Subscription removed = subscriptions.remove(destination);
    if (removed == null) {
        return;
    }
    connectIfNecessary();
    removed.unsubscribe();
}
/**
 * Connects to the prefixed WebSocket endpoint on the local test port and asserts that the
 * resulting session reports itself as connected.
 */
@Test
public void shouldPermitOnPrefixedUriForWebsocketHandshakeEndpoint() throws InterruptedException, ExecutionException, TimeoutException {
    final WebSocketStompClient client = new WebSocketStompClient(new StandardWebSocketClient());
    final String endpoint = "ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX;

    final StompSession session = client
            .connect(endpoint, new StompSessionHandlerAdapter() {
            })
            .get(10, TimeUnit.SECONDS);

    assertThat(session.isConnected()).isTrue();
}
/**
 * Connects to the prefixed WebSocket endpoint on the local test port and asserts that the
 * resulting session reports itself as connected.
 */
@Test
public void shouldBeAbleToConnectToAppPrefix() throws InterruptedException, ExecutionException, TimeoutException {
    final WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
    // 10-second connect timeout, as in the other connection tests; the former 1000 seconds
    // would stall a failing run for more than a quarter of an hour.
    final StompSession stompSession = stompClient.connect("ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX, new StompSessionHandlerAdapter() {
    }).get(10, TimeUnit.SECONDS);

    assertThat(stompSession.isConnected()).isTrue();
}
/**
 * Sends a message to {@code /global} (no "/joal" prefix), waits 1500 ms and verifies that
 * {@code messagingCallback.global()} was not invoked.
 */
@Test
public void shouldNotMapDestinationToMessageMappingWithoutDestinationPrefix() throws InterruptedException, ExecutionException, TimeoutException {
    final WebSocketStompClient client = new WebSocketStompClient(new StandardWebSocketClient());
    final String endpoint = "ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX;
    final StompSession session = client
            .connect(endpoint, new StompSessionHandlerAdapter() {
            })
            .get(10, TimeUnit.SECONDS);

    session.send("/global", null);

    // Pause before verifying so that an unwanted dispatch would have had time to reach the callback.
    Thread.sleep(1500);
    verify(messagingCallback, timeout(1500).times(0)).global();
}
/**
 * Logs the new session id, subscribes this handler to {@code /topic/messages}, then sends
 * the sample message to {@code /app/chat}, logging after each step.
 *
 * @param session          the session to subscribe and send on
 * @param connectedHeaders headers passed to the callback; not used here
 */
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
logger.info("New session established : " + session.getSessionId());
session.subscribe("/topic/messages", this);
logger.info("Subscribed to /topic/messages");
session.send("/app/chat", getSampleMessage());
logger.info("Message sent to websocket server");
}
/**
 * Invokes {@code afterConnected} with a mocked session and verifies that the handler
 * subscribes itself to {@code /topic/messages} and sends one message to some destination.
 */
@Test
public void givenValidSession_whenConnected_SendsMessage() {
    StompSession mockSession = Mockito.mock(StompSession.class);
    StompHeaders mockHeader = Mockito.mock(StompHeaders.class);
    MyStompSessionHandler sessionHandler = new MyStompSessionHandler();

    sessionHandler.afterConnected(mockSession, mockHeader);

    Mockito.verify(mockSession).subscribe("/topic/messages", sessionHandler);
    // any() replaces the deprecated anyObject() matcher; it accepts any payload object.
    Mockito.verify(mockSession).send(Mockito.anyString(), Mockito.any());
}
/**
 * Logs the command and headers, followed by the exception, at error level.
 */
@Override
public void handleException(StompSession session, StompCommand command,
StompHeaders headers, byte[] payload, Throwable ex) {
logger.error(command + " " + headers, ex);
}
/**
 * Logs a transport-level failure at error level together with its throwable.
 *
 * @param session   the affected session; not used here
 * @param exception the transport failure to log
 */
@Override
public void handleTransportError(StompSession session, Throwable exception) {
    // Pass the throwable as the dedicated second argument. The single-argument form treats
    // it as the message object, which typically loses the stack trace.
    logger.error("Transport error", exception);
}
/**
 * Logs the id of the opened session and counts down {@code latch}.
 */
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
log.info("connection open, {}", session.getSessionId());
// Presumably releases a caller waiting for the connection -- confirm against the owner of latch.
this.latch.countDown();
}
/**
 * Logs the session id and command at info level, then the exception at error level.
 */
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
log.info("connection exception, {} {}", session.getSessionId(), command);
log.error("exception", exception);
}
/**
 * Logs the session id at info level, then the transport exception at error level.
 */
@Override
public void handleTransportError(StompSession session, Throwable ex) {
log.info("connection error, {}", session.getSessionId());
log.error("exception", ex);
}
/**
 * Stores the exception in {@code failure}, replacing any previously stored value.
 */
@Override
public void handleException(StompSession s, StompCommand c, StompHeaders h, byte[] p, Throwable ex) {
this.failure.set(ex);
}
/**
 * Stores the transport exception in {@code failure}, replacing any previously stored value.
 */
@Override
public void handleTransportError(StompSession session, Throwable ex) {
this.failure.set(ex);
}
/**
 * Logs the command and headers, followed by the exception, at error level.
 */
@Override
public void handleException(StompSession session, StompCommand command,
StompHeaders headers, byte[] payload, Throwable ex) {
logger.error(command + " " + headers, ex);
}
/**
 * Logs a transport-level failure at error level together with its throwable.
 *
 * @param session   the affected session; not used here
 * @param exception the transport failure to log
 */
@Override
public void handleTransportError(StompSession session, Throwable exception) {
    // Pass the throwable as the dedicated second argument. The single-argument form treats
    // it as the message object, which typically loses the stack trace.
    logger.error("Transport error", exception);
}
/**
 * Prints the stack trace of the exception to the standard error stream.
 */
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
// NOTE(review): printStackTrace() bypasses any logging configuration; consider a logger if this class has one.
exception.printStackTrace();
}