下面列出了怎么用 io.netty.handler.codec.mqtt.MqttFixedHeader 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Handles an inbound PUBLISH packet according to its QoS level.
 *
 * <ul>
 *   <li>QoS 0: the message is handed to the session store and nothing is returned.</li>
 *   <li>QoS 1: the message is handed to the session store and a PUBACK carrying the
 *       publish packet id is returned.</li>
 *   <li>QoS 2 (and anything else): rejected.</li>
 * </ul>
 *
 * @param req      the inbound message; cast to {@code MqttPublishMessage}
 * @param clientId id of the publishing client (not used here)
 * @param remoteIp remote address of the client (not used here)
 * @return the PUBACK for QoS 1, otherwise an empty {@code Optional}
 * @throws BrokerException with {@code ErrorCode.MQTT_NOT_SUPPORT_QOS2} for unsupported QoS levels
 */
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) throws BrokerException {
    MqttPublishMessage publish = (MqttPublishMessage) req;
    log.info("PUBLISH, {} Qos: {}", publish.variableHeader().topicName(), publish.fixedHeader().qosLevel());

    switch (publish.fixedHeader().qosLevel()) {
        case AT_MOST_ONCE:
            // fire and forget: the store result is deliberately ignored
            this.sessionStore.publishMessage(publish, false);
            return Optional.empty();

        case AT_LEAST_ONCE: {
            // the store result is reflected in the QoS field of the PUBACK fixed header
            // NOTE(review): PUBACK header flags are reserved in MQTT 3.1.1; confirm whether
            // a non-zero QoS here is intended.
            MqttQoS ackQos = this.sessionStore.publishMessage(publish, false)
                    ? MqttQoS.AT_LEAST_ONCE
                    : MqttQoS.FAILURE;
            MqttFixedHeader ackHeader = new MqttFixedHeader(
                    MqttMessageType.PUBACK, false, ackQos, false, ProtocolProcess.fixLengthOfMessageId);
            MqttMessageIdVariableHeader ackId =
                    MqttMessageIdVariableHeader.from(publish.variableHeader().packetId());
            return Optional.of(MqttMessageFactory.newMessage(ackHeader, ackId, null));
        }

        case EXACTLY_ONCE:
        default:
            log.error("DOT NOT support Qos=2, close");
            throw new BrokerException(ErrorCode.MQTT_NOT_SUPPORT_QOS2);
    }
}
/**
 * Handles an UNSUBSCRIBE request: removes the listed topics from the client's
 * session and answers with an UNSUBACK carrying the request's message id.
 *
 * @param req      the inbound message; cast to {@code MqttUnsubscribeMessage}
 * @param clientId id of the client whose subscriptions are removed
 * @param remoteIp remote address of the client (not used here)
 * @return the UNSUBACK, or an empty {@code Optional} when the request lists no topics
 */
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) {
    MqttUnsubscribeMessage msg = (MqttUnsubscribeMessage) req;
    log.info("UNSUBSCRIBE, {}", msg.payload().topics());

    if (msg.payload().topics().isEmpty()) {
        log.error("empty topic, skip it");
        return Optional.empty();
    }

    this.sessionStore.unSubscribe(clientId, msg.payload().topics());

    // UNSUBACK has no QoS: its fixed-header flag bits are reserved and must be 0
    // (MQTT 3.1.1, section 2.2.2), so the header is built with AT_MOST_ONCE.
    MqttFixedHeader ackHeader = new MqttFixedHeader(
            MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, ProtocolProcess.fixLengthOfMessageId);
    MqttMessage rsp = MqttMessageFactory.newMessage(
            ackHeader, MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
    return Optional.of(rsp);
}
/**
 * Writes a PUBACK for the given packet id to the client channel.
 * Any failure while building or writing the message is logged and the channel is closed.
 *
 * @param client    channel of the client to acknowledge
 * @param packageID packet id of the PUBLISH being acknowledged
 */
private void sendPubAck(Channel client, Integer packageID) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("发送PubAck消息给客户端");
    }
    try {
        MqttMessage ack = MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                MqttMessageIdVariableHeader.from(packageID),
                null);
        client.writeAndFlush(ack);
    } catch (Throwable failure) {
        LOG.error("Send pubAck error!", failure);
        client.close().addListener(CLOSE_ON_FAILURE);
    }
}
/**
 * Builds a CONNECT message from the given options.
 *
 * <p>When no will is configured, the will flags are cleared, will QoS is 0 and the
 * will topic/message are empty strings. Null client id, user name and password are
 * replaced with empty strings in the payload.
 *
 * @param options connection options to encode
 * @return the CONNECT message
 */
