下面列出了java.net.http.WebSocket.Listener#io.undertow.websockets.core.WebSockets 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Sends one message through {@code WebSockets}, dispatching on the message type.
 *
 * <p>For each supported type the send processor is first marked as not ready to send, and the
 * send is given a {@code SendProcessorCallback} created from the message payload.
 *
 * @param message the message to send
 * @return always {@code true}
 * @throws IOException per the method signature
 * @throws IllegalArgumentException if the type is none of TEXT, BINARY, PING or PONG
 */
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
    ByteBuffer buffer = message.getPayload().asByteBuffer();
    if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
        getSendProcessor().setReadyToSend(false);
        // Decode only the readable window (position..limit). buffer.array() would expose the
        // whole backing array regardless of position/limit/arrayOffset and throws for direct
        // or read-only buffers. duplicate() leaves the payload's own position untouched.
        String text = StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
        WebSockets.sendText(text, getDelegate(), new SendProcessorCallback(message.getPayload()));
    }
    else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
        getSendProcessor().setReadyToSend(false);
        WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
    }
    else if (WebSocketMessage.Type.PING.equals(message.getType())) {
        getSendProcessor().setReadyToSend(false);
        WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
    }
    else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
        getSendProcessor().setReadyToSend(false);
        WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
    }
    else {
        throw new IllegalArgumentException("Unexpected message type: " + message.getType());
    }
    return true;
}
/**
 * Sends one message through {@code WebSockets}, dispatching on the message type.
 *
 * <p>For each supported type the send processor is first marked as not ready to send, and the
 * send is given a {@code SendProcessorCallback} created from the message payload.
 *
 * @param message the message to send
 * @return always {@code true}
 * @throws IOException per the method signature
 * @throws IllegalArgumentException if the type is none of TEXT, BINARY, PING or PONG
 */
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
    ByteBuffer buffer = message.getPayload().asByteBuffer();
    if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
        getSendProcessor().setReadyToSend(false);
        // Decode only the readable window (position..limit). buffer.array() would expose the
        // whole backing array regardless of position/limit/arrayOffset and throws for direct
        // or read-only buffers. duplicate() leaves the payload's own position untouched.
        String text = StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
        WebSockets.sendText(text, getDelegate(), new SendProcessorCallback(message.getPayload()));
    }
    else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
        getSendProcessor().setReadyToSend(false);
        WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
    }
    else if (WebSocketMessage.Type.PING.equals(message.getType())) {
        getSendProcessor().setReadyToSend(false);
        WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
    }
    else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
        getSendProcessor().setReadyToSend(false);
        WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
    }
    else {
        throw new IllegalArgumentException("Unexpected message type: " + message.getType());
    }
    return true;
}
/**
 * Installs a receive listener on the given channel that turns every complete binary message
 * into a string and passes it to {@code writeToDecoder}.
 *
 * @param webSocketChannel channel whose receive setter is given the listener
 */
private void registerWebSocketChannelListener(WebSocketChannel webSocketChannel) {
    ChannelListener<WebSocketChannel> binaryForwarder = new AbstractReceiveListener() {
        @Override
        protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) throws IOException {
            log.trace("Server received full binary message");
            Pooled<ByteBuffer[]> pooled = message.getData();
            try {
                ByteBuffer merged = WebSockets.mergeBuffers(pooled.getResource());
                // NOTE(review): new String(byte[]) decodes with the platform default charset - confirm this is intended.
                String decoded = new String(merged.array());
                log.trace("Sending message to decoder: {}", decoded);
                writeToDecoder(decoded);
            } finally {
                // Discard the pooled data whether or not decoding/forwarding succeeded.
                pooled.discard();
            }
        }
    };
    webSocketChannel.getReceiveSetter().set(binaryForwarder);
}
/**
 * Installs a receive listener on the given channel that turns every complete binary message
 * into a string and passes it to {@code writeToDecoder}.
 *
 * @param webSocketChannel channel whose receive setter is given the listener
 */
