下面列出了io.netty.channel.ChannelId#io.netty.handler.codec.mqtt.MqttConnectMessage 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Creates a new session for the given client, marks it as connected and binds it to the connection.
 *
 * <p>The session queue is taken from {@code queues}; when none is registered for the client id yet,
 * one is created through {@code queueRepository}.
 *
 * @param mqttConnection connection the new session is bound to
 * @param msg            CONNECT message providing the clean-session and will flags
 * @param clientId       identifier of the connecting client
 * @return the newly created session
 */
private Session createNewSession(MQTTConnection mqttConnection, MqttConnectMessage msg, String clientId) {
    final boolean clean = msg.variableHeader().isCleanSession();
    final Queue<SessionRegistry.EnqueuedMessage> sessionQueue =
        queues.computeIfAbsent(clientId, (String id) -> queueRepository.createQueue(id, clean));

    // A will is only built when the CONNECT message announces one.
    final Session created = msg.variableHeader().isWillFlag()
        ? new Session(clientId, clean, createWill(msg), sessionQueue)
        : new Session(clean, clientId, sessionQueue);

    created.markConnected();
    created.bind(mqttConnection);
    return created;
}
/**
 * Creates a new MQTT session for the given client, marks it as connected and binds it to the connection.
 *
 * <p>The session queue is taken from {@code queues}; when none is registered for the client id yet,
 * one is created through {@code queueRepository}.
 *
 * @param mqttConnection connection the new session is bound to
 * @param msg            CONNECT message providing the clean-session and will flags
 * @param clientId       identifier of the connecting client
 * @return the newly created session
 */
private MqttSession createNewSession(MqttConnection mqttConnection, MqttConnectMessage msg, String clientId) {
    final boolean clean = msg.variableHeader().isCleanSession();
    final Queue<EnqueuedMessage> sessionQueue =
        queues.computeIfAbsent(clientId, (String id) -> queueRepository.createQueue(id, clean));

    // A will is only built when the CONNECT message announces one.
    final MqttSession created = msg.variableHeader().isWillFlag()
        ? new MqttSession(clientId, clean, createWill(msg), sessionQueue)
        : new MqttSession(clean, clientId, sessionQueue);

    created.markConnected();
    created.bind(mqttConnection);
    return created;
}
/**
 * Handles a CONNECT whose user name field carries the device token.
 *
 * <p>An empty user name is answered with a "bad user name or password" CONNACK, a failed login with
 * a "not authorized" CONNACK. On success the client id is registered for the channel, an "accepted"
 * CONNACK is written and the gateway session check runs. The {@code connected} flag is updated in
 * every case.
 *
 * @param ctx channel context used to write the CONNACK
 * @param msg the received CONNECT message
 */
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    final String token = msg.payload().userName();
    final String clientId = msg.payload().clientIdentifier();

    if (StringUtils.isEmpty(token)) {
        ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
        connected = false;
        return;
    }

    final boolean loggedIn = deviceSessionCtx.login(new DeviceTokenCredentials(token));
    if (!loggedIn) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
        connected = false;
        return;
    }

    MemoryMetaPool.registerClienId(clientId, ctx.channel());
    ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
    connected = true;
    checkGatewaySession();
}
/**
 * Builds an MQTT CONNECT message from the given options.
 *
 * <p>Will-related header flags and payload fields fall back to {@code false}, {@code 0} or the
 * empty string when no will is configured; a {@code null} client id, user name or password is
 * sent as an empty string.
 *
 * @param options connection options to encode
 * @return the CONNECT message
 */
public static MqttConnectMessage connect(ConnectOptions options) {
    MqttFixedHeader fixedHeader =
        new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10);

    boolean hasUserName = options.userName() != null;
    boolean hasPassword = options.password() != null;
    boolean willRetain = options.will() != null && options.will().isRetain();
    int willQos = options.will() != null ? options.will().qos().value() : 0;
    boolean hasWill = options.will() != null;
    MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
        options.version().protocolName(), options.version().protocolLevel(),
        hasUserName, hasPassword, willRetain, willQos, hasWill,
        options.cleanSession(), options.keepAliveTimeSeconds());

    String willTopic = "";
    String willMessage = "";
    if (options.will() != null) {
        willTopic = options.will().topicName();
        willMessage = new String(options.will().message(), CharsetUtil.UTF_8);
    }
    MqttConnectPayload payload = new MqttConnectPayload(
        Strings.nullToEmpty(options.clientId()), willTopic, willMessage,
        Strings.nullToEmpty(options.userName()), Strings.nullToEmpty(options.password()));

    return new MqttConnectMessage(fixedHeader, variableHeader, payload);
}
/**
 * Sends a fixed CONNECT message (MQTT level 4, will, user name and password all set) through an
 * {@link EmbeddedChannel} containing a {@code ConnectReceiver} and returns the first outbound message.
 *
 * @param clientId     client identifier placed in the CONNECT payload
 * @param cleanSession clean-session flag of the CONNECT message
 * @param channelId    channel id to use, or {@code null} to obtain one from {@code TestUtil}
 * @return the outbound message read from the channel
 * @throws Exception declared for the benefit of callers; propagated from the invoked helpers
 */
