下面列出了如何使用 org.springframework.web.socket.messaging.WebSocketStompClient 类的 API 示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Opens a STOMP-over-WebSocket session against the local broker before each test.
 *
 * <p>Builds a {@link WebSocketStompClient} with a string message converter and a
 * heartbeat scheduler, prepares the subscription headers, then connects and waits
 * for the session handler to release the latch. Both waits are bounded so that an
 * unresponsive broker fails the test instead of hanging the whole run.
 *
 * @throws Exception if the connection fails, or if the handshake or the handler
 *                   callback does not complete within the timeout
 */
@Before
public void before() throws Exception {
    log.info("=============================={}.{}==============================",
            this.getClass().getSimpleName(),
            this.testName.getMethodName());

    // Upper bound, in seconds, for each blocking wait below.
    final long waitSeconds = 30L;

    String brokerStomp = "ws://localhost:" + this.listenPort + "/weevent-broker/stomp";

    ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
    taskScheduler.initialize();

    this.stompClient = new WebSocketStompClient(new StandardWebSocketClient());
    // Payloads are plain strings here (MappingJackson2MessageConverter would be the JSON alternative).
    this.stompClient.setMessageConverter(new StringMessageConverter());
    this.stompClient.setTaskScheduler(taskScheduler); // for heartbeats

    this.header.setDestination(topic);
    this.header.set("eventId", WeEvent.OFFSET_LAST);
    this.header.set("groupId", WeEvent.DEFAULT_GROUP_ID);

    this.failure = new AtomicReference<>();
    CountDownLatch latch = new CountDownLatch(1);
    this.stompSession = this.stompClient
            .connect(brokerStomp, new MyStompSessionHandler(latch, this.failure))
            .get(waitSeconds, java.util.concurrent.TimeUnit.SECONDS);

    // Presumably MyStompSessionHandler counts the latch down once the session is usable - TODO confirm.
    if (!latch.await(waitSeconds, java.util.concurrent.TimeUnit.SECONDS)) {
        throw new IllegalStateException(
                "STOMP session handler did not signal within " + waitSeconds + "s for " + brokerStomp);
    }
    this.stompSession.setAutoReceipt(true);
}
/**
 * Connects to the broker through a SockJS client and then keeps the calling
 * thread asleep for 100 seconds so the session handler can do its work.
 *
 * @throws InterruptedException if the sleep is interrupted
 */
private void testOverSockJS() throws InterruptedException {
    // SockJS transports, registered in this order: WebSocket, then XHR.
    List<Transport> sockJsTransports = new ArrayList<>(2);
    sockJsTransports.add(new WebSocketTransport(new StandardWebSocketClient()));
    sockJsTransports.add(new RestTemplateXhrTransport());

    WebSocketStompClient client = new WebSocketStompClient(new SockJsClient(sockJsTransports));
    // JSON payloads (StringMessageConverter would be the plain-text alternative).
    client.setMessageConverter(new MappingJackson2MessageConverter());
    client.setTaskScheduler(taskScheduler); // for heartbeats
    client.connect(brokerSockJS, getSockJSSessionHandlerAdapter());

    Thread.sleep(100000L);
}
/**
 * Registers a startup runner that connects to the local test WebSocket endpoint
 * over SockJS, subscribes to the greetings destination and sends one "Hello".
 *
 * @return the runner executed at application startup
 */
@Bean
public CommandLineRunner commandLineRunner() {
    return args -> {
        WebSocketStompClient client = new WebSocketStompClient(new SockJsClient(createTransportClient()));
        client.setMessageConverter(new MappingJackson2MessageConverter());

        // No-op handler: only the resulting session is of interest here.
        StompSessionHandlerAdapter noOpHandler = new StompSessionHandlerAdapter() {
        };
        StompSession session = client
                .connect("ws://localhost:8080/test-websocket", noOpHandler)
                .get(10, TimeUnit.SECONDS);

        session.subscribe(SUBSCRIBE_GREETINGS_ENDPOINT, new GreetingStompFrameHandler());
        session.send(SEND_HELLO_MESSAGE_ENDPOINT, "Hello");
    };
}
/**
 * Starts a STOMP client against ws://127.0.0.1:8080/hello and then blocks on
 * standard input so the process does not exit right after connecting.
 *
 * @param argv ignored
 */
