下面列出了 io.netty.handler.codec.mqtt.MqttConnectReturnCode#CONNECTION_ACCEPTED 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Handles the CONNACK received from the server: records whether the
 * connection was accepted and settles the pending connect promise.
 *
 * @param msg connection response message
 */
private void handleConnack(MqttConnAckMessage msg) {
    final boolean accepted = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED;
    synchronized (this) {
        this.isConnected = accepted;
    }

    Promise<MqttConnAckMessage> promise = connectPromise();
    if (!accepted) {
        // Refused: log the return code and fail the promise with it.
        MqttConnectionException exception = new MqttConnectionException(msg.code());
        log.error(String.format("Connection refused by the server - code: %s", msg.code()));
        promise.fail(exception);
        return;
    }
    promise.complete(msg);
}
/**
 * Connects a client and then calls {@code accept} on the stored endpoint,
 * expecting an {@link IllegalStateException}; the test fails if no such
 * exception is thrown.
 *
 * @param context test context used to report a failure
 * @throws Exception if creating or connecting the client fails
 */
@Test
public void connectionAlreadyAccepted(TestContext context) throws Exception {
    this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;

    String serverUri = String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT);
    MqttClient mqttClient = new MqttClient(serverUri, "12345", new MemoryPersistence());
    mqttClient.connect();

    try {
        // try to accept a connection that was already accepted
        this.endpoint.accept(false);
        context.fail();
    } catch (IllegalStateException expected) {
        // Ok: this is the outcome the test is looking for
    }
}
/**
 * Decides which CONNACK return code a CONNECT message should receive.
 *
 * <ul>
 *   <li>null or empty client identifier: {@code CONNECTION_REFUSED_IDENTIFIER_REJECTED}</li>
 *   <li>user name and password both present: result of {@code auth(userName, passWord)},
 *       mapped to {@code CONNECTION_ACCEPTED} or
 *       {@code CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD}</li>
 *   <li>otherwise: {@code CONNECTION_REFUSED_NOT_AUTHORIZED}</li>
 * </ul>
 *
 * @param message the CONNECT message to check
 * @return the return code to send back to the client
 */
private MqttConnectReturnCode checkAuth(MqttConnectMessage message) {
    boolean cleanSession = message.variableHeader().isCleanSession();
    String clientID = message.payload().clientIdentifier();
    boolean noId = Strings.isNullOrEmpty(clientID);
    if (noId) {
        LOG.debug("NULL clientID, cleanSession: {}", cleanSession);
        return MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("hasUserName: {}", message.variableHeader().hasUserName());
        LOG.debug("hasPassword: {}", message.variableHeader().hasPassword());
    }

    if (message.variableHeader().hasUserName() && message.variableHeader().hasPassword()) {
        String userName = message.payload().userName();
        String passWord = message.payload().password();
        if (LOG.isDebugEnabled()) {
            // Never write the client's password to the log; only the user name is recorded.
            LOG.debug("CONN username: {}, password: [PROTECTED]", userName);
        }
        if (auth(userName, passWord)) {
            return MqttConnectReturnCode.CONNECTION_ACCEPTED;
        } else {
            return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
        }
    }

    // Credentials missing or incomplete.
    return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
}
/**
 * Replies to the client with a CONNACK carrying the given refusal code.
 *
 * @param returnCode the refusal code to send; must not be {@code CONNECTION_ACCEPTED}
 * @return the value returned by {@code connack(returnCode, false)}
 * @throws IllegalArgumentException if {@code returnCode} is {@code CONNECTION_ACCEPTED}
 */
public MqttEndpointImpl reject(MqttConnectReturnCode returnCode) {
    synchronized (conn) {
        boolean isRefusal = returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED;
        if (isRefusal) {
            // sessionPresent flag has no meaning in this case, the network connection will be closed
            return this.connack(returnCode, false);
        }
        throw new IllegalArgumentException("Need to use the 'accept' method for accepting connection");
    }
}
/**
 * Connects a client with the identifier {@code "12345"} while the expected
 * return code is {@code CONNECTION_ACCEPTED}; the test fails if the client
 * raises an {@code MqttException}.
 *
 * @param context test context used to report a failure
 */
@Test
public void accepted(TestContext context) {
    this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;

    String serverUri = String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT);
    try {
        MqttClient mqttClient = new MqttClient(serverUri, "12345", new MemoryPersistence());
        mqttClient.connect();
    } catch (MqttException connectFailure) {
        context.fail(connectFailure);
    }
}
/**
 * Connects a client with an empty client identifier while the expected
 * return code is {@code CONNECTION_ACCEPTED}; the test fails if the client
 * raises an {@code MqttException}.
 *
 * @param context test context used to report a failure
 */