private MqttConnAckMessage executeNormalChannelRead0(String clientId, boolean cleanSession, ChannelId channelId)
        throws Exception {
    MqttConnectMessage connect = new MqttConnectMessage(
        new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10),
        new MqttConnectVariableHeader("MQTT", 4, true, true, true, 0, true, cleanSession, 60),
        new MqttConnectPayload(clientId, "willtopic", "willmessage", "username", "password"));

    ChannelId effectiveId = channelId;
    if (effectiveId == null) {
        effectiveId = TestUtil.newChannelId(clientId, false);
    }

    EmbeddedChannel embedded = new EmbeddedChannel(effectiveId, new ConnectReceiver());
    embedded.writeInbound(connect);
    return embedded.readOutbound();
}
/**
 * Converts the will carried by a CONNECT message into a PUBLISH message.
 *
 * @param connectMessage the received CONNECT message
 * @return a PUBLISH message with the will topic, QoS, retain flag and payload,
 *         or {@code null} when the will flag is not set
 */
public MqttPublishMessage genWillMessage(MqttConnectMessage connectMessage) {
    if (!connectMessage.variableHeader().isWillFlag()) {
        return null;
    }

    log.info("get will message from client");
    MqttFixedHeader publishHeader = new MqttFixedHeader(
        MqttMessageType.PUBLISH,
        false,
        MqttQoS.valueOf(connectMessage.variableHeader().willQos()),
        connectMessage.variableHeader().isWillRetain(),
        0);
    MqttPublishVariableHeader topicHeader =
        new MqttPublishVariableHeader(connectMessage.payload().willTopic(), 0);
    return (MqttPublishMessage) MqttMessageFactory.newMessage(
        publishHeader,
        topicHeader,
        Unpooled.buffer().writeBytes(connectMessage.payload().willMessageInBytes()));
}
/**
 * Validates a CONNECT request and registers the session, returning the CONNACK to send back.
 *
 * <p>The request is refused with "identifier rejected" when the session's client id is blank and
 * with "bad user name or password" when {@code authService} rejects the credentials. An already
 * stored session for the same client id is removed before the new one is added.
 *
 * @param msg         the received CONNECT message (source of user name and password)
 * @param sessionData session context of the connecting client (source of the client id)
 * @return the CONNACK message describing the outcome
 */
public MqttMessage processConnect(MqttConnectMessage msg, SessionContext sessionData) {
    // The flag bits of a CONNACK fixed header are reserved and must be 0, so the QoS has to be
    // AT_MOST_ONCE; any other QoS would set reserved bits in the first header byte.
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    String clientId = sessionData.getClientId();
    if (StringUtils.isBlank(clientId)) {
        log.error("clientId is empty, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
    }

    // verify userName and password
    String username = msg.payload().userName();
    byte[] passwordBytes = msg.payload().passwordInBytes();
    String password = passwordBytes == null ? null : new String(passwordBytes, StandardCharsets.UTF_8);
    if (!this.authService.verifyUserName(username, password)) {
        log.error("verify account failed, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
    }

    // A newer connection with the same client id replaces the stored one.
    if (this.sessionStore.existSession(clientId)) {
        log.info("exist client id, force to delete the older");
        this.sessionStore.removeSession(clientId);
    }

    // store new session
    this.sessionStore.addSession(clientId, sessionData);
    log.info("MQTT connected, clientId: {}", clientId);
    return MqttMessageFactory.newMessage(fixedHeader,
        new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false), null);
}
/**
 * Hands the CONNECT message to every handler registered for {@code InterceptConnectMessage},
 * submitting each {@code onConnect} call to {@code executor}.
 *
 * @param msg the received CONNECT message
 */
@Override
public void notifyClientConnected(final MqttConnectMessage msg) {
    for (final InterceptHandler interceptor : this.handlers.get(InterceptConnectMessage.class)) {
        LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}",
            msg.payload().clientIdentifier(), interceptor.getID());
        final Runnable notification = () -> interceptor.onConnect(new InterceptConnectMessage(msg));
        executor.execute(notification);
    }
}
/**
 * Applies the clean-session flag and the will (if any) of a CONNECT message to an existing session.
 *
 * @param msg     CONNECT message to read the configuration from
 * @param session session to update; receives a {@code null} will when the will flag is not set
 */
