下面列出了 io.netty.handler.codec.mqtt.MqttMessageFactory 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Handles an inbound PUBLISH packet according to its QoS level.
 * <p>
 * QoS 0 messages are handed to the session store and produce no reply. QoS 1 messages are handed
 * to the session store and answered with a PUBACK whose fixed-header QoS is
 * {@code AT_LEAST_ONCE} when the store reports success and {@code FAILURE} otherwise. Every other
 * QoS level is rejected.
 *
 * @param req      the received message; cast to {@code MqttPublishMessage}
 * @param clientId not read by this handler
 * @param remoteIp not read by this handler
 * @return the PUBACK to send back, or an empty Optional when no reply is required
 * @throws BrokerException with {@code ErrorCode.MQTT_NOT_SUPPORT_QOS2} for unsupported QoS levels
 */
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) throws BrokerException {
    MqttPublishMessage publish = (MqttPublishMessage) req;
    log.info("PUBLISH, {} Qos: {}", publish.variableHeader().topicName(), publish.fixedHeader().qosLevel());

    switch (publish.fixedHeader().qosLevel()) {
        case AT_MOST_ONCE:
            // fire and forget: the store result is intentionally not inspected
            this.sessionStore.publishMessage(publish, false);
            return Optional.empty();

        case AT_LEAST_ONCE:
            MqttQoS ackQos;
            if (this.sessionStore.publishMessage(publish, false)) {
                ackQos = MqttQoS.AT_LEAST_ONCE;
            } else {
                ackQos = MqttQoS.FAILURE;
            }
            MqttFixedHeader ackHeader = new MqttFixedHeader(
                    MqttMessageType.PUBACK, false, ackQos, false, ProtocolProcess.fixLengthOfMessageId);
            MqttMessageIdVariableHeader ackId =
                    MqttMessageIdVariableHeader.from(publish.variableHeader().packetId());
            return Optional.of(MqttMessageFactory.newMessage(ackHeader, ackId, null));

        default:
            // covers EXACTLY_ONCE and anything else
            log.error("DOT NOT support Qos=2, close");
            throw new BrokerException(ErrorCode.MQTT_NOT_SUPPORT_QOS2);
    }
}
/**
 * Handles an inbound UNSUBSCRIBE packet.
 * <p>
 * When the payload lists at least one topic, the topics are removed from the client's
 * subscriptions through the session store and an UNSUBACK carrying the request's message id
 * is returned. A request without topics is logged and ignored.
 *
 * @param req      the received message; cast to {@code MqttUnsubscribeMessage}
 * @param clientId id of the client whose subscriptions are removed
 * @param remoteIp not read by this handler
 * @return the UNSUBACK to send back, or an empty Optional when the topic list is empty
 */
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) {
    MqttUnsubscribeMessage unsubscribe = (MqttUnsubscribeMessage) req;
    log.info("UNSUBSCRIBE, {}", unsubscribe.payload().topics());

    if (!unsubscribe.payload().topics().isEmpty()) {
        this.sessionStore.unSubscribe(clientId, unsubscribe.payload().topics());

        MqttFixedHeader ackHeader = new MqttFixedHeader(
                MqttMessageType.UNSUBACK, false, MqttQoS.AT_LEAST_ONCE, false, ProtocolProcess.fixLengthOfMessageId);
        MqttMessageIdVariableHeader ackId =
                MqttMessageIdVariableHeader.from(unsubscribe.variableHeader().messageId());
        return Optional.of(MqttMessageFactory.newMessage(ackHeader, ackId, null));
    }

    log.error("empty topic, skip it");
    return Optional.empty();
}
/**
 * Writes a PUBACK for the given packet id to the client channel.
 * <p>
 * Any failure while building or writing the message is logged and the channel is closed.
 *
 * @param client    channel of the client to acknowledge
 * @param packageID packet id of the PUBLISH being acknowledged
 */
private void sendPubAck(Channel client, Integer packageID) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("发送PubAck消息给客户端");
    }
    try {
        client.writeAndFlush(
                MqttMessageFactory.newMessage(
                        new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                        MqttMessageIdVariableHeader.from(packageID),
                        null));
    } catch (Throwable failure) {
        LOG.error("Send pubAck error!", failure);
        client.close().addListener(CLOSE_ON_FAILURE);
    }
}
/**
 * Sends a PUBREC packet to the server.
 * <p>
 * The acknowledged PUBLISH message is recorded in {@code qos2inbound} under its message id
 * before the PUBREC is written.
 *
 * @param publishMessage a PUBLISH message to acknowledge
 */
