下面列出了 io.netty.handler.codec.http.websocketx.TextWebSocketFrame 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Handles user events raised on the pipeline.
 *
 * <p>When the event is the WebSocket handshake-complete marker, the
 * {@code HttpRequestHandle} is removed from the pipeline, every channel already
 * in {@code group} is told about the new client, and the current channel is then
 * added to {@code group}. Any other event is delegated to the superclass.
 *
 * @param ctx the handler context the event was raised on
 * @param evt the user event
 * @throws Exception if the superclass handler throws
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt != WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
        // Not the handshake-complete event: let the superclass deal with it.
        super.userEventTriggered(ctx, evt);
        return;
    }

    // Remove the previously installed HttpRequestHandle from the pipeline.
    ctx.pipeline().remove(HttpRequestHandle.class);

    // Announce the new connection to the clients that are already in the group ...
    group.writeAndFlush(new TextWebSocketFrame("新的客户端=" + ctx.channel() + "连接上来了"));

    // ... and only then add the current (WebSocket) channel to the group.
    group.add(ctx.channel());
}
/**
 * Wraps the payload of a {@code WebSocketMessage} in the Netty frame class that
 * corresponds to the message type (TEXT, BINARY, PING or PONG).
 *
 * @param message the message to convert
 * @return a frame carrying the message payload
 * @throws IllegalArgumentException if the message type is none of the four handled types
 */
protected WebSocketFrame toFrame(WebSocketMessage message) {
    ByteBuf payload = NettyDataBufferFactory.toByteBuf(message.getPayload());

    if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
        return new TextWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
        return new BinaryWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.PING.equals(message.getType())) {
        return new PingWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.PONG.equals(message.getType())) {
        return new PongWebSocketFrame(payload);
    }
    throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
/**
 * Dispatches an inbound WebSocket frame.
 *
 * <ul>
 *   <li>Close frame: completes the closing handshake via {@code handshaker} and calls {@code onClose}.</li>
 *   <li>Ping frame: writes (without flushing) a Pong frame built from the ping's content.</li>
 *   <li>Text frame: passes the decoded text to {@code onMessage}.</li>
 * </ul>
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @throws UnsupportedOperationException for any other frame type
 */
public void handle(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        onClose(ctx);
    } else if (frame instanceof PingWebSocketFrame) {
        // NOTE(review): the ping content is reused without retain(); confirm the
        // caller does not also release the frame.
        ctx.channel().write(new PongWebSocketFrame(frame.content()));
    } else if (frame instanceof TextWebSocketFrame) {
        String text = ((TextWebSocketFrame) frame).text();
        onMessage(ctx, text);
    } else {
        throw new UnsupportedOperationException(
                String.format("%s frame types not supported", frame.getClass().getName()));
    }
}
/**
 * Echo-style frame handler: logs the frame type at FINE level, then
 * completes the close handshake for Close frames, answers Ping with Pong,
 * writes Text/Binary/Continuation frames straight back, and releases Pong frames.
 * All writes use the context's void promise and are not flushed here.
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame type not listed above
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (logger.isLoggable(Level.FINE)) {
        String line = String.format(
                "Channel %s received %s", ctx.channel().hashCode(), StringUtil.simpleClassName(frame));
        logger.fine(line);
    }

    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        // The pong mirrors the ping's final-fragment flag, RSV bits and content.
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()), ctx.voidPromise());
        return;
    }
    if (frame instanceof TextWebSocketFrame
            || frame instanceof BinaryWebSocketFrame
            || frame instanceof ContinuationWebSocketFrame) {
        // Data frames are written back unchanged.
        ctx.write(frame, ctx.voidPromise());
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        // Nothing to do with a pong except release it.
        frame.release();
        return;
    }
    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Handles control frames and extracts the payload of data frames.
 *
 * <p>Close frames trigger the closing handshake and Ping frames are answered with a
 * Pong (written, not flushed); both return {@code null}. Text and Binary frames are
 * passed to {@code parseMessage}. Any other frame is logged as a format error.
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @return the result of {@code parseMessage} for Text/Binary frames, otherwise {@code null}
 */
