下面列出了io.netty.channel.ChannelId#io.netty.handler.codec.mqtt.MqttMessageType 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof MqttMessage)) {
System.out.println("=============收到非Mqtt===========");
ctx.fireChannelRead(msg);
ctx.close();
return;
}
MqttMessage message = (MqttMessage) msg;
if(message.fixedHeader().messageType() == MqttMessageType.PINGREQ){
System.out.println("===========服务端收到了ping请求==========");
this.handlePingReq(ctx.channel());
} else if(message.fixedHeader().messageType() == MqttMessageType.PINGRESP){
System.out.println("===========服务端收到了ping响应==========");
this.handlePingResp();
}else{
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}
}
private void sendPubAck(Channel client, Integer packageID) {
if (LOG.isDebugEnabled()) {
LOG.debug("发送PubAck消息给客户端");
}
try {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage pubAckMessage = MqttMessageFactory.newMessage(
mqttFixedHeader,
MqttMessageIdVariableHeader.from(packageID),
null);
client.writeAndFlush(pubAckMessage);
} catch (Throwable th) {
LOG.error("Send pubAck error!", th);
client.close().addListener(CLOSE_ON_FAILURE);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj){
MqttMessage mqttMessage = (MqttMessage) obj;
if(mqttMessage != null && mqttMessage.decoderResult().isSuccess()){
MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
log.debug("[Remoting] -> receive mqtt code,type:{}",messageType.value());
Runnable runnable = new Runnable() {
@Override
public void run() {
processorTable.get(messageType).getObject1().processRequest(ctx,mqttMessage);
}
};
try{
processorTable.get(messageType).getObject2().submit(runnable);
}catch (RejectedExecutionException ex){
log.warn("Reject mqtt request,cause={}",ex.getMessage());
}
}else{
ctx.close();
}
}
@Override
protected void onMessageReceived(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
MqttMessageType messageType = msg.fixedHeader().messageType();
switch (messageType) {
case PUBLISH:
this.publishesMetrics.mark();
break;
case SUBSCRIBE:
this.subscribeMetrics.mark();
break;
case CONNECT:
this.connectedClientsMetrics.inc();
break;
case DISCONNECT:
this.connectedClientsMetrics.dec();
break;
default:
break;
}
ctx.fireChannelRead(msg);
}
/**
* Sends the PUBREL message to server
*
* @param publishMessageId identifier of the PUBLISH message to acknowledge
*/
private void publishRelease(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2outbound.put(publishMessageId, pubrel);
}
this.write(pubrel);
}
public static MqttConnectMessage connect(ConnectOptions options) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false,
10);
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(options.version().protocolName(),
options.version().protocolLevel(), options.userName() != null, options.password() != null,
options.will() == null ? false : options.will().isRetain(),
options.will() == null ? 0 : options.will().qos().value(), options.will() != null,
options.cleanSession(), options.keepAliveTimeSeconds());
MqttConnectPayload payload = new MqttConnectPayload(Strings.nullToEmpty(options.clientId()),
options.will() == null ? "" : options.will().topicName(),
options.will() == null ? "" : new String(options.will().message(), CharsetUtil.UTF_8),
Strings.nullToEmpty(options.userName()), Strings.nullToEmpty(options.password()));
return new MqttConnectMessage(fixedHeader, variableHeader, payload);
}
private MqttConnAckMessage executeNormalChannelRead0(String clientId, boolean cleanSession, ChannelId channelId)
throws Exception {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false,
10);
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader("MQTT", 4, true, true, true, 0, true,
cleanSession, 60);
MqttConnectPayload payload = new MqttConnectPayload(clientId, "willtopic", "willmessage", "username",
"password");
MqttConnectMessage msg = new MqttConnectMessage(fixedHeader, variableHeader, payload);
ChannelId cid = channelId == null ? TestUtil.newChannelId(clientId, false) : channelId;
EmbeddedChannel channel = new EmbeddedChannel(cid, new ConnectReceiver());
channel.writeInbound(msg);
return channel.readOutbound();
}
/**
* Sends PUBREC packet to server
*
* @param publishMessage a PUBLISH message to acknowledge
*/
private void publishReceived(MqttPublishMessage publishMessage) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessage.messageId());
io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2inbound.put(publishMessage.messageId(), publishMessage);
}
this.write(pubrec);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof MqttMessage)) {
ctx.fireChannelRead(msg);
return;
}
MqttMessage message = (MqttMessage) msg;
if(message.fixedHeader().messageType() == MqttMessageType.PINGREQ){
this.handlePingReq(ctx.channel());
} else if(message.fixedHeader().messageType() == MqttMessageType.PINGRESP){
this.handlePingResp();
}else{
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}
}
private void sendPingReq(Channel channel){
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
channel.writeAndFlush(new MqttMessage(fixedHeader));
if(this.pingRespTimeout != null){
this.pingRespTimeout = channel.eventLoop().schedule(() -> {
MqttFixedHeader fixedHeader2 = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
channel.writeAndFlush(new MqttMessage(fixedHeader2)).addListener(ChannelFutureListener.CLOSE);
//TODO: what do when the connection is closed ?
}, this.keepaliveSeconds, TimeUnit.SECONDS);
}
}
/**
* 5秒钟收不到来自对方的ping,就关闭连接
* @param channel
*/
private void sendPingReq(Channel channel){
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
channel.writeAndFlush(new MqttMessage(fixedHeader));
/* if(this.pingRespTimeout == null){
this.pingRespTimeout = channel.eventLoop().schedule(() -> {
MqttFixedHeader fixedHeader2 = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
channel.writeAndFlush(new MqttMessage(fixedHeader2)).addListener(ChannelFutureListener.CLOSE);
//TODO: what do when the connection is closed ?
}, this.keepaliveSeconds, TimeUnit.SECONDS);
}*/
}
public MqttPublishMessage genWillMessage(MqttConnectMessage connectMessage) {
if (connectMessage.variableHeader().isWillFlag()) {
log.info("get will message from client");
MqttMessage msg = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(connectMessage.variableHeader().willQos()), connectMessage.variableHeader().isWillRetain(), 0),
new MqttPublishVariableHeader(connectMessage.payload().willTopic(), 0), Unpooled.buffer().writeBytes(connectMessage.payload().willMessageInBytes()));
return (MqttPublishMessage) msg;
}
return null;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {
this.stats.incrementMqttStat();
MqttMessageType messageType = msg.fixedHeader().messageType();
switch (messageType) {
case PUBLISH :
MqttPublishMessage publishMessage = (MqttPublishMessage) msg;
String topic = publishMessage.variableHeader().topicName();
switch (topic.toLowerCase()) {
case "hardware" :
hardware.messageReceived(state, publishMessage);
break;
}
break;
case PINGREQ :
ctx.writeAndFlush(
MqttMessageFactory.newMessage(msg.fixedHeader(), msg.variableHeader(), null),
ctx.voidPromise());
break;
case DISCONNECT :
log.trace("Got disconnect. Closing...");
ctx.close();
break;
}
}
public static MqttConnectMessage connectMessage(MqttConnectOptions info) {
MqttVersion verinfo = info.getMqttVersion();
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE,
false, 10);
MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(verinfo.protocolName(),
verinfo.protocolLevel(), info.isHasUserName(), info.isHasPassword(), info.isHasWillRetain(),
info.getWillQos(), info.isHasWillFlag(), info.isHasCleanSession(), info.getKeepAliveTime());
MqttConnectPayload mqttConnectPayload = new MqttConnectPayload(info.getClientIdentifier(), info.getWillTopic(),
info.getWillMessage(), info.getUserName(), info.getPassword());
MqttConnectMessage mqttSubscribeMessage = new MqttConnectMessage(mqttFixedHeader, mqttConnectVariableHeader,
mqttConnectPayload);
return mqttSubscribeMessage;
}
private MqttMessage createConnectPacket(MqttClientOptions options) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
false,
MqttQoS.AT_MOST_ONCE,
false,
0);
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
PROTOCOL_NAME,
PROTOCOL_VERSION,
options.hasUsername(),
options.hasPassword(),
options.isWillRetain(),
options.getWillQoS(),
options.isWillFlag(),
options.isCleanSession(),
options.getKeepAliveTimeSeconds()
);
MqttConnectPayload payload = new MqttConnectPayload(
options.getClientId() == null ? "" : options.getClientId(),
options.getWillTopic(),
options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
options.hasUsername() ? options.getUsername() : null,
options.hasPassword() ? options.getPassword().getBytes(StandardCharsets.UTF_8) : null
);
return MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
}
protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) {
boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
sendToClient(publish);
}
private Future<Void> write(io.netty.handler.codec.mqtt.MqttMessage mqttMessage) {
synchronized (this.conn) {
if (mqttMessage.fixedHeader().messageType() != MqttMessageType.CONNACK) {
this.checkConnected();
}
return this.conn.writeMessage(mqttMessage);
}
}
public static MqttPublishMessage getPubMessage(Message message,boolean dup,int qos,int messageId){
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,dup,MqttQoS.valueOf(qos),false,0);
MqttPublishVariableHeader publishVariableHeader = new MqttPublishVariableHeader((String) message.getHeader(MessageHeader.TOPIC),messageId);
ByteBuf heapBuf;
if(message.getPayload() == null){
heapBuf = Unpooled.EMPTY_BUFFER;
}else{
heapBuf = Unpooled.wrappedBuffer((byte[])message.getPayload());
}
return new MqttPublishMessage(fixedHeader,publishVariableHeader,heapBuf);
}
private void prcoessMsg(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
log.debug("[Server] -> receive mqtt code,type:{}", messageType.name());
switch (messageType) {
case CONNECT:
brokerRoom.getConnectExecutor().submit(getTask(connectProcessor, ctx, mqttMessage));
break;
case DISCONNECT:
brokerRoom.getConnectExecutor().submit(getTask(disconnectProcessor, ctx, mqttMessage));
break;
case SUBSCRIBE:
brokerRoom.getSubExecutor().submit(getTask(subscribeProcessor, ctx, mqttMessage));
break;
case UNSUBSCRIBE:
brokerRoom.getSubExecutor().submit(getTask(unSubscribeProcessor, ctx, mqttMessage));
break;
case PUBLISH:
brokerRoom.getPubExecutor().submit(getTask(publishProcessor, ctx, mqttMessage));
break;
case PUBACK:
brokerRoom.getPubExecutor().submit(getTask(pubAckProcessor, ctx, mqttMessage));
break;
case PUBREL:
brokerRoom.getPubExecutor().submit(getTask(pubRelProcessor, ctx, mqttMessage));
break;
case PUBREC:
brokerRoom.getSubExecutor().submit(getTask(pubRecProcessor, ctx, mqttMessage));
break;
case PUBCOMP:
brokerRoom.getSubExecutor().submit(getTask(pubCompProcessor, ctx, mqttMessage));
break;
case PINGREQ:
brokerRoom.getPingExecutor().submit(getTask(pingProcessor, ctx, mqttMessage));
break;
default:
break;
}
}
private MqttConnAckMessage sendAckToClient(Channel client, MqttConnectMessage connectMessage, MqttConnectReturnCode ackCode, boolean sessionPresent) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, connectMessage.fixedHeader().qosLevel(), false, 0);
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
mqttFixedHeader,
new MqttConnAckVariableHeader(ackCode, sessionPresent),
null);
client.writeAndFlush(connAckMessage);
return connAckMessage;
}
private void sendUnSubAck(Channel client, int packageID, MqttQoS qoS) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, qoS, false, 0);
MqttMessage unSubAckMessage = MqttMessageFactory.newMessage(
mqttFixedHeader,
MqttMessageIdVariableHeader.from(packageID),
null);
LOG.info("UNSUBSCRIBE successful, packageID: {}", packageID);
client.writeAndFlush(unSubAckMessage);
}
public void start() {
MixAll.printProperties(log, brokerConfig);
MixAll.printProperties(log, nettyConfig);
MixAll.printProperties(log, storeConfig);
MixAll.printProperties(log, clusterConfig);
{//init and register mqtt remoting processor
RequestProcessor connectProcessor = new ConnectProcessor(this);
RequestProcessor disconnectProcessor = new DisconnectProcessor(this);
RequestProcessor pingProcessor = new PingProcessor();
RequestProcessor publishProcessor = new PublishProcessor(this);
RequestProcessor pubRelProcessor = new PubRelProcessor(this);
RequestProcessor subscribeProcessor = new SubscribeProcessor(this);
RequestProcessor unSubscribeProcessor = new UnSubscribeProcessor(subscriptionMatcher, subscriptionStore);
RequestProcessor pubRecProcessor = new PubRecProcessor(flowMessageStore);
RequestProcessor pubAckProcessor = new PubAckProcessor(flowMessageStore);
RequestProcessor pubCompProcessor = new PubCompProcessor(flowMessageStore);
this.remotingServer.registerProcessor(MqttMessageType.CONNECT, connectProcessor, connectExecutor);
this.remotingServer.registerProcessor(MqttMessageType.DISCONNECT, disconnectProcessor, connectExecutor);
this.remotingServer.registerProcessor(MqttMessageType.PINGREQ, pingProcessor, pingExecutor);
this.remotingServer.registerProcessor(MqttMessageType.PUBLISH, publishProcessor, pubExecutor);
this.remotingServer.registerProcessor(MqttMessageType.PUBACK, pubAckProcessor, pubExecutor);
this.remotingServer.registerProcessor(MqttMessageType.PUBREL, pubRelProcessor, pubExecutor);
this.remotingServer.registerProcessor(MqttMessageType.SUBSCRIBE, subscribeProcessor, subExecutor);
this.remotingServer.registerProcessor(MqttMessageType.UNSUBSCRIBE, unSubscribeProcessor, subExecutor);
this.remotingServer.registerProcessor(MqttMessageType.PUBREC, pubRecProcessor, subExecutor);
this.remotingServer.registerProcessor(MqttMessageType.PUBCOMP, pubCompProcessor, subExecutor);
}
this.messageDispatcher.start();
this.reSendMessageService.start();
this.remotingServer.start();
this.clusterSessionManager.startup();
this.clusterMessageTransfer.startup();
log.info("JMqtt Server start success and version = {}", brokerConfig.getVersion());
}
@Override
public void onMsg(SessionCtrlMsg msg) throws SessionException {
if (msg instanceof SessionCloseMsg){
pushToNetwork(new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)));
channel.close();
}
}
public static MqttConnAckMessage connack(MqttConnectReturnCode returnCode, boolean sessionPresent) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent);
return new MqttConnAckMessage(fixedHeader, variableHeader);
}
public static MqttMessage pubrec(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttMessage(fixedHeader, variableHeader);
}
public static MqttMessage pubrel(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttMessage(fixedHeader, variableHeader);
}
public static MqttMessage pubcomp(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttMessage(fixedHeader, variableHeader);
}
void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType) {
MqttQoS qos = (messageType == MqttMessageType.PUBREL) ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE;
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel
false, 0);
MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
sendToClient(rel);
}
public static MqttSubAckMessage suback(int messageId, List<Integer> grantedQoSLevels) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false,
2 + grantedQoSLevels.size());
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels);
return new MqttSubAckMessage(fixedHeader, variableHeader, payload);
}
public static MqttUnsubAckMessage unsuback(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttUnsubAckMessage(fixedHeader, variableHeader);
}