下面列出了 io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler #io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new IdleStateHandler(0, 0, 60));
pipeline.addLast(new WebSocketServerProtocolHandler(WEB_SOCKET_PATH, null, true));
pipeline.addLast(new WebSocketHandler(litchi));
for (ChannelHandler handler : handlers) {
pipeline.addLast(handler);
}
}
private void initializeWebSocketTransport(final NewNettyMQTTHandler handler, Config conf) {
LOG.debug("Configuring Websocket MQTT transport");
if(!conf.websocketEnabled) {
return;
}
int port = conf.websocketPort;// Integer.parseInt(webSocketPortProp);
final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
String host = conf.websocketHost;
initFactory(host, port, "Websocket MQTT", new PipelineInitializer() {
@Override
void init(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("webSocketHandler",
new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST));
pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
configureMQTTPipeline(pipeline, timeoutHandler, handler);
}
});
}
public void addHandlers(final Channel ch) {
ch.pipeline().addBefore(AbstractChannelInitializer.FIRST_ABSTRACT_HANDLER, HTTP_SERVER_CODEC, new HttpServerCodec());
ch.pipeline().addAfter(HTTP_SERVER_CODEC, HTTP_OBJECT_AGGREGATOR, new HttpObjectAggregator(WEBSOCKET_MAX_CONTENT_LENGTH));
final String webSocketPath = websocketListener.getPath();
final String subprotocols = getSubprotocolString();
final boolean allowExtensions = websocketListener.getAllowExtensions();
ch.pipeline().addAfter(HTTP_OBJECT_AGGREGATOR, WEBSOCKET_SERVER_PROTOCOL_HANDLER, new WebSocketServerProtocolHandler(webSocketPath, subprotocols, allowExtensions, Integer.MAX_VALUE));
ch.pipeline().addAfter(WEBSOCKET_SERVER_PROTOCOL_HANDLER, WEBSOCKET_BINARY_FRAME_HANDLER, new WebSocketBinaryFrameHandler());
ch.pipeline().addAfter(WEBSOCKET_BINARY_FRAME_HANDLER, WEBSOCKET_CONTINUATION_FRAME_HANDLER, new WebSocketContinuationFrameHandler());
ch.pipeline().addAfter(WEBSOCKET_BINARY_FRAME_HANDLER, WEBSOCKET_TEXT_FRAME_HANDLER, new WebSocketTextFrameHandler());
ch.pipeline().addAfter(WEBSOCKET_TEXT_FRAME_HANDLER, MQTT_WEBSOCKET_ENCODER, new MQTTWebsocketEncoder());
}
private boolean upgradeToWebSocket(ChannelHandlerContext ctx, FullHttpRequest request) {
HttpHeaders headers = request.headers();
if ("Upgrade".equalsIgnoreCase(headers.get(HttpHeaderNames.CONNECTION)) &&
"WebSocket".equalsIgnoreCase(headers.get(HttpHeaderNames.UPGRADE))) {
ChannelPipeline pipeline = ctx.pipeline();
// 将http升级为WebSocket
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(FrameCodec.getInstance());
pipeline.addLast(FrameHandler.getInstance(channelListener));
pipeline.remove(this);
// 将channelActive事件传递到FrameHandler
ctx.fireChannelActive();
return true;
}
return false;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
//添加闲置处理,60秒没有数据传输,触发事件
pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
//将字节解码为HttpMessage对象,并将HttpMessage对象编码为字节
pipeline.addLast(new HttpServerCodec());
//出站数据压缩
pipeline.addLast(new HttpContentCompressor());
//聚合多个HttpMessage为单个FullHttpRequest
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
//如果被请求的端点是/ws,则处理该升级握手
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//聊天消息处理
pipeline.addLast(new ChatServerHandler());
//心跳处理
pipeline.addLast(new HeartbeatHandler());
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
HttpRequestHandler httpRequestHandler = null;
if (httpResourcePath == null) {
httpRequestHandler = new HttpRequestHandler("/ws");
} else {
httpRequestHandler = new HttpRequestHandler("/ws", httpResourcePath);
}
pipeline.addLast(httpRequestHandler);
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TtyWebSocketFrameHandler(group, handler, HttpRequestHandler.class));
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){
//删除之前 HttpRequestHandle
ctx.pipeline().remove(HttpRequestHandle.class) ;
//通知所有已经连接的客户端 有新的连接来了
group.writeAndFlush(new TextWebSocketFrame("新的客户端=" + ctx.channel() + "连接上来了")) ;
//将当前的 channel 也就是 websocket channel 加入到 channelGroup 当中
group.add(ctx.channel()) ;
}else {
//交给下一个 channelHandler 处理
super.userEventTriggered(ctx, evt);
}
}
@SuppressWarnings("deprecation")
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE)
{//旧版本
log.debug("excute webSocketHandComplete……");
webSocketHandComplete(ctx);
ctx.pipeline().remove(this);
log.debug("excuted webSocketHandComplete:"+ctx.pipeline().toMap().toString());
return;
}
if(evt instanceof HandshakeComplete)
{//新版本
HandshakeComplete hc=(HandshakeComplete)evt;
log.debug("excute webSocketHandComplete……,HandshakeComplete="+hc);
webSocketHandComplete(ctx);
ctx.pipeline().remove(this);
log.debug("excuted webSocketHandComplete:"+ctx.pipeline().toMap().toString());
return;
}
super.userEventTriggered(ctx, evt);
}
protected ChannelHandler setupWSChannel(SslContext sslCtx, Configuration conf, DataStore datastore) {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("ssl", sslCtx.newHandler(ch.alloc()));
ch.pipeline().addLast("httpServer", new HttpServerCodec());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192));
ch.pipeline().addLast("sessionExtractor", new WebSocketHttpCookieHandler(config));
ch.pipeline().addLast("idle-handler", new IdleStateHandler(conf.getWebsocket().getTimeout(), 0, 0));
ch.pipeline().addLast("ws-protocol", new WebSocketServerProtocolHandler(WS_PATH, null, true));
ch.pipeline().addLast("wsDecoder", new WebSocketRequestDecoder(datastore, config));
ch.pipeline().addLast("error", new WSExceptionHandler());
}
};
}
@Override
protected void initChannel( SocketChannel _channel ) throws Exception {
ChannelPipeline pipeline = _channel.pipeline();
// 是否使用客户端模式
if( CommonConfig.getBoolean( "jees.jsts.websocket.ssl.enable", false ) ){
SSLEngine engine = sslContext1.createSSLEngine();
// 是否需要验证客户端
engine.setUseClientMode(false);
// engine.setNeedClientAuth(false);
pipeline.addFirst("ssl", new SslHandler( engine ));
}
pipeline.addLast( new IdleStateHandler( 100 , 0 , 0 , TimeUnit.SECONDS ) );
pipeline.addLast( new HttpServerCodec() );
pipeline.addLast( new ChunkedWriteHandler() );
pipeline.addLast( new HttpObjectAggregator( 8192 ) );
pipeline.addLast( new WebSocketServerProtocolHandler( CommonConfig.getString( ISocketBase.Netty_WebSocket_Url, "/" ) ) );
pipeline.addLast( CommonContextHolder.getBean( WebSocketHandler.class ) );
}
protected ChannelHandler setupWSChannel(BalancerConfiguration balancerConfig, SslContext sslCtx,
MetricResolver metricResolver, WsClientPool wsClientPool) {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("ssl", sslCtx.newHandler(ch.alloc()));
ch.pipeline().addLast("httpServer", new HttpServerCodec());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
ch.pipeline().addLast("sessionExtractor", new WebSocketFullRequestHandler());
ch.pipeline().addLast("idle-handler",
new IdleStateHandler(balancerConfig.getWebsocket().getTimeout(), 0, 0));
ch.pipeline().addLast("ws-protocol",
new WebSocketServerProtocolHandler(WS_PATH, null, true, 65536, false, true));
ch.pipeline().addLast("wsDecoder", new WebSocketRequestDecoder(balancerConfig.getSecurity()));
ch.pipeline().addLast("httpRelay", new WsRelayHandler(balancerConfig, metricResolver, wsClientPool));
ch.pipeline().addLast("error", new WSTimelyExceptionHandler());
}
};
}
@Override
protected void initChannel(Channel ch) {
//添加web socket相关内容
ChannelPipeline pip = ch.pipeline();
if (sslCtx != null) {
pip.addLast("sslHandler", sslCtx.newHandler(ch.alloc()));
}
pip.addLast(new HttpServerCodec());
pip.addLast(new HttpObjectAggregator(65536));
pip.addLast(new WebSocketServerProtocolHandler("/"));
pip.addLast(new WebSocketDecoder());
pip.addLast(new WebSocketEncoder());
pip.addLast(new MessageDecoder(builder.getImessageandhandler()));
pip.addLast(new MessageExecutor(builder.getConsumer(), builder.getListener()));
for (ChannelHandler handler : builder.getExtraHandlers()) {
pip.addLast(handler);
}
}
protected ChannelInitializer<Channel> createInitializer() {
return new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpServerCodec() );
p.addLast(new ChunkedWriteHandler());
p.addLast(new HttpObjectAggregator(64 * 1024));
p.addLast(new EchoServerHttpRequestHandler("/ws"));
p.addLast(new WebSocketServerProtocolHandler("/ws"));
p.addLast(new EchoServerWSHandler());
}
};
}
public ChannelFuture run() {
final ServerBootstrap httpServerBootstrap = new ServerBootstrap();
httpServerBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(final SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new HttpResponseEncoder(),
new HttpRequestDecoder(),
new HttpObjectAggregator(65536),
new WebSocketServerProtocolHandler("/debug-session"),
new DebugProtocolHandler(debugWebsocketConfiguration));
}
});
LOGG.log(Level.INFO, "starting camunda BPM debug HTTP websocket interface on port "+port+".");
return httpServerBootstrap.bind(port);
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("timeout", new ReadTimeoutHandler(15));
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("handler", new HTTPHandler(plugin));
pipeline.addLast("websocket", new WebSocketServerProtocolHandler("/server"));
pipeline.addLast("packet-decoder", new PacketDecoder());
pipeline.addLast("packet-encoder", new PacketEncoder());
pipeline.addLast("packet-handler", new ClientHandler(socketChannel, plugin));
socketChannel.config().setAllocator(PooledByteBufAllocator.DEFAULT);
plugin.getWebHandler().getChannelGroup().add(socketChannel);
}
@Override
public void start() {
ServerBootstrap bootstrap = new ServerBootstrap()
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATER_MARK)
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATER_MARK)
.channel(NioServerSocketChannel.class)
.group(BOSS, WORKER)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pip = ch.pipeline();
pip.addLast(new IdleStateHandler(0, 0, 30 * 60 * 1000))
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(1024 * 1024))
.addLast(new WebSocketServerProtocolHandler("/ws"))
.addLast(new WebSocketFrameAggregator(1024 * 1024 * 1024))
.addLast(new RequestDecoder(new DefaultRequestEncryption(new RSAEncryption())))
.addLast(new WebSocketEncoder())
.addLast(new TabHandler())
.addLast(new HostsValidatorHandler(serverFinder))
.addLast(new UiRequestHandler(
commandStore,
uiConnectionStore,
agentConnectionStore,
sessionManager));
}
});
try {
this.channel = bootstrap.bind(port).sync().channel();
logger.info("client server startup successfully, port {}", port);
} catch (Exception e) {
logger.error("netty server for ui start fail", e);
throw Throwables.propagate(e);
}
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, subProtocols, true));
pipeline.addLast(new WebSocketRemoteServerFrameHandler());
}
private void appendWebsocketCodec(ChannelPipeline pipeline) {
// websocket 解码流程
// websocket协议处理器(握手、心跳等)
pipeline.addLast(new WebSocketServerProtocolHandler(websocketPath));
pipeline.addLast(new BinaryWebSocketFrameToBytesDecoder());
// websocket 编码流程
// Web socket clients must set this to true to mask payload.
// Server implementations must set this to false.
pipeline.addLast(new WebSocket13FrameEncoder(false));
// 将ByteBuf转换为websocket二进制帧
pipeline.addLast(new BytesToBinaryWebSocketFrameEncoder());
}
private void initializeWSSTransport(NewNettyMQTTHandler handler, Config conf, SslContext sslContext) {
LOG.debug("Configuring secure websocket MQTT transport");
if(!conf.wssEnabled) {
return;
}
int sslPort = conf.wssPort;
final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
String host = conf.wssHost;
final boolean needsClientAuth = conf.certClientAuth;
initFactory(host, sslPort, "Secure websocket", new PipelineInitializer() {
@Override
void init(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("ssl", createSslHandler(channel, sslContext, needsClientAuth));
pipeline.addLast("httpEncoder", new HttpResponseEncoder());
pipeline.addLast("httpDecoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("webSocketHandler",
new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST));
pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
configureMQTTPipeline(pipeline, timeoutHandler, handler);
}
});
}
@Override
protected void initSocketChannel(SocketChannel ch) {
super.initSocketChannel(ch);
ch.pipeline().addBefore(HANDLER_MQTTDECODER, "HttpServerCodec", new HttpServerCodec());
ch.pipeline().addBefore(HANDLER_MQTTDECODER, "HttpObjectAggregator", new HttpObjectAggregator(65536));
ch.pipeline().addBefore(HANDLER_MQTTDECODER, "ChunkedWriteHandler", new ChunkedWriteHandler());
ch.pipeline().addBefore(HANDLER_MQTTDECODER, "compressor ", new HttpContentCompressor());
ch.pipeline().addBefore(HANDLER_MQTTDECODER, "protocol", new WebSocketServerProtocolHandler(websocketPath, "mqtt,mqttv3.1,mqttv3.1.1", true, 65536));
ch.pipeline().addBefore(HANDLER_MQTTDECODER, "mqttWebSocket", new MqttWebSocketCodec());
HttpResourceHander httpResourceHander = new HttpResourceHander(httpResource);
ch.pipeline().addLast("httpResource", httpResourceHander);
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
pipeline.addLast(new WebSocketIndexPageHandler(WEBSOCKET_PATH));
pipeline.addLast(new WebSocketFrameHandler());
}
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST))
.addLast(new WebSocketFrameToByteBufDecoder())
.addLast(new ByteBufToWebSocketFrameEncoder())
.addLast(new MqttDecoder(mqttContext.getMqttConfig().getMaxPayloadSize()))
.addLast(MqttEncoder.INSTANCE)
.addLast(new ConnectionHandler())
.addLast(newMqttCommandInvocation());
}
public LauncherNettyServer(LaunchServer server) {
LaunchServerConfig.NettyConfig config = server.config.netty;
NettyObjectFactory.setUsingEpoll(config.performance.usingEpoll);
if (config.performance.usingEpoll) {
LogHelper.debug("Netty: Epoll enabled");
}
if (config.performance.usingEpoll && !Epoll.isAvailable()) {
LogHelper.error("Epoll is not available: (netty,perfomance.usingEpoll configured wrongly)", Epoll.unavailabilityCause());
}
bossGroup = NettyObjectFactory.newEventLoopGroup(config.performance.bossThread);
workerGroup = NettyObjectFactory.newEventLoopGroup(config.performance.workerThread);
serverBootstrap = new ServerBootstrap();
service = new WebSocketService(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE), server);
serverBootstrap.group(bossGroup, workerGroup)
.channelFactory(NettyObjectFactory.getServerSocketChannelFactory())
.handler(new LoggingHandler(config.logLevel))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
NettyConnectContext context = new NettyConnectContext();
//p.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("http-codec-compressor", new HttpObjectAggregator(65536));
if (server.config.netty.ipForwarding)
pipeline.addLast("forward-http", new NettyIpForwardHandler(context));
pipeline.addLast("websock-comp", new WebSocketServerCompressionHandler());
pipeline.addLast("websock-codec", new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
if (server.config.netty.fileServerEnabled)
pipeline.addLast("fileserver", new FileServerHandler(server.updatesDir, true, config.showHiddenFiles));
pipeline.addLast("launchserver", new WebSocketFrameHandler(context, server, service));
pipelineHook.hook(context, ch);
}
});
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().remove(DFWSRequestHandler.class); //remove http handle
} else {
super.userEventTriggered(ctx, evt);
}
}
/**
* 设置所有新注册的 Channel 的ChannelPipeline
* 安装所有需要的 ChannelHandler
*/
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler());
}
@Override
protected void initChannel(SocketChannel sc) {
ChannelPipeline pipeline = sc.pipeline();
ServerConfig config = serverContext.getConfig();
if (ServerConst.SERVER_PROTOCOL_STRING_LINE.equals(config.getProtocol())) {
pipeline.addLast(new StringPasswordLineDecoder(config.getMsgLength(), config.getCharset(), config.getPassword()));
pipeline.addLast(new StringPasswordLineEncoder(config.getCharset(), config.getPassword()));
pipeline.addLast(new ServerStringHandler(serverContext));
} else if (ServerConst.SERVER_PROTOCOL_WEB_SOCKET.equals(config.getProtocol())) {
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(config.getMsgLength()));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new WebSocketUriFilter(serverContext));
pipeline.addLast(new WebSocketServerProtocolHandler("/" + config.getServerName()));
pipeline.addLast(new WebSocketDecoder());
pipeline.addLast(new WebSocketEncoder());
pipeline.addLast(new LengthFieldBasedFrameDecoder(config.getMsgLength(), 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringPasswordDecoder(config.getCharset(), config.getPassword()));
pipeline.addLast(new StringPasswordEncoder(config.getCharset(), config.getPassword()));
pipeline.addLast(new ServerStringHandler(serverContext));
} else if (ServerConst.SERVER_PROTOCOL_LENGTH_FIELD.equals(config.getProtocol())) {
pipeline.addLast(new LengthFieldBasedFrameDecoder(config.getMsgLength(), 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringPasswordDecoder(config.getCharset(), config.getPassword()));
pipeline.addLast(new StringPasswordEncoder(config.getCharset(), config.getPassword()));
pipeline.addLast(new ServerStringHandler(serverContext));
} else if (ServerConst.SERVER_PROTOCOL_HTTP.equals(config.getProtocol())) {
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(config.getMsgLength()));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new WebRequestHandler(serverContext));
} else {
throw new NotFoundProtocolException(config.getProtocol());
}
}
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
//webSocket协议本身是基于http协议的,所以这边也要使用http解编码器
pipeline.addLast(new HttpServerCodec());
//以块的方式来写的处理器
pipeline.addLast(new ChunkedWriteHandler());
//netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度
pipeline.addLast(new HttpObjectAggregator(104 * 1024 * 1024));
//参数指的是contex_path
pipeline.addLast(new WebSocketServerProtocolHandler(path, null, true, 104 * 1024 * 1024));
//webSocket定义了传递数据的6中frame类型
pipeline.addLast(new ServerHandler());
}
private void intProtocolHandler(ChannelPipeline channelPipeline,InitNetty serverBean){
channelPipeline.addLast(BootstrapConstant.HTTP_CODE,new HttpServerCodec());
// channelPipeline.addLast("http-decoder",new HttpRequestDecoder());
channelPipeline.addLast(BootstrapConstant.AGGREGATOR, new HttpObjectAggregator(serverBean.getMaxContext()));
// channelPipeline.addLast("http-encoder",new HttpResponseEncoder());
channelPipeline.addLast(BootstrapConstant.CHUNKED_WRITE,new ChunkedWriteHandler());
channelPipeline.addLast(BootstrapConstant.WEB_SOCKET_HANDLER,new WebSocketServerProtocolHandler(serverBean.getWebSocketPath()));
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true, 65536, false, true, 10000L));
pipeline.addLast(new TunnelSocketFrameHandler(tunnelServer));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
if (in.readableBytes() < 3) {
return;
}
if (detectTelnetFuture != null && detectTelnetFuture.isCancellable()) {
detectTelnetFuture.cancel(false);
}
byte[] bytes = new byte[3];
in.getBytes(0, bytes);
String httpHeader = new String(bytes);
ChannelPipeline pipeline = ctx.pipeline();
if (!"GET".equalsIgnoreCase(httpHeader)) { // telnet
channelGroup.add(ctx.channel());
TelnetChannelHandler handler = new TelnetChannelHandler(handlerFactory);
pipeline.addLast(handler);
ctx.fireChannelActive(); // trigger TelnetChannelHandler init
} else {
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(workerGroup, "HttpRequestHandler", new HttpRequestHandler("/ws", new File("arthas-output")));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TtyWebSocketFrameHandler(channelGroup, ttyConnectionFactory));
ctx.fireChannelActive();
}
pipeline.remove(this);
ctx.fireChannelRead(in);
}