类 io.netty.handler.codec.mqtt.MqttSubscribeMessage 源码实例Demo

下面列出了 io.netty.handler.codec.mqtt.MqttSubscribeMessage 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。


/**
 * Handles a SUBSCRIBE packet (the original note reads "S - B", presumably
 * subscriber to broker).
 *
 * <p>Registers the requested subscriptions, answers with a SUBACK carrying the
 * resulting QoS list and then sends any retained messages found for each
 * requested topic filter. When no QoS list is produced the channel is closed.
 *
 * @param channel channel the SUBSCRIBE packet was received on
 * @param msg     the decoded SUBSCRIBE message
 */
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
	String clientId = NettyUtil.getClientId(channel);
	List<MqttTopicSubscription> requested = msg.payload().topicSubscriptions();
	List<Integer> grantedQos = this.topicProcess.processTopicSubscribe(clientId, requested);

	// Nothing could be granted: report it and drop the connection.
	if (grantedQos == null || grantedQos.isEmpty()) {
		NettyLog.error("error Subscribe");
		channel.close();
		return;
	}

	this.sendProcess.sendSubAckMessage(channel, msg.variableHeader().messageId(), grantedQos);

	// Deliver retained messages for every requested topic filter.
	for (MqttTopicSubscription subscription : requested) {
		String topicFilter = subscription.topicName();
		MqttQoS mqttQoS = subscription.qualityOfService();
		List<RetainMessage> retained = this.topicProcess.searchRetainMessage(topicFilter);
		if (retained == null || retained.isEmpty()) {
			continue;
		}
		NettyLog.debug("sendRetainMessage: {}, {}", clientId, topicFilter);
		this.consumerProcess.sendRetainMessage(channel, retained, mqttQoS);
	}
}
 
源代码2 项目: iot-mqtt   文件: SubscribeProcessor.java

/**
 * Handles a SUBSCRIBE request: validates the requested topics, acknowledges the
 * valid ones with a SUBACK and then dispatches the messages returned by
 * {@code subscribe} to the client session.
 *
 * @param ctx         handler context of the channel the request arrived on
 * @param mqttMessage the incoming message; cast to {@link MqttSubscribeMessage}
 */
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
    MqttSubscribeMessage request = (MqttSubscribeMessage) mqttMessage;
    String clientId = NettyUtil.getClientId(ctx.channel());
    int messageId = request.variableHeader().messageId();
    ClientSession session = ConnectManager.getInstance().getClient(clientId);

    List<Topic> accepted = validTopics(session, request.payload().topicSubscriptions());
    if (accepted == null || accepted.isEmpty()) {
        // NOTE(review): no SUBACK is written on this path - confirm that is intended.
        log.warn("[Subscribe] -> Valid all subscribe topic failure,clientId:{}",clientId);
        return;
    }

    // Acknowledge first, then register the subscriptions.
    ctx.writeAndFlush(MessageUtil.getSubAckMessage(messageId, getTopicQos(accepted)));

    // send retain messages
    List<Message> retained = subscribe(session, accepted);
    dispatcherRetainMessage(session, retained);
}
 
源代码3 项目: jmqtt   文件: SubscribeProcessor.java

/**
 * Processes a SUBSCRIBE packet: filters the requested topics through
 * {@code validTopics}, replies with a SUBACK for the accepted ones, registers
 * them via {@code subscribe} and dispatches the messages that call returns.
 *
 * @param ctx         handler context of the channel the packet arrived on
 * @param mqttMessage the incoming message; cast to {@link MqttSubscribeMessage}
 */
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
    MqttSubscribeMessage subscribePacket = (MqttSubscribeMessage) mqttMessage;
    String clientId = NettyUtil.getClientId(ctx.channel());
    int packetId = subscribePacket.variableHeader().messageId();
    ClientSession clientSession = ConnectManager.getInstance().getClient(clientId);

    List<Topic> topics = validTopics(clientSession, subscribePacket.payload().topicSubscriptions());
    boolean nothingValid = (topics == null) || topics.isEmpty();
    if (nothingValid) {
        // NOTE(review): the client receives no SUBACK here - confirm that is intended.
        log.warn("[Subscribe] -> Valid all subscribe topic failure,clientId:{}",clientId);
        return;
    }

    List<Integer> grantedQos = getTopicQos(topics);
    ctx.writeAndFlush(MessageUtil.getSubAckMessage(packetId, grantedQos));

    // send retain messages
    dispatcherRetainMessage(clientSession, subscribe(clientSession, topics));
}
 