private void publishReceived(MqttPublishMessage publishMessage) {
    io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0),
            MqttMessageIdVariableHeader.from(publishMessage.messageId()),
            null);

    // the map update is guarded by this instance's monitor
    synchronized (this) {
        qos2inbound.put(publishMessage.messageId(), publishMessage);
    }

    this.write(pubrec);
}
/**
 * Sends a PUBREL packet to the server.
 * <p>
 * The PUBREL message is recorded in {@code qos2outbound} under the message id before it is
 * written.
 *
 * @param publishMessageId identifier of the PUBLISH message to acknowledge
 */
private void publishRelease(int publishMessageId) {
    io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0),
            MqttMessageIdVariableHeader.from(publishMessageId),
            null);

    // the map update is guarded by this instance's monitor
    synchronized (this) {
        qos2outbound.put(publishMessageId, pubrel);
    }

    this.write(pubrel);
}
/**
 * Builds the will PUBLISH message described by a CONNECT packet.
 * <p>
 * The will QoS and retain flag come from the CONNECT variable header; topic and body come from
 * the CONNECT payload. The message id of the generated PUBLISH is 0.
 *
 * @param connectMessage the CONNECT packet sent by the client
 * @return the will message, or {@code null} when the CONNECT packet has no will flag
 */
public MqttPublishMessage genWillMessage(MqttConnectMessage connectMessage) {
    if (!connectMessage.variableHeader().isWillFlag()) {
        return null;
    }

    log.info("get will message from client");
    MqttFixedHeader willHeader = new MqttFixedHeader(
            MqttMessageType.PUBLISH,
            false,
            MqttQoS.valueOf(connectMessage.variableHeader().willQos()),
            connectMessage.variableHeader().isWillRetain(),
            0);
    MqttPublishVariableHeader willTopic =
            new MqttPublishVariableHeader(connectMessage.payload().willTopic(), 0);
    return (MqttPublishMessage) MqttMessageFactory.newMessage(
            willHeader,
            willTopic,
            Unpooled.buffer().writeBytes(connectMessage.payload().willMessageInBytes()));
}
/**
 * Handles a heart beat request by answering with a PINGRESP.
 *
 * @param req      the received message; not read by this handler
 * @param clientId id of the client, used for logging only
 * @param remoteIp not read by this handler
 * @return an Optional holding the PINGRESP message
 */
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) {
    log.debug("heart beat from client: {}", clientId);

    MqttFixedHeader pingRespHeader =
            new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    return Optional.of(MqttMessageFactory.newMessage(pingRespHeader, null, null));
}
/**
 * Handles a CONNECT packet and builds the CONNACK to reply with.
 * <p>
 * The connection is refused when the session's client id is blank or when the auth service
 * rejects the user name / password. Otherwise any existing session with the same client id is
 * removed, the new session is stored and the connection is accepted. The session-present flag
 * of the CONNACK is always {@code false}.
 *
 * @param msg         the CONNECT packet; user name and password are read from its payload
 * @param sessionData session context to register; supplies the client id
 * @return the CONNACK message carrying the resulting return code
 */
public MqttMessage processConnect(MqttConnectMessage msg, SessionContext sessionData) {
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    String clientId = sessionData.getClientId();

    MqttConnectReturnCode returnCode;
    if (StringUtils.isBlank(clientId)) {
        log.error("clientId is empty, reject");
        returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
    } else {
        // verify userName and password
        String username = msg.payload().userName();
        byte[] rawPassword = msg.payload().passwordInBytes();
        String password = null;
        if (rawPassword != null) {
            password = new String(rawPassword, StandardCharsets.UTF_8);
        }

        if (this.authService.verifyUserName(username, password)) {
            if (this.sessionStore.existSession(clientId)) {
                log.info("exist client id, force to delete the older");
                this.sessionStore.removeSession(clientId);
            }

            // store new session
            this.sessionStore.addSession(clientId, sessionData);
            log.info("MQTT connected, clientId: {}", clientId);
            returnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;
        } else {
            log.error("verify account failed, reject");
            returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
        }
    }

    return MqttMessageFactory.newMessage(fixedHeader, new MqttConnAckVariableHeader(returnCode, false), null);
}
/**
 * Builds a CONNACK and writes it to the client channel.
 * <p>
 * The CONNACK fixed header reuses the QoS level found in the CONNECT packet's fixed header.
 *
 * @param client         channel to write the CONNACK to
 * @param connectMessage the CONNECT packet being answered
 * @param ackCode        return code to report
 * @param sessionPresent session-present flag to report
 * @return the CONNACK message that was written
 */
