类 io.netty.handler.codec.mqtt.MqttTopicSubscription 源码实例Demo

下面列出了 io.netty.handler.codec.mqtt.MqttTopicSubscription 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。


/**
 * Handles a SUBSCRIBE message received on a channel.
 * <p>
 * The requested subscriptions are handed to the topic processor. When it
 * returns at least one QoS value, a SUBACK is sent and the retained messages
 * found for each requested topic filter are delivered to the channel. When it
 * returns {@code null} or an empty list, an error is logged and the channel
 * is closed without sending a SUBACK.
 *
 * @param channel the channel the SUBSCRIBE message arrived on
 * @param msg     the SUBSCRIBE message to process
 */
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
	String clientId = NettyUtil.getClientId(channel);
	List<MqttTopicSubscription> requested = msg.payload().topicSubscriptions();

	List<Integer> grantedQos = this.topicProcess.processTopicSubscribe(clientId, requested);
	if (grantedQos == null || grantedQos.isEmpty()) {
		NettyLog.error("error Subscribe");
		channel.close();
		return;
	}

	this.sendProcess.sendSubAckMessage(channel, msg.variableHeader().messageId(), grantedQos);

	// Publish the retained messages matching each newly subscribed filter.
	for (MqttTopicSubscription subscription : requested) {
		String topicFilter = subscription.topicName();
		MqttQoS requestedQos = subscription.qualityOfService();
		List<RetainMessage> retained = this.topicProcess.searchRetainMessage(topicFilter);
		if (retained == null || retained.isEmpty()) {
			continue;
		}
		NettyLog.debug("sendRetainMessage: {}, {}", clientId, topicFilter);
		this.consumerProcess.sendRetainMessage(channel, retained, requestedQos);
	}
}
 

/**
 * Stores one subscription entry per requested topic filter for the given
 * client and collects the QoS value of each request.
 *
 * @param clientId           identifier of the subscribing client
 * @param topicSubscriptions the topic filter / QoS pairs requested by the client
 * @return the requested QoS values in request order, or {@code null} when the
 *         topic filters do not pass {@code validTopicFilter}
 */
public List<Integer> processTopicSubscribe(String clientId, List<MqttTopicSubscription> topicSubscriptions ) {
	if (!this.validTopicFilter(topicSubscriptions)) {
		NettyLog.error("error Subscribe");
		return null;
	}

	List<Integer> grantedQos = new ArrayList<>();
	for (MqttTopicSubscription subscription : topicSubscriptions) {
		String topicFilter = subscription.topicName();
		MqttQoS requestedQos = subscription.qualityOfService();
		topicService.put(topicFilter, new SubscribeTopicInfo(clientId, topicFilter, requestedQos.value()));
		grantedQos.add(requestedQos.value());
		NettyLog.debug("SUBSCRIBE - clientId: {}, topFilter: {}, QoS: {}", clientId, topicFilter,
				requestedQos.value());
	}
	return grantedQos;
}
 
源代码3 项目: joyqueue   文件: MqttProtocolHandler.java

/**
 * Validates every requested topic filter, appends one result code per request
 * to {@code resultCodes}, and registers the valid filters with the
 * subscription manager.
 *
 * @param topicSubscribes the topic filter / QoS pairs from the SUBSCRIBE packet
 * @param clientID        identifier of the subscribing client
 * @param clientGroup     client group passed to the subscription manager
 * @param resultCodes     output list; receives one code per entry of {@code topicSubscribes}
 * @return whatever {@code subscriptionManager.subscribes} returns for the valid filters
 * @throws Exception propagated from the subscription manager
 */
private Set<MqttSubscription> subscribe(List<MqttTopicSubscription> topicSubscribes, String clientID, String clientGroup, List<Integer> resultCodes) throws Exception {
    List<MqttSubscription> accepted = new ArrayList<>();
    for (MqttTopicSubscription mts : topicSubscribes) {
        String topicName = mts.topicName();
        // Check whether the topic filter is legal.
        // (A disabled extra check used to reject a filter equal to Token.MULTI as well.)
        boolean legal = new TopicFilter(topicName).isValid();
        if (legal) {
            // todo: only QoS 1 subscriptions are supported for now, so the success code is always
            // AT_LEAST_ONCE rather than the requested mts.qualityOfService().value(). Honouring the
            // requested QoS level is to be implemented later; for now the requested QoS is just recorded.
            accepted.add(new MqttSubscription(clientID, new TopicFilter(topicName), mts.qualityOfService()));
            resultCodes.add(MqttQoS.AT_LEAST_ONCE.value());
        } else {
            resultCodes.add(MqttQoS.FAILURE.value());
            LOG.warn("topic filter[{}] of clientID[{}] is invalid", topicName, clientID);
        }
    }
    LOG.info("Do subscribe topics: {}, clientGroup: {}", accepted, clientGroup);
    return subscriptionManager.subscribes(clientGroup, accepted);
}
 

