类 io.netty.handler.codec.mqtt.MqttPublishMessage 源码实例Demo

下面列出了怎么用 io.netty.handler.codec.mqtt.MqttPublishMessage 的API类实例代码及写法,或者点击链接到github查看源代码。


/**
 * Routes an inbound MQTT message to the converter matching the session message type.
 *
 * <p>Telemetry, attribute-update and attribute-get requests are handled by casting the
 * message to {@link MqttPublishMessage}; an attribute subscription only prints a fixed
 * sample JSON document. Any other type is ignored.
 *
 * @param type    kind of request carried by {@code inbound}
 * @param inbound the received MQTT message
 * @throws AdaptorException if the selected converter cannot decode the message
 */
public static void convertToMsg(SessionMsgType type, MqttMessage inbound) throws AdaptorException {
    switch (type) {
        case POST_TELEMETRY_REQUEST: {
            convertToTelemetryUploadRequest((MqttPublishMessage) inbound);
            break;
        }
        case POST_ATTRIBUTES_REQUEST: {
            convertToUpdateAttributesRequest((MqttPublishMessage) inbound);
            break;
        }
        case GET_ATTRIBUTES_REQUEST: {
            convertToGetAttributesRequest((MqttPublishMessage) inbound);
            break;
        }
        case SUBSCRIBE_ATTRIBUTES_REQUEST: {
            // No payload to decode here; just emit the sample document.
            System.out.println("{\"key1\":\"value1\"}");
            break;
        }
        default:
            // Remaining message types need no conversion.
            break;
    }
}
 

/**
 * Decodes a "get attributes" request payload and prints the requested attribute keys.
 *
 * <p>The payload is read as UTF-8 JSON and the {@code clientKeys} and {@code sharedKeys}
 * entries are extracted independently. Either entry may be missing (the helper then yields
 * {@code null}), so each set is checked on its own before it is iterated; a request that
 * names only one of the two key lists is therefore valid.
 *
 * @param inbound the PUBLISH message carrying the JSON request
 * @throws AdaptorException if the payload cannot be decoded or parsed
 */
private static void convertToGetAttributesRequest(MqttPublishMessage inbound) throws AdaptorException {
    try {
        String payload = inbound.payload().toString(UTF8);
        JsonElement requestBody = new JsonParser().parse(payload);
        Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
        Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
        // Guard each set separately: iterating a null set would raise a NullPointerException
        // that the catch below would misreport as a decoding failure.
        if (clientKeys != null) {
            for (String clientKey : clientKeys) {
                System.out.print("客户端属性:" + clientKey +" ");
            }
        }
        if (sharedKeys != null) {
            for (String sharedKey : sharedKeys) {
                System.out.print("共享设备属性:" + sharedKey + " ");
            }
        }
    } catch (RuntimeException e) {
        throw new AdaptorException(e);
    }
}
 
源代码3 项目: iotplatform   文件: JsonMqttAdaptor.java

/**
 * Builds a get-attributes request from an inbound PUBLISH message.
 *
 * <p>The request id is the numeric suffix of the topic name that follows
 * {@code MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX}; the optional
 * {@code clientKeys} / {@code sharedKeys} sets are read from the UTF-8 JSON payload.
 *
 * @param ctx     device session context (not used by this method)
 * @param inbound the PUBLISH message carrying the request
 * @return a request with only the id when neither key set is present, otherwise a request
 *         carrying both sets
 * @throws AdaptorException if the topic suffix or the payload cannot be decoded
 */
private FromDeviceMsg convertToGetAttributesRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound)
    throws AdaptorException {
  final String topic = inbound.variableHeader().topicName();
  try {
    final String idSuffix = topic.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length());
    final Integer requestId = Integer.valueOf(idSuffix);

    final JsonElement body = new JsonParser().parse(inbound.payload().toString(UTF8));
    final Set<String> clientKeys = toStringSet(body, "clientKeys");
    final Set<String> sharedKeys = toStringSet(body, "sharedKeys");

    final boolean keysRequested = clientKeys != null || sharedKeys != null;
    if (keysRequested) {
      return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys);
    }
    return new BasicGetAttributesRequest(requestId);
  } catch (RuntimeException e) {
    log.warn("Failed to decode get attributes request", e);
    throw new AdaptorException(e);
  }
}
 