public static MqttConnectMessage connect(ConnectOptions options) {
    MqttFixedHeader header = new MqttFixedHeader(
            MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10);

    MqttConnectVariableHeader flags = new MqttConnectVariableHeader(
            options.version().protocolName(),
            options.version().protocolLevel(),
            options.userName() != null,
            options.password() != null,
            options.will() != null && options.will().isRetain(),
            options.will() != null ? options.will().qos().value() : 0,
            options.will() != null,
            options.cleanSession(),
            options.keepAliveTimeSeconds());

    MqttConnectPayload body = new MqttConnectPayload(
            Strings.nullToEmpty(options.clientId()),
            options.will() != null ? options.will().topicName() : "",
            options.will() != null ? new String(options.will().message(), CharsetUtil.UTF_8) : "",
            Strings.nullToEmpty(options.userName()),
            Strings.nullToEmpty(options.password()));

    return new MqttConnectMessage(header, flags, body);
}
/**
 * Test helper: feeds a fixed CONNECT message (MQTT level 4, will and credentials set,
 * keep-alive 60) into an embedded channel with a {@code ConnectReceiver} and returns
 * the first outbound message.
 *
 * @param clientId     client id placed in the CONNECT payload
 * @param cleanSession clean-session flag of the CONNECT
 * @param channelId    channel id to use, or {@code null} to create one from the client id
 * @return the first message written outbound by the handler
 */
private MqttConnAckMessage executeNormalChannelRead0(String clientId, boolean cleanSession, ChannelId channelId)
        throws Exception {
    MqttConnectMessage connect = new MqttConnectMessage(
            new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10),
            new MqttConnectVariableHeader("MQTT", 4, true, true, true, 0, true, cleanSession, 60),
            new MqttConnectPayload(clientId, "willtopic", "willmessage", "username", "password"));

    ChannelId effectiveId;
    if (channelId == null) {
        effectiveId = TestUtil.newChannelId(clientId, false);
    } else {
        effectiveId = channelId;
    }

    EmbeddedChannel embedded = new EmbeddedChannel(effectiveId, new ConnectReceiver());
    embedded.writeInbound(connect);
    return embedded.readOutbound();
}
/**
 * Acknowledges a received PUBLISH with a PUBREC packet.
 *
 * <p>The PUBLISH is stored in {@code qos2inbound} under its message id before the
 * PUBREC is written.
 *
 * @param publishMessage a PUBLISH message to acknowledge
 */
private void publishReceived(MqttPublishMessage publishMessage) {
    io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0),
            MqttMessageIdVariableHeader.from(publishMessage.messageId()),
            null);

    synchronized (this) {
        qos2inbound.put(publishMessage.messageId(), publishMessage);
    }

    this.write(pubrec);
}
/**
 * Writes a PUBREL packet for the given PUBLISH message id.
 *
 * <p>The PUBREL is stored in {@code qos2outbound} under the message id before it is written.
 *
 * @param publishMessageId identifier of the PUBLISH message to acknowledge
 */
private void publishRelease(int publishMessageId) {
    io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0),
            MqttMessageIdVariableHeader.from(publishMessageId),
            null);

    synchronized (this) {
        qos2outbound.put(publishMessageId, pubrel);
    }

    this.write(pubrel);
}
/**
 * Schedules a retransmission task on the given event loop.
 *
 * <p>When the task fires it increases {@code timeout} by 5, passes a copy of the original
 * fixed header with the DUP flag set (together with the original message) to
 * {@code handler}, and then schedules itself again.
 *
 * @param eventLoop event loop used for scheduling
 */