private byte[] decodeWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    byte[] decoded = null;
    if (frame instanceof CloseWebSocketFrame) {
        // retain() because the frame is handed on to the handshaker.
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        // The ping content is retained for reuse in the pong.
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
        decoded = parseMessage(frame.content());
    } else {
        logger.warn("Message format error: " + frame);
    }
    return decoded;
}
/**
 * Routes an inbound WebSocket frame.
 *
 * <p>Close frames complete the closing handshake and are then forwarded down the
 * pipeline; Ping frames are answered with a matching Pong; Continuation frames are
 * written back; Pong frames are released; Binary and Text frames are forwarded to
 * the next inbound handler. Writes are not flushed here.
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @throws UnsupportedOperationException for any other frame type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        // The rest of the channel pipeline also gets to see the close frame.
        ctx.fireChannelRead(frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
        return;
    }
    if (frame instanceof ContinuationWebSocketFrame) {
        ctx.write(frame);
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        frame.release();
        return;
    }
    if (frame instanceof BinaryWebSocketFrame || frame instanceof TextWebSocketFrame) {
        // Data frames are left for the remaining handlers in the pipeline.
        ctx.fireChannelRead(frame);
        return;
    }
    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Broadcasts a system message to every authenticated user.
 *
 * <p>Iterates the channels registered in {@code userInfos} while holding the read
 * lock and writes a text frame built by {@code ChatProto.buildSystProto} to each
 * channel whose {@code UserInfo} exists and is authenticated.
 *
 * @param code the system message code passed to {@code ChatProto.buildSystProto}
 * @param mess the message payload passed to {@code ChatProto.buildSystProto}
 */
public static void broadCastInfo(int code, Object mess) {
    // Acquire the lock BEFORE entering try: if lock() itself fails, the finally
    // block must not attempt to unlock a lock that was never taken.
    rwLock.readLock().lock();
    try {
        for (Channel ch : userInfos.keySet()) {
            UserInfo userInfo = userInfos.get(ch);
            if (userInfo == null || !userInfo.isAuth()) {
                continue;
            }
            ch.writeAndFlush(new TextWebSocketFrame(ChatProto.buildSystProto(code, mess)));
        }
    } finally {
        rwLock.readLock().unlock();
    }
}
/**
 * Starts a background thread that, roughly every 200 ms, delivers the pending
 * offline messages held in {@code CustomOfflineInfoHelper.infoMap} to each channel
 * in {@code channels}, and then clears the delivered list.
 *
 * <p>The thread stops when it is interrupted. The read lock is always released,
 * even if delivery throws, and channels without a {@code UserInfo} or without a
 * pending-message list are skipped.
 */
public void pullThread() {
    new Thread(() -> {
        while (true) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                // Restore the flag and stop polling; continuing would spin because
                // every subsequent sleep() would throw immediately.
                Thread.currentThread().interrupt();
                return;
            }
            UserInfoManager.rwLock.readLock().lock();
            try {
                for (Channel channel : channels) {
                    UserInfo userInfo = userInfos.get(channel);
                    if (userInfo == null) {
                        continue;
                    }
                    List<String> strings = CustomOfflineInfoHelper.infoMap.get(userInfo.getId());
                    if (strings == null) {
                        continue;
                    }
                    for (String string : strings) {
                        channel.writeAndFlush(new TextWebSocketFrame(ChatProto.buildMessProto(userInfo.getId(), userInfo.getUsername(), string)));
                    }
                    strings.clear();
                }
            } finally {
                // Guarantee release so an exception above cannot leave the lock held.
                UserInfoManager.rwLock.readLock().unlock();
            }
        }
    }).start();
}
/**
 * Decodes a text WebSocket frame into a request message using the serializer
 * selected for {@code application/json}.
 *
 * <p>If deserialization raises a {@code SerializationException},
 * {@code RequestMessage.INVALID} is emitted instead.
 *
 * @param channelHandlerContext the handler context
 * @param frame                 the text frame to decode
 * @param objects               output list receiving the decoded message
 */
