类 io.netty.handler.codec.mqtt.MqttDecoder 源码实例Demo

下面列出了怎么用 io.netty.handler.codec.mqtt.MqttDecoder 的API类实例代码及写法,或者点击链接到github查看源代码。


@Override
protected void initChannel(Channel channel) throws Exception {
    //向该pipeline的channel中添加handler
    // 把netty提供的mqtt解码器放入到Channel通道中
    ChannelPipeline pipeline = channel.pipeline();
    //按顺序加入handler
    //1、把MqttEncoder置于handler链的首部,用于处理消息的编码
    pipeline.addLast("encoder",MqttEncoder.INSTANCE);
    //2、把MqttDecoder置于handler链的第二环处理消息的解码
    pipeline.addLast("decoder",new MqttDecoder());
    //3、把netty提供的心跳handler加入到pipeline
    //IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
    //30s没有入站消息,则:关闭此连接。 同时,每隔5秒发送一次ping。
    pipeline.addLast("heartbeatHandler",new IdleStateHandler(20,30,0,TimeUnit.SECONDS));
    //4、把自己的handler加入到管道的末端
    //channel.pipeline().addLast("mqttPingHandler", new MqttPingHandler(5));//定义keepalive时间为5s
    pipeline.addLast("brokerHandler",new MqttBrokerHandler(sessionManager,connectionFactory));
}
 
源代码2 项目: WeEvent   文件: TcpBroker.java

private Channel tcpServer(int port) throws Exception {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(this.bossGroup, this.workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.DEBUG))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline channelPipeline = socketChannel.pipeline();
                    channelPipeline.addFirst("idle", new IdleStateHandler(
                            0,
                            0,
                            weEventConfig.getKeepAlive()));

                    //channelPipeline.addLast("ssl", getSslHandler(sslContext, socketChannel.alloc()));
                    channelPipeline.addLast("decoder", new MqttDecoder());
                    channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
                    channelPipeline.addLast("broker", new TcpHandler(protocolProcess));
                }
            });
    return serverBootstrap.bind(port).sync().channel();
}
 

@Override
public void addPipeline(Channel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();

    pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyReaderIdleTimeSeconds, 0, 0));
    pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);

    if(autoFlushIdleTime > 0) {
        pipeline.addLast("autoflush", new MqttAutoFlushChannelHandler(autoFlushIdleTime, TimeUnit.SECONDS));
    }
    pipeline.addLast("decoder", new MqttDecoder(messageMaxLength));
    pipeline.addLast("encoder", MqttEncoder.INSTANCE);

    pipeline.addLast("messageLogger",mqttMessageLoggerChannelHandler);

    if(isEnableMetrics()) {
        if(mqttDropWizardMetricsChannelHandler == null) {
            mqttDropWizardMetricsChannelHandler = new MqttDropWizardMetricsChannelHandler();
            mqttDropWizardMetricsChannelHandler.init(metricsLibratoEmail, metricsLibratoToken, metricsLibratoSource);
        }
        pipeline.addLast("wizardMetrics", mqttDropWizardMetricsChannelHandler);
    }

    pipeline.addLast("handler", mqttServerChannelHandler);
}
 

@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();//设置ChannelPipeLine
    SslHandler sslHandler = null;
    //判断SSL处理器处理类是否为空,如果不为空,将SSL处理器加入到ChannelPipeLine
    if (sslHandlerProvider != null) {
        sslHandler = sslHandlerProvider.getSslHandler();
        pipeline.addLast(sslHandler);
    }
    //添加负载内容的解编码器
    pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
    pipeline.addLast("encoder", MqttEncoder.INSTANCE);

    MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService,
            adaptor, sslHandler, quotaService);

    //添加Mqtt协议处理器
    pipeline.addLast(handler);
    //异步操作完成时回调
    ch.closeFuture().addListener(handler);
}
 

