下面列出了 io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler #io.netty.handler.stream.ChunkedWriteHandler 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Creates the initializer that assembles the pipeline for every new channel.
 *
 * @return an initializer that installs, in order, the HTTP server codec, a chunked
 *         writer, an aggregator built with {@code 64 * 1024}, the HTTP request handler
 *         for "/ws", the WebSocket protocol handler for "/ws" and the echo handler
 */
protected ChannelInitializer<Channel> createInitializer() {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // Handlers are appended in exactly the order they must appear in the pipeline.
            ch.pipeline()
                .addLast(new HttpServerCodec())
                .addLast(new ChunkedWriteHandler())
                .addLast(new HttpObjectAggregator(64 * 1024))
                .addLast(new EchoServerHttpRequestHandler("/ws"))
                .addLast(new WebSocketServerProtocolHandler("/ws"))
                .addLast(new EchoServerWSHandler());
        }
    };
}
/**
 * Builds the server pipeline for a newly accepted channel.
 *
 * <p>Increments {@code PREVIEW_STARTED}, installs the inbound IP tracker, optionally an
 * {@link SslHandler}, then the HTTP codec, aggregator, chunked writer, client binding,
 * the provided handler and the outbound IP tracker. The CORS handler is inserted last,
 * positioned directly after the aggregator.
 *
 * @param ch the channel being initialized
 * @throws Exception if any handler cannot be created or added
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    PREVIEW_STARTED.inc();

    final ChannelPipeline p = ch.pipeline();
    p.addLast(inboundIpTracking);

    final boolean tlsEnabled = serverTlsContext != null && serverTlsContext.useTls();
    if (tlsEnabled) {
        final SSLEngine sslEngine = serverTlsContext.getContext().newEngine(ch.alloc());
        sslEngine.setNeedClientAuth(serverConfig.isTlsNeedClientAuth());
        sslEngine.setUseClientMode(false);
        p.addLast(FILTER_SSL, new SslHandler(sslEngine));
    }

    p.addLast(FILTER_CODEC, new HttpServerCodec())
        .addLast(FILTER_HTTP_AGGREGATOR, new HttpObjectAggregator(65536))
        .addLast("ChunkedWriteHandler", new ChunkedWriteHandler())
        .addLast("bind-client-context", bindClient)
        .addLast(FILTER_HANDLER, handlerProvider.get())
        .addLast(outboundIpTracking);

    // Inserted after everything else is in place, but positioned right behind the aggregator.
    p.addAfter(FILTER_HTTP_AGGREGATOR, "corshandler", new CorsHandler(corsConfig.build()));
}
/**
 * Builds the HTTP server pipeline; optional stages are added only when configured.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be added
 * @throws NullPointerException if no channel handler has been specified
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    Preconditions.checkNotNull(handler, "Must specify a channel handler");

    final ChannelPipeline p = ch.pipeline();
    p.addLast(new HttpServerCodec());

    // Aggregation is skipped when no positive request size limit is configured.
    if (maxRequestSizeBytes > 0) {
        p.addLast(new HttpObjectAggregator(maxRequestSizeBytes));
    }

    if (chunkedWrites) {
        p.addLast(new ChunkedWriteHandler());
    }

    // Client binding is only installed when a client factory is available.
    if (clientFactory != null) {
        p.addLast(new BindClientContextHandler(cookieConfig, clientFactory, requestAuthorizer));
    }

    p.addLast(handler);
}
/**
 * Builds the HTTP upload client pipeline, with TLS first when an SSL context is set.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();

    if (sslCtx != null) {
        p.addLast("ssl", sslCtx.newHandler(ch.alloc()));
    }

    p.addLast("codec", new HttpClientCodec())
        // Remove the "inflater" entry if you don't want automatic content decompression.
        .addLast("inflater", new HttpContentDecompressor())
        // Chunked writer is needed for huge file transfers.
        .addLast("chunkedWriter", new ChunkedWriteHandler())
        .addLast("handler", new HttpUploadClientHandler());
}
/**
 * Builds the server pipeline: optional SSL, idle detection, HTTP codec, aggregator,
 * chunked writer and the configured handler.
 *
 * @param ch the channel being initialized
 * @throws Exception if pipeline setup fails
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();

    final SslConfig sslConfig = nettyConfig.getSsl();
    if (sslConfig.isEnable()) {
        initSSL(p, sslConfig);
    }

    // Timeout detection; the configured read/write/all idle values are in minutes.
    p.addLast(new IdleStateHandler(
            nettyConfig.getRead(),
            nettyConfig.getWrite(),
            nettyConfig.getAll(),
            TimeUnit.MINUTES));

    p.addLast("http-codec", new HttpServerCodec())
        .addLast("aggregator", new HttpObjectAggregator(65535))
        .addLast("http-chucked", new ChunkedWriteHandler())
        .addLast("handler", handler);
}
/**
 * Builds the pipeline for the "/ws" WebSocket endpoint.
 *
 * <p>The HTTP request handler is constructed with the resource path only when
 * {@code httpResourcePath} is set.
 *
 * @param ch the channel being initialized
 * @throws Exception if pipeline setup fails
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();
    p.addLast(new HttpServerCodec())
        .addLast(new ChunkedWriteHandler())
        .addLast(new HttpObjectAggregator(64 * 1024));

    final HttpRequestHandler requestHandler = (httpResourcePath == null)
            ? new HttpRequestHandler("/ws")
            : new HttpRequestHandler("/ws", httpResourcePath);

    p.addLast(requestHandler)
        .addLast(new WebSocketServerProtocolHandler("/ws"))
        .addLast(new TtyWebSocketFrameHandler(group, handler, HttpRequestHandler.class));
}
/**
 * Builds the HTTP server pipeline: request decoder, aggregator, response encoder,
 * chunked writer and the request handler bound to {@code eventExecutorGroup}.
 *
 * @param ch the channel being initialized
 * @throws Exception if pipeline setup fails
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    /*
     * Inbound handlers run in registration order; outbound handlers run in reverse
     * registration order.
     * HttpRequestDecoder, HttpObjectAggregator and HttpHandler are inbound handlers;
     * HttpContentCompressor and HttpResponseEncoder are outbound handlers.
     * Points to keep in mind when wiring handlers:
     * 1. Inbound handlers hand a message on via ctx.fireChannelRead(msg); calling
     *    ctx.write(msg) passes it to the outbound handlers.
     * 2. After ctx.write(), flush() must be called for the write to happen immediately.
     * 3. Outbound handlers must be registered before the last inbound handler,
     *    otherwise writes will not reach them.
     * 4. The handler that consumes the message is registered last.
     */
    final ChannelPipeline p = ch.pipeline();
    p.addLast("decoder", new HttpRequestDecoder())
        .addLast("aggregator", new HttpObjectAggregator(maxContentLength))
        .addLast("encoder", new HttpResponseEncoder());

    // gzip is intentionally left disabled because files are served from local storage:
    // p.addLast(new HttpContentCompressor(1));
    p.addLast(new ChunkedWriteHandler());

    // Run HttpRequestHandler on the business executor group so worker threads are not blocked.
    p.addLast(eventExecutorGroup, "httpRequestHandler", new HttpRequestHandler());
}
/**
 * Creates the initializer for the HTTP server pipeline.
 *
 * @return an initializer installing logging, idle detection, HTTP decoding/aggregation/
 *         encoding, chunked writes, the business handler (on {@code businessExecutors}),
 *         the heartbeat handler and the exception handler, in that order
 */
