下面列出了io.netty.channel.ChannelId#io.netty.handler.codec.mqtt.MqttQoS 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Publishes an event message at QoS 1 and delegates the outcome of the publish attempt
 * to the given handler.
 *
 * @param tenantId The tenant identifier inserted into the topic.
 * @param deviceId The device identifier inserted into the topic.
 * @param payload The payload to publish.
 * @param useShortTopicName {@code true} to use the short event endpoint name in the topic.
 * @param sendAttemptHandler Receives the result of the publish attempt together with the promise
 *                           backing the returned future. This method does not complete the
 *                           promise itself.
 * @return The future of the promise that is handed to the handler.
 */
private Future<Void> send(
        final String tenantId,
        final String deviceId,
        final Buffer payload,
        final boolean useShortTopicName,
        final BiConsumer<AsyncResult<Integer>, Promise<Void>> sendAttemptHandler) {

    final String endpointName;
    if (useShortTopicName) {
        endpointName = EventConstants.EVENT_ENDPOINT_SHORT;
    } else {
        endpointName = EventConstants.EVENT_ENDPOINT;
    }
    final String topic = String.format(TOPIC_TEMPLATE, endpointName, tenantId, deviceId);

    final boolean duplicate = false;
    final boolean retained = false;
    final Promise<Void> outcome = Promise.promise();
    mqttClient.publish(
            topic,
            payload,
            MqttQoS.AT_LEAST_ONCE,
            duplicate,
            retained,
            attempt -> sendAttemptHandler.accept(attempt, outcome));
    return outcome.future();
}
/**
 * Handles an incoming PUBLISH packet depending on its QoS level.
 * <ul>
 * <li>QoS 0: the message is passed to the session store and no response is produced.</li>
 * <li>QoS 1: the message is passed to the session store and a PUBACK is returned whose fixed
 * header carries {@code AT_LEAST_ONCE} when storing succeeded and {@code FAILURE} otherwise.</li>
 * <li>Any other level: rejected with a {@link BrokerException}.</li>
 * </ul>
 *
 * @param req the request, cast to {@code MqttPublishMessage}
 * @param clientId not used by this method
 * @param remoteIp not used by this method
 * @return the PUBACK for QoS 1, or an empty optional for QoS 0
 * @throws BrokerException if the QoS level is neither 0 nor 1
 */
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) throws BrokerException {
    final MqttPublishMessage publish = (MqttPublishMessage) req;
    final MqttQoS requestedQos = publish.fixedHeader().qosLevel();
    log.info("PUBLISH, {} Qos: {}", publish.variableHeader().topicName(), requestedQos);

    switch (requestedQos) {
        case AT_MOST_ONCE:
            this.sessionStore.publishMessage(publish, false);
            return Optional.empty();

        case AT_LEAST_ONCE:
            final boolean stored = this.sessionStore.publishMessage(publish, false);
            final MqttFixedHeader ackHeader = new MqttFixedHeader(
                    MqttMessageType.PUBACK,
                    false,
                    stored ? MqttQoS.AT_LEAST_ONCE : MqttQoS.FAILURE,
                    false,
                    ProtocolProcess.fixLengthOfMessageId);
            final MqttMessageIdVariableHeader ackId =
                    MqttMessageIdVariableHeader.from(publish.variableHeader().packetId());
            return Optional.of(MqttMessageFactory.newMessage(ackHeader, ackId, null));

        default:
            // covers EXACTLY_ONCE as well as any other level
            log.error("DOT NOT support Qos=2, close");
            throw new BrokerException(ErrorCode.MQTT_NOT_SUPPORT_QOS2);
    }
}
/**
 * Registers three subscriptions of one client (multi-level wildcard, single-level wildcard and
 * exact topic name) and checks that {@code Session.matches} returns the one whose filter equals
 * the topic name.
 */
