下面列出了 java.net.http.WebSocket API 类的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Sends a buffer containing a complete message to the server asynchronously.
 *
 * <p>When the subprotocol is {@code "text"}, the bytes between the buffer's position and limit
 * are decoded as UTF-8 and sent as one text message; otherwise (including when no subprotocol
 * is set) the buffer itself is sent as one binary message. The method does not block: send
 * failures are reported through the returned future rather than thrown.
 *
 * @param data The message consists of bytes from the buffer's position to its limit. On the
 *     text path the bytes are consumed immediately while building the string.
 * @return a future that completes with {@code data} once the underlying send completes, or
 *     completes exceptionally if the send fails
 * @throws IOException if the WebSocket is not open (the reference is {@code null})
 * @throws Exception retained in the signature for backward compatibility with existing callers
 */
public CompletableFuture<ByteBuffer> send(ByteBuffer data) throws Exception {
    // Read the field exactly once: the close/error callbacks may null it concurrently, which
    // would otherwise turn the documented IOException into a NullPointerException.
    final WebSocket ws = webSocket;
    if (null == ws) {
        throw new IOException("WebSocket not open");
    }
    final CompletableFuture<WebSocket> future;
    // Constant-first comparison so an unset subprotocol falls through to the binary path.
    if ("text".equals(subprotocol)) {
        byte[] dst = new byte[data.remaining()];
        data.get(dst);
        // WebSocket text frames are UTF-8 on the wire; never rely on the platform default charset.
        String str = new String(dst, java.nio.charset.StandardCharsets.UTF_8);
        future = ws.sendText(str, true);
    } else {
        future = ws.sendBinary(data, true);
    }
    return future.thenApply(w -> data);
}
/**
 * Receives one text frame from the gateway socket.
 *
 * <p>Partial frames are appended to {@code socketInputBuffer}; when the final frame arrives the
 * complete payload is parsed as a JSON object and passed to {@code handleSocketData}. One more
 * message is then requested from {@code socket}.
 *
 * @param webSocket the socket delivering the frame; wrapped in a {@code ReentrantLockWebSocket}
 *     if no socket has been stored yet
 * @param data the frame contents
 * @param last whether this frame completes the message
 * @return always {@code null}
 */
@Override
public CompletionStage<?> onText(final WebSocket webSocket, final CharSequence data, final boolean last) {
    if(socket == null) {
        // A frame arrived before the socket reference was stored; adopt it now.
        socket = new ReentrantLockWebSocket(webSocket);
        socketOpen = true;
    }
    if(!last) {
        // Not the final fragment yet: keep accumulating.
        socketInputBuffer.append(data);
    } else {
        try {
            final String payload;
            if(socketInputBuffer.length() > 0) {
                payload = socketInputBuffer.append(data).toString();
            } else {
                // Single-frame message: no need to go through the buffer.
                payload = data.toString();
            }
            handleSocketData(JsonParser.object().from(payload));
        } catch(final JsonParserException e) {
            catnip.logAdapter().error("Shard {}: Error parsing payload", shardInfo, e);
            // TODO
            stateReply(ShardConnectState.FAILED);
        } finally {
            // Always reset the buffer so the next message starts clean.
            socketInputBuffer.setLength(0);
        }
    }
    socket.request(1L);
    return null;
}
/**
 * Opens a client connection to "/websocket", closes that channel group through the
 * WebSocketService and expects the remaining channel collection to be non-null and empty.
 */
@Test
public void testCloseChannel() throws Exception {
    // given
    final Config config = Application.getInstance(Config.class);
    final WebSocketService webSocketService = Application.getInstance(WebSocketService.class);
    final URI endpoint = new URI(
            "ws://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/websocket");
    webSocketService.removeChannels("/websocket");
    final Listener noOpListener = new Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
        }
    };

    // when
    HttpClient.newHttpClient()
            .newWebSocketBuilder()
            .buildAsync(endpoint, noOpListener)
            .join();
    webSocketService.close("/websocket");

    // then
    assertThat(webSocketService.getChannels("/websocket"), not(nullValue()));
    assertThat(webSocketService.getChannels("/websocket").size(), equalTo(0));
}
/**
 * Connects a client to "/websocket" and expects the listener's onOpen callback to have set
 * the {@code connected} flag by the time the connection future has completed.
 */