@Override
public ChannelInitializer<SocketChannel> newChannelInitializer() {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(loggingHandler);
            ch.pipeline().addLast(new IdleStateHandler(30, 0, 10));

            // HTTP stages.
            ch.pipeline().addLast("decoder", new HttpRequestDecoder());
            ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
            ch.pipeline().addLast("encoder", new HttpResponseEncoder());
            ch.pipeline().addLast("chunk", new ChunkedWriteHandler());

            // Application stages; the business handler runs on its own executor group.
            ch.pipeline().addLast(businessExecutors, "business-handler", new HttpServerHandler());
            ch.pipeline().addLast(new HeartbeatHandler());
            ch.pipeline().addLast(exceptionHandler);
        }
    };
}
/**
 * Configures {@code boot} with the event loop groups, the NIO server channel type and
 * the child pipeline for the test HTTP server.
 *
 * @return the configured {@code boot} instance
 */
private ServerBootstrap serverBootstrap() {
    boot.group(bossGroup, workGroup);
    boot.channel(NioServerSocketChannel.class);
    boot.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast("decoder", new HttpRequestDecoder());
            ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
            // Previously tried and left disabled:
            // ch.pipeline().addLast("encoder", new HttpResponseDecoder());
            ch.pipeline().addLast("base-encoder", new HttpResponseEncoder());
            ch.pipeline().addLast("chunk", new ChunkedWriteHandler());
            ch.pipeline().addLast("handler", new TestHttpServerHandler());
        }
    });
    return boot;
}
/**
 * Builds the front-side pipeline from the configured handler lists plus the fixed HTTP
 * stages, and registers a listener that logs when the channel closes.
 *
 * @param ch the channel being initialized
 * @throws Exception if pipeline setup fails
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();

    // Configured inbound and outbound handlers come first.
    config.getChannelInboundHandlerList().forEach(p::addLast);
    config.getChannelOutboundHandlerList().forEach(p::addLast);

    p.addLast(new HttpResponseEncoder());
    // Configured response handlers sit between the encoder and the decoder.
    config.getHttpResponseHandlerList().forEach(p::addLast);

    p.addLast(new HttpRequestDecoder())
        .addLast(new ChunkedWriteHandler())
        .addLast(new HttpObjectAggregator(10 * 1024 * 1024))
        .addLast(new FrontHandler())
        .addLast(new ExceptionHandler());

    ch.closeFuture().addListener(
            (ChannelFutureListener) future -> logger.debug("channel close"));
}
/**
 * Configures a newly created pooled channel: enables keep-alive and TCP_NODELAY, then
 * installs optional TLS, the HTTP client codec, decompressor, aggregator, chunked
 * writer and the back-end handler.
 *
 * @param ch the created channel; it is cast to {@link NioSocketChannel}
 * @throws Exception if pipeline setup fails
 */
