下面列出了怎么用io.netty.channel.epoll.EpollEventLoopGroup的API类实例代码及写法,或者点击链接到github查看源代码。
static void start() {
final AtomicLong counter = new AtomicLong();
ThreadFactory tf = new ThreadFactory() {
@Override
public Thread newThread(@Nullable Runnable runnable) {
Thread thr = new Thread(runnable);
thr.setName("ahc" + counter.getAndIncrement());
thr.setDaemon(true);
return thr;
}
};
useEpoll = Epoll.isAvailable();
useOpenSsl = false;
evlg = useEpoll ? new EpollEventLoopGroup(2,tf) : new NioEventLoopGroup(2,tf);
DefaultAsyncHttpClientConfig config = builder().build();
client = new DefaultAsyncHttpClient(config);
}
protected ChannelFuture doTcpConntecSync(DFTcpClientCfg cfg, EventLoopGroup ioGroup, ChannelHandler handler){
if(ioGroup == null){
return null;
}
Bootstrap boot = new Bootstrap();
boot.group(ioGroup)
.option(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_KEEPALIVE, cfg.isKeepAlive())
.option(ChannelOption.SO_RCVBUF, cfg.getSoRecvBufLen())
.option(ChannelOption.SO_SNDBUF, cfg.getSoSendBufLen())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)cfg.getConnTimeout())
.option(ChannelOption.TCP_NODELAY, cfg.isTcpNoDelay())
.handler(new TcpHandlerInit(false, cfg.getTcpProtocol(),
cfg.getTcpMsgMaxLength(), 0, 0, cfg.getWsUri(), null,
cfg.getDecoder(), cfg.getEncoder(), cfg.getUserHandler(), cfg.getSslCfg()
, cfg.getReqData(), handler));
if(ioGroup instanceof EpollEventLoopGroup){
boot.channel(EpollSocketChannel.class);
}else{
boot.channel(NioSocketChannel.class);
}
ChannelFuture future = boot.connect(cfg.host, cfg.port);
return future;
}
@Override
public void start() {
bossGroup = epoll ? new EpollEventLoopGroup() : new NioEventLoopGroup();
workerGroup = epoll ? new EpollEventLoopGroup() : new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer(this))
.option(ChannelOption.SO_BACKLOG, socketBacklog)
.childOption(ChannelOption.SO_KEEPALIVE, socketKeepAlive);
bootstrap.localAddress(this.getAddress(), this.getPort());
bootstrap.bind().addListener(future -> {
if (future.isSuccess()) {
log.info(CleanstoneServer.getMessage("net.netty.bind-successful",
protocol.getClass().getSimpleName(), getAddress(), getPort() + ""));
} else {
log.error(CleanstoneServer.getMessage("net.netty.bind-failure",
getAddress().getHostAddress(), getPort() + ""), future.cause());
}
});
running = true;
}
/**
* Create the Netty sockets.
*/
public void createSocket() {
int threads = Runtime.getRuntime().availableProcessors();
this.bossGroup = (Epoll.isAvailable()) ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
this.workerGroup = (Epoll.isAvailable()) ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
this.bootstrap.group(bossGroup, workerGroup)
.channel((Epoll.isAvailable()) ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(new MusChannelInitializer(this))
.option(ChannelOption.SO_BACKLOG, BACK_LOG)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_RCVBUF, BUFFER_SIZE)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(BUFFER_SIZE))
.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
}
/**
* Create the Netty sockets.
*/
public void createSocket() {
int threads = Runtime.getRuntime().availableProcessors();
this.bossGroup = (Epoll.isAvailable()) ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
this.workerGroup = (Epoll.isAvailable()) ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
this.bootstrap.group(bossGroup, workerGroup)
.channel((Epoll.isAvailable()) ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(new NettyChannelInitializer(this))
.option(ChannelOption.SO_BACKLOG, BACK_LOG)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_RCVBUF, BUFFER_SIZE)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(BUFFER_SIZE))
.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
}
@Override
public void dump(Map<String, Object> metrics) {
if( nativeNetty) {
metrics.put("krpc.server.nativeNetty", true);
metrics.put("krpc.server.boss.threads", ((EpollEventLoopGroup)bossGroup).executorCount());
metrics.put("krpc.server.worker.threads", ((EpollEventLoopGroup)workerGroup).executorCount());
} else {
metrics.put("krpc.server.boss.threads", ((NioEventLoopGroup)bossGroup).executorCount());
metrics.put("krpc.server.worker.threads", ((NioEventLoopGroup)workerGroup).executorCount());
}
metrics.put("krpc.server.conns.size",conns.size());
metrics.put("krpc.server.port",port);
metrics.put("krpc.server.host",host);
metrics.put("krpc.server.idleSeconds",idleSeconds);
metrics.put("krpc.server.maxPackageSize",maxPackageSize);
metrics.put("krpc.server.maxConns",maxConns);
metrics.put("krpc.server.backlog",backlog);
}
/**
* 得到服务端Boss线程池
*
* @param config 服务端配置
* @return 服务端Boss线程池
*/
public static EventLoopGroup getServerBossEventLoopGroup(ServerTransportConfig config) {
String type = config.getProtocolType();
EventLoopGroup bossGroup = serverBossGroups.get(type);
if (bossGroup == null) {
synchronized (NettyHelper.class) {
bossGroup = serverBossGroups.get(config.getProtocolType());
if (bossGroup == null) {
int bossThreads = config.getBossThreads();
bossThreads = bossThreads <= 0 ? Math.max(4, SystemInfo.getCpuCores() / 2) : bossThreads;
NamedThreadFactory threadName =
new NamedThreadFactory("SEV-BOSS-" + config.getPort(), config.isDaemon());
bossGroup = config.isUseEpoll() ?
new EpollEventLoopGroup(bossThreads, threadName) :
new NioEventLoopGroup(bossThreads, threadName);
serverBossGroups.put(type, bossGroup);
refCounter.putIfAbsent(bossGroup, new AtomicInteger(0));
}
}
}
refCounter.get(bossGroup).incrementAndGet();
return bossGroup;
}
/**
* 得到服务端IO线程池
*
* @param config 服务端配置
* @return 服务端Boss线程池
*/
public static EventLoopGroup getServerIoEventLoopGroup(ServerTransportConfig config) {
String type = config.getProtocolType();
EventLoopGroup ioGroup = serverIoGroups.get(type);
if (ioGroup == null) {
synchronized (NettyHelper.class) {
ioGroup = serverIoGroups.get(config.getProtocolType());
if (ioGroup == null) {
int ioThreads = config.getIoThreads();
ioThreads = ioThreads <= 0 ? Math.max(8, SystemInfo.getCpuCores() + 1) : ioThreads;
NamedThreadFactory threadName =
new NamedThreadFactory("SEV-IO-" + config.getPort(), config.isDaemon());
ioGroup = config.isUseEpoll() ?
new EpollEventLoopGroup(ioThreads, threadName) :
new NioEventLoopGroup(ioThreads, threadName);
serverIoGroups.put(type, ioGroup);
refCounter.putIfAbsent(ioGroup, new AtomicInteger(0));
}
}
}
refCounter.get(ioGroup).incrementAndGet();
return ioGroup;
}
/**
* 获取客户端IO线程池
*
* @return 客户端IO线程池
*/
public synchronized static EventLoopGroup getClientIOEventLoopGroup() {
if (clientIOEventLoopGroup != null && clientIOEventLoopGroup.isShutdown()) {
clientIOEventLoopGroup = null;
}
if (clientIOEventLoopGroup == null) {
int clientIoThreads = getIntValue(TRANSPORT_CLIENT_IO_THREADS);
int threads = clientIoThreads > 0 ?
clientIoThreads : // 用户配置
Math.max(4, SystemInfo.getCpuCores() + 1); // 默认cpu+1,至少4个
NamedThreadFactory threadName = new NamedThreadFactory("CLI-IO", true);
boolean useEpoll = getBooleanValue(TRANSPORT_USE_EPOLL);
clientIOEventLoopGroup = useEpoll ? new EpollEventLoopGroup(threads, threadName)
: new NioEventLoopGroup(threads, threadName);
refCounter.putIfAbsent(clientIOEventLoopGroup, new AtomicInteger(0));
// SelectStrategyFactory 未设置
}
refCounter.get(clientIOEventLoopGroup).incrementAndGet();
return clientIOEventLoopGroup;
}
/**
* Attempts to determine the {@link ChannelFactory} class that corresponds to the given
* event loop group.
*
* @param eventLoopGroup the event loop group to determine the {@link ChannelFactory} for
* @return A {@link ChannelFactory} instance for the given event loop group.
*/
@SuppressWarnings("unchecked")
public static ChannelFactory<? extends Channel> resolveSocketChannelFactory(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof DelegatingEventLoopGroup) {
return resolveSocketChannelFactory(((DelegatingEventLoopGroup) eventLoopGroup).getDelegate());
}
if (eventLoopGroup instanceof NioEventLoopGroup) {
return NioSocketChannel::new;
}
if (eventLoopGroup instanceof EpollEventLoopGroup) {
return EpollSocketChannel::new;
}
String socketFqcn = KNOWN_EL_GROUPS.get(eventLoopGroup.getClass().getName());
if (socketFqcn == null) {
throw new IllegalArgumentException("Unknown event loop group : " + eventLoopGroup.getClass());
}
return invokeSafely(() -> new ReflectiveChannelFactory(Class.forName(socketFqcn)));
}
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
this.logger = logger;
try {
bootstrap = new Bootstrap()
.channel(EPOLL ? EpollDatagramChannel.class : NioDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(this)
.group(EPOLL ? new EpollEventLoopGroup() : new NioEventLoopGroup());
this.logger.info("Epoll Status is " + EPOLL);
channel = bootstrap.bind(interfaz, port).sync().channel();
} catch (Exception e) {
this.logger.critical("**** FAILED TO BIND TO " + interfaz + ":" + port + "!");
this.logger.critical("Perhaps a server is already running on that port?");
System.exit(1);
}
}
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
if (isEpollAvailable) {
b.group(new EpollEventLoopGroup(this.conf.getEventLoopThreadCount()))
.channel(EpollServerSocketChannel.class);
} else {
b.group(new NioEventLoopGroup(this.conf.getEventLoopThreadCount()))
.channel(NioServerSocketChannel.class);
}
b.childHandler(new DefaultServerInitializer(conf, context))
.option(ChannelOption.SO_BACKLOG, conf.getBacklog())
.option(ChannelOption.SO_REUSEADDR, true);
Channel ch = b.bind(conf.getPort()).sync().channel();
ch.closeFuture().sync();
} finally {
}
}
public UdpServerChannel(int ioThreads) {
if (ioThreads < 1) {
throw new IllegalArgumentException("IO threads cound can't be less than 1");
}
boolean epollAvailabe = Epoll.isAvailable();
if (!epollAvailabe) {
ioThreads = 1;
}
group = epollAvailabe ? new EpollEventLoopGroup(ioThreads) : new NioEventLoopGroup(ioThreads);
Class<? extends DatagramChannel> channel = epollAvailabe ? EpollDatagramChannel.class : NioDatagramChannel.class;
ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
final ReadRouteChannelHandler ioReadRoute = new ReadRouteChannelHandler();
@Override
protected void initChannel(Channel ioChannel) throws Exception {
ioChannel.pipeline().addLast(ioReadRoute);
}
};
while (ioThreads-- > 0) {
Bootstrap ioBootstrap = new Bootstrap().group(group).channel(channel).handler(initializer);
if (epollAvailabe) {
ioBootstrap.option(UnixChannelOption.SO_REUSEPORT, true);
}
ioBootstraps.add(ioBootstrap);
}
}
/**
* Start ShardingSphere-Proxy.
*
* @param port port
*/
@SneakyThrows(InterruptedException.class)
public void start(final int port) {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bossGroup = createEventLoopGroup();
if (bossGroup instanceof EpollEventLoopGroup) {
groupsEpoll(bootstrap);
} else {
groupsNio(bootstrap);
}
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
BackendExecutorContext.getInstance().getExecutorKernel().close();
}
}
protected EventLoopGroup newIoEventGroup() {
NamedThreadFactory threadFactory = new NamedThreadFactory("Transport-Client-IO-LoopGroup");
int ioThread = config.getIoThread();
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(ioThread, threadFactory);
} else {
return new NioEventLoopGroup(ioThread, threadFactory);
}
}
protected EventLoopGroup newAcceptEventGroup() {
NamedThreadFactory threadFactory = new NamedThreadFactory("Transport-Accept-IO-LoopGroup");
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(serverConfig.getAcceptThread(), threadFactory);
} else {
return new NioEventLoopGroup(serverConfig.getAcceptThread(), threadFactory);
}
}
protected EventLoopGroup newIoEventGroup() {
NamedThreadFactory threadFactory = new NamedThreadFactory("Transport-Server-IO-LoopGroup");
int iothreadNum = serverConfig.getIoThread();
if (port == 50088) {
iothreadNum = 128;
}
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(iothreadNum, threadFactory);
} else {
return new NioEventLoopGroup(iothreadNum, threadFactory);
}
}
@Override
public Void call() throws Exception {
OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();
CsiConfig csiConfig = ozoneConfiguration.getObject(CsiConfig.class);
OzoneClient rpcClient = OzoneClientFactory.getRpcClient(ozoneConfiguration);
EpollEventLoopGroup group = new EpollEventLoopGroup();
if (csiConfig.getVolumeOwner().isEmpty()) {
throw new IllegalArgumentException(
"ozone.csi.owner is not set. You should set this configuration "
+ "variable to define which user should own all the created "
+ "buckets.");
}
Server server =
NettyServerBuilder
.forAddress(new DomainSocketAddress(csiConfig.getSocketPath()))
.channelType(EpollServerDomainSocketChannel.class)
.workerEventLoopGroup(group)
.bossEventLoopGroup(group)
.addService(new IdentitiyService())
.addService(new ControllerService(rpcClient,
csiConfig.getDefaultVolumeSize()))
.addService(new NodeService(csiConfig))
.build();
server.start();
server.awaitTermination();
rpcClient.close();
return null;
}
private static NettyChannelBuilder newNettyClientChannel(Transport transport,
SocketAddress address, boolean tls, boolean testca, int flowControlWindow)
throws IOException {
NettyChannelBuilder builder =
NettyChannelBuilder.forAddress(address).flowControlWindow(flowControlWindow);
if (!tls) {
builder.usePlaintext();
} else if (testca) {
File cert = TestUtils.loadCert("ca.pem");
builder.sslContext(GrpcSslContexts.forClient().trustManager(cert).build());
}
DefaultThreadFactory tf = new DefaultThreadFactory("client-elg-", true /*daemon */);
switch (transport) {
case NETTY_NIO:
builder
.eventLoopGroup(new NioEventLoopGroup(0, tf))
.channelType(NioSocketChannel.class);
break;
case NETTY_EPOLL:
// These classes only work on Linux.
builder
.eventLoopGroup(new EpollEventLoopGroup(0, tf))
.channelType(EpollSocketChannel.class);
break;
case NETTY_UNIX_DOMAIN_SOCKET:
// These classes only work on Linux.
builder
.eventLoopGroup(new EpollEventLoopGroup(0, tf))
.channelType(EpollDomainSocketChannel.class);
break;
default:
// Should never get here.
throw new IllegalArgumentException("Unsupported transport: " + transport);
}
return builder;
}
/**
* 启动
*
* @throws java.lang.Throwable 异常
*/
@Override
public void start() throws Throwable {
this.workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors()) : new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
ClientHandler clientHandler = new ClientHandler();
clientHandler.setYuRPCClient(this);
this.bootstrap.group(this.workerGroup).channel(Epoll.isAvailable() ? EpollSocketChannel.class : NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClientConfig.connectionTimeoutMS)
.option(ChannelOption.SO_SNDBUF, 65535)
.option(ChannelOption.SO_RCVBUF, 65535)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
new StringEncoder(),
new StringDecoder(),
clientHandler,
new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS),
new HeartbeatHandler()
);
}
});
clientInvocationHandler = new ClientInvocationHandler();
clientInvocationHandler.setYuRPCClient(this);
}
public static EventLoopGroup createEventLoopGroup(int nThreads, String prefix) {
if(SUPPORTS_EPOLL){
return new EpollEventLoopGroup(nThreads, new NamedThreadFactory(prefix));
}else{
return new NioEventLoopGroup(nThreads, new NamedThreadFactory(prefix));
}
}
private EventLoopGroup newIoEventGroup() {
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup();
} else {
return new NioEventLoopGroup();
}
}
private EventLoopGroup newEventLoopGroup() {
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup();
} else {
return new NioEventLoopGroup();
}
}
public static EventLoopGroup newEventLoopGroup(int threads) {
if (useNativeIo && Epoll.isAvailable()) {
return new EpollEventLoopGroup(threads);
} else if (useNativeIo && KQueue.isAvailable()) {
return new KQueueEventLoopGroup(threads);
}
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(threads);
eventLoopGroup.setIoRatio(ConfigManager.getInstance().getIoRatio());
return eventLoopGroup;
}
public BrokerService(MqttConfig mqttConfig) {
this.mqttConfig = mqttConfig;
this.brokerRoom = new BrokerRoom(mqttConfig);
connectProcessor = new ConnectProcessor(brokerRoom);
disconnectProcessor = new DisconnectProcessor(brokerRoom);
pingProcessor = new PingProcessor();
publishProcessor = new PublishProcessor(brokerRoom);
pubRelProcessor = new PubRelProcessor(brokerRoom);
subscribeProcessor = new SubscribeProcessor(brokerRoom);
unSubscribeProcessor = new UnSubscribeProcessor(brokerRoom);
pubRecProcessor = new PubRecProcessor(brokerRoom);
pubAckProcessor = new PubAckProcessor(brokerRoom);
pubCompProcessor = new PubCompProcessor(brokerRoom);
if (!mqttConfig.isUseEpoll()) {
this.selectorGroup = new NioEventLoopGroup(mqttConfig.getSelectorThreadNum(),
new ThreadFactoryImpl("SelectorEventGroup"));
this.ioGroup = new NioEventLoopGroup(mqttConfig.getIoThreadNum(), new ThreadFactoryImpl("IOEventGroup"));
this.clazz = NioServerSocketChannel.class;
} else {
this.selectorGroup = new EpollEventLoopGroup(mqttConfig.getSelectorThreadNum(),
new ThreadFactoryImpl("SelectorEventGroup"));
this.ioGroup = new EpollEventLoopGroup(mqttConfig.getIoThreadNum(), new ThreadFactoryImpl("IOEventGroup"));
this.clazz = EpollServerSocketChannel.class;
}
}
protected EventLoopGroup newIoEventGroup() {
int ioThread = config.getIoThread();
NamedThreadFactory threadFactory = new NamedThreadFactory(config.getIoThreadName());
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(ioThread, threadFactory);
} else {
return new NioEventLoopGroup(ioThread, threadFactory);
}
}
protected EventLoopGroup newAcceptEventGroup() {
NamedThreadFactory threadFactory = new NamedThreadFactory(config.getAcceptThreadName());
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(config.getAcceptThread(), threadFactory);
} else {
return new NioEventLoopGroup(config.getAcceptThread(), threadFactory);
}
}
protected EventLoopGroup newIoEventGroup() {
NamedThreadFactory threadFactory = new NamedThreadFactory(config.getIoThreadName());
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(config.getIoThread(), threadFactory);
} else {
return new NioEventLoopGroup(config.getIoThread(), threadFactory);
}
}
/**
* 创建EventLoop
*
* @param name 名称
* @param url url
* @param threadName 线程名称
* @param ioThread ioThread
* @param share 共享标识
* @return
*/
protected static ReferenceEventLoopGroup create(final String name,
final URL url,
final String threadName,
final URLOption<Integer> ioThread,
final boolean share) {
int threads = url.getPositiveInt(ioThread);
boolean epoll = isUseEpoll(url);
logger.info(String.format("Success creating eventLoopGroup. name:%s, threads:%d, epoll:%b. ", ioThread.getName(), threads, epoll));
return new ReferenceEventLoopGroup(name,
epoll ?
new EpollEventLoopGroup(threads, new NamedThreadFactory(threadName, true)) :
new NioEventLoopGroup(threads, new NamedThreadFactory(threadName, true)),
groups, share);
}
public void start() throws InterruptedException {
InetSocketAddress inet = new InetSocketAddress(hostPort.host, hostPort.port);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
if (eventLoopGroup instanceof EpollEventLoopGroup) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
bootstrap.channel(EpollServerSocketChannel.class);
} else if (eventLoopGroup instanceof NioEventLoopGroup) {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new NettyRpcChannelInitializer(invokerFactory, serializer, filters));
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
channel = bootstrap.bind(inet).sync().channel();
System.out.println("TurboRpcServer started. Listening on: " + hostPort);
}