/**
 * Records a subscription in this session state.
 * <p>
 * The subscription is stored when no subscription exists yet for the same
 * topic name, or when the new one requests a strictly higher QoS than the
 * stored one. An entry for the converted core address is also ensured in
 * {@code addressMessageMap}.
 *
 * @param subscription          the subscription to record
 * @param wildcardConfiguration wildcard configuration used to convert the MQTT filter to a core address
 * @return {@code true} if the subscription was stored, {@code false} if the existing one was kept
 */
boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration) {
   // synchronized to prevent race with removeSubscription
   synchronized (subscriptions) {
      addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap<Long, Integer>());

      MqttTopicSubscription current = subscriptions.get(subscription.topicName());
      boolean store = current == null
         || subscription.qualityOfService().value() > current.qualityOfService().value();
      if (store) {
         subscriptions.put(subscription.topicName(), subscription);
      }
      return store;
   }
}
 

/**
 * Adds a single subscription for this session: updates the session state,
 * creates the subscription queue, creates a consumer (or updates the recorded
 * QoS of the existing one), and adds retained messages to the queue.
 *
 * @param subscription the requested topic filter / QoS pair
 * @throws Exception propagated from queue or consumer creation
 */
private void addSubscription(MqttTopicSubscription subscription) throws Exception {
   String topicName = CompositeAddress.extractAddressName(subscription.topicName());
   // Looked up BEFORE the session state is updated below, so it reflects
   // whether a subscription for this topic existed prior to this call.
   MqttTopicSubscription previous = session.getSessionState().getSubscription(topicName);

   int qos = subscription.qualityOfService().value();

   String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topicName, session.getWildcardConfiguration());

   session.getSessionState().addSubscription(subscription, session.getWildcardConfiguration());

   Queue queue = createQueueForSubscription(coreAddress, qos);

   if (previous != null) {
      // A subscription already existed: only record the new QoS for its consumer.
      consumerQoSLevels.put(consumers.get(topicName).getID(), qos);
   } else {
      createConsumerForSubscriptionQueue(queue, topicName, qos);
   }
   session.getRetainMessageManager().addRetainedMessagesToQueue(queue, topicName);
}
 
源代码6 项目: cassandana   文件: Authorizator.java

/**
 * Builds the per-topic answer for a SUBSCRIBE message.
 * <p>
 * For each requested subscription the result carries the requested QoS when
 * the client may read the topic and the topic filter is valid, and
 * {@code FAILURE} otherwise.
 *
 * @param clientID
 *            the clientID
 * @param username
 *            the username
 * @param msg
 *            the subscribe message to verify
 * @return one entry per requested subscription, in request order, holding the
 *         granted QoS or {@code FAILURE}.
 */
List<MqttTopicSubscription> verifyTopicsReadAccess(String clientID, String username, MqttSubscribeMessage msg) {
    List<MqttTopicSubscription> ackTopics = new ArrayList<>();

    final int messageId = messageId(msg);
    for (MqttTopicSubscription req : msg.payload().topicSubscriptions()) {
        Topic topic = new Topic(req.topicName());
        final MqttQoS granted;
        if (policy.canRead(topic, username, clientID)) {
            if (!topic.isValid()) {
                LOG.warn("Topic filter is not valid CId={}, username: {}, messageId: {}, topic: {}", clientID,
                         username, messageId, topic);
                granted = FAILURE;
            } else {
                LOG.debug("Client will be subscribed to the topic CId={}, username: {}, messageId: {}, topic: {}",
                          clientID, username, messageId, topic);
                granted = req.qualityOfService();
            }
        } else {
            // send SUBACK with 0x80, the user hasn't credentials to read the topic
            LOG.warn("Client does not have read permissions on the topic CId={}, username: {}, messageId: {}, " +
                     "topic: {}", clientID, username, messageId, topic);
            granted = FAILURE;
        }
        ackTopics.add(new MqttTopicSubscription(topic.toString(), granted));
    }
    return ackTopics;
}
 