@Override
  public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();
    SslHandler sslHandler = null;
    if (sslHandlerProvider != null) {
      sslHandler = sslHandlerProvider.getSslHandler();
      pipeline.addLast(sslHandler);
    }
    pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
    pipeline.addLast("encoder", MqttEncoder.INSTANCE);

    MqttTransportHandler handler = new MqttTransportHandler(msgProducer, deviceService, authService, assetService,
        assetAuthService, relationService, sslHandler);
    pipeline.addLast(handler);

//    ch.closeFuture().addListener(handler);

  }
 
源代码6 项目: blynk-server   文件: MQTTHardwareServer.java

public MQTTHardwareServer(Holder holder) {
    super(holder.props.getProperty("listen.address"),
            holder.props.getIntProperty("hardware.mqtt.port"), holder.transportTypeHolder);

    var hardTimeoutSecs = holder.limits.hardwareIdleTimeout;
    var mqttHardwareLoginHandler = new MqttHardwareLoginHandler(holder);
    var alreadyLoggedHandler = new AlreadyLoggedHandler();
    var hardwareChannelStateHandler = new HardwareChannelStateHandler(holder);

    channelInitializer = new ChannelInitializer<>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline()
                .addLast("MqttIdleStateHandler", new IdleStateHandler(hardTimeoutSecs, hardTimeoutSecs, 0))
                .addLast(hardwareChannelStateHandler)
                .addLast(new MqttDecoder())
                .addLast(MqttEncoder.INSTANCE)
                .addLast(mqttHardwareLoginHandler)
                .addLast(alreadyLoggedHandler);
        }
    };

    log.debug("hard.socket.idle.timeout = {}", hardTimeoutSecs);
}
 

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
    pipeline.addLast("encoder", MqttEncoder.INSTANCE);
    GatewayTransportHandler handler = new GatewayTransportHandler();
    pipeline.addLast(handler);
    socketChannel.closeFuture().addListener(handler);

}
 

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
    pipeline.addLast("encoder", MqttEncoder.INSTANCE);
    MqttTransportHandler handler = new MqttTransportHandler();
    pipeline.addLast(handler);
    socketChannel.closeFuture().addListener(handler);

}
 
源代码9 项目: cassandana   文件: NewNettyAcceptor.java

private void configureMQTTPipeline(ChannelPipeline pipeline, MoquetteIdleTimeoutHandler timeoutHandler,
                                   NewNettyMQTTHandler handler) {
    pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
    pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
    pipeline.addLast("autoflush", new AutoFlushHandler(1, TimeUnit.SECONDS));
    pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
    pipeline.addLast("encoder", MqttEncoder.INSTANCE);
    pipeline.addLast("handler", handler);
}
 
源代码10 项目: ext-opensource-netty   文件: MqttServer.java

@Override
protected void initSocketChannel(SocketChannel ch) {
	super.initSocketChannel(ch);
	ch.pipeline().addLast(HANDLER_MQTTDECODER, new MqttDecoder());
	ch.pipeline().addLast(HANDLER_MQTTENCODER, MqttEncoder.INSTANCE);
	ch.pipeline().addLast(HANDLER_MQTTHANDER, new MqttServerHandler(protocolProcess));	
}
 
源代码11 项目: ext-opensource-netty   文件: MqttClient.java

@Override
protected void initSocketChannel(SocketChannel ch) {
	ClientProtocolProcess proObj = new ClientProtocolProcess(this, consumerProcess, producerProcess);
	super.initSocketChannel(ch);

	ch.pipeline().addLast("decoder", new MqttDecoder());
	ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
	ch.pipeline().addLast("mqttHander", new MqttClientHandler(proObj));
}
 
源代码12 项目: iot-mqtt   文件: BrokerService.java