@Test
public void testWebSocketConnection() throws Exception {
    // given
    final Config config = Application.getInstance(Config.class);
    final URI endpoint = new URI(
            "ws://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/websocket");
    final Listener flagOnOpen = new Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            connected = true;
        }
    };

    // when
    HttpClient.newHttpClient()
            .newWebSocketBuilder()
            .buildAsync(endpoint, flagOnOpen)
            .join();

    // then
    assertThat(connected, equalTo(true));
}
/**
 * Handles a received binary frame: a non-empty payload is copied into the BufferSupply
 * obtained from {@code ringBuffer}, then one more message is requested.
 *
 * @param webSocket the socket that delivered the frame
 * @param src the frame payload
 * @param last whether this frame completes the message (not consulted here)
 * @return always {@code null}
 */
public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer src, boolean last) {
    final boolean hasPayload = src.hasRemaining();
    if (hasPayload) {
        final BufferSupply supply = ringBuffer.get();
        supply.acquireAndCopy(src);
        supply.release();
    }
    webSocket.request(1);
    // A null return value signals normal completion to the WebSocket implementation
    return null;
}
/**
 * Handles a received text frame: a non-empty message is encoded as UTF-8 and written into a
 * buffer acquired from the BufferSupply obtained from {@code ringBuffer}, then one more message
 * is requested.
 *
 * @param webSocket the socket that delivered the frame
 * @param message the frame contents
 * @param last whether this frame completes the message (not consulted here)
 * @return always {@code null}, which indicates normal completion
 */
public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
    if (message.length() > 0) {
        BufferSupply bufferSupply = ringBuffer.get();
        ByteBuffer buffer = bufferSupply.acquire();
        // Encode explicitly as UTF-8 (the WebSocket text encoding) instead of the platform
        // default charset, which can corrupt non-ASCII characters on some JVM configurations.
        byte[] src = message.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        buffer.put(src);
        bufferSupply.release();
    }
    webSocket.request(1);
    return null;
}
/**
 * Clears the enclosing endpoint's WebSocket reference when the connection is closed.
 *
 * <p>The {@code connectedCriticalSection} flag is used as a spin guard: the caller yields until
 * it flips the flag from false to true, and flips it back when done.
 *
 * @param webSocket the socket being closed
 * @param statusCode the close status code (not consulted here)
 * @param reason the close reason (not consulted here)
 * @return always {@code null}
 */
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
    // WebSocket implementation automatically sends close response
    boolean entered = connectedCriticalSection.compareAndSet(false, true);
    while (!entered) {
        Thread.yield();
        entered = connectedCriticalSection.compareAndSet(false, true);
    }
    try {
        ClientEndpoint.this.webSocket = null;
    } finally {
        connectedCriticalSection.compareAndSet(true, false);
    }
    return null;
}
/**
 * Clears the enclosing endpoint's WebSocket reference after an error.
 *
 * <p>Uses the {@code connectedCriticalSection} flag as a spin guard, yielding until the flag
 * can be flipped from false to true and flipping it back afterwards.
 *
 * @param webSocket the socket on which the error occurred
 * @param error the reported error (not consulted here)
 */
public void onError(WebSocket webSocket, Throwable error) {
    // the session is already torn down; clean up the reference
    boolean entered = connectedCriticalSection.compareAndSet(false, true);
    while (!entered) {
        Thread.yield();
        entered = connectedCriticalSection.compareAndSet(false, true);
    }
    try {
        ClientEndpoint.this.webSocket = null;
    } finally {
        connectedCriticalSection.compareAndSet(true, false);
    }
}
/**
 * Sends a normal-closure Close message on the current WebSocket, if there is one, and waits up
 * to {@code timeoutSeconds} seconds for the send to complete.
 *
 * <p>The whole operation runs while holding the {@code connectedCriticalSection} spin flag.
 *
 * @throws Exception whatever the bounded wait on the close future throws
 */
