下面列出了 io.netty.handler.codec.mqtt.MqttUnsubAckMessage#io.netty.handler.codec.mqtt.MqttUnsubscribeMessage 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Handles an UNSUBSCRIBE request: removes the listed topics from the client's
 * subscriptions in the session store and builds the UNSUBACK reply.
 *
 * @param req      the incoming message; cast to {@link MqttUnsubscribeMessage}
 * @param clientId identifier of the client whose subscriptions are removed
 * @param remoteIp remote address of the client (not used by this method)
 * @return the UNSUBACK message echoing the request's message id, or an empty
 *         {@code Optional} when the request carries no topics
 */
@Override
public Optional<MqttMessage> process(MqttMessage req, String clientId, String remoteIp) {
    MqttUnsubscribeMessage msg = (MqttUnsubscribeMessage) req;
    log.info("UNSUBSCRIBE, {}", msg.payload().topics());
    if (msg.payload().topics().isEmpty()) {
        log.error("empty topic, skip it");
        return Optional.empty();
    }
    this.sessionStore.unSubscribe(clientId, msg.payload().topics());

    // The flag bits of an UNSUBACK fixed header are reserved and must all be 0,
    // so the QoS field has to be AT_MOST_ONCE; any other value is written into
    // the first header byte by the encoder and yields a malformed packet.
    MqttFixedHeader fixedHeader = new MqttFixedHeader(
            MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, ProtocolProcess.fixLengthOfMessageId);
    MqttMessage rsp = MqttMessageFactory.newMessage(
            fixedHeader, MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), null);
    return Optional.of(rsp);
}
/**
 * Handles an UNSUBSCRIBE request: removes every requested topic for the
 * channel's client from the subscription matcher and the subscription store,
 * then writes an UNSUBACK carrying the request's message id.
 *
 * @param ctx         channel context the request arrived on
 * @param mqttMessage the incoming message; cast to {@link MqttUnsubscribeMessage}
 */
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
    MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) mqttMessage;
    MqttUnsubscribePayload unsubscribePayload = unsubscribeMessage.payload();
    List<String> topics = unsubscribePayload.topics();
    String clientId = NettyUtil.getClientId(ctx.channel());
    ClientSession clientSession = ConnectManager.getInstance().getClient(clientId);
    boolean online = clientSession != null;
    if (!online) {
        log.warn("[UnSubscribe] -> The client is not online.clientId={}",clientId);
    }
    for (String topic : topics) {
        subscriptionMatcher.unSubscribe(topic, clientId);
        subscriptionStore.removeSubscription(clientId, topic);
        // Stop sending system messages. The session may be missing (see the
        // warning above), so only dereference it when it is present; otherwise
        // the NPE would abort the loop and the UNSUBACK would never be sent.
        if (online && SysToipc.SYS.equals(topic)) {
            sysMessageService.removeClient(clientSession.getClientId());
        }
    }
    MqttUnsubAckMessage unsubAckMessage = MessageUtil.getUnSubAckMessage(MessageUtil.getMessageId(mqttMessage));
    ctx.writeAndFlush(unsubAckMessage);
}
/**
 * Processes an UNSUBSCRIBE packet by dropping each requested topic for the
 * channel's client from both the subscription matcher and the subscription
 * store, and then replying with an UNSUBACK for the same message id.
 *
 * @param ctx         context of the channel that delivered the packet
 * @param mqttMessage the received packet; cast to {@link MqttUnsubscribeMessage}
 */
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
    MqttUnsubscribeMessage request = (MqttUnsubscribeMessage) mqttMessage;
    MqttUnsubscribePayload payload = request.payload();
    List<String> requestedTopics = payload.topics();
    String clientId = NettyUtil.getClientId(ctx.channel());

    // The session is looked up only to report a client that is not online.
    ClientSession session = ConnectManager.getInstance().getClient(clientId);
    if (session == null) {
        log.warn("[UnSubscribe] -> The client is not online.clientId={}", clientId);
    }

    for (String requestedTopic : requestedTopics) {
        subscriptionMatcher.unSubscribe(requestedTopic, clientId);
        subscriptionStore.removeSubscription(clientId, requestedTopic);
    }

    int messageId = MessageUtil.getMessageId(mqttMessage);
    ctx.writeAndFlush(MessageUtil.getUnSubAckMessage(messageId));
}
/**
 * Handles an UNSUBSCRIBE packet: removes the requested topic filters for the
 * channel's client and then sends an UNSUBACK for the packet's message id.
 *
 * @param channel channel the packet was received on
 * @param msg     the UNSUBSCRIBE packet
 */