private void registerWebSocketChannelListener(WebSocketChannel webSocketChannel) {
    ChannelListener<WebSocketChannel> binaryForwarder = new AbstractReceiveListener() {
        @Override
        protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) throws IOException {
            log.log(Level.FINE, "Server received full binary message");
            Pooled<ByteBuffer[]> pooled = message.getData();
            try {
                ByteBuffer merged = WebSockets.mergeBuffers(pooled.getResource());
                // NOTE(review): new String(byte[]) decodes with the platform default charset - confirm this is intended.
                String decoded = new String(merged.array());
                log.log(Level.FINE, "Sending message to decoder: " + decoded);
                writeToDecoder(decoded);
            } finally {
                // Discard the pooled data whether or not decoding/forwarding succeeded.
                pooled.discard();
            }
        }
    };
    webSocketChannel.getReceiveSetter().set(binaryForwarder);
}
/**
 * Handles one complete text message inside its own action-log scope.
 *
 * <p>The channel wrapper stored under {@code WebSocketHandler.CHANNEL_KEY} supplies the action
 * name and the handler; the text is converted with {@code fromClientMessage} and passed to the
 * handler's listener. Any throwable is logged and answered with a close frame whose code comes
 * from {@code closeCode} and whose reason is the throwable's message.
 *
 * @param channel     channel the message arrived on
 * @param textMessage the buffered text message
 */
@Override
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage textMessage) {
    var channelImpl = (ChannelImpl) channel.getAttribute(WebSocketHandler.CHANNEL_KEY);
    ActionLog actionLog = logManager.begin("=== ws message handling begin ===");
    try {
        actionLog.action(channelImpl.action);
        linkContext(channel, channelImpl, actionLog);

        String text = textMessage.getData();
        // logged unmasked: ws messages are assumed not to carry sensitive info; the data can be json or plain text
        logger.debug("[channel] message={}", text);
        actionLog.track("ws", 0, 1, 0);

        validateRate(channelImpl);
        Object clientMessage = channelImpl.handler.fromClientMessage(text);
        channelImpl.handler.listener.onMessage(channelImpl, clientMessage);
    } catch (Throwable failure) {
        logManager.logError(failure);
        WebSockets.sendClose(closeCode(failure), failure.getMessage(), channel, ChannelCallback.INSTANCE);
    } finally {
        logManager.end("=== ws message handling end ===");
    }
}
/**
 * Converts the message with {@code handler.toServerMessage} and sends it as a text frame,
 * unless a close frame has already been sent or received on the channel.
 *
 * <p>When a send is attempted, the elapsed time since method entry is tracked under
 * {@code "ws"} and logged, whether or not the send call throws.
 *
 * @param message the message to send
 * @param <T>     message type accepted by the handler
 */
@Override
public <T> void send(T message) {
    var stopWatch = new StopWatch();
    String payload = handler.toServerMessage(message);

    // refer to io.undertow.websockets.core.WebSocketChannel.send(WebSocketFrameType):
    // under concurrency a thread can still obtain the channel from context right before the
    // close listener removes it, so this check lowers the chance of hitting
    // WebSocketMessages.MESSAGES.channelClosed(); in theory a small window still remains
    boolean closing = channel.isCloseFrameSent() || channel.isCloseFrameReceived();
    if (closing) {
        return;
    }

    try {
        WebSockets.sendText(payload, channel, ChannelCallback.INSTANCE);
    } finally {
        long elapsedTime = stopWatch.elapsed();
        ActionLogContext.track("ws", elapsedTime, 0, 1);
        // logged unmasked: ws messages are assumed not to carry sensitive info; the text can be json or plain text
        LOGGER.debug("send ws message, id={}, text={}, elapsed={}", id, payload, elapsedTime);
    }
}
/**
 * Builds the handshake handler for status-update WebSocket connections.
 *
 * <p>For each new connection a listener is registered that serializes every
 * {@code TaskStatusUpdateEvent} to JSON (as {@code {"action": "status-update", "event": ...}})
 * and sends it as a text frame. If serialization fails, the error is logged and a close frame
 * with {@code CloseMessage.UNEXPECTED_ERROR} is sent. The listener is removed again by a close
 * task added to the channel.
 *
 * @return a {@code WebSocketProtocolHandshakeHandler} wrapping the connection callback
 */
