下面列出了 io.netty.handler.codec.mqtt.MqttTopicSubscription 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Handles an inbound SUBSCRIBE packet.
 *
 * <p>The requested topic filters are registered for the client bound to the channel.
 * When registration yields a non-empty QoS list, a SUBACK carrying that list is sent
 * and the retained messages found for each requested filter are delivered. Otherwise
 * an error is logged and the channel is closed.
 *
 * @param channel channel the SUBSCRIBE packet arrived on
 * @param msg the SUBSCRIBE packet
 */
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
    String clientId = NettyUtil.getClientId(channel);
    List<MqttTopicSubscription> requested = msg.payload().topicSubscriptions();
    List<Integer> grantedQos = this.topicProcess.processTopicSubscribe(clientId, requested);

    if (grantedQos == null || grantedQos.isEmpty()) {
        NettyLog.error("error Subscribe");
        channel.close();
        return;
    }

    this.sendProcess.sendSubAckMessage(channel, msg.variableHeader().messageId(), grantedQos);

    // Publish the retained messages matching each requested filter.
    for (MqttTopicSubscription subscription : requested) {
        String topicFilter = subscription.topicName();
        MqttQoS requestedQos = subscription.qualityOfService();
        List<RetainMessage> retained = this.topicProcess.searchRetainMessage(topicFilter);
        if (retained == null || retained.isEmpty()) {
            continue;
        }
        NettyLog.debug("sendRetainMessage: {}, {}", clientId, topicFilter);
        this.consumerProcess.sendRetainMessage(channel, retained, requestedQos);
    }
}
/**
 * Stores one subscription per requested topic filter for the given client.
 *
 * @param clientId identifier of the subscribing client
 * @param topicSubscriptions subscriptions requested in the SUBSCRIBE packet
 * @return the requested QoS values in request order, or {@code null} when the
 *         filters do not pass {@code validTopicFilter}
 */
public List<Integer> processTopicSubscribe(String clientId, List<MqttTopicSubscription> topicSubscriptions) {
    if (!this.validTopicFilter(topicSubscriptions)) {
        NettyLog.error("error Subscribe");
        return null;
    }

    List<Integer> grantedQos = new ArrayList<Integer>();
    for (MqttTopicSubscription subscription : topicSubscriptions) {
        String topicFilter = subscription.topicName();
        MqttQoS requestedQos = subscription.qualityOfService();
        SubscribeTopicInfo info = new SubscribeTopicInfo(clientId, topicFilter, requestedQos.value());
        topicService.put(topicFilter, info);
        grantedQos.add(requestedQos.value());
        NettyLog.debug("SUBSCRIBE - clientId: {}, topFilter: {}, QoS: {}", clientId, topicFilter,
                requestedQos.value());
    }
    return grantedQos;
}
/**
 * Validates the requested topic filters and registers the valid ones for the client group.
 *
 * <p>Exactly one result code is appended to {@code resultCodes} per requested
 * subscription, in request order: {@code FAILURE} for an invalid filter and
 * {@code AT_LEAST_ONCE} for an accepted one.
 *
 * @param topicSubscribes subscriptions requested by the client
 * @param clientID identifier of the subscribing client
 * @param clientGroup group the accepted subscriptions are registered under
 * @param resultCodes output list receiving the per-subscription result codes
 * @return the value returned by {@code subscriptionManager.subscribes}
 * @throws Exception if the subscription manager fails
 */
