下面列出了 io.netty.handler.codec.mqtt.MqttSubAckPayload #io.netty.handler.codec.mqtt.MqttSubAckMessage 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msgx) throws Exception {
if (msgx == null) {return ;}
MqttMessage msg = (MqttMessage) msgx;
NettyLog.debug("read: {}", msg.fixedHeader().messageType());
MqttFixedHeader mqttFixedHeader = msg.fixedHeader();
switch (mqttFixedHeader.messageType()) {
case CONNACK:
clientProtocolProcess.processConnectBack(ctx.channel(), (MqttConnAckMessage) msg);
break;
case UNSUBACK:
clientProtocolProcess.processUnSubBack(ctx.channel(), msg);
break;
case PUBLISH:
clientProtocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
break;
case PUBACK:
clientProtocolProcess.processPubAck(ctx.channel(), msg);
break;
case PUBREC:
clientProtocolProcess.processPubRec(ctx.channel(), msg);
break;
case PUBREL:
clientProtocolProcess.processPubRel(ctx.channel(), msg);
break;
case PUBCOMP:
clientProtocolProcess.processPubComp(ctx.channel(), msg);
break;
case SUBACK:
clientProtocolProcess.processSubAck(ctx.channel(), (MqttSubAckMessage) msg);
break;
default:
break;
}
}
public static MqttSubAckMessage suback(int messageId, List<Integer> grantedQoSLevels) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false,
2 + grantedQoSLevels.size());
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels);
return new MqttSubAckMessage(fixedHeader, variableHeader, payload);
}
void handleSubscribe(MqttSubscribeMessage message) throws Exception {
MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos));
sendToClient(ack);
}
public static MqttSubAckMessage subAckMessage(int messageId, List<Integer> mqttQoSList) {
return (MqttSubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId), new MqttSubAckPayload(mqttQoSList));
}
public static MqttMessage getSubAckMessage(int messageId, List<Integer> qos){
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,false,MqttQoS.AT_MOST_ONCE,false,0);
MqttMessageIdVariableHeader idVariableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubAckPayload subAckPayload = new MqttSubAckPayload(qos);
return new MqttSubAckMessage(fixedHeader,idVariableHeader,subAckPayload);
}
public void processSubscribe(Channel client, MqttSubscribeMessage subscribeMessage) {
List<Integer> resultCodes = new ArrayList<>();
String clientID = NettyAttrManager.getAttrClientId(client);
if (connectionManager.isConnected(clientID)) {
MqttConnection connection = connectionManager.getConnection(clientID);
if (LOG.isDebugEnabled()) {
LOG.debug("处理subscribe数据包, clientID: {}, cleanSession: {}", clientID, connection.isCleanSession());
}
List<MqttTopicSubscription> topicSubscribes = subscribeMessage.payload().topicSubscriptions();
LOG.info("Subscribe topics: {}, clientID: {}", topicSubscribes, clientID);
try {
if (null != topicSubscribes) {
Set<MqttSubscription> topicFilters = subscribe(topicSubscribes, clientID, connection.getClientGroupName(), resultCodes);
MqttSession session = sessionManager.getSession(clientID);
if (session != null) {
for (MqttSubscription subscription : topicFilters) {
session.addSubscription(subscription);
}
}
} else {
// The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. A SUBSCRIBE packet with no payload is a protocol violation
// it MUST close the Network Connection on which it received that Control Packet which caused the protocol violation
consumerManager.stopConsume(clientID);
sessionManager.removeSession(clientID);
connection.getChannel().close().addListener(CLOSE_ON_FAILURE);
connectionManager.removeConnection(connection);
client.close().addListener(CLOSE_ON_FAILURE);
}
} catch (Exception e) {
LOG.error("subscribe is error!");
if (resultCodes.size() < topicSubscribes.size()) {
int minus = topicSubscribes.size() - resultCodes.size();
for (; minus > 0; minus--) {
resultCodes.add(MqttQoS.FAILURE.value());
}
}
}
}
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
mqttFixedHeader,
MqttMessageIdVariableHeader.from(subscribeMessage.variableHeader().messageId()),
new MqttSubAckPayload(resultCodes));
LOG.info("SUBSCRIBE successful, subscribe result: {}", resultCodes);
client.writeAndFlush(subAckMessage);
}
private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(SUBACK, false, AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQoSList);
return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
}
/**
* B - P
* @param channel
* @param mqttMessage
*/
public void processSubAck(Channel channel, MqttSubAckMessage mqttMessage) {
int messageId = mqttMessage.variableHeader().messageId();
this.consumerProcess.processSubAck(messageId);
}