下面列出了javax.websocket.Session#addMessageHandler ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* @Override onOpen websocket connect
*
* @param Session
* @param EndpointConfig
*/
@Override
public void onOpen(Session session, EndpointConfig config) {
session.setMaxBinaryMessageBufferSize(8388608);
session.setMaxTextMessageBufferSize(8388608);
MessageHandler handler = new MessageHandler.Whole<String>() {
@Override
public void onMessage(String text) {
try {
onMessageHandler(text, session);
} catch (Exception e) {
_log.error(e);
}
}
};
session.addMessageHandler(handler);
}
/**
* Callback hook for Connection open events.
*
* @param userSession the userSession which is opened.
*/
@Override
public void onOpen( Session userSession, EndpointConfig endpointConfig ) {
this.userSession = userSession;
this.userSession.setMaxTextMessageBufferSize( MAX_TXT_MSG_BUF_SIZE );
this.userSession.setMaxBinaryMessageBufferSize( MAX_BIN_MSG_BUF_SIZE );
userSession.addMessageHandler( new MessageHandler.Whole<Message>() {
/**
* Callback hook for Message Events. This method will be invoked when the server send a message.
*
* @param message The text message
*/
@Override
public void onMessage( Message message ) {
try {
messageEventService.fireEvent( message );
} catch ( MessageEventFireEventException e ) {
throw new RuntimeException( e );
}
}
} );
}
@Override
public void onOpen(Session session, EndpointConfig ec) {
// If the client instance is disposed, then immediately close all opened Sessions
if(LibertyClientInstance.getInstance().isDisposed()) {
log.interesting("Ignoring onOpen on an endpoint with a closed LibertyClientInstance", clientState.getLogContext());
try { session.close(); } catch (IOException e) { /*ignore*/ }
return;
}
log.interesting("Websocket session "+session.getId()+" opened with client instance "+LibertyClientInstance.getInstance().getUuid(),
clientState.getLogContext());
session.setMaxBinaryMessageBufferSize(128 * 1024);
session.addMessageHandler(new BinaryMessageHandler(this, session, sessionWrapper));
// session.addMessageHandler(new StringMessageHandler(this, session));
sessionWrapper.newSession(session);
ResourceLifecycleUtil.getInstance().addNewSession(ClientUtil.convertSessionToManagedResource(session));
LibertyClientInstance.getInstance().add(session);
}
@Override
public void onOpen(Session session, EndpointConfig config) {
session.addMessageHandler(String.class, message -> {
ctx.writeAndFlush(new TextWebSocketFrame(message));
LOG.debug("Message received on Websocket session {}: {}", session.getId(), message);
});
}
@Override
public void onOpen(Session session, EndpointConfig config) {
LOGGER.log(Level.FINE, "Client received open.");
this.session = session;
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
LOGGER.log(Level.FINEST, "Client received text MESSAGE: {}", message);
if (onStringMessageConsumer != null) {
onStringMessageConsumer.accept(message);
}
}
});
session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
@Override
public void onMessage(byte[] bytes) {
LOGGER.log(Level.FINEST, "Client received binary MESSAGE: {}", new String(bytes));
if (onBinaryMessageConsumer != null) {
onBinaryMessageConsumer.accept(bytes);
}
}
});
if (onOpenConsumer != null) {
onOpenConsumer.accept(session);
}
}
@org.junit.Test
public void testTextByFuture() throws Exception {
final byte[] payload = "payload".getBytes();
final AtomicReference<Future<Void>> sendResult = new AtomicReference<>();
final AtomicBoolean connected = new AtomicBoolean(false);
final CompletableFuture<?> latch = new CompletableFuture<>();
class TestEndPoint extends Endpoint {
@Override
public void onOpen(final Session session, EndpointConfig config) {
connected.set(true);
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
sendResult.set(session.getAsyncRemote().sendText(message));
}
});
}
}
ServerWebSocketContainer builder = new ServerWebSocketContainer(TestClassIntrospector.INSTANCE, DefaultServer.getEventLoopSupplier(), Collections.EMPTY_LIST, false, false);
builder.addEndpoint(ServerEndpointConfig.Builder.create(TestEndPoint.class, "/").configurator(new InstanceConfigurator(new TestEndPoint())).build());
deployServlet(builder);
WebSocketTestClient client = new WebSocketTestClient(new URI("ws://" + DefaultServer.getHostAddress("default") + ":" + DefaultServer.getHostPort("default") + "/"));
client.connect();
client.send(new TextWebSocketFrame(Unpooled.wrappedBuffer(payload)), new FrameChecker(TextWebSocketFrame.class, payload, latch));
latch.get();
sendResult.get();
client.destroy();
}
@Override
public void onOpen(Session session, EndpointConfig config) {
log.debug("Client received open.");
this.session = session;
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
log.trace("Client received text MESSAGE: {}", message);
if (onStringMessageConsumer != null) {
onStringMessageConsumer.accept(message);
}
}
});
session.addMessageHandler(new MessageHandler.Whole<byte[]>() {
@Override
public void onMessage(byte[] bytes) {
log.trace("Client received binary MESSAGE: {}", new String(bytes));
if (onBinaryMessageConsumer != null) {
onBinaryMessageConsumer.accept(bytes);
}
}
});
if (onOpenConsumer != null) {
onOpenConsumer.accept(session);
}
}
@Test
public void testAtmosphereWebsocketComponent() throws Exception {
SimpleMessageHandler handler = new SimpleMessageHandler(new CountDownLatch(1));
WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
Session session = webSocketContainer.connectToServer(HelloClient.class, new URI("ws://localhost:8080/camel-atmosphere-websocket-tests/services/hello"));
session.addMessageHandler(handler);
session.getBasicRemote().sendText("Kermit");
Assert.assertTrue("Gave up waiting for web socket response", handler.awaitMessage());
Assert.assertEquals("Hello Kermit", handler.getResult());
}
private void doTestPerMessageDefalteClient(String msg, int count) throws Exception {
Tomcat tomcat = getTomcatInstance();
// Must have a real docBase - just use temp
Context ctx =
tomcat.addContext("", System.getProperty("java.io.tmpdir"));
ctx.addApplicationListener(TesterEchoServer.Config.class.getName());
Tomcat.addServlet(ctx, "default", new DefaultServlet());
ctx.addServletMapping("/", "default");
tomcat.start();
Extension perMessageDeflate = new WsExtension(PerMessageDeflate.NAME);
List<Extension> extensions = new ArrayList<Extension>(1);
extensions.add(perMessageDeflate);
ClientEndpointConfig clientConfig =
ClientEndpointConfig.Builder.create().extensions(extensions).build();
WebSocketContainer wsContainer =
ContainerProvider.getWebSocketContainer();
Session wsSession = wsContainer.connectToServer(
TesterProgrammaticEndpoint.class,
clientConfig,
new URI("ws://" + getHostName() + ":" + getPort() +
TesterEchoServer.Config.PATH_ASYNC));
CountDownLatch latch = new CountDownLatch(count);
BasicText handler = new BasicText(latch, msg);
wsSession.addMessageHandler(handler);
for (int i = 0; i < count; i++) {
wsSession.getBasicRemote().sendText(msg);
}
boolean latchResult = handler.getLatch().await(10, TimeUnit.SECONDS);
Assert.assertTrue(latchResult);
((WsWebSocketContainer) wsContainer).destroy();
}
@Override
public void onOpen(Session session, EndpointConfig config) {
LOG.info("Websocket session {} opened.", session.getId());
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
LOG.info("Message received on Websocket session {}: {}", session.getId(), message);
}
});
}
@Override
public void onOpen(Session session, EndpointConfig config) {
session.addMessageHandler(new MsgStringMessageHandler(session));
}
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
RemoteEndpoint.Basic remoteEndpointBasic = session.getBasicRemote();
session.addMessageHandler(new EchoMessageHandlerText(remoteEndpointBasic));
session.addMessageHandler(new EchoMessageHandlerBinary(remoteEndpointBasic));
}
@Override
public void onOpen(Session session, EndpointConfig config) {
session.addMessageHandler(new MsgStringMessageHandler(session));
}
@Test
public void testConnectToServerEndpointSSL() throws Exception {
Tomcat tomcat = getTomcatInstance();
// No file system docBase required
Context ctx = tomcat.addContext("", null);
ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
Tomcat.addServlet(ctx, "default", new DefaultServlet());
ctx.addServletMapping("/", "default");
TesterSupport.initSsl(tomcat);
tomcat.start();
WebSocketContainer wsContainer =
ContainerProvider.getWebSocketContainer();
ClientEndpointConfig clientEndpointConfig =
ClientEndpointConfig.Builder.create().build();
URL truststoreUrl = this.getClass().getClassLoader().getResource(
"org/apache/tomcat/util/net/ca.jks");
File truststoreFile = new File(truststoreUrl.toURI());
clientEndpointConfig.getUserProperties().put(
WsWebSocketContainer.SSL_TRUSTSTORE_PROPERTY,
truststoreFile.getAbsolutePath());
Session wsSession = wsContainer.connectToServer(
TesterProgrammaticEndpoint.class,
clientEndpointConfig,
new URI("wss://localhost:" + getPort() +
TesterFirehoseServer.Config.PATH));
CountDownLatch latch =
new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT);
BasicText handler = new BasicText(latch);
wsSession.addMessageHandler(handler);
wsSession.getBasicRemote().sendText("Hello");
System.out.println("Sent Hello message, waiting for data");
// Ignore the latch result as the message count test below will tell us
// if the right number of messages arrived
handler.getLatch().await(TesterFirehoseServer.WAIT_TIME_MILLIS,
TimeUnit.MILLISECONDS);
Queue<String> messages = handler.getMessages();
Assert.assertEquals(
TesterFirehoseServer.MESSAGE_COUNT, messages.size());
for (String message : messages) {
Assert.assertEquals(TesterFirehoseServer.MESSAGE, message);
}
}
@Test
public void testConnectToServerEndpoint() throws Exception {
Tomcat tomcat = getTomcatInstance();
// No file system docBase required
Context ctx = tomcat.addContext("", null);
ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
Tomcat.addServlet(ctx, "default", new DefaultServlet());
ctx.addServletMappingDecoded("/", "default");
TesterSupport.initSsl(tomcat);
tomcat.start();
WebSocketContainer wsContainer =
ContainerProvider.getWebSocketContainer();
ClientEndpointConfig clientEndpointConfig =
ClientEndpointConfig.Builder.create().build();
clientEndpointConfig.getUserProperties().put(
Constants.SSL_CONTEXT_PROPERTY, createSSLContext());
Session wsSession = wsContainer.connectToServer(
TesterProgrammaticEndpoint.class,
clientEndpointConfig,
new URI("wss://localhost:" + getPort() +
TesterFirehoseServer.Config.PATH));
CountDownLatch latch =
new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT);
BasicText handler = new BasicText(latch);
wsSession.addMessageHandler(handler);
wsSession.getBasicRemote().sendText("Hello");
System.out.println("Sent Hello message, waiting for data");
// Ignore the latch result as the message count test below will tell us
// if the right number of messages arrived
handler.getLatch().await(TesterFirehoseServer.WAIT_TIME_MILLIS,
TimeUnit.MILLISECONDS);
Queue<String> messages = handler.getMessages();
Assert.assertEquals(
TesterFirehoseServer.MESSAGE_COUNT, messages.size());
for (String message : messages) {
Assert.assertEquals(TesterFirehoseServer.MESSAGE, message);
}
}
@org.junit.Test
public void testTextUsingWriter() throws Exception {
final byte[] payload = "payload".getBytes();
final AtomicReference<Throwable> cause = new AtomicReference<>();
final AtomicBoolean connected = new AtomicBoolean(false);
final CompletableFuture<?> latch = new CompletableFuture<>();
class TestEndPoint extends Endpoint {
@Override
public void onOpen(final Session session, EndpointConfig config) {
connected.set(true);
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(final String message) {
DefaultServer.getUndertow().getWorker().execute(new Runnable() {
@Override
public void run() {
try {
Writer writer = session.getBasicRemote().getSendWriter();
writer.write(message);
writer.close();
} catch (IOException e) {
e.printStackTrace();
cause.set(e);
latch.completeExceptionally(e);
}
}
});
}
});
}
}
ServerWebSocketContainer builder = new ServerWebSocketContainer(TestClassIntrospector.INSTANCE, DefaultServer.getEventLoopSupplier(), Collections.EMPTY_LIST, false, false);
builder.addEndpoint(ServerEndpointConfig.Builder.create(TestEndPoint.class, "/").configurator(new InstanceConfigurator(new TestEndPoint())).build());
deployServlet(builder);
WebSocketTestClient client = new WebSocketTestClient(new URI("ws://" + DefaultServer.getHostAddress("default") + ":" + DefaultServer.getHostPort("default") + "/"));
client.connect();
client.send(new TextWebSocketFrame(Unpooled.wrappedBuffer(payload)), new FrameChecker(TextWebSocketFrame.class, payload, latch));
latch.get();
Assert.assertNull(cause.get());
client.destroy();
}
@Test
public void testBug56032() throws Exception {
// TODO Investigate options to get this test to pass with the HTTP BIO
// connector.
Assume.assumeFalse(
"Skip this test on BIO. TODO: investigate options to make it pass with HTTP BIO connector",
getTomcatInstance().getConnector().getProtocolHandlerClassName().equals(
"org.apache.coyote.http11.Http11Protocol"));
Tomcat tomcat = getTomcatInstance();
// No file system docBase required
Context ctx = tomcat.addContext("", null);
ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
Tomcat.addServlet(ctx, "default", new DefaultServlet());
ctx.addServletMapping("/", "default");
TesterSupport.initSsl(tomcat);
tomcat.start();
WebSocketContainer wsContainer =
ContainerProvider.getWebSocketContainer();
ClientEndpointConfig clientEndpointConfig =
ClientEndpointConfig.Builder.create().build();
clientEndpointConfig.getUserProperties().put(
WsWebSocketContainer.SSL_TRUSTSTORE_PROPERTY,
"test/org/apache/tomcat/util/net/ca.jks");
Session wsSession = wsContainer.connectToServer(
TesterProgrammaticEndpoint.class,
clientEndpointConfig,
new URI("wss://localhost:" + getPort() +
TesterFirehoseServer.Config.PATH));
// Process incoming messages very slowly
MessageHandler handler = new SleepingText(5000);
wsSession.addMessageHandler(handler);
wsSession.getBasicRemote().sendText("Hello");
// Wait long enough for the buffers to fill and the send to timeout
int count = 0;
int limit = TesterFirehoseServer.WAIT_TIME_MILLIS / 100;
System.err.println("Waiting for server to report an error");
while (TesterFirehoseServer.Endpoint.getErrorCount() == 0 && count < limit) {
Thread.sleep(100);
count ++;
}
if (TesterFirehoseServer.Endpoint.getErrorCount() == 0) {
Assert.fail("No error reported by Endpoint when timeout was expected");
}
// Wait up to another 20 seconds for the connection to be closed
System.err.println("Waiting for connection to be closed");
count = 0;
limit = (TesterFirehoseServer.SEND_TIME_OUT_MILLIS * 4) / 100;
while (TesterFirehoseServer.Endpoint.getOpenConnectionCount() != 0 && count < limit) {
Thread.sleep(100);
count ++;
}
int openConnectionCount = TesterFirehoseServer.Endpoint.getOpenConnectionCount();
if (openConnectionCount != 0) {
Assert.fail("There are [" + openConnectionCount + "] connections still open");
}
}
@Override
public void onOpen(Session session, EndpointConfig config) {
session.addMessageHandler(this);
}
@Test
public void testConnectToServerEndpoint() throws Exception {
Tomcat tomcat = getTomcatInstance();
// No file system docBase required
Context ctx = tomcat.addContext("", null);
ctx.addApplicationListener(TesterFirehoseServer.Config.class.getName());
Tomcat.addServlet(ctx, "default", new DefaultServlet());
ctx.addServletMapping("/", "default");
tomcat.start();
WebSocketContainer wsContainer =
ContainerProvider.getWebSocketContainer();
ClientEndpointConfig clientEndpointConfig =
ClientEndpointConfig.Builder.create().build();
Session wsSession = wsContainer.connectToServer(
TesterProgrammaticEndpoint.class,
clientEndpointConfig,
new URI("ws://localhost:" + getPort() +
TesterFirehoseServer.Config.PATH));
CountDownLatch latch =
new CountDownLatch(TesterFirehoseServer.MESSAGE_COUNT);
BasicText handler = new BasicText(latch);
wsSession.addMessageHandler(handler);
wsSession.getBasicRemote().sendText("Hello");
System.out.println("Sent Hello message, waiting for data");
// Ignore the latch result as the message count test below will tell us
// if the right number of messages arrived
handler.getLatch().await(TesterFirehoseServer.WAIT_TIME_MILLIS,
TimeUnit.MILLISECONDS);
Queue<String> messages = handler.getMessages();
Assert.assertEquals(
TesterFirehoseServer.MESSAGE_COUNT, messages.size());
for (String message : messages) {
Assert.assertEquals(TesterFirehoseServer.MESSAGE, message);
}
}
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
RemoteEndpoint.Basic remoteEndpointBasic = session.getBasicRemote();
session.addMessageHandler(new EchoMessageHandlerText(remoteEndpointBasic));
session.addMessageHandler(new EchoMessageHandlerBinary(remoteEndpointBasic));
}