private void copySessionConfig(MqttConnectMessage msg, Session session) {
    final boolean cleanSession = msg.variableHeader().isCleanSession();
    final Session.Will newWill = msg.variableHeader().isWillFlag() ? createWill(msg) : null;
    session.update(cleanSession, newWill);
}
/**
 * Builds the will described by a CONNECT message.
 *
 * @param msg CONNECT message carrying the will topic, payload, QoS and retain flag
 * @return the will, holding a copy of the will payload bytes
 */
private Session.Will createWill(MqttConnectMessage msg) {
    // Copy the payload so the will owns its own buffer.
    final ByteBuf payloadCopy = Unpooled.copiedBuffer(msg.payload().willMessageInBytes());
    final String topic = msg.payload().willTopic();
    return new Session.Will(
        topic,
        payloadCopy,
        MqttQoS.valueOf(msg.variableHeader().willQos()),
        msg.variableHeader().isWillRetain());
}
/**
 * Builds an MQTT CONNECT message from the supplied connect options.
 *
 * @param info options providing protocol version, flags, keep-alive and payload fields
 * @return the CONNECT message
 */
public static MqttConnectMessage connectMessage(MqttConnectOptions info) {
    MqttVersion version = info.getMqttVersion();

    MqttFixedHeader fixedHeader =
        new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10);
    MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
        version.protocolName(),
        version.protocolLevel(),
        info.isHasUserName(),
        info.isHasPassword(),
        info.isHasWillRetain(),
        info.getWillQos(),
        info.isHasWillFlag(),
        info.isHasCleanSession(),
        info.getKeepAliveTime());
    MqttConnectPayload payload = new MqttConnectPayload(
        info.getClientIdentifier(),
        info.getWillTopic(),
        info.getWillMessage(),
        info.getUserName(),
        info.getPassword());

    return new MqttConnectMessage(fixedHeader, variableHeader, payload);
}
/**
 * Builds a CONNACK with the given return code and session-present flag, writes it to the client
 * channel and returns it.
 *
 * @param client         channel the CONNACK is written to
 * @param connectMessage the CONNECT being answered; its fixed-header QoS is reused for the CONNACK
 * @param ackCode        return code to report
 * @param sessionPresent value of the session-present flag
 * @return the CONNACK message that was written
 */
private MqttConnAckMessage sendAckToClient(Channel client, MqttConnectMessage connectMessage, MqttConnectReturnCode ackCode, boolean sessionPresent) {
    MqttFixedHeader ackFixedHeader = new MqttFixedHeader(
        MqttMessageType.CONNACK, false, connectMessage.fixedHeader().qosLevel(), false, 0);
    MqttConnAckVariableHeader ackVariableHeader = new MqttConnAckVariableHeader(ackCode, sessionPresent);

    MqttConnAckMessage ack =
        (MqttConnAckMessage) MqttMessageFactory.newMessage(ackFixedHeader, ackVariableHeader, null);
    client.writeAndFlush(ack);
    return ack;
}
/**
 * Decides which CONNACK return code a CONNECT request deserves.
 *
 * <ul>
 *   <li>missing or empty client id: identifier rejected;</li>
 *   <li>user name and password flags both set: accepted or "bad user name or password",
 *       depending on {@code auth};</li>
 *   <li>otherwise: not authorized.</li>
 * </ul>
 *
 * <p>The password is deliberately never written to the log.
 *
 * @param message the received CONNECT message
 * @return the return code to send back to the client
 */
