下面列出了io.netty.channel.FixedRecvByteBufAllocator#io.netty.channel.AdaptiveRecvByteBufAllocator 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public NettyClient(Node node) {
this.node = node;
boot = new Bootstrap();
AttributeKey<Node> key = null;
synchronized (NettyClient.class) {
if (AttributeKey.exists("node")) {
key = AttributeKey.valueOf("node");
} else {
key = AttributeKey.newInstance("node");
}
}
boot.attr(key, node);
boot.group(worker)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_SNDBUF, 128 * 1024)
.option(ChannelOption.SO_RCVBUF, 128 * 1024)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNETCI_TIME_OUT)
.handler(new NulsChannelInitializer<>(new ClientChannelHandler()));
}
@Bean
public Bootstrap bootstrap(){
return new Bootstrap() // (1)
.group(new NioEventLoopGroup()) // (2)
.channel(NioSocketChannel.class) // (3)
.option(ChannelOption.SO_KEEPALIVE, true) // (4)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000)
.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1, 102400, Integer.MAX_VALUE));
}
/**
* Default {@link ChannelInitializer}.
*
* @return Default initializer for ServiceTalk.
*/
static ChannelInitializer defaultInitializer() {
return channel -> channel.config().setRecvByteBufAllocator(
new AdaptiveRecvByteBufAllocator(512, 32768, 65536)
.respectMaybeMoreData(false)
.maxMessagesPerRead(4));
}
public void init() {
boss = new NioEventLoopGroup(1);
worker = new NioEventLoopGroup();
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_SNDBUF, 128 * 1024)
.childOption(ChannelOption.SO_RCVBUF, 128 * 1024)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new NulsChannelInitializer<>(new ServerChannelHandler()));
}
public void init() {
boss = new NioEventLoopGroup();
worker = new NioEventLoopGroup();
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
// .childOption(ChannelOption.SO_BACKLOG, 2048)
.childOption(ChannelOption.TCP_NODELAY, true) //Send messages immediately
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_SNDBUF, 128 * 1024)
.childOption(ChannelOption.SO_RCVBUF, 128 * 1024)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new NulsChannelInitializer<>(new ServerChannelHandler()));
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testOptionsHaveCorrectTypes() throws Exception {
final ServerBootstrap bootstrap = new ServerBootstrap();
final ChannelOptions options = new ChannelOptions();
options.setAllocator(new PooledByteBufAllocator());
options.setRecvBufAllocator(new AdaptiveRecvByteBufAllocator());
options.setConnectTimeout(1);
options.setWriteSpinCount(1);
options.setWriteBufferWaterMark(new WriteBufferWaterMark(8192, 32768));
options.setAllowHalfClosure(true);
options.setAutoRead(true);
options.setSoBroadcast(true);
options.setSoKeepAlive(true);
options.setSoReuseAddr(true);
options.setSoSndBuf(8192);
options.setSoRcvBuf(8192);
options.setSoLinger(0);
options.setSoBacklog(0);
options.setSoTimeout(0);
options.setIpTos(0);
options.setIpMulticastAddr(getLoopbackAddress());
options.setIpMulticastIf(getNetworkInterfaces().nextElement());
options.setIpMulticastTtl(300);
options.setIpMulticastLoopDisabled(true);
options.setTcpNodelay(true);
final Map<ChannelOption, Object> channelOptionMap = options.get();
channelOptionMap.forEach((key, value) -> {
bootstrap.option(key, value);
bootstrap.childOption(key, value);
});
}
/**
* starts server to handle discovery-request from client-channel
*
* @throws Exception
*/
public void startServer() throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
bootstrap.group(acceptorGroup, workerGroup);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
EventLoopUtil.enableTriggeredMode(bootstrap);
bootstrap.childHandler(new ServiceChannelInitializer(this, config, false));
// Bind and start to accept incoming connections.
Preconditions.checkArgument(config.getServicePort().isPresent() || config.getServicePortTls().isPresent(),
"Either ServicePort or ServicePortTls should be configured.");
if (config.getServicePort().isPresent()) {
// Bind and start to accept incoming connections.
channelListen = bootstrap.bind(config.getServicePort().get()).sync().channel();
LOG.info("Started Pulsar Discovery service on {}", channelListen.localAddress());
}
if (config.getServicePortTls().isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new ServiceChannelInitializer(this, config, true));
channelListenTls = tlsBootstrap.bind(config.getServicePortTls().get()).sync().channel();
LOG.info("Started Pulsar Discovery TLS service on port {}", channelListenTls.localAddress());
}
this.serviceUrl = serviceUrl();
this.serviceUrlTls = serviceUrlTls();
}
private ServerBootstrap defaultServerBootstrap() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
bootstrap.group(acceptorGroup, workerGroup);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
EventLoopUtil.enableTriggeredMode(bootstrap);
return bootstrap;
}
public FileMsgSender(String zkAddrs, String zkNode) {
this.zkNode = zkNode;
this.zkClient = ZKClientCache.get(zkAddrs);
this.bootstrap = new Bootstrap();
bootstrap.group(GROUP);
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new FileMsgSendInitializer());
}
ServerBootstrap newServer() {
return PROVIDER.createServerBootstrap()
.group(bossGroup, workerGroup)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535))
.option(ChannelOption.SO_BACKLOG, 100);
}
public NettyServer() {
try {
bootstrap = new ServerBootstrap();
int needThreadNum = Runtime.getRuntime().availableProcessors() + 1;
int parentNum = 10;// accept from channel socket
int childNum = needThreadNum * 5 + 10;// give to business handler
// 处理服务端事件组
parentGroup = new NioEventLoopGroup(parentNum, new PrefixThreadFactory("bim-boss-evenloopgroup"));
// 处理客户端连接请求的事件组
childGroup = new NioEventLoopGroup(childNum, new PrefixThreadFactory("bim-worker-evenloopgroup"));
// 用户处理所有的channel
bootstrap.group(parentGroup, childGroup);
bootstrap.channel(NioServerSocketChannel.class);
/**
* 对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int
* backlog)用来初始化服务端可连接队列. 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,
* 服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
*/
bootstrap.option(ChannelOption.SO_BACKLOG, 2000);
/**
* 允许监听的端口共存
*/
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
/**
* ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,
* ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数
* 用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据, 直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成
*/
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);// 256 KB/字节
/**
* 在4.x版本中,UnpooledByteBufAllocator是默认的allocator,尽管其存在某些限制。
* 现在PooledByteBufAllocator已经广泛使用一段时间,并且我们有了增强的缓冲区泄漏追踪机制,
* 所以是时候让PooledByteBufAllocator成为默认了。<br>
* 总结:Netty4使用对象池,重用缓冲区
*/
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
/**
* 当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
*/
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
/**
* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次, 因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,
* 虽然该方式有效提高网络的有效负载, 但是却造成了延时,
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,
* 该选项是需要等到发送的数据量最大的时候,一次性发送
*/
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
/**
* 接受缓存区,动态内存分配端的算法
*/
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
// the ChannelHandler to use for serving the requests.
bootstrap.handler(new LoggingHandler(LogLevel.DEBUG));
// Set the ChannelHandler which is used to serve the request for the
// Channel's
bootstrap.childHandler(new NettyChannelInitializer());
executor = new SimpleExecutor<Command, CommandResponse>();
loadExecutor(executor);
} catch (Exception e) {
closeGracefully();
logger.error(AkxProject.PLN + " init openzaly netty-server error.", e);
System.exit(-10);
}
}
public HttpServer() {
try {
executor = new SimpleExecutor<Command, CommandResponse>();
loadExecutor(executor);
int needThreadNum = Runtime.getRuntime().availableProcessors() + 1;
int parentNum = 5;// accept from channel socket
int childNum = needThreadNum * 2 + 5;// give to business handler
bootstrap = new ServerBootstrap();
parentGroup = new NioEventLoopGroup(parentNum);
childGroup = new NioEventLoopGroup(childNum);
bootstrap.group(parentGroup, childGroup);
bootstrap.channel(NioServerSocketChannel.class);
// 接受连接的可连接队列大小
bootstrap.option(ChannelOption.SO_BACKLOG, 120);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
// 设置缓存大小
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);// 256 KB/字节
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
/**
* 接受缓存区,动态内存分配端的算法
*/
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpResponseEncoder());
ch.pipeline().addLast(new HttpRequestDecoder());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
ch.pipeline().addLast("streamer", new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpServerHandler(executor));
}
});
} catch (Exception e) {
closeGracefylly();
logger.error(AkxProject.PLN + " init http server error.", e);
System.exit(-200);
}
}
public NettyServer() {
try {
bootstrap = new ServerBootstrap();
int needThreadNum = Runtime.getRuntime().availableProcessors() + 1;
int parentNum = 10;// accept from channel socket
int childNum = needThreadNum * 5 + 10;// give to business handler
// 处理服务端事件组
parentGroup = new NioEventLoopGroup(parentNum, new PrefixThreadFactory("bim-boss-evenloopgroup"));
// 处理客户端连接请求的事件组
childGroup = new NioEventLoopGroup(childNum, new PrefixThreadFactory("bim-worker-evenloopgroup"));
// 用户处理所有的channel
bootstrap.group(parentGroup, childGroup);
bootstrap.channel(NioServerSocketChannel.class);
/**
* 对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int
* backlog)用来初始化服务端可连接队列. 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,
* 服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
*/
bootstrap.option(ChannelOption.SO_BACKLOG, 2000);
/**
* 允许监听的端口共存
*/
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
/**
* ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,
* ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数
* 用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据, 直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成
*/
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);// 256 KB/字节
/**
* 在4.x版本中,UnpooledByteBufAllocator是默认的allocator,尽管其存在某些限制。
* 现在PooledByteBufAllocator已经广泛使用一段时间,并且我们有了增强的缓冲区泄漏追踪机制,
* 所以是时候让PooledByteBufAllocator成为默认了。<br>
* 总结:Netty4使用对象池,重用缓冲区
*/
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
/**
* 当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
*/
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
/**
* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次, 因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,
* 虽然该方式有效提高网络的有效负载, 但是却造成了延时,
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,
* 该选项是需要等到发送的数据量最大的时候,一次性发送
*/
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
/**
* 接受缓存区,动态内存分配端的算法
*/
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
// the ChannelHandler to use for serving the requests.
bootstrap.handler(new LoggingHandler(LogLevel.DEBUG));
// Set the ChannelHandler which is used to serve the request for the
// Channel's
bootstrap.childHandler(new NettyChannelInitializer());
executor = new SimpleExecutor<Command, CommandResponse>();
loadExecutor(executor);
} catch (Exception e) {
closeGracefully();
logger.error(AkxProject.PLN + " init openzaly netty-server error.", e);
System.exit(-10);
}
}
public HttpServer() {
try {
executor = new SimpleExecutor<Command, CommandResponse>();
loadExecutor(executor);
int needThreadNum = Runtime.getRuntime().availableProcessors() + 1;
int parentNum = 5;// accept from channel socket
int childNum = needThreadNum * 2 + 5;// give to business handler
bootstrap = new ServerBootstrap();
parentGroup = new NioEventLoopGroup(parentNum);
childGroup = new NioEventLoopGroup(childNum);
bootstrap.group(parentGroup, childGroup);
bootstrap.channel(NioServerSocketChannel.class);
// 接受连接的可连接队列大小
bootstrap.option(ChannelOption.SO_BACKLOG, 120);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
// 设置缓存大小
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);// 256 KB/字节
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
/**
* 接受缓存区,动态内存分配端的算法
*/
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpResponseEncoder());
ch.pipeline().addLast(new HttpRequestDecoder());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
ch.pipeline().addLast("streamer", new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpServerHandler(executor));
}
});
} catch (Exception e) {
closeGracefylly();
logger.error(AkxProject.PLN + " init http server error.", e);
System.exit(-200);
}
}
public NettyServer() {
try {
bootstrap = new ServerBootstrap();
int needThreadNum = Runtime.getRuntime().availableProcessors() + 1;
int parentNum = 10;// accept from channel socket
int childNum = needThreadNum * 5 + 10;// give to business handler
// 处理服务端事件组
parentGroup = new NioEventLoopGroup(parentNum, new PrefixThreadFactory("bim-boss-evenloopgroup"));
// 处理客户端连接请求的事件组
childGroup = new NioEventLoopGroup(childNum, new PrefixThreadFactory("bim-worker-evenloopgroup"));
// 用户处理所有的channel
bootstrap.group(parentGroup, childGroup);
bootstrap.channel(NioServerSocketChannel.class);
/**
* 对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int
* backlog)用来初始化服务端可连接队列. 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,
* 服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
*/
bootstrap.option(ChannelOption.SO_BACKLOG, 2000);
/**
* 允许监听的端口共存
*/
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
/**
* ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,
* ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数
* 用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据, 直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成
*/
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);// 256 KB/字节
/**
* 在4.x版本中,UnpooledByteBufAllocator是默认的allocator,尽管其存在某些限制。
* 现在PooledByteBufAllocator已经广泛使用一段时间,并且我们有了增强的缓冲区泄漏追踪机制,
* 所以是时候让PooledByteBufAllocator成为默认了。<br>
* 总结:Netty4使用对象池,重用缓冲区
*/
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
/**
* 当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
*/
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
/**
* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次, 因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,
* 虽然该方式有效提高网络的有效负载, 但是却造成了延时,
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,
* 该选项是需要等到发送的数据量最大的时候,一次性发送
*/
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
/**
* 接受缓存区,动态内存分配端的算法
*/
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
// the ChannelHandler to use for serving the requests.
bootstrap.handler(new LoggingHandler(LogLevel.DEBUG));
// Set the ChannelHandler which is used to serve the request for the
// Channel's
bootstrap.childHandler(new NettyChannelInitializer());
executor = new SimpleExecutor<Command, CommandResponse>();
loadExecutor(executor);
} catch (Exception e) {
closeGracefully();
logger.error(AkxProject.PLN + " init openzaly netty-server error.", e);
System.exit(-10);
}
}
public HttpServer() {
try {
executor = new SimpleExecutor<Command, CommandResponse>();
loadExecutor(executor);
int needThreadNum = Runtime.getRuntime().availableProcessors() + 1;
int parentNum = 5;// accept from channel socket
int childNum = needThreadNum * 2 + 5;// give to business handler
bootstrap = new ServerBootstrap();
parentGroup = new NioEventLoopGroup(parentNum);
childGroup = new NioEventLoopGroup(childNum);
bootstrap.group(parentGroup, childGroup);
bootstrap.channel(NioServerSocketChannel.class);
// 接受连接的可连接队列大小
bootstrap.option(ChannelOption.SO_BACKLOG, 120);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
// 设置缓存大小
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);// 256 KB/字节
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
/**
* 接受缓存区,动态内存分配端的算法
*/
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpResponseEncoder());
ch.pipeline().addLast(new HttpRequestDecoder());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
ch.pipeline().addLast("streamer", new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpServerHandler(executor));
}
});
} catch (Exception e) {
closeGracefylly();
logger.error(AkxProject.PLN + " init http server error.", e);
System.exit(-200);
}
}
public static AdaptiveRecvByteBufAllocator getRecvByteBufAllocator() {
return recvByteBufAllocator;
}
Bootstrap newClient() {
return PROVIDER.createBootstrap()
.group(workerGroup)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
}