@Override
public void close() throws Exception {
    boolean entered = connectedCriticalSection.compareAndSet(false, true);
    while (!entered) {
        Thread.yield();
        entered = connectedCriticalSection.compareAndSet(false, true);
    }
    try {
        if (null == webSocket) {
            // Nothing to close.
            return;
        }
        webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "").get(timeoutSeconds, TimeUnit.SECONDS);
    } finally {
        connectedCriticalSection.compareAndSet(true, false);
    }
}
/**
 * Invokes {@code rawGatewayOpenHook} with this shard's info on every hook of every registered
 * extension, then requests the first message from the socket.
 *
 * @param webSocket the socket that has just been opened
 */
@Override
public void onOpen(final WebSocket webSocket) {
    final var extensions = catnip.extensionManager().extensions();
    for(final Extension ext : extensions) {
        final var hooks = ext.hooks();
        for(final CatnipHook openHook : hooks) {
            openHook.rawGatewayOpenHook(shardInfo);
        }
    }
    webSocket.request(1);
}
/**
 * Handles a socket error: drops the socket reference, optionally logs the error, dispatches a
 * {@code GATEWAY_WEBSOCKET_CONNECTION_FAILED} event, and then either re-queues the shard for
 * connection (if the socket had been open) or replies with {@code ShardConnectState.FAILED}.
 *
 * @param webSocket the socket on which the error occurred
 * @param error the reported error
 */
@Override
public void onError(final WebSocket webSocket, final Throwable error) {
    // Capture the state BEFORE resetting it. Previously the flag was cleared first and tested
    // afterwards, so the re-queue branch below could never run.
    final boolean wasOpen = socketOpen;
    socket = null;
    socketOpen = false;
    if(catnip.options().logLifecycleEvents()) {
        catnip.logAdapter().error("Shard {}: Couldn't connect socket:", shardInfo, error);
    }
    catnip.dispatchManager().dispatchEvent(Raw.GATEWAY_WEBSOCKET_CONNECTION_FAILED,
            new GatewayConnectionFailedImpl(shardInfo, error, catnip));
    if(wasOpen) {
        addToConnectQueue();
    } else {
        stateReply(ShardConnectState.FAILED);
    }
}
/**
 * Delegates to the wrapped socket's {@code sendText} while holding {@code lock}; the lock is
 * released as soon as the send has been initiated, not when the returned future completes.
 *
 * @param data the text to send
 * @param last whether this is the final part of the message
 * @return a future that yields this wrapper once the delegate's future completes
 */
@Override
public CompletableFuture<WebSocket> sendText(final CharSequence data, final boolean last) {
    lock.lock();
    try {
        final CompletableFuture<WebSocket> pending = webSocket.sendText(data, last);
        return pending.thenApply(ignored -> this);
    } finally {
        lock.unlock();
    }
}
/**
 * Delegates to the wrapped socket's {@code sendBinary} while holding {@code lock}; the lock is
 * released as soon as the send has been initiated, not when the returned future completes.
 *
 * @param data the bytes to send
 * @param last whether this is the final part of the message
 * @return a future that yields this wrapper once the delegate's future completes
 */
@Override
public CompletableFuture<WebSocket> sendBinary(final ByteBuffer data, final boolean last) {
    lock.lock();
    try {
        final CompletableFuture<WebSocket> pending = webSocket.sendBinary(data, last);
        return pending.thenApply(ignored -> this);
    } finally {
        lock.unlock();
    }
}
/**
 * Delegates to the wrapped socket's {@code sendPing} while holding {@code lock}; the lock is
 * released as soon as the send has been initiated, not when the returned future completes.
 *
 * @param message the ping payload
 * @return a future that yields this wrapper once the delegate's future completes
 */