private MqttConnectReturnCode checkAuth(MqttConnectMessage message) {
    boolean cleanSession = message.variableHeader().isCleanSession();
    String clientID = message.payload().clientIdentifier();
    if (Strings.isNullOrEmpty(clientID)) {
        LOG.debug("NULL clientID, cleanSession: {}", cleanSession);
        return MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
    }

    boolean hasUserName = message.variableHeader().hasUserName();
    boolean hasPassword = message.variableHeader().hasPassword();
    if (LOG.isDebugEnabled()) {
        LOG.debug("hasUserName: {}", hasUserName);
        LOG.debug("hasPassword: {}", hasPassword);
    }

    // Both credentials are mandatory; anything less is treated as unauthorized.
    if (!hasUserName || !hasPassword) {
        return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
    }

    String userName = message.payload().userName();
    String passWord = message.payload().password();
    // Only the user name is logged: credentials must not leak into log files.
    LOG.debug("CONN username: {}", userName);

    return auth(userName, passWord)
        ? MqttConnectReturnCode.CONNECTION_ACCEPTED
        : MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
}
/**
 * Records keep-alive, client id and clean-session as channel attributes and (re)installs the
 * {@code "idleStateHandler"} at the head of the pipeline with a reader-idle time of 1.5 times
 * the keep-alive interval.
 *
 * @param client   channel of the connecting client
 * @param msg      CONNECT message providing keep-alive and clean-session
 * @param clientId identifier of the connecting client
 */
private void initializeKeepAliveTimeout(Channel client, MqttConnectMessage msg, String clientId) {
    final int keepAliveSeconds = msg.variableHeader().keepAliveTimeSeconds();
    NettyAttrManager.setAttrKeepAlive(client, keepAliveSeconds);
    NettyAttrManager.setAttrClientId(client, clientId);
    NettyAttrManager.setAttrCleanSession(client, msg.variableHeader().isCleanSession());

    final int readerIdleSeconds = Math.round(keepAliveSeconds * 1.5f);
    final String handlerName = "idleStateHandler";
    // Replace a handler left over from an earlier CONNECT on the same channel.
    final boolean alreadyInstalled = client.pipeline().names().contains(handlerName);
    if (alreadyInstalled) {
        client.pipeline().remove(handlerName);
    }
    client.pipeline().addFirst(handlerName, new IdleStateHandler(readerIdleSeconds, 0, 0));
}
/**
 * Stores the will of a CONNECT message in {@code willStore} under the client id.
 * Does nothing when the will flag is not set.
 *
 * @param clientID identifier of the connecting client
 * @param msg      the received CONNECT message
 */
private void storeWillMessage(String clientID, MqttConnectMessage msg) {
    if (!msg.variableHeader().isWillFlag()) {
        return;
    }

    final MqttQoS qos = MqttQoS.valueOf(msg.variableHeader().willQos());
    final byte[] payloadBytes = msg.payload().willMessageInBytes();

    // Copy the payload into a buffer of its own and make it ready for reading.
    final ByteBuffer payloadBuffer = ByteBuffer.allocate(payloadBytes.length);
    payloadBuffer.put(payloadBytes);
    payloadBuffer.flip();

    final WillMessage willMessage = new WillMessage(
        msg.payload().willTopic(), payloadBuffer, msg.variableHeader().isWillRetain(), qos);
    willStore.put(clientID, willMessage);
    LOG.info("Latest will message stored for client: <{}>", clientID);
}
/**
 * Registers a connection for the client with {@code connectionManager}.
 *
 * <p>User name and password are taken from the CONNECT payload only when both flags are set;
 * otherwise empty strings are used. When the manager already holds a connection for the same
 * client, that connection's channel is closed, it is removed, and the new one is added again.
 *
 * @param client         channel of the connecting client
 * @param connectMessage the received CONNECT message
 * @param clientID       identifier of the connecting client
 */
private void addConnection(final Channel client, MqttConnectMessage connectMessage, String clientID) {
    final boolean hasCredentials = connectMessage.variableHeader().hasUserName()
        && connectMessage.variableHeader().hasPassword();
    final String userName = hasCredentials ? connectMessage.payload().userName() : "";
    final String passWord = hasCredentials ? connectMessage.payload().password() : "";

    MqttConnection connection = new MqttConnection(
        clientID,
        userName,
        passWord,
        connectMessage.variableHeader().isCleanSession(),
        connectMessage.variableHeader().version(),
        connectMessage.variableHeader().isWillRetain(),
        connectMessage.variableHeader().willQos(),
        connectMessage.variableHeader().isWillFlag(),
        connectMessage.variableHeader().keepAliveTimeSeconds(),
        client);
    connection.setAddress(IpUtil.toByte((InetSocketAddress) client.remoteAddress()));
    connection.setServerAddress(IpUtil.toByte((InetSocketAddress) client.localAddress()));

    final MqttConnection previous = connectionManager.addConnection(connection);
    if (previous == null) {
        return;
    }

    // Duplicate client id: drop the previous connection and register the new one in its place.
    LOG.warn("重复clientID的connection连接: <{}>, 需要断开或者重置. 新建的client连接: <{}>", previous, connection);
    previous.getChannel().close().addListener(CLOSE_ON_FAILURE);
    connectionManager.removeConnection(previous);
    connectionManager.addConnection(connection);
}
/**
 * Hands the CONNECT message to every handler registered for {@code InterceptConnectMessage},
 * submitting each {@code onConnect} call to {@code executor}.
 *
 * @param msg the received CONNECT message
 */