@Test
public void acceptedClientIdAutoGenerated(TestContext context) {
    this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;

    String serverUri = String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT);
    try {
        // Empty identifier: presumably the server side generates one - verify against the server implementation.
        MqttClient mqttClient = new MqttClient(serverUri, "", new MemoryPersistence());
        mqttClient.connect();
    } catch (MqttException connectFailure) {
        context.fail(connectFailure);
    }
}
/**
 * Answers an incoming connection with the return code the running test expects.
 *
 * <p>For the "bad user name or password" and "unacceptable protocol version"
 * expectations the code is derived from the endpoint itself: the connection is
 * accepted when the credentials / protocol version match the expected ones and
 * refused with the expected code otherwise. Any other expected code is used as is.
 *
 * @param endpoint the endpoint of the connecting client
 * @param context  the test context (not used here)
 */
@Override
protected void endpointHandler(MqttEndpoint endpoint, TestContext context) {
    // Store the endpoint BEFORE replying: accept/reject sends the CONNACK, which
    // unblocks the client's connect() on the test thread. A test that reads
    // this.endpoint right after connect() could otherwise see a stale or null value.
    this.endpoint = endpoint;

    MqttConnectReturnCode returnCode = this.expectedReturnCode;
    switch (this.expectedReturnCode) {
        case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
            returnCode =
                (endpoint.auth().getUsername().equals(MQTT_USERNAME) &&
                    endpoint.auth().getPassword().equals(MQTT_PASSWORD)) ?
                    MqttConnectReturnCode.CONNECTION_ACCEPTED :
                    MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
            break;
        case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
            returnCode = endpoint.protocolVersion() == MqttConnectOptions.MQTT_VERSION_3_1_1 ?
                MqttConnectReturnCode.CONNECTION_ACCEPTED :
                MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
            break;
        default:
            // every other expectation is replied verbatim
            break;
    }

    log.info("return code = " + returnCode);

    if (returnCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
        log.info("client id = " + endpoint.clientIdentifier());
        endpoint.accept(false);
    } else {
        endpoint.reject(returnCode);
    }
}
/**
 * Processes an MQTT CONNECT message received on a channel.
 *
 * <p>Steps, in order: check the protocol name and level, check the credentials
 * via {@code checkAuth}, and on success register the connection, set up the
 * keep-alive timeout, store the will message, add the session, send the
 * CONNACK and trigger consumption for the client. On any refusal a CONNACK
 * with the refusal code is sent and processing stops.
 *
 * @param client         the channel the CONNECT arrived on
 * @param connectMessage the CONNECT message
 */
public void processConnect(Channel client, MqttConnectMessage connectMessage) {
    String clientId = connectMessage.payload().clientIdentifier();
    boolean isCleanSession = connectMessage.variableHeader().isCleanSession();

    // Validate protocol name and version: only "MQTT" at the 3.1.1 protocol level is served.
    String protocolName = connectMessage.variableHeader().name();
    boolean supportedProtocol = protocolName.equals("MQTT")
            && connectMessage.variableHeader().version() == MqttVersion.MQTT_3_1_1.protocolLevel();
    if (!supportedProtocol) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("CONN clientID: <{}>, 版本不对断开连接: <{}>", clientId, connectMessage.toString());
        }
        sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false);
        return;
    }

    MqttConnectReturnCode resultCode = checkAuth(connectMessage);
    if (resultCode != MqttConnectReturnCode.CONNECTION_ACCEPTED || Strings.isNullOrEmpty(clientId)) {
        sendAckToClient(client, connectMessage, resultCode, false);
        return;
    }

    addConnection(client, connectMessage, clientId);
    // Handle the keep-alive timing: the keep-alive duration and some other
    // attributes are added to the session so they can be used later.
    initializeKeepAliveTimeout(client, connectMessage, clientId);
    storeWillMessage(clientId, connectMessage);
    sessionManager.addSession(clientId, isCleanSession);

    // The session-present flag passed here is simply the negation of the clean-session flag.
    MqttConnAckMessage okResp = sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_ACCEPTED, !isCleanSession);
    byte ackCode = okResp.variableHeader().connectReturnCode().byteValue();
    if (ackCode != MqttConnectReturnCode.CONNECTION_ACCEPTED.byteValue()) {
        // NOTE(review): execution continues to the "successful" path below even in this case - confirm intended.
        LOG.info("CONNECT-none-accepted clientID: <{}>, ConnectionStatus: <{}>, client-address: <{}>, server-address: <{}>",
                clientId,
                ackCode,
                client.remoteAddress(),
                client.localAddress()
        );
    }

    consumerManager.fireConsume(clientId);
    LOG.info("CONNECT successful, clientID: {}, client-address: <{}>, server-address: <{}>", clientId, client.remoteAddress(), client.localAddress());
}