public void processUnSubscribe(Channel channel, MqttUnsubscribeMessage msg) {
    final List<String> filtersToRemove = msg.payload().topics();
    final String ownerClientId = NettyUtil.getClientId(channel);

    this.topicProcess.removeClientTopic(ownerClientId, filtersToRemove);

    // The acknowledgement echoes the message id of the request.
    this.sendProcess.sendUnSubAckMessage(
            channel,
            msg.variableHeader().messageId());
}
/**
 * Builds an UNSUBSCRIBE message for the given topic filters.
 *
 * @param topic     topic filters to unsubscribe from
 * @param messageId packet identifier placed in the variable header
 * @return the assembled {@link MqttUnsubscribeMessage}
 */
public static MqttUnsubscribeMessage unSubscribeMessage(List<String> topic, int messageId) {
    // The MQTT specification fixes the UNSUBSCRIBE header flags at 0010, which is
    // the QoS 1 bit pattern; with AT_MOST_ONCE the packet goes out with flags 0000
    // and a spec-compliant receiver treats it as malformed.
    MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
            false, 0x02);
    MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
    MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topic);
    return new MqttUnsubscribeMessage(mqttFixedHeader, variableHeader, payload);
}
/**
 * Routes an inbound MQTT message to the handler matching its message type.
 * CONNECT, PUBLISH, SUBSCRIBE and UNSUBSCRIBE are forwarded unconditionally;
 * PINGREQ and DISCONNECT are only acted on when {@code checkConnected(ctx)}
 * returns true. Every other message type is ignored.
 *
 * @param ctx channel context the message arrived on
 * @param msg the decoded MQTT message
 */
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
    switch (msg.fixedHeader().messageType()) {
        case CONNECT:
            processConnect(ctx, (MqttConnectMessage) msg);
            return;
        case PUBLISH:
            processPublish(ctx, (MqttPublishMessage) msg);
            return;
        case SUBSCRIBE:
            processSubscribe(ctx, (MqttSubscribeMessage) msg);
            return;
        case UNSUBSCRIBE:
            processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
            return;
        case PINGREQ:
            if (!checkConnected(ctx)) {
                return;
            }
            // Answer the keep-alive probe with a bare PINGRESP header.
            MqttFixedHeader pingRespHeader = new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0);
            ctx.writeAndFlush(new MqttMessage(pingRespHeader));
            return;
        case DISCONNECT:
            if (!checkConnected(ctx)) {
                return;
            }
            processDisconnect(ctx);
            return;
        default:
            // No handler for the remaining message types.
            return;
    }
}
/**
 * Handles an UNSUBSCRIBE message: unregisters every listed topic for the
 * channel and replies with an UNSUBACK carrying the request's message id.
 * Nothing is done when {@code checkConnected(ctx)} fails. When no client id is
 * associated with the channel, the channel is closed and processing stops.
 *
 * @param ctx     channel context the message arrived on
 * @param mqttMsg the UNSUBSCRIBE message
 */
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
    if (!checkConnected(ctx)) {
        return;
    }
    if (MemoryMetaPool.getClientId(ctx.channel()) == null) {
        ctx.channel().close();
        // The channel is being closed: do not unregister topics or write an
        // acknowledgement to it.
        return;
    }
    int messageId = mqttMsg.variableHeader().messageId();
    log.trace("[{}] Processing unsubscription [{}]!", sessionId, messageId);
    for (String topicName : mqttMsg.payload().topics()) {
        try {
            if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                // Attribute-update unsubscription handling is currently disabled.
            } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
                // RPC-command unsubscription handling is currently disabled.
            } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
                deviceSessionCtx.setDisallowAttributeResponses();
            }
            MemoryMetaPool.unregisterTopic(ctx.channel(), topicName);
        } catch (Exception e) {
            // Best effort per topic: keep going, but log the cause (trailing
            // throwable argument) instead of dropping it.
            log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, messageId,
                    topicName, e);
        }
    }
    ctx.writeAndFlush(createUnSubAckMessage(messageId));
}
/**
 * Creates the bookkeeping record for an UNSUBSCRIBE that is awaiting
 * acknowledgement and registers the message with the retransmission handler
 * as the original message.
 *
 * @param future             promise stored for this unsubscription
 * @param topic              topic being unsubscribed from
 * @param unsubscribeMessage the UNSUBSCRIBE message handed to the retransmission handler
 */