@Override
public void notifyClientConnected(final MqttConnectMessage msg) {
    for (final InterceptHandler interceptor : this.handlers.get(InterceptConnectMessage.class)) {
        LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}",
            msg.payload().clientIdentifier(), interceptor.getID());
        final Runnable notification = () -> interceptor.onConnect(new InterceptConnectMessage(msg));
        executor.execute(notification);
    }
}
/**
 * Applies the clean-session flag and the will (if any) of a CONNECT message to an existing session.
 *
 * @param msg     CONNECT message to read the configuration from
 * @param session session to update; receives a {@code null} will when the will flag is not set
 */
private void copySessionConfig(MqttConnectMessage msg, MqttSession session) {
    final boolean cleanSession = msg.variableHeader().isCleanSession();
    final MqttSession.Will newWill = msg.variableHeader().isWillFlag() ? createWill(msg) : null;
    session.update(cleanSession, newWill);
}
/**
 * Builds the will described by a CONNECT message.
 *
 * @param msg CONNECT message carrying the will topic, payload, QoS and retain flag
 * @return the will, holding a copy of the will payload bytes
 */
private MqttSession.Will createWill(MqttConnectMessage msg) {
    // Copy the payload so the will owns its own buffer.
    final ByteBuf payloadCopy = Unpooled.copiedBuffer(msg.payload().willMessageInBytes());
    final String topic = msg.payload().willTopic();
    return new MqttSession.Will(
        topic,
        payloadCopy,
        MqttQoS.valueOf(msg.variableHeader().willQos()),
        msg.variableHeader().isWillRetain());
}
/**
 * Dispatches an inbound MQTT message to the handler for its message type.
 *
 * <p>PINGREQ and DISCONNECT are only acted upon when {@code checkConnected} succeeds; message
 * types without a case are ignored.
 *
 * @param ctx channel context of the client
 * @param msg the inbound MQTT message
 */
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
    switch (msg.fixedHeader().messageType()) {
        case CONNECT: {
            processConnect(ctx, (MqttConnectMessage) msg);
            break;
        }
        case PUBLISH: {
            processPublish(ctx, (MqttPublishMessage) msg);
            break;
        }
        case SUBSCRIBE: {
            processSubscribe(ctx, (MqttSubscribeMessage) msg);
            break;
        }
        case UNSUBSCRIBE: {
            processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
            break;
        }
        case PINGREQ: {
            // Answer the ping only for clients that completed CONNECT.
            if (checkConnected(ctx)) {
                ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
            }
            break;
        }
        case DISCONNECT: {
            if (checkConnected(ctx)) {
                processDisconnect(ctx);
            }
            break;
        }
        default:
            // All other message types are ignored.
            break;
    }
}
/**
 * Creates the will message announced by a CONNECT message.
 *
 * @param clientId identifier of the connecting client
 * @param conn     the received CONNECT message
 * @return the will message, or {@code null} when the will flag is not set [MQTT-3.1.2-12]
 */
private Message newWill(String clientId, MqttConnectMessage conn) {
    if (conn.variableHeader().isWillFlag()) {
        final String topic = conn.payload().willTopic();
        // NOTE(review): the payload goes through a String and back to UTF-8 bytes; this may not
        // preserve arbitrary binary will payloads - confirm against the Netty version in use.
        final byte[] body = conn.payload().willMessage().getBytes(CharsetUtil.UTF_8);
        final MqttQoS qos = MqttQoS.valueOf(conn.variableHeader().willQos());
        final boolean retain = conn.variableHeader().isWillRetain();
        return new Message(-1, topic, clientId, body, qos, retain);
    }
    return null;
}
/**
 * Runs the plugin checks for a CONNECT request and refuses the connection on the first failure.
 *
 * <p>Checks, in order: service availability, client id validity, credential validity and
 * authorization. Each failure maps to its own CONNACK return code.
 *
 * @param ctx channel context used to send the refusal
 * @param msg the received CONNECT message
 * @return {@code true} when every check passed, {@code false} when a refusal was sent
 */