@Override
public void channelCreated(Channel ch) throws Exception {
    logger.debug("channelCreated. Channel ID: {}", ch.id());

    final NioSocketChannel nioChannel = (NioSocketChannel) ch;
    nioChannel.config().setKeepAlive(true);
    nioChannel.config().setTcpNoDelay(true);

    final ChannelPipeline p = nioChannel.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }

    p.addLast(new HttpClientCodec())
        .addLast(new HttpContentDecompressor())
        .addLast(new HttpObjectAggregator(1024 * 1024 * 64))
        .addLast(new ChunkedWriteHandler());
    // Per-route read timeout is currently disabled:
    // p.addLast(new ReadTimeoutHandler(requestHolder.route.getTimeoutInMilliseconds(), TimeUnit.MILLISECONDS));
    p.addLast(new BackHandler());
}
/**
 * Creates a client bootstrap whose pipeline decodes, decompresses and aggregates HTTP
 * responses before handing them to {@code handler}.
 *
 * <p>The connect timeout is 10 seconds and the read timeout is 10 minutes.
 *
 * @param handler the terminal handler that receives the aggregated responses
 * @return the configured bootstrap
 */
private Bootstrap bootstrap(ClientHandler handler) {
    Bootstrap b = new Bootstrap();
    b.group(new NioEventLoopGroup(1))
            .channel(NioSocketChannel.class)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10 * 1000)
            .handler(
                    new ChannelInitializer<Channel>() {
                        @Override
                        public void initChannel(Channel ch) {
                            ChannelPipeline p = ch.pipeline();
                            // ReadTimeoutHandler(int) takes SECONDS, not milliseconds.
                            // The previous value (10 * 60 * 1000) therefore meant roughly
                            // a week instead of the intended 10 minutes.
                            p.addLast(new ReadTimeoutHandler(10 * 60));
                            p.addLast(new HttpClientCodec());
                            p.addLast(new HttpContentDecompressor());
                            p.addLast(new ChunkedWriteHandler());
                            p.addLast(new HttpObjectAggregator(6553600));
                            p.addLast(handler);
                        }
                    });
    return b;
}
/**
 * Builds the HTTP upload client pipeline, with TLS first when an SSL context is set.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();

    if (sslCtx != null) {
        p.addLast("ssl", sslCtx.newHandler(ch.alloc()));
    }

    p.addLast("codec", new HttpClientCodec())
        // Remove the "inflater" entry if you don't want automatic content decompression.
        .addLast("inflater", new HttpContentDecompressor())
        // Chunked writer is needed for huge file transfers.
        .addLast("chunkedWriter", new ChunkedWriteHandler())
        .addLast("handler", new HttpUploadClientHandler());
}
/**
 * Strips the upload-specific handlers from an open channel and returns the channel to
 * the pool. The channel is released to the pool whether or not it is still open.
 *
 * @param ch the channel to clean up and release
 */