private void startTimer(EventLoop eventLoop){
    this.timer = eventLoop.schedule(() -> {
        this.timeout += 5;
        MqttFixedHeader source = this.originalMessage.fixedHeader();
        MqttFixedHeader dupHeader = new MqttFixedHeader(
                source.messageType(),
                true,
                source.qosLevel(),
                source.isRetain(),
                source.remainingLength());
        handler.accept(dupHeader, originalMessage);
        startTimer(eventLoop);
    }, timeout, TimeUnit.SECONDS);
}
/**
 * Asserts basic properties of a received message and records any assertion failure
 * in {@code collector} instead of propagating it.
 *
 * @param message            message to inspect
 * @param expectedProperties expected values; only the {@code RETAINED} entry is read here
 * @return always {@code true}; failures are reported through {@code collector}
 */
private boolean checkMessageProperties(MqttMessage message, Map<String, Object> expectedProperties) {
    try {
        assertNotNull(message);
        assertNotNull(server.getNodeID());

        MqttFixedHeader fixed = message.fixedHeader();
        assertNotNull(fixed.messageType());
        assertEquals(fixed.qosLevel().value(), AT_MOST_ONCE);
        assertEquals(fixed.isRetain(), expectedProperties.get(RETAINED));
    } catch (Throwable failure) {
        collector.addError(failure);
    }
    return true;
}
/**
 * Sends a PINGREQ and arms a response timeout.
 *
 * <p>If no timeout task is pending, one is scheduled on the channel's event loop; when it
 * fires after {@code keepaliveSeconds} it sends DISCONNECT and closes the channel.
 * The previous check was inverted ({@code != null}), so the task was never scheduled
 * while the field was {@code null}, and a pending task would have been overwritten
 * without being cancelled.
 *
 * <p>NOTE(review): this assumes the PINGRESP handling cancels the task and resets
 * {@code pingRespTimeout} to {@code null} — confirm against the rest of the class.
 *
 * @param channel channel to ping
 */
private void sendPingReq(Channel channel){
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
    channel.writeAndFlush(new MqttMessage(fixedHeader));

    // arm the timeout only when none is pending
    if(this.pingRespTimeout == null){
        this.pingRespTimeout = channel.eventLoop().schedule(() -> {
            MqttFixedHeader fixedHeader2 = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
            channel.writeAndFlush(new MqttMessage(fixedHeader2)).addListener(ChannelFutureListener.CLOSE);
            //TODO: what do when the connection is closed ?
        }, this.keepaliveSeconds, TimeUnit.SECONDS);
    }
}
/**
 * Sends a PINGREQ to the peer.
 *
 * <p>The original note read "close the connection if no ping is received from the peer
 * within 5 seconds"; that timeout logic is commented out below, so this method
 * currently only sends the ping.
 *
 * @param channel channel to write the PINGREQ to
 */
private void sendPingReq(Channel channel){
    channel.writeAndFlush(new MqttMessage(
            new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0)));
    /* disabled response-timeout logic:
    if(this.pingRespTimeout == null){
        this.pingRespTimeout = channel.eventLoop().schedule(() -> {
            MqttFixedHeader fixedHeader2 = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
            channel.writeAndFlush(new MqttMessage(fixedHeader2)).addListener(ChannelFutureListener.CLOSE);
            //TODO: what do when the connection is closed ?
        }, this.keepaliveSeconds, TimeUnit.SECONDS);
    }*/
}
/**
 * Builds the will PUBLISH message described by a CONNECT message.
 *
 * <p>The PUBLISH uses the will QoS, will-retain flag, will topic and will payload from
 * the CONNECT, with packet id 0.
 *
 * @param connectMessage the client's CONNECT message
 * @return the will message, or {@code null} when the CONNECT has no will flag
 */