@Test
public void testMatches() throws Exception {
    final String topicName = "testTopic/test";
    final Session session = new Session("1", "1", 1, 50, true, null);

    final TopicSubscription[] subscriptions = {
        new TopicSubscription(session.clientId(), "testTopic/#", MqttQoS.AT_MOST_ONCE),
        new TopicSubscription(session.clientId(), "testTopic/+", MqttQoS.AT_LEAST_ONCE),
        new TopicSubscription(session.clientId(), topicName, MqttQoS.EXACTLY_ONCE)
    };
    for (TopicSubscription subscription : subscriptions) {
        TopicSubscription.NEXUS.put(subscription);
    }

    // all three filters must be registered for the client
    Assert.assertEquals(3, TopicSubscription.NEXUS.topicFiltersOf(session.clientId()).size());

    final TopicSubscription matched = session.matches(topicName);
    Assert.assertTrue(matched.topicFilter().equals(topicName));
}
/**
 * Subscribes to the wildcard filter {@code "#"}, publishes one message to the test topic and
 * verifies that the subscription was granted QoS 1 and that the listener received at least one
 * message within the action timeout.
 */
@Test
public void testSubscribeWildcard() {
    try {
        MessageListener listener = new MessageListener();
        IMqttToken token = this.mqttClient.subscribeWithResponse("#", listener);
        token.waitForCompletion();

        // JUnit's contract is assertEquals(expected, actual); with the arguments reversed a
        // failure would report the granted QoS as the "expected" value.
        Assert.assertEquals(MqttQoS.AT_LEAST_ONCE.value(), token.getGrantedQos()[0]);

        MqttMessage message = new MqttMessage(this.content.getBytes(StandardCharsets.UTF_8));
        this.mqttClient.publish(this.topicName, message);

        // give the broker time to deliver the message to the listener
        Thread.sleep(this.actionTimeout);

        Assert.assertTrue(listener.received > 0);
    } catch (Exception e) {
        log.error("exception", e);
        // include the exception in the failure so the test report shows what went wrong
        Assert.fail("unexpected exception: " + e);
    }
}
/**
 * Example showing how an endpoint handles messages published by a client: the message is
 * printed, then acknowledged according to its QoS level (PUBACK for QoS 1, PUBREC for QoS 2),
 * and an incoming release is answered with a completion.
 *
 * @param endpoint the endpoint the handlers are registered on
 */
public void example6(MqttEndpoint endpoint) {

    endpoint.publishHandler(published -> {

        final String text = published.payload().toString(Charset.defaultCharset());
        System.out.println("Just received message [" + text + "] with QoS [" + published.qosLevel() + "]");

        final MqttQoS qos = published.qosLevel();
        if (qos == MqttQoS.AT_LEAST_ONCE) {
            endpoint.publishAcknowledge(published.messageId());
            return;
        }
        if (qos == MqttQoS.EXACTLY_ONCE) {
            endpoint.publishReceived(published.messageId());
        }
        // QoS 0 needs no acknowledgement

    }).publishReleaseHandler(releasedId -> endpoint.publishComplete(releasedId));
}
/**
 * Stores an incoming PUBLISH as a pending release message for the publishing client, but only
 * when {@code getMustStoreRelMessage} says so for the message's QoS level.
 *
 * @param channel the channel the message arrived on; used to look up the client id
 * @param bMsgInfo the published message
 */
public void processPublish(Channel channel, BorkerMessage bMsgInfo) {
    final MqttQoS qos = MqttQoS.valueOf(bMsgInfo.getIQosLevel());
    final String publisherId = NettyUtil.getClientId(channel);
    final int sourceMsgId = bMsgInfo.getSourceMsgId();
    final String topic = bMsgInfo.getTopicName();
    final byte[] body = bMsgInfo.getMsgBytes();

    if (!getMustStoreRelMessage(qos)) {
        return;
    }

    // NOTE: borkerMsgId is not populated here (the builder call was commented out in the original).
    final ProcedureMessage pending = ProcedureMessage.builder()
            .sourceClientId(publisherId)
            .sourceMsgId(sourceMsgId)
            .topicName(topic)
            .iQosLevel(qos.value())
            .msgBytes(body)
            .build();
    procedureData.putPubRelMessage(publisherId, pending);
}
/**
 * Optionally stores and/or sends a PUBLISH to one subscriber.
 *
 * @param clientId the subscriber's client id
 * @param channel the subscriber's channel; nothing is sent when it is {@code null}
 * @param respQoS the QoS level used for the outgoing message
 * @param bSavePubMsg whether the outgoing message is recorded in {@code consumerData}
 * @param bSend whether the message is written to the channel
 * @param bMsgInfo the message being distributed
 */