@SuppressWarnings("FutureReturnValueIgnored")
private void releaseUploadChannel(Channel ch) {
    if (ch.isOpen()) {
        try {
            ch.pipeline().remove(HttpResponseDecoder.class);
            ch.pipeline().remove(HttpObjectAggregator.class);
            ch.pipeline().remove(HttpRequestEncoder.class);
            ch.pipeline().remove(ChunkedWriteHandler.class);
            ch.pipeline().remove(HttpUploadHandler.class);
        } catch (NoSuchElementException ignored) {
            // Deliberately ignored. A channel that is closing (but not yet closed) may
            // already have lost some handlers, which makes remove() throw. Handlers are
            // removed in reverse order, so once one is missing the ones after it should
            // already be gone as well.
        }
    }
    channelPool.release(ch);
}
/**
 * Builds the WebSocket server pipeline; an SSL handler is placed first when the
 * "jees.jsts.websocket.ssl.enable" setting is true.
 *
 * @param _channel the channel being initialized
 * @throws Exception if pipeline setup fails
 */
@Override
protected void initChannel(SocketChannel _channel) throws Exception {
    final ChannelPipeline p = _channel.pipeline();

    // Is SSL enabled for the WebSocket endpoint?
    final boolean sslEnabled = CommonConfig.getBoolean("jees.jsts.websocket.ssl.enable", false);
    if (sslEnabled) {
        final SSLEngine sslEngine = sslContext1.createSSLEngine();
        // Server side: do not use client mode.
        sslEngine.setUseClientMode(false);
        // Client certificate verification is left disabled:
        // sslEngine.setNeedClientAuth(false);
        p.addFirst("ssl", new SslHandler(sslEngine));
    }

    p.addLast(new IdleStateHandler(100, 0, 0, TimeUnit.SECONDS))
        .addLast(new HttpServerCodec())
        .addLast(new ChunkedWriteHandler())
        .addLast(new HttpObjectAggregator(8192))
        .addLast(new WebSocketServerProtocolHandler(
                CommonConfig.getString(ISocketBase.Netty_WebSocket_Url, "/")))
        .addLast(CommonContextHolder.getBean(WebSocketHandler.class));
}
/**
 * Builds the pipeline that feeds HTTP requests to the dispatcher servlet.
 *
 * @param channel the channel being initialized
 * @throws Exception if pipeline setup fails
 */
@Override
public void initChannel(SocketChannel channel) throws Exception {
    // Start from the channel's default pipeline.
    final ChannelPipeline p = channel.pipeline();

    // For HTTPS, uncomment the following lines:
    //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
    //engine.setUseClientMode(false);
    //p.addLast("ssl", new SslHandler(engine));

    p.addLast("decoder", new HttpRequestDecoder())
        .addLast("aggregator", new HttpChunkAggregator(65536))
        .addLast("encoder", new HttpResponseEncoder())
        .addLast("chunkedWriter", new ChunkedWriteHandler())
        .addLast("handler", new ServletNettyHandler(this.dispatcherServlet));
}
/**
 * Builds the HTTP upload client pipeline, with TLS first when an SSL context is set.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();

    if (sslCtx != null) {
        p.addLast("ssl", sslCtx.newHandler(ch.alloc()));
    }

    p.addLast("codec", new HttpClientCodec())
        // Remove the "inflater" entry if you don't want automatic content decompression.
        .addLast("inflater", new HttpContentDecompressor())
        // Chunked writer is needed for huge file transfers.
        .addLast("chunkedWriter", new ChunkedWriteHandler())
        .addLast("handler", new HttpUploadClientHandler());
}
/**
 * Creates the server: builds and loads the command executor, allocates the two event
 * loop groups and configures the bootstrap with the per-connection pipeline.
 */
