类 io.netty.handler.codec.http.websocketx.CloseWebSocketFrame 源码实例Demo

下面列出了怎么用 io.netty.handler.codec.http.websocketx.CloseWebSocketFrame 的API类实例代码及写法,或者点击链接到github查看源代码。


/**
 * Print {@link WebSocketFrame} information.
 *
 * @param log              {@link Logger} object of the relevant class
 * @param frame            {@link WebSocketFrame} frame
 * @param channelContextId {@link ChannelHandlerContext} context id as a String
 * @param customMsg        Log message which needs to be appended to the frame information,
 *                         if it is not required provide null
 * @param isInbound        true if the frame is inbound, false if it is outbound
 */
private static void printWebSocketFrame(Logger log, WebSocketFrame frame, String channelContextId,
                                        String customMsg, boolean isInbound) {

    String logStatement = getDirectionString(isInbound) + channelContextId;
    if (frame instanceof PingWebSocketFrame) {
        logStatement += " Ping frame";
    } else if (frame instanceof PongWebSocketFrame) {
        logStatement += " Pong frame";
    } else if (frame instanceof CloseWebSocketFrame) {
        logStatement += " Close frame";
    } else if (frame instanceof BinaryWebSocketFrame) {
        logStatement += " Binary frame";
    } else if (frame instanceof TextWebSocketFrame) {
        logStatement += " " + ((TextWebSocketFrame) frame).text();
    }

    //specifically for logging close websocket frames with error status
    if (customMsg != null) {
        logStatement += " " + customMsg;
    }
    log.debug(logStatement);

}
 

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        // Check for closing frame
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
                    .getName()));
        }

        // Send the uppercase string back.
        String request = ((TextWebSocketFrame) frame).text();
        System.err.printf("%s received %s%n", ctx.channel(), request);
        ctx.channel().write(new TextWebSocketFrame(request.toUpperCase()));
    }
 
源代码3 项目: micro-integrator   文件: LogUtil.java

/**
 * Print {@link WebSocketFrame} information.
 *
 * @param log              {@link Log} object of the relevant class
 * @param frame            {@link WebSocketFrame} frame
 * @param channelContextId {@link ChannelHandlerContext} context id as a String
 * @param customMsg        Log message which needs to be appended to the frame information,
 *                         if it is not required provide null
 * @param isInbound        true if the frame is inbound, false if it is outbound
 */
private static void printWebSocketFrame(
        Log log, WebSocketFrame frame, String channelContextId,
        String customMsg, boolean isInbound) {

    String logStatement = getDirectionString(isInbound) + channelContextId;
    if (frame instanceof PingWebSocketFrame) {
        logStatement += " Ping frame";
    } else if (frame instanceof PongWebSocketFrame) {
        logStatement += " Pong frame";
    } else if (frame instanceof CloseWebSocketFrame) {
        logStatement += " Close frame";
    } else if (frame instanceof BinaryWebSocketFrame) {
        logStatement += " Binary frame";
    } else if (frame instanceof TextWebSocketFrame) {
        logStatement += " " + ((TextWebSocketFrame) frame).text();
    }

    //specifically for logging close websocket frames with error status
    if (customMsg != null) {
        logStatement += " " + customMsg;
    }
    log.debug(logStatement);

}
 
源代码4 项目: quarkus-http   文件: FrameHandler.java

void onCloseFrame(final CloseWebSocketFrame message) {
    if (session.isSessionClosed()) {
        //we have already handled this when we sent the close frame
        return;
    }
    String reason = message.reasonText();
    int code = message.statusCode() == -1 ? CloseReason.CloseCodes.NORMAL_CLOSURE.getCode() : message.statusCode();

    session.getContainer().invokeEndpointMethod(executor, new Runnable() {
        @Override
        public void run() {
            try {
                session.closeInternal(new CloseReason(CloseReason.CloseCodes.getCloseCode(code), reason));
            } catch (IOException e) {
                invokeOnError(e);
            }
        }
    });
}
 

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
	if (frame instanceof CloseWebSocketFrame) {
		handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
		return;
	}
	if (frame instanceof PingWebSocketFrame) {
		ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
		return;
	}
	if (frame instanceof TextWebSocketFrame) {
		if (webSocketEvent != null) {
			webSocketEvent.onMessageStringEvent(baseServer, new WebSocketSession(ctx.channel()), ((TextWebSocketFrame) frame).text());
		}
		return;
	}
	
	if (frame instanceof BinaryWebSocketFrame) {
		if (webSocketEvent != null) {
			webSocketEvent.onMessageBinaryEvent(baseServer, new WebSocketSession(ctx.channel()), ((BinaryWebSocketFrame)frame).content());
		}
	}
}
 

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        // Check for closing frame
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (frame instanceof TextWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
            return;
        }
        if (frame instanceof BinaryWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
            return;
        }
    }
 

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        // Check for closing frame
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (frame instanceof TextWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
            return;
        }
        if (frame instanceof BinaryWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
        }
    }
 