private void sendSubscribMessage(String clientId, Channel channel, MqttQoS respQoS, boolean bSavePubMsg,
        boolean bSend, BorkerMessage bMsgInfo) {
    if (!(bSavePubMsg || bSend)) {
        // neither storing nor sending requested
        return;
    }

    final String topic = bMsgInfo.getTopicName();
    final boolean dup = bMsgInfo.isDup();
    final boolean retain = bMsgInfo.isRetain();
    final byte[] body = bMsgInfo.getMsgBytes();
    final int outboundMsgId = getMustSendMessageId(respQoS, clientId);

    if (bSavePubMsg) {
        consumerData.putPublishMessage(clientId, ConsumerMessage.builder()
                .sourceClientId(clientId)
                .topic(topic)
                .mqttQoS(respQoS.value())
                .messageBytes(body)
                .messageId(outboundMsgId)
                .build());
    }

    if (!bSend || channel == null) {
        return;
    }
    final MqttPublishMessage outbound =
            ProtocolUtil.publishMessage(topic, dup, respQoS.value(), retain, outboundMsgId, body);
    this.sendProcess.sendPublishMessage(channel, outbound);
}
/**
 * Publishes a telemetry message at QoS 1.
 *
 * @param tenantId The tenant identifier inserted into the topic.
 * @param deviceId The device identifier inserted into the topic.
 * @param payload The payload to publish.
 * @param useShortTopicName {@code true} to use the short telemetry endpoint name in the topic.
 * @return The future of the promise that is handed to {@code handlePublishAttempt} together
 *         with the result of the publish attempt.
 */
@Override
protected Future<Void> send(
        final String tenantId,
        final String deviceId,
        final Buffer payload,
        final boolean useShortTopicName) {

    final String endpointName;
    if (useShortTopicName) {
        endpointName = TelemetryConstants.TELEMETRY_ENDPOINT_SHORT;
    } else {
        endpointName = TelemetryConstants.TELEMETRY_ENDPOINT;
    }
    final String topic = String.format(TOPIC_TEMPLATE, endpointName, tenantId, deviceId);

    final boolean duplicate = false;
    final boolean retained = false;
    final Promise<Void> outcome = Promise.promise();
    mqttClient.publish(
            topic,
            payload,
            MqttQoS.AT_LEAST_ONCE,
            duplicate,
            retained,
            attempt -> handlePublishAttempt(attempt, outcome));
    return outcome.future();
}
/**
 * Builds a CONNECT message from the given options.
 * <p>
 * Will-related header fields and payload entries fall back to {@code false}, {@code 0} and
 * the empty string when no will is configured; {@code null} client id, user name and password
 * are encoded as empty strings.
 *
 * @param options the connection options
 * @return the CONNECT message
 */
public static MqttConnectMessage connect(ConnectOptions options) {
    final boolean hasWill = options.will() != null;
    final boolean willRetain = hasWill && options.will().isRetain();
    final int willQos = hasWill ? options.will().qos().value() : 0;
    final String willTopic = hasWill ? options.will().topicName() : "";
    final String willMessage = hasWill ? new String(options.will().message(), CharsetUtil.UTF_8) : "";

    final MqttFixedHeader fixedHeader =
            new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10);

    final MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
            options.version().protocolName(),
            options.version().protocolLevel(),
            options.userName() != null,
            options.password() != null,
            willRetain,
            willQos,
            hasWill,
            options.cleanSession(),
            options.keepAliveTimeSeconds());

    final MqttConnectPayload payload = new MqttConnectPayload(
            Strings.nullToEmpty(options.clientId()),
            willTopic,
            willMessage,
            Strings.nullToEmpty(options.userName()),
            Strings.nullToEmpty(options.password()));

    return new MqttConnectMessage(fixedHeader, variableHeader, payload);
}
/**
 * Callback flavour of the future-returning {@code publish}: delegates to it and, when a handler
 * is given, registers the handler on the resulting future.
 *
 * @param publishSentHandler notified with the publish result; may be {@code null}
 * @return this endpoint, for call chaining
 */