public WsServer() {
    executor = new SimpleExecutor<Command, CommandResponse>();
    loadExecutor(executor);

    // Threads that accept external connections.
    parentGroup = new NioEventLoopGroup();
    // Threads that dispatch work for accepted connections.
    childGroup = new NioEventLoopGroup();

    bootstrap = new ServerBootstrap();
    bootstrap.group(parentGroup, childGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    // Idle detection: 30 s reader idle, 60 s all idle.
                    .addLast(new IdleStateHandler(30, 0, 60))
                    // HttpServerCodec: decodes requests and encodes responses as HTTP messages.
                    .addLast(new HttpServerCodec())
                    // Aggregates HttpMessage + HttpContent into one FullHttpRequest
                    // (supports params + multipart).
                    // NOTE(review): the limit passed here is 64 * 1024 bytes; the original
                    // comment claimed 64M. Confirm which one is intended.
                    .addLast(new HttpObjectAggregator(64 * 1024))
                    // Writes large outgoing payloads in chunks.
                    .addLast(new ChunkedWriteHandler())
                    // WebSocket endpoint (currently disabled):
                    // .addLast(new WebSocketServerProtocolHandler("/akaxin/ws"))
                    // Application handler.
                    .addLast(new WsServerHandler(executor));
            }
        });
}
/**
 * Starts an HTTP server on port 8888 and blocks until its channel is closed.
 * Both event loop groups are shut down on exit.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
                    // Aggregates the parts of a request into a single message.
                    ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65535));
                    ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
                    // Supports writing large streams in chunks.
                    ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                    ch.pipeline().addLast("http-server", new HttpHandler());
                }
            });
    try {
        ChannelFuture future = bootstrap.bind(8888).sync();
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Restore the interrupt status so the interruption is not silently lost.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/**
 * Opens a client channel to the listener selected by {@code management}.
 *
 * <p>The client trusts any server certificate ({@link InsecureTrustManagerFactory}) and
 * only installs the SSL handler when the connector reports SSL.
 *
 * @param management which listener to ask {@code configManager} for
 * @param configManager source of the connector to connect to
 * @param readTimeOut value passed to {@link ReadTimeoutHandler}
 * @return the connected channel, or {@code null} if connecting failed or was interrupted
 */
public static Channel connect(
        boolean management, ConfigManager configManager, int readTimeOut) {
    Logger logger = LoggerFactory.getLogger(ModelServerTest.class);
    final Connector connector = configManager.getListener(management);
    try {
        Bootstrap b = new Bootstrap();
        final SslContext sslCtx =
                SslContextBuilder.forClient()
                        .trustManager(InsecureTrustManagerFactory.INSTANCE)
                        .build();
        b.group(Connector.newEventLoopGroup(1))
                .channel(connector.getClientChannel())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                .handler(
                        new ChannelInitializer<Channel>() {
                            @Override
                            public void initChannel(Channel ch) {
                                ChannelPipeline p = ch.pipeline();
                                if (connector.isSsl()) {
                                    p.addLast(sslCtx.newHandler(ch.alloc()));
                                }
                                p.addLast(new ReadTimeoutHandler(readTimeOut));
                                p.addLast(new HttpClientCodec());
                                p.addLast(new HttpContentDecompressor());
                                p.addLast(new ChunkedWriteHandler());
                                p.addLast(new HttpObjectAggregator(6553600));
                                p.addLast(new TestHandler());
                            }
                        });
        return b.connect(connector.getSocketAddress()).sync().channel();
    } catch (InterruptedException e) {
        // Restore the interrupt status: the blanket Throwable catch used to swallow it.
        Thread.currentThread().interrupt();
        logger.warn("Connect error.", e);
    } catch (Throwable t) {
        logger.warn("Connect error.", t);
    }
    return null;
}
/**
 * Extends the parent channel setup with the HTTP stages and the HTTP server handler.
 *
 * @param ch the channel being initialized
 */