private Set<MqttSubscription> subscribe(List<MqttTopicSubscription> topicSubscribes, String clientID, String clientGroup, List<Integer> resultCodes) throws Exception {
    List<MqttSubscription> accepted = new ArrayList<>();
    for (MqttTopicSubscription requested : topicSubscribes) {
        // Check whether the topic filter is legal. An additional check comparing the
        // filter against Token.MULTI is disabled in this code.
        boolean valid = new TopicFilter(requested.topicName()).isValid();
        if (valid) {
            // TODO: only QoS 1 subscriptions are supported for now, so the success code is
            // always AT_LEAST_ONCE rather than the requested QoS. Honoring the requested QoS
            // level is to be implemented later; for now the requested QoS is only recorded.
            accepted.add(new MqttSubscription(clientID, new TopicFilter(requested.topicName()), requested.qualityOfService()));
            resultCodes.add(MqttQoS.AT_LEAST_ONCE.value());
        } else {
            resultCodes.add(MqttQoS.FAILURE.value());
            LOG.warn("topic filter[{}] of clientID[{}] is invalid", requested.topicName(), clientID);
        }
    }
    LOG.info("Do subscribe topics: {}, clientGroup: {}", accepted, clientGroup);
    return subscriptionManager.subscribes(clientGroup, accepted);
}
/**
 * Records a subscription, keeping the entry with the higher QoS for a topic.
 *
 * <p>An entry for the converted core address is also ensured in {@code addressMessageMap}.
 *
 * @param subscription subscription to record
 * @param wildcardConfiguration configuration used to convert the MQTT filter to a core address
 * @return {@code true} when the subscription was stored, i.e. no subscription existed for
 *         the topic or the new one requests a strictly higher QoS; {@code false} otherwise
 */
boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration) {
    // synchronized to prevent race with removeSubscription
    synchronized (subscriptions) {
        String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), wildcardConfiguration);
        addressMessageMap.putIfAbsent(coreAddress, new ConcurrentHashMap<Long, Integer>());

        MqttTopicSubscription current = subscriptions.get(subscription.topicName());
        boolean shouldStore = current == null
                || subscription.qualityOfService().value() > current.qualityOfService().value();
        if (shouldStore) {
            subscriptions.put(subscription.topicName(), subscription);
        }
        return shouldStore;
    }
}
/**
 * Adds one subscription to the session: updates the session state, creates the queue,
 * then either creates a consumer or updates the stored QoS level of the existing one,
 * and finally hands the queue to the retained-message manager.
 *
 * @param subscription subscription requested by the client
 * @throws Exception if any of the queue, consumer or retained-message steps fails
 */
private void addSubscription(MqttTopicSubscription subscription) throws Exception {
    String address = CompositeAddress.extractAddressName(subscription.topicName());

    // The lookup must happen before the session state is updated below, so that
    // "previous" reflects what was stored before this request.
    MqttTopicSubscription previous = session.getSessionState().getSubscription(address);

    int requestedQos = subscription.qualityOfService().value();
    String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(address, session.getWildcardConfiguration());

    session.getSessionState().addSubscription(subscription, session.getWildcardConfiguration());

    Queue queue = createQueueForSubscription(coreAddress, requestedQos);
    if (previous != null) {
        // Already subscribed: only the QoS recorded for the existing consumer changes.
        consumerQoSLevels.put(consumers.get(address).getID(), requestedQos);
    } else {
        createConsumerForSubscriptionQueue(queue, address, requestedQos);
    }

    session.getRetainMessageManager().addRetainedMessagesToQueue(queue, address);
}
/**
 * Checks every subscription of a SUBSCRIBE message against the read policy and
 * topic validity.
 *
 * @param clientID the clientID
 * @param username the username
 * @param msg the subscribe message to verify
 * @return one entry per requested subscription, in request order; an entry carries the
 *         requested QoS when the client may read a valid topic, and {@code FAILURE}
 *         when read access is denied or the topic filter is not valid
 */
List<MqttTopicSubscription> verifyTopicsReadAccess(String clientID, String username, MqttSubscribeMessage msg) {
    List<MqttTopicSubscription> verified = new ArrayList<>();
    final int messageId = messageId(msg);
    for (MqttTopicSubscription requested : msg.payload().topicSubscriptions()) {
        Topic topic = new Topic(requested.topicName());
        MqttQoS granted;
        if (!policy.canRead(topic, username, clientID)) {
            // send SUBACK with 0x80, the user hasn't credentials to read the topic
            LOG.warn("Client does not have read permissions on the topic CId={}, username: {}, messageId: {}, "
                    + "topic: {}", clientID, username, messageId, topic);
            granted = FAILURE;
        } else if (topic.isValid()) {
            LOG.debug("Client will be subscribed to the topic CId={}, username: {}, messageId: {}, topic: {}",
                    clientID, username, messageId, topic);
            granted = requested.qualityOfService();
        } else {
            LOG.warn("Topic filter is not valid CId={}, username: {}, messageId: {}, topic: {}", clientID,
                    username, messageId, topic);
            granted = FAILURE;
        }
        verified.add(new MqttTopicSubscription(topic.toString(), granted));
    }
    return verified;
}
/**
 * Converts an array of {@code SubscribeMessage} objects into topic subscriptions.
 *
 * @param sMsgObj source messages, may be {@code null}
 * @return one subscription per source message, in array order, or {@code null}
 *         when {@code sMsgObj} is {@code null}
 */