private MqttConnAckMessage sendAckToClient(Channel client, MqttConnectMessage connectMessage, MqttConnectReturnCode ackCode, boolean sessionPresent) {
    MqttConnAckMessage ack = (MqttConnAckMessage) MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.CONNACK, false, connectMessage.fixedHeader().qosLevel(), false, 0),
            new MqttConnAckVariableHeader(ackCode, sessionPresent),
            null);
    client.writeAndFlush(ack);
    return ack;
}
/**
 * Builds an UNSUBACK for the given packet id, logs it and writes it to the client channel.
 *
 * @param client    channel to write the UNSUBACK to
 * @param packageID packet id of the UNSUBSCRIBE being acknowledged
 * @param qoS       QoS level placed in the UNSUBACK fixed header
 */
private void sendUnSubAck(Channel client, int packageID, MqttQoS qoS) {
    MqttMessage ack = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.UNSUBACK, false, qoS, false, 0),
            MqttMessageIdVariableHeader.from(packageID),
            null);

    LOG.info("UNSUBSCRIBE successful, packageID: {}", packageID);
    client.writeAndFlush(ack);
}
/**
 * Builds the CONNECT packet described by the given client options.
 * <p>
 * A missing client id is sent as the empty string. The will message and the password are
 * encoded as UTF-8 bytes. User name and password are only included when the options report
 * them as present.
 *
 * @param options client options supplying the connect flags, keep alive and payload fields
 * @return the CONNECT message
 */
private MqttMessage createConnectPacket(MqttClientOptions options) {
    MqttFixedHeader connectHeader =
            new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);

    MqttConnectVariableHeader connectFlags = new MqttConnectVariableHeader(
            PROTOCOL_NAME, PROTOCOL_VERSION,
            options.hasUsername(), options.hasPassword(),
            options.isWillRetain(), options.getWillQoS(), options.isWillFlag(),
            options.isCleanSession(),
            options.getKeepAliveTimeSeconds());

    String clientId = "";
    if (options.getClientId() != null) {
        clientId = options.getClientId();
    }

    byte[] willBytes = null;
    if (options.getWillMessage() != null) {
        willBytes = options.getWillMessage().getBytes(StandardCharsets.UTF_8);
    }

    String username = null;
    if (options.hasUsername()) {
        username = options.getUsername();
    }

    byte[] passwordBytes = null;
    if (options.hasPassword()) {
        passwordBytes = options.getPassword().getBytes(StandardCharsets.UTF_8);
    }

    MqttConnectPayload connectPayload =
            new MqttConnectPayload(clientId, options.getWillTopic(), willBytes, username, passwordBytes);

    return MqttMessageFactory.newMessage(connectHeader, connectFlags, connectPayload);
}
/**
 * Dispatches an inbound MQTT message by its type.
 * <ul>
 *   <li>PUBLISH: messages whose topic equals {@code "hardware"} (case-insensitively) are handed
 *       to the hardware logic; other topics are ignored.</li>
 *   <li>PINGREQ: answered with a PINGRESP.</li>
 *   <li>DISCONNECT: the channel is closed.</li>
 * </ul>
 * All other message types are ignored.
 *
 * @param ctx channel context used to reply or close
 * @param msg the received MQTT message
 */