/**
 * Converts an array of {@code SubscribeMessage} objects into Netty topic
 * subscriptions, mapping each numeric QoS through {@code MqttQoS.valueOf}.
 *
 * @param sMsgObj the subscribe descriptions to convert; may be {@code null}
 * @return the converted subscriptions in array order, or {@code null} when
 *         {@code sMsgObj} is {@code null}
 */
public static List<MqttTopicSubscription> getSubscribeTopics(SubscribeMessage[] sMsgObj) {
	if (sMsgObj == null) {
		return null;
	}

	List<MqttTopicSubscription> subscriptions = new LinkedList<>();
	for (int i = 0; i < sMsgObj.length; i++) {
		SubscribeMessage source = sMsgObj[i];
		subscriptions.add(new MqttTopicSubscription(source.getTopic(), MqttQoS.valueOf(source.getQos())));
	}
	return subscriptions;
}
 

/**
 * Builds a SUBSCRIBE message for the given subscriptions and message id.
 * <p>
 * The fixed header is created with QoS {@code AT_LEAST_ONCE}, both boolean
 * flags {@code false}, and a remaining length of 0.
 *
 * @param mqttTopicSubscriptions the topic filter / QoS pairs to put in the payload
 * @param messageId              the message id for the variable header
 * @return the assembled SUBSCRIBE message
 */
public static MqttSubscribeMessage subscribeMessage(List<MqttTopicSubscription> mqttTopicSubscriptions,
		int messageId) {
	MqttSubscribePayload payload = new MqttSubscribePayload(mqttTopicSubscriptions);
	return new MqttSubscribeMessage(
			new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0),
			MqttMessageIdVariableHeader.from(messageId),
			payload);
}
 
源代码9 项目: iot-mqtt   文件: SubscribeProcessor.java

/**
 * Converts the requested subscriptions into topics after checking the
 * client's subscribe permission for each of them.
 * <p>
 * On the first topic the client may not subscribe to, a warning is logged,
 * the client's channel context is closed, and {@code null} is returned.
 *
 * @param clientSession session of the subscribing client
 * @param topics        the requested topic filter / QoS pairs
 * @return the permitted topics in request order, or {@code null} if any request was denied
 */
private List<Topic> validTopics(ClientSession clientSession,List<MqttTopicSubscription> topics){
    List<Topic> permitted = new ArrayList<>();
    for(MqttTopicSubscription requested : topics){
        boolean allowed = pubSubPermission.subscribeVerfy(clientSession.getClientId(),requested.topicName());
        if(allowed){
            permitted.add(new Topic(requested.topicName(),requested.qualityOfService().value()));
            continue;
        }
        log.warn("[SubPermission] -> this clientId:{} have no permission to subscribe this topic:{}",clientSession.getClientId(),requested.topicName());
        clientSession.getCtx().close();
        return null;
    }
    return permitted;
}
 
源代码10 项目: jmqtt   文件: SubscribeProcessor.java

/**
 * Returns the topics that passed the subscribe-permission check.
 * <p>
 * Processing stops at the first denied topic: a warning is logged, the
 * client's channel context is closed, and {@code null} is returned.
 *
 * @param clientSession session of the subscribing client
 * @param topics        the requested topic filter / QoS pairs
 * @return the accepted topics in request order, or {@code null} if a request was denied
 */
private List<Topic> validTopics(ClientSession clientSession,List<MqttTopicSubscription> topics){
    List<Topic> accepted = new ArrayList<>();
    for(MqttTopicSubscription entry : topics){
        boolean denied = !pubSubPermission.subscribeVerfy(clientSession.getClientId(),entry.topicName());
        if(denied){
            log.warn("[SubPermission] this clientId:{} have no permission to subscribe this topic:{}",clientSession.getClientId(),entry.topicName());
            clientSession.getCtx().close();
            return null;
        }
        int qos = entry.qualityOfService().value();
        accepted.add(new Topic(entry.topicName(),qos));
    }
    return accepted;
}
 
源代码11 项目: lannister   文件: MqttMessageFactory.java

/**
 * Builds a SUBSCRIBE message carrying the given topic subscriptions.
 * <p>
 * The remaining length placed in the fixed header follows the MQTT SUBSCRIBE
 * packet layout: a 2-byte packet identifier, then for every subscription a
 * 2-byte length prefix, the UTF-8 bytes of the topic filter, and 1 byte for
 * the requested QoS.
 *
 * @param messageId          the packet identifier for the variable header
 * @param topicSubscriptions the topic filter / QoS pairs to subscribe to
 * @return the assembled SUBSCRIBE message
 */