@Override
public MqttEndpointImpl publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, int messageId, Handler<AsyncResult<Integer>> publishSentHandler) {
    final Future<Integer> sent = publish(topic, payload, qosLevel, isDup, isRetain, messageId);
    if (publishSentHandler == null) {
        return this;
    }
    sent.onComplete(publishSentHandler);
    return this;
}
/**
 * Creates the bookkeeping entry for a PUBLISH that has been sent but not yet completed.
 *
 * @param messageId the packet identifier of the publish
 * @param future the promise associated with this publish
 * @param payload the message payload
 * @param message the PUBLISH message; also registered with the retransmission handler as the
 *                original message
 * @param qos the QoS level of the publish
 */
public MqttPendingPublish(int messageId, Promise<Void> future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos) {
    this.messageId = messageId;
    this.qos = qos;
    this.message = message;
    this.payload = payload;
    this.future = future;

    this.publishRetransmissionHandler.setOriginalMessage(message);
}
/**
 * Demo entry point: deploys a verticle that connects an MQTT client (with a last-will message
 * and credentials configured) and then publishes a numbered message to {@code /hello} every
 * 15 milliseconds at QoS 2.
 * <p>
 * The periodic publisher is only started once the connection has actually been established;
 * connection and publish failures are logged instead of being ignored.
 *
 * @param args not used
 */
public static void main(String[] args) {
    Verticle verticle = new AbstractVerticle() {
        @Override
        public void start() {
            MqttClient client = MqttClient.create(vertx, new MqttClientOptions()
                    // enable the last-will message
                    .setWillFlag(true)
                    .setWillTopic("willTopic")
                    .setWillMessage("hello")
                    .setUsername("admin")
                    .setPassword("123456")
                    .setMaxMessageSize(8192));

            client.connect(PORT, HOST, asyncResult -> {
                if (asyncResult.failed()) {
                    // Publishing on a client that never connected cannot succeed,
                    // so do not start the periodic task at all.
                    logger.error("connect failed", asyncResult.cause());
                    return;
                }

                Runnable publishTask = () -> {
                    Buffer buffer = Buffer.buffer("发布数据" + PUBLISH_COUNT.incrementAndGet());
                    client.publish("/hello", buffer,
                            MqttQoS.EXACTLY_ONCE, true, true,
                            asyncResult1 -> {
                                if (asyncResult1.succeeded()) {
                                    logger.info("publish {}", asyncResult1);
                                } else {
                                    logger.error("publish failed", asyncResult1.cause());
                                }
                            }
                    );
                };
                // The scheduler is never shut down: the demo publishes until the JVM is stopped.
                Executors.newScheduledThreadPool(1)
                        .scheduleAtFixedRate(publishTask, 0, 15, TimeUnit.MILLISECONDS);
            });
        }
    };
    Vertx.vertx().deployVerticle(verticle);
}
/**
 * Publishes every statistics entry as a QoS 0 message to the topic named by the entry's key.
 * Entries whose topic currently has no subscribed client are skipped.
 */
@Override
public void run() {
    Statistics.INSTANCE.data().entrySet().stream()
            .filter(stat -> TopicSubscriber.NEXUS.clientIdsOf(stat.getKey()).size() > 0)
            .forEach(stat -> {
                final byte[] body = stat.getValue().value().getBytes(CharsetUtil.UTF_8);
                final Message message = new Message(
                        -1,
                        stat.getKey(),
                        ClusterDataFactory.INSTANCE.currentId(),
                        body,
                        MqttQoS.AT_MOST_ONCE,
                        false);

                Topic.NEXUS.prepare(message).publish(message);
            });
}
/**
 * Subscribes to the topic {@code "not_exist"} and verifies that the granted QoS is anything
 * but QoS 1.
 */
@Test
public void testSubscribeNotExist() {
    try {
        final MessageListener listener = new MessageListener();
        final IMqttToken subscribeToken = this.mqttClient.subscribeWithResponse("not_exist", listener);
        subscribeToken.waitForCompletion();

        final int grantedQos = subscribeToken.getGrantedQos()[0];
        Assert.assertNotEquals(MqttQoS.AT_LEAST_ONCE.value(), grantedQos);
    } catch (MqttException e) {
        log.error("exception", e);
        Assert.fail();
    }
}
/**
 * Creates the status record of an outbound message.
 *
 * @param messageKey passed on to the superclass constructor
 * @param clientId passed on to the superclass constructor
 * @param messageId passed on to the superclass constructor
 * @param topicName passed on to the superclass constructor
 * @param status the delivery status of the message
 * @param qos the QoS level of the message
 */