private void startTcpServer(boolean useSsl, Integer port) {
	ServerBootstrap bootstrap = new ServerBootstrap();
	bootstrap.group(selectorGroup, ioGroup).channel(clazz)
			.option(ChannelOption.SO_BACKLOG, mqttConfig.getTcpBackLog())
			.childOption(ChannelOption.TCP_NODELAY, mqttConfig.isTcpNoDelay())
			.childOption(ChannelOption.SO_SNDBUF, mqttConfig.getTcpSndBuf())
			.option(ChannelOption.SO_RCVBUF, mqttConfig.getTcpRcvBuf())
			.option(ChannelOption.SO_REUSEADDR, mqttConfig.isTcpReuseAddr())
			.childOption(ChannelOption.SO_KEEPALIVE, mqttConfig.isTcpKeepAlive())
			.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel socketChannel) throws Exception {
					ChannelPipeline pipeline = socketChannel.pipeline();
					if (useSsl) {
						pipeline.addLast("ssl",
								NettySslHandler.getSslHandler(socketChannel, mqttConfig.isUseClientCA(),
										mqttConfig.getSslKeyStoreType(), mqttConfig.getSslKeyFilePath(),
										mqttConfig.getSslManagerPwd(), mqttConfig.getSslStorePwd()));
					}
					pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0))
							.addLast("mqttEncoder", MqttEncoder.INSTANCE)
							.addLast("mqttDecoder", new MqttDecoder(mqttConfig.getMaxMsgSize()))
							.addLast("nettyConnectionManager", new NettyConnectHandler(brokerRoom.getNettyEventExcutor()))
							.addLast("nettyMqttHandler", new NettyMqttHandler());
				}
			});
	if (mqttConfig.isPooledByteBufAllocatorEnable()) {
		bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
	}
	try {
		bootstrap.bind(port).sync();
		log.info("[Server] -> start tcp server {} success,port = {}", useSsl ? "with ssl" : "", port);
	} catch (InterruptedException ex) {
		log.error("[Server] -> start tcp server {} failure.cause={}", useSsl ? "with ssl" : "", ex);
	}
}
 

@Override
protected void initChannel(Channel channel) throws Exception {
    channel.pipeline()
            .addLast(new MqttDecoder(mqttContext.getMqttConfig().getMaxPayloadSize()))
            .addLast(MqttEncoder.INSTANCE)
            .addLast(new ConnectionHandler())
            .addLast(newMqttCommandInvocation());
}
 

@Override
protected void initChannel(Channel channel) throws Exception {
    channel.pipeline()
            .addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST))
            .addLast(new WebSocketFrameToByteBufDecoder())
            .addLast(new ByteBufToWebSocketFrameEncoder())
            .addLast(new MqttDecoder(mqttContext.getMqttConfig().getMaxPayloadSize()))
            .addLast(MqttEncoder.INSTANCE)
            .addLast(new ConnectionHandler())
            .addLast(newMqttCommandInvocation());
}
 
源代码15 项目: netty-learning-example   文件: MqttServer.java

@PostConstruct
public void init() throws Exception {

    log.info("Setting resource leak detector level to {}", leakDetectorLevel);
    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
    log.info("Starting MQTT transport...");

    log.info("Starting MQTT transport server");
    bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
    workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
                    pipeline.addLast("encoder", MqttEncoder.INSTANCE);
                    //pipeline.addLast("idleStateHandler", new IdleStateHandler(10,2,12, TimeUnit.SECONDS));
                    MqttTransportHandler handler = new MqttTransportHandler(protocolProcess);
                    pipeline.addLast(handler);
                }
            });

    serverChannel = b.bind(host, port).sync().channel();


    log.info("Mqtt transport started!");
}
 
源代码16 项目: vertx-mqtt   文件: MqttServerImpl.java

