下面列出了 io.netty.handler.codec.http.websocketx.PingWebSocketFrame 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Logs a single debug-level line describing a {@link WebSocketFrame}.
 *
 * <p>The line is built from the direction string for {@code isInbound}, the channel context id,
 * a label for ping/pong/close/binary frames (or the decoded text of a text frame) and, when
 * given, the custom message. Nothing is built or decoded when debug logging is disabled.
 *
 * @param log {@link Logger} object of the relevant class
 * @param frame {@link WebSocketFrame} frame
 * @param channelContextId {@link ChannelHandlerContext} context id as a String
 * @param customMsg Log message which needs to be appended to the frame information,
 *                  if it is not required provide null
 * @param isInbound true if the frame is inbound, false if it is outbound
 */
private static void printWebSocketFrame(Logger log, WebSocketFrame frame, String channelContextId,
                                        String customMsg, boolean isInbound) {
    // Avoid concatenating strings and decoding a text frame's payload when the line would be dropped.
    if (!log.isDebugEnabled()) {
        return;
    }
    StringBuilder logStatement = new StringBuilder()
            .append(getDirectionString(isInbound))
            .append(channelContextId);
    if (frame instanceof PingWebSocketFrame) {
        logStatement.append(" Ping frame");
    } else if (frame instanceof PongWebSocketFrame) {
        logStatement.append(" Pong frame");
    } else if (frame instanceof CloseWebSocketFrame) {
        logStatement.append(" Close frame");
    } else if (frame instanceof BinaryWebSocketFrame) {
        logStatement.append(" Binary frame");
    } else if (frame instanceof TextWebSocketFrame) {
        logStatement.append(" ").append(((TextWebSocketFrame) frame).text());
    }
    // Specifically for logging close websocket frames with error status.
    if (customMsg != null) {
        logStatement.append(" ").append(customMsg);
    }
    log.debug(logStatement.toString());
}
/**
 * Logs a single debug-level line describing a {@link WebSocketFrame}.
 *
 * <p>The line is built from the direction string for {@code isInbound}, the channel context id,
 * a label for ping/pong/close/binary frames (or the decoded text of a text frame) and, when
 * given, the custom message. Nothing is built or decoded when debug logging is disabled.
 *
 * @param log {@link Log} object of the relevant class
 * @param frame {@link WebSocketFrame} frame
 * @param channelContextId {@link ChannelHandlerContext} context id as a String
 * @param customMsg Log message which needs to be appended to the frame information,
 *                  if it is not required provide null
 * @param isInbound true if the frame is inbound, false if it is outbound
 */
private static void printWebSocketFrame(
        Log log, WebSocketFrame frame, String channelContextId,
        String customMsg, boolean isInbound) {
    // Avoid concatenating strings and decoding a text frame's payload when the line would be dropped.
    if (!log.isDebugEnabled()) {
        return;
    }
    StringBuilder logStatement = new StringBuilder()
            .append(getDirectionString(isInbound))
            .append(channelContextId);
    if (frame instanceof PingWebSocketFrame) {
        logStatement.append(" Ping frame");
    } else if (frame instanceof PongWebSocketFrame) {
        logStatement.append(" Pong frame");
    } else if (frame instanceof CloseWebSocketFrame) {
        logStatement.append(" Close frame");
    } else if (frame instanceof BinaryWebSocketFrame) {
        logStatement.append(" Binary frame");
    } else if (frame instanceof TextWebSocketFrame) {
        logStatement.append(" ").append(((TextWebSocketFrame) frame).text());
    }
    // Specifically for logging close websocket frames with error status.
    if (customMsg != null) {
        logStatement.append(" ").append(customMsg);
    }
    log.debug(logStatement.toString());
}
/**
 * Wraps the payload of a {@link WebSocketMessage} in the Netty frame class matching its type.
 *
 * @param message the message whose payload and type are read
 * @return a text, binary, ping or pong frame around the converted payload
 * @throws IllegalArgumentException if the message type is none of TEXT, BINARY, PING or PONG
 */
protected WebSocketFrame toFrame(WebSocketMessage message) {
    // The payload is converted up front, before the type is inspected.
    ByteBuf payload = NettyDataBufferFactory.toByteBuf(message.getPayload());

    if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
        return new TextWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
        return new BinaryWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.PING.equals(message.getType())) {
        return new PingWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.PONG.equals(message.getType())) {
        return new PongWebSocketFrame(payload);
    }
    throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