/**
 * Creates a pending subscription and hands the SUBSCRIBE message to the
 * retransmission handler as its original message.
 *
 * @param future  promise associated with this subscription
 * @param topic   topic being subscribed to
 * @param message the SUBSCRIBE message for this subscription
 */
public MqttPendingSubscription(Promise<Void> future, String topic, MqttSubscribeMessage message) {
    this.subscribeMessage = message;
    this.topic = topic;
    this.future = future;

    retransmissionHandler.setOriginalMessage(message);
}
 

/**
 * Starts the retransmission handler on the given event loop, but only once the
 * packet has been marked as sent.
 *
 * @param eventLoop  event loop passed to the retransmission handler
 * @param sendPacket callback that receives each rebuilt SUBSCRIBE message
 */
public void startRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
    // Nothing to retransmit until the packet has actually been sent.
    if (!this.sent) {
        return;
    }
    this.retransmissionHandler.setHandle((fixedHeader, originalMessage) -> {
        // Rebuild the message with the supplied fixed header and the original contents.
        MqttSubscribeMessage resend = new MqttSubscribeMessage(
                fixedHeader, originalMessage.variableHeader(), originalMessage.payload());
        sendPacket.accept(resend);
    });
    this.retransmissionHandler.start(eventLoop);
}
 
源代码6 项目: cassandana   文件: Authorizator.java

/**
 * Checks every topic subscription of a SUBSCRIBE message against the read
 * policy and topic validity.
 *
 * <p>Each requested subscription yields exactly one entry in the result: the
 * requested QoS when the client may read a valid topic, or {@code FAILURE}
 * when read access is denied or the topic filter is not valid.
 *
 * @param clientID
 *            the clientID
 * @param username
 *            the username
 * @param msg
 *            the subscribe message to verify
 * @return the list of verified topics for the given subscribe message.
 */
List<MqttTopicSubscription> verifyTopicsReadAccess(String clientID, String username, MqttSubscribeMessage msg) {
    final int messageId = messageId(msg);
    final List<MqttTopicSubscription> verified = new ArrayList<>();

    for (MqttTopicSubscription requested : msg.payload().topicSubscriptions()) {
        final Topic topic = new Topic(requested.topicName());
        final MqttQoS granted;

        if (!policy.canRead(topic, username, clientID)) {
            // send SUBACK with 0x80, the user hasn't credentials to read the topic
            LOG.warn("Client does not have read permissions on the topic CId={}, username: {}, messageId: {}, " +
                     "topic: {}", clientID, username, messageId, topic);
            granted = FAILURE;
        } else if (topic.isValid()) {
            LOG.debug("Client will be subscribed to the topic CId={}, username: {}, messageId: {}, topic: {}",
                      clientID, username, messageId, topic);
            granted = requested.qualityOfService();
        } else {
            LOG.warn("Topic filter is not valid CId={}, username: {}, messageId: {}, topic: {}", clientID,
                     username, messageId, topic);
            granted = FAILURE;
        }

        verified.add(new MqttTopicSubscription(topic.toString(), granted));
    }
    return verified;
}
 

/**
 * Builds a SUBSCRIBE message for the given topic subscriptions.
 *
 * <p>The fixed header is created with QoS {@code AT_LEAST_ONCE}, the dup and
 * retain flags cleared, and a remaining length of 0.
 *
 * @param mqttTopicSubscriptions subscriptions to place in the payload
 * @param messageId              message id for the variable header
 * @return the assembled SUBSCRIBE message
 */
public static MqttSubscribeMessage subscribeMessage(List<MqttTopicSubscription> mqttTopicSubscriptions,
		int messageId) {
	MqttFixedHeader fixedHeader =
			new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
	MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
	MqttSubscribePayload payload = new MqttSubscribePayload(mqttTopicSubscriptions);
	return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
}
 

/**
 * Routes an incoming MQTT message to the handler for its message type.
 *
 * <p>PINGREQ is answered with a PINGRESP and DISCONNECT is processed only when
 * {@code checkConnected} succeeds. Message types without a case are ignored.
 *
 * @param ctx handler context of the channel the message arrived on
 * @param msg the decoded MQTT message
 */
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
  switch (msg.fixedHeader().messageType()) {
  case CONNECT: {
    processConnect(ctx, (MqttConnectMessage) msg);
    break;
  }
  case PUBLISH: {
    processPublish(ctx, (MqttPublishMessage) msg);
    break;
  }
  case SUBSCRIBE: {
    processSubscribe(ctx, (MqttSubscribeMessage) msg);
    break;
  }
  case UNSUBSCRIBE: {
    processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
    break;
  }
  case PINGREQ: {
    if (checkConnected(ctx)) {
      MqttFixedHeader pingRespHeader = new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0);
      ctx.writeAndFlush(new MqttMessage(pingRespHeader));
    }
    break;
  }
  case DISCONNECT: {
    if (checkConnected(ctx)) {
      processDisconnect(ctx);
    }
    break;
  }
  }
}
 