@Override
protected void decode(final ChannelHandlerContext channelHandlerContext, final TextWebSocketFrame frame, final List<Object> objects) throws Exception {
    try {
        // This decoder only works when the default serializer is a MessageTextSerializer.
        final MessageTextSerializer textSerializer =
                (MessageTextSerializer) select("application/json", ServerSerializers.DEFAULT_TEXT_SERIALIZER);

        // These attributes are channel-wide, so they are reset on every request: the next
        // request on this channel may be configured differently, and stale state must not
        // carry over from one request to the next.
        channelHandlerContext.channel().attr(StateKey.SESSION).set(null);
        channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(textSerializer);
        channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(false);

        final String payload = frame.text();
        objects.add(textSerializer.deserializeRequest(payload));
    } catch (SerializationException se) {
        objects.add(RequestMessage.INVALID);
    }
}
/**
 * Sends a JSON string over {@code channel}, fragmenting it into WebSocket frames
 * of at most {@code deviceModel.getBuffersize()} characters each.
 *
 * <p>The first fragment is a text frame and the remaining ones are continuation
 * frames; the last fragment carries the final-fragment flag. Each fragment is
 * written and flushed individually. A non-positive buffer size is treated as
 * "do not fragment": previously a size of 0 looped forever emitting empty frames
 * and a negative size made {@code substring} throw.
 *
 * @param json the message text to send
 * @throws IllegalStateException if the connection is not established
 */
private void sendMessage(String json) {
    if (!isConnected()) {
        throw new IllegalStateException("Cannot send message because not connected");
    }

    final int length = json.length();
    int chunkSize = deviceModel.getBuffersize();
    if (chunkSize <= 0) {
        // Guard against an unusable configuration: send the whole text in one frame.
        chunkSize = length;
    }

    // First fragment is always a text frame (final when the whole text fits).
    int end = Math.min(length, chunkSize);
    channel.writeAndFlush(new TextWebSocketFrame(end >= length, 0, json.substring(0, end)));

    // Remaining fragments, if any, are continuation frames.
    int start = end;
    while (start < length) {
        // Overflow-safe equivalent of min(length, start + chunkSize).
        end = (length - start <= chunkSize) ? length : start + chunkSize;
        channel.writeAndFlush(new ContinuationWebSocketFrame(end >= length, 0, json.substring(start, end)));
        start = end;
    }
}
/**
 * Processes one complete JSON message received from a device.
 *
 * <p>Logs the traffic, parses the JSON into a {@code ClientMessage}, initializes
 * and registers the session with the message's device if that has not happened
 * yet, and writes (without flushing) the JSON response when the handler produces one.
 *
 * @param ctx  the handler context the message arrived on
 * @param json the complete JSON text
 */
private void handleMessageCompleted(ChannelHandlerContext ctx, String json) {
    // Traffic is logged on comlog at INFO, but only when debug is enabled on logger.
    if (logger.isDebugEnabled()) {
        comlog.info("Device=>Server " + ctx.channel().remoteAddress() + "=>" + ctx.channel().localAddress() + " : " + json);
    }

    // Parse the incoming message.
    IpcdSerializer serializer = new IpcdSerializer();
    ClientMessage clientMessage = serializer.parseClientMessage(new StringReader(json));

    // The first message on a session binds the device to it and registers the session.
    if (!session.isInitialized()) {
        session.setDevice(clientMessage.getDevice());
        SessionRegistry.putSesion(session);
    }

    ServerMessage reply = handleClientMessage(clientMessage);
    if (reply == null) {
        return;
    }
    ctx.channel().write(new TextWebSocketFrame(serializer.toJson(reply)));
}
/**
 * Pushes a server-originated message to the user identified by the token in
 * {@code serverVO} and reports the outcome on the requesting {@code channel}.
 *
 * <p>If the token is missing or empty, {@code notFindUri} is called; if no user
 * channel is registered for the token, {@code notFindToken} is called. In both
 * cases processing stops there. Previously the empty check used {@code ==} on
 * strings and execution continued after either error reply, ending in a
 * {@code NullPointerException} on the missing user channel.
 *
 * <p>Note: {@code Integer.parseInt} on the VO's value is outside the try block, so
 * a non-numeric value still propagates as {@code NumberFormatException}.
 *
 * @param channel  the channel of the caller, used for the reply
 * @param serverVO carries the target token and the message code
 */