/**
 * Dispatches an incoming frame to the handler matching its concrete type.
 *
 * <p>Ping frames are answered directly with a pong carrying the same bytes. Continuation
 * frames are routed to the binary or text handler according to {@code expectedContinuation};
 * a continuation frame with any other expectation is ignored, as is any other frame type.
 *
 * @param msg the frame to process
 * @throws IOException declared for the handlers invoked from here
 */
private void processFrame(WebSocketFrame msg) throws IOException {
    if (msg instanceof CloseWebSocketFrame) {
        onCloseFrame((CloseWebSocketFrame) msg);
        return;
    }
    if (msg instanceof PongWebSocketFrame) {
        onPongMessage((PongWebSocketFrame) msg);
        return;
    }
    if (msg instanceof PingWebSocketFrame) {
        // Copy the readable bytes out of the frame and echo them back as the pong payload.
        byte[] pingPayload = new byte[msg.content().readableBytes()];
        msg.content().readBytes(pingPayload);
        session.getAsyncRemote().sendPong(ByteBuffer.wrap(pingPayload));
        return;
    }
    if (msg instanceof TextWebSocketFrame) {
        onText(msg, ((TextWebSocketFrame) msg).text());
        return;
    }
    if (msg instanceof BinaryWebSocketFrame) {
        onBinary(msg);
        return;
    }
    if (msg instanceof ContinuationWebSocketFrame) {
        if (expectedContinuation == FrameType.BYTE) {
            onBinary(msg);
        } else if (expectedContinuation == FrameType.TEXT) {
            onText(msg, ((ContinuationWebSocketFrame) msg).text());
        }
    }
}
/**
 * Sends a ping frame with a fixed payload to a deployed endpoint and waits (up to 10 seconds)
 * for the {@code FrameChecker} expecting a pong with the same payload to complete the latch.
 *
 * <p>The test client is destroyed in a {@code finally} block so that a timeout or failed
 * assertion does not leave the client behind.
 */
@org.junit.Test
@Ignore("UT3 - P4")
public void testPingPong() throws Exception {
    final byte[] payload = "payload".getBytes();
    final AtomicReference<Throwable> cause = new AtomicReference<>();
    final AtomicBoolean connected = new AtomicBoolean(false);
    final CompletableFuture<?> latch = new CompletableFuture<>();
    class TestEndPoint extends Endpoint {
        @Override
        public void onOpen(final Session session, EndpointConfig config) {
            connected.set(true);
        }
    }
    ServerWebSocketContainer builder = new ServerWebSocketContainer(TestClassIntrospector.INSTANCE, DefaultServer.getEventLoopSupplier(), Collections.EMPTY_LIST, false, false);
    builder.addEndpoint(ServerEndpointConfig.Builder.create(TestEndPoint.class, "/").configurator(new InstanceConfigurator(new TestEndPoint())).build());
    deployServlet(builder);
    WebSocketTestClient client = new WebSocketTestClient(new URI("ws://" + DefaultServer.getHostAddress("default") + ":" + DefaultServer.getHostPort("default") + "/"));
    client.connect();
    try {
        client.send(new PingWebSocketFrame(Unpooled.wrappedBuffer(payload)), new FrameChecker(PongWebSocketFrame.class, payload, latch));
        latch.get(10, TimeUnit.SECONDS);
        Assert.assertNull(cause.get());
    } finally {
        // Always tear the client down, even when the wait times out or the assertion fails.
        client.destroy();
    }
}
/**
 * Handles one incoming WebSocket frame.
 *
 * <p>Close frames are passed to the handshaker, ping frames are answered with a pong carrying
 * the same content, and text/binary frames are forwarded to {@code webSocketEvent} when one is
 * set. Any other frame type is ignored.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame) {
        if (webSocketEvent != null) {
            webSocketEvent.onMessageStringEvent(
                    baseServer, new WebSocketSession(ctx.channel()), ((TextWebSocketFrame) frame).text());
        }
    } else if (frame instanceof BinaryWebSocketFrame) {
        if (webSocketEvent != null) {
            webSocketEvent.onMessageBinaryEvent(
                    baseServer, new WebSocketSession(ctx.channel()), ((BinaryWebSocketFrame) frame).content());
        }
    }
}
/**
 * Handles ping and pong control frames.
 *
 * <p>A ping is answered with a pong carrying the same (retained) content; a pong is recorded
 * on the channel's {@code PingPong} session when one exists.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is neither a ping nor a pong
 */