/**
 * Decodes a "get attributes" request payload and prints the requested attribute keys.
 *
 * <p>{@code clientKeys} and {@code sharedKeys} are extracted independently from the UTF-8
 * JSON payload. Either may be absent (the helper then yields {@code null}), so each set is
 * null-checked individually instead of assuming that both are present whenever one is.
 *
 * @param inbound the PUBLISH message carrying the JSON request
 * @throws AdaptorException if the payload cannot be decoded or parsed
 */
private static void convertToGetAttributesRequest(MqttPublishMessage inbound) throws AdaptorException {
    try {
        String payload = inbound.payload().toString(UTF8);
        JsonElement requestBody = new JsonParser().parse(payload);
        Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
        Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
        // A request may list only one kind of key; skip whichever set is missing rather
        // than failing with a NullPointerException wrapped as AdaptorException.
        if (clientKeys != null) {
            for (String clientKey : clientKeys) {
                System.out.print("客户端属性:" + clientKey +" ");
            }
        }
        if (sharedKeys != null) {
            for (String sharedKey : sharedKeys) {
                System.out.print("共享设备属性:" + sharedKey + " ");
            }
        }
    } catch (RuntimeException e) {
        throw new AdaptorException(e);
    }
}
 
源代码5 项目: WeEvent   文件: Publish.java

/**
 * Handles an MQTT PUBLISH request according to its QoS level.
 *
 * <p>QoS 0 messages are forwarded to the session store without a reply. QoS 1 messages are
 * forwarded and answered with a PUBACK whose QoS field is {@code FAILURE} when the store
 * reports that publishing did not succeed. Every other level is rejected.
 *
 * @param req      the inbound message; must be a {@link MqttPublishMessage}
 * @param clientId id of the publishing client (not used here)
 * @param remoteIp remote address of the client (not used here)
 * @return the PUBACK to send for QoS 1, or an empty optional for QoS 0
 * @throws BrokerException with {@code MQTT_NOT_SUPPORT_QOS2} for any other QoS level
 */
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) throws BrokerException {
    final MqttPublishMessage publish = (MqttPublishMessage) req;
    log.info("PUBLISH, {} Qos: {}", publish.variableHeader().topicName(), publish.fixedHeader().qosLevel());

    switch (publish.fixedHeader().qosLevel()) {
        case AT_MOST_ONCE: {
            // Fire and forget: the outcome is deliberately not reported back.
            this.sessionStore.publishMessage(publish, false);
            return Optional.empty();
        }

        case AT_LEAST_ONCE: {
            final boolean stored = this.sessionStore.publishMessage(publish, false);
            final MqttQoS ackQos = stored ? MqttQoS.AT_LEAST_ONCE : MqttQoS.FAILURE;
            final MqttFixedHeader ackHeader = new MqttFixedHeader(
                    MqttMessageType.PUBACK, false, ackQos, false, ProtocolProcess.fixLengthOfMessageId);
            final MqttMessage ack = MqttMessageFactory.newMessage(
                    ackHeader, MqttMessageIdVariableHeader.from(publish.variableHeader().packetId()), null);
            return Optional.of(ack);
        }

        case EXACTLY_ONCE:
        default: {
            log.error("DOT NOT support Qos=2, close");
            throw new BrokerException(ErrorCode.MQTT_NOT_SUPPORT_QOS2);
        }
    }
}
 
源代码6 项目: WeEvent   文件: SessionStore.java

/**
 * Publishes the payload of an MQTT PUBLISH message as a {@code WeEvent} on the message's topic.
 *
 * @param msg  the MQTT message whose readable payload bytes are copied into the event
 * @param will {@code true} to tag the event with the will-message extension
 * @return {@code true} only when the producer reports {@code SUCCESS}; {@code false} when it
 *         reports another status or a {@code BrokerException} is raised
 */
public boolean publishMessage(MqttPublishMessage msg, boolean will) {
    // Copy the readable region without moving the buffer's reader index.
    final byte[] body = new byte[msg.payload().readableBytes()];
    msg.payload().getBytes(msg.payload().readerIndex(), body);

    final Map<String, String> eventExtensions = new HashMap<>();
    if (will) {
        eventExtensions.put(WeEventConstants.EXTENSIONS_WILL_MESSAGE, WeEventConstants.EXTENSIONS_WILL_MESSAGE);
    }

    try {
        final SendResult outcome = this.producer.publish(
                new WeEvent(msg.variableHeader().topicName(), body, eventExtensions), "", this.timeout);
        return SendResult.SendResultStatus.SUCCESS == outcome.getStatus();
    } catch (BrokerException e) {
        log.error("exception in publish, {}", e.toString());
        return false;
    }
}
 