@Override
protected void initSocketChannel(SocketChannel ch) {
    super.initSocketChannel(ch);

    ch.pipeline()
        .addLast(new HttpServerCodec())
        .addLast(new HttpObjectAggregator(65536))
        .addLast(new ChunkedWriteHandler());

    // The handler is passed to processHttpHandler before it joins the pipeline.
    final HttpServerHandler serverHandler = new HttpServerHandler(this, httpResource);
    processHttpHandler(serverHandler);
    ch.pipeline().addLast("http", serverHandler);
}
/**
 * Extends the parent channel setup so the HTTP/WebSocket stages sit in front of the
 * handler named {@code HANDLER_MQTTDECODER}, and appends the HTTP resource handler at
 * the end of the pipeline.
 *
 * @param ch the channel being initialized
 */
@Override
protected void initSocketChannel(SocketChannel ch) {
    super.initSocketChannel(ch);

    // Each stage is inserted immediately before HANDLER_MQTTDECODER, so the stages end
    // up in the order listed here.
    ch.pipeline()
        .addBefore(HANDLER_MQTTDECODER, "HttpServerCodec", new HttpServerCodec())
        .addBefore(HANDLER_MQTTDECODER, "HttpObjectAggregator", new HttpObjectAggregator(65536))
        .addBefore(HANDLER_MQTTDECODER, "ChunkedWriteHandler", new ChunkedWriteHandler())
        .addBefore(HANDLER_MQTTDECODER, "compressor ", new HttpContentCompressor())
        .addBefore(HANDLER_MQTTDECODER, "protocol",
                new WebSocketServerProtocolHandler(websocketPath, "mqtt,mqttv3.1,mqttv3.1.1", true, 65536))
        .addBefore(HANDLER_MQTTDECODER, "mqttWebSocket", new MqttWebSocketCodec());

    ch.pipeline().addLast("httpResource", new HttpResourceHander(httpResource));
}
/**
 * Builds the test HTTP server pipeline, with TLS first when an SSL context is set.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();

    if (sslContext != null) {
        p.addLast(sslContext.newHandler(ch.alloc()));
    }

    p.addLast(new HttpServerCodec())
        .addLast(new HttpObjectAggregator(65536))
        .addLast(new ChunkedWriteHandler())
        .addLast(new HttpTestServerHandler());
}
/**
 * Builds the static file server pipeline, with TLS first when an SSL context is set.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();

    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }

    p.addLast(new HttpServerCodec())
        .addLast(new HttpObjectAggregator(65536))
        .addLast(new ChunkedWriteHandler())
        .addLast(new HttpStaticFileServerHandler());
}
/**
 * Builds an HTTP server pipeline with a CORS handler in front of the response handler.
 * The CORS configuration is built with any origin, the null origin and credentials
 * allowed.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final CorsConfig cors = CorsConfigBuilder.forAnyOrigin()
            .allowNullOrigin()
            .allowCredentials()
            .build();

    final ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }

    p.addLast(new HttpResponseEncoder())
        .addLast(new HttpRequestDecoder())
        .addLast(new HttpObjectAggregator(65536))
        .addLast(new ChunkedWriteHandler())
        .addLast(new CorsHandler(cors))
        .addLast(new OkResponseHandler());
}
/**
 * Writes the given inputs through a {@link ChunkedWriteHandler} and verifies what
 * comes out.
 *
 * <p>The concatenated content must be {@code BYTES} repeated once per input, every
 * chunk before the last one must be a {@link DefaultHttpContent}, and the final
 * message must be {@link LastHttpContent#EMPTY_LAST_CONTENT}.
 *
 * @param inputs the chunked inputs to write outbound
 */
