下面列出了 io.netty.handler.codec.mqtt.MqttSubscribeMessage 的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Handles a SUBSCRIBE packet ("S - B" in the original note; presumably subscriber to broker).
 *
 * <p>The requested topic subscriptions are handed to {@code topicProcess}. When it returns a
 * non-empty QoS list, a SUBACK carrying that list is sent and the retained messages matching
 * each requested topic filter are delivered. Otherwise the error is logged and the channel is
 * closed.
 *
 * @param channel the channel the SUBSCRIBE arrived on
 * @param msg the SUBSCRIBE message
 */
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
    String clientId = NettyUtil.getClientId(channel);
    List<MqttTopicSubscription> topicSubscriptions = msg.payload().topicSubscriptions();
    List<Integer> grantedQosList = this.topicProcess.processTopicSubscribe(clientId, topicSubscriptions);

    // Nothing could be granted: log and drop the connection.
    if (grantedQosList == null || grantedQosList.isEmpty()) {
        NettyLog.error("error Subscribe");
        channel.close();
        return;
    }

    this.sendProcess.sendSubAckMessage(channel, msg.variableHeader().messageId(), grantedQosList);

    // Publish retained messages for every requested topic filter.
    for (MqttTopicSubscription topicSubscription : topicSubscriptions) {
        String topicFilter = topicSubscription.topicName();
        MqttQoS requestedQos = topicSubscription.qualityOfService();
        List<RetainMessage> retained = this.topicProcess.searchRetainMessage(topicFilter);
        if (retained == null || retained.isEmpty()) {
            continue;
        }
        NettyLog.debug("sendRetainMessage: {}, {}", clientId, topicFilter);
        this.consumerProcess.sendRetainMessage(channel, retained, requestedQos);
    }
}
/**
 * Handles a SUBSCRIBE packet: validates the requested topics, acknowledges them with a SUBACK
 * and then dispatches the retained messages returned by {@code subscribe}.
 *
 * <p>When no requested topic passes validation, a SUBACK carrying the failure return code
 * (0x80) for every requested topic filter is sent, so the client is not left waiting for an
 * acknowledgement that never arrives.
 *
 * @param ctx the channel handler context of the subscribing client
 * @param mqttMessage the incoming message; must be a {@link MqttSubscribeMessage}
 */
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
    MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) mqttMessage;
    String clientId = NettyUtil.getClientId(ctx.channel());
    int messageId = subscribeMessage.variableHeader().messageId();
    ClientSession clientSession = ConnectManager.getInstance().getClient(clientId);
    List<Topic> validTopicList = validTopics(clientSession, subscribeMessage.payload().topicSubscriptions());
    if (validTopicList == null || validTopicList.isEmpty()) {
        log.warn("[Subscribe] -> Valid all subscribe topic failure,clientId:{}",clientId);
        // MQTT requires a SUBACK for every SUBSCRIBE; 0x80 is the "Failure" return code.
        int requestedCount = subscribeMessage.payload().topicSubscriptions().size();
        if (requestedCount > 0) {
            List<Integer> failureCodes =
                    new java.util.ArrayList<>(java.util.Collections.nCopies(requestedCount, 0x80));
            ctx.writeAndFlush(MessageUtil.getSubAckMessage(messageId, failureCodes));
        }
        return;
    }
    // NOTE(review): the SUBACK only carries codes for the topics that passed validation; if
    // validTopics can drop some (but not all) filters, the codes no longer line up with the
    // request - confirm against validTopics.
    List<Integer> ackQos = getTopicQos(validTopicList);
    MqttMessage subAckMessage = MessageUtil.getSubAckMessage(messageId, ackQos);
    ctx.writeAndFlush(subAckMessage);
    // send retain messages
    List<Message> retainMessages = subscribe(clientSession, validTopicList);
    dispatcherRetainMessage(clientSession, retainMessages);
}
/**
 * Processes a SUBSCRIBE request. The requested topics are validated for the client's session,
 * the granted QoS values are acknowledged with a SUBACK, and the retained messages returned by
 * {@code subscribe} are dispatched to the client.
 *
 * <p>If validation rejects every topic, the client still receives a SUBACK in which each
 * requested topic filter is answered with the failure return code (0x80); previously no
 * acknowledgement was sent at all.
 *
 * @param ctx the channel handler context of the subscribing client
 * @param mqttMessage the incoming message; must be a {@link MqttSubscribeMessage}
 */
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
    MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) mqttMessage;
    String clientId = NettyUtil.getClientId(ctx.channel());
    int messageId = subscribeMessage.variableHeader().messageId();
    ClientSession clientSession = ConnectManager.getInstance().getClient(clientId);
    List<Topic> validTopicList = validTopics(clientSession, subscribeMessage.payload().topicSubscriptions());
    boolean nothingValid = validTopicList == null || validTopicList.isEmpty();
    if (nothingValid) {
        log.warn("[Subscribe] -> Valid all subscribe topic failure,clientId:{}",clientId);
        // A SUBSCRIBE must always be answered with a SUBACK; reject each filter with 0x80.
        int requestedCount = subscribeMessage.payload().topicSubscriptions().size();
        if (requestedCount > 0) {
            List<Integer> rejected =
                    new java.util.ArrayList<>(java.util.Collections.nCopies(requestedCount, 0x80));
            ctx.writeAndFlush(MessageUtil.getSubAckMessage(messageId, rejected));
        }
        return;
    }
    // NOTE(review): return codes are produced for the validated topics only; verify that
    // validTopics never filters out a subset, otherwise the SUBACK is misaligned.
    List<Integer> ackQos = getTopicQos(validTopicList);
    ctx.writeAndFlush(MessageUtil.getSubAckMessage(messageId, ackQos));
    // send retain messages
    List<Message> retainMessages = subscribe(clientSession, validTopicList);
    dispatcherRetainMessage(clientSession, retainMessages);
}
/**
 * Creates a pending subscription and registers the SUBSCRIBE message with the retransmission
 * handler as the original message.
 *
 * @param future promise stored for this subscription
 * @param topic the topic being subscribed to
 * @param message the SUBSCRIBE message
 */