源代码7 项目: cassandana   文件: BrokerInterceptor.java

/**
 * Asynchronously notifies every registered publish interceptor about a PUBLISH message.
 *
 * <p>The message is retained before the task is handed to the executor and released in the
 * task's {@code finally} block, so the retain/release pair brackets the asynchronous work.
 *
 * @param msg      the published message
 * @param clientID id of the publishing client
 * @param username user name associated with the publishing client
 */
@Override
public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {
    // Retain before handing off; the matching release happens inside the submitted task.
    msg.retain();

    executor.execute(() -> {
            try {
                int messageId = msg.variableHeader().packetId();//messageId();
                String topic = msg.variableHeader().topicName();
                for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
                    LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
                            + "interceptorId={}", clientID, messageId, topic, handler.getID());
                    handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
                }
            } finally {
                // Always balance the retain() above, even if a handler throws.
                ReferenceCountUtil.release(msg);
            }
    });
}
 

/**
 * Optionally stores and/or sends a broker message to a subscribed client.
 *
 * @param clientId    id of the receiving client
 * @param channel     channel to send on; nothing is sent when it is {@code null}
 * @param respQoS     QoS used for the outgoing message and for picking the message id
 * @param bSavePubMsg whether to record the message in {@code consumerData}
 * @param bSend       whether to transmit the message on {@code channel}
 * @param bMsgInfo    source of topic, flags and payload bytes
 */
private void sendSubscribMessage(String clientId, Channel channel, MqttQoS respQoS, boolean bSavePubMsg,
		boolean bSend, BorkerMessage bMsgInfo) {
	// Neither persisting nor sending was requested: nothing to do.
	if (!(bSavePubMsg || bSend)) {
		return;
	}

	final String topic = bMsgInfo.getTopicName();
	final boolean dup = bMsgInfo.isDup();
	final boolean retain = bMsgInfo.isRetain();
	final byte[] body = bMsgInfo.getMsgBytes();
	final int outgoingId = getMustSendMessageId(respQoS, clientId);

	if (bSavePubMsg) {
		consumerData.putPublishMessage(clientId,
				ConsumerMessage.builder()
						.sourceClientId(clientId)
						.topic(topic)
						.mqttQoS(respQoS.value())
						.messageBytes(body)
						.messageId(outgoingId)
						.build());
	}

	if (!bSend || channel == null) {
		return;
	}
	this.sendProcess.sendPublishMessage(channel,
			ProtocolUtil.publishMessage(topic, dup, respQoS.value(), retain, outgoingId, body));
}
 
源代码9 项目: iot-mqtt   文件: ReSendMessageService.java

/**
 * Sends a stored message to a client that is currently connected.
 *
 * @param clientId id of the target client
 * @param message  the message to deliver
 * @return {@code true} when the message was written to the client's channel context,
 *         {@code false} when no session exists for {@code clientId}
 */
public boolean dispatcherMessage(String clientId, Message message) {
	final ClientSession session = ConnectManager.getInstance().getClient(clientId);
	if (session != null) {
		final int qos = (int) message.getHeader(MessageHeader.QOS);
		final int messageId = message.getMsgId();
		// Messages above QoS 0 are cached before they are written out.
		if (qos > 0) {
			flowMessageStore.cacheSendMsg(clientId, message);
		}
		session.getCtx().writeAndFlush(MessageUtil.getPubMessage(message, false, qos, messageId));
		return true;
	}
	// The client went offline again before the message could be delivered.
	log.warn("The client offline again, put the message to the offline queue,clientId:{}", clientId);
	return false;
}
 
源代码10 项目: jmqtt   文件: ReSendMessageService.java

/**
 * Delivers a message to the given client if it currently has a session.
 *
 * @param clientId id of the target client
 * @param message  the message to deliver
 * @return {@code false} when the client has no session, {@code true} once the message has
 *         been written to the client's channel context
 */