private void initChannel(ChannelPipeline pipeline) {

    pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);
    if (this.options.getMaxMessageSize() > 0) {
      pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
    } else {
      // max message size not set, so the default from Netty MQTT codec is used
      pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
    }

    // adding the idle state handler for timeout on CONNECT packet
    pipeline.addBefore("handler", "idle", new IdleStateHandler(this.options.timeoutOnConnect(), 0, 0));
    pipeline.addBefore("handler", "timeoutOnConnect", new ChannelDuplexHandler() {

      @Override
      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent) {
          IdleStateEvent e = (IdleStateEvent) evt;
          if (e.state() == IdleState.READER_IDLE) {
            // as MQTT 3.1.1 describes, if no packet is sent after a "reasonable" time (here CONNECT timeout)
            // the connection is closed
            ctx.channel().close();
          }
        }
      }
    });
  }
 
源代码17 项目: vertx-mqtt   文件: MqttClientImpl.java

private void initChannel(ChannelPipeline pipeline) {

    // add into pipeline netty's (en/de)coder
    pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

    if (this.options.getMaxMessageSize() > 0) {
      pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
    } else {
      // max message size not set, so the default from Netty MQTT codec is used
      pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
    }

    if (this.options.isAutoKeepAlive() &&
      this.options.getKeepAliveTimeSeconds() != 0) {

      pipeline.addBefore("handler", "idle",
        new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0));
      pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

          if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.WRITER_IDLE) {
              ping();
            }
          }
        }
      });
    }
  }
 

@Override
public void addChannelHandlers(ChannelPipeline pipeline) {
   pipeline.addLast(MqttEncoder.INSTANCE);
   pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));

   pipeline.addLast(new MQTTProtocolHandler(server, this));
}
 
源代码19 项目: getty   文件: GimClientInitializer.java

@Override
    public void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();


        // ----配置Protobuf处理器----
        //用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
        //pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //
        //配置Protobuf解码处理器,消息接收到了就会自动解码,ProtobufDecoder是netty自带的,Message是自己定义的Protobuf类
        //pipeline.addLast(new ProtobufDecoder(MessageClass.Message.getDefaultInstance()));

        // 用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
        //pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        // 配置Protobuf编码器,发送的消息会先经过编码
        //pipeline.addLast(new ProtobufEncoder());
        // ----Protobuf处理器END----


//        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
//        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
//        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
//        pipeline.addLast(new SimpleChannelInboundHandler<String>() {
//
//            @Override
//            public void handlerAdded(ChannelHandlerContext ctx)
//                    throws Exception {
//                Channel incoming = ctx.channel();
//                System.out.println("[Client] - " + incoming.remoteAddress()
//                        + " 连接过来");
//
//                //incoming.writeAndFlush("123\r\n456\r789\nabcde\r\n");
//            }
//
//            @Override
//            protected void channelRead0(ChannelHandlerContext ctx, String msg)
//                    throws Exception {
//
//                //System.out.println("收到消息:" + msg);
//
//            }
//
//        });


        pipeline.addLast("decoder", MqttEncoder.INSTANCE);
        pipeline.addLast("encoder", new MqttDecoder());
        pipeline.addLast(new SimpleChannelInboundHandler<MqttMessage>(){

            @Override
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) throws Exception {
                System.out.println(mqttMessage.toString());
            }
        });

    }
 
源代码20 项目: getty   文件: GimServerInitializer.java

@Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();


        // ----配置Protobuf处理器----
        // 用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
        // pipeline.addLast(new ProtobufVarint32FrameDecoder());

        //配置Protobuf解码处理器,消息接收到了就会自动解码,ProtobufDecoder是netty自带的,Message是自己定义的Protobuf类
        // pipeline.addLast(new ProtobufDecoder(MessageClass.Message.getDefaultInstance()));

        // 用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
        // pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        // 配置Protobuf编码器,发送的消息会先经过编码
        // pipeline.addLast(new ProtobufEncoder());
        // ----Protobuf处理器END----


//        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
//        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
//        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));