@Override
public void sendFromServer(Channel channel, SendServerVO serverVO) {
    String token = serverVO.getToken();
    // Compare string content, not identity.
    if (token == null || token.isEmpty()) {
        notFindUri(channel);
        return;
    }
    Channel userChannel = WebSocketCacheMap.getByToken(token);
    if (userChannel == null) {
        log.info(LogConstant.HTTPCHANNELSERVICEIMPL_NOTFINDLOGIN);
        notFindToken(channel);
        return;
    }
    String value = fromServerService.findByCode(Integer.parseInt(serverVO.getValue()));
    SendServer sendServer = new SendServer(value);
    try {
        userChannel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(sendServer)));
        sendServer(channel, UndefinedInChatConstant.SEND_SUCCESS);
    } catch (Exception e) {
        log.info(LogConstant.HTTPCHANNELSERVICEIMPL_SEND_EXCEPTION);
    }
}
/**
 * Intercepts the initial HTTP upgrade request to validate its {@code token} query
 * parameter before handing the message to the superclass.
 *
 * <p>For a {@code FullHttpRequest}, the query string is stripped from the URI. If
 * the {@code token} parameter is absent or equals {@code "undefined"}, the request
 * is released and the channel is closed; otherwise the channel is registered with
 * {@code ChannelManager}. All other messages pass through untouched.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception if the superclass handler throws
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // The first message on a connection is a FullHttpRequest; process its parameters.
    if (msg instanceof FullHttpRequest) {
        FullHttpRequest request = (FullHttpRequest) msg;
        String uri = request.uri();
        Map<?, ?> paramMap = getUrlParams(uri);

        // Strip the query string so later handlers see the bare path.
        int queryStart = uri.indexOf('?');
        if (queryStart >= 0) {
            request.setUri(uri.substring(0, queryStart));
        }

        Object token = paramMap.get("token");
        if (null == token || "undefined".equals(token)) {
            // The request is not forwarded on this path, so this handler owns it and
            // must release it; otherwise its buffer leaks.
            request.release();
            ctx.channel().close();
            return;
        }
        ChannelManager.putChannel(ChannelManager.channelLongText(ctx), ctx.channel());
    }
    super.channelRead(ctx, msg);
}
/**
 * Sends a {@code MetricsRequest} over the WebSocket and expects exactly one
 * response equal to an empty metrics document. The channel, server and event loop
 * group are shut down afterwards regardless of the outcome.
 */
@Test
public void testWSMetrics() throws Exception {
    try {
        MetricsRequest request = new MetricsRequest();
        String payload = JsonUtil.getObjectMapper().writeValueAsString(request);
        ch.writeAndFlush(new TextWebSocketFrame(payload));

        // Poll until a response shows up or the handler reports a disconnect.
        List<String> response;
        while ((response = handler.getResponses()).isEmpty() && handler.isConnected()) {
            LOG.info("Waiting for web socket response");
            sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
        }

        Assert.assertEquals(1, response.size());
        Assert.assertEquals("{\"metrics\":[]}", response.get(0));
    } finally {
        ch.close().sync();
        s.shutdown();
        group.shutdownGracefully();
    }
}
/**
 * With a random session id on the channel and the non-anonymous security
 * configuration, decoding a "create" request must produce a close frame with
 * status 1008 and reason "User must log in".
 */