public static List<MqttTopicSubscription> getSubscribeTopics(SubscribeMessage[] sMsgObj) {
    if (sMsgObj == null) {
        return null;
    }

    List<MqttTopicSubscription> topics = new LinkedList<>();
    for (int i = 0; i < sMsgObj.length; i++) {
        SubscribeMessage source = sMsgObj[i];
        topics.add(new MqttTopicSubscription(source.getTopic(), MqttQoS.valueOf(source.getQos())));
    }
    return topics;
}
/**
 * Builds a SUBSCRIBE message for the given subscriptions.
 *
 * <p>The fixed header is created with type SUBSCRIBE, QoS AT_LEAST_ONCE, both boolean
 * flags {@code false} and a length argument of 0.
 *
 * @param mqttTopicSubscriptions subscriptions placed in the payload
 * @param messageId message identifier placed in the variable header
 * @return the assembled SUBSCRIBE message
 */
public static MqttSubscribeMessage subscribeMessage(List<MqttTopicSubscription> mqttTopicSubscriptions,
        int messageId) {
    MqttFixedHeader fixedHeader =
            new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
    MqttSubscribePayload payload = new MqttSubscribePayload(mqttTopicSubscriptions);
    return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
}
/**
 * Returns the topics that passed the subscribe-permission check.
 *
 * <p>On the first subscription the client is not permitted to make, a warning is
 * logged, the client's context is closed and {@code null} is returned.
 *
 * @param clientSession session of the subscribing client
 * @param topics subscriptions requested by the client
 * @return the requested topics with their QoS values, or {@code null} when any
 *         subscription is not permitted
 */
private List<Topic> validTopics(ClientSession clientSession, List<MqttTopicSubscription> topics) {
    List<Topic> accepted = new ArrayList<>();
    for (MqttTopicSubscription requested : topics) {
        boolean permitted = pubSubPermission.subscribeVerfy(clientSession.getClientId(), requested.topicName());
        if (permitted) {
            accepted.add(new Topic(requested.topicName(), requested.qualityOfService().value()));
            continue;
        }
        log.warn("[SubPermission] -> this clientId:{} have no permission to subscribe this topic:{}", clientSession.getClientId(), requested.topicName());
        clientSession.getCtx().close();
        return null;
    }
    return accepted;
}
/**
 * Returns the topics that passed the subscribe-permission check.
 *
 * <p>As soon as one subscription is not permitted, a warning is logged, the client's
 * context is closed and {@code null} is returned instead of a list.
 *
 * @param clientSession session of the subscribing client
 * @param topics subscriptions requested by the client
 * @return the requested topics with their QoS values, or {@code null} when a
 *         subscription is not permitted
 */
private List<Topic> validTopics(ClientSession clientSession, List<MqttTopicSubscription> topics) {
    List<Topic> permittedTopics = new ArrayList<>();
    for (MqttTopicSubscription candidate : topics) {
        boolean denied = !pubSubPermission.subscribeVerfy(clientSession.getClientId(), candidate.topicName());
        if (denied) {
            log.warn("[SubPermission] this clientId:{} have no permission to subscribe this topic:{}", clientSession.getClientId(), candidate.topicName());
            clientSession.getCtx().close();
            return null;
        }
        int qosValue = candidate.qualityOfService().value();
        permittedTopics.add(new Topic(candidate.topicName(), qosValue));
    }
    return permittedTopics;
}
/**
 * Builds a SUBSCRIBE message for the given subscriptions.
 *
 * <p>The remaining length placed in the fixed header covers the two-byte packet
 * identifier plus, for every subscription, the two-byte length prefix of the topic
 * filter, its UTF-8 bytes and the one-byte requested QoS. The previous computation
 * left out the length prefix and so was two bytes short per subscription.
 *
 * @param messageId packet identifier placed in the variable header
 * @param topicSubscriptions subscriptions placed in the payload
 * @return the assembled SUBSCRIBE message
 */