protected void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof PingWebSocketFrame) {
        if (logger.isTraceEnabled()) {
            logger.trace("Ping with payload [{}]", ByteBufUtil.hexDump(frame.content()));
        }
        ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        return;
    }

    if (frame instanceof PongWebSocketFrame) {
        PingPong session = PingPong.get(ctx.channel());
        if (session != null) {
            session.recordPong();
        }
        return;
    }

    String frameClass = frame.getClass().getName();
    throw new UnsupportedOperationException(String.format("%s frame types not supported", frameClass));
}
/**
 * Handles one incoming WebSocket frame: closes on a close frame, answers a ping with a pong
 * carrying the same content, and echoes text and binary frames back. Other frame types are
 * ignored. Responses are written without flushing here.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        // Closing frame: let the handshaker perform the close.
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
        // Echo the frame
        ctx.write(frame.retain());
    }
}
/**
 * Handles one incoming WebSocket frame, logging its type at {@code FINE} level first.
 *
 * <p>Close frames go to the handshaker, pings are answered with a pong that copies the ping's
 * final-fragment flag, RSV bits and content, data and continuation frames are written back
 * unchanged, and pongs are released and ignored.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any other frame type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (logger.isLoggable(Level.FINE)) {
        int channelHash = ctx.channel().hashCode();
        logger.fine(String.format("Channel %s received %s", channelHash, StringUtil.simpleClassName(frame)));
    }

    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
        return;
    }
    boolean echoable = frame instanceof TextWebSocketFrame
            || frame instanceof BinaryWebSocketFrame
            || frame instanceof ContinuationWebSocketFrame;
    if (echoable) {
        ctx.write(frame);
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        // Ignore
        frame.release();
        return;
    }
    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Wraps the payload of a {@link WebSocketMessage} in the Netty frame class matching its type.
 *
 * @param message the message whose payload and type are read
 * @return a text, binary, ping or pong frame around the converted payload
 * @throws IllegalArgumentException if the message type is none of TEXT, BINARY, PING or PONG
 */
protected WebSocketFrame toFrame(WebSocketMessage message) {
    // The payload is converted up front, before the type is inspected.
    ByteBuf payload = NettyDataBufferFactory.toByteBuf(message.getPayload());

    if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
        return new TextWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
        return new BinaryWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.PING.equals(message.getType())) {
        return new PingWebSocketFrame(payload);
    }
    if (WebSocketMessage.Type.PONG.equals(message.getType())) {
        return new PongWebSocketFrame(payload);
    }
    throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
/**
 * Handles one incoming WebSocket frame: closes on a close frame, answers a ping with a pong
 * carrying the same content, and echoes text and binary frames back. Other frame types are
 * ignored. Responses are written without flushing here.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        // Closing frame: let the handshaker perform the close.
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
        // Echo the frame
        ctx.write(frame.retain());
    }
}
/**
 * Creates a subscription, copies its tuning values from the websocket section of the
 * configuration, and schedules a recurring task on the channel's executor that sends a
 * websocket ping and then calls {@code cleanupCompletedMetrics()}.
 *
 * @param subscriptionId identifier of this subscription (also used in the ping trace log)
 * @param sessionId      identifier of the owning session
 * @param store          data store kept by the subscription
 * @param cache          data store cache kept by the subscription
 * @param ctx            channel handler context used to schedule and write pings
 * @param conf           configuration supplying the websocket settings
 */
public Subscription(String subscriptionId, String sessionId, DataStore store, DataStoreCache cache,
        ChannelHandlerContext ctx, Configuration conf) {
    this.subscriptionId = subscriptionId;
    this.sessionId = sessionId;
    this.store = store;
    this.cache = cache;
    this.ctx = ctx;

    this.lag = conf.getWebsocket().getSubscriptionLag();
    this.scannerBatchSize = conf.getWebsocket().getScannerBatchSize();
    this.flushIntervalSeconds = conf.getWebsocket().getFlushIntervalSeconds();
    this.scannerReadAhead = conf.getWebsocket().getScannerReadAhead();
    this.subscriptionBatchSize = conf.getWebsocket().getSubscriptionBatchSize();

    // Send a websocket ping at half the timeout interval.
    final int pingIntervalSeconds = conf.getWebsocket().getTimeout() / 2;
    final Runnable pingTask = () -> {
        LOG.trace("[{}] Sending ping on channel {}", subscriptionId, ctx.channel());
        ctx.writeAndFlush(new PingWebSocketFrame());
        cleanupCompletedMetrics();
    };
    this.ping = this.ctx.executor()
            .scheduleAtFixedRate(pingTask, pingIntervalSeconds, pingIntervalSeconds, TimeUnit.SECONDS);
}
/**
 * Extracts the message bytes from a WebSocket frame.
 *
 * <p>Close frames are handed to the handshaker and pings are answered with a pong; neither
 * produces a message. Text and binary frames have their content passed to
 * {@code parseMessage}. Any other frame type is logged as a format error.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 * @return the parsed message for text/binary frames, otherwise {@code null}
 */
