下面列出了io.netty.channel.epoll.EpollChannelOption#io.netty.channel.WriteBufferWaterMark 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private static InternalChannelz.SocketOptions setAndValidateGeneric(Channel channel) {
channel.config().setOption(ChannelOption.SO_LINGER, 3);
// only applicable for OIO channels:
channel.config().setOption(ChannelOption.SO_TIMEOUT, 250);
// Test some arbitrarily chosen options with a non numeric values
channel.config().setOption(ChannelOption.SO_KEEPALIVE, true);
WriteBufferWaterMark writeBufWaterMark = new WriteBufferWaterMark(10, 20);
channel.config().setOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufWaterMark);
InternalChannelz.SocketOptions socketOptions = Utils.getSocketOptions(channel);
assertEquals(3, (int) socketOptions.lingerSeconds);
assertEquals("true", socketOptions.others.get("SO_KEEPALIVE"));
assertEquals(
writeBufWaterMark.toString(),
socketOptions.others.get(ChannelOption.WRITE_BUFFER_WATER_MARK.toString()));
return socketOptions;
}
public static int newServerWithInitialData(final int port, final String data) {
final HttpServer<String, ServerSentEvent> server = RxNetty.newHttpServerBuilder(
port,
new RequestHandler<String, ServerSentEvent>() {
@Override
public Observable<Void> handle(HttpServerRequest<String> req, HttpServerResponse<ServerSentEvent> resp) {
final ByteBuf byteBuf = resp.getAllocator().buffer().writeBytes(data.getBytes());
resp.writeAndFlush(new ServerSentEvent(byteBuf));
return Observable.empty();
}
})
.pipelineConfigurator(PipelineConfigurators.<String>serveSseConfigurator())
.channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 5 * 1024 * 1024))
.build();
server.start();
synchronized (servers) {
servers.add(server);
}
return port;
}
@Override
protected void config(ServerBootstrap bootstrap) throws Exception{
super.config(bootstrap);
if(SystemPropertyUtil.get("io.netty.leakDetectionLevel") == null &&
SystemPropertyUtil.get("io.netty.leakDetection.level") == null){
ResourceLeakDetector.setLevel(properties.getResourceLeakDetectorLevel());
}
if(SystemPropertyUtil.get("io.netty.maxDirectMemory") == null){
long maxDirectMemory = -1;
System.setProperty("io.netty.maxDirectMemory", String.valueOf(maxDirectMemory));
}
bootstrap.childOption(ChannelOption.WRITE_SPIN_COUNT,Integer.MAX_VALUE);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024,Integer.MAX_VALUE));
bootstrap.childOption(ChannelOption.AUTO_CLOSE,true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, properties.isTcpNodelay());
for (ServerListener serverListener : serverListeners) {
serverListener.config(bootstrap);
}
}
public static ServerBootstrap serverBootstrap(EventLoopGroup acceptors,
EventLoopGroup workers) {
ServerBootstrap bootstrap = new ServerBootstrap().group(acceptors, workers)
.childOption(ChannelOption.TCP_NODELAY,
true)
.childOption(ChannelOption.ALLOCATOR,
PooledDirectByteBufAllocator.INSTANCE)
.childOption(ChannelOption.SO_KEEPALIVE,
false)
.option(ChannelOption.SO_REUSEADDR,
true);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(32 * 1024, 128 * 1024));
return bootstrap;
}
/**
* init netty write buffer water mark
*/
private void initWriteBufferWaterMark() {
int lowWaterMark = this.netty_buffer_low_watermark();
int highWaterMark = this.netty_buffer_high_watermark();
if (lowWaterMark > highWaterMark) {
throw new IllegalArgumentException(
String
.format(
"[server side] bolt netty high water mark {%s} should not be smaller than low water mark {%s} bytes)",
highWaterMark, lowWaterMark));
} else {
logger.warn(
"[server side] bolt netty low water mark is {} bytes, high water mark is {} bytes",
lowWaterMark, highWaterMark);
}
this.bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
lowWaterMark, highWaterMark));
}
/**
* init netty write buffer water mark
*/
private void initWriteBufferWaterMark() {
int lowWaterMark = this.confInstance.netty_buffer_low_watermark();
int highWaterMark = this.confInstance.netty_buffer_high_watermark();
if (lowWaterMark > highWaterMark) {
throw new IllegalArgumentException(
String
.format(
"[client side] bolt netty high water mark {%s} should not be smaller than low water mark {%s} bytes)",
highWaterMark, lowWaterMark));
} else {
logger.warn(
"[client side] bolt netty low water mark is {} bytes, high water mark is {} bytes",
lowWaterMark, highWaterMark);
}
this.bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
lowWaterMark, highWaterMark));
}
private void doConnect(EventLoopGroup loupGroup, Class<? extends SocketChannel> serverChannelClass, boolean isEpoll)
throws InterruptedException {
final Bootstrap bootstrap = new Bootstrap();
if (isEpoll) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
}
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
bootstrap.group(loupGroup);
bootstrap.channel(serverChannelClass);
bootstrap.handler(new BenchmarkChannelInitializer(futureContainer));
for (int i = 0; i < CONNECT_COUNT; i++) {
channels[i] = bootstrap.connect(host, port).sync().channel();
queues[i] = new MpscAtomicArrayQueue<>(4 * 1024);
}
}
private Bootstrap newBootstrap() {
Bootstrap boot = new Bootstrap();
boot.channel(NettyPlatformIndependent.channelClass());
boot.option(ChannelOption.TCP_NODELAY, true);
// replace by heart beat
boot.option(ChannelOption.SO_KEEPALIVE, false);
// default is pooled direct
// ByteBuf(io.netty.util.internal.PlatformDependent.DIRECT_BUFFER_PREFERRED)
boot.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// 32kb(for massive long connections, See
// http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points)
// 64kb(RocketMq remoting default value)
boot.option(ChannelOption.SO_SNDBUF, 32 * 1024);
boot.option(ChannelOption.SO_RCVBUF, 32 * 1024);
// temporary settings, need more tests
boot.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024));
//default is true, reduce thread context switching
boot.option(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, true);
return boot;
}
private ServerBootstrap newServerBootstrap() {
ServerBootstrap serverBoot = new ServerBootstrap();
serverBoot.channel(NettyPlatformIndependent.serverChannelClass());
// connections wait for accept
serverBoot.option(ChannelOption.SO_BACKLOG, 1024);
serverBoot.option(ChannelOption.SO_REUSEADDR, true);
// replace by heart beat
serverBoot.childOption(ChannelOption.SO_KEEPALIVE, false);
serverBoot.childOption(ChannelOption.TCP_NODELAY, true);
serverBoot.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
serverBoot.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
// temporary settings, need more tests
serverBoot.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024));
serverBoot.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//default is true, reduce thread context switching
serverBoot.childOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, true);
return serverBoot;
}
@Override
protected void setOptions() {
super.setOptions();
Bootstrap boot = bootstrap();
// child options
NettyConfig.NettyDomainConfigGroup.ChildConfig child = childConfig;
WriteBufferWaterMark waterMark =
createWriteBufferWaterMark(child.getWriteBufferLowWaterMark(), child.getWriteBufferHighWaterMark());
boot.option(ChannelOption.WRITE_BUFFER_WATER_MARK, waterMark);
if (child.getConnectTimeoutMillis() > 0) {
boot.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, child.getConnectTimeoutMillis());
}
}
/**
* Bootstraps a server.
*
* @return a future to be completed once the server has been bound to all interfaces
*/
private CompletableFuture<Void> bootstrapServer() {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(8 * 1024, 32 * 1024));
b.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024);
b.childOption(ChannelOption.SO_SNDBUF, 1024 * 1024);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
b.childOption(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(serverGroup, clientGroup);
b.channel(serverChannelClass);
if (enableNettyTls) {
try {
b.childHandler(new SslServerChannelInitializer());
} catch (SSLException e) {
return Futures.exceptionalFuture(e);
}
} else {
b.childHandler(new BasicServerChannelInitializer());
}
return bind(b);
}
private void setupClientOption(final NettyChannelBuilder channelBuilder) {
channelBuilder.keepAliveTime(clientOption.getKeepAliveTime(), TimeUnit.MILLISECONDS);
channelBuilder.keepAliveTimeout(clientOption.getKeepAliveTimeout(), TimeUnit.MILLISECONDS);
channelBuilder.keepAliveWithoutCalls(clientOption.isKeepAliveWithoutCalls());
channelBuilder.maxHeaderListSize(clientOption.getMaxHeaderListSize());
channelBuilder.maxInboundMessageSize(clientOption.getMaxInboundMessageSize());
channelBuilder.flowControlWindow(clientOption.getFlowControlWindow());
channelBuilder.idleTimeout(clientOption.getIdleTimeoutMillis(), TimeUnit.MILLISECONDS);
// ChannelOption
channelBuilder.withOption(ChannelOption.TCP_NODELAY, true);
channelBuilder.withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientOption.getConnectTimeout());
final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(clientOption.getWriteBufferLowWaterMark(), clientOption.getWriteBufferHighWaterMark());
channelBuilder.withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark);
if (logger.isInfoEnabled()) {
logger.info("Set clientOption {}. name={}", clientOption, factoryName);
}
}
private static InternalChannelz.SocketOptions setAndValidateGeneric(Channel channel) {
channel.config().setOption(ChannelOption.SO_LINGER, 3);
// only applicable for OIO channels:
channel.config().setOption(ChannelOption.SO_TIMEOUT, 250);
// Test some arbitrarily chosen options with a non numeric values
channel.config().setOption(ChannelOption.SO_KEEPALIVE, true);
WriteBufferWaterMark writeBufWaterMark = new WriteBufferWaterMark(10, 20);
channel.config().setOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufWaterMark);
InternalChannelz.SocketOptions socketOptions = Utils.getSocketOptions(channel);
assertEquals(3, (int) socketOptions.lingerSeconds);
assertEquals("true", socketOptions.others.get("SO_KEEPALIVE"));
assertEquals(
writeBufWaterMark.toString(),
socketOptions.others.get(ChannelOption.WRITE_BUFFER_WATER_MARK.toString()));
return socketOptions;
}
public void start() throws InterruptedException {
InetSocketAddress inet = new InetSocketAddress(hostPort.host, hostPort.port);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
if (eventLoopGroup instanceof EpollEventLoopGroup) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
bootstrap.channel(EpollServerSocketChannel.class);
} else if (eventLoopGroup instanceof NioEventLoopGroup) {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new NettyRpcChannelInitializer(invokerFactory, serializer, filters));
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
channel = bootstrap.bind(inet).sync().channel();
System.out.println("TurboRpcServer started. Listening on: " + hostPort);
}
public void start() throws InterruptedException {
InetSocketAddress inet = new InetSocketAddress(hostPort.host, hostPort.port);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
if (eventLoopGroup instanceof EpollEventLoopGroup) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
bootstrap.channel(EpollServerSocketChannel.class);
} else if (eventLoopGroup instanceof NioEventLoopGroup) {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new NettyRestChannelInitializer(invokerFactory, jsonMapper, filters));
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
channel = bootstrap.bind(inet).sync().channel();
System.out.println("NettyRestServer started. Listening on: " + hostPort);
}
/**
* Convert and add the given {@link SocketOption} and value to the {@link ChannelOption}s {@link Map}.
*
* @param channelOpts the {@link Map} into which add the converted {@link SocketOption}
* @param option the {@link SocketOption} to convert and add
* @param value the value to add
* @param <T> the type of the {@link SocketOption} value
* @throws IllegalArgumentException if the specified {@link SocketOption} is not supported
*/
@SuppressWarnings("rawtypes")
public static <T> void addOption(final Map<ChannelOption, Object> channelOpts, final SocketOption<T> option,
final Object value) {
if (option == StandardSocketOptions.IP_MULTICAST_IF) {
channelOpts.put(ChannelOption.IP_MULTICAST_IF, value);
} else if (option == StandardSocketOptions.IP_MULTICAST_LOOP) {
channelOpts.put(ChannelOption.IP_MULTICAST_LOOP_DISABLED, !(Boolean) value);
} else if (option == StandardSocketOptions.IP_MULTICAST_TTL) {
channelOpts.put(ChannelOption.IP_MULTICAST_TTL, value);
} else if (option == StandardSocketOptions.IP_TOS) {
channelOpts.put(ChannelOption.IP_TOS, value);
} else if (option == StandardSocketOptions.SO_BROADCAST) {
channelOpts.put(ChannelOption.SO_BROADCAST, value);
} else if (option == StandardSocketOptions.SO_KEEPALIVE) {
channelOpts.put(ChannelOption.SO_KEEPALIVE, value);
} else if (option == StandardSocketOptions.SO_LINGER) {
channelOpts.put(ChannelOption.SO_LINGER, value);
} else if (option == StandardSocketOptions.SO_RCVBUF) {
channelOpts.put(ChannelOption.SO_RCVBUF, value);
} else if (option == StandardSocketOptions.SO_REUSEADDR) {
channelOpts.put(ChannelOption.SO_REUSEADDR, value);
} else if (option == StandardSocketOptions.SO_SNDBUF) {
channelOpts.put(ChannelOption.SO_SNDBUF, value);
} else if (option == StandardSocketOptions.TCP_NODELAY) {
channelOpts.put(ChannelOption.TCP_NODELAY, value);
} else if (option == ServiceTalkSocketOptions.CONNECT_TIMEOUT) {
channelOpts.put(ChannelOption.CONNECT_TIMEOUT_MILLIS, value);
} else if (option == ServiceTalkSocketOptions.WRITE_BUFFER_THRESHOLD) {
final int writeBufferThreshold = (Integer) value;
channelOpts.put(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(writeBufferThreshold >>> 1,
writeBufferThreshold));
} else {
throw unsupported(option);
}
}
private void start() throws Exception {
if (Node.RPC_PORT < 0) {
LOG.error("No rpc port is specified, rpc server starting failed.");
return;
}
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Boss thread pool below for handling incoming connections.
EventLoopGroup bossGroup = new NioEventLoopGroup();
// Worker thread pool below for handling channel read/write. Here we only allow 1 thread to make sure the message order.
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(
10 * 1024 * 1024, //10m
20 * 1024 * 1024 //20m
)
)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new RpcServerInitializer(sslCtx));
parentChannel = b.bind(Node.RPC_PORT).sync().channel();
parentChannel.closeFuture().addListener(future -> {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
LOG.info("The EventLoopGroup has been terminated completely and all Channels that belong to the group have been closed.");
});
}
private static void doRun(EventLoopGroup loupGroup, Class<? extends ServerChannel> serverChannelClass,
boolean isEpoll) throws InterruptedException {
try {
InetSocketAddress inet = new InetSocketAddress(port);
ServerBootstrap b = new ServerBootstrap();
if (isEpoll) {
b.option(EpollChannelOption.SO_REUSEPORT, true);
}
b.option(ChannelOption.SO_BACKLOG, 1024 * 8);
b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.SO_RCVBUF, 256 * 1024);
b.group(loupGroup).channel(serverChannelClass).childHandler(new BenchmarkChannelInitializer());
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
b.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
Channel ch = b.bind(inet).sync().channel();
System.out.printf("Httpd started. Listening on: %s%n", inet.toString());
ch.closeFuture().sync();
} finally {
loupGroup.shutdownGracefully().sync();
}
}
@Override
protected void initOptions(ServerBootstrap b) {
super.initOptions(b);
b.option(ChannelOption.SO_BACKLOG, 1024);
/**
* TCP层面的接收和发送缓冲区大小设置,
* 在Netty中分别对应ChannelOption的SO_SNDBUF和SO_RCVBUF,
* 需要根据推送消息的大小,合理设置,对于海量长连接,通常32K是个不错的选择。
*/
if (snd_buf.connect_server > 0) b.childOption(ChannelOption.SO_SNDBUF, snd_buf.connect_server);
if (rcv_buf.connect_server > 0) b.childOption(ChannelOption.SO_RCVBUF, rcv_buf.connect_server);
/**
* 这个坑其实也不算坑,只是因为懒,该做的事情没做。一般来讲我们的业务如果比较小的时候我们用同步处理,等业务到一定规模的时候,一个优化手段就是异步化。
* 异步化是提高吞吐量的一个很好的手段。但是,与异步相比,同步有天然的负反馈机制,也就是如果后端慢了,前面也会跟着慢起来,可以自动的调节。
* 但是异步就不同了,异步就像决堤的大坝一样,洪水是畅通无阻。如果这个时候没有进行有效的限流措施就很容易把后端冲垮。
* 如果一下子把后端冲垮倒也不是最坏的情况,就怕把后端冲的要死不活。
* 这个时候,后端就会变得特别缓慢,如果这个时候前面的应用使用了一些无界的资源等,就有可能把自己弄死。
* 那么现在要介绍的这个坑就是关于Netty里的ChannelOutboundBuffer这个东西的。
* 这个buffer是用在netty向channel write数据的时候,有个buffer缓冲,这样可以提高网络的吞吐量(每个channel有一个这样的buffer)。
* 初始大小是32(32个元素,不是指字节),但是如果超过32就会翻倍,一直增长。
* 大部分时候是没有什么问题的,但是在碰到对端非常慢(对端慢指的是对端处理TCP包的速度变慢,比如对端负载特别高的时候就有可能是这个情况)的时候就有问题了,
* 这个时候如果还是不断地写数据,这个buffer就会不断地增长,最后就有可能出问题了(我们的情况是开始吃swap,最后进程被linux killer干掉了)。
* 为什么说这个地方是坑呢,因为大部分时候我们往一个channel写数据会判断channel是否active,但是往往忽略了这种慢的情况。
*
* 那这个问题怎么解决呢?其实ChannelOutboundBuffer虽然无界,但是可以给它配置一个高水位线和低水位线,
* 当buffer的大小超过高水位线的时候对应channel的isWritable就会变成false,
* 当buffer的大小低于低水位线的时候,isWritable就会变成true。所以应用应该判断isWritable,如果是false就不要再写数据了。
* 高水位线和低水位线是字节数,默认高水位是64K,低水位是32K,我们可以根据我们的应用需要支持多少连接数和系统资源进行合理规划。
*/
b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
connect_server_low, connect_server_high
));
}
private void executeBootstrap(ScheduledExecutor scheduledExecutor, int port, boolean useWebSocket, boolean useSsl)
throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
Class<? extends ServerChannel> serverChannelClass;
if (Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode())) {
serverChannelClass = EpollServerSocketChannel.class;
}
else {
serverChannelClass = NioServerSocketChannel.class;
}
bootstrap = bootstrap.group(bossGroup, workerGroup).channel(serverChannelClass);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
if (scheduledExecutor != null) {
bootstrap.handler(scheduledExecutor);
}
bootstrap.childHandler(new MqttChannelInitializer(useWebSocket, useSsl));
bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
// setting buffer size can improve I/O
.childOption(ChannelOption.SO_SNDBUF, 1048576).childOption(ChannelOption.SO_RCVBUF, 1048576)
// recommended in
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#11.0
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.bind(port).sync();
}
private void groupsEpoll(final ServerBootstrap bootstrap) {
workerGroup = new EpollEventLoopGroup();
bootstrap.group(bossGroup, workerGroup)
.channel(EpollServerSocketChannel.class)
.option(EpollChannelOption.SO_BACKLOG, 128)
.option(EpollChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
.option(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(EpollChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ServerHandlerInitializer());
}
private void groupsNio(final ServerBootstrap bootstrap) {
workerGroup = new NioEventLoopGroup();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ServerHandlerInitializer());
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testOptionsHaveCorrectTypes() throws Exception {
final ServerBootstrap bootstrap = new ServerBootstrap();
final ChannelOptions options = new ChannelOptions();
options.setAllocator(new PooledByteBufAllocator());
options.setRecvBufAllocator(new AdaptiveRecvByteBufAllocator());
options.setConnectTimeout(1);
options.setWriteSpinCount(1);
options.setWriteBufferWaterMark(new WriteBufferWaterMark(8192, 32768));
options.setAllowHalfClosure(true);
options.setAutoRead(true);
options.setSoBroadcast(true);
options.setSoKeepAlive(true);
options.setSoReuseAddr(true);
options.setSoSndBuf(8192);
options.setSoRcvBuf(8192);
options.setSoLinger(0);
options.setSoBacklog(0);
options.setSoTimeout(0);
options.setIpTos(0);
options.setIpMulticastAddr(getLoopbackAddress());
options.setIpMulticastIf(getNetworkInterfaces().nextElement());
options.setIpMulticastTtl(300);
options.setIpMulticastLoopDisabled(true);
options.setTcpNodelay(true);
final Map<ChannelOption, Object> channelOptionMap = options.get();
channelOptionMap.forEach((key, value) -> {
bootstrap.option(key, value);
bootstrap.childOption(key, value);
});
}
@Override
protected void setOptions() {
super.setOptions();
ServerBootstrap boot = bootstrap();
// child options
NettyConfig.NettyDomainConfigGroup.ChildConfig child = configGroup.child();
WriteBufferWaterMark waterMark =
createWriteBufferWaterMark(child.getWriteBufferLowWaterMark(), child.getWriteBufferHighWaterMark());
boot.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, waterMark);
}
private @NotNull Bootstrap bootstrap(@NotNull TChannel topChannel) {
return new Bootstrap()
.group(this.childGroup)
.channel(useEpoll ? EpollSocketChannel.class : NioSocketChannel.class)
.handler(this.channelInitializer(false, topChannel))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_HIGH_WATER_MARK))
.validate();
}
private @NotNull ServerBootstrap serverBootstrap(@NotNull TChannel topChannel) {
return new ServerBootstrap()
.group(this.bossGroup, this.childGroup)
.channel(useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(this.channelInitializer(true, topChannel))
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(
ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_HIGH_WATER_MARK)
)
.validate();
}
/**
* Bootstraps a new channel to the given address.
*
* @param address the address to which to connect
* @return a future to be completed with the connected channel
*/
private CompletableFuture<Channel> bootstrapClient(Address address) {
CompletableFuture<Channel> future = new OrderedFuture<>();
final InetAddress resolvedAddress = address.address(true);
if (resolvedAddress == null) {
future.completeExceptionally(new IllegalStateException("Failed to bootstrap client (address "
+ address.toString() + " cannot be resolved)"));
return future;
}
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
bootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 1024 * 1024);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
bootstrap.group(clientGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
bootstrap.channel(clientChannelClass);
bootstrap.remoteAddress(resolvedAddress, address.port());
if (enableNettyTls) {
try {
bootstrap.handler(new SslClientChannelInitializer(future, address));
} catch (SSLException e) {
return Futures.exceptionalFuture(e);
}
} else {
bootstrap.handler(new BasicClientChannelInitializer(future));
}
bootstrap.connect().addListener(f -> {
if (!f.isSuccess()) {
future.completeExceptionally(f.cause());
}
});
return future;
}
private Bootstrap buildBootstrap() {
Bootstrap bootstrap = new Bootstrap();
// TODO(CK): move all of these constants out into Config
bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(
ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
.option(ChannelOption.AUTO_READ, true)
.group(group())
.channel(channel());
return configure(bootstrap);
}
public Bootstrap buildBootstrap(ChannelConfiguration channelConfig) {
return new Bootstrap()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(
ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
.option(ChannelOption.TCP_NODELAY, true)
.group(channelConfig.workerGroup())
.channel(channelConfig.channel());
}
private void setupServerOption(final NettyServerBuilder builder) {
// TODO @see PinpointServerAcceptor
builder.withChildOption(ChannelOption.TCP_NODELAY, true);
builder.withChildOption(ChannelOption.SO_REUSEADDR, true);
builder.withChildOption(ChannelOption.SO_RCVBUF, this.serverOption.getReceiveBufferSize());
final WriteBufferWaterMark disabledWriteBufferWaterMark = new WriteBufferWaterMark(0, Integer.MAX_VALUE);
builder.withChildOption(ChannelOption.WRITE_BUFFER_WATER_MARK, disabledWriteBufferWaterMark);
builder.handshakeTimeout(this.serverOption.getHandshakeTimeout(), TimeUnit.MILLISECONDS);
builder.flowControlWindow(this.serverOption.getFlowControlWindow());
builder.maxInboundMessageSize(this.serverOption.getMaxInboundMessageSize());
builder.maxHeaderListSize(this.serverOption.getMaxHeaderListSize());
builder.keepAliveTime(this.serverOption.getKeepAliveTime(), TimeUnit.MILLISECONDS);
builder.keepAliveTimeout(this.serverOption.getKeepAliveTimeout(), TimeUnit.MILLISECONDS);
builder.permitKeepAliveTime(this.serverOption.getPermitKeepAliveTime(), TimeUnit.MILLISECONDS);
builder.permitKeepAliveWithoutCalls(this.serverOption.isPermitKeepAliveWithoutCalls());
builder.maxConnectionIdle(this.serverOption.getMaxConnectionIdle(), TimeUnit.MILLISECONDS);
builder.maxConnectionAge(this.serverOption.getMaxConnectionAge(), TimeUnit.MILLISECONDS);
builder.maxConnectionAgeGrace(this.serverOption.getMaxConnectionAgeGrace(), TimeUnit.MILLISECONDS);
builder.maxConcurrentCallsPerConnection(this.serverOption.getMaxConcurrentCallsPerConnection());
if (logger.isInfoEnabled()) {
logger.info("Set serverOption {}. name={}, hostname={}, port={}", serverOption, name, hostname, port);
}
}