public MqttPublishMessage genWillMessage(MqttConnectMessage connectMessage) {
    if (!connectMessage.variableHeader().isWillFlag()) {
        return null;
    }

    log.info("get will message from client");
    MqttFixedHeader willHeader = new MqttFixedHeader(
            MqttMessageType.PUBLISH,
            false,
            MqttQoS.valueOf(connectMessage.variableHeader().willQos()),
            connectMessage.variableHeader().isWillRetain(),
            0);
    MqttPublishVariableHeader willTopic =
            new MqttPublishVariableHeader(connectMessage.payload().willTopic(), 0);
    return (MqttPublishMessage) MqttMessageFactory.newMessage(
            willHeader,
            willTopic,
            Unpooled.buffer().writeBytes(connectMessage.payload().willMessageInBytes()));
}
/**
 * Answers a client heartbeat (PINGREQ) with a PINGRESP.
 *
 * @param req      the inbound PINGREQ (not inspected)
 * @param clientId id of the client, used for logging only
 * @param remoteIp remote address of the client (not used here)
 * @return the PINGRESP message
 */
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) {
    log.debug("heart beat from client: {}", clientId);
    // PINGRESP has no QoS: its fixed-header flag bits are reserved and must be 0
    // (MQTT 3.1.1, section 2.2.2), so the header is built with AT_MOST_ONCE.
    MqttMessage rsp = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0), null, null);
    return Optional.of(rsp);
}
/**
 * Handles a CONNECT request and builds the matching CONNACK.
 *
 * <p>The connection is refused when the session's client id is blank or when the
 * user name/password check fails. Otherwise any existing session with the same client
 * id is removed, the new session is stored, and the connection is accepted.
 *
 * @param msg         the client's CONNECT message (user name and password are read from its payload)
 * @param sessionData session context supplying the client id; stored on success
 * @return a CONNACK with the corresponding return code; session-present is always {@code false}
 */
public MqttMessage processConnect(MqttConnectMessage msg, SessionContext sessionData) {
    // CONNACK has no QoS: its fixed-header flag bits are reserved and must be 0
    // (MQTT 3.1.1, section 2.2.2), so the header is built with AT_MOST_ONCE.
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);

    String clientId = sessionData.getClientId();
    if (StringUtils.isBlank(clientId)) {
        log.error("clientId is empty, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
    }

    // verify userName and password; a missing password is passed on as null
    String username = msg.payload().userName();
    String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), StandardCharsets.UTF_8);
    if (!this.authService.verifyUserName(username, password)) {
        log.error("verify account failed, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
    }

    // a reconnect with the same client id replaces the older session
    if (this.sessionStore.existSession(clientId)) {
        log.info("exist client id, force to delete the older");
        this.sessionStore.removeSession(clientId);
    }

    // store new session
    this.sessionStore.addSession(clientId, sessionData);
    log.info("MQTT connected, clientId: {}", clientId);

    return MqttMessageFactory.newMessage(fixedHeader,
            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false), null);
}
/**
 * Builds a CONNECT message from the given connect options.
 *
 * @param info options supplying protocol version, flags, keep-alive and payload fields
 * @return the CONNECT message
 */
public static MqttConnectMessage connectMessage(MqttConnectOptions info) {
    MqttVersion version = info.getMqttVersion();

    MqttFixedHeader fixed = new MqttFixedHeader(
            MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10);

    MqttConnectVariableHeader variable = new MqttConnectVariableHeader(
            version.protocolName(),
            version.protocolLevel(),
            info.isHasUserName(),
            info.isHasPassword(),
            info.isHasWillRetain(),
            info.getWillQos(),
            info.isHasWillFlag(),
            info.isHasCleanSession(),
            info.getKeepAliveTime());

    MqttConnectPayload payload = new MqttConnectPayload(
            info.getClientIdentifier(),
            info.getWillTopic(),
            info.getWillMessage(),
            info.getUserName(),
            info.getPassword());

    return new MqttConnectMessage(fixed, variable, payload);
}
/**
 * Handles a PUBLISH received on the channel (QoS 0, 1 and 2).
 *
 * <p>The message is converted to a {@code MessageData}, saved first when it is QoS 2,
 * passed to the consumer, and then acknowledged: PUBACK for QoS 1, PUBREC for QoS 2,
 * nothing for QoS 0.
 *
 * @param channel     channel the message arrived on (not used here)
 * @param mqttMessage the received PUBLISH
 */