public OutboundMessageStatus(String messageKey, String clientId, int messageId, String topicName, Status status,
        MqttQoS qos) {
    super(messageKey, clientId, messageId, topicName);

    this.qos = qos;
    this.status = status;
}
/**
 * Determines, for every topic requested in a SUBSCRIBE message, the QoS to acknowledge.
 * A topic is acknowledged with {@code FAILURE} when the policy denies read access or when the
 * topic filter is invalid; otherwise the requested QoS is granted.
 *
 * @param clientID
 *            the clientID
 * @param username
 *            the username
 * @param msg
 *            the subscribe message to verify
 * @return one entry per requested topic, in request order, carrying the QoS to acknowledge.
 */
List<MqttTopicSubscription> verifyTopicsReadAccess(String clientID, String username, MqttSubscribeMessage msg) {
    final int messageId = messageId(msg);
    final List<MqttTopicSubscription> verified = new ArrayList<>();

    for (MqttTopicSubscription requested : msg.payload().topicSubscriptions()) {
        final Topic topic = new Topic(requested.topicName());
        final MqttQoS granted;

        if (!policy.canRead(topic, username, clientID)) {
            // SUBACK will carry 0x80 for this topic: the user lacks the credentials to read it
            LOG.warn("Client does not have read permissions on the topic CId={}, username: {}, messageId: {}, topic: {}",
                clientID, username, messageId, topic);
            granted = FAILURE;
        } else if (topic.isValid()) {
            LOG.debug("Client will be subscribed to the topic CId={}, username: {}, messageId: {}, topic: {}",
                clientID, username, messageId, topic);
            granted = requested.qualityOfService();
        } else {
            LOG.warn("Topic filter is not valid CId={}, username: {}, messageId: {}, topic: {}", clientID,
                username, messageId, topic);
            granted = FAILURE;
        }

        verified.add(new MqttTopicSubscription(topic.toString(), granted));
    }
    return verified;
}
/**
 * Verifies that a message published without a topic is not forwarded downstream, is not
 * reported as forwarded, and causes the adapter to close the connection to the device.
 */
@Test
public void testUploadTelemetryMessageFailsForEmptyTopic() {

    // GIVEN an adapter that does not require authentication
    config.setAuthenticationRequired(false);
    final MqttServer mqttServer = getMqttServer(false);
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> protocolAdapter = getAdapter(mqttServer);
    forceClientMocksToConnected();
    final DownstreamSender telemetrySender = givenAQoS0TelemetrySender();

    // with a connected device whose publish handler we can invoke directly
    final MqttEndpoint device = mockEndpoint();
    when(device.isConnected()).thenReturn(Boolean.TRUE);
    protocolAdapter.handleEndpointConnection(device);
    @SuppressWarnings("unchecked")
    final ArgumentCaptor<Handler<MqttPublishMessage>> publishHandler = ArgumentCaptor.forClass(Handler.class);
    verify(device).publishHandler(publishHandler.capture());

    // WHEN the device publishes a QoS 0 message whose topic name is null
    final MqttPublishMessage topicless = mock(MqttPublishMessage.class);
    when(topicless.topicName()).thenReturn(null);
    when(topicless.qosLevel()).thenReturn(MqttQoS.AT_MOST_ONCE);
    publishHandler.getValue().handle(topicless);

    // THEN the connection to the device is closed
    verify(device).close();

    // and nothing has been sent downstream
    verify(telemetrySender, never()).send(any(Message.class), any());

    // and no telemetry message has been reported as forwarded
    verify(metrics, never()).reportTelemetry(
            any(MetricsTags.EndpointType.class),
            anyString(),
            any(),
            eq(MetricsTags.ProcessingOutcome.FORWARDED),
            any(MetricsTags.QoS.class),
            anyInt(),
            any());
}
/**
 * Builds an UNSUBACK message for the given packet identifier.
 *
 * @param messageId the packet identifier being acknowledged
 * @return the UNSUBACK message
 */
