下面列出了怎么用io.netty.channel.epoll.EpollChannelOption的API类实例代码及写法,或者点击链接到github查看源代码。
private void doConnect(EventLoopGroup loupGroup, Class<? extends SocketChannel> serverChannelClass, boolean isEpoll)
throws InterruptedException {
final Bootstrap bootstrap = new Bootstrap();
if (isEpoll) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
}
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
bootstrap.group(loupGroup);
bootstrap.channel(serverChannelClass);
bootstrap.handler(new BenchmarkChannelInitializer(futureContainer));
for (int i = 0; i < CONNECT_COUNT; i++) {
channels[i] = bootstrap.connect(host, port).sync().channel();
queues[i] = new MpscAtomicArrayQueue<>(4 * 1024);
}
}
private synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer<?> initializer) {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
if (Epoll.isAvailable()) {
serverBootstrap.channel(EpollServerSocketChannel.class);
serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
serverBootstrap.channel(NioServerSocketChannel.class);
}
final ChannelHandler serverChannelHandler = BGPChannel.createServerChannelHandler(initializer);
serverBootstrap.childHandler(serverChannelHandler);
serverBootstrap.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
// Make sure we are doing round-robin processing
serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
if (serverBootstrap.config().group() == null) {
serverBootstrap.group(this.bossGroup, this.workerGroup);
}
return serverBootstrap;
}
private void doRun(EventLoopGroup loupGroup, Class<? extends ServerChannel> serverChannelClass, IoMultiplexer multiplexer) throws InterruptedException {
try {
InetSocketAddress inet = new InetSocketAddress(port);
ServerBootstrap b = new ServerBootstrap();
if (multiplexer == IoMultiplexer.EPOLL) {
b.option(EpollChannelOption.SO_REUSEPORT, true);
}
b.option(ChannelOption.SO_BACKLOG, 8192);
b.option(ChannelOption.SO_REUSEADDR, true);
b.group(loupGroup).channel(serverChannelClass).childHandler(new HelloServerInitializer(loupGroup.next()));
b.childOption(ChannelOption.SO_REUSEADDR, true);
Channel ch = b.bind(inet).sync().channel();
System.out.printf("Httpd started. Listening on: %s%n", inet.toString());
ch.closeFuture().sync();
} finally {
loupGroup.shutdownGracefully().sync();
}
}
public void start() throws InterruptedException {
InetSocketAddress inet = new InetSocketAddress(hostPort.host, hostPort.port);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
if (eventLoopGroup instanceof EpollEventLoopGroup) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
bootstrap.channel(EpollServerSocketChannel.class);
} else if (eventLoopGroup instanceof NioEventLoopGroup) {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new NettyRpcChannelInitializer(invokerFactory, serializer, filters));
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
channel = bootstrap.bind(inet).sync().channel();
System.out.println("TurboRpcServer started. Listening on: " + hostPort);
}
public void start() throws InterruptedException {
InetSocketAddress inet = new InetSocketAddress(hostPort.host, hostPort.port);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
if (eventLoopGroup instanceof EpollEventLoopGroup) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
bootstrap.channel(EpollServerSocketChannel.class);
} else if (eventLoopGroup instanceof NioEventLoopGroup) {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new NettyRestChannelInitializer(invokerFactory, jsonMapper, filters));
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
channel = bootstrap.bind(inet).sync().channel();
System.out.println("NettyRestServer started. Listening on: " + hostPort);
}
public Bootstrap createBooStrap(String serviceName, final CommunicationOptions communicationOptions) {
// init netty bootstrap
Bootstrap bootstrap = new Bootstrap();
if (communicationOptions.getIoEventType() == BrpcConstants.IO_EVENT_NETTY_EPOLL) {
bootstrap.channel(EpollSocketChannel.class);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
} else {
bootstrap.channel(NioSocketChannel.class);
}
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, communicationOptions.getConnectTimeoutMillis());
bootstrap.option(ChannelOption.SO_KEEPALIVE, communicationOptions.isKeepAlive());
bootstrap.option(ChannelOption.SO_REUSEADDR, communicationOptions.isReuseAddr());
bootstrap.option(ChannelOption.TCP_NODELAY, communicationOptions.isTcpNoDelay());
bootstrap.option(ChannelOption.SO_RCVBUF, communicationOptions.getReceiveBufferSize());
bootstrap.option(ChannelOption.SO_SNDBUF, communicationOptions.getSendBufferSize());
BrpcThreadPoolManager threadPoolManager = BrpcThreadPoolManager.getInstance();
boolean isSharing = communicationOptions.isGlobalThreadPoolSharing();
ThreadPool workThreadPool = threadPoolManager.getOrCreateClientWorkThreadPool(
serviceName, isSharing, communicationOptions.getWorkThreadNum());
ExecutorService exceptionThreadPool = threadPoolManager.getExceptionThreadPool();
final RpcClientHandler rpcClientHandler = new RpcClientHandler(workThreadPool, exceptionThreadPool);
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (communicationOptions.getChannelType() == ChannelType.SINGLE_CONNECTION) {
ch.pipeline().addLast(new IdleStateHandler(
0, 0, communicationOptions.getKeepAliveTime()));
ch.pipeline().addLast(new IdleChannelHandler());
}
ch.pipeline().addLast(rpcClientHandler);
}
};
EventLoopGroup ioThreadPool = threadPoolManager.getOrCreateClientIoThreadPool(
serviceName, isSharing, communicationOptions.getIoThreadNum(), communicationOptions.getIoEventType());
bootstrap.group(ioThreadPool).handler(initializer);
return bootstrap;
}
/**
* Use {@link EpollMode#LEVEL_TRIGGERED} for server bootstrap if level trigger enabled by system properties,
* otherwise use {@link EpollMode#EDGE_TRIGGERED}.
* @param serverBootstrap server bootstrap
*/
public static void enableTriggeredMode(ServerBootstrap serverBootstrap) {
if (epollEnabled) {
if (ConfigManager.netty_epoll_lt_enabled()) {
serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE,
EpollMode.LEVEL_TRIGGERED);
} else {
serverBootstrap
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
}
}
}
private void groupsEpoll(final ServerBootstrap bootstrap, final int workThreads) {
workerGroup = new EpollEventLoopGroup(workThreads);
bootstrap.group(bossGroup, workerGroup)
.channel(EpollServerSocketChannel.class)
.option(EpollChannelOption.TCP_CORK, true)
.option(EpollChannelOption.SO_KEEPALIVE, true)
.option(EpollChannelOption.SO_BACKLOG, 100)
.option(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(nettyServerHandlerInitializer);
}
private void groupsNio(final ServerBootstrap bootstrap, final int workThreads) {
workerGroup = new NioEventLoopGroup(workThreads);
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(EpollChannelOption.TCP_CORK, true)
.option(EpollChannelOption.SO_KEEPALIVE, true)
.option(EpollChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(nettyServerHandlerInitializer);
}
private static void doRun(EventLoopGroup loupGroup, Class<? extends ServerChannel> serverChannelClass,
boolean isEpoll) throws InterruptedException {
try {
InetSocketAddress inet = new InetSocketAddress(port);
ServerBootstrap b = new ServerBootstrap();
if (isEpoll) {
b.option(EpollChannelOption.SO_REUSEPORT, true);
}
b.option(ChannelOption.SO_BACKLOG, 1024 * 8);
b.option(ChannelOption.SO_REUSEADDR, true);
b.option(ChannelOption.SO_RCVBUF, 256 * 1024);
b.group(loupGroup).channel(serverChannelClass).childHandler(new BenchmarkChannelInitializer());
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.SO_RCVBUF, 256 * 1024);
b.childOption(ChannelOption.SO_SNDBUF, 256 * 1024);
b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
Channel ch = b.bind(inet).sync().channel();
System.out.printf("Httpd started. Listening on: %s%n", inet.toString());
ch.closeFuture().sync();
} finally {
loupGroup.shutdownGracefully().sync();
}
}
private void groupsEpoll(final ServerBootstrap bootstrap) {
workerGroup = new EpollEventLoopGroup();
bootstrap.group(bossGroup, workerGroup)
.channel(EpollServerSocketChannel.class)
.option(EpollChannelOption.SO_BACKLOG, 128)
.option(EpollChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
.option(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(EpollChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ServerHandlerInitializer());
}
public static ServerBootstrap createServerBootstrap(final @NonNull BmpSessionFactory sessionFactory,
final @NonNull BmpHandlerFactory hf, final @NonNull BmpSessionListenerFactory slf,
final @NonNull CreateChannel createChannel, final @NonNull EventLoopGroup bossGroup,
final @NonNull EventLoopGroup workerGroup, final @NonNull KeyMapping keys, boolean tryEpollSocket) {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.childHandler(createChannel.create(sessionFactory, hf, slf));
serverBootstrap.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
serverBootstrap.group(bossGroup, workerGroup);
if (!tryEpollSocket) {
serverBootstrap.channel(NioServerSocketChannel.class);
} else {
if (Epoll.isAvailable()) {
serverBootstrap.channel(EpollServerSocketChannel.class);
} else {
serverBootstrap.channel(NioServerSocketChannel.class);
}
if (!keys.isEmpty()) {
if (Epoll.isAvailable()) {
serverBootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
} else {
throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
}
}
}
return serverBootstrap;
}
public static Bootstrap createClientBootstrap(final @NonNull BmpSessionFactory sessionFactory,
final @NonNull BmpHandlerFactory hf, final @NonNull CreateChannel createChannel,
final @NonNull BmpSessionListenerFactory slf, final @NonNull InetSocketAddress remoteAddress,
final @Nullable SocketAddress localAddress, final @NonNull EventLoopGroup workerGroup,
final int connectTimeout, final @NonNull KeyMapping keys, boolean reuseAddress, boolean tryEpollSocket) {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
bootstrap.group(workerGroup);
bootstrap.handler(createChannel.create(sessionFactory, hf, slf));
if (localAddress != null) {
bootstrap.localAddress(localAddress);
}
bootstrap.remoteAddress(remoteAddress);
if (!tryEpollSocket) {
bootstrap.channel(NioSocketChannel.class);
} else {
if (Epoll.isAvailable()) {
bootstrap.channel(EpollSocketChannel.class);
} else {
bootstrap.channel(NioSocketChannel.class);
}
if (!keys.isEmpty()) {
if (Epoll.isAvailable()) {
bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
} else {
throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
}
}
}
return bootstrap;
}
private static void setChannelFactory(final Bootstrap bootstrap, final KeyMapping keys) {
if (Epoll.isAvailable()) {
bootstrap.channel(EpollSocketChannel.class);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
bootstrap.channel(NioSocketChannel.class);
}
if (!keys.isEmpty()) {
if (Epoll.isAvailable()) {
bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
} else {
throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
}
}
}
synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
final ServerBootstrap b = new ServerBootstrap();
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel ch) {
initializer.initializeChannel(ch, new DefaultPromise<>(PCEPDispatcherImpl.this.executor));
}
});
b.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
if (Epoll.isAvailable()) {
b.channel(EpollServerSocketChannel.class);
b.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
b.channel(NioServerSocketChannel.class);
}
if (!this.keys.isEmpty()) {
if (Epoll.isAvailable()) {
b.option(EpollChannelOption.TCP_MD5SIG, this.keys);
} else {
throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
}
}
// Make sure we are doing round-robin processing
b.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1));
if (b.config().group() == null) {
b.group(this.bossGroup, this.workerGroup);
}
return b;
}
@Override
public void onPeerAdded(final IpAddressNoZone ip, final BGPSessionPreferences prefs) {
if (prefs.getMd5Password().isPresent()) {
this.keys.put(IetfInetUtil.INSTANCE.inetAddressForNoZone(ip), prefs.getMd5Password().get());
this.channelConfig.setOption(EpollChannelOption.TCP_MD5SIG, this.keys);
}
}
private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress,
final InetSocketAddress localAddress) {
final Bootstrap bootstrap = new Bootstrap();
if (Epoll.isAvailable()) {
bootstrap.channel(EpollSocketChannel.class);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
bootstrap.channel(NioSocketChannel.class);
}
if (keys != null && !keys.isEmpty()) {
if (Epoll.isAvailable()) {
bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
} else {
throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
}
}
// Make sure we are doing round-robin processing
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR);
bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
if (bootstrap.config().group() == null) {
bootstrap.group(this.workerGroup);
}
bootstrap.localAddress(localAddress);
return bootstrap;
}
public static void main(String[] args) {
// 启动新的线程托管服务
new Thread() {
@Override
public void run() {
ChannelFuture future;//信道
EventLoopGroup group = new NioEventLoopGroup(2);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)//设置channel类型
.option(ChannelOption.TCP_NODELAY, true)//在TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。这里就涉及到一个名为Nagle的算法,该算法的目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。 TCP_NODELAY就是用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false。
.option(ChannelOption.SO_KEEPALIVE, true)//是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。
.option(ChannelOption.SO_REUSEADDR, true)//这个套接字选项通知内核,如果端口忙,但TCP状态位于 TIME_WAIT ,可以重用端口。如果端口忙,而TCP状态位于其他状态,重用端口时依旧得到一个错误信息,指明"地址已经使用中"。如果你的服务程序停止后想立即重启,而新套接字依旧使用同一端口,此时 SO_REUSEADDR 选项非常有用。必须意识到,此时任何非期望数据到达,都可能导致服务程序反应混乱,不过这只是一种可能,事实上很不可能
.option(EpollChannelOption.SO_REUSEPORT, true)//SO_REUSEPORT支持多个进程或者线程绑定到同一端口,提高服务器程序的性能
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000).handler(new GimClientInitializer());//连接超时时间
future = bootstrap.connect("127.0.0.1", 9999);
// 添加future监听
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) throws Exception {
boolean succeed = f.isSuccess();
// 如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
if (!succeed) {
} else {
System.out.println("连接成功");
Channel c = f.channel();
// MessageClass.Message.Builder builder = MessageClass.Message.newBuilder();
//builder.setId("123");
// for (int i = 0; i < 100; i++) {
// c.writeAndFlush(msgBody);
// }
String s = "12\r\n";
byte[] msgBody = s.getBytes("utf-8");
// MqttMessage mqttMessage= MqttMessageFactory.newMessage(
// new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
// new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
// MqttConnectMessage mqttMessage = new MqttConnectMessage(
// new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0),
// new MqttConnectVariableHeader("MQTT", 4, true, true, true, 0, true, true, 20),
// new MqttConnectPayload("123", "willtopic", "willmessage", "username", "password"));
// MqttPublishMessage mqttMessage = new MqttPublishMessage(
// new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
// new MqttPublishVariableHeader("MQTT", 4),
// Unpooled.wrappedBuffer("你好".getBytes()));
// List<MqttTopicSubscription> topicSubscriptions=new ArrayList<>();
// MqttTopicSubscription mqttTopicSubscription=new MqttTopicSubscription("aaa",MqttQoS.AT_LEAST_ONCE);
// topicSubscriptions.add(mqttTopicSubscription);
// MqttSubscribeMessage mqttMessage = new MqttSubscribeMessage(
// new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0),
// MqttMessageIdVariableHeader.from(4),
// new MqttSubscribePayload(topicSubscriptions));
List<String> topicSubscriptions=new ArrayList<>();
topicSubscriptions.add("aaa");
MqttUnsubscribeMessage mqttMessage = new MqttUnsubscribeMessage(
new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(4),
new MqttUnsubscribePayload(topicSubscriptions));
// MqttSubAckMessage mqttMessage = new MqttSubAckMessage(
// new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
// MqttMessageIdVariableHeader.from(4),
// new MqttSubAckPayload(1));
long ct = System.currentTimeMillis();
for (int i = 0; i < 1; i++) {
c.writeAndFlush(mqttMessage);
}
long lt = System.currentTimeMillis();
System.out.println("耗时:" + (lt - ct));
}
}
});
}
}.start();
}
public static void enableTriggeredMode(ServerBootstrap bootstrap) {
if (Epoll.isAvailable()) {
bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}
@Override
protected void setOptions() {
super.setOptions();
ServerBootstrap boot = bootstrap();
// parent options
NettyConfig.NettyTcpConfigGroup.ParentConfig parent = configGroup.parent();
boot.option(ChannelOption.SO_BACKLOG, parent.getBacklog())
.option(ChannelOption.SO_REUSEADDR, parent.isReuseAddress())
.option(EpollChannelOption.SO_REUSEPORT, parent.isReusePort())
.option(EpollChannelOption.IP_FREEBIND, parent.isIpFreeBind())
.option(EpollChannelOption.IP_TRANSPARENT, parent.isIpTransparent());
if (parent.getRcvBuf() > 0) {
boot.option(ChannelOption.SO_RCVBUF, parent.getRcvBuf());
}
if (parent.getPendingFastOpenRequestsThreshold() > 0) {
boot.option(EpollChannelOption.TCP_FASTOPEN, parent.getPendingFastOpenRequestsThreshold());
}
if (parent.getTcpDeferAccept() > 0) {
boot.option(EpollChannelOption.TCP_DEFER_ACCEPT, parent.getTcpDeferAccept());
}
if (parent.isEdgeTriggered()) {
boot.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
} else {
boot.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
// child options
NettyConfig.NettyTcpConfigGroup.ChildConfig child = configGroup.child();
WriteBufferWaterMark waterMark =
createWriteBufferWaterMark(child.getWriteBufferLowWaterMark(), child.getWriteBufferHighWaterMark());
boot.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, waterMark)
.childOption(ChannelOption.SO_REUSEADDR, child.isReuseAddress())
.childOption(ChannelOption.SO_KEEPALIVE, child.isKeepAlive())
.childOption(ChannelOption.TCP_NODELAY, child.isTcpNoDelay())
.childOption(ChannelOption.ALLOW_HALF_CLOSURE, child.isAllowHalfClosure());
if (child.getRcvBuf() > 0) {
boot.childOption(ChannelOption.SO_RCVBUF, child.getRcvBuf());
}
if (child.getSndBuf() > 0) {
boot.childOption(ChannelOption.SO_SNDBUF, child.getSndBuf());
}
if (child.getLinger() > 0) {
boot.childOption(ChannelOption.SO_LINGER, child.getLinger());
}
if (child.getIpTos() > 0) {
boot.childOption(ChannelOption.IP_TOS, child.getIpTos());
}
if (child.getTcpNotSentLowAt() > 0) {
boot.childOption(EpollChannelOption.TCP_NOTSENT_LOWAT, child.getTcpNotSentLowAt());
}
if (child.getTcpKeepCnt() > 0) {
boot.childOption(EpollChannelOption.TCP_KEEPCNT, child.getTcpKeepCnt());
}
if (child.getTcpUserTimeout() > 0) {
boot.childOption(EpollChannelOption.TCP_USER_TIMEOUT, child.getTcpUserTimeout());
}
if (child.getTcpKeepIdle() > 0) {
boot.childOption(EpollChannelOption.TCP_KEEPIDLE, child.getTcpKeepIdle());
}
if (child.getTcpKeepInterval() > 0) {
boot.childOption(EpollChannelOption.TCP_KEEPINTVL, child.getTcpKeepInterval());
}
if (SocketChannelProvider.SocketType.NATIVE_EPOLL == socketType()) {
boot.childOption(EpollChannelOption.TCP_CORK, child.isTcpCork())
.childOption(EpollChannelOption.TCP_QUICKACK, child.isTcpQuickAck())
.childOption(EpollChannelOption.IP_TRANSPARENT, child.isIpTransparent());
if (child.isTcpFastOpenConnect()) {
// Requires Linux kernel 4.11 or later
boot.childOption(EpollChannelOption.TCP_FASTOPEN_CONNECT, child.isTcpFastOpenConnect());
}
if (child.isEdgeTriggered()) {
boot.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
} else {
boot.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}
}
@Override
protected void setOptions() {
super.setOptions();
Bootstrap boot = bootstrap();
// child options
NettyConfig.NettyTcpConfigGroup.ChildConfig child = childConfig;
WriteBufferWaterMark waterMark =
createWriteBufferWaterMark(child.getWriteBufferLowWaterMark(), child.getWriteBufferHighWaterMark());
boot.option(ChannelOption.WRITE_BUFFER_WATER_MARK, waterMark)
.option(ChannelOption.SO_REUSEADDR, child.isReuseAddress())
.option(ChannelOption.SO_KEEPALIVE, child.isKeepAlive())
.option(ChannelOption.TCP_NODELAY, child.isTcpNoDelay())
.option(ChannelOption.ALLOW_HALF_CLOSURE, child.isAllowHalfClosure());
if (child.getRcvBuf() > 0) {
boot.option(ChannelOption.SO_RCVBUF, child.getRcvBuf());
}
if (child.getSndBuf() > 0) {
boot.option(ChannelOption.SO_SNDBUF, child.getSndBuf());
}
if (child.getLinger() > 0) {
boot.option(ChannelOption.SO_LINGER, child.getLinger());
}
if (child.getIpTos() > 0) {
boot.option(ChannelOption.IP_TOS, child.getIpTos());
}
if (child.getConnectTimeoutMillis() > 0) {
boot.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, child.getConnectTimeoutMillis());
}
if (child.getTcpNotSentLowAt() > 0) {
boot.option(EpollChannelOption.TCP_NOTSENT_LOWAT, child.getTcpNotSentLowAt());
}
if (child.getTcpKeepCnt() > 0) {
boot.option(EpollChannelOption.TCP_KEEPCNT, child.getTcpKeepCnt());
}
if (child.getTcpUserTimeout() > 0) {
boot.option(EpollChannelOption.TCP_USER_TIMEOUT, child.getTcpUserTimeout());
}
if (child.getTcpKeepIdle() > 0) {
boot.option(EpollChannelOption.TCP_KEEPIDLE, child.getTcpKeepIdle());
}
if (child.getTcpKeepInterval() > 0) {
boot.option(EpollChannelOption.TCP_KEEPINTVL, child.getTcpKeepInterval());
}
if (SocketChannelProvider.SocketType.NATIVE_EPOLL == socketType()) {
boot.option(EpollChannelOption.TCP_CORK, child.isTcpCork())
.option(EpollChannelOption.TCP_QUICKACK, child.isTcpQuickAck())
.option(EpollChannelOption.IP_TRANSPARENT, child.isIpTransparent());
if (child.isTcpFastOpenConnect()) {
// Requires Linux kernel 4.11 or later
boot.option(EpollChannelOption.TCP_FASTOPEN_CONNECT, child.isTcpFastOpenConnect());
}
if (child.isEdgeTriggered()) {
boot.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
} else {
boot.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}
}
private void startServer(long startMs) throws Exception {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
boolean SSL = environment.getBoolean(ENV_KEY_SSL, false);
// Configure SSL.
SslContext sslCtx = null;
if (SSL) {
String certFilePath = environment.get(ENV_KEY_SSL_CERT, null);
String privateKeyPath = environment.get(ENE_KEY_SSL_PRIVATE_KEY, null);
String privateKeyPassword = environment.get(ENE_KEY_SSL_PRIVATE_KEY_PASS, null);
log.info("{}SSL CertChainFile Path: {}", getStartedSymbol(), certFilePath);
log.info("{}SSL PrivateKeyFile Path: {}", getStartedSymbol(), privateKeyPath);
sslCtx = SslContextBuilder.forServer(new File(certFilePath), new File(privateKeyPath), privateKeyPassword).build();
}
var bootstrap = new ServerBootstrap();
int acceptThreadCount = environment.getInt(ENC_KEY_NETTY_ACCEPT_THREAD_COUNT, DEFAULT_ACCEPT_THREAD_COUNT);
int ioThreadCount = environment.getInt(ENV_KEY_NETTY_IO_THREAD_COUNT, DEFAULT_IO_THREAD_COUNT);
// enable epoll
if (BladeKit.epollIsAvailable()) {
log.info("{}Use EpollEventLoopGroup", getStartedSymbol());
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
NettyServerGroup nettyServerGroup = EpollKit.group(acceptThreadCount, ioThreadCount);
this.bossGroup = nettyServerGroup.getBoosGroup();
this.workerGroup = nettyServerGroup.getWorkerGroup();
bootstrap.group(bossGroup, workerGroup).channel(nettyServerGroup.getSocketChannel());
} else {
log.info("{}Use NioEventLoopGroup", getStartedSymbol());
this.bossGroup = new NioEventLoopGroup(acceptThreadCount, new NamedThreadFactory("[email protected]"));
this.workerGroup = new NioEventLoopGroup(ioThreadCount, new NamedThreadFactory("[email protected]"));
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
}
scheduleEventLoop = new DefaultEventLoop();
bootstrap.childHandler(new HttpServerInitializer(sslCtx, blade, scheduleEventLoop));
String address = environment.get(ENV_KEY_SERVER_ADDRESS, DEFAULT_SERVER_ADDRESS);
Integer port = environment.getInt(ENV_KEY_SERVER_PORT, DEFAULT_SERVER_PORT);
channel = bootstrap.bind(address, port).sync().channel();
String appName = environment.get(ENV_KEY_APP_NAME, "Blade");
String url = Ansi.BgRed.and(Ansi.Black).format(" %s:%d ", address, port);
String protocol = SSL ? "https" : "http";
log.info("{}{} initialize successfully, Time elapsed: {} ms", getStartedSymbol(), appName, (System.currentTimeMillis() - startMs));
log.info("{}Blade start with {}", getStartedSymbol(), url);
log.info("{}Open browser access {}://{}:{} ⚡\r\n", getStartedSymbol(), protocol, address.replace(DEFAULT_SERVER_ADDRESS, LOCAL_IP_ADDRESS), port);
blade.eventManager().fireEvent(EventType.SERVER_STARTED, new Event().attribute("blade", blade));
}
@Override
public void onPeerRemoved(final IpAddressNoZone ip) {
if (this.keys.remove(IetfInetUtil.INSTANCE.inetAddressForNoZone(ip)) != null) {
this.channelConfig.setOption(EpollChannelOption.TCP_MD5SIG, this.keys);
}
}