public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) {
	// Variable header: 2-byte packet identifier.
	int remainingLength = 2;

	for (MqttTopicSubscription item : topicSubscriptions) {
		// Payload entry: 2-byte string length prefix + topic filter bytes + 1-byte requested QoS.
		remainingLength += 2 + item.topicName().getBytes(CharsetUtil.UTF_8).length + 1;
	}

	MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
			false, remainingLength);
	MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
	MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions));

	return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
}
 
源代码12 项目: mithqtt   文件: DummyAuthenticatorImpl.java

/**
 * Decides the granted QoS for each requested subscription.
 * <p>
 * A request is answered with {@code NOT_GRANTED} when its topic starts with
 * {@code "$"} while {@code allowDollar} is off, or when its topic equals
 * {@code deniedTopic}. Otherwise the requested QoS is granted.
 *
 * @param clientId             Client Id (not used by this implementation)
 * @param userName             User Name (not used by this implementation)
 * @param requestSubscriptions List of request Topic Subscription
 * @return one granted QoS per request, in request order
 */
@Override
public List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List<MqttTopicSubscription> requestSubscriptions) {
    List<MqttGrantedQoS> granted = new ArrayList<>();
    for (MqttTopicSubscription subscription : requestSubscriptions) {
        boolean dollarBlocked = !this.allowDollar && subscription.topic().startsWith("$");
        if (dollarBlocked || subscription.topic().equals(this.deniedTopic)) {
            granted.add(MqttGrantedQoS.NOT_GRANTED);
        } else {
            granted.add(MqttGrantedQoS.valueOf(subscription.requestedQos().value()));
        }
    }
    return granted;
}
 

/**
 * Creates a queue and a consumer for every subscription currently held in
 * the session state.
 *
 * @throws Exception propagated from queue or consumer creation
 */
synchronized void start() throws Exception {
   for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
      String topic = subscription.topicName();
      String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
      int qos = subscription.qualityOfService().value();
      Queue queue = createQueueForSubscription(coreAddress, qos);
      createConsumerForSubscriptionQueue(queue, topic, qos);
   }
}
 

/**
 * As per MQTT Spec.  Subscribes this client to a number of MQTT topics.
 * <p>
 * The whole operation runs while holding the monitor of the session state.
 *
 * @param subscriptions the topic filter / QoS pairs to subscribe to
 * @return An array holding, for each subscription in order, the QoS value it requested.
 * @throws Exception propagated from adding an individual subscription
 */
int[] addSubscriptions(List<MqttTopicSubscription> subscriptions) throws Exception {
   synchronized (session.getSessionState()) {
      int[] granted = new int[subscriptions.size()];

      int index = 0;
      for (MqttTopicSubscription subscription : subscriptions) {
         addSubscription(subscription);
         granted[index++] = subscription.qualityOfService().value();
      }
      return granted;
   }
}
 
源代码15 项目: ext-opensource-netty   文件: TopicProcess.java

/**
 * Validation hook for the topic filters of a SUBSCRIBE request.
 * <p>
 * Currently a stub: the argument is not inspected and every request is accepted.
 *
 * @param topicSubscriptions the requested topic filter / QoS pairs (unused)
 * @return always {@code true}
 */
private boolean validTopicFilter(List<MqttTopicSubscription> topicSubscriptions) {
	return true;
}
 
源代码16 项目: joyqueue   文件: MqttProtocolHandler.java

/**
 * Processes a SUBSCRIBE packet and answers with a SUBACK.
 * <p>
 * For a connected client the requested topic filters are subscribed and
 * added to the client's session, if one exists. A {@code null} subscription
 * list is treated as a protocol violation: consumption is stopped and the
 * session, connection and channel are torn down. If subscribing fails with
 * an exception, the result codes are padded with {@code FAILURE} so that
 * there is one code per requested subscription.
 * <p>
 * A SUBACK with the collected result codes is written to the channel in all
 * cases, including when the client is not connected (empty result codes).
 *
 * @param client           the channel the SUBSCRIBE packet arrived on
 * @param subscribeMessage the SUBSCRIBE packet
 */
