类 io.netty.handler.codec.mqtt.MqttConnectMessage 源码实例Demo

下面列出了怎么用 io.netty.handler.codec.mqtt.MqttConnectMessage 的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: cassandana   文件: SessionRegistry.java

private Session createNewSession(MQTTConnection mqttConnection, MqttConnectMessage msg, String clientId) {
    final boolean clean = msg.variableHeader().isCleanSession();
    final Queue<SessionRegistry.EnqueuedMessage> sessionQueue =
                queues.computeIfAbsent(clientId, (String cli) -> queueRepository.createQueue(cli, clean));
    final Session newSession;
    if (msg.variableHeader().isWillFlag()) {
        final Session.Will will = createWill(msg);
        newSession = new Session(clientId, clean, will, sessionQueue);
    } else {
        newSession = new Session(clean, clientId, sessionQueue);
    }

    newSession.markConnected();
    newSession.bind(mqttConnection);

    return newSession;
}
 

private MqttSession createNewSession(MqttConnection mqttConnection, MqttConnectMessage msg, String clientId) {
    final boolean clean = msg.variableHeader().isCleanSession();
    final Queue<EnqueuedMessage> sessionQueue =
                queues.computeIfAbsent(clientId, (String cli) -> queueRepository.createQueue(cli, clean));
    final MqttSession newSession;
    if (msg.variableHeader().isWillFlag()) {
        final MqttSession.Will will = createWill(msg);
        newSession = new MqttSession(clientId, clean, will, sessionQueue);
    } else {
        newSession = new MqttSession(clean, clientId, sessionQueue);
    }

    newSession.markConnected();
    newSession.bind(mqttConnection);

    return newSession;
}
 

private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
  String userName = msg.payload().userName();
  String clientIdentifier = msg.payload().clientIdentifier();
  if (StringUtils.isEmpty(userName)) {
    // ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
    // ctx.close();
    ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
    connected = false;
  } else {
    boolean login = deviceSessionCtx.login(new DeviceTokenCredentials(userName));
    if (!login) {
      ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
      connected = false;
    } else {
      MemoryMetaPool.registerClienId(clientIdentifier, ctx.channel());

      ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
      connected = true;
      checkGatewaySession();
    }
    // }
  }

}
 
源代码4 项目: lannister   文件: MqttMessageFactory.java

public static MqttConnectMessage connect(ConnectOptions options) {
	MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false,
			10);
	MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(options.version().protocolName(),
			options.version().protocolLevel(), options.userName() != null, options.password() != null,
			options.will() == null ? false : options.will().isRetain(),
			options.will() == null ? 0 : options.will().qos().value(), options.will() != null,
			options.cleanSession(), options.keepAliveTimeSeconds());

	MqttConnectPayload payload = new MqttConnectPayload(Strings.nullToEmpty(options.clientId()),
			options.will() == null ? "" : options.will().topicName(),
			options.will() == null ? "" : new String(options.will().message(), CharsetUtil.UTF_8),
			Strings.nullToEmpty(options.userName()), Strings.nullToEmpty(options.password()));

	return new MqttConnectMessage(fixedHeader, variableHeader, payload);
}
 
源代码5 项目: lannister   文件: ConnectReceiverTest.java

private MqttConnAckMessage executeNormalChannelRead0(String clientId, boolean cleanSession, ChannelId channelId)
		throws Exception {
	MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false,
			10);
	MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader("MQTT", 4, true, true, true, 0, true,
			cleanSession, 60);
	MqttConnectPayload payload = new MqttConnectPayload(clientId, "willtopic", "willmessage", "username",
			"password");

	MqttConnectMessage msg = new MqttConnectMessage(fixedHeader, variableHeader, payload);

	ChannelId cid = channelId == null ? TestUtil.newChannelId(clientId, false) : channelId;

	EmbeddedChannel channel = new EmbeddedChannel(cid, new ConnectReceiver());

	channel.writeInbound(msg);

	return channel.readOutbound();
}
 
源代码6 项目: WeEvent   文件: ProtocolProcess.java