public static MqttUnsubAckMessage unsuback(int messageId) {
    // last fixed-header argument: 2, the size of the packet identifier
    final int remainingLength = 2;
    return new MqttUnsubAckMessage(
            new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, remainingLength),
            MqttMessageIdVariableHeader.from(messageId));
}
/**
 * Distributes a published message to every subscriber in the list.
 * <p>
 * The QoS a subscriber receives the message with is the lower of the publish QoS and the QoS
 * of the subscriber's subscription.
 *
 * @param bMsgInfo the published message
 * @param topicClientList the subscribers to deliver to
 */
public void sendSubscribMessage(BorkerMessage bMsgInfo, List<SubscribeTopicInfo> topicClientList) {
    for (SubscribeTopicInfo subscriber : topicClientList) {
        final String clientId = subscriber.getClientId();
        final int effectiveQos = Math.min(bMsgInfo.getIQosLevel(), subscriber.getMqttQoS());
        sendSubscribMessageForPublic(clientId, MqttQoS.valueOf(effectiveQos), bMsgInfo);
    }
}
/**
 * Stores the numeric QoS value on the broker message as a 4-byte extension, most significant
 * byte first.
 *
 * @param qos the QoS level to encode
 * @param brokerMessage the message receiving the extension
 */
public static void writeExtension(MqttQoS qos, BrokerMessage brokerMessage) {
    final int value = qos.value();
    final byte[] encoded = new byte[4];
    for (int i = 0; i < encoded.length; i++) {
        // index 0 takes bits 31..24, index 3 takes bits 7..0
        final int shift = 8 * (encoded.length - 1 - i);
        encoded[i] = (byte) ((value >> shift) & 0xFF);
    }
    brokerMessage.setExtension(encoded);
}
/**
 * Verifies that mapping the topic of a message fails with a <em>404 Not Found</em> status when
 * the topic's endpoint is neither <em>telemetry</em> nor <em>event</em>.
 *
 * @param ctx The helper to use for running tests on vert.x.
 */
@Test
public void testMapTopicFailsForUnknownEndpoint(final VertxTestContext ctx) {

    givenAnAdapter();

    // WHEN a device publishes a QoS 0 message to the topic "unknown"
    final MqttPublishMessage unknownEndpointMessage = newMessage(MqttQoS.AT_MOST_ONCE, "unknown");

    adapter.mapTopic(newContext(unknownEndpointMessage, null))
        .onComplete(ctx.failing(error -> {
            // THEN mapping fails with "not found"
            assertServiceInvocationException(ctx, error, HttpURLConnection.HTTP_NOT_FOUND);
            ctx.completeNow();
        }));
}
/**
 * Connects a client, publishes the same message three times with the given QoS and waits until
 * the completion of all three has been signalled, asserting that completions arrive for the
 * message ids 2, 1 and 3, in that order.
 *
 * @param mqttQoS the QoS level to publish with
 * @param context the test context used for assertions and for awaiting completion
 */
private void clientSendThreePublishMessages(MqttQoS mqttQoS, TestContext context) {
    final int messageCount = 3;
    final Async allCompleted = context.async(messageCount);
    final MqttClient client = MqttClient.create(vertx);

    // message ids in the order their completion is expected to be signalled
    final Queue<Integer> expectedCompletionOrder = new LinkedList<>();
    expectedCompletionOrder.add(2);
    expectedCompletionOrder.add(1);
    expectedCompletionOrder.add(3);

    client.publishCompletionHandler(completedId -> {
        context.assertEquals(completedId.intValue(), expectedCompletionOrder.poll());
        log.info("[CLIENT] Publish completed for message with id: " + completedId);
        allCompleted.countDown();
    });

    client.connect(MqttClientOptions.DEFAULT_PORT, MqttClientOptions.DEFAULT_HOST, connected -> {
        // publish the message three times with the requested QoS
        int published = 0;
        while (published < messageCount) {
            client.publish(MQTT_TOPIC,
                Buffer.buffer(MQTT_MESSAGE.getBytes()),
                mqttQoS,
                false,
                false,
                sent -> log.info("[CLIENT] publishing message id = " + sent.result()));
            published++;
        }
    });

    allCompleted.await();
    client.disconnect();
}
/**
 * Collects into {@code map} all subscriptions matching a concrete topic name: first those
 * returned by {@code getTopicSubscriptions} for the name itself, then those found by the
 * level-wise overload starting at level 0.
 *
 * @param topicLevels the levels of a topic name (must not be a topic filter)
 * @param map receives the matching subscriptions
 * @throws IllegalArgumentException if {@code topicLevels} denotes a topic filter
 */