public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) {
    // Variable header: two-byte packet identifier.
    int remainingLength = 2;
    for (MqttTopicSubscription item : topicSubscriptions) {
        int filterBytes = item.topicName().getBytes(CharsetUtil.UTF_8).length;
        // Payload entry: length prefix (2) + topic filter bytes + requested QoS (1).
        remainingLength += 2 + filterBytes + 1;
    }

    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
            false, remainingLength);
    MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
    MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions));
    return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
}
/**
 * Grants or denies each requested subscription.
 *
 * <p>A subscription is not granted when its topic starts with {@code "$"} while
 * {@code allowDollar} is off, or when its topic equals {@code deniedTopic}. Every
 * other subscription is granted the QoS it requested.
 *
 * @param clientId client id (not used by this implementation)
 * @param userName user name (not used by this implementation)
 * @param requestSubscriptions subscriptions requested by the client
 * @return one granted-QoS entry per requested subscription, in request order
 */
@Override
public List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List<MqttTopicSubscription> requestSubscriptions) {
    List<MqttGrantedQoS> granted = new ArrayList<>();
    for (MqttTopicSubscription subscription : requestSubscriptions) {
        String topic = subscription.topic();
        boolean dollarBlocked = !this.allowDollar && topic.startsWith("$");
        boolean denied = dollarBlocked || topic.equals(this.deniedTopic);
        if (denied) {
            granted.add(MqttGrantedQoS.NOT_GRANTED);
        } else {
            granted.add(MqttGrantedQoS.valueOf(subscription.requestedQos().value()));
        }
    }
    return granted;
}
/**
 * Creates a queue and a consumer for every subscription currently held in the
 * session state.
 *
 * @throws Exception if creating a queue or a consumer fails
 */
synchronized void start() throws Exception {
    for (MqttTopicSubscription existing : session.getSessionState().getSubscriptions()) {
        String topic = existing.topicName();
        int qos = existing.qualityOfService().value();
        String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
        Queue queue = createQueueForSubscription(coreAddress, qos);
        createConsumerForSubscriptionQueue(queue, topic, qos);
    }
}
/**
 * As per MQTT Spec. Subscribes this client to a number of MQTT topics.
 *
 * <p>The subscriptions are added one by one while holding the monitor of the session state.
 *
 * @param subscriptions subscriptions to add, in request order
 * @return an array holding, for each subscription in order, the QoS value it requested
 * @throws Exception if adding any subscription fails
 */
int[] addSubscriptions(List<MqttTopicSubscription> subscriptions) throws Exception {
    synchronized (session.getSessionState()) {
        int[] granted = new int[subscriptions.size()];
        int index = 0;
        for (MqttTopicSubscription subscription : subscriptions) {
            addSubscription(subscription);
            granted[index] = subscription.qualityOfService().value();
            index++;
        }
        return granted;
    }
}
/**
* Validates the topic filters of a SUBSCRIBE request.
* <p>
* Currently a stub: the argument is not inspected and every request is accepted.
*
* @param topicSubscriptions subscriptions requested by the client (unused)
* @return always {@code true}
*/
private boolean validTopicFilter(List<MqttTopicSubscription> topicSubscriptions) {
// No validation is implemented here yet.
return true;
}
/**
 * Handles an inbound SUBSCRIBE packet and replies with a SUBACK.
 *
 * <p>When the client is connected, its topic filters are registered through
 * {@code subscribe(...)} and the resulting subscriptions are added to its session,
 * if one exists. A SUBSCRIBE whose subscription list is {@code null} is treated as a
 * protocol violation: consumption is stopped, the session and connection are removed
 * and the channels are closed. If anything throws, the failure is logged with its
 * cause and the result codes are padded with {@code FAILURE} so that there is one
 * code per requested subscription.
 *
 * <p>A SUBACK carrying the collected result codes is written to the channel in every
 * case, including when the client is not connected (the code list is then empty).
 *
 * @param client channel the SUBSCRIBE packet arrived on
 * @param subscribeMessage the SUBSCRIBE packet
 */