public MqttPendingSubscription(Promise<Void> future, String topic, MqttSubscribeMessage message) {
    this.topic = topic;
    this.future = future;
    this.subscribeMessage = message;

    // The handler keeps the original packet so it can be re-sent later.
    retransmissionHandler.setOriginalMessage(message);
}
/**
 * Starts the retransmission timer, but only once the packet has been marked as sent.
 *
 * @param eventLoop event loop handed to the retransmission handler
 * @param sendPacket callback that receives each rebuilt SUBSCRIBE message
 */
public void startRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
    // Nothing to retransmit until the packet has actually been sent.
    if (!this.sent) {
        return;
    }
    this.retransmissionHandler.setHandle((fixedHeader, originalMessage) -> {
        // Rebuild the SUBSCRIBE with the supplied fixed header and the original header/payload.
        MqttSubscribeMessage resend = new MqttSubscribeMessage(
                fixedHeader, originalMessage.variableHeader(), originalMessage.payload());
        sendPacket.accept(resend);
    });
    this.retransmissionHandler.start(eventLoop);
}
/**
 * Checks every topic subscription of a SUBSCRIBE message against the read policy and the
 * topic's validity.
 *
 * <p>A subscription keeps its requested QoS only when the client may read the topic and the
 * topic filter is valid; in every other case it is answered with {@code FAILURE}.
 *
 * @param clientID
 *            the clientID
 * @param username
 *            the username
 * @param msg
 *            the subscribe message to verify
 * @return one entry per requested subscription, in request order, carrying either the
 *         requested QoS or {@code FAILURE}.
 */
List<MqttTopicSubscription> verifyTopicsReadAccess(String clientID, String username, MqttSubscribeMessage msg) {
    final int messageId = messageId(msg);
    final List<MqttTopicSubscription> ackTopics = new ArrayList<>();

    for (MqttTopicSubscription req : msg.payload().topicSubscriptions()) {
        final Topic topic = new Topic(req.topicName());
        final MqttQoS grantedQos;

        if (!policy.canRead(topic, username, clientID)) {
            // send SUBACK with 0x80, the user hasn't credentials to read the topic
            LOG.warn("Client does not have read permissions on the topic CId={}, username: {}, messageId: {}, topic: {}",
                    clientID, username, messageId, topic);
            grantedQos = FAILURE;
        } else if (topic.isValid()) {
            LOG.debug("Client will be subscribed to the topic CId={}, username: {}, messageId: {}, topic: {}",
                    clientID, username, messageId, topic);
            grantedQos = req.qualityOfService();
        } else {
            LOG.warn("Topic filter is not valid CId={}, username: {}, messageId: {}, topic: {}",
                    clientID, username, messageId, topic);
            grantedQos = FAILURE;
        }

        ackTopics.add(new MqttTopicSubscription(topic.toString(), grantedQos));
    }
    return ackTopics;
}
/**
 * Builds a SUBSCRIBE message (QoS 1, not duplicate, not retained, remaining length 0) for the
 * given topic subscriptions.
 *
 * @param mqttTopicSubscriptions the subscriptions to put into the payload
 * @param messageId the packet identifier for the variable header
 * @return the assembled SUBSCRIBE message
 */