public static void main(String... argv) {
    WebSocketStompClient client = new WebSocketStompClient(new StandardWebSocketClient());
    client.setMessageConverter(new MappingJackson2MessageConverter());
    client.setTaskScheduler(new ConcurrentTaskScheduler());

    client.connect("ws://127.0.0.1:8080/hello", new MySessionHandler());

    // Don't close immediately: wait for one line on stdin.
    Scanner stdin = new Scanner(System.in);
    stdin.nextLine();
}
/**
 * Connecting with username/token STOMP headers must trigger exactly one call to
 * the authenticator service with those same credentials.
 */
@Test
public void shouldCallAuthServiceWhenUserTriesToConnect() throws InterruptedException, ExecutionException, TimeoutException {
    final String endpoint = "ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX;

    final StompHeaders connectHeaders = new StompHeaders();
    connectHeaders.add(AuthChannelInterceptorAdapter.USERNAME_HEADER, "john");
    connectHeaders.add(AuthChannelInterceptorAdapter.TOKEN_HEADER, TestConstant.UI_SECRET_TOKEN);

    final WebSocketStompClient client = new WebSocketStompClient(new StandardWebSocketClient());
    client.connect(endpoint, new WebSocketHttpHeaders(), connectHeaders, new StompSessionHandlerAdapter() {
    }).get(10, TimeUnit.SECONDS);

    verify(authenticatorService, times(1)).getAuthenticatedOrFail("john", TestConstant.UI_SECRET_TOKEN);
}
/**
 * Each message sent to a "/joal/..." destination must reach the matching
 * callback method exactly once within 1.5 seconds.
 */
@Test
public void shouldMapDestinationToMessageMappingWithDestinationPrefix() throws InterruptedException, ExecutionException, TimeoutException {
    final String endpoint = "ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX;
    final WebSocketStompClient client = new WebSocketStompClient(new StandardWebSocketClient());
    final StompSession session = client.connect(endpoint, new StompSessionHandlerAdapter() {
    }).get(10, TimeUnit.SECONDS);

    // Destinations are exercised one at a time: send, then wait for the callback.
    session.send("/joal/global", null);
    verify(messagingCallback, timeout(1500).times(1)).global();

    session.send("/joal/announce", null);
    verify(messagingCallback, timeout(1500).times(1)).announce();

    session.send("/joal/config", null);
    verify(messagingCallback, timeout(1500).times(1)).config();

    session.send("/joal/torrents", null);
    verify(messagingCallback, timeout(1500).times(1)).torrents();

    session.send("/joal/speed", null);
    verify(messagingCallback, timeout(1500).times(1)).speed();
}
/**
 * Connects to the broker over a plain (non-SockJS) WebSocket and then keeps the
 * calling thread asleep for 100 seconds so the session handler can do its work.
 *
 * @throws InterruptedException if the sleep is interrupted
 */
private void testOverWebSocket() throws InterruptedException {
    // Standard WebSocket transport, no SockJS layer.
    WebSocketStompClient client = new WebSocketStompClient(new StandardWebSocketClient());
    // Plain-text payloads (MappingJackson2MessageConverter would be the JSON alternative).
    client.setMessageConverter(new StringMessageConverter());
    client.setTaskScheduler(taskScheduler); // for heartbeats
    client.connect(brokerStomp, getWebSocketSessionHandlerAdapter());

    Thread.sleep(100000L);
}
/**
 * Builds a fresh SockJS client (WebSocket transport only) and a JSON-converting
 * STOMP client on top of it before every test.
 */
@BeforeEach
public void setup() {
    List<Transport> webSocketOnly = new ArrayList<>();
    webSocketOnly.add(new WebSocketTransport(new StandardWebSocketClient()));
    this.sockJsClient = new SockJsClient(webSocketOnly);

    WebSocketStompClient client = new WebSocketStompClient(this.sockJsClient);
    client.setMessageConverter(new MappingJackson2MessageConverter());
    this.stompClient = client;
}
/**
 * Creates a wrapper for the given endpoint and prepares a STOMP client that
 * talks SockJS over a single WebSocket transport with JSON message conversion.
 *
 * @param url endpoint address, stored as given
 */
public WebSocket(String url) {
    this.url = url;

    var webSocketOnly = new ArrayList<Transport>(1);
    webSocketOnly.add(new WebSocketTransport(new StandardWebSocketClient()));

    var stomp = new WebSocketStompClient(new SockJsClient(webSocketOnly));
    stomp.setMessageConverter(new MappingJackson2MessageConverter());
    client = stomp;
}
/**
 * Console chat client: connects to ws://localhost:9090/chat via SockJS under a
 * randomly numbered "spring-N" user id, then forwards every non-empty line read
 * from standard input to "/app/chat/java" until end of input.
 *
 * @param args ignored
 * @throws Exception if connecting or reading from standard input fails
 */
