下面列出了怎么用 io.netty.handler.codec.mqtt.MqttPublishMessage 的API类实例代码及写法,或者点击链接到github查看源代码。
public static void convertToMsg(SessionMsgType type, MqttMessage inbound) throws AdaptorException {
switch (type) {
case POST_TELEMETRY_REQUEST:
convertToTelemetryUploadRequest( (MqttPublishMessage) inbound);
break;
case POST_ATTRIBUTES_REQUEST:
convertToUpdateAttributesRequest((MqttPublishMessage) inbound);
break;
case SUBSCRIBE_ATTRIBUTES_REQUEST:
System.out.println("{\"key1\":\"value1\"}");
break;
case GET_ATTRIBUTES_REQUEST:
convertToGetAttributesRequest((MqttPublishMessage) inbound);
break;
}
}
private static void convertToGetAttributesRequest(MqttPublishMessage inbound) throws AdaptorException {
try {
String payload = inbound.payload().toString(UTF8);
JsonElement requestBody = new JsonParser().parse(payload);
Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
if (clientKeys == null && sharedKeys == null) {
} else {
for (String clientKey : clientKeys) {
System.out.print("客户端属性:" + clientKey +" ");
}
for (String sharedKey : sharedKeys) {
System.out.print("共享设备属性:" + sharedKey + " ");
}
}
}catch (RuntimeException e) {
throw new AdaptorException(e);
}
}
private FromDeviceMsg convertToGetAttributesRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound)
throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
try {
Integer requestId = Integer
.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
String payload = inbound.payload().toString(UTF8);
JsonElement requestBody = new JsonParser().parse(payload);
Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
if (clientKeys == null && sharedKeys == null) {
return new BasicGetAttributesRequest(requestId);
} else {
return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys);
}
} catch (RuntimeException e) {
log.warn("Failed to decode get attributes request", e);
throw new AdaptorException(e);
}
}
private static void convertToGetAttributesRequest(MqttPublishMessage inbound) throws AdaptorException {
try {
String payload = inbound.payload().toString(UTF8);
JsonElement requestBody = new JsonParser().parse(payload);
Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
if (clientKeys == null && sharedKeys == null) {
} else {
for (String clientKey : clientKeys) {
System.out.print("客户端属性:" + clientKey +" ");
}
for (String sharedKey : sharedKeys) {
System.out.print("共享设备属性:" + sharedKey + " ");
}
}
}catch (RuntimeException e) {
throw new AdaptorException(e);
}
}
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) throws BrokerException {
MqttPublishMessage msg = (MqttPublishMessage) req;
log.info("PUBLISH, {} Qos: {}", msg.variableHeader().topicName(), msg.fixedHeader().qosLevel());
switch (msg.fixedHeader().qosLevel()) {
case AT_MOST_ONCE: {
this.sessionStore.publishMessage(msg, false);
return Optional.empty();
}
case AT_LEAST_ONCE: {
boolean result = this.sessionStore.publishMessage(msg, false);
MqttQoS qos = result ? MqttQoS.AT_LEAST_ONCE : MqttQoS.FAILURE;
MqttMessage rsp = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBACK, false, qos, false, ProtocolProcess.fixLengthOfMessageId),
MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);
return Optional.of(rsp);
}
case EXACTLY_ONCE:
default: {
log.error("DOT NOT support Qos=2, close");
throw new BrokerException(ErrorCode.MQTT_NOT_SUPPORT_QOS2);
}
}
}
public boolean publishMessage(MqttPublishMessage msg, boolean will) {
byte[] messageBytes = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
Map<String, String> extensions = new HashMap<>();
if (will) {
extensions.put(WeEventConstants.EXTENSIONS_WILL_MESSAGE, WeEventConstants.EXTENSIONS_WILL_MESSAGE);
}
try {
WeEvent event = new WeEvent(msg.variableHeader().topicName(), messageBytes, extensions);
SendResult sendResult = this.producer.publish(event, "", this.timeout);
return sendResult.getStatus() == SendResult.SendResultStatus.SUCCESS;
} catch (BrokerException e) {
log.error("exception in publish, {}", e.toString());
return false;
}
}
@Override
public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {
msg.retain();
executor.execute(() -> {
try {
int messageId = msg.variableHeader().packetId();//messageId();
String topic = msg.variableHeader().topicName();
for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
+ "interceptorId={}", clientID, messageId, topic, handler.getID());
handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
}
} finally {
ReferenceCountUtil.release(msg);
}
});
}
private void sendSubscribMessage(String clientId, Channel channel, MqttQoS respQoS, boolean bSavePubMsg,
boolean bSend, BorkerMessage bMsgInfo) {
if ((!bSavePubMsg) && (!bSend)) {
return ;
}
String topicName = bMsgInfo.getTopicName();
boolean isDup = bMsgInfo.isDup();
boolean isRetain = bMsgInfo.isRetain();
byte[] msgBytes = bMsgInfo.getMsgBytes();
int bMsgId = getMustSendMessageId(respQoS, clientId);
if (bSavePubMsg) {
ConsumerMessage pubMsgObj = ConsumerMessage.builder().sourceClientId(clientId).topic(topicName)
.mqttQoS(respQoS.value()).messageBytes(msgBytes).messageId(bMsgId).build();
consumerData.putPublishMessage(clientId, pubMsgObj);
}
if (bSend && channel != null) {
MqttPublishMessage publishMessage = ProtocolUtil.publishMessage(topicName, isDup, respQoS.value(), isRetain,
bMsgId, msgBytes);
this.sendProcess.sendPublishMessage(channel, publishMessage);
}
}
public boolean dispatcherMessage(String clientId, Message message) {
ClientSession clientSession = ConnectManager.getInstance().getClient(clientId);
// client off line again
if (clientSession == null) {
log.warn("The client offline again, put the message to the offline queue,clientId:{}", clientId);
return false;
}
int qos = (int) message.getHeader(MessageHeader.QOS);
int messageId = message.getMsgId();
if (qos > 0) {
flowMessageStore.cacheSendMsg(clientId, message);
}
MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false, qos, messageId);
clientSession.getCtx().writeAndFlush(publishMessage);
return true;
}
public boolean dispatcherMessage(String clientId, Message message){
ClientSession clientSession = ConnectManager.getInstance().getClient(clientId);
// client off line again
if(clientSession == null){
log.warn("The client offline again, put the message to the offline queue,clientId:{}",clientId);
return false;
}
int qos = (int) message.getHeader(MessageHeader.QOS);
int messageId = message.getMsgId();
if(qos > 0){
flowMessageStore.cacheSendMsg(clientId,message);
}
MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message,false,qos,messageId);
clientSession.getCtx().writeAndFlush(publishMessage);
return true;
}
@Override
public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {
msg.retain();
executor.execute(() -> {
try {
int messageId = msg.variableHeader().messageId();
String topic = msg.variableHeader().topicName();
for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
+ "interceptorId={}", clientID, messageId, topic, handler.getID());
handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
}
} finally {
msg.release();
}
});
}
public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
int requestId = mqttMsg.variableHeader().messageId();
if (json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
String deviceName = checkDeviceConnected(deviceEntry.getKey());
if (!deviceEntry.getValue().isJsonArray()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
JsonArray deviceData = deviceEntry.getValue().getAsJsonArray();
for (JsonElement element : deviceData) {
JsonConverter.parseWithTs(request, element.getAsJsonObject());
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
}
public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
int requestId = mqttMsg.variableHeader().messageId();
if (json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
String deviceName = checkDeviceConnected(deviceEntry.getKey());
if (!deviceEntry.getValue().isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
long ts = System.currentTimeMillis();
BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId);
JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
}
public MqttPendingPublish(int messageId, Promise<Void> future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos) {
this.messageId = messageId;
this.future = future;
this.payload = payload;
this.message = message;
this.qos = qos;
this.publishRetransmissionHandler.setOriginalMessage(message);
}
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
if (!checkConnected(ctx)) {
return;
}
String topicName = mqttMsg.variableHeader().topicName();
int msgId = mqttMsg.variableHeader().messageId();
log.info("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
// if (gatewaySessionCtx != null) {
// gatewaySessionCtx.setChannel(ctx);
// try {
// if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) {
// gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
// } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
// gatewaySessionCtx.onDeviceAttributes(mqttMsg);
// } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) {
// gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
// } else if (topicName.equals(GATEWAY_RPC_TOPIC)) {
// gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
// } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
// gatewaySessionCtx.onDeviceConnect(mqttMsg);
// } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) {
// gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
// }
// } catch (RuntimeException | AdaptorException e) {
// log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId,
// topicName, msgId, e);
// }
// }
} else {
processDevicePublish(ctx, mqttMsg, topicName, msgId);
}
}
private static void convertToTelemetryUploadRequest(MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(inbound.payload());
try {
Map<Long, List<KvEntry>> telemetryMaps = JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId()).getData();
for (Map.Entry<Long,List<KvEntry>> entry : telemetryMaps.entrySet()) {
System.out.println("key= " + entry.getKey());
for (KvEntry kvEntry: entry.getValue()) {
System.out.println("属性名="+kvEntry.getKey()+ " 属性值="+kvEntry.getValueAsString());
}
}
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
}
private static void convertToUpdateAttributesRequest(MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(inbound.payload());
try {
Set<AttributeKvEntry> attributeKvEntrySet = JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId()).getAttributes();
for (AttributeKvEntry attributeKvEntry : attributeKvEntrySet){
System.out.println("属性名="+attributeKvEntry.getKey()+" 属性值="+attributeKvEntry.getValueAsString());
}
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
}
private static void convertToTelemetryUploadRequest(MqttPublishMessage inbound) throws AdaptorException {
String payload = validatePayload(inbound.payload());
try {
Map<Long, List<KvEntry>> telemetryMaps = JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId()).getData();
for (Map.Entry<Long,List<KvEntry>> entry : telemetryMaps.entrySet()) {
System.out.println("key= " + entry.getKey());
for (KvEntry kvEntry: entry.getValue()) {
System.out.println("属性名="+kvEntry.getKey()+ " 属性值="+kvEntry.getValueAsString());
}
}
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {
this.stats.incrementMqttStat();
MqttMessageType messageType = msg.fixedHeader().messageType();
switch (messageType) {
case PUBLISH :
MqttPublishMessage publishMessage = (MqttPublishMessage) msg;
String topic = publishMessage.variableHeader().topicName();
switch (topic.toLowerCase()) {
case "hardware" :
hardware.messageReceived(state, publishMessage);
break;
}
break;
case PINGREQ :
ctx.writeAndFlush(
MqttMessageFactory.newMessage(msg.fixedHeader(), msg.variableHeader(), null),
ctx.voidPromise());
break;
case DISCONNECT :
log.trace("Got disconnect. Closing...");
ctx.close();
break;
}
}
public SessionContext(String sessionId, String clientId, WebSocketSession session, boolean cleanSession, MqttPublishMessage willMessage) {
this.sessionId = sessionId;
this.clientId = clientId;
this.cleanSession = cleanSession;
this.willMessage = willMessage;
this.channel = null;
this.session = session;
}
private UpdateAttributesRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound)
throws AdaptorException {
String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId());
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
}
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false,
0);
MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes(GSON.toJson(json).getBytes(UTF8));
return new MqttPublishMessage(mqttFixedHeader, header, payload);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
switch (msg.fixedHeader().messageType()) {
case PUBLISH:
if (receiver != null) {
receiver.messageReceived(Message.newMessage(client.clientId(), (MqttPublishMessage) msg));
}
int messageId = ((MqttPublishMessage) msg).variableHeader().messageId();
if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
client.send(MqttMessageFactory.puback(messageId));
}
else if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
client.send(MqttMessageFactory.pubrec(messageId));
}
break;
case CONNACK:
sharedObject.receivedMessage(msg);
synchronized (sharedObject.locker()) {
sharedObject.locker().notify();
}
break;
case PUBREC:
client.send(MqttMessageFactory.pubrel(((MqttMessageIdVariableHeader) msg.variableHeader()).messageId()));
break;
case SUBACK:
case PUBACK:
case PUBCOMP:
default:
break;
}
}
@Override
public void retain(Topic topic, MqttPublishMessage msg) {
final ByteBuf payload = msg.content();
byte[] rawPayload = new byte[payload.readableBytes()];
payload.getBytes(0, rawPayload);
final RetainedMessage toStore = new RetainedMessage(msg.fixedHeader().qosLevel(), rawPayload);
storage.put(topic, toStore);
}
@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getClass() == MqttPublishMessage.class) {
messageCount++;
}
return true;
}
@Override
public MqttPublishMessage getWillMessage(String clientId) {
MqttSession session = localCache.get(clientId);
if (null != session) {
return session.getWillMessage();
} else {
return null;
}
}
public MqttSession(String clientId, Channel channel, boolean cleanSession, MqttPublishMessage willMessage) {
super(channel);
this.clientId = clientId;
this.cleanSession = cleanSession;
this.willMessage = willMessage;
}
/**
* B - S(Qos0, Qos1, Qos2)
* @param channel
* @param mqttMessage
*/
public void processPublish(Channel channel, MqttPublishMessage mqttMessage) {
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttPublishVariableHeader mqttPublishVariableHeader = mqttMessage.variableHeader();
ByteBuf payload = mqttMessage.payload();
String topciName = mqttPublishVariableHeader.topicName();
MqttQoS qosLevel = mqttFixedHeader.qosLevel();
int messageId = mqttPublishVariableHeader.packetId();
byte[] bytes = ByteBufUtil.copyByteBuf(payload);
MessageData recviceMessage = MessageData.builder().topic(topciName).qos(qosLevel.value())
.messageId(messageId).payload(bytes)
.status(MessageStatus.PUB)
.dup(mqttFixedHeader.isDup())
.retained(mqttFixedHeader.isRetain()).build();
if (qosLevel == MqttQoS.EXACTLY_ONCE) {
this.consumerProcess.saveMesage(recviceMessage);
}
this.consumerProcess.processPublish(recviceMessage);
switch (qosLevel) {
case AT_MOST_ONCE:
break;
case AT_LEAST_ONCE:
this.consumerProcess.sendPubAckMessage(messageId);
break;
case EXACTLY_ONCE:
this.consumerProcess.sendPubRecMessage(messageId);
break;
default:
break;
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msgx) throws Exception {
if (msgx == null) {return ;}
MqttMessage msg = (MqttMessage) msgx;
NettyLog.debug("read: {}", msg.fixedHeader().messageType());
MqttFixedHeader mqttFixedHeader = msg.fixedHeader();
switch (mqttFixedHeader.messageType()) {
case CONNACK:
clientProtocolProcess.processConnectBack(ctx.channel(), (MqttConnAckMessage) msg);
break;
case UNSUBACK:
clientProtocolProcess.processUnSubBack(ctx.channel(), msg);
break;
case PUBLISH:
clientProtocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
break;
case PUBACK:
clientProtocolProcess.processPubAck(ctx.channel(), msg);
break;
case PUBREC:
clientProtocolProcess.processPubRec(ctx.channel(), msg);
break;
case PUBREL:
clientProtocolProcess.processPubRel(ctx.channel(), msg);
break;
case PUBCOMP:
clientProtocolProcess.processPubComp(ctx.channel(), msg);
break;
case SUBACK:
clientProtocolProcess.processSubAck(ctx.channel(), (MqttSubAckMessage) msg);
break;
default:
break;
}
}
@Override
public void run() {
if (Objects.nonNull(messages)) {
try {
for (Message message : messages) {
Set<Subscription> subscriptions = subscriptionMatcher.match((String) message.getHeader(MessageHeader.TOPIC));
for (Subscription subscription : subscriptions) {
String clientId = subscription.getClientId();
ClientSession clientSession = ConnectManager.getInstance().getClient(subscription.getClientId());
if (ConnectManager.getInstance().containClient(clientId)) {
int qos = MessageUtil.getMinQos((int) message.getHeader(MessageHeader.QOS), subscription.getQos());
message.putHeader(MessageHeader.QOS, qos);
if (qos > 0) {
flowMessageStore.cacheSendMsg(clientId, message);
}
MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false, qos, message.getMsgId());
clientSession.getCtx().writeAndFlush(publishMessage);
clientSession.addReceiveIdCounter();//收到一条
} else {
offlineMessageStore.addOfflineMessage(clientId, message);
}
}
}
} catch (Exception ex) {
log.warn("Dispatcher message failure,cause={}", ex);
}
}
}