@Test
public void testCreateSubscriptionWithInvalidSessionIdAndNonAnonymousAccess() throws Exception {
    String sessionId = URLEncoder.encode(UUID.randomUUID().toString(), StandardCharsets.UTF_8.name());
    ctx.channel().attr(SubscriptionRegistry.SESSION_ID_ATTR).set(sessionId);
    decoder = new WebSocketRequestDecoder(config.getSecurity());

    String request = "{ \"operation\" : \"create\", \"subscriptionId\" : \"1234\" }";
    TextWebSocketFrame frame = new TextWebSocketFrame();
    frame.content().writeBytes(request.getBytes(StandardCharsets.UTF_8));

    decoder.decode(ctx, frame, results);

    Assert.assertNotNull(ctx.msg);
    Assert.assertEquals(CloseWebSocketFrame.class, ctx.msg.getClass());
    CloseWebSocketFrame close = (CloseWebSocketFrame) ctx.msg;
    Assert.assertEquals(1008, close.statusCode());
    Assert.assertEquals("User must log in", close.reasonText());
}
/**
 * Invokes the endpoint's on-message method for a received frame.
 *
 * <p>The method mapping is the only registered one when there is exactly one
 * mapping, otherwise the one registered for the path stored on the channel. If the
 * mapping declares an on-message method, the frame is cast to a text frame and the
 * method is invoked reflectively on the endpoint instance stored on the channel;
 * anything thrown by the invocation is logged.
 *
 * @param channel the channel the frame arrived on
 * @param frame   the received frame (cast to {@code TextWebSocketFrame})
 */
public void doOnMessage(Channel channel, WebSocketFrame frame) {
    Attribute<String> attrPath = channel.attr(PATH_KEY);
    PojoMethodMapping methodMapping = pathMethodMappingMap.size() == 1
            ? pathMethodMappingMap.values().iterator().next()
            : pathMethodMappingMap.get(attrPath.get());

    if (methodMapping.getOnMessage() == null) {
        return;
    }

    TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
    Object endpoint = channel.attr(POJO_KEY).get();
    try {
        methodMapping.getOnMessage().invoke(endpoint, methodMapping.getOnMessageArgs(channel, textFrame));
    } catch (Throwable t) {
        logger.error(t);
    }
}
/**
 * Unregisters the context being removed and, if it belonged to a known user,
 * tells every remaining context that the user has left the chat room.
 *
 * <p>Uses a direct {@code remove} on the map instead of scanning the entry set and
 * removing during iteration.
 *
 * @param ctx the handler context that is being removed
 */
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
    // remove() returns the mapped user name, or null if ctx was not registered.
    String userName = userChannelMap.remove(ctx);
    if (userName == null) {
        return;
    }
    for (ChannelHandlerContext context : userChannelMap.keySet()) {
        context.writeAndFlush(new TextWebSocketFrame("用户[" + userName + "] 离开聊天室"));
    }
}
/**
 * Sends a text message over the current channel.
 *
 * <p>If no channel is available but {@code websocket} is still set, the send is
 * retried one second later on {@code eventLoopGroup}; there is no retry limit.
 *
 * @param message the text to send
 * @throws IllegalStateException if there is neither a channel nor a websocket
 */
public void fire(final String message) {
    Channel channel = this.channelRef.get();
    if (channel != null) {
        channel.writeAndFlush(new TextWebSocketFrame(message));
        return;
    }
    if (websocket == null) {
        throw new IllegalStateException("Client is closed, can't send message");
    }

    // No channel yet: the client is probably still connecting and the websocket
    // is not up, so try again shortly.
    // TODO: How many times do we reschedule before giving up?
    Runnable retry = new Runnable() {
        @Override
        public void run() {
            fire(message);
        }
    };
    eventLoopGroup.schedule(retry, 1, TimeUnit.SECONDS);
}
/**
 * Wraps the payload of a {@code WebSocketMessage} in the Netty frame class that
 * corresponds to the message type (TEXT, BINARY, PING or PONG).
 *
 * @param message the message to convert
 * @return a frame carrying the message payload
 * @throws IllegalArgumentException if the message type is none of the four handled types
 */
protected WebSocketFrame toFrame(WebSocketMessage message) {
    ByteBuf payload = NettyDataBufferFactory.toByteBuf(message.getPayload());

    if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
        return new TextWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
        return new BinaryWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.PING.equals(message.getType())) {
        return new PingWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.PONG.equals(message.getType())) {
        return new PongWebSocketFrame(payload);
    }
    throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
/**
 * Decoding an "add" request with the anonymous-access security configuration must
 * yield exactly one {@code AddSubscription} that passes validation.
 */