public static void main(String args[]) throws Exception
{
    List<Transport> transports = new ArrayList<>(1);
    transports.add(new WebSocketTransport(new StandardWebSocketClient()));

    WebSocketStompClient stompClient = new WebSocketStompClient(new SockJsClient(transports));
    stompClient.setMessageConverter(new MappingJackson2MessageConverter());

    String userId = "spring-" + ThreadLocalRandom.current().nextInt(1, 99);
    StompSession session = stompClient
            .connect("ws://localhost:9090/chat", new MyStompSessionHandler(userId))
            .get();

    BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
    String prompt = userId + " >> ";

    // Prompt, read, handle; a null line (end of input) ends the loop.
    System.out.print(prompt);
    System.out.flush();
    for (String line = stdin.readLine(); line != null; line = stdin.readLine()) {
        if (!line.isEmpty()) {
            session.send("/app/chat/java", new ClientMessage(userId, line));
        }
        System.out.print(prompt);
        System.out.flush();
    }
}
/**
 * Sends one hello message over a traced STOMP session and checks the spans the
 * mock tracer recorded: three finished spans sharing one trace id, one span per
 * expected operation name, and the send span being the parent of the other two.
 */
@Test
public void testTracedWebsocketSession()
        throws URISyntaxException, InterruptedException, ExecutionException, TimeoutException {
    WebSocketStompClient stompClient = new WebSocketStompClient(
            new SockJsClient(createTransportClient()));
    stompClient.setMessageConverter(new MappingJackson2MessageConverter());

    StompSession stompSession = stompClient.connect(url, new StompSessionHandlerAdapter() {
    }).get(1, TimeUnit.SECONDS);
    stompSession.subscribe(SUBSCRIBE_GREETINGS_ENDPOINT, new GreetingStompFrameHandler());
    stompSession.send(SEND_HELLO_MESSAGE_ENDPOINT, new HelloMessage("Hi"));

    // Spans finish asynchronously; wait until all three have been reported.
    await().until(() -> mockTracer.finishedSpans().size() == 3);
    List<MockSpan> mockSpans = mockTracer.finishedSpans();

    // All spans must belong to the same trace.
    assertEquals(mockSpans.get(0).context().traceId(), mockSpans.get(1).context().traceId());
    assertEquals(mockSpans.get(0).context().traceId(), mockSpans.get(2).context().traceId());

    List<MockSpan> sendHelloSpans = mockSpans.stream()
            .filter(s -> s.operationName().equals(SEND_HELLO_MESSAGE_ENDPOINT))
            .collect(Collectors.toList());
    List<MockSpan> subscribeGreetingsEndpointSpans = mockSpans.stream()
            .filter(s -> s.operationName().equals(SUBSCRIBE_GREETINGS_ENDPOINT))
            .collect(Collectors.toList());
    List<MockSpan> greetingControllerSpans = mockSpans.stream()
            .filter(s -> s.operationName().equals(GreetingController.DOING_WORK))
            .collect(Collectors.toList());

    // Expected value first, so a failure reports "expected 1 but was N".
    assertEquals(1, sendHelloSpans.size());
    assertEquals(1, subscribeGreetingsEndpointSpans.size());
    assertEquals(1, greetingControllerSpans.size());

    // The send span is the parent of both the subscription and the controller span.
    assertEquals(sendHelloSpans.get(0).context().spanId(), subscribeGreetingsEndpointSpans.get(0).parentId());
    assertEquals(sendHelloSpans.get(0).context().spanId(), greetingControllerSpans.get(0).parentId());
}
/**
 * Creates the handler around {@code delegate} and stores the collaborators
 * passed in.
 *
 * @param delegate                handler handed to the superclass constructor
 * @param stompClient             STOMP client kept in a field of this handler
 * @param headersCallback         headers callback kept in a field of this handler
 * @param messagingTemplate       messaging template kept in a field of this handler
 * @param proxyTargetResolver     target resolver kept in a field of this handler
 * @param zuulWebSocketProperties properties object kept in a field of this handler
 */