@Override
public CompletableFuture<WebSocket> sendPing(final ByteBuffer message) {
    lock.lock();
    try {
        final CompletableFuture<WebSocket> pending = webSocket.sendPing(message);
        return pending.thenApply(ignored -> this);
    } finally {
        lock.unlock();
    }
}
/**
 * Delegates to the wrapped socket's {@code sendPong} while holding {@code lock}; the lock is
 * released as soon as the send has been initiated, not when the returned future completes.
 *
 * @param message the pong payload
 * @return a future that yields this wrapper once the delegate's future completes
 */
@Override
public CompletableFuture<WebSocket> sendPong(final ByteBuffer message) {
    lock.lock();
    try {
        final CompletableFuture<WebSocket> pending = webSocket.sendPong(message);
        return pending.thenApply(ignored -> this);
    } finally {
        lock.unlock();
    }
}
/**
 * Delegates to the wrapped socket's {@code sendClose} while holding {@code lock}; the lock is
 * released as soon as the send has been initiated, not when the returned future completes.
 *
 * @param statusCode the close status code
 * @param reason the close reason
 * @return a future that yields this wrapper once the delegate's future completes
 */
@Override
public CompletableFuture<WebSocket> sendClose(final int statusCode, final String reason) {
    lock.lock();
    try {
        final CompletableFuture<WebSocket> pending = webSocket.sendClose(statusCode, reason);
        return pending.thenApply(ignored -> this);
    } finally {
        lock.unlock();
    }
}
/**
 * Prints every received text frame and, while the output side is still open, answers with a
 * fixed text message before delegating to the default listener behaviour.
 *
 * @param webSocket the socket that delivered the frame
 * @param data the frame contents
 * @param last whether this frame completes the message
 * @return whatever the default {@code Listener.onText} implementation returns
 */
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
    System.out.println("Method onText() got data: " + data);
    if (!webSocket.isOutputClosed()) {
        // NOTE(review): the returned future is ignored, so a failed send goes unnoticed.
        webSocket.sendText("Another message", true);
    }
    return Listener.super.onText(webSocket, data, last);
}
/**
 * Marks this connection as closed and, if a client socket exists whose output is still open,
 * asynchronously sends a normal-closure Close message. A failure of that send is passed to the
 * handler built by {@code logWebSocketError} at INFO level.
 */
void close() {
    closed = true;
    // The client is null until the connection has fully opened; in that case, or when the
    // output side is already closed, there is nothing to send.
    if (client == null || client.isOutputClosed()) {
        return;
    }
    client
        .sendClose(WebSocket.NORMAL_CLOSURE, CLIENT_DISCONNECT_MESSAGE)
        .exceptionally(logWebSocketError(Level.INFO, "Failed to close"));
}
/**
 * Stores the opened socket as the client, marks the connection open and sends every queued
 * message, all while synchronized on {@code queuedMessages}; the queue is then emptied and the
 * first incoming message is requested.
 *
 * @param webSocket the socket that has just been opened
 */
@Override
public void onOpen(final WebSocket webSocket) {
    synchronized (queuedMessages) {
        client = webSocket;
        connectionIsOpen = true;
        for (final CharSequence queued : queuedMessages) {
            client
                .sendText(queued, true)
                .exceptionally(logWebSocketError(Level.SEVERE, "Failed to send queued text."));
        }
        queuedMessages.clear();
    }
    // Request one message so onText can be invoked now that the connection is initialized
    webSocket.request(1);
}
/**
 * Appends each received text frame to {@code textAccumulator}; when the final frame arrives the
 * accumulated text is delivered to the listener and the accumulator is reset. One more message
 * is requested after every frame.
 *
 * @param webSocket the socket that delivered the frame
 * @param data the frame contents
 * @param last whether this frame completes the message
 * @return always {@code null}
 */