private boolean filterPlugins(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    String clientId = msg.payload().clientIdentifier();
    String userName = null;
    if (msg.variableHeader().hasUserName()) {
        userName = msg.payload().userName();
    }
    String password = null;
    if (msg.variableHeader().hasPassword()) {
        password = msg.payload().password();
    }

    // The first failing check determines the refusal code; later checks are not evaluated.
    MqttConnectReturnCode refusal = null;
    if (!Plugins.INSTANCE.get(ServiceChecker.class).isServiceAvailable()) {
        refusal = MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
    } else if (!Plugins.INSTANCE.get(Authenticator.class).isValid(clientId)) {
        refusal = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED; // [MQTT-3.1.3-9]
    } else if (!Plugins.INSTANCE.get(Authenticator.class).isValid(clientId, userName, password)) {
        refusal = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
    } else if (!Plugins.INSTANCE.get(Authorizer.class).isAuthorized(clientId, userName)) {
        refusal = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
    }

    if (refusal != null) {
        sendNoneAcceptMessage(ctx, refusal);
        return false;
    }
    return true;
}
/**
 * Example interceptor: prints every intercepted MQTT control packet and overwrites the beginning
 * of PUBLISH payloads with a fixed text.
 *
 * <p>The replacement is written in place from index 0, so it must not be longer than the
 * payload buffer; payloads that are too small are left unchanged.
 *
 * @param mqttMessage the intercepted control packet
 * @param connection  the connection the packet arrived on (unused)
 * @return always {@code true}, i.e. continue with the next interceptor or the target
 */