@Override
public void getMatchSubscriptions(List<String> topicLevels, Map<String, MqttQoS> map) {
    final boolean isFilter = Topics.isTopicFilter(topicLevels);
    if (isFilter) {
        throw new IllegalArgumentException("it must be topic name not topic filter");
    }

    // subscriptions registered under exactly this topic name
    map.putAll(getTopicSubscriptions(topicLevels));

    // subscriptions registered under topic filters, matched from the first level on
    final int firstLevel = 0;
    getMatchSubscriptions(topicLevels, firstLevel, map);
}
/**
 * Example showing how to publish a message: sends {@code "hello"} to the topic
 * {@code "temperature"} at QoS 1, neither flagged as duplicate nor retained.
 *
 * @param client the client to publish with
 */
public void example3(MqttClient client) {
    final Buffer body = Buffer.buffer("hello");
    final boolean duplicate = false;
    final boolean retained = false;
    client.publish("temperature", body, MqttQoS.AT_LEAST_ONCE, duplicate, retained);
}
/**
 * Runs the shared command round-trip scenario ({@code testSendCommandSucceeds}) with QoS 0.
 *
 * @param endpointConfig The endpoints to use for sending/receiving commands.
 * @param ctx The vert.x test context.
 * @throws InterruptedException if not all commands and responses are exchanged in time.
 */
@ParameterizedTest(name = IntegrationTestSupport.PARAMETERIZED_TEST_NAME_PATTERN)
@MethodSource("allCombinations")
public void testSendCommandSucceedsWithQos0(
        final MqttCommandEndpointConfiguration endpointConfig,
        final VertxTestContext ctx) throws InterruptedException {

    final MqttQoS qos = MqttQoS.AT_MOST_ONCE;
    testSendCommandSucceeds(ctx, endpointConfig, qos);
}
/**
 * Publishes a message with the referenced client and acknowledges the message according to
 * the outcome: {@code nack} on failure, {@code ack} on success.
 * <p>
 * Topic and QoS default to the values configured on this instance and the retain flag to
 * {@code false}; a {@code SendingMqttMessage} may override each of them. When no topic is
 * available at all, the message is not published and is returned as is.
 *
 * @param reference holder of the client to publish with
 * @param msg the message to publish
 * @return a stage completing with {@code msg}
 */
private CompletionStage<?> send(AtomicReference<MqttClient> reference, Message<?> msg) {
    final MqttClient client = reference.get();

    // instance-level defaults
    String targetTopic = this.topic;
    MqttQoS targetQos = MqttQoS.valueOf(this.qos);
    boolean retain = false;

    // per-message overrides
    if (msg instanceof SendingMqttMessage) {
        final MqttMessage<?> outgoing = ((SendingMqttMessage<?>) msg);
        if (outgoing.getTopic() != null) {
            targetTopic = outgoing.getTopic();
        }
        if (outgoing.getQosLevel() != null) {
            targetQos = outgoing.getQosLevel();
        }
        retain = outgoing.isRetain();
    }

    if (actualTopicMissing(targetTopic)) {
        log.ignoringNoTopicSet();
        return CompletableFuture.completedFuture(msg);
    }

    return client.publish(targetTopic, convert(msg.getPayload()), targetQos, false, retain)
            .onItemOrFailure().produceUni((item, failure) -> {
                if (failure == null) {
                    return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg));
                }
                return Uni.createFrom().completionStage(msg.nack(failure).thenApply(x -> msg));
            })
            .subscribeAsCompletionStage();
}

/** Tells whether no topic could be determined for an outgoing message. */
private static boolean actualTopicMissing(String candidate) {
    return candidate == null;
}
/**
 * Looks up the QoS configured for a topic.
 * <p>
 * The entries of {@code mqttQoSMap} are examined in the map's iteration order and the value of
 * the first entry whose key matches the topic wins. The search stops at that entry instead of
 * collecting all matches first.
 *
 * @param topic the topic to look up
 * @return the QoS of the first matching entry, or {@code AT_LEAST_ONCE} if no entry matches
 */
