下面列出了怎么用 io.netty.handler.codec.mqtt.MqttPublishVariableHeader 的API类实例代码及写法,或者点击链接到github查看源代码。
public MqttPublishMessage genWillMessage(MqttConnectMessage connectMessage) {
if (connectMessage.variableHeader().isWillFlag()) {
log.info("get will message from client");
MqttMessage msg = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(connectMessage.variableHeader().willQos()), connectMessage.variableHeader().isWillRetain(), 0),
new MqttPublishVariableHeader(connectMessage.payload().willTopic(), 0), Unpooled.buffer().writeBytes(connectMessage.payload().willMessageInBytes()));
return (MqttPublishMessage) msg;
}
return null;
}
/**
* B - S(Qos0, Qos1, Qos2)
* @param channel
* @param mqttMessage
*/
public void processPublish(Channel channel, MqttPublishMessage mqttMessage) {
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
MqttPublishVariableHeader mqttPublishVariableHeader = mqttMessage.variableHeader();
ByteBuf payload = mqttMessage.payload();
String topciName = mqttPublishVariableHeader.topicName();
MqttQoS qosLevel = mqttFixedHeader.qosLevel();
int messageId = mqttPublishVariableHeader.packetId();
byte[] bytes = ByteBufUtil.copyByteBuf(payload);
MessageData recviceMessage = MessageData.builder().topic(topciName).qos(qosLevel.value())
.messageId(messageId).payload(bytes)
.status(MessageStatus.PUB)
.dup(mqttFixedHeader.isDup())
.retained(mqttFixedHeader.isRetain()).build();
if (qosLevel == MqttQoS.EXACTLY_ONCE) {
this.consumerProcess.saveMesage(recviceMessage);
}
this.consumerProcess.processPublish(recviceMessage);
switch (qosLevel) {
case AT_MOST_ONCE:
break;
case AT_LEAST_ONCE:
this.consumerProcess.sendPubAckMessage(messageId);
break;
case EXACTLY_ONCE:
this.consumerProcess.sendPubRecMessage(messageId);
break;
default:
break;
}
}
public static MqttPublishMessage getPubMessage(Message message,boolean dup,int qos,int messageId){
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,dup,MqttQoS.valueOf(qos),false,0);
MqttPublishVariableHeader publishVariableHeader = new MqttPublishVariableHeader((String) message.getHeader(MessageHeader.TOPIC),messageId);
ByteBuf heapBuf;
if(message.getPayload() == null){
heapBuf = Unpooled.EMPTY_BUFFER;
}else{
heapBuf = Unpooled.wrappedBuffer((byte[])message.getPayload());
}
return new MqttPublishMessage(fixedHeader,publishVariableHeader,heapBuf);
}
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false,
0);
MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes(GSON.toJson(json).getBytes(UTF8));
return new MqttPublishMessage(mqttFixedHeader, header, payload);
}
protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
sendToClient(publish);
}
private void sendEvent(String clientId, String subscriptionId, WeEvent event, int messageId) {
ByteBuf payload = Unpooled.buffer();
int payloadSize;
try {
byte[] content = JsonHelper.object2JsonBytes(event.getContent());
payloadSize = content.length;
payload.writeBytes(content);
} catch (BrokerException e) {
log.error("json encode failed, {}", e.toString());
return;
}
int finalPayloadSize = payloadSize;
this.getSession(clientId).ifPresent(context -> {
log.debug("subscription list in session context, {}", context.getSubscribeDataList());
context.getSubscribeDataList()
.stream()
.filter(item -> item.getSubscriptionId().equals(subscriptionId))
.findFirst()
.ifPresent(subscribe -> {
switch (subscribe.getMqttQoS()) {
case AT_MOST_ONCE:
case AT_LEAST_ONCE: {
log.info("PUBLISH subscribe message to client, client id: {} {} {}", clientId, messageId, event);
// update offset
subscribe.setOffset(event.getEventId());
int remaining = ProtocolProcess.fixLengthOfMessageId + subscribe.getTopic().length() + finalPayloadSize;
//subscribe.getTopic() may be contain wildcard, use original topic in WeEvent
MqttMessage rsp = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, subscribe.getMqttQoS(), false, remaining),
new MqttPublishVariableHeader(event.getTopic(), messageId), payload);
context.sendRemote(rsp);
}
break;
case EXACTLY_ONCE:
default:
log.error("DOT NOT support Qos=2");
break;
}
});
});
}
public static MqttPublishMessage publishMessage(String topicName, boolean isDup, int qosValue, boolean isRetain,
int messageId, byte[] payload) {
return (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, MqttQoS.valueOf(qosValue), isRetain, 0),
new MqttPublishVariableHeader(topicName, messageId), Unpooled.buffer().writeBytes(payload));
}
public void publish2Subscriber(String name, String clientID, MqttSession session, Consumer consumer, int qos) throws Exception {
PullResult result = consume.getMessage(
consumer,
1,
1000 * 60 * 2
);
String topicName = result.getTopic();
List<ByteBuffer> buffers = result.getBuffers();
if (buffers != null && buffers.size() > 0) {
BrokerMessage brokerMessage = Serializer.readBrokerMessage(buffers.get(0));
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
MqttMessageType.PUBLISH,
false,
MqttQoS.valueOf(MqttMessageSerializer.getLowerQos(MqttMessageSerializer.readExtension(brokerMessage), qos)),
false,
0
);
int packageId = session.getMessageAcknowledgedZone().acquireAcknowledgedPosition(brokerMessage);
MqttPublishMessage publishMsg = (MqttPublishMessage) MqttMessageFactory.newMessage(
mqttFixedHeader,
new MqttPublishVariableHeader(topicName, packageId),
Unpooled.wrappedBuffer(brokerMessage.getByteBody()));
boolean isActive = connectionManager.isConnected(clientID);
if (isActive) {
MqttConnection connection = connectionManager.getConnection(clientID);
Channel channel = connection.getChannel();
if (channel.isActive() && channel.isOpen()) {
channel.writeAndFlush(publishMsg).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
if (LOG.isDebugEnabled()) {
LOG.debug("推送消息成功: {}", publishMsg);
}
} else {
LOG.error("publish message error, thread: <{}>, clientID: <{}>, message: <{}>, cause: <{}>", name, clientID, brokerMessage, channelFuture.cause());
throw new Exception(channelFuture.cause());
}
});
}
}
}
}
public static MqttPublishMessage publish(IMessage message, boolean isDup) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, message.qos(),
message.isRetain(), 7 + message.message().length);
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(message.topicName(), message.id());
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(message.message().length);
return new MqttPublishMessage(fixedHeader, variableHeader, buf.writeBytes(message.message()));
}
private MqttPublishMessage createPublishMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, true, 0);
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(MQTT_TOPIC, 1);
ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes(MQTT_MESSAGE.getBytes(CharsetUtil.UTF_8));
return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload);
}
/**
* Add retain message for the topic name
* Retain id will be generated
*
* @param topicLevels Topic Levels
* @param msg Retain Message
* @return Retain Id
*/
int addRetainMessage(List<String> topicLevels, Message<MqttPublishVariableHeader, MqttPublishPayload> msg);
/**
* Get all retain messages the topic name
*
* @param topicLevels Topic Levels
* @return List of Retain Message
*/
List<Message<MqttPublishVariableHeader, MqttPublishPayload>> getMatchRetainMessages(List<String> topicLevels);