public ProxyWebSocketHandler(
        WebSocketHandler delegate,
        WebSocketStompClient stompClient,
        WebSocketHttpHeadersCallback headersCallback,
        SimpMessagingTemplate messagingTemplate,
        ProxyTargetResolver proxyTargetResolver,
        ZuulWebSocketProperties zuulWebSocketProperties) {
    super(delegate);

    // Configuration and routing.
    this.zuulWebSocketProperties = zuulWebSocketProperties;
    this.proxyTargetResolver = proxyTargetResolver;

    // Messaging collaborators.
    this.stompClient = stompClient;
    this.headersCallback = headersCallback;
    this.messagingTemplate = messagingTemplate;
}
/**
 * Creates a connection manager for {@code uri} and stores the collaborators
 * passed in.
 *
 * @param messagingTemplate   messaging template kept in a field of this manager
 * @param stompClient         STOMP client kept in a field of this manager
 * @param userAgentSession    WebSocket session kept in a field of this manager
 * @param httpHeadersCallback headers callback kept in a field of this manager
 * @param uri                 value handed to the superclass constructor
 */
public ProxyWebSocketConnectionManager(
        SimpMessagingTemplate messagingTemplate,
        WebSocketStompClient stompClient,
        WebSocketSession userAgentSession,
        WebSocketHttpHeadersCallback httpHeadersCallback,
        String uri) {
    super(uri);

    this.userAgentSession = userAgentSession;
    this.httpHeadersCallback = httpHeadersCallback;
    this.stompClient = stompClient;
    this.messagingTemplate = messagingTemplate;
}
/**
 * Default {@link WebSocketStompClient} bean, created only when the application
 * has not defined one of its own.
 *
 * @param webSocketClient  underlying WebSocket client
 * @param messageConverter converter applied to STOMP payloads
 * @param taskScheduler    scheduler qualified as "proxyStompClientTaskScheduler"
 * @return the configured client, with an 8 MiB inbound limit and a {0, 0} default heartbeat
 */
@Bean
@ConditionalOnMissingBean(WebSocketStompClient.class)
public WebSocketStompClient stompClient(WebSocketClient webSocketClient, MessageConverter messageConverter,
        @Qualifier("proxyStompClientTaskScheduler") TaskScheduler taskScheduler) {
    final int inboundLimitBytes = 8 * 1024 * 1024;
    final long[] zeroHeartbeat = {0, 0};

    WebSocketStompClient proxyClient = new WebSocketStompClient(webSocketClient);
    proxyClient.setInboundMessageSizeLimit(inboundLimitBytes);
    proxyClient.setMessageConverter(messageConverter);
    // NOTE(review): keep the heartbeat override after setTaskScheduler; per Spring's
    // StompClientSupport docs, setting a scheduler may switch the default heartbeat on.
    proxyClient.setTaskScheduler(taskScheduler);
    proxyClient.setDefaultHeartbeat(zeroHeartbeat);
    return proxyClient;
}
/**
 * A WebSocket handshake on the prefixed UI path must succeed and yield a
 * connected STOMP session.
 */
@Test
public void shouldPermitOnPrefixedUriForWebsocketHandshakeEndpoint() throws InterruptedException, ExecutionException, TimeoutException {
    final String endpoint = "ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX;
    final WebSocketStompClient client = new WebSocketStompClient(new StandardWebSocketClient());

    final StompSession session = client.connect(endpoint, new StompSessionHandlerAdapter() {
    }).get(10, TimeUnit.SECONDS);

    assertThat(session.isConnected()).isTrue();
}
/**
 * Connecting to the UI path prefix must yield a connected STOMP session.
 *
 * <p>The connect wait is capped at 10 seconds, matching the other connection
 * tests in this collection; the former 1000-second cap would let a hung
 * handshake stall the build for roughly 17 minutes.
 */
@Test
public void shouldBeAbleToConnectToAppPrefix() throws InterruptedException, ExecutionException, TimeoutException {
    final String endpoint = "ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX;
    final WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());

    final StompSession stompSession = stompClient.connect(endpoint, new StompSessionHandlerAdapter() {
    }).get(10, TimeUnit.SECONDS);

    assertThat(stompSession.isConnected()).isTrue();
}
/**
 * Connecting to the server root (without the UI path prefix) must fail with an
 * {@link ExecutionException} whose message reports the HTTP 404 handshake response.
 *
 * <p>The wait is capped at 10 seconds rather than the former 1000 seconds, so a
 * handshake that neither succeeds nor fails cannot block the build for minutes.
 */