@Test
public void testAddSubscription() throws Exception {
    decoder = new WebSocketRequestDecoder(anonConfig.getSecurity());

    byte[] requestBytes =
            "{ \"operation\" : \"add\", \"subscriptionId\" : \"1234\" }".getBytes(StandardCharsets.UTF_8);
    TextWebSocketFrame frame = new TextWebSocketFrame();
    frame.content().writeBytes(requestBytes);

    decoder.decode(ctx, frame, results);

    Assert.assertEquals(1, results.size());
    Assert.assertEquals(AddSubscription.class, results.get(0).getClass());
    AddSubscription decoded = (AddSubscription) results.iterator().next();
    decoded.validate();
}
/**
 * Registers the connecting agent with {@code tunnelServer} and replies with a
 * {@code response:/?method=agentRegister&id=...} URI as a text frame.
 *
 * <p>The agent id is the first {@code id} query parameter of the request URI when
 * present, otherwise a randomly generated upper-cased string. The agent is removed
 * from {@code tunnelServer} again when the channel closes.
 *
 * @param ctx        the handler context of the agent connection
 * @param requestUri the request URI, possibly carrying an {@code id} parameter
 * @throws URISyntaxException if the response URI cannot be built
 */
private void agentRegister(ChannelHandlerContext ctx, String requestUri) throws URISyntaxException {
    // Default: a freshly generated random agent id.
    String id = RandomStringUtils.random(20, true, true).toUpperCase();

    // An id supplied by the caller takes precedence.
    List<String> requestedIds = new QueryStringDecoder(requestUri).parameters().get("id");
    if (requestedIds != null && !requestedIds.isEmpty()) {
        id = requestedIds.get(0);
    }
    final String agentId = id;

    URI responseUri = new URI("response", null, "/", "method=agentRegister" + "&id=" + agentId, null);

    AgentInfo agent = new AgentInfo();
    SocketAddress peer = ctx.channel().remoteAddress();
    if (peer instanceof InetSocketAddress) {
        InetSocketAddress inetPeer = (InetSocketAddress) peer;
        agent.setHost(inetPeer.getHostString());
        agent.setPort(inetPeer.getPort());
    }
    agent.setChannelHandlerContext(ctx);
    tunnelServer.addAgent(agentId, agent);

    // Drop the registration as soon as the channel goes away.
    ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
        @Override
        public void operationComplete(Future<? super Void> future) throws Exception {
            tunnelServer.removeAgent(agentId);
        }
    });

    ctx.channel().writeAndFlush(new TextWebSocketFrame(responseUri.toString()));
}
/**
 * Client-side message handler.
 *
 * <p>Until the handshake is complete, the message is treated as the HTTP handshake
 * response and the outcome is recorded on {@code handshakeFuture}. Afterwards an
 * HTTP response is an error, text frames are printed, pong frames are noted, and a
 * close frame closes the channel.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws IllegalStateException if an HTTP response arrives after the handshake
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel ch = ctx.channel();

    if (!handshaker.isHandshakeComplete()) {
        try {
            handshaker.finishHandshake(ch, (FullHttpResponse) msg);
            System.out.println("WebSocket Client connected!");
            handshakeFuture.setSuccess();
        } catch (WebSocketHandshakeException e) {
            System.out.println("WebSocket Client failed to connect");
            handshakeFuture.setFailure(e);
        }
        return;
    }

    if (msg instanceof FullHttpResponse) {
        FullHttpResponse response = (FullHttpResponse) msg;
        String body = response.content().toString(CharsetUtil.UTF_8);
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + response.status() +
                        ", content=" + body + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof CloseWebSocketFrame) {
        System.out.println("WebSocket Client received closing");
        ch.close();
    } else if (frame instanceof PongWebSocketFrame) {
        System.out.println("WebSocket Client received pong");
    } else if (frame instanceof TextWebSocketFrame) {
        System.out.println("WebSocket Client received message: " + ((TextWebSocketFrame) frame).text());
    }
}
/**
 * Handles control frames and forwards text frames to {@code exec}.
 *
 * <p>Close frames trigger the closing handshake, Ping frames are answered with a
 * Pong carrying the same (retained) content; the pong is written but not flushed.
 *
 * @param ctx   the handler context
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is not Close, Ping or Text
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    if (frame instanceof TextWebSocketFrame) {
        exec(ctx, frame);
        return;
    }
    throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Client-side message handler that completes the handshake and then dispatches
 * WebSocket frames.
 *
 * <p>Text and binary frames are forwarded down the pipeline, Ping frames are
 * answered with a Pong that echoes the ping's payload, Pong frames are logged, and
 * a Close frame closes the channel. Previously the Pong was sent with an empty
 * payload, although RFC 6455 requires it to carry the same application data as
 * the Ping it answers.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception if an HTTP response arrives after the handshake has completed,
 *                   or if finishing the handshake fails
 */
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
    final Channel ch = ctx.channel();
    if (!handshaker.isHandshakeComplete()) {
        // web socket client connected
        handshaker.finishHandshake(ch, (FullHttpResponse) msg);
        handshakeFuture.setSuccess();
        return;
    }

    if (msg instanceof FullHttpResponse) {
        final FullHttpResponse response = (FullHttpResponse) msg;
        throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
                + response.content().toString(CharsetUtil.UTF_8) + ')');
    }

    // a close frame doesn't mean much here. errors raised from closed channels will mark the host as dead
    final WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        // NOTE(review): retained twice — presumably once for this handler's own
        // release and once for a downstream consumer; confirm against the pipeline.
        ctx.fireChannelRead(frame.retain(2));
    } else if (frame instanceof PingWebSocketFrame) {
        // Echo the ping payload; retain() because the content is reused by the
        // outbound frame (assumes this handler releases msg after returning — confirm).
        ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof PongWebSocketFrame) {
        logger.debug("Received response from keep-alive request");
    } else if (frame instanceof BinaryWebSocketFrame) {
        ctx.fireChannelRead(frame.retain(2));
    } else if (frame instanceof CloseWebSocketFrame) {
        ch.close();
    }
}
/**
 * Test-client message handler that records what it receives.
 *
 * <p>Completes the handshake on the first message. Afterwards an HTTP response is
 * an error; text frames are stored in {@code textReceived}; the NIO view of binary
 * and pong frame content is stored in {@code bufferReceived}; a close frame closes
 * the channel.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws IllegalStateException if an HTTP response arrives after the handshake
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel ch = ctx.channel();

    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(ch, (FullHttpResponse) msg);
        logger.debug("WebSocket Client connected!");
        handshakeFuture.setSuccess();
        return;
    }

    if (msg instanceof FullHttpResponse) {
        FullHttpResponse response = (FullHttpResponse) msg;
        String body = response.content().toString(CharsetUtil.UTF_8);
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + response.status() +
                        ", content=" + body + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof CloseWebSocketFrame) {
        logger.debug("WebSocket Client received closing");
        ch.close();
    } else if (frame instanceof PongWebSocketFrame) {
        logger.debug("WebSocket Client received pong");
        bufferReceived = ((PongWebSocketFrame) frame).content().nioBuffer();
    } else if (frame instanceof BinaryWebSocketFrame) {
        // NOTE(review): nioBuffer() is a view of the frame's content, not a copy;
        // confirm the frame outlives every read of bufferReceived.
        bufferReceived = ((BinaryWebSocketFrame) frame).content().nioBuffer();
        logger.debug("WebSocket Client received binary message: " + bufferReceived.toString());
    } else if (frame instanceof TextWebSocketFrame) {
        TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
        logger.debug("WebSocket Client received text message: " + textFrame.text());
        textReceived = textFrame.text();
    }
}
/**
 * Writes {@code message} to {@code channel} as a text frame on the channel's event
 * loop and logs the cause if the write fails.
 *
 * <p>If the write task cannot be scheduled (for example the event loop rejects it),
 * the error is logged and {@code message} is released here: in that case no frame
 * is ever created from it, and the caller cannot see the swallowed exception to
 * release the buffer itself.
 *
 * @param channel the channel to write to
 * @param message the buffer holding the text payload; ownership passes to this method
 */
