下面列出了 io.netty.handler.codec.mqtt.MqttVersion 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Builds an MQTT CONNECT message from the supplied connection options.
 *
 * <p>The fixed header is created for {@code MqttMessageType.CONNECT} with
 * {@code MqttQoS.AT_MOST_ONCE}, both boolean flags {@code false} and the literal
 * value {@code 10} as its last argument. The variable header takes its protocol
 * name and level from the options' {@link MqttVersion}.
 *
 * @param info options supplying the protocol version, connect flags, keep-alive
 *             time, client identifier, will topic/message and credentials
 * @return the assembled CONNECT message
 */
public static MqttConnectMessage connectMessage(MqttConnectOptions info) {
    final MqttVersion protocol = info.getMqttVersion();

    final MqttFixedHeader fixedHeader =
            new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10);

    final MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
            protocol.protocolName(),
            protocol.protocolLevel(),
            info.isHasUserName(),
            info.isHasPassword(),
            info.isHasWillRetain(),
            info.getWillQos(),
            info.isHasWillFlag(),
            info.isHasCleanSession(),
            info.getKeepAliveTime());

    final MqttConnectPayload payload = new MqttConnectPayload(
            info.getClientIdentifier(),
            info.getWillTopic(),
            info.getWillMessage(),
            info.getUserName(),
            info.getPassword());

    return new MqttConnectMessage(fixedHeader, variableHeader, payload);
}
/**
 * Creates connect options populated with defaults: protocol version
 * {@code MqttVersion.MQTT_3_1_1}, clean session enabled, a keep-alive of 120
 * seconds, and no client id, will message, user name or password.
 */
public ConnectOptions() {
    // Protocol-level defaults.
    version = MqttVersion.MQTT_3_1_1;
    cleanSession = true;
    keepAliveTimeSeconds = 120;

    // Identity, credentials and will are left unset.
    clientId = null;
    userName = null;
    password = null;
    will = null;
}
/**
 * Creates connect options from explicitly supplied values. Each argument is
 * stored as given; no validation is performed here.
 *
 * @param version              MQTT protocol version
 * @param clientId             client identifier
 * @param cleanSession         clean-session flag
 * @param will                 will message
 * @param userName             user name
 * @param password             password
 * @param keepAliveTimeSeconds keep-alive time in seconds
 */
public ConnectOptions(MqttVersion version, String clientId, boolean cleanSession, Message will, String userName,
        String password, int keepAliveTimeSeconds) {
    // Protocol-level settings.
    this.version = version;
    this.cleanSession = cleanSession;
    this.keepAliveTimeSeconds = keepAliveTimeSeconds;

    // Identity and credentials.
    this.clientId = clientId;
    this.userName = userName;
    this.password = password;

    // Will message.
    this.will = will;
}
/**
 * Creates an additional header holding the given protocol version, client id,
 * user name and broker id. The arguments are stored as given.
 *
 * @param version  MQTT protocol version
 * @param clientId client identifier
 * @param userName user name
 * @param brokerId broker identifier
 */
public MqttAdditionalHeader(MqttVersion version, String clientId, String userName, String brokerId) {
    this.brokerId = brokerId;
    this.userName = userName;
    this.clientId = clientId;
    this.version = version;
}
/**
 * Returns the currently configured protocol version.
 *
 * @return the value of the {@code protocolVersion} field
 */
public MqttVersion getProtocolVersion() {
    return this.protocolVersion;
}
/**
 * Sets the protocol version.
 *
 * @param protocolVersion the protocol version to store; must not be {@code null}
 * @throws NullPointerException if {@code protocolVersion} is {@code null}
 *                              (message {@code "protocolVersion"})
 */
public void setProtocolVersion(MqttVersion protocolVersion) {
    // Fully qualified so the block does not depend on an import being present.
    this.protocolVersion = java.util.Objects.requireNonNull(protocolVersion, "protocolVersion");
}
/**
 * Handles an incoming CONNECT message on the given channel.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Refuse the connection with
 *       {@code CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION} unless the protocol
 *       name is {@code "MQTT"} and the level equals that of MQTT 3.1.1.</li>
 *   <li>Run {@code checkAuth}; any result other than {@code CONNECTION_ACCEPTED} is
 *       sent back to the client as-is.</li>
 *   <li>Refuse a null or empty client identifier with
 *       {@code CONNECTION_REFUSED_IDENTIFIER_REJECTED}.</li>
 *   <li>Register the connection, keep-alive, will message and session, acknowledge
 *       the client, then trigger consumption for the client id.</li>
 * </ol>
 *
 * @param client         channel the CONNECT message arrived on
 * @param connectMessage the decoded CONNECT message
 */
public void processConnect(Channel client, MqttConnectMessage connectMessage) {
    String clientId = connectMessage.payload().clientIdentifier();
    boolean isCleanSession = connectMessage.variableHeader().isCleanSession();

    // Validate the protocol name and level. The constant-first equals keeps a
    // missing protocol name from throwing; it is refused like any other mismatch.
    if (!"MQTT".equals(connectMessage.variableHeader().name()) ||
            connectMessage.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("CONN clientID: <{}>, 版本不对断开连接: <{}>", clientId, connectMessage.toString());
        }
        sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false);
        return;
    }

    // Authentication failures are reported with whatever code checkAuth produced.
    MqttConnectReturnCode resultCode = checkAuth(connectMessage);
    if (resultCode != MqttConnectReturnCode.CONNECTION_ACCEPTED) {
        sendAckToClient(client, connectMessage, resultCode, false);
        return;
    }

    // A missing client id must be refused explicitly. Previously this path reused
    // resultCode, which is CONNECTION_ACCEPTED here, so the client was told the
    // connection succeeded although no connection or session was registered.
    if (Strings.isNullOrEmpty(clientId)) {
        sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false);
        return;
    }

    addConnection(client, connectMessage, clientId);
    // Set up the keep-alive handling; the keep-alive duration and some other
    // attributes are attached to the session for later use.
    initializeKeepAliveTimeout(client, connectMessage, clientId);
    storeWillMessage(clientId, connectMessage);
    sessionManager.addSession(clientId, isCleanSession);

    // The last argument is true only when the client did not request a clean session.
    MqttConnAckMessage okResp = sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_ACCEPTED, !isCleanSession);
    // NOTE(review): when the ack is not CONNECTION_ACCEPTED this only logs; processing
    // still continues to fireConsume and the "successful" log below — confirm intended.
    if (okResp.variableHeader().connectReturnCode().byteValue() != MqttConnectReturnCode.CONNECTION_ACCEPTED.byteValue()) {
        LOG.info("CONNECT-none-accepted clientID: <{}>, ConnectionStatus: <{}>, client-address: <{}>, server-address: <{}>",
                clientId,
                okResp.variableHeader().connectReturnCode().byteValue(),
                client.remoteAddress(),
                client.localAddress()
        );
    }

    consumerManager.fireConsume(clientId);
    LOG.info("CONNECT successful, clientID: {}, client-address: <{}>, server-address: <{}>", clientId, client.remoteAddress(), client.localAddress());
}
/**
 * Returns the MQTT protocol version held by this object.
 *
 * @return the value of the {@code version} field
 */
public MqttVersion version() {
    return this.version;
}
/**
 * Accessor for the stored MQTT protocol version.
 *
 * @return the value of the {@code version} field
 */
public MqttVersion version() {
    return this.version;
}