public void processPublish(Channel channel, MqttPublishMessage mqttMessage) {
    MqttFixedHeader fixed = mqttMessage.fixedHeader();
    MqttPublishVariableHeader variable = mqttMessage.variableHeader();
    ByteBuf body = mqttMessage.payload();

    String topic = variable.topicName();
    MqttQoS qos = fixed.qosLevel();
    int packetId = variable.packetId();
    byte[] content = ByteBufUtil.copyByteBuf(body);

    MessageData received = MessageData.builder()
            .topic(topic)
            .qos(qos.value())
            .messageId(packetId)
            .payload(content)
            .status(MessageStatus.PUB)
            .dup(fixed.isDup())
            .retained(fixed.isRetain())
            .build();

    if (qos == MqttQoS.EXACTLY_ONCE) {
        this.consumerProcess.saveMesage(received);
    }
    this.consumerProcess.processPublish(received);

    // acknowledge according to QoS; QoS 0 needs no reply
    if (qos == MqttQoS.AT_LEAST_ONCE) {
        this.consumerProcess.sendPubAckMessage(packetId);
    } else if (qos == MqttQoS.EXACTLY_ONCE) {
        this.consumerProcess.sendPubRecMessage(packetId);
    }
}
/**
 * Dispatches an inbound MQTT message to {@code clientProtocolProcess} by message type.
 * {@code null} messages and message types without a handler are ignored.
 *
 * @param ctx  handler context whose channel is passed to the processor
 * @param msgx the inbound object; cast to {@code MqttMessage}
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msgx) throws Exception {
    if (msgx == null) {
        return;
    }

    MqttMessage inbound = (MqttMessage) msgx;
    MqttFixedHeader header = inbound.fixedHeader();
    NettyLog.debug("read: {}", header.messageType());

    switch (header.messageType()) {
        case CONNACK:
            clientProtocolProcess.processConnectBack(ctx.channel(), (MqttConnAckMessage) inbound);
            break;
        case SUBACK:
            clientProtocolProcess.processSubAck(ctx.channel(), (MqttSubAckMessage) inbound);
            break;
        case UNSUBACK:
            clientProtocolProcess.processUnSubBack(ctx.channel(), inbound);
            break;
        case PUBLISH:
            clientProtocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) inbound);
            break;
        case PUBACK:
            clientProtocolProcess.processPubAck(ctx.channel(), inbound);
            break;
        case PUBREC:
            clientProtocolProcess.processPubRec(ctx.channel(), inbound);
            break;
        case PUBREL:
            clientProtocolProcess.processPubRel(ctx.channel(), inbound);
            break;
        case PUBCOMP:
            clientProtocolProcess.processPubComp(ctx.channel(), inbound);
            break;
        default:
            // other message types are not handled on the client side
            break;
    }
}
/**
 * Builds a SUBSCRIBE message for the given topic subscriptions.
 *
 * @param mqttTopicSubscriptions topic filters with their requested QoS
 * @param messageId              packet id of the SUBSCRIBE
 * @return the SUBSCRIBE message
 */
public static MqttSubscribeMessage subscribeMessage(List<MqttTopicSubscription> mqttTopicSubscriptions,
        int messageId) {
    MqttSubscribePayload topics = new MqttSubscribePayload(mqttTopicSubscriptions);
    MqttFixedHeader header = new MqttFixedHeader(
            MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    return new MqttSubscribeMessage(header, MqttMessageIdVariableHeader.from(messageId), topics);
}
/**
 * Builds an UNSUBSCRIBE message for the given topic filters.
 *
 * @param topic     topic filters to unsubscribe from
 * @param messageId packet id of the UNSUBSCRIBE
 * @return the UNSUBSCRIBE message
 */
public static MqttUnsubscribeMessage unSubscribeMessage(List<String> topic, int messageId) {
    // MQTT 3.1.1 (section 3.10.1) fixes the UNSUBSCRIBE header flags at 0010, i.e. the
    // QoS field must be 1 — the same as SUBSCRIBE. With AT_MOST_ONCE the packet is
    // malformed and a compliant server must close the connection.
    MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
            false, 0x02);
    MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
    MqttUnsubscribePayload unsubscribePayload = new MqttUnsubscribePayload(topic);
    return new MqttUnsubscribeMessage(mqttFixedHeader, variableHeader, unsubscribePayload);
}
/**
 * Builds a PUBLISH message from an internal {@code Message}.
 *
 * <p>The topic is read from the {@code MessageHeader.TOPIC} header; the payload is
 * wrapped without copying, and an empty buffer is used when the message has no payload.
 * The retain flag is always {@code false}.
 *
 * @param message   source message (topic header and byte[] payload)
 * @param dup       DUP flag of the PUBLISH
 * @param qos       numeric QoS level
 * @param messageId packet id of the PUBLISH
 * @return the PUBLISH message
 */