public boolean dispatcherMessage(String clientId, Message message){
    final ClientSession target = ConnectManager.getInstance().getClient(clientId);
    final boolean offline = (target == null);
    if (offline) {
        // The client dropped off again; the caller decides what to do with the message.
        log.warn("The client offline again, put the message to the offline queue,clientId:{}",clientId);
        return false;
    }

    final int qos = (int) message.getHeader(MessageHeader.QOS);
    final int msgId = message.getMsgId();
    final boolean needsCaching = qos > 0;
    if (needsCaching) {
        flowMessageStore.cacheSendMsg(clientId, message);
    }

    final MqttPublishMessage outbound = MessageUtil.getPubMessage(message, false, qos, msgId);
    target.getCtx().writeAndFlush(outbound);
    return true;
}
 

/**
 * Asynchronously notifies every registered publish interceptor about a PUBLISH message.
 *
 * <p>The message is retained before the task is submitted and released in the task's
 * {@code finally} block, so the retain/release pair brackets the asynchronous work.
 *
 * @param msg      the published message
 * @param clientID id of the publishing client
 * @param username user name associated with the publishing client
 */
@Override
public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {
    // Retain before handing off; the matching release happens inside the submitted task.
    msg.retain();

    executor.execute(() -> {
            try {
                int messageId = msg.variableHeader().messageId();
                String topic = msg.variableHeader().topicName();
                for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
                    LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
                            + "interceptorId={}", clientID, messageId, topic, handler.getID());
                    handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
                }
            } finally {
                // Always balance the retain() above, even if a handler throws.
                msg.release();
            }
    });
}
 
源代码12 项目: Groza   文件: GatewaySessionCtx.java

/**
 * Processes a gateway telemetry upload that carries data for several devices.
 *
 * <p>The payload must be a JSON object mapping a device name to a JSON array of telemetry
 * elements. One upload request is built per device and forwarded to the processor.
 *
 * @param mqttMsg the PUBLISH message holding the JSON payload
 * @throws AdaptorException declared by the payload validation / device lookup helpers
 * @throws JsonSyntaxException if the payload is not a JSON object or a device entry is not
 *                             a JSON array
 */
public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
    final JsonElement root = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
    final int requestId = mqttMsg.variableHeader().messageId();

    // Reject anything that is not a top-level JSON object up front.
    if (!root.isJsonObject()) {
        throw new JsonSyntaxException(CAN_T_PARSE_VALUE + root);
    }

    for (Map.Entry<String, JsonElement> perDevice : root.getAsJsonObject().entrySet()) {
        final String deviceName = checkDeviceConnected(perDevice.getKey());
        final JsonElement samples = perDevice.getValue();
        if (!samples.isJsonArray()) {
            throw new JsonSyntaxException(CAN_T_PARSE_VALUE + root);
        }

        final BasicTelemetryUploadRequest upload = new BasicTelemetryUploadRequest(requestId);
        for (JsonElement sample : samples.getAsJsonArray()) {
            JsonConverter.parseWithTs(upload, sample.getAsJsonObject());
        }

        final GatewayDeviceSessionCtx deviceCtx = devices.get(deviceName);
        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceCtx.getDevice(),
                new BasicAdaptorToSessionActorMsg(deviceCtx, upload)));
    }
}
 
源代码13 项目: Groza   文件: GatewaySessionCtx.java

/**
 * Processes a gateway attribute update that carries data for several devices.
 *
 * <p>The payload must be a JSON object mapping a device name to a JSON object of attribute
 * values. Each device's values are stamped with the current time and forwarded to the
 * processor as one update request.
 *
 * @param mqttMsg the PUBLISH message holding the JSON payload
 * @throws AdaptorException declared by the payload validation / device lookup helpers
 * @throws JsonSyntaxException if the payload or any device entry is not a JSON object
 */
public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
    final JsonElement root = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
    final int requestId = mqttMsg.variableHeader().messageId();

    // Reject anything that is not a top-level JSON object up front.
    if (!root.isJsonObject()) {
        throw new JsonSyntaxException(CAN_T_PARSE_VALUE + root);
    }

    for (Map.Entry<String, JsonElement> perDevice : root.getAsJsonObject().entrySet()) {
        final String deviceName = checkDeviceConnected(perDevice.getKey());
        final JsonElement values = perDevice.getValue();
        if (!values.isJsonObject()) {
            throw new JsonSyntaxException(CAN_T_PARSE_VALUE + root);
        }

        // Timestamp is taken once per device so all of its attributes share it.
        final long ts = System.currentTimeMillis();
        final BasicAttributesUpdateRequest update = new BasicAttributesUpdateRequest(requestId);
        final JsonObject deviceValues = values.getAsJsonObject();
        update.add(JsonConverter.parseValues(deviceValues)
                .stream()
                .map(kv -> new BaseAttributeKvEntry(kv, ts))
                .collect(Collectors.toList()));

        final GatewayDeviceSessionCtx deviceCtx = devices.get(deviceName);
        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceCtx.getDevice(),
                new BasicAdaptorToSessionActorMsg(deviceCtx, update)));
    }
}
 

