下面列出了怎么用 io.netty.handler.codec.mqtt.MqttDecoder 的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void initChannel(Channel channel) throws Exception {
//向该pipeline的channel中添加handler
// 把netty提供的mqtt解码器放入到Channel通道中
ChannelPipeline pipeline = channel.pipeline();
//按顺序加入handler
//1、把MqttEncoder置于handler链的首部,用于处理消息的编码
pipeline.addLast("encoder",MqttEncoder.INSTANCE);
//2、把MqttDecoder置于handler链的第二环处理消息的解码
pipeline.addLast("decoder",new MqttDecoder());
//3、把netty提供的心跳handler加入到pipeline
//IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
//30s没有入站消息,则:关闭此连接。 同时,每隔5秒发送一次ping。
pipeline.addLast("heartbeatHandler",new IdleStateHandler(20,30,0,TimeUnit.SECONDS));
//4、把自己的handler加入到管道的末端
//channel.pipeline().addLast("mqttPingHandler", new MqttPingHandler(5));//定义keepalive时间为5s
pipeline.addLast("brokerHandler",new MqttBrokerHandler(sessionManager,connectionFactory));
}
private Channel tcpServer(int port) throws Exception {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addFirst("idle", new IdleStateHandler(
0,
0,
weEventConfig.getKeepAlive()));
//channelPipeline.addLast("ssl", getSslHandler(sslContext, socketChannel.alloc()));
channelPipeline.addLast("decoder", new MqttDecoder());
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
channelPipeline.addLast("broker", new TcpHandler(protocolProcess));
}
});
return serverBootstrap.bind(port).sync().channel();
}
@Override
public void addPipeline(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyReaderIdleTimeSeconds, 0, 0));
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
if(autoFlushIdleTime > 0) {
pipeline.addLast("autoflush", new MqttAutoFlushChannelHandler(autoFlushIdleTime, TimeUnit.SECONDS));
}
pipeline.addLast("decoder", new MqttDecoder(messageMaxLength));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("messageLogger",mqttMessageLoggerChannelHandler);
if(isEnableMetrics()) {
if(mqttDropWizardMetricsChannelHandler == null) {
mqttDropWizardMetricsChannelHandler = new MqttDropWizardMetricsChannelHandler();
mqttDropWizardMetricsChannelHandler.init(metricsLibratoEmail, metricsLibratoToken, metricsLibratoSource);
}
pipeline.addLast("wizardMetrics", mqttDropWizardMetricsChannelHandler);
}
pipeline.addLast("handler", mqttServerChannelHandler);
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();//设置ChannelPipeLine
SslHandler sslHandler = null;
//判断SSL处理器处理类是否为空,如果不为空,将SSL处理器加入到ChannelPipeLine
if (sslHandlerProvider != null) {
sslHandler = sslHandlerProvider.getSslHandler();
pipeline.addLast(sslHandler);
}
//添加负载内容的解编码器
pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService,
adaptor, sslHandler, quotaService);
//添加Mqtt协议处理器
pipeline.addLast(handler);
//异步操作完成时回调
ch.closeFuture().addListener(handler);
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
SslHandler sslHandler = null;
if (sslHandlerProvider != null) {
sslHandler = sslHandlerProvider.getSslHandler();
pipeline.addLast(sslHandler);
}
pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
MqttTransportHandler handler = new MqttTransportHandler(msgProducer, deviceService, authService, assetService,
assetAuthService, relationService, sslHandler);
pipeline.addLast(handler);
// ch.closeFuture().addListener(handler);
}
public MQTTHardwareServer(Holder holder) {
super(holder.props.getProperty("listen.address"),
holder.props.getIntProperty("hardware.mqtt.port"), holder.transportTypeHolder);
var hardTimeoutSecs = holder.limits.hardwareIdleTimeout;
var mqttHardwareLoginHandler = new MqttHardwareLoginHandler(holder);
var alreadyLoggedHandler = new AlreadyLoggedHandler();
var hardwareChannelStateHandler = new HardwareChannelStateHandler(holder);
channelInitializer = new ChannelInitializer<>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("MqttIdleStateHandler", new IdleStateHandler(hardTimeoutSecs, hardTimeoutSecs, 0))
.addLast(hardwareChannelStateHandler)
.addLast(new MqttDecoder())
.addLast(MqttEncoder.INSTANCE)
.addLast(mqttHardwareLoginHandler)
.addLast(alreadyLoggedHandler);
}
};
log.debug("hard.socket.idle.timeout = {}", hardTimeoutSecs);
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
GatewayTransportHandler handler = new GatewayTransportHandler();
pipeline.addLast(handler);
socketChannel.closeFuture().addListener(handler);
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
MqttTransportHandler handler = new MqttTransportHandler();
pipeline.addLast(handler);
socketChannel.closeFuture().addListener(handler);
}
private void configureMQTTPipeline(ChannelPipeline pipeline, MoquetteIdleTimeoutHandler timeoutHandler,
NewNettyMQTTHandler handler) {
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
pipeline.addLast("autoflush", new AutoFlushHandler(1, TimeUnit.SECONDS));
pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("handler", handler);
}
@Override
protected void initSocketChannel(SocketChannel ch) {
super.initSocketChannel(ch);
ch.pipeline().addLast(HANDLER_MQTTDECODER, new MqttDecoder());
ch.pipeline().addLast(HANDLER_MQTTENCODER, MqttEncoder.INSTANCE);
ch.pipeline().addLast(HANDLER_MQTTHANDER, new MqttServerHandler(protocolProcess));
}
@Override
protected void initSocketChannel(SocketChannel ch) {
ClientProtocolProcess proObj = new ClientProtocolProcess(this, consumerProcess, producerProcess);
super.initSocketChannel(ch);
ch.pipeline().addLast("decoder", new MqttDecoder());
ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
ch.pipeline().addLast("mqttHander", new MqttClientHandler(proObj));
}
private void startTcpServer(boolean useSsl, Integer port) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(selectorGroup, ioGroup).channel(clazz)
.option(ChannelOption.SO_BACKLOG, mqttConfig.getTcpBackLog())
.childOption(ChannelOption.TCP_NODELAY, mqttConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, mqttConfig.getTcpSndBuf())
.option(ChannelOption.SO_RCVBUF, mqttConfig.getTcpRcvBuf())
.option(ChannelOption.SO_REUSEADDR, mqttConfig.isTcpReuseAddr())
.childOption(ChannelOption.SO_KEEPALIVE, mqttConfig.isTcpKeepAlive())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
if (useSsl) {
pipeline.addLast("ssl",
NettySslHandler.getSslHandler(socketChannel, mqttConfig.isUseClientCA(),
mqttConfig.getSslKeyStoreType(), mqttConfig.getSslKeyFilePath(),
mqttConfig.getSslManagerPwd(), mqttConfig.getSslStorePwd()));
}
pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0))
.addLast("mqttEncoder", MqttEncoder.INSTANCE)
.addLast("mqttDecoder", new MqttDecoder(mqttConfig.getMaxMsgSize()))
.addLast("nettyConnectionManager", new NettyConnectHandler(brokerRoom.getNettyEventExcutor()))
.addLast("nettyMqttHandler", new NettyMqttHandler());
}
});
if (mqttConfig.isPooledByteBufAllocatorEnable()) {
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
bootstrap.bind(port).sync();
log.info("[Server] -> start tcp server {} success,port = {}", useSsl ? "with ssl" : "", port);
} catch (InterruptedException ex) {
log.error("[Server] -> start tcp server {} failure.cause={}", useSsl ? "with ssl" : "", ex);
}
}
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline()
.addLast(new MqttDecoder(mqttContext.getMqttConfig().getMaxPayloadSize()))
.addLast(MqttEncoder.INSTANCE)
.addLast(new ConnectionHandler())
.addLast(newMqttCommandInvocation());
}
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST))
.addLast(new WebSocketFrameToByteBufDecoder())
.addLast(new ByteBufToWebSocketFrameEncoder())
.addLast(new MqttDecoder(mqttContext.getMqttConfig().getMaxPayloadSize()))
.addLast(MqttEncoder.INSTANCE)
.addLast(new ConnectionHandler())
.addLast(newMqttCommandInvocation());
}
@PostConstruct
public void init() throws Exception {
log.info("Setting resource leak detector level to {}", leakDetectorLevel);
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
log.info("Starting MQTT transport...");
log.info("Starting MQTT transport server");
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
//pipeline.addLast("idleStateHandler", new IdleStateHandler(10,2,12, TimeUnit.SECONDS));
MqttTransportHandler handler = new MqttTransportHandler(protocolProcess);
pipeline.addLast(handler);
}
});
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");
}
private void initChannel(ChannelPipeline pipeline) {
pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);
if (this.options.getMaxMessageSize() > 0) {
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
} else {
// max message size not set, so the default from Netty MQTT codec is used
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
}
// adding the idle state handler for timeout on CONNECT packet
pipeline.addBefore("handler", "idle", new IdleStateHandler(this.options.timeoutOnConnect(), 0, 0));
pipeline.addBefore("handler", "timeoutOnConnect", new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
// as MQTT 3.1.1 describes, if no packet is sent after a "reasonable" time (here CONNECT timeout)
// the connection is closed
ctx.channel().close();
}
}
}
});
}
private void initChannel(ChannelPipeline pipeline) {
// add into pipeline netty's (en/de)coder
pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);
if (this.options.getMaxMessageSize() > 0) {
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
} else {
// max message size not set, so the default from Netty MQTT codec is used
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
}
if (this.options.isAutoKeepAlive() &&
this.options.getKeepAliveTimeSeconds() != 0) {
pipeline.addBefore("handler", "idle",
new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0));
pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
ping();
}
}
}
});
}
}
@Override
public void addChannelHandlers(ChannelPipeline pipeline) {
pipeline.addLast(MqttEncoder.INSTANCE);
pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));
pipeline.addLast(new MQTTProtocolHandler(server, this));
}
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ----配置Protobuf处理器----
//用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
//pipeline.addLast(new ProtobufVarint32FrameDecoder());
//
//配置Protobuf解码处理器,消息接收到了就会自动解码,ProtobufDecoder是netty自带的,Message是自己定义的Protobuf类
//pipeline.addLast(new ProtobufDecoder(MessageClass.Message.getDefaultInstance()));
// 用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
//pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
// 配置Protobuf编码器,发送的消息会先经过编码
//pipeline.addLast(new ProtobufEncoder());
// ----Protobuf处理器END----
// pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
// pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
// pipeline.addLast(new SimpleChannelInboundHandler<String>() {
//
// @Override
// public void handlerAdded(ChannelHandlerContext ctx)
// throws Exception {
// Channel incoming = ctx.channel();
// System.out.println("[Client] - " + incoming.remoteAddress()
// + " 连接过来");
//
// //incoming.writeAndFlush("123\r\n456\r789\nabcde\r\n");
// }
//
// @Override
// protected void channelRead0(ChannelHandlerContext ctx, String msg)
// throws Exception {
//
// //System.out.println("收到消息:" + msg);
//
// }
//
// });
pipeline.addLast("decoder", MqttEncoder.INSTANCE);
pipeline.addLast("encoder", new MqttDecoder());
pipeline.addLast(new SimpleChannelInboundHandler<MqttMessage>(){
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) throws Exception {
System.out.println(mqttMessage.toString());
}
});
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ----配置Protobuf处理器----
// 用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
// pipeline.addLast(new ProtobufVarint32FrameDecoder());
//配置Protobuf解码处理器,消息接收到了就会自动解码,ProtobufDecoder是netty自带的,Message是自己定义的Protobuf类
// pipeline.addLast(new ProtobufDecoder(MessageClass.Message.getDefaultInstance()));
// 用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
// pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
// 配置Protobuf编码器,发送的消息会先经过编码
// pipeline.addLast(new ProtobufEncoder());
// ----Protobuf处理器END----
// pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
// pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
// pipeline.addLast(new SimpleChannelInboundHandler<MessageClass.Message>() {
// @Override
// public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Channel incoming = ctx.channel();
// System.out.println("[Client] - " + incoming.remoteAddress() + " 连接过来");
// //incoming.writeAndFlush("123\r\n456\r789\nabcde\r\n");
// }
// @Override
// protected void channelRead0(ChannelHandlerContext ctx, MessageClass.Message msg) throws Exception {
// //System.out.println("收到消息:" + msg.getId());
// }
// });
pipeline.addLast("decoder", MqttEncoder.INSTANCE);
pipeline.addLast("encoder", new MqttDecoder());
pipeline.addLast(new SimpleChannelInboundHandler<MqttMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) throws Exception {
System.out.println("mqtt消息:" + mqttMessage.toString());
switch (mqttMessage.fixedHeader().messageType()) {
case PUBLISH:
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
ByteBuf payload = mqttPublishMessage.payload();
byte[] bytes = ByteBufUtil.getBytes(payload);
System.out.println("payload:" + new String(bytes));
break;
default:
break;
}
}
});
}
private void startWebsocketServer(boolean useSsl, Integer port){
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(selectorGroup,ioGroup)
.channel(clazz)
.option(ChannelOption.SO_BACKLOG, nettyConfig.getTcpBackLog())
.childOption(ChannelOption.TCP_NODELAY, nettyConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, nettyConfig.getTcpSndBuf())
.option(ChannelOption.SO_RCVBUF, nettyConfig.getTcpRcvBuf())
.option(ChannelOption.SO_REUSEADDR, nettyConfig.isTcpReuseAddr())
.childOption(ChannelOption.SO_KEEPALIVE, nettyConfig.isTcpKeepAlive())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
if (useSsl) {
pipeline.addLast("ssl", NettySslHandler.getSslHandler(
socketChannel,
nettyConfig.isUseClientCA(),
nettyConfig.getSslKeyStoreType(),
brokerConfig.getJmqttHome() + nettyConfig.getSslKeyFilePath(),
nettyConfig.getSslManagerPwd(),
nettyConfig.getSslStorePwd()
));
}
pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 60))
.addLast("httpCodec",new HttpServerCodec())
.addLast("aggregator",new HttpObjectAggregator(65535))
.addLast("compressor ", new HttpContentCompressor())
.addLast("webSocketHandler",new WebSocketServerProtocolHandler("/mqtt", MixAll.MQTT_VERSION_SUPPORT,true))
.addLast("byteBuf2WebSocketEncoder",new ByteBuf2WebSocketEncoder())
.addLast("webSocket2ByteBufDecoder",new WebSocket2ByteBufDecoder())
.addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize()))
.addLast("mqttEncoder", MqttEncoder.INSTANCE)
.addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExcutor))
.addLast("nettyMqttHandler", new NettyMqttHandler());
}
});
if(nettyConfig.isPooledByteBufAllocatorEnable()){
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture future = bootstrap.bind(port).sync();
log.info("Start webSocket server {} success,port = {}", useSsl?"with ssl":"", port);
}catch (InterruptedException ex){
log.error("Start webSocket server {} failure.cause={}", useSsl?"with ssl":"", ex);
}
}
private void startTcpServer(boolean useSsl, Integer port){
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(selectorGroup,ioGroup)
.channel(clazz)
.option(ChannelOption.SO_BACKLOG, nettyConfig.getTcpBackLog())
.childOption(ChannelOption.TCP_NODELAY, nettyConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, nettyConfig.getTcpSndBuf())
.option(ChannelOption.SO_RCVBUF, nettyConfig.getTcpRcvBuf())
.option(ChannelOption.SO_REUSEADDR, nettyConfig.isTcpReuseAddr())
.childOption(ChannelOption.SO_KEEPALIVE, nettyConfig.isTcpKeepAlive())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
if (useSsl) {
pipeline.addLast("ssl", NettySslHandler.getSslHandler(
socketChannel,
nettyConfig.isUseClientCA(),
nettyConfig.getSslKeyStoreType(),
brokerConfig.getJmqttHome() + nettyConfig.getSslKeyFilePath(),
nettyConfig.getSslManagerPwd(),
nettyConfig.getSslStorePwd()
));
}
pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0))
.addLast("mqttEncoder", MqttEncoder.INSTANCE)
.addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize()))
.addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExcutor))
.addLast("nettyMqttHandler", new NettyMqttHandler());
}
});
if(nettyConfig.isPooledByteBufAllocatorEnable()){
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture future = bootstrap.bind(port).sync();
log.info("Start tcp server {} success,port = {}", useSsl?"with ssl":"", port);
}catch (InterruptedException ex){
log.error("Start tcp server {} failure.cause={}", useSsl?"with ssl":"", ex);
}
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
logger.debug("Initializaing channels...");
ch.pipeline().addLast(ByteCounterCodec.class.getName(), new ByteCounterCodec());
if ("true".equalsIgnoreCase(Settings.INSTANCE.getProperty("netty.logger"))) {
ch.pipeline().addLast(LoggingHandler.class.getName(), new LoggingHandler(LogLevel.DEBUG));
}
if (useSsl) {
SslContext sslCtx = SslContextBuilder
.forServer(Settings.INSTANCE.certChainFile(), Settings.INSTANCE.privateKeyFile()).build();
logger.debug("SSL Provider : {}", SslContext.defaultServerProvider());
ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()));
}
if (useWebSocket) {
String websocketPath = Settings.INSTANCE.getProperty("mqttserver.websocket.path", "/");
ch.pipeline().addLast(HttpServerCodec.class.getName(), new HttpServerCodec());
ch.pipeline().addLast(HttpObjectAggregator.class.getName(), new HttpObjectAggregator(1048576));
ch.pipeline().addLast(HttpContentCompressor.class.getName(), new HttpContentCompressor());
ch.pipeline().addLast(WebSocketServerProtocolHandler.class.getName(),
new WebSocketServerProtocolHandler(websocketPath, "mqtt,mqttv3.1,mqttv3.1.1", true, 65536)); // [MQTT-6.0.0-3]
ch.pipeline().addLast(new MqttWebSocketCodec());
}
int maxBytesInMessage = Settings.INSTANCE.getInt("mqttserver.maxBytesInMessage", 8092);
ch.pipeline().addLast(MqttDecoder.class.getName(), new MqttDecoder(maxBytesInMessage));
ch.pipeline().addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE);
ch.pipeline().addLast(ConnectReceiver.class.getName(), ConnectReceiver.INSTANCE);
ch.pipeline().addLast(PubAckReceiver.class.getName(), PubAckReceiver.INSTANCE);
ch.pipeline().addLast(PublishReceiver.class.getName(), PublishReceiver.INSTANCE);
ch.pipeline().addLast(SubscribeReceiver.class.getName(), SubscribeReceiver.INSTANCE);
ch.pipeline().addLast(UnsubscribeReceiver.class.getName(), UnsubscribeReceiver.INSTANCE);
ch.pipeline().addLast(GenericReceiver.class.getName(), GenericReceiver.INSTANCE);
}