private byte[] decodeWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        // Closing frame: let the handshaker perform the close.
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        return null;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
        return null;
    }
    // Text and binary frames are parsed the same way, straight from their content buffer.
    if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
        return parseMessage(frame.content());
    }

    logger.warn("Message format error: " + frame);
    return null;
}
/**
 * Intercepts close and ping frames before delegating to the parent implementation.
 *
 * <p>A final-fragment close frame is echoed via {@code sendCloseNow} and the handler then
 * terminates. When {@code proxyPing} is off, a ping is answered with a pong carrying the same
 * content and another read is requested. Everything else, except
 * {@code LastHttpContent.EMPTY_LAST_CONTENT}, is passed to {@code super.onInboundNext}.
 *
 * @param ctx   the channel handler context
 * @param frame the inbound object
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onInboundNext(ChannelHandlerContext ctx, Object frame) {
    if (frame instanceof CloseWebSocketFrame) {
        CloseWebSocketFrame close = (CloseWebSocketFrame) frame;
        if (close.isFinalFragment()) {
            if (log.isDebugEnabled()) {
                log.debug(format(channel(), "CloseWebSocketFrame detected. Closing Websocket"));
            }
            CloseWebSocketFrame reply = new CloseWebSocketFrame(true, close.rsv(), close.content());
            // terminate() will invoke onInboundComplete()
            sendCloseNow(reply, f -> terminate());
            return;
        }
    }

    boolean answerPing = !this.proxyPing && frame instanceof PingWebSocketFrame;
    if (answerPing) {
        //"FutureReturnValueIgnored" this is deliberate
        ctx.writeAndFlush(new PongWebSocketFrame(((PingWebSocketFrame) frame).content()));
        ctx.read();
        return;
    }

    if (frame != LastHttpContent.EMPTY_LAST_CONTENT) {
        super.onInboundNext(ctx, frame);
    }
}
/**
 * Handles one incoming WebSocket frame, recording a trace entry for close and ping frames.
 *
 * <p>Close frames go to the handshaker, pings are answered with a pong carrying the same
 * content, and text frames are passed to {@code handleTextWebSocketFrameInternal}.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is not a close, ping or text frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        // Closing frame
        addTraceForFrame(frame, "close");
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        addTraceForFrame(frame, "ping");
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame) {
        // todo [om] think about BinaryWebsocketFrame
        handleTextWebSocketFrameInternal((TextWebSocketFrame) frame, ctx);
    } else {
        String frameClass = frame.getClass().getName();
        throw new UnsupportedOperationException(String.format("%s frame types not supported", frameClass));
    }
}
/**
 * Handles one incoming WebSocket frame.
 *
 * <p>Close frames go to the handshaker, pings are answered with a pong carrying the same
 * content, and the text of a text frame is printed to standard error and written back in
 * upper case.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is not a close, ping or text frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        // Closing frame
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame) {
        // Send the uppercase string back.
        String received = ((TextWebSocketFrame) frame).text();
        System.err.printf("%s received %s%n", ctx.channel(), received);
        ctx.channel().write(new TextWebSocketFrame(received.toUpperCase()));
    } else {
        String frameClass = frame.getClass().getName();
        throw new UnsupportedOperationException(String.format("%s frame types not supported", frameClass));
    }
}
/**
 * Handles one incoming WebSocket frame: closes on a close frame, answers a ping with a pong
 * carrying the same content, and echoes text and binary frames back. Other frame types are
 * ignored. Responses are written without flushing here.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        // Closing frame: let the handshaker perform the close.
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
        // Echo the frame
        ctx.write(frame.retain());
    }
}
/**
 * Handles one incoming WebSocket frame, logging its type at {@code FINE} level first.
 *
 * <p>Close frames go to the handshaker, pings are answered with a pong that copies the ping's
 * final-fragment flag, RSV bits and content, data and continuation frames are written back
 * unchanged, and pongs are released and ignored. All writes use the context's void promise.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any other frame type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (logger.isLoggable(Level.FINE)) {
        int channelHash = ctx.channel().hashCode();
        logger.fine(String.format("Channel %s received %s", channelHash, StringUtil.simpleClassName(frame)));
    }

    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()), ctx.voidPromise());
        return;
    }
    boolean echoable = frame instanceof TextWebSocketFrame
            || frame instanceof BinaryWebSocketFrame
            || frame instanceof ContinuationWebSocketFrame;
    if (echoable) {
        ctx.write(frame, ctx.voidPromise());
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        // Ignore
        frame.release();
        return;
    }
    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Handles one incoming WebSocket frame.
 *
 * <p>A close frame is passed to the handshaker and then reported through {@code onClose};
 * a ping is answered with a pong carrying the same content; the text of a text frame is
 * delivered to {@code onMessage}.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is not a close, ping or text frame
 */
