下面列出了怎么用org.springframework.web.socket.WebSocketMessage的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void sendMessage(WebSocketMessage<?> message) throws IOException {
if (shouldNotSend()) {
return;
}
this.buffer.add(message);
this.bufferSize.addAndGet(message.getPayloadLength());
do {
if (!tryFlushMessageBuffer()) {
if (logger.isTraceEnabled()) {
logger.trace(String.format("Another send already in progress: " +
"session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes",
getId(), getTimeSinceSendStarted(), getBufferSize()));
}
checkSessionLimits();
break;
}
}
while (!this.buffer.isEmpty() && !shouldNotSend());
}
@Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException {
checkNativeSessionInitialized();
if (logger.isTraceEnabled()) {
logger.trace("Sending " + message + ", " + this);
}
if (message instanceof TextMessage) {
sendTextMessage((TextMessage) message);
}
else if (message instanceof BinaryMessage) {
sendBinaryMessage((BinaryMessage) message);
}
else if (message instanceof PingMessage) {
sendPingMessage((PingMessage) message);
}
else if (message instanceof PongMessage) {
sendPongMessage((PongMessage) message);
}
else {
throw new IllegalStateException("Unexpected WebSocketMessage type: " + message);
}
}
public List<Message<byte[]>> decode(WebSocketMessage<?> webSocketMessage) {
List<Message<byte[]>> result = Collections.emptyList();
ByteBuffer byteBuffer;
if (webSocketMessage instanceof TextMessage) {
byteBuffer = ByteBuffer.wrap(((TextMessage) webSocketMessage).asBytes());
}
else if (webSocketMessage instanceof BinaryMessage) {
byteBuffer = ((BinaryMessage) webSocketMessage).getPayload();
}
else {
return result;
}
result = this.bufferingDecoder.decode(byteBuffer);
if (result.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("Incomplete STOMP frame content received, bufferSize=" +
this.bufferingDecoder.getBufferSize() + ", bufferSizeLimit=" +
this.bufferingDecoder.getBufferSizeLimit() + ".");
}
}
return result;
}
@Test
public void handleMessageToClientWithUserDestination() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE);
headers.setMessageId("mess0");
headers.setSubscriptionId("sub0");
headers.setDestination("/queue/foo-user123");
headers.setNativeHeader(StompHeaderAccessor.ORIGINAL_DESTINATION, "/user/queue/foo");
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(1, this.session.getSentMessages().size());
WebSocketMessage<?> textMessage = this.session.getSentMessages().get(0);
assertTrue(((String) textMessage.getPayload()).contains("destination:/user/queue/foo\n"));
assertFalse(((String) textMessage.getPayload()).contains(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION));
}
@Override
public void sendMessage(WebSocketMessage<?> message) throws IOException {
if (shouldNotSend()) {
return;
}
this.buffer.add(message);
this.bufferSize.addAndGet(message.getPayloadLength());
do {
if (!tryFlushMessageBuffer()) {
if (logger.isTraceEnabled()) {
logger.trace(String.format("Another send already in progress: " +
"session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes",
getId(), getTimeSinceSendStarted(), getBufferSize()));
}
checkSessionLimits();
break;
}
}
while (!this.buffer.isEmpty() && !shouldNotSend());
}
private boolean tryFlushMessageBuffer() throws IOException {
if (this.flushLock.tryLock()) {
try {
while (true) {
WebSocketMessage<?> message = this.buffer.poll();
if (message == null || shouldNotSend()) {
break;
}
this.bufferSize.addAndGet(-message.getPayloadLength());
this.sendStartTime = System.currentTimeMillis();
getDelegate().sendMessage(message);
this.sendStartTime = 0;
}
}
finally {
this.sendStartTime = 0;
this.flushLock.unlock();
}
return true;
}
return false;
}
@Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException {
checkNativeSessionInitialized();
if (logger.isTraceEnabled()) {
logger.trace("Sending " + message + ", " + this);
}
if (message instanceof TextMessage) {
sendTextMessage((TextMessage) message);
}
else if (message instanceof BinaryMessage) {
sendBinaryMessage((BinaryMessage) message);
}
else if (message instanceof PingMessage) {
sendPingMessage((PingMessage) message);
}
else if (message instanceof PongMessage) {
sendPongMessage((PongMessage) message);
}
else {
throw new IllegalStateException("Unexpected WebSocketMessage type: " + message);
}
}
@Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException {
if (!(message instanceof TextMessage)) {
throw new IllegalArgumentException(this + " supports text messages only.");
}
if (this.state != State.OPEN) {
throw new IllegalStateException(this + " is not open: current state " + this.state);
}
String payload = ((TextMessage) message).getPayload();
payload = getMessageCodec().encode(payload);
payload = payload.substring(1); // the client-side doesn't need message framing (letter "a")
TextMessage messageToSend = new TextMessage(payload);
if (logger.isTraceEnabled()) {
logger.trace("Sending message " + messageToSend + " in " + this);
}
sendInternal(messageToSend);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception
{
NodeSession nodeSession = (NodeSession)session.getAttributes().get(NodeSession.ATTR_SESSION);
if ( nodeSession==null ){
nodeSession = nodeMgmtService.onSessionConnected(session);
}
String text = null;
Object payload = message.getPayload();
if ( payload instanceof byte[] ){
text = new String((byte[])payload, 0, message.getPayloadLength(), utf8);
}else{
text = payload.toString();
}
nodeSession.onMessage(text);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (message instanceof TextMessage) {
handleTextMessage(session, (TextMessage) message);
}
else if (message instanceof BinaryMessage) {
handleBinaryMessage(session, (BinaryMessage) message);
}
else if (message instanceof PongMessage) {
handlePongMessage(session, (PongMessage) message);
}
else {
throw new IllegalStateException("Unexpected WebSocket message type: " + message);
}
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Handling " + message + " in " + session);
}
super.handleMessage(session, message);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
try {
getDelegate().handleMessage(session, message);
}
catch (Throwable ex) {
tryCloseWithError(session, ex, logger);
}
}
/**
* Handle an inbound message from a WebSocket client.
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
WebSocketSessionHolder holder = this.sessions.get(session.getId());
if (holder != null) {
session = holder.getSession();
}
SubProtocolHandler protocolHandler = findProtocolHandler(session);
protocolHandler.handleMessageFromClient(session, message, this.clientInboundChannel);
if (holder != null) {
holder.setHasHandledMessages();
}
checkSessions();
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> webSocketMessage) {
this.lastReadTime = (this.lastReadTime != -1 ? System.currentTimeMillis() : -1);
List<Message<byte[]>> messages;
try {
messages = this.codec.decode(webSocketMessage);
}
catch (Throwable ex) {
this.connectionHandler.handleFailure(ex);
return;
}
for (Message<byte[]> message : messages) {
this.connectionHandler.handleMessage(message);
}
}
public WebSocketMessage<?> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Assert.notNull(accessor, "No StompHeaderAccessor available");
byte[] payload = message.getPayload();
byte[] bytes = ENCODER.encode(accessor.getMessageHeaders(), payload);
boolean useBinary = (payload.length > 0 &&
!(SockJsSession.class.isAssignableFrom(sessionType)) &&
MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(accessor.getContentType()));
return (useBinary ? new BinaryMessage(bytes) : new TextMessage(bytes));
}
@Override
public void sendMessage(WebSocketMessage<?> message) throws IOException {
super.sendMessage(message);
if (this.nextMessageLatch != null) {
this.nextMessageLatch.get().countDown();
}
block();
}
@Test
public void handleMessageToClientWithConnectedFrame() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECTED);
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(1, this.session.getSentMessages().size());
WebSocketMessage<?> textMessage = this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "user-name:joe\n" + "\n" + "\u0000", textMessage.getPayload());
}
@Test
public void handleMessageToClientWithDestinationUserNameProvider() {
this.session.setPrincipal(new UniqueUser("joe"));
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECTED);
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(1, this.session.getSentMessages().size());
WebSocketMessage<?> textMessage = this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "user-name:joe\n" + "\n" + "\u0000", textMessage.getPayload());
}
@Test
public void handleMessageToClientWithHeartbeatSuppressingSockJsHeartbeat() throws IOException {
SockJsSession sockJsSession = Mockito.mock(SockJsSession.class);
given(sockJsSession.getId()).willReturn("s1");
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECTED);
accessor.setHeartbeat(0, 10);
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(sockJsSession, message);
verify(sockJsSession).getId();
verify(sockJsSession).getPrincipal();
verify(sockJsSession).disableHeartbeat();
verify(sockJsSession).sendMessage(any(WebSocketMessage.class));
verifyNoMoreInteractions(sockJsSession);
sockJsSession = Mockito.mock(SockJsSession.class);
given(sockJsSession.getId()).willReturn("s1");
accessor = StompHeaderAccessor.create(StompCommand.CONNECTED);
accessor.setHeartbeat(0, 0);
message = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
this.protocolHandler.handleMessageToClient(sockJsSession, message);
verify(sockJsSession).getId();
verify(sockJsSession).getPrincipal();
verify(sockJsSession).sendMessage(any(WebSocketMessage.class));
verifyNoMoreInteractions(sockJsSession);
}
@Test
public void handleMessageToClientWithBinaryWebSocketMessage() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE);
headers.setMessageId("mess0");
headers.setSubscriptionId("sub0");
headers.setContentType(MimeTypeUtils.APPLICATION_OCTET_STREAM);
headers.setDestination("/queue/foo");
// Non-empty payload
byte[] payload = new byte[1];
Message<byte[]> message = MessageBuilder.createMessage(payload, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(1, this.session.getSentMessages().size());
WebSocketMessage<?> webSocketMessage = this.session.getSentMessages().get(0);
assertTrue(webSocketMessage instanceof BinaryMessage);
// Empty payload
payload = EMPTY_PAYLOAD;
message = MessageBuilder.createMessage(payload, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(2, this.session.getSentMessages().size());
webSocketMessage = this.session.getSentMessages().get(1);
assertTrue(webSocketMessage instanceof TextMessage);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (message instanceof TextMessage) {
handleTextMessage(session, (TextMessage) message);
}
else if (message instanceof BinaryMessage) {
handleBinaryMessage(session, (BinaryMessage) message);
}
else if (message instanceof PongMessage) {
handlePongMessage(session, (PongMessage) message);
}
else {
throw new IllegalStateException("Unexpected WebSocket message type: " + message);
}
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Handling " + message + " in " + session);
}
super.handleMessage(session, message);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
try {
getDelegate().handleMessage(session, message);
}
catch (Throwable ex) {
tryCloseWithError(session, ex, logger);
}
}
/**
* Handle an inbound message from a WebSocket client.
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
WebSocketSessionHolder holder = this.sessions.get(session.getId());
if (holder != null) {
session = holder.getSession();
}
SubProtocolHandler protocolHandler = findProtocolHandler(session);
protocolHandler.handleMessageFromClient(session, message, this.clientInboundChannel);
if (holder != null) {
holder.setHasHandledMessages();
}
checkSessions();
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> webSocketMessage) {
this.lastReadTime = (this.lastReadTime != -1 ? System.currentTimeMillis() : -1);
List<Message<byte[]>> messages;
try {
messages = this.codec.decode(webSocketMessage);
}
catch (Throwable ex) {
this.connectionHandler.handleFailure(ex);
return;
}
for (Message<byte[]> message : messages) {
this.connectionHandler.handleMessage(message);
}
}
protected void acceptHeartbeat(final WebSocketMessage<?> message) {
try {
final HeartbeatResponse response = JacksonJsonSerializer.INSTANCE.deserialize((String) message.getPayload(), HeartbeatResponse.class);
final ResponseStatus status = response.getResponseStatus();
if (status == null) {
_heartbeatStatus.addEvent("null");
} else {
_heartbeatStatus.addEvent(status.getStatus());
}
long heartbeatTime = System.currentTimeMillis() - _heartbeatAcceptStartTime;
_heartbeatAcceptLatency.addValue(heartbeatTime);
if (ResponseStatusUtil.isServiceDown(status)) {
_sessionContext.markdown();
}
if (ResponseStatusUtil.isFail(status)) {
_logger.warn("heartbeat failed: " + status.getMessage());
} else if (ResponseStatusUtil.isPartialFail(status)) {
_logger.info("heartbeat partial failed: " + status.getMessage());
}
registerToServicesRegistry(response.getFailedInstances());
} catch (final Throwable e) {
_logger.error("handle heartbeat message failed", e);
}
}
public WebSocketMessage<?> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Assert.notNull(accessor, "No StompHeaderAccessor available");
byte[] payload = message.getPayload();
byte[] bytes = ENCODER.encode(accessor.getMessageHeaders(), payload);
boolean useBinary = (payload.length > 0 &&
!(SockJsSession.class.isAssignableFrom(sessionType)) &&
MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(accessor.getContentType()));
return (useBinary ? new BinaryMessage(bytes) : new TextMessage(bytes));
}
@Override
public void sendMessage(WebSocketMessage<?> message) throws IOException {
super.sendMessage(message);
if (this.nextMessageLatch != null) {
this.nextMessageLatch.get().countDown();
}
block();
}
@Test
public void handleMessageToClientWithConnectedFrame() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECTED);
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(1, this.session.getSentMessages().size());
WebSocketMessage<?> textMessage = this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "user-name:joe\n" + "\n" + "\u0000", textMessage.getPayload());
}
@Test
public void handleMessageToClientWithDestinationUserNameProvider() {
this.session.setPrincipal(new UniqueUser("joe"));
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECTED);
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(1, this.session.getSentMessages().size());
WebSocketMessage<?> textMessage = this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "user-name:joe\n" + "\n" + "\u0000", textMessage.getPayload());
}