public MqttPublishMessage genWillMessage(MqttConnectMessage connectMessage) {
    if (connectMessage.variableHeader().isWillFlag()) {
        log.info("get will message from client");

        MqttMessage msg = MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(connectMessage.variableHeader().willQos()), connectMessage.variableHeader().isWillRetain(), 0),
                new MqttPublishVariableHeader(connectMessage.payload().willTopic(), 0), Unpooled.buffer().writeBytes(connectMessage.payload().willMessageInBytes()));

        return (MqttPublishMessage) msg;
    }

    return null;
}
 
源代码7 项目: WeEvent   文件: Connect.java

public MqttMessage processConnect(MqttConnectMessage msg, SessionContext sessionData) {
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);

    String clientId = sessionData.getClientId();
    if (StringUtils.isBlank(clientId)) {
        log.error("clientId is empty, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
    }

    // verify userName and password
    String username = msg.payload().userName();
    String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), StandardCharsets.UTF_8);
    if (!this.authService.verifyUserName(username, password)) {
        log.error("verify account failed, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
    }

    if (this.sessionStore.existSession(clientId)) {
        log.info("exist client id, force to delete the older");
        this.sessionStore.removeSession(clientId);
    }

    // store new session
    this.sessionStore.addSession(clientId, sessionData);

    log.info("MQTT connected, clientId: {}", clientId);
    return MqttMessageFactory.newMessage(fixedHeader,
            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false), null);
}
 
源代码8 项目: cassandana   文件: BrokerInterceptor.java

@Override
public void notifyClientConnected(final MqttConnectMessage msg) {
    for (final InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) {
        LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}",
                msg.payload().clientIdentifier(), handler.getID());
        executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg)));
    }
}
 
源代码9 项目: cassandana   文件: SessionRegistry.java

private void copySessionConfig(MqttConnectMessage msg, Session session) {
    final boolean clean = msg.variableHeader().isCleanSession();
    final Session.Will will;
    if (msg.variableHeader().isWillFlag()) {
        will = createWill(msg);
    } else {
        will = null;
    }
    session.update(clean, will);
}
 
源代码10 项目: cassandana   文件: SessionRegistry.java

private Session.Will createWill(MqttConnectMessage msg) {
    final ByteBuf willPayload = Unpooled.copiedBuffer(msg.payload().willMessageInBytes());
    final String willTopic = msg.payload().willTopic();
    final boolean retained = msg.variableHeader().isWillRetain();
    final MqttQoS qos = MqttQoS.valueOf(msg.variableHeader().willQos());
    return new Session.Will(willTopic, willPayload, qos, retained);
}
 

public static MqttConnectMessage connectMessage(MqttConnectOptions info) {
	MqttVersion verinfo = info.getMqttVersion();
	MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE,
			false, 10);
	MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(verinfo.protocolName(),
			verinfo.protocolLevel(), info.isHasUserName(), info.isHasPassword(), info.isHasWillRetain(),
			info.getWillQos(), info.isHasWillFlag(), info.isHasCleanSession(), info.getKeepAliveTime());
	MqttConnectPayload mqttConnectPayload = new MqttConnectPayload(info.getClientIdentifier(), info.getWillTopic(),
			info.getWillMessage(), info.getUserName(), info.getPassword());
	MqttConnectMessage mqttSubscribeMessage = new MqttConnectMessage(mqttFixedHeader, mqttConnectVariableHeader,
			mqttConnectPayload);
	return mqttSubscribeMessage;
}
 
源代码12 项目: joyqueue   文件: MqttProtocolHandler.java

private MqttConnAckMessage sendAckToClient(Channel client, MqttConnectMessage connectMessage, MqttConnectReturnCode ackCode, boolean sessionPresent) {

        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, connectMessage.fixedHeader().qosLevel(), false, 0);
        MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                mqttFixedHeader,
                new MqttConnAckVariableHeader(ackCode, sessionPresent),
                null);
        client.writeAndFlush(connAckMessage);
        return connAckMessage;
    }
 
源代码13 项目: joyqueue   文件: MqttProtocolHandler.java