源代码9 项目: lannister   文件: MqttMessageFactory.java

/**
 * Builds a SUBSCRIBE message for the given topic subscriptions.
 *
 * <p>The remaining length placed in the fixed header covers the 2-byte packet
 * identifier plus, for every subscription, the 2-byte string length prefix,
 * the UTF-8 bytes of the topic filter and the 1-byte requested QoS. The
 * previous computation left out the per-topic length prefix.
 *
 * @param messageId          message id for the variable header
 * @param topicSubscriptions subscriptions to place in the payload
 * @return the assembled SUBSCRIBE message
 */
public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) {
	// Variable header: 2-byte packet identifier.
	int remainingLength = 2;

	for (MqttTopicSubscription item : topicSubscriptions) {
		int topicNameSize = item.topicName().getBytes(CharsetUtil.UTF_8).length;
		// Payload entry: 2-byte length prefix + topic filter bytes + 1-byte requested QoS.
		remainingLength += 2 + topicNameSize + 1;
	}

	MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
			false, remainingLength);
	MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
	MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions));

	return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
}
 

/**
 * Adds the subscriptions carried by a SUBSCRIBE message to the session's
 * subscription manager and sends the resulting SUBACK to the client.
 *
 * <p>The SUBACK reuses the variable header of the incoming message and carries
 * the QoS values returned by the subscription manager.
 *
 * @param message the SUBSCRIBE message to handle
 * @throws Exception declared for callers; the visible body only calls the
 *         subscription manager and {@code sendToClient}
 */
void handleSubscribe(MqttSubscribeMessage message) throws Exception {
   int[] grantedQos = session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions());

   MqttFixedHeader subAckHeader =
      new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
   MqttSubAckPayload subAckPayload = new MqttSubAckPayload(grantedQos);

   sendToClient(new MqttSubAckMessage(subAckHeader, message.variableHeader(), subAckPayload));
}
 

/**
 * Returns the SUBSCRIBE message held by this object.
 *
 * @return the stored subscribe message
 */
public MqttSubscribeMessage getSubscribeMessage() {
    return this.subscribeMessage;
}
 

/**
 * Dispatches a decoded MQTT message to the protocol processor.
 *
 * <p>Messages that failed decoding close the connection. CONNECT is always
 * processed; every other message type is dropped unless the channel is logged
 * in. Types without a handler below are ignored.
 *
 * @param ctx handler context of the channel the message arrived on
 * @param msg the decoded MQTT message
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
	if (!msg.decoderResult().isSuccess()) {
		NettyLog.error("error decoder");
		ctx.close();
		return;
	}

	final MqttMessageType type = msg.fixedHeader().messageType();
	NettyLog.debug("read: " + type);

	// CONNECT is the only packet accepted before login.
	if (type == MqttMessageType.CONNECT) {
		protocolProcess.processConnect(ctx.channel(), (MqttConnectMessage) msg);
		return;
	}
	if (!NettyUtil.isLogin(ctx.channel())) {
		NettyLog.info("not login");
		return;
	}

	switch (type) {
	case PUBLISH:
		protocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
		break;
	case PUBACK:
		protocolProcess.processPubAck(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
		break;
	case PUBREC:
		protocolProcess.processPubRec(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
		break;
	case PUBREL:
		protocolProcess.processPubRel(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
		break;
	case PUBCOMP:
		protocolProcess.processPubComp(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
		break;
	case SUBSCRIBE:
		protocolProcess.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
		break;
	case UNSUBSCRIBE:
		protocolProcess.processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
		break;
	case PINGREQ:
		protocolProcess.processPingReq(ctx.channel(), msg);
		break;
	case DISCONNECT:
		protocolProcess.processDisConnect(ctx.channel(), msg);
		break;
	default:
		// CONNACK, SUBACK, UNSUBACK, PINGRESP and anything else: nothing to do.
		break;
	}
}
 

/**
 * Builds a SUBSCRIBE message from the given subscribe descriptors by converting
 * them with {@code getSubscribeTopics} and delegating to
 * {@code ClientProtocolUtil.subscribeMessage}.
 *
 * @param messageId message id passed on to the delegate
 * @param msg       subscribe descriptors to convert
 * @return the SUBSCRIBE message returned by the delegate
 */