public void processSubscribe(Channel client, MqttSubscribeMessage subscribeMessage) {
    List<Integer> resultCodes = new ArrayList<>();
    String clientID = NettyAttrManager.getAttrClientId(client);
    if (connectionManager.isConnected(clientID)) {
        MqttConnection connection = connectionManager.getConnection(clientID);
        if (LOG.isDebugEnabled()) {
            LOG.debug("处理subscribe数据包, clientID: {}, cleanSession: {}", clientID, connection.isCleanSession());
        }
        List<MqttTopicSubscription> topicSubscribes = subscribeMessage.payload().topicSubscriptions();
        LOG.info("Subscribe topics: {}, clientID: {}", topicSubscribes, clientID);
        try {
            if (null != topicSubscribes) {
                Set<MqttSubscription> topicFilters = subscribe(topicSubscribes, clientID, connection.getClientGroupName(), resultCodes);
                MqttSession session = sessionManager.getSession(clientID);
                if (session != null) {
                    for (MqttSubscription subscription : topicFilters) {
                        session.addSubscription(subscription);
                    }
                }
            } else {
                // The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. A SUBSCRIBE packet with no payload is a protocol violation
                // it MUST close the Network Connection on which it received that Control Packet which caused the protocol violation
                consumerManager.stopConsume(clientID);
                sessionManager.removeSession(clientID);
                connection.getChannel().close().addListener(CLOSE_ON_FAILURE);
                connectionManager.removeConnection(connection);
                client.close().addListener(CLOSE_ON_FAILURE);
            }
        } catch (Exception e) {
            // Log the cause as well; the bare message hid why the subscription failed.
            LOG.error("subscribe is error!", e);
            // topicSubscribes is null when the failure happened during the teardown branch
            // above; there is nothing to pad in that case and size() must not be called.
            int expectedCodes = (topicSubscribes == null) ? 0 : topicSubscribes.size();
            while (resultCodes.size() < expectedCodes) {
                resultCodes.add(MqttQoS.FAILURE.value());
            }
        }
    }
    MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
            mqttFixedHeader,
            MqttMessageIdVariableHeader.from(subscribeMessage.variableHeader().messageId()),
            new MqttSubAckPayload(resultCodes));
    LOG.info("SUBSCRIBE successful, subscribe result: {}", resultCodes);
    client.writeAndFlush(subAckMessage);
}
/**
 * Handles an inbound SUBSCRIBE packet and replies with a SUBACK.
 *
 * <p>Nothing is done when {@code checkConnected} fails. Otherwise each requested
 * topic is compared with the known device topics: a known topic is granted the value
 * of {@code getMinSupportedQos} for the requested QoS, any other topic gets
 * {@code FAILURE}. The topic is then registered for the channel in
 * {@code MemoryMetaPool}. Exactly one code is recorded per subscription: if anything
 * throws while handling a subscription, that subscription's code is {@code FAILURE}.
 *
 * @param ctx channel context the packet arrived on
 * @param mqttMsg the SUBSCRIBE packet
 */
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
    if (!checkConnected(ctx)) {
        return;
    }
    log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
    List<Integer> grantedQoSList = new ArrayList<>();
    for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
        String topicName = subscription.topicName();
        // TODO: handle this qos level.
        MqttQoS reqQoS = subscription.qualityOfService();
        // The code is added to the list only once, after the try/catch, so a failure
        // during registration cannot leave two codes for one subscription.
        Integer grantedQoS;
        try {
            if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                // Forwarding a SUBSCRIBE_ATTRIBUTES_REQUEST message to the processor is disabled here.
                grantedQoS = getMinSupportedQos(reqQoS);
            } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
                // Forwarding a SUBSCRIBE_RPC_COMMANDS_REQUEST message to the processor is disabled here.
                grantedQoS = getMinSupportedQos(reqQoS);
            } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
                grantedQoS = getMinSupportedQos(reqQoS);
            } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
                deviceSessionCtx.setAllowAttributeResponses();
                grantedQoS = getMinSupportedQos(reqQoS);
            } else if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
                grantedQoS = getMinSupportedQos(reqQoS);
            } else {
                log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
                grantedQoS = FAILURE.value();
            }
            ChannelEntity channelEntity = new TcpChannelEntity(ctx.channel());
            MemoryMetaPool.registerTopic(channelEntity, topicName);
        } catch (Exception e) {
            // Pass the exception to the logger instead of printing the stack trace to stderr.
            log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS, e);
            grantedQoS = FAILURE.value();
        }
        grantedQoSList.add(grantedQoS);
    }
    ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
}
/**
 * Sends a SUBSCRIBE message for the given subscriptions, using the next message id.
 *
 * @param topicSubscriptions subscriptions to request
 * @throws InterruptedException if sending is interrupted
 */