public static void sendMessage(Channel channel, ByteBuf message) {
    try {
        channel.eventLoop().execute(() -> {
            ChannelFuture cf = channel.writeAndFlush(new TextWebSocketFrame(message));
            cf.addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    Log.error(future.cause());
                }
            });
        });
    } catch (Exception e) {
        Log.error(e);
        // The task never ran, so nothing else will release the buffer.
        if (message != null && message.refCnt() > 0) {
            message.release();
        }
    }
}
/**
 * Forwards the frame carried by {@code sendInChat} (serialized as JSON) to the
 * channel registered for its token, then closes the requesting channel.
 *
 * <p>If no channel is registered for the token, the message is dropped and a note
 * is printed to standard error. This replaces catching {@code NullPointerException}
 * as a substitute for a null check.
 *
 * @param channel    the requesting channel, closed at the end
 * @param sendInChat carries the target token and the frame to forward
 */
@Override
public void sendByInChat(Channel channel, SendInChat sendInChat) {
    Channel other = WebSocketCacheMap.getByToken(sendInChat.getToken());
    if (other != null) {
        other.writeAndFlush(new TextWebSocketFrame(new Gson().toJson(sendInChat.getFrame())));
    } else {
        // The token itself is deliberately not printed.
        System.err.println("sendByInChat: no channel registered for the given token; message dropped");
    }
    close(channel);
}
/**
 * Sends a complete text message and blocks until the write has finished.
 *
 * <p>If the waiting thread is interrupted, its interrupt status is restored before
 * the {@code IOException} is thrown, so callers further up can still see the
 * interruption.
 *
 * @param text the text to send; must not be {@code null}
 * @throws IOException if the write fails or the wait is interrupted
 */