private MqttConnectReturnCode checkAuth(MqttConnectMessage message) {

        boolean cleanSession = message.variableHeader().isCleanSession();
        String clientID = message.payload().clientIdentifier();
        boolean noId = Strings.isNullOrEmpty(clientID);

        if (noId) {
            LOG.debug("NULL clientID, cleanSession: {}", cleanSession);
            return MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
        }

        if (LOG.isDebugEnabled()){
            LOG.debug("hasUserName: {}", message.variableHeader().hasUserName());
            LOG.debug("hasPassword: {}", message.variableHeader().hasPassword());
        }
        if (message.variableHeader().hasUserName() && message.variableHeader().hasPassword()) {
            String userName = message.payload().userName();
            String passWord = message.payload().password();
            if (LOG.isDebugEnabled()){
                LOG.debug("CONN username: {}, password: {}", userName, passWord);
            }
            if (auth(userName, passWord)) {
                return MqttConnectReturnCode.CONNECTION_ACCEPTED;
            } else {
                return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
            }
        }
        return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
    }
 
源代码14 项目: joyqueue   文件: MqttProtocolHandler.java

private void initializeKeepAliveTimeout(Channel client, MqttConnectMessage msg, String clientId) {
    int keepAlive = msg.variableHeader().keepAliveTimeSeconds();
    NettyAttrManager.setAttrKeepAlive(client, keepAlive);
    NettyAttrManager.setAttrClientId(client, clientId);
    NettyAttrManager.setAttrCleanSession(client, msg.variableHeader().isCleanSession());
    int idleTime = Math.round(keepAlive * 1.5f);
    if (client.pipeline().names().contains("idleStateHandler")) {
        client.pipeline().remove("idleStateHandler");
    }
    client.pipeline().addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0));
}
 
源代码15 项目: joyqueue   文件: MqttProtocolHandler.java

private void storeWillMessage(String clientID, MqttConnectMessage msg) {
    if (msg.variableHeader().isWillFlag()) {
        MqttQoS willQos = MqttQoS.valueOf(msg.variableHeader().willQos());
        byte[] willPayload = msg.payload().willMessageInBytes();
        ByteBuffer bb = (ByteBuffer) ByteBuffer.allocate(willPayload.length).put(willPayload).flip();
        WillMessage will = new WillMessage(msg.payload().willTopic(), bb, msg.variableHeader().isWillRetain(), willQos);
        willStore.put(clientID, will);
        LOG.info("Latest will message stored for client: <{}>", clientID);
    }
}
 
源代码16 项目: joyqueue   文件: MqttProtocolHandler.java

private void addConnection(final Channel client, MqttConnectMessage connectMessage, String clientID) {
    String userName = "";
    String passWord = "";
    if (connectMessage.variableHeader().hasUserName() && connectMessage.variableHeader().hasPassword()) {
        userName = connectMessage.payload().userName();
        passWord = connectMessage.payload().password();
    }
    MqttConnection connection = new MqttConnection(
            clientID,
            userName,
            passWord,
            connectMessage.variableHeader().isCleanSession(),
            connectMessage.variableHeader().version(),
            connectMessage.variableHeader().isWillRetain(),
            connectMessage.variableHeader().willQos(),
            connectMessage.variableHeader().isWillFlag(),
            connectMessage.variableHeader().keepAliveTimeSeconds(),
            client);
    connection.setAddress(IpUtil.toByte((InetSocketAddress) client.remoteAddress()));
    connection.setServerAddress(IpUtil.toByte((InetSocketAddress) client.localAddress()));
    final MqttConnection existing = connectionManager.addConnection(connection);
    if (existing != null) {
        //ClientId重复
        LOG.warn("重复clientID的connection连接: <{}>, 需要断开或者重置. 新建的client连接: <{}>", existing, connection);
        existing.getChannel().close().addListener(CLOSE_ON_FAILURE);
        connectionManager.removeConnection(existing);
        connectionManager.addConnection(connection);
    }
}
 

@Override
public void notifyClientConnected(final MqttConnectMessage msg) {
    for (final InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) {
        LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}",
                msg.payload().clientIdentifier(), handler.getID());
        executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg)));
    }
}
 