public MqttQoS getQoSForTopic(String topic) {
    // NOTE(review): the match is evaluated as key.matches(topic) — confirm that the key, not
    // the topic, is meant to be the subject of the match.
    return mqttQoSMap.entrySet()
            .stream()
            .filter(entry -> entry.getKey().matches(topic))
            .map(Map.Entry::getValue)
            .findFirst()
            .map(level -> MqttQoS.valueOf(level))
            .orElse(MqttQoS.AT_LEAST_ONCE);
}
/**
 * Runs a subscribe/unsubscribe round trip with the given QoS and blocks until the UNSUBACK has
 * been received.
 * <p>
 * Sequence: connect, subscribe, on SUBACK check message id and granted QoS and unsubscribe,
 * on UNSUBACK check the message id, disconnect and complete. The message id of the most recent
 * request is kept in the {@code messageId} field so the completion handlers can compare
 * against it.
 *
 * @param context the test context used to await completion
 * @param qos the QoS level to subscribe with
 */
private void unsubscribe(TestContext context, MqttQoS qos) {
    this.messageId = 0;

    final Async done = context.async();
    final MqttClient client = MqttClient.create(Vertx.vertx());

    // final step: UNSUBACK received
    client.unsubscribeCompletionHandler(ackedId -> {
        assertTrue(ackedId == messageId);
        log.info("unsubscribing complete for message id = " + ackedId);
        client.disconnect();
        done.countDown();
    });

    // middle step: SUBACK received, now unsubscribe again
    client.subscribeCompletionHandler(subAck -> {
        assertTrue(subAck.messageId() == messageId);
        assertTrue(subAck.grantedQoSLevels().contains(qos.value()));
        log.info("subscribing complete for message id = " + subAck.messageId() + " with QoS " + subAck.grantedQoSLevels());

        client.unsubscribe(MQTT_TOPIC, unsubscribeSent -> {
            assertTrue(unsubscribeSent.succeeded());
            messageId = unsubscribeSent.result();
            log.info("unsubscribing on [" + MQTT_TOPIC + "] message id = " + messageId);
        });
    });

    // first step: connect and subscribe
    client.connect(TestUtil.BROKER_PORT, TestUtil.BROKER_ADDRESS, connected -> {
        assertTrue(connected.succeeded());

        client.subscribe(MQTT_TOPIC, qos.value(), subscribeSent -> {
            assertTrue(subscribeSent.succeeded());
            messageId = subscribeSent.result();
            log.info("subscribing on [" + MQTT_TOPIC + "] with QoS [" + qos.value() + "] message id = " + messageId);
        });
    });

    done.await();
}
/**
 * Builds a SUBACK message.
 *
 * @param messageId the packet identifier being acknowledged
 * @param grantedQoSLevels the granted QoS values, one per requested subscription
 * @return the SUBACK message
 */
public static MqttSubAckMessage suback(int messageId, List<Integer> grantedQoSLevels) {
    // last fixed-header argument: 2 for the packet identifier plus one per granted QoS entry
    final int remainingLength = 2 + grantedQoSLevels.size();
    final MqttFixedHeader fixedHeader =
            new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, remainingLength);

    return new MqttSubAckMessage(
            fixedHeader,
            MqttMessageIdVariableHeader.from(messageId),
            new MqttSubAckPayload(grantedQoSLevels));
}
/**
 * Creates a QoS 1 PUBLISH message whose payload is the JSON serialization of the given element.
 *
 * @param ctx the device session; supplies the next message id
 * @param topic the topic to publish to
 * @param json the content to serialize into the payload
 * @return the PUBLISH message
 */
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
    final MqttFixedHeader fixedHeader =
            new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    final MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, ctx.nextMsgId());

    final ByteBuf body = ALLOCATOR.buffer();
    final byte[] serialized = GSON.toJson(json).getBytes(UTF8);
    body.writeBytes(serialized);

    return new MqttPublishMessage(fixedHeader, variableHeader, body);
}