HttpHandler webSocketStatusUpdateHandler() {
    // One mapper for all events and connections of this handler: building an ObjectMapper per
    // event is needlessly expensive, and an unmodified mapper can be shared between threads.
    ObjectMapper objectMapper = new ObjectMapper();
    WebSocketConnectionCallback webSocketConnectionCallback = (exchange, webSocketChannel) -> {
        Consumer<TaskStatusUpdateEvent> statusUpdateListener = event -> {
            Map<String, Object> statusUpdate = new HashMap<>();
            statusUpdate.put("action", "status-update");
            statusUpdate.put("event", event);
            try {
                String message = objectMapper.writeValueAsString(statusUpdate);
                WebSockets.sendText(message, webSocketChannel, null);
            } catch (JsonProcessingException e) {
                log.error("Cannot write object to JSON", e);
                String errorMessage = "Cannot write object to JSON: " + e.getMessage();
                WebSockets.sendClose(CloseMessage.UNEXPECTED_ERROR, errorMessage, webSocketChannel, null);
            }
        };
        log.debug("Registering new status update listener {}.", statusUpdateListener);
        addStatusUpdateListener(statusUpdateListener);
        webSocketChannel.addCloseTask((task) -> removeStatusUpdateListener(statusUpdateListener));
    };
    return new WebSocketProtocolHandshakeHandler(webSocketConnectionCallback);
}
/**
 * Installs a receive listener on the given channel that turns every complete binary message
 * into a string and passes it to {@code writeToDecoder}.
 *
 * @param webSocketChannel channel whose receive setter is given the listener
 */
private void registerWebSocketChannelListener(WebSocketChannel webSocketChannel) {
    ChannelListener<WebSocketChannel> binaryForwarder = new AbstractReceiveListener() {
        @Override
        protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) throws IOException {
            log.trace("Server received full binary message");
            Pooled<ByteBuffer[]> pooled = message.getData();
            try {
                ByteBuffer merged = WebSockets.mergeBuffers(pooled.getResource());
                // NOTE(review): new String(byte[]) decodes with the platform default charset - confirm this is intended.
                String decoded = new String(merged.array());
                log.trace("Sending message to decoder: {}", decoded);
                writeToDecoder(decoded);
            } finally {
                // Discard the pooled data whether or not decoding/forwarding succeeded.
                pooled.discard();
            }
        }
    };
    webSocketChannel.getReceiveSetter().set(binaryForwarder);
}
/**
 * Sends a close frame built from the given status, unless the delegate reports that a close
 * frame was already sent.
 *
 * @param status supplies the close code and reason
 * @return an empty {@code Mono} in every case
 */
@Override
public Mono<Void> close(CloseStatus status) {
    CloseMessage closeMessage = new CloseMessage(status.getCode(), status.getReason());
    boolean alreadySent = getDelegate().isCloseFrameSent();
    if (alreadySent) {
        return Mono.empty();
    }
    WebSockets.sendClose(closeMessage, getDelegate(), null);
    return Mono.empty();
}
/**
 * Sends a close frame built from the given status, unless the delegate reports that a close
 * frame was already sent.
 *
 * @param status supplies the close code and reason
 * @return an empty {@code Mono} in every case
 */
@Override
public Mono<Void> close(CloseStatus status) {
    CloseMessage closeMessage = new CloseMessage(status.getCode(), status.getReason());
    boolean alreadySent = getDelegate().isCloseFrameSent();
    if (alreadySent) {
        return Mono.empty();
    }
    WebSockets.sendClose(closeMessage, getDelegate(), null);
    return Mono.empty();
}
/**
 * Waits {@code WAIT_FOR_MORE_DELAY} milliseconds, removes this user's entry from {@code tasks},
 * drains up to {@code MAX_MESSAGES_PER_POLL} queued messages and sends them as a single JSON
 * text frame to every channel in {@code channels}.
 *
 * <p>After each successful send {@code user.userReceivedEvents()} is called; an
 * {@code IOException} from a send is passed to {@code handleIoException} together with the
 * affected channel, and the remaining channels are still served.
 */