@Override
public void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {
    this.stats.incrementMqttStat();

    MqttMessageType messageType = msg.fixedHeader().messageType();

    switch (messageType) {
        case PUBLISH :
            MqttPublishMessage publishMessage = (MqttPublishMessage) msg;

            String topic = publishMessage.variableHeader().topicName();

            // Locale.ROOT keeps the case folding independent of the JVM default locale.
            switch (topic.toLowerCase(java.util.Locale.ROOT)) {
                case "hardware" :
                    hardware.messageReceived(state, publishMessage);
                    break;
                default :
                    break;
            }

            break;

        case PINGREQ :
            // The reply must be a PINGRESP. Reusing the request's fixed header would send a
            // PINGREQ back to the client, so a dedicated PINGRESP header is built instead.
            ctx.writeAndFlush(
                    MqttMessageFactory.newMessage(
                            new io.netty.handler.codec.mqtt.MqttFixedHeader(
                                    MqttMessageType.PINGRESP,
                                    false,
                                    io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE,
                                    false,
                                    0),
                            null,
                            null),
                    ctx.voidPromise());
            break;

        case DISCONNECT :
            log.trace("Got disconnect. Closing...");
            ctx.close();
            break;

        default :
            // other message types are not handled here
            break;
    }
}
/**
 * Builds a SUBACK message.
 * <p>
 * The remaining length placed in the fixed header is
 * {@code ProtocolProcess.fixLengthOfMessageId} plus one per entry of {@code mqttQoSList}.
 *
 * @param messageId   message id of the SUBSCRIBE being acknowledged
 * @param mqttQoSList return codes placed in the SUBACK payload
 * @return the SUBACK message
 */
private MqttMessage genSubAck(int messageId, List<Integer> mqttQoSList) {
    MqttFixedHeader subAckHeader = new MqttFixedHeader(
            MqttMessageType.SUBACK,
            false,
            MqttQoS.AT_LEAST_ONCE,
            false,
            ProtocolProcess.fixLengthOfMessageId + mqttQoSList.size());
    MqttMessageIdVariableHeader subAckId = MqttMessageIdVariableHeader.from(messageId);
    MqttSubAckPayload subAckPayload = new MqttSubAckPayload(mqttQoSList);
    return MqttMessageFactory.newMessage(subAckHeader, subAckId, subAckPayload);
}
/**
 * Pushes an event to a client as a PUBLISH message.
 * <p>
 * The event content is JSON-encoded into the payload. If encoding fails the error is logged
 * and nothing is sent. Otherwise the first subscription in the client's session whose id
 * equals {@code subscriptionId} is looked up; for QoS 0 and QoS 1 subscriptions its offset is
 * set to the event id and the PUBLISH is sent through the session context. Other QoS levels
 * are only logged as unsupported.
 *
 * @param clientId       id of the client whose session is used
 * @param subscriptionId id of the subscription the event belongs to
 * @param event          event to deliver; supplies content, topic and event id
 * @param messageId      message id placed in the PUBLISH variable header
 */
private void sendEvent(String clientId, String subscriptionId, WeEvent event, int messageId) {
    ByteBuf body = Unpooled.buffer();
    int bodySize;
    try {
        byte[] json = JsonHelper.object2JsonBytes(event.getContent());
        bodySize = json.length;
        body.writeBytes(json);
    } catch (BrokerException e) {
        log.error("json encode failed, {}", e.toString());
        return;
    }

    // effectively-final copy so the lambdas below can capture it
    int capturedBodySize = bodySize;
    this.getSession(clientId).ifPresent(session -> {
        log.debug("subscription list in session context, {}", session.getSubscribeDataList());

        session.getSubscribeDataList()
                .stream()
                .filter(candidate -> candidate.getSubscriptionId().equals(subscriptionId))
                .findFirst()
                .ifPresent(matched -> {
                    switch (matched.getMqttQoS()) {
                        case AT_MOST_ONCE:
                        case AT_LEAST_ONCE:
                            log.info("PUBLISH subscribe message to client, client id: {} {} {}", clientId, messageId, event);

                            // update offset
                            matched.setOffset(event.getEventId());

                            int remainingLength =
                                    ProtocolProcess.fixLengthOfMessageId + matched.getTopic().length() + capturedBodySize;
                            MqttFixedHeader publishHeader = new MqttFixedHeader(
                                    MqttMessageType.PUBLISH, false, matched.getMqttQoS(), false, remainingLength);
                            // the subscription topic may contain a wildcard, so the original
                            // topic of the event is used in the variable header
                            MqttPublishVariableHeader publishTopic =
                                    new MqttPublishVariableHeader(event.getTopic(), messageId);
                            session.sendRemote(MqttMessageFactory.newMessage(publishHeader, publishTopic, body));
                            break;

                        default:
                            // covers EXACTLY_ONCE and anything else
                            log.error("DOT NOT support Qos=2");
                            break;
                    }
                });
    });
}
/**
 * Creates an UNSUBACK message for the given message id.
 *
 * @param messageId message id of the UNSUBSCRIBE being acknowledged
 * @return the UNSUBACK message
 */