public static MqttSubscribeMessage subscribeMessage(int messageId, SubscribeMessage... msg) {
	List<MqttTopicSubscription> subscribeTopics = getSubscribeTopics(msg);
	return ClientProtocolUtil.subscribeMessage(subscribeTopics, messageId);
}
 
源代码14 项目: joyqueue   文件: SubscribeHandler.java

/**
 * Forwards a SUBSCRIBE message to the MQTT protocol handler.
 *
 * @param client  channel the message arrived on
 * @param message the incoming message; cast to {@link MqttSubscribeMessage}
 * @throws Exception declared by the overridden method
 */
@Override
public void handleRequest(Channel client, MqttMessage message) throws Exception {
    mqttProtocolHandler.processSubscribe(client, (MqttSubscribeMessage) message);
}
 
源代码15 项目: joyqueue   文件: MqttProtocolHandler.java

/**
 * Processes a SUBSCRIBE packet for a connected client and replies with a SUBACK.
 *
 * <p>For a connected client the requested topics are subscribed and added to
 * the client's session, if one exists. A SUBSCRIBE with no topic list is
 * treated as a protocol violation: consumption is stopped, the session and
 * connection are removed and the channel is closed. If subscribing throws, the
 * result codes are padded with {@code FAILURE} so that there is one code per
 * requested topic.
 *
 * <p>NOTE(review): the SUBACK is written in every case, including when the
 * client is not connected (empty result list) and after the channel has been
 * closed - confirm this is intended.
 *
 * @param client           channel the SUBSCRIBE arrived on
 * @param subscribeMessage the decoded SUBSCRIBE message
 */
public void processSubscribe(Channel client, MqttSubscribeMessage subscribeMessage) {
    List<Integer> resultCodes = new ArrayList<>();
    String clientID = NettyAttrManager.getAttrClientId(client);
    if (connectionManager.isConnected(clientID)) {
        MqttConnection connection = connectionManager.getConnection(clientID);

        if (LOG.isDebugEnabled()) {
            LOG.debug("处理subscribe数据包, clientID: {}, cleanSession: {}", clientID, connection.isCleanSession());
        }

        List<MqttTopicSubscription> topicSubscribes = subscribeMessage.payload().topicSubscriptions();
        LOG.info("Subscribe topics: {}, clientID: {}", topicSubscribes, clientID);

        try {
            if (null != topicSubscribes) {
                Set<MqttSubscription> topicFilters = subscribe(topicSubscribes, clientID, connection.getClientGroupName(), resultCodes);
                MqttSession session = sessionManager.getSession(clientID);
                if (session != null) {
                    for (MqttSubscription subscription : topicFilters) {
                        session.addSubscription(subscription);
                    }
                }
            } else {
                // The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. A SUBSCRIBE packet with no payload is a protocol violation
                // it MUST close the Network Connection on which it received that Control Packet which caused the protocol violation
                consumerManager.stopConsume(clientID);
                sessionManager.removeSession(clientID);
                connection.getChannel().close().addListener(CLOSE_ON_FAILURE);
                connectionManager.removeConnection(connection);
                client.close().addListener(CLOSE_ON_FAILURE);
            }
        } catch (Exception e) {
            // Keep the cause so the failure can be diagnosed from the log.
            LOG.error("subscribe is error!", e);
            // topicSubscribes is null when the failure came from the protocol-violation
            // branch above; there is nothing to pad in that case.
            int expectedCodes = (topicSubscribes == null) ? 0 : topicSubscribes.size();
            // Pad with FAILURE so every requested topic has a return code.
            while (resultCodes.size() < expectedCodes) {
                resultCodes.add(MqttQoS.FAILURE.value());
            }
        }
    }

    MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
            mqttFixedHeader,
            MqttMessageIdVariableHeader.from(subscribeMessage.variableHeader().messageId()),
            new MqttSubAckPayload(resultCodes));
    LOG.info("SUBSCRIBE successful, subscribe result: {}", resultCodes);
    client.writeAndFlush(subAckMessage);
}
 
源代码16 项目: iotplatform   文件: MqttTransportHandler.java