@Override
public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) {
    System.out.println("MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType());

    // If you need to handle an specific packet type:
    if (mqttMessage instanceof MqttPublishMessage) {
        MqttPublishMessage message = (MqttPublishMessage) mqttMessage;
        // Use one explicit charset for decoding and encoding instead of the platform default.
        Charset utf8 = Charset.forName("UTF-8");

        String originalMessage = message.payload().toString(utf8);
        System.out.println("Original message: " + originalMessage);

        // The new message content must not be bigger than the original content.
        byte[] modifiedBytes = "Modified message ".getBytes(utf8);
        if (message.payload().capacity() >= modifiedBytes.length) {
            message.payload().setBytes(0, modifiedBytes);
        } else {
            // Writing past the buffer would throw IndexOutOfBoundsException.
            System.out.println("Original message is too short to be modified");
        }
    } else if (mqttMessage instanceof MqttConnectMessage) {
        MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage;
        System.out.println("MQTT CONNECT control packet was intercepted " + connectMessage);
    }

    // We return true which means "call next interceptor" (if there is one) or target.
    // If we returned false, it means "abort call" - no more interceptors would be called and neither would
    // the target
    return true;
}
/**
 * Verifies that a client cannot connect when an incoming interceptor rejects CONNECT packets.
 *
 * <p>The connection attempt runs in a separate thread. An {@code AssertionError} thrown there
 * would never reach the JUnit thread, so the outcome is recorded in a flag and asserted on the
 * test thread after the worker has finished.
 */
@Test(timeout = 60000)
public void testRejectedMqttConnectMessage() throws Exception {
    CountDownLatch publishThreadReady = new CountDownLatch(1);
    final java.util.concurrent.atomic.AtomicBoolean connectionAccepted =
        new java.util.concurrent.atomic.AtomicBoolean(false);

    // Reject CONNECT packets, let everything else through.
    server.getRemotingService().addIncomingInterceptor(
        (MQTTInterceptor) (packet, connection) -> packet.getClass() != MqttConnectMessage.class);

    Thread publishThread = new Thread(() -> {
        MQTTClientProvider publishProvider = getMQTTClientProvider();
        publishThreadReady.countDown();
        try {
            initializeConnection(publishProvider);
            publishProvider.disconnect();
            // Reaching this point means the broker did not reject the connection.
            connectionAccepted.set(true);
        } catch (Exception expected) {
            // Expected: the rejected connection surfaces as an exception on the client side.
        }
    });

    publishThread.start();
    publishThreadReady.await();
    publishThread.join(3000);

    if (publishThread.isAlive()) {
        fail("The connection is stuck!");
    }
    if (connectionAccepted.get()) {
        fail("The connection should be rejected!");
    }
}
/**
 * Called during connection. Stores the channel context, derives the connection entry's TTL from
 * the keep-alive interval (keep-alive seconds times 1500) and forwards the CONNECT details to the
 * session's connection manager.
 *
 * @param connect the received CONNECT message
 * @param ctx     channel context of the connecting client
 * @throws Exception if the connection manager fails to process the connect
 */
void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws Exception {
    this.ctx = ctx;
    connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L;

    String clientId = connect.payload().clientIdentifier();
    String username = connect.payload().userName();
    byte[] password = connect.payload().passwordInBytes();
    boolean hasWill = connect.variableHeader().isWillFlag();
    byte[] willMessage = connect.payload().willMessageInBytes();
    String willTopic = connect.payload().willTopic();
    boolean willRetain = connect.variableHeader().isWillRetain();
    int willQos = connect.variableHeader().willQos();
    boolean cleanSession = connect.variableHeader().isCleanSession();

    session.getConnectionManager().connect(
        clientId, username, password, hasWill, willMessage, willTopic, willRetain, willQos, cleanSession);
}
/**
 * Wraps an MQTT CONNECT message: the message is passed to the superclass constructor and also
 * kept in the {@code msg} field.
 *
 * @param msg the CONNECT message being intercepted
 */
public InterceptConnectMessage(MqttConnectMessage msg) {
super(msg);
this.msg = msg;
}
/**
 * Processes a CONNECT message: validates it, replaces any previous session of the same client,
 * stores the new session, installs the heartbeat handler and answers with a CONNACK.
 *
 * @param channel channel of the connecting client
 * @param msg     the received CONNECT message
 */
public void processConnect(Channel channel, MqttConnectMessage msg) {
    // The message decoder reported a failure.
    if (msg.decoderResult().isFailure()) {
        writeBackConnect(channel,
            ProtocolUtil.connectReturnCodeForException(msg.decoderResult().cause()), false, true);
        return;
    }

    // A client id is mandatory here regardless of cleanSession; this deliberately deviates
    // from the standard protocol.
    final String deviceid = msg.payload().clientIdentifier();
    if (deviceid == null || deviceid.trim().isEmpty()) {
        writeBackConnect(channel, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false, true);
        return;
    }

    // User name and password are always verified, whether or not the corresponding flags are
    // set; this deliberately deviates from the standard protocol.
    final String username = msg.payload().userName();
    final byte[] passwordBytes = msg.payload().passwordInBytes();
    final String password = passwordBytes == null ? null : new String(passwordBytes, CharsetUtil.UTF_8);
    if (!authService.checkValid(deviceid, username, password)) {
        writeBackConnect(channel, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false, true);
        return;
    }

    final boolean isCleanSession = msg.variableHeader().isCleanSession();

    // If a session is already stored for this client id, close the earlier connection.
    if (sessionService.containsKey(deviceid)) {
        if (isCleanSession) {
            sessionService.remove(deviceid);
            topicProcess.removeByCleanSession(deviceid);
            procedureProcess.removeByCleanSession(deviceid);
            consumerProcess.removeByCleanSession(deviceid);
        }
        sessionService.closeSession(deviceid);
    }

    // Build the session and attach the will message, if one was announced.
    final MqttSession mqttSession = new MqttSession(deviceid, channel, isCleanSession, null);
    if (msg.variableHeader().isWillFlag()) {
        MqttPublishMessage willMessage = ProtocolUtil.publishMessage(
            msg.payload().willTopic(),
            false,
            msg.variableHeader().willQos(),
            msg.variableHeader().isWillRetain(),
            0,
            msg.payload().willMessageInBytes());
        mqttSession.setWillMessage(willMessage);
    }

    // Heartbeat: a non-positive keep-alive falls back to 60 seconds; the idle handler fires
    // after 1.5 times that interval.
    final int requestedKeepAlive = msg.variableHeader().keepAliveTimeSeconds();
    final int idleSeconds = requestedKeepAlive > 0 ? requestedKeepAlive : 60;
    final String heartbeatHandler = NettyConstant.HANDLER_NAME_HEARTCHECK;
    if (channel.pipeline().names().contains(heartbeatHandler)) {
        channel.pipeline().remove(heartbeatHandler);
    }
    channel.pipeline().addFirst(heartbeatHandler,
        new IdleStateHandler(0, 0, Math.round(idleSeconds * 1.5f)));

    // Store the session and accept the connection.
    sessionService.put(deviceid, mqttSession);
    channel.attr(NettyConstant.CLIENTID_KEY).set(deviceid);
    // NOTE(review): containsKey is evaluated right after put, so this presumably always equals
    // !isCleanSession - confirm whether that is the intended session-present semantics.
    Boolean sessionPresent = sessionService.containsKey(deviceid) && !isCleanSession;
    writeBackConnect(channel, MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent, false);
    NettyLog.debug("CONNECT - clientId: {}, cleanSession: {}", deviceid, isCleanSession);

    // With cleanSession 0, the unfinished QoS 1 and QoS 2 messages stored for this client id
    // have to be re-sent as DUP messages.
    if (!isCleanSession) {
        this.consumerProcess.processHistoryPub(channel);
        this.procedureProcess.processHistoryPubRel(channel);
    }
}
/**
 * Entry point for inbound MQTT messages.
 *
 * <p>Messages that failed decoding close the channel. CONNECT is handed to the protocol
 * processor directly; every other message is dropped unless the channel is logged in, and is
 * otherwise dispatched by message type. Types without a handler are ignored.
 *
 * @param ctx channel context of the client
 * @param msg the inbound MQTT message
 * @throws Exception if a protocol handler fails
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
    if (!msg.decoderResult().isSuccess()) {
        NettyLog.error("error decoder");
        ctx.close();
        return;
    }

    final MqttMessageType type = msg.fixedHeader().messageType();
    NettyLog.debug("read: " + type);

    if (type == MqttMessageType.CONNECT) {
        protocolProcess.processConnect(ctx.channel(), (MqttConnectMessage) msg);
        return;
    }

    // Everything but CONNECT requires a logged-in channel.
    if (!NettyUtil.isLogin(ctx.channel())) {
        NettyLog.info("not login");
        return;
    }

    switch (type) {
        case PUBLISH:
            protocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
            break;
        case PUBACK:
            protocolProcess.processPubAck(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBREC:
            protocolProcess.processPubRec(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBREL:
            protocolProcess.processPubRel(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBCOMP:
            protocolProcess.processPubComp(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case SUBSCRIBE:
            protocolProcess.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
            break;
        case UNSUBSCRIBE:
            protocolProcess.processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
            break;
        case PINGREQ:
            protocolProcess.processPingReq(ctx.channel(), msg);
            break;
        case DISCONNECT:
            protocolProcess.processDisConnect(ctx.channel(), msg);
            break;
        default:
            // CONNACK, SUBACK, UNSUBACK, PINGRESP and anything else need no handling here.
            break;
    }
}
/**
 * Processes a CONNECT request from a client.
 *
 * <p>Only protocol name {@code "MQTT"} at the MQTT 3.1.1 level is accepted; other versions are
 * answered with "unacceptable protocol version". Requests failing {@code checkAuth} are answered
 * with the returned code. For accepted requests the connection is registered, keep-alive is set
 * up, the will message and session are stored, an "accepted" CONNACK is sent and consumption is
 * started for the client.
 *
 * @param client         channel of the connecting client
 * @param connectMessage the received CONNECT message
 */
public void processConnect(Channel client, MqttConnectMessage connectMessage) {
    String clientId = connectMessage.payload().clientIdentifier();
    boolean isCleanSession = connectMessage.variableHeader().isCleanSession();

    // Validate the protocol version; the constant-first equals avoids an NPE on a null name.
    boolean supportedProtocol = "MQTT".equals(connectMessage.variableHeader().name())
        && connectMessage.variableHeader().version() == MqttVersion.MQTT_3_1_1.protocolLevel();
    if (!supportedProtocol) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("CONN clientID: <{}>, 版本不对断开连接: <{}>", clientId, connectMessage.toString());
        }
        sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false);
        return;
    }

    MqttConnectReturnCode resultCode = checkAuth(connectMessage);
    if (resultCode != MqttConnectReturnCode.CONNECTION_ACCEPTED || Strings.isNullOrEmpty(clientId)) {
        sendAckToClient(client, connectMessage, resultCode, false);
        return;
    }

    addConnection(client, connectMessage, clientId);
    // Set up keep-alive handling and store keep-alive plus other attributes on the channel
    // for later use.
    initializeKeepAliveTimeout(client, connectMessage, clientId);
    storeWillMessage(clientId, connectMessage);
    sessionManager.addSession(clientId, isCleanSession);

    // The CONNACK is built from the code passed in, so it always carries CONNECTION_ACCEPTED
    // here; re-checking its return code would be unreachable.
    sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_ACCEPTED, !isCleanSession);

    consumerManager.fireConsume(clientId);
    LOG.info("CONNECT successful, clientID: {}, client-address: <{}>, server-address: <{}>", clientId, client.remoteAddress(), client.localAddress());
}
/**
 * Treats the incoming message as a CONNECT and delegates it to the protocol handler.
 *
 * @param client  channel the message arrived on
 * @param message the inbound message; cast to {@link MqttConnectMessage}
 * @throws Exception if the protocol handler fails
 */
@Override
public void handleRequest(Channel client, MqttMessage message) throws Exception {
    mqttProtocolHandler.processConnect(client, (MqttConnectMessage) message);
}