下面列出了怎么用 io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader 的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) throws BrokerException {
MqttPublishMessage msg = (MqttPublishMessage) req;
log.info("PUBLISH, {} Qos: {}", msg.variableHeader().topicName(), msg.fixedHeader().qosLevel());
switch (msg.fixedHeader().qosLevel()) {
case AT_MOST_ONCE: {
this.sessionStore.publishMessage(msg, false);
return Optional.empty();
}
case AT_LEAST_ONCE: {
boolean result = this.sessionStore.publishMessage(msg, false);
MqttQoS qos = result ? MqttQoS.AT_LEAST_ONCE : MqttQoS.FAILURE;
MqttMessage rsp = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBACK, false, qos, false, ProtocolProcess.fixLengthOfMessageId),
MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);
return Optional.of(rsp);
}
case EXACTLY_ONCE:
default: {
log.error("DOT NOT support Qos=2, close");
throw new BrokerException(ErrorCode.MQTT_NOT_SUPPORT_QOS2);
}
}
}
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) {
MqttUnsubscribeMessage msg = (MqttUnsubscribeMessage) req;
log.info("UNSUBSCRIBE, {}", msg.payload().topics());
if (msg.payload().topics().isEmpty()) {
log.error("empty topic, skip it");
return Optional.empty();
}
this.sessionStore.unSubscribe(clientId, msg.payload().topics());
MqttMessage rsp = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_LEAST_ONCE, false, ProtocolProcess.fixLengthOfMessageId),
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
return Optional.of(rsp);
}
/**
* P - B (Qos2)
* @param channel
* @param variableHeader
*/
public void processPubRel(Channel channel, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
ProcedureMessage info = this.procedureProcess.processPubRel(channel, messageId);
if (info != null) {
NettyLog.debug("relInfo:" + info);
BorkerMessage bMsgInfo = BorkerMessage.builder().sourceClientId(info.getSourceClientId())
.sourceMsgId(info.getSourceMsgId()).topicName(info.getTopicName()).iQosLevel(info.getIQosLevel())
.msgBytes(info.getMsgBytes()).retain(false).build();
List<SubscribeTopicInfo> subscribeClientList = this.topicProcess.search(bMsgInfo.getTopicName());
this.consumerProcess.sendSubscribMessage(bMsgInfo, subscribeClientList);
}
this.sendProcess.sendPubCompMessage(channel, messageId);
}
private void sendPubAck(Channel client, Integer packageID) {
if (LOG.isDebugEnabled()) {
LOG.debug("发送PubAck消息给客户端");
}
try {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage pubAckMessage = MqttMessageFactory.newMessage(
mqttFixedHeader,
MqttMessageIdVariableHeader.from(packageID),
null);
client.writeAndFlush(pubAckMessage);
} catch (Throwable th) {
LOG.error("Send pubAck error!", th);
client.close().addListener(CLOSE_ON_FAILURE);
}
}
/**
* Sends PUBREC packet to server
*
* @param publishMessage a PUBLISH message to acknowledge
*/
private void publishReceived(MqttPublishMessage publishMessage) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessage.messageId());
io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2inbound.put(publishMessage.messageId(), publishMessage);
}
this.write(pubrec);
}
/**
* Sends the PUBREL message to server
*
* @param publishMessageId identifier of the PUBLISH message to acknowledge
*/
private void publishRelease(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2outbound.put(publishMessageId, pubrel);
}
this.write(pubrel);
}
/**
* B - P (Qos1)
* @param channel
* @param mqttMessage
*/
public void processPubAck(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
int messageId = messageIdVariableHeader.messageId();
producerProcess.processPubAck(messageId);
producerProcess.delMessage(messageId);
}
/**
* B- P(Qos2)
* @param channel
* @param mqttMessage
*/
public void processPubRec(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
int messageId = messageIdVariableHeader.messageId();
producerProcess.processPubRec(messageId);
producerProcess.sendPubRel(messageId);
}
/**
* B - P (Qos2)
* @param channel
* @param mqttMessage
*/
public void processPubComp(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
int messageId = messageIdVariableHeader.messageId();
producerProcess.processPubComp(messageId);
producerProcess.delMessage(messageId);
}
/**
* B - S(Qos2)
* @param channel
* @param mqttMessage
*/
public void processPubRel(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage
.variableHeader();
int messageId = messageIdVariableHeader.messageId();
this.consumerProcess.processPubRel(messageId);
this.consumerProcess.sendPubCompMessage(messageId);
this.consumerProcess.delMesage(messageId);
}
/**
* B - S
* @param channel
* @param mqttMessage
*/
public void processUnSubBack(Channel channel, MqttMessage mqttMessage) {
int messageId;
if (mqttMessage instanceof MqttUnsubAckMessage) {
MqttUnsubAckMessage mqttUnsubAckMessage = (MqttUnsubAckMessage) mqttMessage;
messageId = mqttUnsubAckMessage.variableHeader().messageId();
} else {
MqttMessageIdVariableHeader o = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
messageId = o.messageId();
NettyLog.error("not UnsubAckMessage:{}", messageId);
}
this.consumerProcess.processUnSubBack(messageId);
}
public static MqttSubscribeMessage subscribeMessage(List<MqttTopicSubscription> mqttTopicSubscriptions,
int messageId) {
MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(mqttTopicSubscriptions);
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttSubscribeMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubscribePayload);
}
public static MqttUnsubscribeMessage unSubscribeMessage(List<String> topic, int messageId) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_MOST_ONCE,
false, 0x02);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttUnsubscribePayload mqttUnsubscribeMessage = new MqttUnsubscribePayload(topic);
return new MqttUnsubscribeMessage(mqttFixedHeader, variableHeader, mqttUnsubscribeMessage);
}
public void processPubAck(Channel client, MqttPubAckMessage pubAckMessage) {
MqttMessageIdVariableHeader pubAckVariableMessage = pubAckMessage.variableHeader();
short packageId = (short) pubAckVariableMessage.messageId();
String clientId = NettyAttrManager.getAttrClientId(client);
consumerManager.acknowledge(clientId, packageId);
if (LOG.isDebugEnabled()) {
LOG.debug("Received PubAck packageID: {}" + packageId);
}
}
private void sendUnSubAck(Channel client, int packageID, MqttQoS qoS) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, qoS, false, 0);
MqttMessage unSubAckMessage = MqttMessageFactory.newMessage(
mqttFixedHeader,
MqttMessageIdVariableHeader.from(packageID),
null);
LOG.info("UNSUBSCRIBE successful, packageID: {}", packageID);
client.writeAndFlush(unSubAckMessage);
}
public static MqttPubAckMessage puback(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttPubAckMessage(fixedHeader, variableHeader);
}
public static MqttMessage pubrec(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttMessage(fixedHeader, variableHeader);
}
public static MqttMessage pubrel(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttMessage(fixedHeader, variableHeader);
}
public static MqttMessage pubcomp(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttMessage(fixedHeader, variableHeader);
}
public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) {
int topicNameSize = 0;
int topicCount = topicSubscriptions.length;
for (MqttTopicSubscription item : topicSubscriptions) {
topicNameSize += item.topicName().getBytes(CharsetUtil.UTF_8).length;
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
false, 2 + topicNameSize + topicCount);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions));
return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
}
public static MqttSubAckMessage suback(int messageId, List<Integer> grantedQoSLevels) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false,
2 + grantedQoSLevels.size());
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels);
return new MqttSubAckMessage(fixedHeader, variableHeader, payload);
}
public static MqttUnsubAckMessage unsuback(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttUnsubAckMessage(fixedHeader, variableHeader);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
switch (msg.fixedHeader().messageType()) {
case PUBLISH:
if (receiver != null) {
receiver.messageReceived(Message.newMessage(client.clientId(), (MqttPublishMessage) msg));
}
int messageId = ((MqttPublishMessage) msg).variableHeader().messageId();
if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
client.send(MqttMessageFactory.puback(messageId));
}
else if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
client.send(MqttMessageFactory.pubrec(messageId));
}
break;
case CONNACK:
sharedObject.receivedMessage(msg);
synchronized (sharedObject.locker()) {
sharedObject.locker().notify();
}
break;
case PUBREC:
client.send(MqttMessageFactory.pubrel(((MqttMessageIdVariableHeader) msg.variableHeader()).messageId()));
break;
case SUBACK:
case PUBACK:
case PUBCOMP:
default:
break;
}
}
void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType) {
MqttQoS qos = (messageType == MqttMessageType.PUBREL) ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE;
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel
false, 0);
MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
sendToClient(rel);
}
private MqttMessage genSubAck(int messageId, List<Integer> mqttQoSList) {
return MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, ProtocolProcess.fixLengthOfMessageId + mqttQoSList.size()),
MqttMessageIdVariableHeader.from(messageId),
new MqttSubAckPayload(mqttQoSList));
}
public static int messageId(MqttMessage msg) {
return ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
}
/**
* S - B (Qos2)
* @param channel
* @param variableHeader
*/
public void processPubRec(Channel channel, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
this.consumerProcess.processPubRec(channel, messageId);
this.sendProcess.sendPubRelMessage(channel, messageId, false);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
if (!msg.decoderResult().isSuccess()) {
NettyLog.error("error decoder");
ctx.close();
return;
}
NettyLog.debug("read: " + msg.fixedHeader().messageType());
if (msg.fixedHeader().messageType() == MqttMessageType.CONNECT) {
protocolProcess.processConnect(ctx.channel(), (MqttConnectMessage) msg);
} else {
if (!NettyUtil.isLogin(ctx.channel())) {
NettyLog.info("not login");
return ;
}
}
switch (msg.fixedHeader().messageType()) {
case CONNECT:
break;
case CONNACK:
break;
case PUBLISH:
protocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
break;
case PUBACK:
protocolProcess.processPubAck(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case PUBREC:
protocolProcess.processPubRec(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case PUBREL:
protocolProcess.processPubRel(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case PUBCOMP:
protocolProcess.processPubComp(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
break;
case SUBSCRIBE:
protocolProcess.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
break;
case SUBACK:
break;
case UNSUBSCRIBE:
protocolProcess.processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
break;
case UNSUBACK:
break;
case PINGREQ:
protocolProcess.processPingReq(ctx.channel(), msg);
break;
case PINGRESP:
break;
case DISCONNECT:
protocolProcess.processDisConnect(ctx.channel(), msg);
break;
default:
break;
}
}
public static MqttUnsubAckMessage unsubAckMessage(int messageId) {
return (MqttUnsubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId), null);
}
public static MqttSubAckMessage subAckMessage(int messageId, List<Integer> mqttQoSList) {
return (MqttSubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId), new MqttSubAckPayload(mqttQoSList));
}