private void copySessionConfig(MqttConnectMessage msg, MqttSession session) {
    final boolean clean = msg.variableHeader().isCleanSession();
    final MqttSession.Will will;
    if (msg.variableHeader().isWillFlag()) {
        will = createWill(msg);
    } else {
        will = null;
    }
    session.update(clean, will);
}
 

private MqttSession.Will createWill(MqttConnectMessage msg) {
    final ByteBuf willPayload = Unpooled.copiedBuffer(msg.payload().willMessageInBytes());
    final String willTopic = msg.payload().willTopic();
    final boolean retained = msg.variableHeader().isWillRetain();
    final MqttQoS qos = MqttQoS.valueOf(msg.variableHeader().willQos());
    return new MqttSession.Will(willTopic, willPayload, qos, retained);
}
 
源代码20 项目: iotplatform   文件: MqttTransportHandler.java

private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
  // deviceSessionCtx.setChannel(ctx);
  // assetSessionCtx.setChannel(ctx);

  switch (msg.fixedHeader().messageType()) {
  case CONNECT:
    processConnect(ctx, (MqttConnectMessage) msg);
    break;
  case PUBLISH:
    processPublish(ctx, (MqttPublishMessage) msg);
    // System.out.println("write...");
    // ctx.write("just for test");
    break;
  case SUBSCRIBE:
    processSubscribe(ctx, (MqttSubscribeMessage) msg);
    break;
  case UNSUBSCRIBE:
    processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
    break;
  case PINGREQ:
    if (checkConnected(ctx)) {
      ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
    }
    break;
  case DISCONNECT:
    if (checkConnected(ctx)) {
      processDisconnect(ctx);
    }
    break;
  }
}
 
源代码21 项目: lannister   文件: ConnectReceiver.java

private Message newWill(String clientId, MqttConnectMessage conn) {
	if (!conn.variableHeader().isWillFlag()) { return null; } // [MQTT-3.1.2-12]

	return new Message(-1, conn.payload().willTopic(), clientId,
			conn.payload().willMessage().getBytes(CharsetUtil.UTF_8),
			MqttQoS.valueOf(conn.variableHeader().willQos()), conn.variableHeader().isWillRetain());
}
 
源代码22 项目: lannister   文件: ConnectReceiver.java

private boolean filterPlugins(ChannelHandlerContext ctx, MqttConnectMessage msg) {
	String clientId = msg.payload().clientIdentifier();
	String userName = msg.variableHeader().hasUserName() ? msg.payload().userName() : null;
	String password = msg.variableHeader().hasPassword() ? msg.payload().password() : null;

	if (!Plugins.INSTANCE.get(ServiceChecker.class).isServiceAvailable()) {
		sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
		return false;
	}

	if (!Plugins.INSTANCE.get(Authenticator.class).isValid(clientId)) {
		sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED); // [MQTT-3.1.3-9]
		return false;
	}

	if (!Plugins.INSTANCE.get(Authenticator.class).isValid(clientId, userName, password)) {
		sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
		return false;
	}

	if (!Plugins.INSTANCE.get(Authorizer.class).isAuthorized(clientId, userName)) {
		sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
		return false;
	}

	return true;
}
 