@Test
public void shouldNotBeAbleToConnectWithoutAppPrefix() {
    final WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
    final String rootEndpoint = "ws://localhost:" + port + "/";

    assertThatThrownBy(() ->
            stompClient.connect(rootEndpoint, new StompSessionHandlerAdapter() {
            }).get(10, TimeUnit.SECONDS)
    )
            .isInstanceOf(ExecutionException.class)
            .hasMessageContaining("The HTTP response from the server [404]");
}
/**
 * A message sent to "/global" (without the "/joal" destination prefix) must not
 * reach the {@code global()} callback.
 *
 * <p>Uses Mockito's {@code after(...).never()}, which waits the full 1.5 seconds
 * before checking that no call happened. The former {@code timeout(1500).times(0)}
 * is satisfied immediately and only worked because of a manual sleep before it.
 */
@Test
public void shouldNotMapDestinationToMessageMappingWithoutDestinationPrefix() throws InterruptedException, ExecutionException, TimeoutException {
    final String endpoint = "ws://localhost:" + port + "/" + TestConstant.UI_PATH_PREFIX;
    final WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
    final StompSession stompSession = stompClient.connect(endpoint, new StompSessionHandlerAdapter() {
    }).get(10, TimeUnit.SECONDS);

    stompSession.send("/global", null);

    // Fully qualified because the file's static imports are not visible from this snippet.
    verify(messagingCallback, org.mockito.Mockito.after(1500).never()).global();
}
/**
 * Starts a JSON-converting STOMP client against {@code URL} and then blocks on
 * standard input so the process does not exit right after connecting.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    WebSocketStompClient jsonStompClient = new WebSocketStompClient(new StandardWebSocketClient());
    jsonStompClient.setMessageConverter(new MappingJackson2MessageConverter());

    jsonStompClient.connect(URL, new MyStompSessionHandler());

    // Don't close immediately: wait for one line on stdin.
    Scanner stdin = new Scanner(System.in);
    stdin.nextLine();
}
/**
 * Creates the STOMP client used by the tests: SockJS on top of a single
 * standard WebSocket transport.
 */
@Before
public void setup() {
    WebSocketTransport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
    SockJsClient sockJs = new SockJsClient(asList(webSocketTransport));
    stompClient = new WebSocketStompClient(sockJs);
}
/**
 * Attempts to make a websocket connection as an authenticated user, and stream values from a view.
 *
 * <p>Logs in, connects over SockJS with the login's HTTP headers, subscribes to
 * the view's results once connected, and waits up to 15 seconds for one latch
 * count per expected Kafka record before comparing the record counts.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@Test
public void test_authenticated_webSocketConnection() throws InterruptedException {
    // Create a count down latch to know when we have consumed all of our records.
    final CountDownLatch countDownLatch = new CountDownLatch(kafkaRecords.size());

    // Create a list we can add our consumed records to.
    // NOTE(review): presumably filled from the STOMP callback thread inside subscribeToResults - confirm it is safe to read here.
    final List<Map> consumedRecords = new ArrayList<>();

    // Login to instance.
    final UserLoginDetails userLoginDetails = login();
    final WebSocketHttpHeaders socketHttpHeaders = new WebSocketHttpHeaders(userLoginDetails.getHttpHeaders());
    final long userId = userLoginDetails.getUserId();

    // Create websocket client
    final SockJsClient sockJsClient = new SockJsClient(createTransportClient());
    final WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
    stompClient.setMessageConverter(new MappingJackson2MessageConverter());

    // Connect to websocket
    stompClient.connect(WEBSOCKET_URL, socketHttpHeaders, new StompSessionHandlerAdapter() {
        /**
         * After we connect, subscribe to our view.
         */
        @Override
        public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
            session.setAutoReceipt(false);
            subscribeToResults(session, view.getId(), userId, countDownLatch, consumedRecords);
            try {
                requestNewStream(session, view.getId());
            } catch (InterruptedException e) {
                // Restore the interrupt flag before converting to an unchecked exception.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }, port);

    // Start the client.
    stompClient.start();

    // Wait at most 15 seconds for every expected record to be counted down.
    if (!countDownLatch.await(15, java.util.concurrent.TimeUnit.SECONDS)) {
        fail("Test timed out!");
    }

    // Success! Expected value first so a failure message reads correctly.
    assertEquals("Found all messages!", kafkaRecords.size(), consumedRecords.size());
}