public void handle(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        onClose(ctx);
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content()));
    } else if (frame instanceof TextWebSocketFrame) {
        String text = ((TextWebSocketFrame) frame).text();
        onMessage(ctx, text);
    } else {
        String frameClass = frame.getClass().getName();
        throw new UnsupportedOperationException(String.format("%s frame types not supported", frameClass));
    }
}
/**
 * Handles one incoming WebSocket frame.
 *
 * <p>Close frames go to the handshaker and are then forwarded down the pipeline; pings are
 * answered with a pong that copies the ping's final-fragment flag, RSV bits and content;
 * continuation frames are written back; pongs are released; binary and text frames are
 * forwarded down the pipeline.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any other frame type
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        // Pass on to the rest of the channel
        ctx.fireChannelRead(frame);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
        return;
    }
    if (frame instanceof ContinuationWebSocketFrame) {
        ctx.write(frame);
        return;
    }
    if (frame instanceof PongWebSocketFrame) {
        frame.release();
        return;
    }
    if (frame instanceof BinaryWebSocketFrame || frame instanceof TextWebSocketFrame) {
        // Allow the rest of the pipeline to deal with this.
        ctx.fireChannelRead(frame);
        return;
    }
    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Handles one incoming WebSocket frame for this transport.
 *
 * <p>A close frame removes the channel's session id mapping and is written back, closing the
 * channel once the write completes. A ping is answered with a pong carrying the same content.
 * Text and binary frames are decoded into a {@code Packet}, tagged with the transport type and
 * the channel's session id, and forwarded down the pipeline. Other frame types are released
 * and logged.
 *
 * <p>The data frame is released in a {@code finally} block so it is not leaked when decoding
 * or packet setup throws.
 *
 * @param ctx the channel handler context the frame arrived on
 * @param msg the received frame
 * @throws Exception if decoding the packet fails
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
    if (log.isDebugEnabled()) {
        log.debug("Received {} WebSocketFrame: {} from channel: {}", getTransportType().getName(), msg, ctx.channel());
    }

    if (msg instanceof CloseWebSocketFrame) {
        sessionIdByChannel.remove(ctx.channel());
        ChannelFuture f = ctx.writeAndFlush(msg);
        f.addListener(ChannelFutureListener.CLOSE);
    } else if (msg instanceof PingWebSocketFrame) {
        ctx.writeAndFlush(new PongWebSocketFrame(msg.content()));
    } else if (msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame) {
        Packet packet;
        try {
            packet = PacketDecoder.decodePacket(msg.content());
            packet.setTransportType(getTransportType());
            String sessionId = sessionIdByChannel.get(ctx.channel());
            packet.setSessionId(sessionId);
        } finally {
            // Release on both the success and the failure path; the frame is not used after this point.
            msg.release();
        }
        ctx.fireChannelRead(packet);
    } else {
        msg.release();
        log.warn("{} frame type is not supported", msg.getClass().getName());
    }
}
/**
 * Processes one WebSocket frame for the user session attached to the channel.
 *
 * <p>A {@code Command} is populated with the session's user id, the client IP and the start
 * time. A text frame's text is stored as the command parameters, echoed back to the client,
 * and the command is executed under the {@code "WS-ACTION"} key. A ping is answered with a
 * pong carrying the same content. A close frame is passed to the handshaker and the user's
 * channel session is removed. Other frame types are ignored.
 *
 * @param ctx     the channel handler context the frame arrived on
 * @param wsFrame the received frame
 */