public MqttPendingUnsubscription(Promise<Void> future, String topic, MqttUnsubscribeMessage unsubscribeMessage) {
    this.topic = topic;
    this.future = future;

    retransmissionHandler.setOriginalMessage(unsubscribeMessage);
}
/**
 * Installs the retransmission callback and starts the retransmission handler
 * on the given event loop. The callback rebuilds an UNSUBSCRIBE message from
 * the supplied fixed header plus the original message's variable header and
 * payload, and passes it to {@code sendPacket}.
 *
 * @param eventLoop  event loop the retransmission handler is started on
 * @param sendPacket sink that receives each rebuilt message
 */
public void startRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
    retransmissionHandler.setHandle((header, original) -> {
        MqttUnsubscribeMessage resend =
                new MqttUnsubscribeMessage(header, original.variableHeader(), original.payload());
        sendPacket.accept(resend);
    });
    retransmissionHandler.start(eventLoop);
}
/**
 * Entry point for decoded MQTT messages. Closes the context when decoding
 * failed, hands CONNECT to the protocol processor, drops any other message
 * from a channel that is not logged in, and otherwise dispatches by message
 * type. Message types without a handler are ignored.
 *
 * @param ctx channel context the message arrived on
 * @param msg the decoded MQTT message
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
    if (!msg.decoderResult().isSuccess()) {
        NettyLog.error("error decoder");
        ctx.close();
        return;
    }

    MqttMessageType type = msg.fixedHeader().messageType();
    NettyLog.debug("read: " + type);

    // CONNECT is the only message accepted before login and needs no further dispatch.
    if (type == MqttMessageType.CONNECT) {
        protocolProcess.processConnect(ctx.channel(), (MqttConnectMessage) msg);
        return;
    }
    if (!NettyUtil.isLogin(ctx.channel())) {
        NettyLog.info("not login");
        return;
    }

    switch (type) {
        case PUBLISH:
            protocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
            break;
        case PUBACK:
            protocolProcess.processPubAck(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBREC:
            protocolProcess.processPubRec(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBREL:
            protocolProcess.processPubRel(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case PUBCOMP:
            protocolProcess.processPubComp(ctx.channel(), (MqttMessageIdVariableHeader) msg.variableHeader());
            break;
        case SUBSCRIBE:
            protocolProcess.processSubscribe(ctx.channel(), (MqttSubscribeMessage) msg);
            break;
        case UNSUBSCRIBE:
            protocolProcess.processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage) msg);
            break;
        case PINGREQ:
            protocolProcess.processPingReq(ctx.channel(), msg);
            break;
        case DISCONNECT:
            protocolProcess.processDisConnect(ctx.channel(), msg);
            break;
        default:
            // CONNACK, SUBACK, UNSUBACK, PINGRESP and anything else: nothing to do.
            break;
    }
}
/**
 * Forwards an UNSUBSCRIBE request to the MQTT protocol handler.
 *
 * @param client  channel the request arrived on
 * @param message the incoming message; cast to {@link MqttUnsubscribeMessage}
 * @throws Exception declared by the overridden method
 */
