下面列出了怎么用org.springframework.web.socket.WebSocketSession的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void gameStartedTest() throws Exception {
GameHandler handler = new GameHandler();
handler.setPieceService(pieceServiceMock);
initializeGame(handler);
Game game = handler.getGames().values().iterator().next();
GameMessage startMsg = new GameMessage();
startMsg.setAction(GameAction.START);
startMsg.setGameId(game.getId());
WebSocketSession session = getSession("1");
handler.handleMessage(session, getTextMessage(startMsg));
GameEvent responseNewPiece = mapper.readValue(messages.get("1").removeLast().getPayload().toString(), GameEvent.class);
GameEvent responseGameStarted = mapper.readValue(messages.get("1").removeLast().getPayload().toString(), GameEvent.class);
Assert.assertEquals(GameEventType.GAME_STARTED, responseGameStarted.getType());
Assert.assertEquals(GameEventType.NEW_PIECE, responseNewPiece.getType());
Assert.assertEquals(piece.getId(), responseNewPiece.getPieceId());
}
@Override
public ListenableFuture<Void> send(Message<byte[]> message) {
updateLastWriteTime();
SettableListenableFuture<Void> future = new SettableListenableFuture<>();
try {
WebSocketSession session = this.session;
Assert.state(session != null, "No WebSocketSession available");
session.sendMessage(this.codec.encode(message, session.getClass()));
future.set(null);
}
catch (Throwable ex) {
future.setException(ex);
}
finally {
updateLastWriteTime();
}
return future;
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
log.debug("afterConnectionClosed(session={},status={})", session, status);
try {
session.close(status);
WebSocketRoutedSession webSocketRoutedSession = getRoutedSession(session);
if (webSocketRoutedSession != null) {
webSocketRoutedSession.close(status);
}
routedSessions.remove(session.getId());
}
catch (NullPointerException | IOException e) {
log.debug("Error closing WebSocket connection: {}", e.getMessage(), e);
}
}
@Test
public void connectFailure() throws Exception {
final HttpServerErrorException expected = new HttpServerErrorException(HttpStatus.INTERNAL_SERVER_ERROR);
RestOperations restTemplate = mock(RestOperations.class);
given(restTemplate.execute((URI) any(), eq(HttpMethod.POST), any(), any())).willThrow(expected);
final CountDownLatch latch = new CountDownLatch(1);
connect(restTemplate).addCallback(
new ListenableFutureCallback<WebSocketSession>() {
@Override
public void onSuccess(WebSocketSession result) {
}
@Override
public void onFailure(Throwable ex) {
if (ex == expected) {
latch.countDown();
}
}
}
);
verifyNoMoreInteractions(this.webSocketHandler);
}
/**
* 建立连接后,把登录用户的id写入WebSocketSession
*/
public void afterConnectionEstablished(WebSocketSession session)
throws Exception {
Integer uid = (Integer) session.getAttributes().get("uid");
User u=userService.getUserById(uid);
if (userSocketSessionMap.get(uid) == null) {
userSocketSessionMap.put(uid, session);
Message msg = new Message();
msg.setFrom(0);//0表示上线消息
msg.setText(u.getUsername());
msg.setUserId(u.getId());
msg.setAvatar(u.getAvatar());
msg.setEmail(u.getEmail());
this.broadcast(new TextMessage(new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create().toJson(msg)));
}
}
public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception {
String payload = message.getPayload();
if (!StringUtils.hasLength(payload)) {
return;
}
String[] messages;
try {
messages = getSockJsServiceConfig().getMessageCodec().decode(payload);
}
catch (Throwable ex) {
logger.error("Broken data received. Terminating WebSocket connection abruptly", ex);
tryCloseWithSockJsTransportError(ex, CloseStatus.BAD_DATA);
return;
}
if (messages != null) {
delegateMessages(messages);
}
}
@HiveWebsocketAuth
@PreAuthorize("isAuthenticated() and hasPermission(null, 'MANAGE_DEVICE_TYPE')")
public void processUserAssignDeviceType(JsonObject request, WebSocketSession session) {
Long userId = gson.fromJson(request.get(USER_ID), Long.class);
if (userId == null) {
logger.error(Messages.USER_ID_REQUIRED);
throw new HiveException(Messages.USER_ID_REQUIRED, BAD_REQUEST.getStatusCode());
}
Long deviceTypeId = gson.fromJson(request.get(DEVICE_TYPE_ID), Long.class);
if (deviceTypeId == null) {
logger.error(Messages.DEVICE_TYPE_ID_REQUIRED);
throw new HiveException(Messages.DEVICE_TYPE_ID_REQUIRED, BAD_REQUEST.getStatusCode());
}
userService.assignDeviceType(userId, deviceTypeId);
clientHandler.sendMessage(request, new WebSocketResponse(), session);
}
public void sendMsg(WsForm wsForm) {
try {
if (wsForm == null) {
return;
}
TextMessage message = new TextMessage(JSON.toJSONString(wsForm));
for (Entry<String, WebSocketSession> entry : wcContent.entrySet()) {
WebSocketSession session = entry.getValue();
if (session.isOpen()) {
synchronized (session) {
session.sendMessage(message);
}
}
}
} catch (Exception e) {
LOGGER.warn("sendMsg", e);
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
super.handleTextMessage(session, message);
try {
getClient(session, null);
if (this.terminalClient != null) {
if (!terminalClient.isClosed()) {
terminalClient.write(message.getPayload());
} else {
session.close();
}
}
} catch (Exception e) {
session.sendMessage(new TextMessage("Sorry! jobx Terminal was closed, please try again. "));
terminalClient.disconnect();
session.close();
}
}
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
logger.info("handleTextMessage start");
// 将消息进行转化,因为是消息是json数据,可能里面包含了发送给某个人的信息,所以需要用json相关的工具类处理之后再封装成TextMessage,
// 我这儿并没有做处理,消息的封装格式一般有{from:xxxx,to:xxxxx,msg:xxxxx},来自哪里,发送给谁,什么消息等等
String msg = message.getPayload();
logger.info("msg = " + msg);
WsParam<String> wsParam = JacksonUtil.json2Bean(msg, new TypeReference<WsParam<String>>(){});
if ("list".equals(wsParam.getMethod())) {
logger.info("call list method...");
WsResponse<String> response = new WsResponse<>();
response.setResult("hello list");
sendMessageToUser(session, new TextMessage(JacksonUtil.bean2Json(response)));
}
logger.info("handleTextMessage end");
// 给所有用户群发消息
//sendMessagesToUsers(msg);
// 给指定用户群发消息
//sendMessageToUser(userId, msg);
}
public void close(WebSocketSession webSocketSession) {
Replicator replicator = replicatorMap.remove(webSocketSession.getId());
if (replicator != null) {
try {
replicator.close();
webSocketSession.close();
} catch (IOException e) {
logger.warn(e.getMessage());
}
}
RxClient rxClient = rxClientMap.get(webSocketSession.getId());
if (rxClient != null) {
rxClient.shutdown();
}
observableMap.remove(webSocketSession.getId());
authMap.remove(webSocketSession.getId());
}
/**
* 添加文件监听
*
* @param file 文件
* @param session 会话
* @throws IOException 异常
*/
public static void addWatcher(File file, WebSocketSession session) throws IOException {
if (!file.exists() || file.isDirectory()) {
throw new IOException("文件不存在或者是目录:" + file.getPath());
}
ServiceFileTailWatcher<WebSocketSession> agentFileTailWatcher = CONCURRENT_HASH_MAP.computeIfAbsent(file, s -> {
try {
return new ServiceFileTailWatcher<>(file);
} catch (Exception e) {
DefaultSystemLog.getLog().error("创建文件监听失败", e);
return null;
}
});
if (agentFileTailWatcher == null) {
throw new IOException("加载文件失败:" + file.getPath());
}
agentFileTailWatcher.add(session, FileUtil.getName(file));
agentFileTailWatcher.tailWatcherRun.start();
}
@HiveWebsocketAuth
@PreAuthorize("isAuthenticated() and hasPermission(#networkId, 'GET_NETWORK')")
public void processNetworkGet(Long networkId, JsonObject request, WebSocketSession session) {
logger.debug("Network get requested.");
if (networkId == null) {
logger.error(Messages.NETWORK_ID_REQUIRED);
throw new HiveException(Messages.NETWORK_ID_REQUIRED, BAD_REQUEST.getStatusCode());
}
NetworkWithUsersAndDevicesVO existing = networkService.getWithDevices(networkId);
if (existing == null) {
logger.error(String.format(Messages.NETWORK_NOT_FOUND, networkId));
throw new HiveException(String.format(Messages.NETWORK_NOT_FOUND, networkId), NOT_FOUND.getStatusCode());
}
WebSocketResponse response = new WebSocketResponse();
response.addValue(NETWORK, existing, NETWORK_PUBLISHED);
webSocketClientHandler.sendMessage(request, response, session);
}
@HiveWebsocketAuth
@PreAuthorize("isAuthenticated() and hasPermission(null, 'MANAGE_USER')")
public void processUserInsert(JsonObject request, WebSocketSession session) {
UserUpdate userToCreate = gson.fromJson(request.get(USER), UserUpdate.class);
if (userToCreate == null) {
logger.error(Messages.USER_REQUIRED);
throw new HiveException(Messages.USER_REQUIRED, BAD_REQUEST.getStatusCode());
}
hiveValidator.validate(userToCreate);
String password = userToCreate.getPassword().orElse(null);
UserVO created = userService.createUser(userToCreate.convertTo(), password);
WebSocketResponse response = new WebSocketResponse();
response.addValue(USER, created, USER_SUBMITTED);
clientHandler.sendMessage(request, response, session);
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws java.io.IOException {
ObjectMapper mapper = new ObjectMapper();
String event = mapper.writeValueAsString(TestMessage.getTestMessage());
int half = event.length() / 2;
String part1 = event.substring(0, half);
String part2 = event.substring(half);
session.sendMessage(new BinaryMessage(part1.getBytes(UTF_8), false));
session.sendMessage(new BinaryMessage(part2.getBytes(UTF_8), true));
}
private void sendPingMessage(WebSocketSession session, TextMessage pingMessage) {
try {
webSocketFlushExecutor.execute(new OrderedWebSocketFlushRunnable(session, pingMessage, true));
} catch (RuntimeException e) {
logger.warn("failed while to execute. error:{}.", e.getMessage(), e);
}
}
/**
* Starts the ping task. Note: It only gets called on HELLO event type.
*
* @param session
*/
private void pingAtRegularIntervals(WebSocketSession session) {
pingTask = new PingTask(session);
if (pingScheduledExecutorService != null) {
pingScheduledExecutorService.shutdownNow();
}
pingScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
pingScheduledExecutorService.scheduleAtFixedRate(pingTask, 1L, 30L, TimeUnit.SECONDS);
}
@Override
public synchronized void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
terminalToken = terminalContext.getToken();
if (terminalToken != null) {
final Terminal terminal = terminalContext.remove(terminalToken);
if (terminal != null) {
try {
session.sendMessage(new TextMessage("Welcome to jobx Terminal! Connect Starting."));
getClient(session, terminal);
int cols = toInt(session.getAttributes().get("cols").toString());
int rows = toInt(session.getAttributes().get("rows").toString());
int width = toInt(session.getAttributes().get("width").toString());
int height = toInt(session.getAttributes().get("height").toString());
terminalClient.openTerminal(cols, rows, width, height);
terminalService.login(terminal.getId());
} catch (Exception e) {
if (e.getLocalizedMessage().replaceAll("\\s+", "").contentEquals("Operationtimedout")) {
session.sendMessage(new TextMessage("Sorry! Connect timed out, please try again. "));
} else {
session.sendMessage(new TextMessage("Sorry! Operation error, please try again. "));
}
terminalClient.disconnect();
session.close();
}
} else {
this.terminalClient.disconnect();
session.sendMessage(new TextMessage("Sorry! Connect failed, please try again. "));
session.close();
}
}
}
@Override
public void afterConnectionClosed(final WebSocketSession session,
CloseStatus status) throws Exception
{
log.debug("[Handler::afterConnectionClosed] status: {}, sessionId: {}",
status, session.getId());
stop(session);
}
public WebSocketMessage<?> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Assert.notNull(accessor, "No StompHeaderAccessor available");
byte[] payload = message.getPayload();
byte[] bytes = ENCODER.encode(accessor.getMessageHeaders(), payload);
boolean useBinary = (payload.length > 0 &&
!(SockJsSession.class.isAssignableFrom(sessionType)) &&
MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(accessor.getContentType()));
return (useBinary ? new BinaryMessage(bytes) : new TextMessage(bytes));
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
RoomParticipant user = (RoomParticipant) session.getAttributes().get(USER);
if (user != null) {
updateThreadName(user.getName() + "|wsclosed");
leaveRoom(user);
updateThreadName(HANDLER_THREAD_NAME);
}
}
private WebSocketSession appendingWebSocketSession(String url, WebSocketHttpHeaders headers, StringBuilder response, int countToNotify)
throws Exception {
StandardWebSocketClient client = new StandardWebSocketClient();
client.getUserProperties().put(SSL_CONTEXT_PROPERTY, HttpClientUtils.ignoreSslContext());
URI uri = UriComponentsBuilder.fromUriString(url).build().encode().toUri();
return client.doHandshake(appendResponseHandler(response, countToNotify), headers, uri).get(30000, TimeUnit.MILLISECONDS);
}
private void sendError(WebSocketSession session, String message) {
try {
JsonObject response = new JsonObject();
response.addProperty("id", "error");
response.addProperty("message", message);
session.sendMessage(new TextMessage(response.toString()));
} catch (IOException e) {
log.error("Exception sending message", e);
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
try {
getDelegate().handleTransportError(session, exception);
}
catch (Throwable ex) {
tryCloseWithError(session, ex, logger);
}
}
@Override
public void destroy(WebSocketSession session) {
try {
if (session.isOpen()) {
session.close();
}
} catch (IOException ignored) {
}
HandlerItem handlerItem = HANDLER_ITEM_CONCURRENT_HASH_MAP.get(session.getId());
IoUtil.close(handlerItem.inputStream);
IoUtil.close(handlerItem.outputStream);
JschUtil.close(handlerItem.channel);
JschUtil.close(handlerItem.openSession);
}
private void closeWsSession(WebSocketSession session){
try{
session.close();
}catch(Throwable t){}
if ( wsSession==session ){
clearWsSession();
}
}
private void fatalError(WebSocketSession session, Exception exception) {
try {
session.close(CloseStatus.SESSION_NOT_RELIABLE);
} catch (Exception ignored) {/*no-op*/}
cancelAll();
log.warn(String.format("WebSocket session %s (%s) closed due to an exception", session.getId(), session.getRemoteAddress()), exception);
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws IOException {
String username = (String) session.getAttributes().get("username");
ConcurrentWebSocketSessionDecorator sessionDecorator = new ConcurrentWebSocketSessionDecorator(session, 1000, 5120);
sessionMap.put(username, sessionDecorator);
log.info("connect succ, username:[{}]", username);
JSONObject jsonObject = new JSONObject();
jsonObject.put("action", "connect");
jsonObject.put("username", username);
sessionDecorator.sendMessage(new TextMessage(jsonObject.toJSONString()));
}
private void unsubscribeFromProxiedTarget(WebSocketSession session,
WebSocketMessageAccessor accessor) {
ProxyWebSocketConnectionManager manager = managers.get(session);
if (manager != null) {
manager.unsubscribe(accessor.getDestination());
}
}
private void closeSession(WebSocketSession session) {
try {
session.close(CloseStatus.SERVER_ERROR);
} catch (Exception e) {
LOGGER.warn(e.getMessage(), e);
}
}