private void doWSRequest(ChannelHandlerContext ctx, WebSocketFrame wsFrame) {
    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
    String clientIp = socketAddress.getAddress().getHostAddress();
    ChannelSession channelSession = ctx.channel().attr(ChannelConst.CHANNELSESSION).get();

    Command command = new Command();
    command.setSiteUserId(channelSession.getUserId());
    command.setClientIp(clientIp);
    command.setStartTime(System.currentTimeMillis());

    if (wsFrame instanceof TextWebSocketFrame) {
        // Decode the frame text once and reuse it for both the command and the echo.
        String webText = ((TextWebSocketFrame) wsFrame).text();
        try {
            command.setParams(webText.getBytes(CharsetCoding.UTF_8));
        } catch (UnsupportedEncodingException e) {
            // Best-effort as before (the command still runs), but keep the cause in the log.
            logger.error("web message text=" + webText + " Charset code error", e);
        }
        ctx.channel().writeAndFlush(new TextWebSocketFrame(webText));
        executor.execute("WS-ACTION", command);
    } else if (wsFrame instanceof PingWebSocketFrame) {
        // ping/pong
        ctx.channel().writeAndFlush(new PongWebSocketFrame(wsFrame.content().retain()));
        logger.info("ws client siteUserId={} ping to server", command.getSiteUserId());
    } else if (wsFrame instanceof CloseWebSocketFrame) {
        // close channel
        wsHandshaker.close(ctx.channel(), (CloseWebSocketFrame) wsFrame.retain());
        WebChannelManager.delChannelSession(command.getSiteUserId());
    }
}
/**
 * Send a ping message to the server.
 *
 * @param buf content of the ping message to be sent.
 * @throws IllegalArgumentException if there is no channel to write to
 * @throws IOException declared by the signature; the write itself is issued without waiting
 *                     for its result
 */
public void sendPing(ByteBuffer buf) throws IOException {
    if (channel == null) {
        // The message names the operation that actually failed (ping, not text).
        logger.error("Channel is null. Cannot send ping.");
        throw new IllegalArgumentException("Cannot find the channel to write");
    }
    channel.writeAndFlush(new PingWebSocketFrame(Unpooled.wrappedBuffer(buf)));
}
/**
 * Sends a ping frame carrying a copy of the given application data, without waiting for the
 * write to complete.
 *
 * @param applicationData the ping payload; must not be null and must have at most 125 bytes
 *                        remaining
 * @throws IllegalArgumentException declared by the signature; the null and size checks throw
 *                                  whatever {@code JsrWebSocketMessages} creates for them
 * @throws IOException declared by the signature
 */
@Override
public void sendPing(final ByteBuffer applicationData) throws IOException, IllegalArgumentException {
    if (applicationData == null) {
        throw JsrWebSocketMessages.MESSAGES.messageInNull();
    }
    final int payloadSize = applicationData.remaining();
    if (payloadSize > 125) {
        throw JsrWebSocketMessages.MESSAGES.messageTooLarge(payloadSize, 125);
    }
    undertowSession.getChannel()
            .writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer(applicationData)));
}
/**
 * Sends a ping frame carrying a copy of the given application data, without waiting for the
 * write to complete.
 *
 * @param applicationData the ping payload; must not be null and must have at most 125 bytes
 *                        remaining
 * @throws IllegalArgumentException declared by the signature; the null and size checks throw
 *                                  whatever {@code JsrWebSocketMessages} creates for them
 * @throws IOException declared by the signature
 */
@Override
public void sendPing(final ByteBuffer applicationData) throws IOException, IllegalArgumentException {
    if (applicationData == null) {
        throw JsrWebSocketMessages.MESSAGES.messageInNull();
    }
    final int payloadSize = applicationData.remaining();
    if (payloadSize > 125) {
        throw JsrWebSocketMessages.MESSAGES.messageTooLarge(payloadSize, 125);
    }
    undertowSession.getChannel()
            .writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer(applicationData)));
}
/**
 * Sends a ping frame carrying a copy of the given application data and blocks until the
 * write has completed.
 *
 * @param applicationData the ping payload; must not be null and must have at most 125 bytes
 *                        remaining
 * @throws IllegalArgumentException declared by the signature; the null and size checks throw
 *                                  whatever {@code JsrWebSocketMessages} creates for them
 * @throws IOException if the write fails or the calling thread is interrupted while waiting;
 *                     on interruption the thread's interrupt flag is restored first
 */