@Override
public void handleRequest(Channel client, MqttMessage message) throws Exception {
    mqttProtocolHandler.processUnsubscribe(client, (MqttUnsubscribeMessage) message);
}
/**
 * Handles an UNSUBSCRIBE packet for the client bound to the given channel.
 *
 * <p>When the client is connected, the listed topic filters are unsubscribed
 * and the resulting subscriptions are removed from the client's session, if
 * one exists. An UNSUBACK is then sent, also when the client is not
 * registered as connected. A packet without any topic filter is treated as a
 * protocol violation: consumption is stopped, the session and connection are
 * removed, the channels are closed, and no UNSUBACK is sent.
 *
 * @param client             channel the packet arrived on
 * @param unSubscribeMessage the UNSUBSCRIBE packet
 */
public void processUnsubscribe(Channel client, MqttUnsubscribeMessage unSubscribeMessage) {
    String clientID = NettyAttrManager.getAttrClientId(client);
    int packageId = unSubscribeMessage.variableHeader().messageId();
    MqttQoS qoS = unSubscribeMessage.fixedHeader().qosLevel();
    if (LOG.isDebugEnabled()) {
        LOG.debug("处理unSubscribe数据包, clientID: {}, packageId: {}, Qos: {}", clientID, packageId, qoS);
    }
    if (connectionManager.isConnected(clientID)) {
        MqttConnection connection = connectionManager.getConnection(clientID);
        List<String> topicFilters = unSubscribeMessage.payload().topics();
        LOG.info("UnSubscribe topics: {}", topicFilters);

        // The payload of an UNSUBSCRIBE packet MUST contain at least one topic
        // filter. A missing or empty list is a protocol violation and the network
        // connection that delivered the packet MUST be closed.
        if (topicFilters == null || topicFilters.isEmpty()) {
            try {
                consumerManager.stopConsume(clientID);
                sessionManager.removeSession(clientID);
                connection.getChannel().close().addListener(CLOSE_ON_FAILURE);
                connectionManager.removeConnection(connection);
                client.close().addListener(CLOSE_ON_FAILURE);
            } catch (Exception e) {
                LOG.error("unSubscribe is error! clientID: {}, packageId: {}", clientID, packageId, e);
            }
            // The connection is being torn down; do not acknowledge the packet.
            return;
        }

        try {
            Set<MqttSubscription> unSubscriptions = unSubscribe(topicFilters, clientID, connection.getClientGroupName());
            MqttSession session = sessionManager.getSession(clientID);
            if (session != null) {
                for (MqttSubscription subscription : unSubscriptions) {
                    session.removeSubscription(subscription);
                }
            }
        } catch (Exception e) {
            // Still acknowledge below, but keep the cause in the log.
            LOG.error("unSubscribe is error! clientID: {}, packageId: {}", clientID, packageId, e);
        }
    }
    sendUnSubAck(client, packageId, qoS);
}
/**
 * Handles an UNSUBSCRIBE message. Without a session for the channel, the
 * channel is disconnected and closed and the disconnect listener plugin is
 * notified. With an empty or missing topic-filter list, the session is
 * disposed. Otherwise each topic filter is removed for the session's client,
 * the unsubscribe listener plugin is notified, and an UNSUBACK for the
 * request's message id is sent through the session.
 *
 * @param ctx channel context the message arrived on
 * @param msg the UNSUBSCRIBE message
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
    logger.debug("packet incoming [message={}]", msg.toString());

    Session session = Session.NEXUS.get(ctx.channel().id());
    if (session == null) {
        logger.error("None exist session message [message={}]", msg.toString());

        // [MQTT-4.8.0-1]
        ctx.channel()
                .disconnect()
                .addListener(ChannelFutureListener.CLOSE)
                .addListener(fs -> Plugins.INSTANCE.get(DisconnectEventListener.class)
                        .disconnected(new AbnormalDisconnectEventArgs()));
        return;
    }

    session.setLastIncomingTime(new Date());

    List<String> topicFilters = msg.payload().topics();
    if (topicFilters == null || topicFilters.isEmpty()) {
        // [MQTT-4.8.0-1]
        session.dispose(true);
        return;
    }

    for (String topicFilter : topicFilters) {
        TopicSubscription.NEXUS.removeByKey(topicFilter, session.clientId());
        TopicSubscriber.NEXUS.removeByTopicFilter(session.clientId(), topicFilter);
    }

    Plugins.INSTANCE.get(UnsubscribeEventListener.class).unsubscribed(new UnsubscribeEventArgs() {
        @Override
        public String clientId() {
            return session.clientId();
        }

        @Override
        public List<String> topicFilters() {
            return topicFilters;
        }
    });

    // [MQTT-2.3.1-7],[MQTT-3.10.4-4],[MQTT-3.10.4-5]
    session.send(MqttMessageFactory.unsuback(msg.variableHeader().messageId()), null);
}
/**
 * Receives a decoded MQTT control packet and dispatches it by message type.
 * The client is disconnected when the handler is stopped, when decoding
 * failed, when an incoming interceptor returns a non-null result, when the
 * packet type has no handler here, or when handling throws. The inbound
 * object is always released in the {@code finally} block.
 *
 * @param ctx channel context the packet arrived on
 * @param msg the inbound object; cast to {@link MqttMessage}
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        if (stopped) {
            disconnect(true);
            return;
        }

        MqttMessage packet = (MqttMessage) msg;

        // A failed decode means the stream is unusable: drop the client.
        if (packet.decoderResult().isFailure()) {
            log.debug("Bad Message Disconnecting Client.");
            disconnect(true);
            return;
        }

        connection.dataReceived();

        if (AuditLogger.isAnyLoggingEnabled()) {
            AuditLogger.setRemoteAddress(connection.getRemoteAddress());
        }

        MQTTUtil.logMessage(session.getState(), packet, true);

        if (this.protocolManager.invokeIncoming(packet, this.connection) != null) {
            log.debugf("Interceptor rejected MQTT message: %s", packet);
            disconnect(true);
            return;
        }

        switch (packet.fixedHeader().messageType()) {
            case CONNECT:
                handleConnect((MqttConnectMessage) packet, ctx);
                break;
            case PUBLISH:
                handlePublish((MqttPublishMessage) packet);
                break;
            case PUBACK:
                handlePuback((MqttPubAckMessage) packet);
                break;
            case PUBREC:
                handlePubrec(packet);
                break;
            case PUBREL:
                handlePubrel(packet);
                break;
            case PUBCOMP:
                handlePubcomp(packet);
                break;
            case SUBSCRIBE:
                handleSubscribe((MqttSubscribeMessage) packet);
                break;
            case UNSUBSCRIBE:
                handleUnsubscribe((MqttUnsubscribeMessage) packet);
                break;
            case PINGREQ:
                handlePingreq();
                break;
            case DISCONNECT:
                disconnect(false);
                break;
            default:
                // UNSUBACK, SUBACK, PINGRESP and CONNACK end up here as well. The
                // server does not instantiate connections, therefore any CONNACK
                // received over a connection is an invalid control message.
                disconnect(true);
        }
    } catch (Exception e) {
        log.debug("Error processing Control Packet, Disconnecting Client", e);
        disconnect(true);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
/**
 * Removes the subscriptions named in the UNSUBSCRIBE message from the
 * session's subscription manager and sends the client an UNSUBACK that reuses
 * the request's variable header.
 *
 * @param message the UNSUBSCRIBE message
 * @throws Exception declared so that failures from the called methods can propagate
 */
void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
    session.getSubscriptionManager().removeSubscriptions(message.payload().topics());

    MqttFixedHeader ackHeader =
            new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    sendToClient(new MqttUnsubAckMessage(ackHeader, message.variableHeader()));
}