public static MqttUnsubAckMessage unsubAckMessage(int messageId) {
    MqttFixedHeader header =
            new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttMessageIdVariableHeader id = MqttMessageIdVariableHeader.from(messageId);
    return (MqttUnsubAckMessage) MqttMessageFactory.newMessage(header, id, null);
}
/**
 * Creates a SUBACK message for the given message id.
 *
 * @param messageId   message id of the SUBSCRIBE being acknowledged
 * @param mqttQoSList return codes placed in the SUBACK payload
 * @return the SUBACK message
 */
public static MqttSubAckMessage subAckMessage(int messageId, List<Integer> mqttQoSList) {
    MqttFixedHeader header =
            new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttMessageIdVariableHeader id = MqttMessageIdVariableHeader.from(messageId);
    MqttSubAckPayload grants = new MqttSubAckPayload(mqttQoSList);
    return (MqttSubAckMessage) MqttMessageFactory.newMessage(header, id, grants);
}
/**
 * Creates a PUBCOMP message for the given message id.
 *
 * @param messageId message id being completed
 * @return the PUBCOMP message
 */
public static MqttMessage pubCompMessage(int messageId) {
    MqttFixedHeader header =
            new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttMessageIdVariableHeader id = MqttMessageIdVariableHeader.from(messageId);
    return MqttMessageFactory.newMessage(header, id, null);
}
/**
 * Creates a PUBACK message for the given message id.
 *
 * @param messageId message id of the PUBLISH being acknowledged
 * @return the PUBACK message
 */
public static MqttPubAckMessage pubAckMessage(int messageId) {
    MqttFixedHeader header =
            new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttMessageIdVariableHeader id = MqttMessageIdVariableHeader.from(messageId);
    return (MqttPubAckMessage) MqttMessageFactory.newMessage(header, id, null);
}
/**
 * Creates a PUBREC message for the given message id.
 *
 * @param messageId message id of the PUBLISH being acknowledged
 * @return the PUBREC message
 */
public static MqttMessage pubRecMessage(int messageId) {
    MqttFixedHeader header =
            new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttMessageIdVariableHeader id = MqttMessageIdVariableHeader.from(messageId);
    return MqttMessageFactory.newMessage(header, id, null);
}
/**
 * Creates a PUBREL message for the given message id.
 *
 * @param messageId message id being released
 * @param isDup     value of the DUP flag in the fixed header
 * @return the PUBREL message
 */
public static MqttMessage pubRelMessage(int messageId, boolean isDup) {
    MqttFixedHeader header =
            new MqttFixedHeader(MqttMessageType.PUBREL, isDup, MqttQoS.AT_LEAST_ONCE, false, 0);
    MqttMessageIdVariableHeader id = MqttMessageIdVariableHeader.from(messageId);
    return MqttMessageFactory.newMessage(header, id, null);
}
/**
 * Creates a PINGRESP message, which has neither variable header nor payload.
 *
 * @return the PINGRESP message
 */
public static MqttMessage pingRespMessage() {
    MqttFixedHeader header =
            new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
    return MqttMessageFactory.newMessage(header, null, null);
}
/**
 * Creates a CONNACK message.
 *
 * @param code           connect return code to report
 * @param sessionPresent session-present flag to report
 * @return the CONNACK message
 */
public static MqttConnAckMessage connAckMessage(MqttConnectReturnCode code, boolean sessionPresent) {
    MqttFixedHeader header =
            new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttConnAckVariableHeader result = new MqttConnAckVariableHeader(code, sessionPresent);
    return (MqttConnAckMessage) MqttMessageFactory.newMessage(header, result, null);
}
/**
 * Creates a PUBLISH message.
 * <p>
 * The payload bytes are copied into a newly allocated unpooled buffer.
 *
 * @param topicName topic placed in the variable header
 * @param isDup     value of the DUP flag
 * @param qosValue  numeric QoS level, converted with {@code MqttQoS.valueOf}
 * @param isRetain  value of the RETAIN flag
 * @param messageId message id placed in the variable header
 * @param payload   message body
 * @return the PUBLISH message
 */