/**
 * Creates the bookkeeping record for a PUBLISH that is still in flight.
 *
 * @param messageId id of the pending message
 * @param future    promise associated with the pending publish
 * @param payload   payload buffer of the message
 * @param message   the PUBLISH message itself; also registered with the retransmission handler
 * @param qos       QoS level of the publish
 */
public   MqttPendingPublish(int messageId, Promise<Void> future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos) {
    this.messageId = messageId;
    this.future = future;
    this.payload = payload;
    this.message = message;
    this.qos = qos;

    // NOTE(review): publishRetransmissionHandler is not assigned here, so it is presumably
    // initialised at its field declaration — confirm.
    this.publishRetransmissionHandler.setOriginalMessage(message);
}
 
源代码15 项目: iotplatform   文件: MqttTransportHandler.java

/**
 * Entry point for inbound PUBLISH messages.
 *
 * <p>Messages are ignored unless the connection check passes. Messages on gateway API
 * topics are currently dropped after logging; everything else is passed on to
 * {@code processDevicePublish}.
 *
 * @param ctx     channel context the message arrived on
 * @param mqttMsg the PUBLISH message
 */
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
  if (!checkConnected(ctx)) {
    return;
  }

  final String topic = mqttMsg.variableHeader().topicName();
  final int packetId = mqttMsg.variableHeader().messageId();
  log.info("[{}] Processing publish msg [{}][{}]!", sessionId, topic, packetId);

  if (topic.startsWith(BASE_GATEWAY_API_TOPIC)) {
    // Gateway handling is disabled. The former dispatch routed by exact topic to
    // gatewaySessionCtx (telemetry, attributes, attributes request, RPC response,
    // connect, disconnect) and logged failures; gateway publishes are now ignored.
    return;
  }

  processDevicePublish(ctx, mqttMsg, topic, packetId);
}
 

/**
 * Parses a telemetry upload payload and prints every timestamped key/value entry.
 *
 * @param inbound the PUBLISH message carrying the JSON telemetry payload
 * @throws AdaptorException if payload validation fails, or if parsing raises an
 *                          {@code IllegalStateException} or {@code JsonSyntaxException}
 */
private static void convertToTelemetryUploadRequest(MqttPublishMessage inbound) throws AdaptorException {
    final String json = validatePayload(inbound.payload());
    try {
        final Map<Long, List<KvEntry>> byTimestamp = JsonConverter
                .convertToTelemetry(new JsonParser().parse(json), inbound.variableHeader().messageId())
                .getData();
        // One "key=" line per map key, followed by one line per entry under it.
        byTimestamp.forEach((ts, entries) -> {
            System.out.println("key= " + ts);
            entries.forEach(kv ->
                    System.out.println("属性名="+kv.getKey()+ " 属性值="+kv.getValueAsString()));
        });
    } catch (IllegalStateException | JsonSyntaxException ex) {
        throw new AdaptorException(ex);
    }
}
 

/**
 * Parses an attribute update payload and prints each attribute's name and value.
 *
 * @param inbound the PUBLISH message carrying the JSON attribute payload
 * @throws AdaptorException if payload validation fails, or if parsing raises an
 *                          {@code IllegalStateException} or {@code JsonSyntaxException}
 */
private static void convertToUpdateAttributesRequest(MqttPublishMessage inbound) throws AdaptorException {
    final String json = validatePayload(inbound.payload());
    try {
        final Set<AttributeKvEntry> attributes = JsonConverter
                .convertToAttributes(new JsonParser().parse(json), inbound.variableHeader().messageId())
                .getAttributes();
        attributes.forEach(attr ->
                System.out.println("属性名="+attr.getKey()+" 属性值="+attr.getValueAsString()));
    } catch (IllegalStateException | JsonSyntaxException ex) {
        throw new AdaptorException(ex);
    }
}
 

/**
 * Decodes a telemetry upload and prints its contents grouped by map key.
 *
 * @param inbound the PUBLISH message carrying the JSON telemetry payload
 * @throws AdaptorException if payload validation fails, or if parsing raises an
 *                          {@code IllegalStateException} or {@code JsonSyntaxException}
 */