@Override
public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) {
   System.out.println("MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType());

   // If you need to handle an specific packet type:
   if (mqttMessage instanceof MqttPublishMessage) {
      MqttPublishMessage message = (MqttPublishMessage) mqttMessage;


      String originalMessage = message.payload().toString(Charset.forName("UTF-8"));
      System.out.println("Original message: " + originalMessage);

      // The new message content must not be bigger that the original content.
      String modifiedMessage = "Modified message ";

      message.payload().setBytes(0, modifiedMessage.getBytes());
   } else {
      if (mqttMessage instanceof MqttConnectMessage) {
         MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage;
         System.out.println("MQTT CONNECT control packet was intercepted " + connectMessage);
      }
   }


   // We return true which means "call next interceptor" (if there is one) or target.
   // If we returned false, it means "abort call" - no more interceptors would be called and neither would
   // the target
   return true;
}
 

@Test(timeout = 60000)
public void testRejectedMqttConnectMessage() throws Exception {
   CountDownLatch publishThreadReady = new CountDownLatch(1);

   server.getRemotingService().addIncomingInterceptor((MQTTInterceptor) (packet, connection) -> {
      if (packet.getClass() == MqttConnectMessage.class) {
         return false;
      } else {
         return true;
      }
   });

   Thread publishThread = new Thread(() -> {
      MQTTClientProvider publishProvider = getMQTTClientProvider();

      publishThreadReady.countDown();

      try {
         initializeConnection(publishProvider);
         publishProvider.disconnect();
         fail("The connection should be rejected!");
      } catch (Exception ignore) {
      }
   });

   publishThread.start();

   publishThreadReady.await();

   publishThread.join(3000);

   if (publishThread.isAlive()) {
      fail("The connection is stuck!");
   }
}
 

/**
 * Called during connection.
 *
 * @param connect
 */
void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws Exception {
   this.ctx = ctx;
   connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L;

   String clientId = connect.payload().clientIdentifier();
   session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
}
 

public InterceptConnectMessage(MqttConnectMessage msg) {
    super(msg);
    this.msg = msg;
}
 

/**
 * processConnect
 * @param channel
 * @param msg
 */
public void processConnect(Channel channel, MqttConnectMessage msg) {
	// 消息解码器出现异常
	if (msg.decoderResult().isFailure()) {
		Throwable cause = msg.decoderResult().cause();
		writeBackConnect(channel, ProtocolUtil.connectReturnCodeForException(cause), false, true);
		return;
	}

	String deviceid = msg.payload().clientIdentifier();
	// clientId为空或null的情况, 这里要求客户端必须提供clientId, 不管cleanSession是否为1, 此处没有参考标准协议实现
	if (deviceid == null || deviceid.trim().length() == 0) {
		writeBackConnect(channel, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false, true);
		return;
	}

	// 用户名和密码验证, 这里要求客户端连接时必须提供用户名和密码, 不管是否设置用户名标志和密码标志为1, 此处没有参考标准协议实现
	String username = msg.payload().userName();
	String password = msg.payload().passwordInBytes() == null ? null
			: new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
	if (!authService.checkValid(deviceid, username, password)) {
		writeBackConnect(channel, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false, true);
		return;
	}

	boolean isCleanSession = msg.variableHeader().isCleanSession();

	// 如果会话中已存储这个新连接的clientId, 就关闭之前该clientId的连接
	if (sessionService.containsKey(deviceid)) {
		if (isCleanSession) {
			sessionService.remove(deviceid);
			topicProcess.removeByCleanSession(deviceid);
			procedureProcess.removeByCleanSession(deviceid);
			consumerProcess.removeByCleanSession(deviceid);
		}
		sessionService.closeSession(deviceid);
	}

	// 处理遗嘱信息
	MqttSession sessionStore = new MqttSession(deviceid, channel, isCleanSession, null);
	if (msg.variableHeader().isWillFlag()) {
		MqttPublishMessage willMessage = ProtocolUtil.publishMessage(msg.payload().willTopic(), false,
				msg.variableHeader().willQos(), msg.variableHeader().isWillRetain(), 0,
				msg.payload().willMessageInBytes());
		sessionStore.setWillMessage(willMessage);
	}
	// 处理连接心跳包
	int idelTimes = msg.variableHeader().keepAliveTimeSeconds();
	if (idelTimes <= 0) {
		idelTimes = 60;
	}
	
	if (idelTimes> 0) {
		String idelStr = NettyConstant.HANDLER_NAME_HEARTCHECK;
		if (channel.pipeline().names().contains(idelStr)) {
			channel.pipeline().remove(idelStr);
		}
		channel.pipeline().addFirst(idelStr,
				new IdleStateHandler(0, 0, Math.round(idelTimes * 1.5f)));
	}

	// 至此存储会话信息及返回接受客户端连接
	sessionService.put(deviceid, sessionStore);
	channel.attr(NettyConstant.CLIENTID_KEY).set(deviceid);

	Boolean sessionPresent = sessionService.containsKey(deviceid) && !isCleanSession;
	writeBackConnect(channel, MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent, false);

	NettyLog.debug("CONNECT - clientId: {}, cleanSession: {}", deviceid, isCleanSession);

	// 如果cleanSession为0, 需要重发同一clientId存储的未完成的QoS1和QoS2的DUP消息
	if (!isCleanSession) {
		this.consumerProcess.processHistoryPub(channel);
		this.procedureProcess.processHistoryPubRel(channel);
	}
}
 

@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
	
	if (!msg.decoderResult().isSuccess()) {
		NettyLog.error("error decoder");
		ctx.close(); 
		return;
	}
	
	NettyLog.debug("read: " + msg.fixedHeader().messageType());
	
	if (msg.fixedHeader().messageType() == MqttMessageType.CONNECT) {
		protocolProcess.processConnect(ctx.channel(), (MqttConnectMessage) msg);
	} else {
		if (!NettyUtil.isLogin(ctx.channel())) {
			NettyLog.info("not login");
			return ;
		}
	}

	switch (msg.fixedHeader().messageType()) {
	case CONNECT:
		break;
	case CONNACK:
		break;
	case PUBLISH:
		protocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
		break;
	case PUBACK:
		protocolProcess.processPubAck(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
		break;
	case PUBREC:
		protocolProcess.processPubRec(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
		break;
	case PUBREL:
		protocolProcess.processPubRel(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
		break;
	case PUBCOMP:
		protocolProcess.processPubComp(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
		break;
	case SUBSCRIBE:
		protocolProcess.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg); 
		break;
	case SUBACK:
		break;
	case UNSUBSCRIBE:
		protocolProcess.processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
		break;
	case UNSUBACK:
		break;
	case PINGREQ:
		protocolProcess.processPingReq(ctx.channel(), msg);
		break;
	case PINGRESP:
		break;
	case DISCONNECT:
		protocolProcess.processDisConnect(ctx.channel(), msg);
		break;
	default:
		break;
	}
}
 
源代码29 项目: joyqueue   文件: MqttProtocolHandler.java

public void processConnect(Channel client, MqttConnectMessage connectMessage) {
    String clientId = connectMessage.payload().clientIdentifier();
    boolean isCleanSession = connectMessage.variableHeader().isCleanSession();

    //验证版本
    if (!connectMessage.variableHeader().name().equals("MQTT") ||
            connectMessage.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("CONN clientID: <{}>, 版本不对断开连接: <{}>", clientId, connectMessage.toString());
        }
        sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false);
        return;
    }

    MqttConnectReturnCode resultCode = checkAuth(connectMessage);
    if (!(resultCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) ||
            Strings.isNullOrEmpty(clientId)) {
        sendAckToClient(client, connectMessage, resultCode, false);
        return;
    }
    addConnection(client, connectMessage, clientId);

    //处理心跳包时间,把心跳包时长和一些其他属性都添加到会话中,方便以后使用
    initializeKeepAliveTimeout(client, connectMessage, clientId);
    storeWillMessage(clientId, connectMessage);

    sessionManager.addSession(clientId, isCleanSession);

    MqttConnAckMessage okResp = sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_ACCEPTED, !isCleanSession);

    if (okResp.variableHeader().connectReturnCode().byteValue() != MqttConnectReturnCode.CONNECTION_ACCEPTED.byteValue()) {
        LOG.info("CONNECT-none-accepted clientID: <{}>, ConnectionStatus: <{}>, client-address: <{}>, server-address: <{}>",
                clientId,
                okResp.variableHeader().connectReturnCode().byteValue(),
                client.remoteAddress(),
                client.localAddress()
        );
    }

    consumerManager.fireConsume(clientId);

    LOG.info("CONNECT successful, clientID: {}, client-address: <{}>, server-address: <{}>", clientId, client.remoteAddress(), client.localAddress());
}
 
源代码30 项目: joyqueue   文件: ConnectHandler.java

@Override
public void handleRequest(Channel client, MqttMessage message) throws Exception {
    MqttConnectMessage connectMessage = (MqttConnectMessage) message;

    mqttProtocolHandler.processConnect(client, connectMessage);
}
 
 类所在包
 同包方法