下面列出了 io.netty.handler.codec.mqtt.MqttMessageType # PUBLISH 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static MqttPublishMessage getPubMessage(Message message,boolean dup,int qos,int messageId){
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,dup,MqttQoS.valueOf(qos),false,0);
MqttPublishVariableHeader publishVariableHeader = new MqttPublishVariableHeader((String) message.getHeader(MessageHeader.TOPIC),messageId);
ByteBuf heapBuf;
if(message.getPayload() == null){
heapBuf = Unpooled.EMPTY_BUFFER;
}else{
heapBuf = Unpooled.wrappedBuffer((byte[])message.getPayload());
}
return new MqttPublishMessage(fixedHeader,publishVariableHeader,heapBuf);
}
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false,
0);
MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes(GSON.toJson(json).getBytes(UTF8));
return new MqttPublishMessage(mqttFixedHeader, header, payload);
}
protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
sendToClient(publish);
}
public void publish2Subscriber(String name, String clientID, MqttSession session, Consumer consumer, int qos) throws Exception {
PullResult result = consume.getMessage(
consumer,
1,
1000 * 60 * 2
);
String topicName = result.getTopic();
List<ByteBuffer> buffers = result.getBuffers();
if (buffers != null && buffers.size() > 0) {
BrokerMessage brokerMessage = Serializer.readBrokerMessage(buffers.get(0));
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
MqttMessageType.PUBLISH,
false,
MqttQoS.valueOf(MqttMessageSerializer.getLowerQos(MqttMessageSerializer.readExtension(brokerMessage), qos)),
false,
0
);
int packageId = session.getMessageAcknowledgedZone().acquireAcknowledgedPosition(brokerMessage);
MqttPublishMessage publishMsg = (MqttPublishMessage) MqttMessageFactory.newMessage(
mqttFixedHeader,
new MqttPublishVariableHeader(topicName, packageId),
Unpooled.wrappedBuffer(brokerMessage.getByteBody()));
boolean isActive = connectionManager.isConnected(clientID);
if (isActive) {
MqttConnection connection = connectionManager.getConnection(clientID);
Channel channel = connection.getChannel();
if (channel.isActive() && channel.isOpen()) {
channel.writeAndFlush(publishMsg).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
if (LOG.isDebugEnabled()) {
LOG.debug("推送消息成功: {}", publishMsg);
}
} else {
LOG.error("publish message error, thread: <{}>, clientID: <{}>, message: <{}>, cause: <{}>", name, clientID, brokerMessage, channelFuture.cause());
throw new Exception(channelFuture.cause());
}
});
}
}
}
}
@Override
public MqttMessageType type() {
return MqttMessageType.PUBLISH;
}
public static MqttPublishMessage publish(IMessage message, boolean isDup) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, message.qos(),
message.isRetain(), 7 + message.message().length);
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(message.topicName(), message.id());
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(message.message().length);
return new MqttPublishMessage(fixedHeader, variableHeader, buf.writeBytes(message.message()));
}
private MqttPublishMessage createPublishMessage() {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, true, 0);
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(MQTT_TOPIC, 1);
ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes(MQTT_MESSAGE.getBytes(CharsetUtil.UTF_8));
return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload);
}