/**
 * Processes a SUBSCRIBE packet and replies with a SUBACK that carries exactly
 * one return code per requested topic.
 *
 * <p>Known device topics are granted {@code getMinSupportedQos(requested QoS)};
 * any other topic gets {@code FAILURE}. The channel is then registered for the
 * topic in {@code MemoryMetaPool}. If anything throws while handling a topic,
 * that topic's single return code becomes {@code FAILURE}. Previously a
 * failure during registration appended a second code for the same topic, which
 * shifted the codes of all following topics.
 *
 * <p>Nothing is done when {@code checkConnected} fails.
 *
 * @param ctx     handler context of the channel the SUBSCRIBE arrived on
 * @param mqttMsg the decoded SUBSCRIBE message
 */
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
  if (!checkConnected(ctx)) {
    return;
  }
  log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
  List<Integer> grantedQoSList = new ArrayList<>();
  for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
    String topicName = subscription.topicName();
    // TODO: handle this qos level.
    MqttQoS reqQoS = subscription.qualityOfService();
    int grantedQoS;
    try {
      if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
        // Forwarding of SUBSCRIBE_ATTRIBUTES_REQUEST to the device actor was
        // commented out in the original and remains disabled here.
        grantedQoS = getMinSupportedQos(reqQoS);
      } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
        // Forwarding of SUBSCRIBE_RPC_COMMANDS_REQUEST to the device actor was
        // commented out in the original and remains disabled here.
        grantedQoS = getMinSupportedQos(reqQoS);
      } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
        grantedQoS = getMinSupportedQos(reqQoS);
      } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
        deviceSessionCtx.setAllowAttributeResponses();
        grantedQoS = getMinSupportedQos(reqQoS);
      } else if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
        grantedQoS = getMinSupportedQos(reqQoS);
      } else {
        log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
        grantedQoS = FAILURE.value();
      }
      // NOTE(review): as in the original, the topic is registered even when the
      // subscription was rejected above - confirm that is intended.
      ChannelEntity channelEntity = new TcpChannelEntity(ctx.channel());
      MemoryMetaPool.registerTopic(channelEntity, topicName);
    } catch (Exception e) {
      // Log through the logger with the cause instead of printing the stack trace.
      log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS, e);
      grantedQoS = FAILURE.value();
    }
    // Exactly one return code per requested topic, in request order.
    grantedQoSList.add(grantedQoS);
  }

  ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
}
 

/**
 * Entry point for inbound MQTT messages on this handler.
 *
 * <p>The message is dispatched to the handler for its control packet type.
 * The client is disconnected via {@code disconnect(true)} when the handler is
 * stopped, when decoding failed, when the incoming interceptor call returns a
 * non-null value, when the packet type is one the switch does not handle, or
 * when any exception is thrown during processing. The received object is
 * released in all cases.
 *
 * @param ctx handler context; passed on to the CONNECT handler
 * @param msg the inbound object; cast to {@code MqttMessage}
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
   try {
      // Once stopped, drop the client without looking at the message.
      if (stopped) {
         disconnect(true);
         return;
      }

      MqttMessage message = (MqttMessage) msg;

      // Disconnect if Netty codec failed to decode the stream.
      if (message.decoderResult().isFailure()) {
         log.debug("Bad Message Disconnecting Client.");
         disconnect(true);
         return;
      }

      connection.dataReceived();

      if (AuditLogger.isAnyLoggingEnabled()) {
         AuditLogger.setRemoteAddress(connection.getRemoteAddress());
      }

      MQTTUtil.logMessage(session.getState(), message, true);

      // A non-null result from the incoming interceptors is treated as a rejection.
      if (this.protocolManager.invokeIncoming(message, this.connection) != null) {
         log.debugf("Interceptor rejected MQTT message: %s", message);
         disconnect(true);
         return;
      }

      switch (message.fixedHeader().messageType()) {
         case CONNECT:
            handleConnect((MqttConnectMessage) message, ctx);
            break;
         case PUBLISH:
            handlePublish((MqttPublishMessage) message);
            break;
         case PUBACK:
            handlePuback((MqttPubAckMessage) message);
            break;
         case PUBREC:
            handlePubrec(message);
            break;
         case PUBREL:
            handlePubrel(message);
            break;
         case PUBCOMP:
            handlePubcomp(message);
            break;
         case SUBSCRIBE:
            handleSubscribe((MqttSubscribeMessage) message);
            break;
         case UNSUBSCRIBE:
            handleUnsubscribe((MqttUnsubscribeMessage) message);
            break;
         case PINGREQ:
            handlePingreq();
            break;
         case DISCONNECT:
            disconnect(false);
            break;
         // The remaining packet types deliberately fall through to the default disconnect.
         case UNSUBACK:
         case SUBACK:
         case PINGRESP:
         case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message.
         default:
            disconnect(true);
      }
   } catch (Exception e) {
      // Any processing failure results in a disconnect.
      log.debug("Error processing Control Packet, Disconnecting Client", e);
      disconnect(true);
   } finally {
      // Release the received object on every path, including early returns.
      ReferenceCountUtil.release(msg);
   }
}
 
 类所在包
 类方法
 同包方法