public static MqttPublishMessage getPubMessage(Message message,boolean dup,int qos,int messageId){
    MqttFixedHeader header = new MqttFixedHeader(
            MqttMessageType.PUBLISH, dup, MqttQoS.valueOf(qos), false, 0);
    MqttPublishVariableHeader topicAndId = new MqttPublishVariableHeader(
            (String) message.getHeader(MessageHeader.TOPIC), messageId);
    ByteBuf body = message.getPayload() == null
            ? Unpooled.EMPTY_BUFFER
            : Unpooled.wrappedBuffer((byte[]) message.getPayload());
    return new MqttPublishMessage(header, topicAndId, body);
}
/**
 * Answers a PINGREQ by writing a PINGRESP to the client channel.
 *
 * @param client  channel of the pinging client
 * @param message the inbound PINGREQ (not inspected)
 */
@Override
public void handleRequest(Channel client, MqttMessage message) throws Exception {
    String clientId = NettyAttrManager.getAttrClientId(client);
    if (logger.isDebugEnabled()) {
        logger.debug(String.format("PingRequest clientId:%s", clientId));
    }
    client.writeAndFlush(new MqttMessage(
            new MqttFixedHeader(MqttMessageType.PINGRESP, false, AT_MOST_ONCE, false, 0)));
}
/**
 * Builds a CONNACK, writes it to the client channel and returns it.
 * The CONNACK fixed header reuses the QoS level of the CONNECT message's fixed header.
 *
 * @param client         channel to write the CONNACK to
 * @param connectMessage the CONNECT being answered
 * @param ackCode        CONNACK return code
 * @param sessionPresent session-present flag of the CONNACK
 * @return the CONNACK that was written
 */
private MqttConnAckMessage sendAckToClient(Channel client, MqttConnectMessage connectMessage, MqttConnectReturnCode ackCode, boolean sessionPresent) {
    MqttFixedHeader header = new MqttFixedHeader(
            MqttMessageType.CONNACK, false, connectMessage.fixedHeader().qosLevel(), false, 0);
    MqttConnAckVariableHeader result = new MqttConnAckVariableHeader(ackCode, sessionPresent);
    MqttConnAckMessage ack = (MqttConnAckMessage) MqttMessageFactory.newMessage(header, result, null);
    client.writeAndFlush(ack);
    return ack;
}
/**
 * Builds an UNSUBACK for the given packet id, logs it, and writes it to the client channel.
 *
 * @param client    channel to write the UNSUBACK to
 * @param packageID packet id of the UNSUBSCRIBE being acknowledged
 * @param qoS       QoS level placed in the UNSUBACK fixed header
 */
private void sendUnSubAck(Channel client, int packageID, MqttQoS qoS) {
    MqttMessage ack = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.UNSUBACK, false, qoS, false, 0),
            MqttMessageIdVariableHeader.from(packageID),
            null);
    LOG.info("UNSUBSCRIBE successful, packageID: {}", packageID);
    client.writeAndFlush(ack);
}
/**
 * Reacts to session control messages. On a {@code SessionCloseMsg} a DISCONNECT is
 * pushed to the network and the channel is closed; other messages are ignored.
 *
 * @param msg the session control message
 */