public static MqttSubscribeMessage subscribeMessage(List<MqttTopicSubscription> mqttTopicSubscriptions,
        int messageId) {
    MqttFixedHeader fixedHeader =
            new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    return new MqttSubscribeMessage(
            fixedHeader,
            MqttMessageIdVariableHeader.from(messageId),
            new MqttSubscribePayload(mqttTopicSubscriptions));
}
/**
 * Dispatches an incoming MQTT message to the handler for its message type. Message types
 * without a case below are ignored.
 *
 * @param ctx the channel handler context the message arrived on
 * @param msg the decoded MQTT message
 */
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
    switch (msg.fixedHeader().messageType()) {
        case CONNECT:
            processConnect(ctx, (MqttConnectMessage) msg);
            break;
        case PUBLISH:
            processPublish(ctx, (MqttPublishMessage) msg);
            break;
        case SUBSCRIBE:
            processSubscribe(ctx, (MqttSubscribeMessage) msg);
            break;
        case UNSUBSCRIBE:
            processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
            break;
        case PINGREQ:
            // Pings are only answered when the connection check passes.
            if (!checkConnected(ctx)) {
                break;
            }
            ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
            break;
        case DISCONNECT:
            if (!checkConnected(ctx)) {
                break;
            }
            processDisconnect(ctx);
            break;
    }
}
/**
 * Builds a SUBSCRIBE message (QoS 1) for the given topic subscriptions, with the fixed
 * header's remaining length set to the encoded size of the variable header plus payload.
 *
 * <p>The previous computation counted only one byte per topic on top of the topic bytes and
 * so omitted the 2-byte length prefix that precedes every UTF-8 encoded topic filter.
 *
 * @param messageId the packet identifier for the variable header
 * @param topicSubscriptions the subscriptions to put into the payload
 * @return the assembled SUBSCRIBE message
 */
public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) {
    // Variable header: 2-byte packet identifier.
    int remainingLength = 2;
    for (MqttTopicSubscription item : topicSubscriptions) {
        // Per topic filter: 2-byte length prefix + UTF-8 bytes + 1-byte requested QoS.
        remainingLength += 2 + item.topicName().getBytes(CharsetUtil.UTF_8).length + 1;
    }
    // NOTE(review): Netty's MqttEncoder appears to recompute the remaining length when
    // encoding, so this value mainly matters to code that inspects the header directly.
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
            false, remainingLength);
    MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
    MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions));
    return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
}
/**
 * Registers the requested subscriptions with the session's subscription manager and replies
 * with a SUBACK that reuses the request's variable header and carries the returned QoS values.
 *
 * @param message the SUBSCRIBE message
 * @throws Exception declared by this method; presumably propagated from the subscription manager
 */
void handleSubscribe(MqttSubscribeMessage message) throws Exception {
    int[] grantedQos = session.getSubscriptionManager()
            .addSubscriptions(message.payload().topicSubscriptions());

    MqttFixedHeader subAckHeader =
            new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttSubAckPayload subAckPayload = new MqttSubAckPayload(grantedQos);
    sendToClient(new MqttSubAckMessage(subAckHeader, message.variableHeader(), subAckPayload));
}
/**
 * Returns the SUBSCRIBE message held by this object.
 *
 * @return the stored SUBSCRIBE message
 */