@Override
public void sendPing(final ByteBuffer applicationData) throws IOException, IllegalArgumentException {
    if (applicationData == null) {
        throw JsrWebSocketMessages.MESSAGES.messageInNull();
    }
    if (applicationData.remaining() > 125) {
        throw JsrWebSocketMessages.MESSAGES.messageTooLarge(applicationData.remaining(), 125);
    }
    try {
        undertowSession.getChannel().writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer(applicationData))).get();
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers further up can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new IOException(e);
    } catch (ExecutionException e) {
        throw new IOException(e);
    }
}
/**
 * Handles one incoming WebSocket frame, assembling fragmented text messages in
 * {@code jsonBuffer}.
 *
 * <p>A text frame starts a new buffer and a continuation frame appends to it; when the frame
 * is the final fragment, the buffered text is passed to {@code handleMessageCompleted} and
 * the buffer is cleared. A close frame delivers any message still being assembled, then closes
 * through the handshaker and destroys the session. A ping is always answered with a pong
 * carrying the same content.
 *
 * @param ctx   the channel handler context the frame arrived on
 * @param frame the received frame
 * @throws UnsupportedOperationException for any frame that is not a close, ping, text or
 *                                       continuation frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    // Check for closing frame
    if (frame instanceof CloseWebSocketFrame) {
        // jsonBuffer is reset to null after every completed message, so only deliver a
        // message when one is actually in progress.
        if (jsonBuffer != null) {
            handleMessageCompleted(ctx, jsonBuffer.toString());
        }
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        SessionRegistry.destroySession(session);
        return;
    }
    if (frame instanceof PingWebSocketFrame) {
        // The pong must not depend on the logger's debug level.
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    if (frame instanceof TextWebSocketFrame) {
        jsonBuffer = new StringBuffer();
        jsonBuffer.append(((TextWebSocketFrame) frame).text());
    } else if (frame instanceof ContinuationWebSocketFrame) {
        if (jsonBuffer == null) {
            comlog.warn("Continuation frame received without initial frame.");
            // There is nothing to complete; stop here instead of dereferencing the null buffer.
            return;
        }
        jsonBuffer.append(((ContinuationWebSocketFrame) frame).text());
    } else {
        throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
                .getName()));
    }
    // Check if Text or Continuation Frame is final fragment and handle if needed.
    if (frame.isFinalFragment()) {
        handleMessageCompleted(ctx, jsonBuffer.toString());
        jsonBuffer = null;
    }
}
/**
 * Processes one WebSocket frame for the user session attached to the channel.
 *
 * <p>A {@code Command} is populated with the session's user id, the client IP and the start
 * time. A text frame's text is stored as the command parameters, echoed back to the client,
 * and the command is executed under the {@code "WS-ACTION"} key. A ping is answered with a
 * pong carrying the same content. A close frame is passed to the handshaker and the user's
 * channel session is removed. Other frame types are ignored.
 *
 * @param ctx     the channel handler context the frame arrived on
 * @param wsFrame the received frame
 */
private void doWSRequest(ChannelHandlerContext ctx, WebSocketFrame wsFrame) {
    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
    String clientIp = socketAddress.getAddress().getHostAddress();
    ChannelSession channelSession = ctx.channel().attr(ChannelConst.CHANNELSESSION).get();

    Command command = new Command();
    command.setSiteUserId(channelSession.getUserId());
    command.setClientIp(clientIp);
    command.setStartTime(System.currentTimeMillis());

    if (wsFrame instanceof TextWebSocketFrame) {
        // Decode the frame text once and reuse it for both the command and the echo.
        String webText = ((TextWebSocketFrame) wsFrame).text();
        try {
            command.setParams(webText.getBytes(CharsetCoding.UTF_8));
        } catch (UnsupportedEncodingException e) {
            // Best-effort as before (the command still runs), but keep the cause in the log.
            logger.error("web message text=" + webText + " Charset code error", e);
        }
        ctx.channel().writeAndFlush(new TextWebSocketFrame(webText));
        executor.execute("WS-ACTION", command);
    } else if (wsFrame instanceof PingWebSocketFrame) {
        // ping/pong
        ctx.channel().writeAndFlush(new PongWebSocketFrame(wsFrame.content().retain()));
        logger.info("ws client siteUserId={} ping to server", command.getSiteUserId());
    } else if (wsFrame instanceof CloseWebSocketFrame) {
        // close channel
        wsHandshaker.close(ctx.channel(), (CloseWebSocketFrame) wsFrame.retain());
        WebChannelManager.delChannelSession(command.getSiteUserId());
    }
}
/**
 * Processes one WebSocket frame for the user session attached to the channel.
 *
 * <p>A {@code Command} is populated with the session's user id, the client IP and the start
 * time. A text frame's text is stored as the command parameters, echoed back to the client,
 * and the command is executed under the {@code "WS-ACTION"} key. A ping is answered with a
 * pong carrying the same content. A close frame is passed to the handshaker and the user's
 * channel session is removed. Other frame types are ignored.
 *
 * @param ctx     the channel handler context the frame arrived on
 * @param wsFrame the received frame
 */