//        pipeline.addLast(new SimpleChannelInboundHandler<MessageClass.Message>() {
//            @Override
//            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//                Channel incoming = ctx.channel();
//                System.out.println("[Client] - " + incoming.remoteAddress() + " 连接过来");
//                //incoming.writeAndFlush("123\r\n456\r789\nabcde\r\n");
//            }
//            @Override
//            protected void channelRead0(ChannelHandlerContext ctx, MessageClass.Message msg) throws Exception {
//                //System.out.println("收到消息:" + msg.getId());
//            }
//        });


        pipeline.addLast("decoder", MqttEncoder.INSTANCE);
        pipeline.addLast("encoder", new MqttDecoder());
        pipeline.addLast(new SimpleChannelInboundHandler<MqttMessage>() {

            @Override
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, MqttMessage mqttMessage) throws Exception {
                System.out.println("mqtt消息:" + mqttMessage.toString());

                switch (mqttMessage.fixedHeader().messageType()) {
                    case PUBLISH:
                        MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
                        ByteBuf payload = mqttPublishMessage.payload();
                        byte[] bytes = ByteBufUtil.getBytes(payload);
                        System.out.println("payload:" + new String(bytes));
                        break;
                    default:
                        break;
                }

            }
        });

    }
 
源代码21 项目: jmqtt   文件: NettyRemotingServer.java

private void startWebsocketServer(boolean useSsl, Integer port){
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(selectorGroup,ioGroup)
            .channel(clazz)
            .option(ChannelOption.SO_BACKLOG, nettyConfig.getTcpBackLog())
            .childOption(ChannelOption.TCP_NODELAY, nettyConfig.isTcpNoDelay())
            .childOption(ChannelOption.SO_SNDBUF, nettyConfig.getTcpSndBuf())
            .option(ChannelOption.SO_RCVBUF, nettyConfig.getTcpRcvBuf())
            .option(ChannelOption.SO_REUSEADDR, nettyConfig.isTcpReuseAddr())
            .childOption(ChannelOption.SO_KEEPALIVE, nettyConfig.isTcpKeepAlive())
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    if (useSsl) {
                        pipeline.addLast("ssl", NettySslHandler.getSslHandler(
                                socketChannel,
                                nettyConfig.isUseClientCA(),
                                nettyConfig.getSslKeyStoreType(),
                                brokerConfig.getJmqttHome() + nettyConfig.getSslKeyFilePath(),
                                nettyConfig.getSslManagerPwd(),
                                nettyConfig.getSslStorePwd()
                        ));
                    }
                    pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 60))
                            .addLast("httpCodec",new HttpServerCodec())
                            .addLast("aggregator",new HttpObjectAggregator(65535))
                            .addLast("compressor ", new HttpContentCompressor())
                            .addLast("webSocketHandler",new WebSocketServerProtocolHandler("/mqtt", MixAll.MQTT_VERSION_SUPPORT,true))
                            .addLast("byteBuf2WebSocketEncoder",new ByteBuf2WebSocketEncoder())
                            .addLast("webSocket2ByteBufDecoder",new WebSocket2ByteBufDecoder())
                            .addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize()))
                            .addLast("mqttEncoder", MqttEncoder.INSTANCE)
                            .addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExcutor))
                            .addLast("nettyMqttHandler", new NettyMqttHandler());
                }
            });
    if(nettyConfig.isPooledByteBufAllocatorEnable()){
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }
    try {
        ChannelFuture future = bootstrap.bind(port).sync();
        log.info("Start webSocket server {}  success,port = {}", useSsl?"with ssl":"", port);
    }catch (InterruptedException ex){
        log.error("Start webSocket server {} failure.cause={}", useSsl?"with ssl":"", ex);
    }
}
 
源代码22 项目: jmqtt   文件: NettyRemotingServer.java