private static void convertToTelemetryUploadRequest(MqttPublishMessage inbound) throws AdaptorException {
    final String body = validatePayload(inbound.payload());
    try {
        final Map<Long, List<KvEntry>> grouped = JsonConverter
                .convertToTelemetry(new JsonParser().parse(body), inbound.variableHeader().messageId())
                .getData();
        for (Map.Entry<Long, List<KvEntry>> bucket : grouped.entrySet()) {
            final Long bucketKey = bucket.getKey();
            final List<KvEntry> readings = bucket.getValue();
            System.out.println("key= " + bucketKey);
            for (KvEntry reading : readings) {
                System.out.println("属性名="+reading.getKey()+ " 属性值="+reading.getValueAsString());
            }
        }
    } catch (IllegalStateException | JsonSyntaxException ex) {
        throw new AdaptorException(ex);
    }
}
 
源代码19 项目: blynk-server   文件: MqttHardwareHandler.java

/**
 * Dispatches an inbound MQTT message by type.
 *
 * <p>PUBLISH messages on the topic {@code "hardware"} (compared after lower-casing) go to
 * the hardware handler; PINGREQ is answered by echoing the headers back; DISCONNECT closes
 * the channel. Other message types are ignored.
 *
 * @param ctx channel context the message arrived on
 * @param msg the received MQTT message
 */
@Override
public void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {
    this.stats.incrementMqttStat();
    final MqttMessageType kind = msg.fixedHeader().messageType();

    switch (kind) {
        case PUBLISH : {
            final MqttPublishMessage publish = (MqttPublishMessage) msg;
            final String topicName = publish.variableHeader().topicName();
            // Only the "hardware" topic is handled; any other topic is dropped.
            if ("hardware".equals(topicName.toLowerCase())) {
                hardware.messageReceived(state, publish);
            }
            break;
        }

        case PINGREQ : {
            final MqttMessage reply =
                    MqttMessageFactory.newMessage(msg.fixedHeader(), msg.variableHeader(), null);
            ctx.writeAndFlush(reply, ctx.voidPromise());
            break;
        }

        case DISCONNECT : {
            log.trace("Got disconnect. Closing...");
            ctx.close();
            break;
        }
    }
}
 
源代码20 项目: WeEvent   文件: SessionContext.java

/**
 * Creates a session context backed by a WebSocket session.
 *
 * @param sessionId    identifier of this session
 * @param clientId     MQTT client id
 * @param session      the WebSocket session used by this context
 * @param cleanSession clean-session flag stored for this context
 * @param willMessage  will message stored for this context
 */
public SessionContext(String sessionId, String clientId, WebSocketSession session, boolean cleanSession, MqttPublishMessage willMessage) {
    this.sessionId = sessionId;
    this.clientId = clientId;
    this.cleanSession = cleanSession;
    this.willMessage = willMessage;

    // This constructor variant has no channel; only the WebSocket session is set.
    this.channel = null;
    this.session = session;
}
 
源代码21 项目: iotplatform   文件: JsonMqttAdaptor.java

/**
 * Converts an inbound PUBLISH message into an attribute update request.
 *
 * @param ctx     session context; supplies the session id used for payload validation
 * @param inbound the PUBLISH message carrying the JSON attribute payload
 * @return the request built from the parsed payload and the message id
 * @throws AdaptorException if payload validation fails, or if parsing raises an
 *                          {@code IllegalStateException} or {@code JsonSyntaxException}
 */
private UpdateAttributesRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound)
    throws AdaptorException {
  final String json = validatePayload(ctx.getSessionId(), inbound.payload());
  try {
    final JsonElement parsed = new JsonParser().parse(json);
    return JsonConverter.convertToAttributes(parsed, inbound.variableHeader().messageId());
  } catch (IllegalStateException | JsonSyntaxException ex) {
    throw new AdaptorException(ex);
  }
}
 
源代码22 项目: iotplatform   文件: JsonMqttAdaptor.java