private void doWSRequest(ChannelHandlerContext ctx, WebSocketFrame wsFrame) {
    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
    String clientIp = socketAddress.getAddress().getHostAddress();
    ChannelSession channelSession = ctx.channel().attr(ChannelConst.CHANNELSESSION).get();

    Command command = new Command();
    command.setSiteUserId(channelSession.getUserId());
    command.setClientIp(clientIp);
    command.setStartTime(System.currentTimeMillis());

    if (wsFrame instanceof TextWebSocketFrame) {
        // Decode the frame text once and reuse it for both the command and the echo.
        String webText = ((TextWebSocketFrame) wsFrame).text();
        try {
            command.setParams(webText.getBytes(CharsetCoding.UTF_8));
        } catch (UnsupportedEncodingException e) {
            // Best-effort as before (the command still runs), but keep the cause in the log.
            logger.error("web message text=" + webText + " Charset code error", e);
        }
        ctx.channel().writeAndFlush(new TextWebSocketFrame(webText));
        executor.execute("WS-ACTION", command);
    } else if (wsFrame instanceof PingWebSocketFrame) {
        // ping/pong
        ctx.channel().writeAndFlush(new PongWebSocketFrame(wsFrame.content().retain()));
        logger.info("ws client siteUserId={} ping to server", command.getSiteUserId());
    } else if (wsFrame instanceof CloseWebSocketFrame) {
        // close channel
        wsHandshaker.close(ctx.channel(), (CloseWebSocketFrame) wsFrame.retain());
        WebChannelManager.delChannelSession(command.getSiteUserId());
    }
}
/**
 * Handles every message read by the client: first the handshake response, then WebSocket
 * frames.
 *
 * <p>Until the handshake is complete the message is treated as the {@link FullHttpResponse}
 * finishing it. Afterwards, text frames (as UTF-8 bytes) and binary frames (as their readable
 * bytes) are appended to {@code responses}, a ping is answered with a pong carrying the same
 * application data, and a close frame closes the channel.
 *
 * @param ctx the channel handler context
 * @param msg the message that was read
 * @throws IllegalStateException if an HTTP response arrives after the handshake completed
 * @throws Exception if finishing the handshake fails
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    LOG.info("Received msg: {}", msg);
    if (!this.handshaker.isHandshakeComplete()) {
        this.handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
        LOG.info("Client connected.");
        this.connected = true;
        this.handshakeFuture.setSuccess();
        return;
    }
    if (msg instanceof FullHttpResponse) {
        throw new IllegalStateException("Unexpected response: " + msg.toString());
    }
    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        synchronized (responses) {
            responses.add(((TextWebSocketFrame) frame).text().getBytes(StandardCharsets.UTF_8));
        }
    } else if (frame instanceof BinaryWebSocketFrame) {
        ByteBuf buf = frame.content();
        byte[] b = new byte[buf.readableBytes()];
        buf.readBytes(b);
        synchronized (responses) {
            responses.add(b);
        }
    } else if (frame instanceof PingWebSocketFrame) {
        LOG.info("Returning pong message");
        // A pong must echo the ping's application data. A copy is sent so the reply owns its
        // own buffer, independent of how the incoming frame is released.
        ctx.writeAndFlush(new PongWebSocketFrame(frame.content().copy()));
    } else if (frame instanceof CloseWebSocketFrame) {
        LOG.info("Received message from server to close the channel.");
        ctx.close();
    } else {
        LOG.warn("Unhandled frame type received: " + frame.getClass());
    }
}