public void processSubscribe(Channel client, MqttSubscribeMessage subscribeMessage) {
    List<Integer> resultCodes = new ArrayList<>();
    String clientID = NettyAttrManager.getAttrClientId(client);
    if (connectionManager.isConnected(clientID)) {
        MqttConnection connection = connectionManager.getConnection(clientID);

        if (LOG.isDebugEnabled()) {
            LOG.debug("处理subscribe数据包, clientID: {}, cleanSession: {}", clientID, connection.isCleanSession());
        }

        List<MqttTopicSubscription> topicSubscribes = subscribeMessage.payload().topicSubscriptions();
        LOG.info("Subscribe topics: {}, clientID: {}", topicSubscribes, clientID);

        try {
            if (null != topicSubscribes) {
                Set<MqttSubscription> topicFilters = subscribe(topicSubscribes, clientID, connection.getClientGroupName(), resultCodes);
                MqttSession session = sessionManager.getSession(clientID);
                if (session != null) {
                    for (MqttSubscription subscription : topicFilters) {
                        session.addSubscription(subscription);
                    }
                }
            } else {
                // The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. A SUBSCRIBE packet with no payload is a protocol violation
                // it MUST close the Network Connection on which it received that Control Packet which caused the protocol violation
                consumerManager.stopConsume(clientID);
                sessionManager.removeSession(clientID);
                connection.getChannel().close().addListener(CLOSE_ON_FAILURE);
                connectionManager.removeConnection(connection);
                client.close().addListener(CLOSE_ON_FAILURE);
            }
        } catch (Exception e) {
            // Log the cause; without it the failure cannot be diagnosed.
            LOG.error("subscribe is error!", e);
            // topicSubscribes is null when the exception came from the protocol-violation
            // branch above, so it must not be dereferenced unconditionally here.
            int expectedCodes = (topicSubscribes == null) ? 0 : topicSubscribes.size();
            while (resultCodes.size() < expectedCodes) {
                resultCodes.add(MqttQoS.FAILURE.value());
            }
        }
    }

    MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
            mqttFixedHeader,
            MqttMessageIdVariableHeader.from(subscribeMessage.variableHeader().messageId()),
            new MqttSubAckPayload(resultCodes));
    // NOTE(review): this is logged even when every result code is FAILURE or the client is not connected.
    LOG.info("SUBSCRIBE successful, subscribe result: {}", resultCodes);
    client.writeAndFlush(subAckMessage);
}
 
源代码17 项目: iotplatform   文件: MqttTransportHandler.java

/**
 * Processes a SUBSCRIBE message and replies with a SUBACK holding exactly
 * one granted-QoS entry per requested subscription.
 * <p>
 * Subscriptions to the known device topics are granted the value returned by
 * {@code getMinSupportedQos}; any other topic is answered with
 * {@code FAILURE}. Each requested topic is then registered for the channel
 * in {@code MemoryMetaPool}. If handling a subscription throws, that
 * subscription is answered with {@code FAILURE}.
 *
 * @param ctx     the channel handler context of the subscribing client
 * @param mqttMsg the SUBSCRIBE message
 */
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
  if (!checkConnected(ctx)) {
    return;
  }
  log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
  List<Integer> grantedQoSList = new ArrayList<>();
  for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
    String topicName = subscription.topicName();
    // TODO: handle this qos level.
    MqttQoS reqQoS = subscription.qualityOfService();
    try {
      // Decide the code first and append it only after registration succeeded, so a
      // failure below cannot leave two entries in the list for one subscription.
      int grantedQoS;
      if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
        // Forwarding a SUBSCRIBE_ATTRIBUTES_REQUEST to the device actor is
        // disabled in this code; only the QoS is granted.
        grantedQoS = getMinSupportedQos(reqQoS);
      } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
        // Forwarding a SUBSCRIBE_RPC_COMMANDS_REQUEST to the device actor is
        // disabled in this code; only the QoS is granted.
        grantedQoS = getMinSupportedQos(reqQoS);
      } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
        grantedQoS = getMinSupportedQos(reqQoS);
      } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
        deviceSessionCtx.setAllowAttributeResponses();
        grantedQoS = getMinSupportedQos(reqQoS);
      } else if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
        grantedQoS = getMinSupportedQos(reqQoS);
      } else {
        log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
        grantedQoS = FAILURE.value();
      }
      // NOTE(review): the topic is registered even when the subscription was rejected
      // above; kept as in the original behavior. Confirm this is intended.
      ChannelEntity channelEntity = new TcpChannelEntity(ctx.channel());
      MemoryMetaPool.registerTopic(channelEntity, topicName);
      grantedQoSList.add(grantedQoS);
    } catch (Exception e) {
      // Pass the exception to the logger instead of printing the stack trace to stderr.
      log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS, e);
      grantedQoSList.add(FAILURE.value());
    }
  }

  ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
}
 
