下面列出了 io.netty.handler.codec.mqtt.MqttMessageFactory # newMessage ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) {
MqttUnsubscribeMessage msg = (MqttUnsubscribeMessage) req;
log.info("UNSUBSCRIBE, {}", msg.payload().topics());
if (msg.payload().topics().isEmpty()) {
log.error("empty topic, skip it");
return Optional.empty();
}
this.sessionStore.unSubscribe(clientId, msg.payload().topics());
MqttMessage rsp = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_LEAST_ONCE, false, ProtocolProcess.fixLengthOfMessageId),
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
return Optional.of(rsp);
}
/**
* Sends the PUBREL message to server
*
* @param publishMessageId identifier of the PUBLISH message to acknowledge
*/
private void publishRelease(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2outbound.put(publishMessageId, pubrel);
}
this.write(pubrel);
}
/**
* Sends PUBREC packet to server
*
* @param publishMessage a PUBLISH message to acknowledge
*/
private void publishReceived(MqttPublishMessage publishMessage) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessage.messageId());
io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2inbound.put(publishMessage.messageId(), publishMessage);
}
this.write(pubrec);
}
private void sendPubAck(Channel client, Integer packageID) {
if (LOG.isDebugEnabled()) {
LOG.debug("发送PubAck消息给客户端");
}
try {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage pubAckMessage = MqttMessageFactory.newMessage(
mqttFixedHeader,
MqttMessageIdVariableHeader.from(packageID),
null);
client.writeAndFlush(pubAckMessage);
} catch (Throwable th) {
LOG.error("Send pubAck error!", th);
client.close().addListener(CLOSE_ON_FAILURE);
}
}
public MqttPublishMessage genWillMessage(MqttConnectMessage connectMessage) {
if (connectMessage.variableHeader().isWillFlag()) {
log.info("get will message from client");
MqttMessage msg = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(connectMessage.variableHeader().willQos()), connectMessage.variableHeader().isWillRetain(), 0),
new MqttPublishVariableHeader(connectMessage.payload().willTopic(), 0), Unpooled.buffer().writeBytes(connectMessage.payload().willMessageInBytes()));
return (MqttPublishMessage) msg;
}
return null;
}
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) {
log.debug("heart beat from client: {}", clientId);
MqttMessage rsp = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_LEAST_ONCE, false, 0), null, null);
return Optional.of(rsp);
}
private void sendUnSubAck(Channel client, int packageID, MqttQoS qoS) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, qoS, false, 0);
MqttMessage unSubAckMessage = MqttMessageFactory.newMessage(
mqttFixedHeader,
MqttMessageIdVariableHeader.from(packageID),
null);
LOG.info("UNSUBSCRIBE successful, packageID: {}", packageID);
client.writeAndFlush(unSubAckMessage);
}
private MqttMessage genSubAck(int messageId, List<Integer> mqttQoSList) {
return MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, ProtocolProcess.fixLengthOfMessageId + mqttQoSList.size()),
MqttMessageIdVariableHeader.from(messageId),
new MqttSubAckPayload(mqttQoSList));
}
public static MqttUnsubAckMessage unsubAckMessage(int messageId) {
return (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId), null);
}
public static MqttMessage pubCompMessage(int messageId) {
return MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId), null);
}
public static MqttMessage pubRelMessage(int messageId, boolean isDup) {
return MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL, isDup, MqttQoS.AT_LEAST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId), null);
}
public static MqttMessage pingRespMessage() {
return MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0), null, null);
}
public static MqttConnAckMessage connAckMessage(MqttConnectReturnCode code, boolean sessionPresent) {
return (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(code, sessionPresent), null);
}
public static MqttPublishMessage publishMessage(String topicName, boolean isDup, int qosValue, boolean isRetain,
int messageId, byte[] payload) {
return (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, MqttQoS.valueOf(qosValue), isRetain, 0),
new MqttPublishVariableHeader(topicName, messageId), Unpooled.buffer().writeBytes(payload));
}
public void processSubscribe(Channel client, MqttSubscribeMessage subscribeMessage) {
List<Integer> resultCodes = new ArrayList<>();
String clientID = NettyAttrManager.getAttrClientId(client);
if (connectionManager.isConnected(clientID)) {
MqttConnection connection = connectionManager.getConnection(clientID);
if (LOG.isDebugEnabled()) {
LOG.debug("处理subscribe数据包, clientID: {}, cleanSession: {}", clientID, connection.isCleanSession());
}
List<MqttTopicSubscription> topicSubscribes = subscribeMessage.payload().topicSubscriptions();
LOG.info("Subscribe topics: {}, clientID: {}", topicSubscribes, clientID);
try {
if (null != topicSubscribes) {
Set<MqttSubscription> topicFilters = subscribe(topicSubscribes, clientID, connection.getClientGroupName(), resultCodes);
MqttSession session = sessionManager.getSession(clientID);
if (session != null) {
for (MqttSubscription subscription : topicFilters) {
session.addSubscription(subscription);
}
}
} else {
// The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. A SUBSCRIBE packet with no payload is a protocol violation
// it MUST close the Network Connection on which it received that Control Packet which caused the protocol violation
consumerManager.stopConsume(clientID);
sessionManager.removeSession(clientID);
connection.getChannel().close().addListener(CLOSE_ON_FAILURE);
connectionManager.removeConnection(connection);
client.close().addListener(CLOSE_ON_FAILURE);
}
} catch (Exception e) {
LOG.error("subscribe is error!");
if (resultCodes.size() < topicSubscribes.size()) {
int minus = topicSubscribes.size() - resultCodes.size();
for (; minus > 0; minus--) {
resultCodes.add(MqttQoS.FAILURE.value());
}
}
}
}
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
mqttFixedHeader,
MqttMessageIdVariableHeader.from(subscribeMessage.variableHeader().messageId()),
new MqttSubAckPayload(resultCodes));
LOG.info("SUBSCRIBE successful, subscribe result: {}", resultCodes);
client.writeAndFlush(subAckMessage);
}
public MqttEndpointImpl publishReceived(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
this.write(pubrec);
return this;
}
public MqttEndpointImpl subscribeAcknowledge(int subscribeMessageId, List<MqttQoS> grantedQoSLevels) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(subscribeMessageId);
MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels.stream().mapToInt(MqttQoS::value).toArray());
io.netty.handler.codec.mqtt.MqttMessage suback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
this.write(suback);
return this;
}
/**
* Sends PUBACK packet to server
*
* @param publishMessageId identifier of the PUBLISH message to acknowledge
*/
private void publishAcknowledge(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
this.write(puback);
}
public MqttEndpointImpl pong() {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
io.netty.handler.codec.mqtt.MqttMessage pingresp = MqttMessageFactory.newMessage(fixedHeader, null, null);
this.write(pingresp);
return this;
}
public MqttEndpointImpl publishAcknowledge(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
this.write(puback);
return this;
}