/**
 * 处理WebSocket请求
 * 
 * @param ctx
 * @param frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        ctx.close();
        return;
    }
    // 没有使用WebSocketServerProtocolHandler,所以不会接收到PingWebSocketFrame。
    // if (frame instanceof PingWebSocketFrame) {
    // ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
    // return;
    // }
    if (!(frame instanceof TextWebSocketFrame)) {
        throw new UnsupportedOperationException(
                String.format("%s frame types not supported", frame.getClass().getName()));
    }

    String request = ((TextWebSocketFrame) frame).text();
    logger.debug("收到客户端发送的数据:" + request);
    // 回复心跳
    if (request.length() == 0) {
        ctx.writeAndFlush(new TextWebSocketFrame(""));
        return;
    }
    this.handleMessage(ctx.channel(), request);
}
 

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (logger.isLoggable(Level.FINE)) {
        logger.fine(String.format(
                "Channel %s received %s", ctx.channel().hashCode(), StringUtil.simpleClassName(frame)));
    }

    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()), ctx.voidPromise());
    } else if (frame instanceof TextWebSocketFrame) {
        ctx.write(frame, ctx.voidPromise());
    } else if (frame instanceof BinaryWebSocketFrame) {
        ctx.write(frame, ctx.voidPromise());
    } else if (frame instanceof ContinuationWebSocketFrame) {
        ctx.write(frame, ctx.voidPromise());
    } else if (frame instanceof PongWebSocketFrame) {
        frame.release();
        // Ignore
    } else {
        throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
                .getName()));
    }
}
 

@Override
public void channelRead0(ChannelHandlerContext context, Object message) {
  Channel channel = context.channel();
  if (message instanceof FullHttpResponse) {
    checkState(!handshaker.isHandshakeComplete());
    try {
      handshaker.finishHandshake(channel, (FullHttpResponse) message);
      delegate.onOpen();
    } catch (WebSocketHandshakeException e) {
      delegate.onError(e);
    }
  } else if (message instanceof TextWebSocketFrame) {
    delegate.onMessage(((TextWebSocketFrame) message).text());
  } else {
    checkState(message instanceof CloseWebSocketFrame);
    delegate.onClose();
  }
}
 

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        // Check for closing frame
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (frame instanceof TextWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
            return;
        }
        if (frame instanceof BinaryWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
        }
    }
 
源代码12 项目: socketio   文件: WebSocketHandler.java

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
  if (log.isDebugEnabled())
    log.debug("Received {} WebSocketFrame: {} from channel: {}", getTransportType().getName(), msg, ctx.channel());

  if (msg instanceof CloseWebSocketFrame) {
    sessionIdByChannel.remove(ctx.channel());
    ChannelFuture f = ctx.writeAndFlush(msg);
    f.addListener(ChannelFutureListener.CLOSE);
  } else if (msg instanceof PingWebSocketFrame) {
    ctx.writeAndFlush(new PongWebSocketFrame(msg.content()));
  } else if (msg instanceof TextWebSocketFrame || msg instanceof BinaryWebSocketFrame){
    Packet packet = PacketDecoder.decodePacket(msg.content());
    packet.setTransportType(getTransportType());
    String sessionId = sessionIdByChannel.get(ctx.channel());
    packet.setSessionId(sessionId);
    msg.release();
    ctx.fireChannelRead(packet);
  } else {
    msg.release();
    log.warn("{} frame type is not supported", msg.getClass().getName());
  }
}
 

@Test
public void testCreateSubscriptionWithMissingSessionId() throws Exception {
    decoder = new WebSocketRequestDecoder(config.getSecurity());
// @formatter:off
    String request = "{ "+
      "\"operation\" : \"create\", " +
      "\"subscriptionId\" : \"1234\"" +
    " }";
    // @formatter:on
    TextWebSocketFrame frame = new TextWebSocketFrame();
    frame.content().writeBytes(request.getBytes(StandardCharsets.UTF_8));
    decoder.decode(ctx, frame, results);
    Assert.assertNotNull(ctx.msg);
    Assert.assertEquals(CloseWebSocketFrame.class, ctx.msg.getClass());
    Assert.assertEquals(1008, ((CloseWebSocketFrame) ctx.msg).statusCode());
    Assert.assertEquals("User must log in", ((CloseWebSocketFrame) ctx.msg).reasonText());
}
 

@Test
public void testCreateSubscriptionWithInvalidSessionIdAndNonAnonymousAccess() throws Exception {
    ctx.channel().attr(SubscriptionRegistry.SESSION_ID_ATTR)
            .set(URLEncoder.encode(UUID.randomUUID().toString(), StandardCharsets.UTF_8.name()));
    decoder = new WebSocketRequestDecoder(config.getSecurity());
// @formatter:off
    String request = "{ "+
      "\"operation\" : \"create\", " +
      "\"subscriptionId\" : \"1234\"" +
    " }";
    // @formatter:on
    TextWebSocketFrame frame = new TextWebSocketFrame();
    frame.content().writeBytes(request.getBytes(StandardCharsets.UTF_8));
    decoder.decode(ctx, frame, results);
    Assert.assertNotNull(ctx.msg);
    Assert.assertEquals(CloseWebSocketFrame.class, ctx.msg.getClass());
    Assert.assertEquals(1008, ((CloseWebSocketFrame) ctx.msg).statusCode());
    Assert.assertEquals("User must log in", ((CloseWebSocketFrame) ctx.msg).reasonText());
}
 
源代码15 项目: zbus-server   文件: HttpWsServer.java

private byte[] decodeWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
	// Check for closing frame
	if (frame instanceof CloseWebSocketFrame) {
		handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
		return null;
	}
	
	if (frame instanceof PingWebSocketFrame) {
		ctx.write(new PongWebSocketFrame(frame.content().retain()));
		return null;
	}
	
	if (frame instanceof TextWebSocketFrame) {
		TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
		return parseMessage(textFrame.content());
	}
	
	if (frame instanceof BinaryWebSocketFrame) {
		BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame) frame;
		return parseMessage(binFrame.content());
	}
	
	logger.warn("Message format error: " + frame); 
	return null;
}
 

Mono<Void> sendClose(CloseWebSocketFrame frame) {
	if (CLOSE_SENT.get(this) == 0) {
		//commented for now as we assume the close is always scheduled (deferFuture runs)
		//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
		return FutureMono.deferFuture(() -> {
			if (CLOSE_SENT.getAndSet(this, 1) == 0) {
				discard();
				onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
				return channel().writeAndFlush(frame)
				                .addListener(ChannelFutureListener.CLOSE);
			}
			frame.release();
			return channel().newSucceededFuture();
		}).doOnCancel(() -> ReactorNetty.safeRelease(frame));
	}
	frame.release();
	return Mono.empty();
}
 

@SuppressWarnings("FutureReturnValueIgnored")
void sendCloseNow(@Nullable CloseWebSocketFrame frame) {
	if (frame != null && !frame.isFinalFragment()) {
		//"FutureReturnValueIgnored" this is deliberate
		channel().writeAndFlush(frame);
		return;
	}
	if (CLOSE_SENT.getAndSet(this, 1) == 0) {
		if (frame != null) {
			onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
			channel().writeAndFlush(frame)
			         .addListener(ChannelFutureListener.CLOSE);
		} else {
			onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
			channel().writeAndFlush(new CloseWebSocketFrame())
			         .addListener(ChannelFutureListener.CLOSE);
		}
	}
	else if (frame != null) {
		frame.release();
	}
}
 

Mono<Void> sendClose(CloseWebSocketFrame frame) {
	if (CLOSE_SENT.get(this) == 0) {
		//commented for now as we assume the close is always scheduled (deferFuture runs)
		//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
		return FutureMono.deferFuture(() -> {
			if (CLOSE_SENT.getAndSet(this, 1) == 0) {
				discard();
				onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
				return channel().writeAndFlush(frame)
				                .addListener(ChannelFutureListener.CLOSE);
			}
			frame.release();
			return channel().newSucceededFuture();
		}).doOnCancel(() -> ReactorNetty.safeRelease(frame));
	}
	frame.release();
	return Mono.empty();
}
 

@SuppressWarnings("FutureReturnValueIgnored")
void sendCloseNow(@Nullable CloseWebSocketFrame frame, ChannelFutureListener listener) {
	if (frame != null && !frame.isFinalFragment()) {
		//"FutureReturnValueIgnored" this is deliberate
		channel().writeAndFlush(frame);
		return;
	}
	if (CLOSE_SENT.getAndSet(this, 1) == 0) {
		if (frame != null) {
			onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
			channel().writeAndFlush(frame)
			         .addListener(listener);
		} else {
			onCloseState.onNext(new WebSocketCloseStatus(-1, ""));
			channel().writeAndFlush(new CloseWebSocketFrame())
			         .addListener(listener);
		}
	}
	else if (frame != null) {
		frame.release();
	}
}
 
源代码20 项目: selenium   文件: WebSocketUpgradeHandler.java

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
  if (frame instanceof CloseWebSocketFrame) {
    handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
    // Pass on to the rest of the channel
    ctx.fireChannelRead(frame);
  } else if (frame instanceof PingWebSocketFrame) {
    ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
  } else if (frame instanceof ContinuationWebSocketFrame) {
    ctx.write(frame);
  } else if (frame instanceof PongWebSocketFrame) {
    frame.release();
  } else if (frame instanceof BinaryWebSocketFrame || frame instanceof TextWebSocketFrame) {
    // Allow the rest of the pipeline to deal with this.
    ctx.fireChannelRead(frame);
  } else {
    throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
      .getName()));
  }
}
 
源代码21 项目: reactor-netty   文件: WebsocketTest.java

@Test
public void testCloseWebSocketFrameSentByClient() {
	httpServer =
			HttpServer.create()
			          .port(0)
			          .handle((req, res) ->
			                  res.sendWebsocket((in, out) -> out.sendString(Mono.just("echo"))
			                                                    .sendObject(new CloseWebSocketFrame())))
			          .wiretap(true)
			          .bindNow();

	Mono<Void> response =
			HttpClient.create()
			          .port(httpServer.port())
			          .websocket()
			          .uri("/")
			          .handle((in, out) -> out.sendObject(in.receiveFrames()
			                                                .doOnNext(WebSocketFrame::retain)
			                                                .then()))
			          .next();

	StepVerifier.create(response)
	            .expectComplete()
	            .verify(Duration.ofSeconds(30));
}
 
源代码22 项目: reactor-netty   文件: WebsocketTest.java

@Test
public void testIssue444() {
	doTestIssue444((in, out) ->
			out.sendObject(Flux.error(new Throwable())
			                   .onErrorResume(ex -> out.sendClose(1001, "Going Away"))
			                   .cast(WebSocketFrame.class)));
	doTestIssue444((in, out) ->
			out.send(Flux.range(0, 10)
			             .map(i -> {
			                 if (i == 5) {
			                     out.sendClose(1001, "Going Away").subscribe();
			                 }
			                 return Unpooled.copiedBuffer((i + "").getBytes(Charset.defaultCharset()));
			             })));
	doTestIssue444((in, out) ->
			out.sendObject(Flux.error(new Throwable())
			                   .onErrorResume(ex -> Flux.empty())
			                   .cast(WebSocketFrame.class))
			   .then(Mono.defer(() -> out.sendObject(
			       new CloseWebSocketFrame(1001, "Going Away")).then())));
}
 
源代码23 项目: wind-im   文件: WsServerHandler.java

private void doWSRequest(ChannelHandlerContext ctx, WebSocketFrame wsFrame) {
	InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
	String clientIp = socketAddress.getAddress().getHostAddress();
	ChannelSession channelSession = ctx.channel().attr(ChannelConst.CHANNELSESSION).get();

	Command command = new Command();
	command.setSiteUserId(channelSession.getUserId());
	command.setClientIp(clientIp);
	command.setStartTime(System.currentTimeMillis());

	if (wsFrame instanceof TextWebSocketFrame) {
		TextWebSocketFrame textWsFrame = (TextWebSocketFrame) wsFrame;
		String webText = textWsFrame.text();
		try {
			command.setParams(webText.getBytes(CharsetCoding.UTF_8));
		} catch (UnsupportedEncodingException e) {
			logger.error("web message text=" + webText + " Charset code error");
		}
		TextWebSocketFrame resFrame = new TextWebSocketFrame(textWsFrame.text());
		ctx.channel().writeAndFlush(resFrame);

		executor.execute("WS-ACTION", command);
	} else if (wsFrame instanceof PingWebSocketFrame) {
		// ping/pong
		ctx.channel().writeAndFlush(new PongWebSocketFrame(wsFrame.content().retain()));
		logger.info("ws client siteUserId={} ping to server", command.getSiteUserId());
	} else if (wsFrame instanceof CloseWebSocketFrame) {
		// close channel
		wsHandshaker.close(ctx.channel(), (CloseWebSocketFrame) wsFrame.retain());
		WebChannelManager.delChannelSession(command.getSiteUserId());
	}
}
 
源代码24 项目: selenium   文件: MessageOutboundConverter.java

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
  throws Exception {
  if (!(msg instanceof Message)) {
    super.write(ctx, msg, promise);
    return;
  }

  Message seMessage = (Message) msg;

  if (seMessage instanceof CloseMessage) {
    ctx.writeAndFlush(new CloseWebSocketFrame(true, 0));
  } else if (seMessage instanceof BinaryMessage) {
    ctx.writeAndFlush(
      new BinaryWebSocketFrame(
        true,
        0,
        Unpooled.copiedBuffer(((BinaryMessage) seMessage).data())));
  } else if (seMessage instanceof TextMessage) {
    ctx.writeAndFlush(
      new TextWebSocketFrame(
        true,
        0,
        ((TextMessage) seMessage).text()));
  } else {
    LOG.warning(String.format("Unable to handle %s", msg));
    super.write(ctx, msg, promise);
  }
}
 

@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
    if (frame instanceof TextWebSocketFrame) {
        // Echos the same text
        String text = ((TextWebSocketFrame) frame).text();
        ctx.channel().writeAndFlush(new TextWebSocketFrame(text));
    } else if (frame instanceof BinaryWebSocketFrame) {
        ctx.channel().writeAndFlush(frame.retain());
    } else if (frame instanceof CloseWebSocketFrame) {
        ctx.close();
    } else {
        String message = "unsupported frame type: " + frame.getClass().getName();
        throw new UnsupportedOperationException(message);
    }
}
 

public void addCloseListener(final ChannelHandlerContext targetCtx) {
    ChannelFuture closeFuture = ctx.channel().closeFuture();
    closeFuture.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                if (targetCtx.channel().isActive()) {
                    targetCtx.channel().write(new CloseWebSocketFrame()).addListener(ChannelFutureListener.CLOSE);
                }
            }
        }
    });
}
 

public void handleClientWebsocketChannelTermination(WebSocketFrame frame) throws AxisFault {

        handshaker.close(wrappedContext.getChannelHandlerContext().channel(), (CloseWebSocketFrame) frame.retain());
        String endpointName = WebsocketEndpointManager.getInstance().getEndpointName(port, tenantDomain);
        WebsocketSubscriberPathManager.getInstance()
                .removeChannelContext(endpointName, subscriberPath.getPath(), wrappedContext);

    }
 

@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel ch = ctx.channel();
    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(ch, (FullHttpResponse) msg);
        System.out.println("WebSocket Client connected!");
        handshakeFuture.setSuccess();
        return;
    }

    if (msg instanceof FullHttpResponse) {
        FullHttpResponse response = (FullHttpResponse) msg;
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
                        ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg;
    if (frame instanceof TextWebSocketFrame) {
        TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
        System.out.println("WebSocket Client received message: " + textFrame.text());
    } else if (frame instanceof PongWebSocketFrame) {
        System.out.println("WebSocket Client received pong");
    } else if (frame instanceof CloseWebSocketFrame) {
        System.out.println("WebSocket Client received closing");
        ch.close();
    }
}
 

@Override
protected void decode(final ChannelHandlerContext channelHandlerContext, final CloseWebSocketFrame frame, final List<Object> objects) throws Exception {
    final ByteBuf messageBytes = frame.content();
    final byte len = messageBytes.readByte();
    if (len <= 0) {
        objects.add(RequestMessage.INVALID);
        return;
    }

    final ByteBuf contentTypeBytes = channelHandlerContext.alloc().buffer(len);
    try {
        messageBytes.readBytes(contentTypeBytes);
        final String contentType = contentTypeBytes.toString(UTF8);
        final MessageSerializer serializer = select(contentType, ServerSerializers.DEFAULT_BINARY_SERIALIZER);

        // it's important to re-initialize these channel attributes as they apply globally to the channel. in
        // other words, the next request to this channel might not come with the same configuration and mixed
        // state can carry through from one request to the next
        channelHandlerContext.channel().attr(StateKey.SESSION).set(null);
        channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(serializer);
        channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(true);

        try {
            objects.add(serializer.deserializeRequest(messageBytes.discardReadBytes()));
        } catch (SerializationException se) {
            objects.add(RequestMessage.INVALID);
        }
    } finally {
        contentTypeBytes.release();
    }
}
 

@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
    if (frame instanceof TextWebSocketFrame) {
        // Echos the same text
        String text = ((TextWebSocketFrame) frame).text();
        ctx.channel().writeAndFlush(new TextWebSocketFrame(text));
    } else if (frame instanceof BinaryWebSocketFrame) {
        ctx.channel().writeAndFlush(frame.retain());
    } else if (frame instanceof CloseWebSocketFrame) {
        ctx.close();
    } else {
        String message = "unsupported frame type: " + frame.getClass().getName();
        throw new UnsupportedOperationException(message);
    }
}
 
 类方法
 同包方法