private void startTcpServer(boolean useSsl, Integer port){
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(selectorGroup,ioGroup)
            .channel(clazz)
            .option(ChannelOption.SO_BACKLOG, nettyConfig.getTcpBackLog())
            .childOption(ChannelOption.TCP_NODELAY, nettyConfig.isTcpNoDelay())
            .childOption(ChannelOption.SO_SNDBUF, nettyConfig.getTcpSndBuf())
            .option(ChannelOption.SO_RCVBUF, nettyConfig.getTcpRcvBuf())
            .option(ChannelOption.SO_REUSEADDR, nettyConfig.isTcpReuseAddr())
            .childOption(ChannelOption.SO_KEEPALIVE, nettyConfig.isTcpKeepAlive())
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    if (useSsl) {
                        pipeline.addLast("ssl", NettySslHandler.getSslHandler(
                                socketChannel,
                                nettyConfig.isUseClientCA(),
                                nettyConfig.getSslKeyStoreType(),
                                brokerConfig.getJmqttHome() + nettyConfig.getSslKeyFilePath(),
                                nettyConfig.getSslManagerPwd(),
                                nettyConfig.getSslStorePwd()
                        ));
                    }
                    pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0))
                            .addLast("mqttEncoder", MqttEncoder.INSTANCE)
                            .addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize()))
                            .addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExcutor))
                            .addLast("nettyMqttHandler", new NettyMqttHandler());
                }
            });
    if(nettyConfig.isPooledByteBufAllocatorEnable()){
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }
    try {
        ChannelFuture future = bootstrap.bind(port).sync();
        log.info("Start tcp server {} success,port = {}", useSsl?"with ssl":"", port);
    }catch (InterruptedException ex){
        log.error("Start tcp server {} failure.cause={}", useSsl?"with ssl":"", ex);
    }
}
 
源代码23 项目: lannister   文件: MqttChannelInitializer.java

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	logger.debug("Initializaing channels...");

	ch.pipeline().addLast(ByteCounterCodec.class.getName(), new ByteCounterCodec());

	if ("true".equalsIgnoreCase(Settings.INSTANCE.getProperty("netty.logger"))) {
		ch.pipeline().addLast(LoggingHandler.class.getName(), new LoggingHandler(LogLevel.DEBUG));
	}

	if (useSsl) {
		SslContext sslCtx = SslContextBuilder
				.forServer(Settings.INSTANCE.certChainFile(), Settings.INSTANCE.privateKeyFile()).build();

		logger.debug("SSL Provider : {}", SslContext.defaultServerProvider());

		ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()));
	}

	if (useWebSocket) {
		String websocketPath = Settings.INSTANCE.getProperty("mqttserver.websocket.path", "/");

		ch.pipeline().addLast(HttpServerCodec.class.getName(), new HttpServerCodec());
		ch.pipeline().addLast(HttpObjectAggregator.class.getName(), new HttpObjectAggregator(1048576));
		ch.pipeline().addLast(HttpContentCompressor.class.getName(), new HttpContentCompressor());
		ch.pipeline().addLast(WebSocketServerProtocolHandler.class.getName(),
				new WebSocketServerProtocolHandler(websocketPath, "mqtt,mqttv3.1,mqttv3.1.1", true, 65536)); // [MQTT-6.0.0-3]
		ch.pipeline().addLast(new MqttWebSocketCodec());
	}

	int maxBytesInMessage = Settings.INSTANCE.getInt("mqttserver.maxBytesInMessage", 8092);

	ch.pipeline().addLast(MqttDecoder.class.getName(), new MqttDecoder(maxBytesInMessage));
	ch.pipeline().addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE);

	ch.pipeline().addLast(ConnectReceiver.class.getName(), ConnectReceiver.INSTANCE);
	ch.pipeline().addLast(PubAckReceiver.class.getName(), PubAckReceiver.INSTANCE);
	ch.pipeline().addLast(PublishReceiver.class.getName(), PublishReceiver.INSTANCE);
	ch.pipeline().addLast(SubscribeReceiver.class.getName(), SubscribeReceiver.INSTANCE);
	ch.pipeline().addLast(UnsubscribeReceiver.class.getName(), UnsubscribeReceiver.INSTANCE);
	ch.pipeline().addLast(GenericReceiver.class.getName(), GenericReceiver.INSTANCE);
}
 
 类所在包
 类方法
 同包方法