public static MqttPublishMessage publishMessage(String topicName, boolean isDup, int qosValue, boolean isRetain,
                                                int messageId, byte[] payload) {
    MqttFixedHeader header =
            new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, MqttQoS.valueOf(qosValue), isRetain, 0);
    MqttPublishVariableHeader topic = new MqttPublishVariableHeader(topicName, messageId);
    return (MqttPublishMessage) MqttMessageFactory.newMessage(
            header, topic, Unpooled.buffer().writeBytes(payload));
}
/**
 * Pulls one message for the consumer and pushes it to the subscriber as a PUBLISH.
 * <p>
 * Nothing is sent when the pull returns no buffers. Otherwise the first buffer is decoded into
 * a broker message, a packet id is acquired from the session's acknowledged zone, and the
 * PUBLISH is written only if the client is connected and its channel is active and open. The
 * write outcome is handled in a channel listener: success is logged at debug level, failure is
 * logged and rethrown from the listener.
 *
 * @param name     name used in the error log (logged as the thread)
 * @param clientID id of the client to push to
 * @param session  session supplying the acknowledged zone for packet ids
 * @param consumer consumer to pull the message for
 * @param qos      QoS passed to {@code MqttMessageSerializer.getLowerQos} together with the
 *                 message's extension value
 * @throws Exception declared by the signature; presumably raised by the pull or decoding
 *                   helpers (not visible here)
 */
public void publish2Subscriber(String name, String clientID, MqttSession session, Consumer consumer, int qos) throws Exception {
    PullResult pulled = consume.getMessage(consumer, 1, 1000 * 60 * 2);
    String topicName = pulled.getTopic();
    List<ByteBuffer> buffers = pulled.getBuffers();
    if (buffers == null || buffers.size() <= 0) {
        return;
    }

    BrokerMessage brokerMessage = Serializer.readBrokerMessage(buffers.get(0));
    MqttFixedHeader publishHeader = new MqttFixedHeader(
            MqttMessageType.PUBLISH,
            false,
            MqttQoS.valueOf(MqttMessageSerializer.getLowerQos(MqttMessageSerializer.readExtension(brokerMessage), qos)),
            false,
            0);
    int packageId = session.getMessageAcknowledgedZone().acquireAcknowledgedPosition(brokerMessage);
    MqttPublishMessage publishMsg = (MqttPublishMessage) MqttMessageFactory.newMessage(
            publishHeader,
            new MqttPublishVariableHeader(topicName, packageId),
            Unpooled.wrappedBuffer(brokerMessage.getByteBody()));

    if (!connectionManager.isConnected(clientID)) {
        return;
    }
    Channel channel = connectionManager.getConnection(clientID).getChannel();
    if (!(channel.isActive() && channel.isOpen())) {
        return;
    }

    channel.writeAndFlush(publishMsg).addListener((ChannelFutureListener) channelFuture -> {
        if (!channelFuture.isSuccess()) {
            LOG.error("publish message error, thread: <{}>, clientID: <{}>, message: <{}>, cause: <{}>", name, clientID, brokerMessage, channelFuture.cause());
            throw new Exception(channelFuture.cause());
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("推送消息成功: {}", publishMsg);
        }
    });
}
/**
 * Handles a SUBSCRIBE packet and answers it with a SUBACK.
 * <p>
 * For a connected client the requested topic subscriptions are registered and added to the
 * client's session (when one exists). If registering fails, the exception is logged and the
 * result list is padded with {@code MqttQoS.FAILURE} so that it has one entry per requested
 * subscription. A SUBSCRIBE without a subscription list is treated as a protocol violation:
 * consumption is stopped, session and connection are removed, the channel is closed, and no
 * SUBACK is sent.
 *
 * @param client           channel the SUBSCRIBE arrived on
 * @param subscribeMessage the SUBSCRIBE packet
 */