@Override
public void run() {
    try {
        Thread.sleep(WAIT_FOR_MORE_DELAY);
    } catch (InterruptedException ignored) {
        // An interrupt only cuts the wait short; the queued messages are still flushed below.
    }
    synchronized (tasks) {
        tasks.remove(user);
        if (tasks.isEmpty()) PreparingShutdown.get().allEventsSent();
    }
    ArrayList<QueuedMessage> toSend = new ArrayList<>(MAX_MESSAGES_PER_POLL);
    synchronized (messages) {
        messages.drainTo(toSend, MAX_MESSAGES_PER_POLL);
    }
    JsonArray array = new JsonArray(toSend.size());
    for (QueuedMessage message : toSend)
        array.add(message.getData().obj());

    // Iterate over a snapshot of the channels, as before.
    WebSocketChannel[] targets = channels.toArray(new WebSocketChannel[0]);
    if (targets.length == 0) {
        return;
    }
    // The payload is identical for every channel, so wrap and serialize it once
    // instead of once per channel.
    String payload = new JsonWrapper(Consts.GeneralKeys.EVENTS, array).obj().toString();
    for (WebSocketChannel channel : targets) {
        try {
            WebSockets.sendTextBlocking(payload, channel);
            user.userReceivedEvents();
        } catch (IOException ex) {
            handleIoException(ex, channel);
        }
    }
}
/**
 * Sends a close frame with {@code WebSocketCloseCodes.NORMAL_CLOSURE} and no reason, then
 * tracks the elapsed time under {@code "ws"} and logs it, whether or not the send call throws.
 */
@Override
public void close() {
    var stopWatch = new StopWatch();
    try {
        WebSockets.sendClose(WebSocketCloseCodes.NORMAL_CLOSURE, null, channel, ChannelCallback.INSTANCE);
    } finally {
        long elapsedTime = stopWatch.elapsed();
        ActionLogContext.track("ws", elapsedTime, 0, 1);
        LOGGER.debug("close ws channel, id={}, elapsed={}", id, elapsedTime);
    }
}
/**
 * Connects a JDK WebSocket client to {@code /websocket}, sends a random payload from the server
 * side to every open channel of that path and expects the client listener to have captured
 * exactly that payload within two seconds.
 */
@Test
public void testSendData() throws Exception {
    // given
    final HttpClient client = HttpClient.newHttpClient();
    final Config config = Application.getInstance(Config.class);
    final String endpoint = "ws://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/websocket";
    final String payload = UUID.randomUUID().toString();
    eventData = null;

    // when
    Listener textCapture = new Listener() {
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
            // Remember the received text so the assertion below can compare it.
            eventData = message.toString();
            return null;
        }
    };
    // get() blocks until the client connection has been established.
    client.newWebSocketBuilder()
            .buildAsync(new URI(endpoint), textCapture)
            .get();

    Application.getInstance(WebSocketService.class)
            .getChannels("/websocket")
            .forEach(serverChannel -> {
                try {
                    if (serverChannel.isOpen()) {
                        WebSockets.sendTextBlocking(payload, serverChannel);
                    }
                } catch (final IOException sendFailure) {
                    sendFailure.printStackTrace();
                }
            });

    // then
    await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, equalTo(payload)));
}
/**
 * Opens a client connection to {@code /websocketauth} with a token encrypted under a hard-coded
 * secret (presumably not the server's - confirm against the test configuration), sends a random
 * payload to every open server channel of that path and asserts that the client listener did
 * not capture that payload.
 */
@Test
public void testSendDataWithInvalidAuthentication() throws Exception {
    // given
    final HttpClient client = HttpClient.newHttpClient();
    final Config config = Application.getInstance(Config.class);
    final String endpoint = "ws://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/websocketauth";
    final String payload = UUID.randomUUID().toString();
    eventData = null;

    var expiry = LocalDateTime.now().plusHours(1).atZone(ZoneId.systemDefault()).toInstant();
    var secretKey = new SecretKeySpec("oskdlwsodkcmansjdkwsowekd5jfvsq2mckdkalsodkskajsfdsfdsfvvkdkcskdsqidsjk".getBytes(StandardCharsets.UTF_8), "AES");
    PasetoV1LocalBuilder token = Pasetos.V1.LOCAL.builder()
            .setSubject("foo")
            .setExpiration(expiry)
            .claim(ClaimKey.TWO_FACTOR.toString(), "false")
            .setSharedSecret(secretKey);

    Listener textCapture = new Listener() {
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
            // Remember the received text so the assertions below can compare it.
            eventData = message.toString();
            return null;
        }
    };

    // when
    String cookie = config.getAuthenticationCookieName() + "=" + token.compact();
    client.newWebSocketBuilder()
            .header("Cookie", cookie)
            .buildAsync(new URI(endpoint), textCapture);

    await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, not(equalTo(payload))));

    Application.getInstance(WebSocketService.class)
            .getChannels("/websocketauth")
            .forEach(serverChannel -> {
                try {
                    if (serverChannel.isOpen()) {
                        WebSockets.sendTextBlocking(payload, serverChannel);
                    }
                } catch (final IOException sendFailure) {
                    sendFailure.printStackTrace();
                }
            });

    // then
    await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, not(equalTo(payload))));
}
/**
 * Records the status code in the response headers under {@code WebSocketUtils.SC_KEY}, builds
 * a response from those headers with no body and sends it over the channel as a text frame.
 *
 * @param sc the status code to report
 * @throws IOException per the method signature
 */