@Override
public CompletionStage<?> onText(
    final WebSocket webSocket, final CharSequence data, final boolean last) {
  // Original author's note: no synchronization needed, the API invokes this listener
  // in order and never concurrently.
  textAccumulator.append(data);
  if (last) {
    final String completeMessage = textAccumulator.toString();
    listener.messageReceived(completeMessage);
    textAccumulator.setLength(0);
  }
  // Processing of this frame is finished; allow the next invocation
  webSocket.request(1);
  return null;
}
/**
 * Cancels the ping sender and notifies the listener: a close whose reason equals the client
 * disconnect message is reported as a regular close, anything else as a termination (with
 * "Server disconnected" substituted for a blank reason).
 *
 * @param webSocket the socket being closed
 * @param statusCode the close status code (not consulted here)
 * @param reason the close reason
 * @return always {@code null}
 */
@Override
public CompletionStage<?> onClose(
    final WebSocket webSocket, final int statusCode, final String reason) {
  pingSender.cancel();
  final boolean closedByClient = reason.equals(WebSocketConnection.CLIENT_DISCONNECT_MESSAGE);
  if (closedByClient) {
    listener.connectionClosed();
    return null;
  }
  if (reason.isBlank()) {
    listener.connectionTerminated("Server disconnected");
  } else {
    listener.connectionTerminated(reason);
  }
  return null;
}
/** Three fragments, the last one flagged final, must be delivered as the single message "123". */
@Test
void verifyListenerAccumulatesMessagesUntilLast() {
  final String[] fragments = {"1", "2", "3"};
  for (int i = 0; i < fragments.length; i++) {
    final boolean isLast = i == fragments.length - 1;
    listener.onText(mock(WebSocket.class), fragments[i], isLast);
  }
  verify(webSocketConnectionListener).messageReceived("123");
}
/**
 * Two consecutive fragmented messages must be delivered separately ("123" then "45"), showing
 * that the accumulated text is cleared after each complete message.
 */
@Test
void verifyListenerClearsMessageCorrectly() {
  final String[][] messages = {{"1", "2", "3"}, {"4", "5"}};
  for (final String[] fragments : messages) {
    for (int i = 0; i < fragments.length; i++) {
      final boolean isLast = i == fragments.length - 1;
      listener.onText(mock(WebSocket.class), fragments[i], isLast);
    }
  }
  verify(webSocketConnectionListener).messageReceived("123");
  verify(webSocketConnectionListener).messageReceived("45");
}
/**
 * A message sent before the connection is open must not reach the socket until onOpen is
 * invoked, at which point it is sent.
 */
@Test
@DisplayName("Queued messages are flushed on connection open")
void queuedMessagesAreFlushedOnConnectionOpen() {
  final WebSocket socket = mockWebSocket();

  // The connection is not open yet, so the message is expected to be queued
  webSocketConnection.sendMessage(MESSAGE);
  verify(socket, never()).sendText(any(), anyBoolean());

  // Opening the connection is expected to flush the queued message
  listener.onOpen(socket);
  verify(socket).sendText(any(), anyBoolean());
}
/**
 * Stubs the HTTP client so that building a WebSocket returns an already-completed future holding
 * the {@code webSocket} test fixture.
 */
@BeforeEach
void setUp() {
  final CompletableFuture<WebSocket> connectedSocket = CompletableFuture.completedFuture(webSocket);
  final WebSocket.Builder socketBuilder = mock(WebSocket.Builder.class);
  when(socketBuilder.connectTimeout(any())).thenReturn(socketBuilder);
  when(socketBuilder.buildAsync(any(), any())).thenReturn(connectedSocket);
  when(httpClient.newWebSocketBuilder()).thenReturn(socketBuilder);
}
/**
 * After the connection has been opened, closing it must send a normal-closure Close message on
 * the socket and leave the ping sender not running.
 */