public void processSubscribe(Channel client, MqttSubscribeMessage subscribeMessage) {
    List<Integer> resultCodes = new ArrayList<>();
    String clientID = NettyAttrManager.getAttrClientId(client);
    if (connectionManager.isConnected(clientID)) {
        MqttConnection connection = connectionManager.getConnection(clientID);

        if (LOG.isDebugEnabled()) {
            LOG.debug("处理subscribe数据包, clientID: {}, cleanSession: {}", clientID, connection.isCleanSession());
        }

        List<MqttTopicSubscription> topicSubscribes = subscribeMessage.payload().topicSubscriptions();
        LOG.info("Subscribe topics: {}, clientID: {}", topicSubscribes, clientID);

        try {
            if (null != topicSubscribes) {
                Set<MqttSubscription> topicFilters = subscribe(topicSubscribes, clientID, connection.getClientGroupName(), resultCodes);
                MqttSession session = sessionManager.getSession(clientID);
                if (session != null) {
                    for (MqttSubscription subscription : topicFilters) {
                        session.addSubscription(subscription);
                    }
                }
            } else {
                // The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. A SUBSCRIBE packet with no payload is a protocol violation
                // it MUST close the Network Connection on which it received that Control Packet which caused the protocol violation
                consumerManager.stopConsume(clientID);
                sessionManager.removeSession(clientID);
                connection.getChannel().close().addListener(CLOSE_ON_FAILURE);
                connectionManager.removeConnection(connection);
                client.close().addListener(CLOSE_ON_FAILURE);
                // The connection has been torn down, so there is nothing left to acknowledge.
                return;
            }
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            LOG.error("subscribe is error!", e);
            // topicSubscribes is null when the failure happened in the protocol-violation
            // branch above; in that case there is nothing to pad.
            int expected = (topicSubscribes == null) ? 0 : topicSubscribes.size();
            for (int missing = expected - resultCodes.size(); missing > 0; missing--) {
                resultCodes.add(MqttQoS.FAILURE.value());
            }
        }
    }

    MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
            mqttFixedHeader,
            MqttMessageIdVariableHeader.from(subscribeMessage.variableHeader().messageId()),
            new MqttSubAckPayload(resultCodes));
    LOG.info("SUBSCRIBE successful, subscribe result: {}", resultCodes);
    client.writeAndFlush(subAckMessage);
}
/**
 * Writes a PINGREQ packet. See {@link MqttClient#ping()} for more details.
 *
 * @return this client, for call chaining
 */
@Override
public MqttClient ping() {
    io.netty.handler.codec.mqtt.MqttMessage pingreq = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0),
            null,
            null);
    this.write(pingreq);
    return this;
}
/**
 * Builds a PUBACK packet for the given message id and writes it to the server.
 *
 * @param publishMessageId identifier of the PUBLISH message to acknowledge
 */
private void publishAcknowledge(int publishMessageId) {
    io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0),
            MqttMessageIdVariableHeader.from(publishMessageId),
            null);
    this.write(puback);
}
/**
 * Builds a PUBCOMP packet for the given message id and writes it to the server.
 *
 * @param publishMessageId identifier of the PUBLISH message to acknowledge
 */
private void publishComplete(int publishMessageId) {
    io.netty.handler.codec.mqtt.MqttMessage pubcomp = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0),
            MqttMessageIdVariableHeader.from(publishMessageId),
            null);
    this.write(pubcomp);
}
/**
 * Builds a SUBACK packet with the granted QoS levels and writes it.
 *
 * @param subscribeMessageId identifier of the SUBSCRIBE message to acknowledge
 * @param grantedQoSLevels   granted QoS levels; their numeric values form the SUBACK payload
 *                           in list order
 * @return this endpoint, for call chaining
 */
public MqttEndpointImpl subscribeAcknowledge(int subscribeMessageId, List<MqttQoS> grantedQoSLevels) {
    MqttFixedHeader subAckHeader =
            new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    MqttMessageIdVariableHeader subAckId = MqttMessageIdVariableHeader.from(subscribeMessageId);

    // convert the granted levels to their numeric codes, preserving list order
    int[] grantedCodes = new int[grantedQoSLevels.size()];
    int next = 0;
    for (MqttQoS level : grantedQoSLevels) {
        grantedCodes[next++] = level.value();
    }

    io.netty.handler.codec.mqtt.MqttMessage suback =
            MqttMessageFactory.newMessage(subAckHeader, subAckId, new MqttSubAckPayload(grantedCodes));
    this.write(suback);
    return this;
}
/**
 * Builds an UNSUBACK packet for the given message id and writes it.
 *
 * @param unsubscribeMessageId identifier of the UNSUBSCRIBE message to acknowledge
 * @return this endpoint, for call chaining
 */
public MqttEndpointImpl unsubscribeAcknowledge(int unsubscribeMessageId) {
    io.netty.handler.codec.mqtt.MqttMessage unsuback = MqttMessageFactory.newMessage(
            new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
            MqttMessageIdVariableHeader.from(unsubscribeMessageId),
            null);
    this.write(unsuback);
    return this;
}