/**
 * Builds a QoS 1 PUBLISH message whose payload is the UTF-8 JSON text of {@code json}.
 *
 * @param ctx   device session context; supplies the next message id
 * @param topic topic name for the outgoing message
 * @param json  JSON element to serialise into the payload
 * @return the assembled PUBLISH message
 */
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
  final MqttFixedHeader fixed =
      new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
  final MqttPublishVariableHeader variable = new MqttPublishVariableHeader(topic, ctx.nextMsgId());

  final ByteBuf body = ALLOCATOR.buffer();
  final String serialized = GSON.toJson(json);
  body.writeBytes(serialized.getBytes(UTF8));

  return new MqttPublishMessage(fixed, variable, body);
}
 
源代码23 项目: lannister   文件: MqttPacketReceiver.java

/**
 * Client-side handling of messages received from the broker.
 *
 * <p>PUBLISH is passed to the receiver (when one is set) and acknowledged with PUBACK or
 * PUBREC depending on its QoS; CONNACK is stored on the shared object and a waiting thread
 * is notified; PUBREC is answered with PUBREL. All other types are ignored.
 *
 * @param ctx channel context the message arrived on
 * @param msg the received MQTT message
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
	switch (msg.fixedHeader().messageType()) {
	case PUBLISH: {
		if (receiver != null) {
			receiver.messageReceived(Message.newMessage(client.clientId(), (MqttPublishMessage) msg));
		}

		final MqttPublishMessage publish = (MqttPublishMessage) msg;
		final int messageId = publish.variableHeader().messageId();
		final MqttQoS qos = publish.fixedHeader().qosLevel();
		if (qos == MqttQoS.AT_LEAST_ONCE) {
			client.send(MqttMessageFactory.puback(messageId));
		}
		else if (qos == MqttQoS.EXACTLY_ONCE) {
			client.send(MqttMessageFactory.pubrec(messageId));
		}
		break;
	}

	case CONNACK: {
		sharedObject.receivedMessage(msg);

		// Wake up whoever is waiting on the shared lock for the CONNACK.
		synchronized (sharedObject.locker()) {
			sharedObject.locker().notify();
		}
		break;
	}

	case PUBREC: {
		final MqttMessageIdVariableHeader recHeader = (MqttMessageIdVariableHeader) msg.variableHeader();
		client.send(MqttMessageFactory.pubrel(recHeader.messageId()));
		break;
	}

	case SUBACK:
	case PUBACK:
	case PUBCOMP:
	default:
		// Nothing to do for the remaining acknowledgements.
		break;
	}
}
 

@Override
public void retain(Topic topic, MqttPublishMessage msg) {
    final ByteBuf payload = msg.content();
    byte[] rawPayload = new byte[payload.readableBytes()];
    payload.getBytes(0, rawPayload);
    final RetainedMessage toStore = new RetainedMessage(msg.fixedHeader().qosLevel(), rawPayload);
    storage.put(topic, toStore);
}
 
源代码25 项目: activemq-artemis   文件: MQTTTestSupport.java

@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
   if (packet.getClass() == MqttPublishMessage.class) {
      messageCount++;
   }
   return true;
}
 

@Override
public MqttPublishMessage getWillMessage(String clientId) {
	MqttSession session = localCache.get(clientId);
	if (null != session) {
		return session.getWillMessage();
	} else {
		return null;
	}
}
 
源代码27 项目: ext-opensource-netty   文件: MqttSession.java

/**
 * Creates a session for a client connected on the given channel.
 *
 * @param clientId     MQTT client id
 * @param channel      channel passed to the superclass constructor
 * @param cleanSession clean-session flag stored for this session
 * @param willMessage  will message stored for this session
 */
public MqttSession(String clientId, Channel channel, boolean cleanSession, MqttPublishMessage willMessage) {
	super(channel);
	
	this.clientId = clientId;
	this.cleanSession = cleanSession;
	this.willMessage = willMessage;
}
 

/**
 * Handles a PUBLISH received from the broker (QoS 0, 1 or 2).
 *
 * <p>The message is converted into a {@code MessageData}, saved first when it is QoS 2,
 * handed to the consumer, and then acknowledged with PUBACK (QoS 1) or PUBREC (QoS 2).
 *
 * @param channel     channel the message arrived on (not used here)
 * @param mqttMessage the received PUBLISH message
 */