public MqttSubscribeMessage getSubscribeMessage() {
    return this.subscribeMessage;
}
/**
 * Entry point for decoded MQTT messages. Closes the channel on a decoder failure, routes
 * CONNECT straight to the protocol processor, drops every other message from a channel that
 * is not logged in, and otherwise dispatches by message type.
 *
 * @param ctx the channel handler context the message arrived on
 * @param msg the decoded MQTT message
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
    if (!msg.decoderResult().isSuccess()) {
        NettyLog.error("error decoder");
        ctx.close();
        return;
    }

    final MqttMessageType type = msg.fixedHeader().messageType();
    NettyLog.debug("read: " + type);

    // CONNECT is the only message accepted before login and needs no further dispatch.
    if (type == MqttMessageType.CONNECT) {
        protocolProcess.processConnect(ctx.channel(), (MqttConnectMessage) msg);
        return;
    }
    if (!NettyUtil.isLogin(ctx.channel())) {
        NettyLog.info("not login");
        return;
    }

    switch (type) {
        case PUBLISH:
            protocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
            break;
        case PUBACK:
            protocolProcess.processPubAck(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBREC:
            protocolProcess.processPubRec(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBREL:
            protocolProcess.processPubRel(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBCOMP:
            protocolProcess.processPubComp(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case SUBSCRIBE:
            protocolProcess.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
            break;
        case UNSUBSCRIBE:
            protocolProcess.processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
            break;
        case PINGREQ:
            protocolProcess.processPingReq(ctx.channel(), msg);
            break;
        case DISCONNECT:
            protocolProcess.processDisConnect(ctx.channel(), msg);
            break;
        default:
            // CONNACK, SUBACK, UNSUBACK, PINGRESP and anything else are ignored.
            break;
    }
}
/**
 * Builds a SUBSCRIBE message by converting the given subscribe requests with
 * {@code getSubscribeTopics} and delegating to {@code ClientProtocolUtil.subscribeMessage}.
 *
 * @param messageId the message id passed on to {@code ClientProtocolUtil.subscribeMessage}
 * @param msg the subscribe requests to convert
 * @return the SUBSCRIBE message produced by {@code ClientProtocolUtil.subscribeMessage}
 */
public static MqttSubscribeMessage subscribeMessage(int messageId, SubscribeMessage... msg) {
return ClientProtocolUtil.subscribeMessage(getSubscribeTopics(msg), messageId);
}
/**
 * Casts the incoming message to a SUBSCRIBE message and hands it to the protocol handler.
 *
 * @param client the channel the message arrived on
 * @param message the incoming message; must be a {@link MqttSubscribeMessage}
 * @throws Exception declared by the overridden method
 */
@Override
public void handleRequest(Channel client, MqttMessage message) throws Exception {
    mqttProtocolHandler.processSubscribe(client, (MqttSubscribeMessage) message);
}
/**
 * Processes a SUBSCRIBE packet for a connected client and answers with a SUBACK carrying one
 * result code per processed topic subscription.
 *
 * <p>Changes compared with the previous version:
 * <ul>
 *   <li>a SUBSCRIBE without topic subscriptions closes the connection and no longer writes a
 *       SUBACK to the channel that was just closed;</li>
 *   <li>the caught exception is logged together with its stack trace;</li>
 *   <li>the catch block no longer dereferences a null subscription list.</li>
 * </ul>
 *
 * @param client the channel the SUBSCRIBE arrived on
 * @param subscribeMessage the SUBSCRIBE message
 */