public void subscribe(MqttTopicSubscription... topicSubscriptions) throws InterruptedException {
    int messageId = nextMessageId();
    send(MqttMessageFactory.subscribe(messageId, topicSubscriptions));
    // TODO error handling,store subscription
}
/**
* Returns the subscriptions currently held in the {@code subscriptions} map.
* <p>
* The result is whatever {@code subscriptions.values()} yields; no copy is made here.
*
* @return the values of the {@code subscriptions} map
*/
Collection<MqttTopicSubscription> getSubscriptions() {
return subscriptions.values();
}
/**
* Looks up the subscription stored under the given key in the {@code subscriptions} map.
*
* @param address key used for the lookup
* @return the result of {@code subscriptions.get(address)}; presumably {@code null}
* when nothing is stored under that key (confirm against the map type)
*/
MqttTopicSubscription getSubscription(String address) {
return subscriptions.get(address);
}
/**
 * Removes every subscription currently held in the session state, one topic at a time.
 *
 * @throws Exception if removing a subscription fails
 */
void clean() throws Exception {
    for (MqttTopicSubscription existing : session.getSessionState().getSubscriptions()) {
        String topic = existing.topicName();
        removeSubscription(topic);
    }
}
/**
* Checks that a message published while a non-clean-session subscriber is disconnected
* leaves an outbound message status recorded for that subscriber.
*/
@Test
public void persistMessagesForDisconnectedPersistantSession() throws Exception {
// Subscriber: connects with cleanSession=false, subscribes at EXACTLY_ONCE, then disconnects.
String clientId = TestUtil.newClientId();
ConnectOptions options = new ConnectOptions();
options.clientId(clientId);
options.cleanSession(false);
String topicName = "testTopic";
String message = "test message";
MqttClient client = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());
client.connectOptions(options).connect();
client.subscribe(new MqttTopicSubscription(topicName, MqttQoS.EXACTLY_ONCE));
client.disconnect(true);
// Presumably gives the broker time to process the disconnect - TODO confirm.
Thread.sleep(100);
// The subscriber's session is expected to still be registered after the disconnect.
Assert.assertNotNull(Session.NEXUS.get(clientId));
// Publisher: connects with cleanSession=true and publishes one EXACTLY_ONCE message to the topic.
String publisherId = TestUtil.newClientId();
MqttClient publisher = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());
ConnectOptions pubOptions = new ConnectOptions();
pubOptions.clientId(publisherId);
pubOptions.cleanSession(true);
int messageId = 1;
publisher.connectOptions(pubOptions).connect();
publisher.publish(new Message(messageId, topicName, publisherId, message.getBytes(CharsetUtil.UTF_8),
MqttQoS.EXACTLY_ONCE, false));
Thread.sleep(100);
publisher.disconnect(true);
Thread.sleep(1000);
// The publisher's session is expected to be gone after it disconnects.
Assert.assertNull(Session.NEXUS.get(publisherId));
Topic topic = Topic.NEXUS.get(topicName);
Assert.assertNotNull(topic);
// An outbound status must exist for (messageId, subscriber) and must reference the published message.
OutboundMessageStatus status = OutboundMessageStatus.NEXUS.getBy(messageId, clientId);
Assert.assertNotNull(status);
Assert.assertTrue(Messages.key(publisherId, messageId).equals(status.messageKey()));
}
/**
* Authorizes a client SUBSCRIBE request.
*
* @param clientId Client Id of the subscribing client
* @param userName User Name of the subscribing client
* @param requestSubscriptions List of requested Topic Subscriptions
* @return List of granted QoS; presumably one entry per requested subscription, in
* request order (confirm against the implementations)
*/
List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List<MqttTopicSubscription> requestSubscriptions);