private static void check(ChunkedInput<?>... inputs) {
    EmbeddedChannel channel = new EmbeddedChannel(new ChunkedWriteHandler());
    for (ChunkedInput<?> chunkedInput : inputs) {
        channel.writeOutbound(chunkedInput);
    }
    assertTrue(channel.finish());

    int expectedIndex = 0;   // position in BYTES of the next expected byte
    int totalRead = 0;       // number of bytes seen across all chunks
    HttpContent previous = null;
    HttpContent current;
    while ((current = (HttpContent) channel.readOutbound()) != null) {
        // Once another message follows, the one before it must have been a plain chunk.
        if (previous != null) {
            assertTrue("Chunk must be DefaultHttpContent", previous instanceof DefaultHttpContent);
        }

        ByteBuf payload = current.content();
        while (payload.isReadable()) {
            assertEquals(BYTES[expectedIndex], payload.readByte());
            // Wrap around: each input is expected to replay BYTES from the start.
            expectedIndex = (expectedIndex + 1) % BYTES.length;
            totalRead++;
        }
        payload.release();

        // Remember this chunk for the checks on the next iteration and after the loop.
        previous = current;
    }

    assertEquals(BYTES.length * inputs.length, totalRead);
    assertSame("Last chunk must be DefaultLastHttpContent", LastHttpContent.EMPTY_LAST_CONTENT, previous);
}
/**
 * Switches the connection to HTTP handling: appends the HTTP stages, removes this
 * handler from the pipeline and fires {@code channelActive} onward.
 *
 * @param ctx the context of this handler
 */
private void dispatchToHttp(ChannelHandlerContext ctx) {
    final ChannelPipeline p = ctx.pipeline();

    p.addLast(new HttpServerCodec())
        .addLast(new ChunkedWriteHandler())
        // Aggregates HttpRequest/HttpContent/LastHttpContent into a FullHttpRequest.
        .addLast(new HttpObjectAggregator(8096))
        .addLast(HttpHandler.getInstance(channelListener));

    p.remove(this);

    // Pass the channelActive event on to HttpHandler.
    ctx.fireChannelActive();
}
/**
 * Creates the server: builds and loads the command executor, allocates the two event
 * loop groups and configures the bootstrap with the per-connection pipeline.
 */
public WsServer() {
    executor = new SimpleExecutor<Command, CommandResponse>();
    loadExecutor(executor);

    // Threads that accept external connections.
    parentGroup = new NioEventLoopGroup();
    // Threads that dispatch work for accepted connections.
    childGroup = new NioEventLoopGroup();

    bootstrap = new ServerBootstrap();
    bootstrap.group(parentGroup, childGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    // Idle detection: 30 s reader idle, 60 s all idle.
                    .addLast(new IdleStateHandler(30, 0, 60))
                    // HttpServerCodec: decodes requests and encodes responses as HTTP messages.
                    .addLast(new HttpServerCodec())
                    // Aggregates HttpMessage + HttpContent into one FullHttpRequest
                    // (supports params + multipart).
                    // NOTE(review): the limit passed here is 64 * 1024 bytes; the original
                    // comment claimed 64M. Confirm which one is intended.
                    .addLast(new HttpObjectAggregator(64 * 1024))
                    // Writes large outgoing payloads in chunks.
                    .addLast(new ChunkedWriteHandler())
                    // WebSocket endpoint (currently disabled):
                    // .addLast(new WebSocketServerProtocolHandler("/akaxin/ws"))
                    // Application handler.
                    .addLast(new WsServerHandler(executor));
            }
        });
}
/**
 * Builds the static file server pipeline, with TLS first when an SSL context is set.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();

    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }

    p.addLast(new HttpServerCodec())
        .addLast(new HttpObjectAggregator(65536))
        .addLast(new ChunkedWriteHandler())
        .addLast(new HttpStaticFileServerHandler());
}
/**
 * Builds the HTTP pipeline: codec, aggregator (constructed with 1048576), content
 * compressor, chunked writer and the handler built from {@code harvester} and
 * {@code helpExposition}.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(final SocketChannel ch) {
    ch.pipeline().addLast(new HttpServerCodec());
    ch.pipeline().addLast(new HttpObjectAggregator(1048576));
    ch.pipeline().addLast(new HttpContentCompressor());
    ch.pipeline().addLast(new ChunkedWriteHandler());
    ch.pipeline().addLast(new HttpHandler(harvester, helpExposition));
}