@Test
@DisplayName("Close will close the underlying socket and stops the pinger")
void close() {
  webSocketConnection.getInternalListener().onOpen(webSocket);
  final CompletableFuture<WebSocket> closeResult = CompletableFuture.completedFuture(null);
  when(webSocket.sendClose(anyInt(), any())).thenReturn(closeResult);

  webSocketConnection.close();

  verify(webSocket).sendClose(eq(WebSocket.NORMAL_CLOSURE), any());
  assertThat(webSocketConnection.getPingSender().isRunning(), is(false));
}
/**
 * Connects a client to "/websocket", has the server push a random string to every open channel
 * of that path and expects the client listener to receive exactly that string within two
 * seconds.
 */
@Test
public void testSendData() throws Exception {
    // given
    final Config config = Application.getInstance(Config.class);
    final URI endpoint = new URI(
            "ws://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/websocket");
    final String data = UUID.randomUUID().toString();
    eventData = null;
    final Listener capturingListener = new Listener() {
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
            eventData = message.toString();
            return null;
        }
    };

    // when
    HttpClient.newHttpClient()
            .newWebSocketBuilder()
            .buildAsync(endpoint, capturingListener)
            .get();
    for (final var channel : Application.getInstance(WebSocketService.class).getChannels("/websocket")) {
        try {
            if (channel.isOpen()) {
                WebSockets.sendTextBlocking(data, channel);
            }
        } catch (final IOException e) {
            e.printStackTrace();
        }
    }

    // then
    await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, equalTo(data)));
}
/**
 * Starts a connection to "/websocketauth" carrying an authentication cookie built from a locally
 * created token, has the server push a random string to every open channel of that path and
 * expects the client listener NOT to have received that string.
 */
@Test
public void testSendDataWithInvalidAuthentication() throws Exception {
    // given
    final Config config = Application.getInstance(Config.class);
    final URI endpoint = new URI(
            "ws://" + config.getConnectorHttpHost() + ":" + config.getConnectorHttpPort() + "/websocketauth");
    final String data = UUID.randomUUID().toString();
    eventData = null;

    final var expiresAt = LocalDateTime.now().plusHours(1).atZone(ZoneId.systemDefault()).toInstant();
    final var sharedSecret = new SecretKeySpec(
            "oskdlwsodkcmansjdkwsowekd5jfvsq2mckdkalsodkskajsfdsfdsfvvkdkcskdsqidsjk".getBytes(StandardCharsets.UTF_8),
            "AES");
    PasetoV1LocalBuilder token = Pasetos.V1.LOCAL.builder().setSubject("foo")
            .setExpiration(expiresAt)
            .claim(ClaimKey.TWO_FACTOR.toString(), "false")
            .setSharedSecret(sharedSecret);

    final Listener capturingListener = new Listener() {
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, boolean last) {
            eventData = message.toString();
            return null;
        }
    };

    // when: the connection future is intentionally not awaited here
    final String cookie = config.getAuthenticationCookieName() + "=" + token.compact();
    HttpClient.newHttpClient()
            .newWebSocketBuilder()
            .header("Cookie", cookie)
            .buildAsync(endpoint, capturingListener);
    await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, not(equalTo(data))));
    for (final var channel : Application.getInstance(WebSocketService.class).getChannels("/websocketauth")) {
        try {
            if (channel.isOpen()) {
                WebSockets.sendTextBlocking(data, channel);
            }
        } catch (final IOException e) {
            e.printStackTrace();
        }
    }

    // then
    await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(eventData, not(equalTo(data))));
}
/**
 * Requests the first incoming message as soon as the socket is open.
 *
 * @param webSocket the socket that has just been opened
 */
public void onOpen(WebSocket webSocket) {
    final long initialDemand = 1;
    webSocket.request(initialDemand);
}
/**
 * Creates a wrapper around the given socket together with a fresh ReentrantLock.
 *
 * @param webSocket the socket to delegate to
 */
public ReentrantLockWebSocket(final WebSocket webSocket) {
    this.lock = new ReentrantLock();
    this.webSocket = webSocket;
}