源代码18 项目: lannister   文件: MqttClient.java

/**
 * Sends a SUBSCRIBE message for the given topic subscriptions, using the
 * next message id of this client.
 *
 * @param topicSubscriptions the topic filter / QoS pairs to subscribe to
 * @throws InterruptedException declared by this method; presumably raised by {@code send} — confirm
 */
public void subscribe(MqttTopicSubscription... topicSubscriptions) throws InterruptedException {
	send(MqttMessageFactory.subscribe(nextMessageId(), topicSubscriptions));

	// TODO error handling,store subscription
}
 
源代码19 项目: activemq-artemis   文件: MQTTSessionState.java

/**
 * Returns the subscriptions held in this session state.
 *
 * @return the {@code values()} collection of the internal subscriptions map (not a copy)
 */
Collection<MqttTopicSubscription> getSubscriptions() {
   return subscriptions.values();
}
 
源代码20 项目: activemq-artemis   文件: MQTTSessionState.java

/**
 * Looks up the subscription stored under the given key.
 *
 * @param address the key to look up in the subscriptions map
 * @return whatever the subscriptions map holds for {@code address}
 *         (presumably {@code null} when nothing is stored — confirm against the map type)
 */
MqttTopicSubscription getSubscription(String address) {
   return subscriptions.get(address);
}
 

/**
 * Removes every subscription currently held in the session state, one topic
 * at a time, via {@code removeSubscription}.
 *
 * @throws Exception propagated from removing an individual subscription
 */
void clean() throws Exception {
   for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
      String topic = subscription.topicName();
      removeSubscription(topic);
   }
}
 
源代码22 项目: lannister   文件: TopicTest.java

/**
 * Integration test: a client connects with cleanSession=false, subscribes at
 * EXACTLY_ONCE and disconnects. A second client then publishes to the same
 * topic. The test asserts that the subscriber's session is still present,
 * the publisher's session is gone, the topic exists, and an outbound message
 * status is stored for the subscriber under the published message's key.
 * <p>
 * The sleeps give the broker time to process each step; the statement order
 * is significant.
 */
@Test
public void persistMessagesForDisconnectedPersistantSession() throws Exception {
	String clientId = TestUtil.newClientId();

	// Subscriber connects with cleanSession=false.
	ConnectOptions options = new ConnectOptions();
	options.clientId(clientId);
	options.cleanSession(false);

	String topicName = "testTopic";
	String message = "test message";

	MqttClient client = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());
	client.connectOptions(options).connect();

	// Subscribe, then disconnect before anything is published.
	client.subscribe(new MqttTopicSubscription(topicName, MqttQoS.EXACTLY_ONCE));
	client.disconnect(true);

	Thread.sleep(100);

	// The subscriber's session must still be registered after the disconnect.
	Assert.assertNotNull(Session.NEXUS.get(clientId));

	String publisherId = TestUtil.newClientId();

	MqttClient publisher = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());

	// Publisher connects with cleanSession=true.
	ConnectOptions pubOptions = new ConnectOptions();
	pubOptions.clientId(publisherId);
	pubOptions.cleanSession(true);

	int messageId = 1;
	publisher.connectOptions(pubOptions).connect();
	publisher.publish(new Message(messageId, topicName, publisherId, message.getBytes(CharsetUtil.UTF_8),
			MqttQoS.EXACTLY_ONCE, false));

	Thread.sleep(100);

	publisher.disconnect(true);

	Thread.sleep(1000);

	// The publisher's session must be gone after its disconnect.
	Assert.assertNull(Session.NEXUS.get(publisherId));

	Topic topic = Topic.NEXUS.get(topicName);
	Assert.assertNotNull(topic);

	// An outbound status for the published message must be stored for the offline subscriber.
	OutboundMessageStatus status = OutboundMessageStatus.NEXUS.getBy(messageId, clientId);

	Assert.assertNotNull(status);
	Assert.assertTrue(Messages.key(publisherId, messageId).equals(status.messageKey()));
}
 
源代码23 项目: mithqtt   文件: Authenticator.java

/**
 * Authorizes a client's SUBSCRIBE request.
 *
 * @param clientId             Client Id
 * @param userName             User Name
 * @param requestSubscriptions List of requested Topic Subscriptions
 * @return List of granted QoS; presumably one entry per requested
 *         subscription, in request order — confirm with implementations
 */
List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List<MqttTopicSubscription> requestSubscriptions);
 
 类所在包
 类方法
 同包方法