public void processSubscribe(Channel client, MqttSubscribeMessage subscribeMessage) {
    List<Integer> resultCodes = new ArrayList<>();
    String clientID = NettyAttrManager.getAttrClientId(client);
    if (connectionManager.isConnected(clientID)) {
        MqttConnection connection = connectionManager.getConnection(clientID);
        if (LOG.isDebugEnabled()) {
            LOG.debug("处理subscribe数据包, clientID: {}, cleanSession: {}", clientID, connection.isCleanSession());
        }
        List<MqttTopicSubscription> topicSubscribes = subscribeMessage.payload().topicSubscriptions();
        LOG.info("Subscribe topics: {}, clientID: {}", topicSubscribes, clientID);
        try {
            if (null != topicSubscribes) {
                Set<MqttSubscription> topicFilters = subscribe(topicSubscribes, clientID, connection.getClientGroupName(), resultCodes);
                MqttSession session = sessionManager.getSession(clientID);
                if (session != null) {
                    for (MqttSubscription subscription : topicFilters) {
                        session.addSubscription(subscription);
                    }
                }
            } else {
                // The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. A SUBSCRIBE packet with no payload is a protocol violation
                // it MUST close the Network Connection on which it received that Control Packet which caused the protocol violation
                consumerManager.stopConsume(clientID);
                sessionManager.removeSession(clientID);
                connection.getChannel().close().addListener(CLOSE_ON_FAILURE);
                connectionManager.removeConnection(connection);
                client.close().addListener(CLOSE_ON_FAILURE);
                // The connection is gone; there is nobody left to acknowledge.
                return;
            }
        } catch (Exception e) {
            LOG.error("subscribe is error!", e);
            if (topicSubscribes == null) {
                // The failure happened while tearing down after a protocol violation.
                return;
            }
            // Pad with FAILURE so every requested topic gets a result code.
            while (resultCodes.size() < topicSubscribes.size()) {
                resultCodes.add(MqttQoS.FAILURE.value());
            }
        }
    }
    MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
            mqttFixedHeader,
            MqttMessageIdVariableHeader.from(subscribeMessage.variableHeader().messageId()),
            new MqttSubAckPayload(resultCodes));
    LOG.info("SUBSCRIBE successful, subscribe result: {}", resultCodes);
    client.writeAndFlush(subAckMessage);
}
/**
 * Processes a SUBSCRIBE packet: grants a QoS for each known device topic, registers the
 * channel for the topic in {@code MemoryMetaPool}, and replies with a SUBACK.
 *
 * <p>Exactly one return code is produced per requested topic. Previously an exception thrown
 * after a code had already been added (for example by the topic registration) appended a
 * second code for the same topic, which shifted every following code in the SUBACK. Failures
 * are now logged through the logger, with the exception attached, instead of
 * {@code printStackTrace()}.
 *
 * @param ctx the channel handler context of the subscribing client
 * @param mqttMsg the SUBSCRIBE message
 */
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
    if (!checkConnected(ctx)) {
        return;
    }
    log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
    List<Integer> grantedQoSList = new ArrayList<>();
    for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
        String topicName = subscription.topicName();
        // TODO: handle this qos level.
        MqttQoS reqQoS = subscription.qualityOfService();
        int grantedQoS;
        try {
            if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
                deviceSessionCtx.setAllowAttributeResponses();
                grantedQoS = getMinSupportedQos(reqQoS);
            } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)
                    || topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)
                    || topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)
                    || topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
                // Forwarding of attribute/RPC subscriptions to the session actor is disabled
                // in this version; the subscription is only acknowledged.
                grantedQoS = getMinSupportedQos(reqQoS);
            } else {
                log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
                grantedQoS = FAILURE.value();
            }
            // As before, the channel is registered for every requested topic name.
            ChannelEntity channelEntity = new TcpChannelEntity(ctx.channel());
            MemoryMetaPool.registerTopic(channelEntity, topicName);
        } catch (Exception e) {
            log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS, e);
            grantedQoS = FAILURE.value();
        }
        grantedQoSList.add(grantedQoS);
    }
    ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
}
/**
 * Handles one inbound MQTT control packet. The client is disconnected when the handler is
 * stopped, the packet failed to decode, an interceptor rejects it, the packet type is not one
 * the server accepts, or processing throws. The inbound object is always released afterwards.
 *
 * @param ctx the channel handler context the packet arrived on
 * @param msg the inbound object; cast to {@link MqttMessage}
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        if (stopped) {
            disconnect(true);
            return;
        }

        final MqttMessage packet = (MqttMessage) msg;

        // Disconnect if Netty codec failed to decode the stream.
        if (packet.decoderResult().isFailure()) {
            log.debug("Bad Message Disconnecting Client.");
            disconnect(true);
            return;
        }

        connection.dataReceived();

        if (AuditLogger.isAnyLoggingEnabled()) {
            AuditLogger.setRemoteAddress(connection.getRemoteAddress());
        }

        MQTTUtil.logMessage(session.getState(), packet, true);

        // A non-null result from the incoming interceptors means the packet was rejected.
        final boolean rejected = this.protocolManager.invokeIncoming(packet, this.connection) != null;
        if (rejected) {
            log.debugf("Interceptor rejected MQTT message: %s", packet);
            disconnect(true);
            return;
        }

        switch (packet.fixedHeader().messageType()) {
            case CONNECT:
                handleConnect((MqttConnectMessage) packet, ctx);
                break;
            case PUBLISH:
                handlePublish((MqttPublishMessage) packet);
                break;
            case PUBACK:
                handlePuback((MqttPubAckMessage) packet);
                break;
            case PUBREC:
                handlePubrec(packet);
                break;
            case PUBREL:
                handlePubrel(packet);
                break;
            case PUBCOMP:
                handlePubcomp(packet);
                break;
            case SUBSCRIBE:
                handleSubscribe((MqttSubscribeMessage) packet);
                break;
            case UNSUBSCRIBE:
                handleUnsubscribe((MqttUnsubscribeMessage) packet);
                break;
            case PINGREQ:
                handlePingreq();
                break;
            case DISCONNECT:
                disconnect(false);
                break;
            default:
                // UNSUBACK, SUBACK, PINGRESP and CONNACK land here as well: the server does not
                // instantiate connections, therefore any CONNACK received over a connection is
                // an invalid control message.
                disconnect(true);
                break;
        }
    } catch (Exception e) {
        log.debug("Error processing Control Packet, Disconnecting Client", e);
        disconnect(true);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}