@Override
public void sendError(int sc) throws IOException {
    if (LOG.isLoggable(Level.FINE)) {
        // Pattern now carries the parentheses, matching the two-argument overload's "sendError({0}, {1})".
        LOG.log(Level.FINE, "sendError({0})", sc);
    }
    responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
    byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
    WebSockets.sendText(ByteBuffer.wrap(data), channel, null);
}
/**
 * Records the status code in the response headers under {@code WebSocketUtils.SC_KEY}, builds
 * a response from those headers with no body and sends it over the channel as a text frame.
 *
 * <p>The message is only written to the log; it is not part of the response built here.
 *
 * @param sc  the status code to report
 * @param msg the error message (logged at FINE only)
 * @throws IOException per the method signature
 */
@Override
public void sendError(int sc, String msg) throws IOException {
    if (LOG.isLoggable(Level.FINE)) {
        LOG.log(Level.FINE, "sendError({0}, {1})", new Object[]{sc, msg});
    }
    responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
    byte[] responseBytes = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
    ByteBuffer frame = ByteBuffer.wrap(responseBytes);
    WebSockets.sendText(frame, channel, null);
}
/**
 * Sends the given string as a text frame over this connection's channel.
 *
 * @param message the text to send
 * @return this connection, so calls can be chained
 * @throws IOException per the method signature
 */
@Override
public WebSocketConnection sendMessage(String message) throws IOException {
// Runs before anything is sent; presumably rejects use of a closed connection - see checkClosed().
checkClosed();
// No completion callback is supplied (null).
WebSockets.sendText(message, channel, null);
return this;
}
/**
 * Sends the whole byte array as a binary frame over this connection's channel.
 *
 * @param message the bytes to send
 * @return this connection, so calls can be chained
 * @throws IOException per the method signature
 */
@Override
public WebSocketConnection sendMessage(byte[] message) throws IOException {
    checkClosed();
    // wrap(array) covers the full array, i.e. offset 0 and length array.length.
    WebSockets.sendBinary(ByteBuffer.wrap(message), channel, null);
    return this;
}
/**
 * Merges the buffers of a complete binary message into one and passes the resulting byte array,
 * together with {@code context}, to {@code handler.onMessage}.
 *
 * @param channel channel the message arrived on
 * @param message the buffered binary message
 * @throws IOException per the method signature
 */
@Override
protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) throws IOException {
    Pooled<ByteBuffer[]> pooled = message.getData();
    try {
        ByteBuffer merged = WebSockets.mergeBuffers(pooled.getResource());
        byte[] payload = merged.array();
        handler.onMessage(context, payload);
    } finally {
        // Discard the pooled data whether or not the handler succeeded.
        pooled.discard();
    }
}
/**
 * Creates a receive listener that sends every complete text message it gets to each of the
 * receiving channel's peer connections.
 *
 * @return a new listener instance
 */
private static AbstractReceiveListener getListener() {
    return new AbstractReceiveListener() {
        @Override
        protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
            final String text = message.getData();
            channel.getPeerConnections()
                    .forEach(peer -> WebSockets.sendText(text, peer, null));
        }
    };
}
/**
 * Sends the byte array as a binary frame on the given channel, without a completion callback.
 *
 * @param buffer           bytes to send
 * @param webSocketChannel channel to send on
 */
private void sendBinary(byte[] buffer, WebSocketChannel webSocketChannel) {
    ByteBuffer payload = ByteBuffer.wrap(buffer);
    WebSockets.sendBinary(payload, webSocketChannel, null);
}
/**
 * Wraps the type and data in an {@code EventMessage}, converts it with {@code toJson()} and
 * sends the result as a text frame on the given channel.
 *
 * @param channel channel to send on
 * @param type    first argument to {@code EventMessage.of}
 * @param data    second argument to {@code EventMessage.of}
 */