@Override
public void onMsg(SessionCtrlMsg msg) throws SessionException {
    if (!(msg instanceof SessionCloseMsg)) {
        return;
    }
    MqttFixedHeader disconnect =
            new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
    pushToNetwork(new MqttMessage(disconnect));
    channel.close();
}
/**
 * Logs every inbound object and forwards MQTT messages that carry a fixed header to
 * {@code processMqttMsg}. Anything else is dropped without further action.
 *
 * @param ctx handler context
 * @param msg inbound object
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    log.info("[{}] Processing msg: {}", sessionId, msg);

    if (!(msg instanceof MqttMessage)) {
        return;
    }

    MqttMessage mqttMessage = (MqttMessage) msg;
    if (mqttMessage.fixedHeader() == null) {
        // no fixed header: nothing is done with the message
        // NOTE(review): dropped messages are neither released nor passed on — confirm
        // whether a release is needed here.
        return;
    }

    processMqttMsg(ctx, mqttMessage);
}
/**
 * Sends a publish-flow control packet of the given type for the given message id.
 *
 * @param messageId   packet id the control packet refers to
 * @param messageType control packet type to send
 */
void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType) {
    // Spec requires 01 in header for rel; every other type uses QoS 0
    MqttQoS qos;
    if (messageType == MqttMessageType.PUBREL) {
        qos = MqttQoS.AT_LEAST_ONCE;
    } else {
        qos = MqttQoS.AT_MOST_ONCE;
    }

    MqttPubAckMessage control = new MqttPubAckMessage(
            new MqttFixedHeader(messageType, false, qos, false, 0),
            MqttMessageIdVariableHeader.from(messageId));
    sendToClient(control);
}
/**
 * Builds a PUBLISH message and sends it to the client.
 *
 * <p>The DUP flag is set only for QoS above 0 when the message has been delivered before.
 *
 * @param messageId     packet id of the PUBLISH
 * @param topicName     topic to publish on
 * @param qosLevel      numeric QoS level
 * @param isRetain      retain flag
 * @param payload       message body
 * @param deliveryCount number of previous delivery attempts
 */
protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
    boolean dup = qosLevel != 0 && deliveryCount > 0;
    MqttFixedHeader fixed = new MqttFixedHeader(
            MqttMessageType.PUBLISH, dup, MqttQoS.valueOf(qosLevel), isRetain, 0);
    MqttPublishVariableHeader topicAndId = new MqttPublishVariableHeader(topicName, messageId);
    sendToClient(new MqttPublishMessage(fixed, topicAndId, payload));
}
/**
 * Reacts to session control messages. On a {@code SessionCloseMsg} a DISCONNECT is
 * pushed to the network and the channel is closed; other messages are ignored.
 *
 * @param msg the session control message
 */
@Override
public void onMsg(SessionCtrlMsg msg) throws SessionException {
    if (!(msg instanceof SessionCloseMsg)) {
        return;
    }
    MqttFixedHeader disconnect =
            new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
    pushToNetwork(new MqttMessage(disconnect));
    channel.close();
}
/**
 * Builds a QoS 1 PUBLISH message whose payload is the JSON serialization of the given element.
 *
 * @param ctx   device session supplying the next message id
 * @param topic topic to publish on
 * @param json  JSON content of the payload
 * @return the PUBLISH message
 */
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
    MqttFixedHeader fixed = new MqttFixedHeader(
            MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    MqttPublishVariableHeader topicAndId = new MqttPublishVariableHeader(topic, ctx.nextMsgId());

    ByteBuf body = ALLOCATOR.buffer();
    body.writeBytes(GSON.toJson(json).getBytes(UTF8));

    return new MqttPublishMessage(fixed, topicAndId, body);
}
/**
 * Builds a CONNACK message.
 *
 * @param returnCode     CONNACK return code
 * @param sessionPresent session-present flag
 * @return the CONNACK message
 */
public static MqttConnAckMessage connack(MqttConnectReturnCode returnCode, boolean sessionPresent) {
    return new MqttConnAckMessage(
            new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 2),
            new MqttConnAckVariableHeader(returnCode, sessionPresent));
}