public void processPublish(Channel channel, MqttPublishMessage mqttMessage) {
	final MqttFixedHeader fixed = mqttMessage.fixedHeader();
	final MqttPublishVariableHeader variable = mqttMessage.variableHeader();
	final ByteBuf body = mqttMessage.payload();

	final String topic = variable.topicName();
	final MqttQoS qos = fixed.qosLevel();
	final int packetId = variable.packetId();
	final byte[] bodyBytes = ByteBufUtil.copyByteBuf(body);

	final MessageData received = MessageData.builder()
			.topic(topic)
			.qos(qos.value())
			.messageId(packetId)
			.payload(bodyBytes)
			.status(MessageStatus.PUB)
			.dup(fixed.isDup())
			.retained(fixed.isRetain())
			.build();

	final boolean exactlyOnce = (qos == MqttQoS.EXACTLY_ONCE);
	if (exactlyOnce) {
		// QoS 2 messages are saved before they are processed.
		this.consumerProcess.saveMesage(received);
	}

	this.consumerProcess.processPublish(received);

	// QoS 0 needs no acknowledgement.
	if (qos == MqttQoS.AT_LEAST_ONCE) {
		this.consumerProcess.sendPubAckMessage(packetId);
	} else if (exactlyOnce) {
		this.consumerProcess.sendPubRecMessage(packetId);
	}
}
 

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msgx) throws Exception {
	if (msgx == null) {return ;}
	MqttMessage msg = (MqttMessage) msgx;
	NettyLog.debug("read: {}", msg.fixedHeader().messageType());
	MqttFixedHeader mqttFixedHeader = msg.fixedHeader();
	switch (mqttFixedHeader.messageType()) {
	case CONNACK:
		clientProtocolProcess.processConnectBack(ctx.channel(), (MqttConnAckMessage) msg);
		break;
	case UNSUBACK:
		clientProtocolProcess.processUnSubBack(ctx.channel(), msg);
		break;
	case PUBLISH:
		clientProtocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
		break;
	case PUBACK:
		clientProtocolProcess.processPubAck(ctx.channel(), msg);
		break;
	case PUBREC:
		clientProtocolProcess.processPubRec(ctx.channel(), msg);
		break;
	case PUBREL:
		clientProtocolProcess.processPubRel(ctx.channel(), msg);
		break;
	case PUBCOMP:
		clientProtocolProcess.processPubComp(ctx.channel(), msg);
		break;
	case SUBACK:
		clientProtocolProcess.processSubAck(ctx.channel(), (MqttSubAckMessage) msg);
		break;
	default:
		break;
	}
}
 
源代码30 项目: iot-mqtt   文件: DefaultDispatcherMessage.java

/**
 * Dispatches every pending message to the clients whose subscriptions match its topic.
 *
 * <p>For each matching subscription the delivery QoS is the minimum of the QoS the message
 * was published with and the subscription's QoS. Connected clients receive the message
 * immediately (it is cached first when the delivery QoS is above 0); all other clients get
 * it queued in the offline store. Any exception ends the dispatch of the current batch and
 * is logged.
 */
@Override
public void run() {
    if (Objects.isNull(messages)) {
        return;
    }
    try {
        for (Message message : messages) {
            Set<Subscription> subscriptions = subscriptionMatcher.match((String) message.getHeader(MessageHeader.TOPIC));
            // Remember the published QoS before the loop: the header is overwritten below
            // for every subscriber, and re-reading it would cap later subscribers at the
            // QoS granted to earlier ones.
            final Object publishedQos = message.getHeader(MessageHeader.QOS);
            for (Subscription subscription : subscriptions) {
                String clientId = subscription.getClientId();
                ClientSession clientSession = ConnectManager.getInstance().getClient(clientId);
                // The session lookup and containClient are separate calls, so also require a
                // non-null session; a client that vanished in between is treated as offline
                // instead of failing the whole batch with a NullPointerException.
                if (clientSession != null && ConnectManager.getInstance().containClient(clientId)) {
                    int qos = MessageUtil.getMinQos((int) publishedQos, subscription.getQos());
                    message.putHeader(MessageHeader.QOS, qos);
                    if (qos > 0) {
                        flowMessageStore.cacheSendMsg(clientId, message);
                    }
                    MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false, qos, message.getMsgId());
                    clientSession.getCtx().writeAndFlush(publishMessage);
                    clientSession.addReceiveIdCounter(); // one more message counted for this client
                } else {
                    offlineMessageStore.addOfflineMessage(clientId, message);
                }
            }
        }
    } catch (Exception ex) {
        // Pass the exception as the trailing argument with no placeholder so the logger
        // records its stack trace.
        log.warn("Dispatcher message failure", ex);
    }
}
 
 类所在包
 类方法
 同包方法