private void send(WebSocketChannel channel, String type, String data) {
// No completion callback is supplied (null).
WebSockets.sendText(EventMessage.of(type, data).toJson(), channel, null);
}
/**
 * Sends the error's string form as a text frame on the channel and then ends the exchange.
 *
 * @param exchange exchange to end afterwards
 * @param channel  channel the error is sent on
 * @param error    error whose {@code toString()} is sent
 */
private static void sendConnectionError(WebSocketHttpExchange exchange, WebSocketChannel channel, JsonWrapper error) {
    String body = error.toString();
    WebSockets.sendText(body, channel, null);
    exchange.endExchange();
}
/**
 * Sends the byte array as a binary frame on the given channel, without a completion callback.
 *
 * @param buffer           bytes to send
 * @param webSocketChannel channel to send on
 */
private void sendBinary(byte[] buffer, WebSocketChannel webSocketChannel) {
    ByteBuffer payload = ByteBuffer.wrap(buffer);
    WebSockets.sendBinary(payload, webSocketChannel, null);
}
/**
 * Sends a close frame with {@code WebSocketCloseCodes.SERVICE_RESTART} and a fixed reason to
 * every channel in {@code channels}.
 */
public void shutdown() {
    final String reason = "server is shutting down";
    for (var openChannel : channels) {
        WebSockets.sendClose(WebSocketCloseCodes.SERVICE_RESTART, reason, openChannel, ChannelCallback.INSTANCE);
    }
}
/**
 * Opens a client connection to {@code /websocketauth} with a token encrypted under the
 * configured authentication cookie secret, sends a random payload to every open server channel
 * of that path and expects the client listener to have captured exactly that payload within
 * two seconds.
 */
@Test
public void testSendDataWithValidAuthentication() throws Exception {
    // given
    final HttpClient client = HttpClient.newHttpClient();
    final Config config = Application.getInstance(Config.class);
    final String endpoint = "ws://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/websocketauth";
    final String payload = UUID.randomUUID().toString();
    eventData = null;

    var expiry = LocalDateTime.now().plusHours(1).atZone(ZoneId.systemDefault()).toInstant();
    var secretKey = new SecretKeySpec(config.getAuthenticationCookieSecret().getBytes(StandardCharsets.UTF_8), "AES");
    PasetoV1LocalBuilder token = Pasetos.V1.LOCAL.builder()
            .setSubject("foo")
            .setExpiration(expiry)
            .claim(ClaimKey.TWO_FACTOR.toString(), "false")
            .setSharedSecret(secretKey);

    Listener textCapture = new Listener() {
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
            // Remember the received text so the assertions below can compare it.
            eventData = message.toString();
            return null;
        }
    };

    // when
    String cookie = config.getAuthenticationCookieName() + "=" + token.compact();
    client.newWebSocketBuilder()
            .header("Cookie", cookie)
            .buildAsync(new URI(endpoint), textCapture);

    // Nothing has been sent yet, so the captured text must not equal the payload.
    await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, not(equalTo(payload))));

    Application.getInstance(WebSocketService.class)
            .getChannels("/websocketauth")
            .forEach(serverChannel -> {
                try {
                    if (serverChannel.isOpen()) {
                        WebSockets.sendTextBlocking(payload, serverChannel);
                    }
                } catch (final IOException sendFailure) {
                    sendFailure.printStackTrace();
                }
            });

    // then
    await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, equalTo(payload)));
}
/**
 * Sends the byte array as a binary frame on the given channel, without a completion callback.
 *
 * @param buffer           bytes to send
 * @param webSocketChannel channel to send on
 */
private void sendBinary(byte[] buffer, WebSocketChannel webSocketChannel) {
    ByteBuffer payload = ByteBuffer.wrap(buffer);
    WebSockets.sendBinary(payload, webSocketChannel, null);
}
/**
 * Sends the given string as a text frame on {@code channel}.
 *
 * @param message the text to send
 */
@Override
public void send(String message) {
// No completion callback is supplied (null).
WebSockets.sendText(message, channel, null);
}
/**
 * Sends a close frame with the given code and reason on {@code channel}.
 *
 * @param code   close code to send
 * @param reason close reason to send
 */
@Override
public void close(int code, String reason) {
// No completion callback is supplied (null).
WebSockets.sendClose(code, reason, channel, null);
}