@Override
public void sendText(final String text) throws IOException {
    if (text == null) {
        throw JsrWebSocketMessages.MESSAGES.messageInNull();
    }
    assertNotInFragment();
    try {
        undertowSession.getChannel().writeAndFlush(new TextWebSocketFrame(text)).get();
    } catch (InterruptedException e) {
        // Do not lose the interruption by wrapping it.
        Thread.currentThread().interrupt();
        throw new IOException(e);
    } catch (ExecutionException e) {
        throw new IOException(e);
    }
}
/**
 * Broadcasts a chat message sent by an authenticated user.
 *
 * <p>Frames from channels without a known, authenticated {@code ChatUser} are
 * ignored. Otherwise the frame text is parsed as a {@code QuarkClientProtocol} and
 * its message is broadcast through {@code manager}.
 *
 * @param ctx   the handler context
 * @param frame the received text frame
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
    ChatUser chatUser = manager.getChatUser(ctx.channel());
    if (chatUser == null || !chatUser.isAuth()) {
        return;
    }

    QuarkClientProtocol clientProto =
            JSON.parseObject(frame.text(), new TypeReference<QuarkClientProtocol>() {});
    // Broadcast the message.
    manager.broadMessage(QuarkChatProtocol.buildMessageCode(chatUser.getUser(), clientProto.getMsg()));
}
/**
 * Connects to the dynamic echo endpoint with {@code annotated=true}, sends "hello"
 * as a text frame and waits for the frame checker to complete the latch with the
 * expected echoed text.
 */
@Test
public void testDynamicAnnotatedEndpoint() throws Exception {
    final byte[] payload = "hello".getBytes();
    final byte[] expected = "opened:true /dynamicEchoEndpoint hello".getBytes();
    final CompletableFuture<?> latch = new CompletableFuture<>();

    String endpoint = "ws://" + DefaultServer.getHostAddress("default") + ":" + DefaultServer.getHostPort("default") + "/ws/dynamicEchoEndpoint?annotated=true";
    WebSocketTestClient client = new WebSocketTestClient(new URI(endpoint));
    client.connect();
    client.send(new TextWebSocketFrame(Unpooled.wrappedBuffer(payload)),
            new FrameChecker(TextWebSocketFrame.class, expected, latch));
    latch.get();
    client.destroy();
}