下面列出了怎么用 io.netty.handler.codec.mqtt.MqttConnAckMessage 的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testCONNECTION_REFUSED_SERVER_UNAVAILABLE() throws Exception {
ServiceChecker prev = Plugins.INSTANCE.put(ServiceChecker.class, new ServiceChecker() {
@Override
public Plugin clone() {
return this;
}
@Override
public boolean isServiceAvailable() {
return false;
}
});
MqttConnAckMessage ret = executeNormalChannelRead0(TestUtil.newClientId(), true, null);
Assert.assertEquals(ret.variableHeader().connectReturnCode(),
MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
Plugins.INSTANCE.put(ServiceChecker.class, prev);
}
@Test
public void testCONNECTION_REFUSED_NOT_AUTHORIZED() throws Exception {
Authorizer prev = Plugins.INSTANCE.put(Authorizer.class, new Authorizer() {
@Override
public Plugin clone() {
return this;
}
@Override
public boolean isAuthorized(String clientId, String username) {
return false;
}
});
MqttConnAckMessage ret = executeNormalChannelRead0(TestUtil.newClientId(), true, null);
Assert.assertEquals(ret.variableHeader().connectReturnCode(),
MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
Plugins.INSTANCE.put(Authorizer.class, prev);
}
private MqttConnAckMessage executeNormalChannelRead0(String clientId, boolean cleanSession, ChannelId channelId)
throws Exception {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false,
10);
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader("MQTT", 4, true, true, true, 0, true,
cleanSession, 60);
MqttConnectPayload payload = new MqttConnectPayload(clientId, "willtopic", "willmessage", "username",
"password");
MqttConnectMessage msg = new MqttConnectMessage(fixedHeader, variableHeader, payload);
ChannelId cid = channelId == null ? TestUtil.newChannelId(clientId, false) : channelId;
EmbeddedChannel channel = new EmbeddedChannel(cid, new ConnectReceiver());
channel.writeInbound(msg);
return channel.readOutbound();
}
/**
* B - S, B - P
* @param channel
* @param msg
*/
public void processConnectBack(Channel channel, MqttConnAckMessage msg) {
MqttConnAckVariableHeader mqttConnAckVariableHeader = msg.variableHeader();
String sErrorMsg = "";
switch (mqttConnAckVariableHeader.connectReturnCode()) {
case CONNECTION_ACCEPTED:
clientProcess.loginFinish(true, null);
return;
case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
sErrorMsg = "用户名密码错误";
break;
case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
sErrorMsg = "clientId不允许链接";
break;
case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
sErrorMsg = "服务不可用";
break;
case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
sErrorMsg = "mqtt 版本不可用";
break;
case CONNECTION_REFUSED_NOT_AUTHORIZED:
sErrorMsg = "未授权登录";
break;
default:
break;
}
clientProcess.loginFinish(false, new LoginException(sErrorMsg));
channel.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msgx) throws Exception {
if (msgx == null) {return ;}
MqttMessage msg = (MqttMessage) msgx;
NettyLog.debug("read: {}", msg.fixedHeader().messageType());
MqttFixedHeader mqttFixedHeader = msg.fixedHeader();
switch (mqttFixedHeader.messageType()) {
case CONNACK:
clientProtocolProcess.processConnectBack(ctx.channel(), (MqttConnAckMessage) msg);
break;
case UNSUBACK:
clientProtocolProcess.processUnSubBack(ctx.channel(), msg);
break;
case PUBLISH:
clientProtocolProcess.processPublish(ctx.channel(), (MqttPublishMessage) msg);
break;
case PUBACK:
clientProtocolProcess.processPubAck(ctx.channel(), msg);
break;
case PUBREC:
clientProtocolProcess.processPubRec(ctx.channel(), msg);
break;
case PUBREL:
clientProtocolProcess.processPubRel(ctx.channel(), msg);
break;
case PUBCOMP:
clientProtocolProcess.processPubComp(ctx.channel(), msg);
break;
case SUBACK:
clientProtocolProcess.processSubAck(ctx.channel(), (MqttSubAckMessage) msg);
break;
default:
break;
}
}
private MqttConnAckMessage sendAckToClient(Channel client, MqttConnectMessage connectMessage, MqttConnectReturnCode ackCode, boolean sessionPresent) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, connectMessage.fixedHeader().qosLevel(), false, 0);
MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
mqttFixedHeader,
new MqttConnAckVariableHeader(ackCode, sessionPresent),
null);
client.writeAndFlush(connAckMessage);
return connAckMessage;
}
public static MqttConnAckMessage connack(MqttConnectReturnCode returnCode, boolean sessionPresent) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent);
return new MqttConnAckMessage(fixedHeader, variableHeader);
}
@Test
public void testCleanSessionWithoutClientIdReturnFalse() throws Exception {
Settings.INSTANCE.setProperty("mqttserver.acceptEmptyClientId", "false");
MqttConnAckMessage ret = executeNormalChannelRead0("", true, null);
Assert.assertEquals(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
ret.variableHeader().connectReturnCode());
Settings.INSTANCE.setProperty("mqttserver.acceptEmptyClientId", "true");
}
@Test
public void nonCleanSession() throws Exception {
String clientId = TestUtil.newClientId();
MqttConnAckMessage ret = executeNormalChannelRead0(clientId, false, null);
Assert.assertEquals(ret.variableHeader().connectReturnCode(), MqttConnectReturnCode.CONNECTION_ACCEPTED);
}
public static MqttConnAckMessage connAckMessage(MqttConnectReturnCode code, boolean sessionPresent) {
return (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(code, sessionPresent), null);
}
public static MqttConnAckMessage getConnectAckMessage(MqttConnectReturnCode returnCode,boolean sessionPresent){
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader variableHeade = new MqttConnAckVariableHeader(returnCode,sessionPresent);
return new MqttConnAckMessage(fixedHeader,variableHeade);
}
public void processConnect(Channel client, MqttConnectMessage connectMessage) {
String clientId = connectMessage.payload().clientIdentifier();
boolean isCleanSession = connectMessage.variableHeader().isCleanSession();
//验证版本
if (!connectMessage.variableHeader().name().equals("MQTT") ||
connectMessage.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
if (LOG.isDebugEnabled()) {
LOG.debug("CONN clientID: <{}>, 版本不对断开连接: <{}>", clientId, connectMessage.toString());
}
sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false);
return;
}
MqttConnectReturnCode resultCode = checkAuth(connectMessage);
if (!(resultCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) ||
Strings.isNullOrEmpty(clientId)) {
sendAckToClient(client, connectMessage, resultCode, false);
return;
}
addConnection(client, connectMessage, clientId);
//处理心跳包时间,把心跳包时长和一些其他属性都添加到会话中,方便以后使用
initializeKeepAliveTimeout(client, connectMessage, clientId);
storeWillMessage(clientId, connectMessage);
sessionManager.addSession(clientId, isCleanSession);
MqttConnAckMessage okResp = sendAckToClient(client, connectMessage, MqttConnectReturnCode.CONNECTION_ACCEPTED, !isCleanSession);
if (okResp.variableHeader().connectReturnCode().byteValue() != MqttConnectReturnCode.CONNECTION_ACCEPTED.byteValue()) {
LOG.info("CONNECT-none-accepted clientID: <{}>, ConnectionStatus: <{}>, client-address: <{}>, server-address: <{}>",
clientId,
okResp.variableHeader().connectReturnCode().byteValue(),
client.remoteAddress(),
client.localAddress()
);
}
consumerManager.fireConsume(clientId);
LOG.info("CONNECT successful, clientID: {}, client-address: <{}>, server-address: <{}>", clientId, client.remoteAddress(), client.localAddress());
}
private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(returnCode, true);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
logger.debug("packet incoming [message={}]", msg.toString());
Session session = Session.NEXUS.get(ctx.channel().id());
if (session != null) {
session.dispose(true); // [MQTT-3.1.0-2]
return;
}
boolean cleanSession = msg.variableHeader().isCleanSession();
String clientId = msg.payload().clientIdentifier();
if (Strings.isNullOrEmpty(clientId)) {
clientId = generateClientId(ctx, cleanSession);
if (clientId == null) { return; }
}
if (!filterPlugins(ctx, msg)) { return; }
session = Session.NEXUS.get(clientId); // [MQTT-3.1.2-4]
boolean sessionPresent = !cleanSession && session != null; // [MQTT-3.2.2-1],[MQTT-3.2.2-2],[MQTT-3.2.2-3]
String clientIp = ctx.channel().remoteAddress() instanceof InetSocketAddress
? ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress() : "0.0.0.0";
int clientPort = ctx.channel().remoteAddress() instanceof InetSocketAddress
? ((InetSocketAddress) ctx.channel().remoteAddress()).getPort() : -1;
if (cleanSession) {
if (session != null) {
session.dispose(false); // [MQTT-3.1.4-2]
}
session = newSession(msg, cleanSession, clientId, clientIp, clientPort); // [MQTT-3.1.2-6]
}
else if (session == null) { // [MQTT-3.1.2-4]
session = newSession(msg, cleanSession, clientId, clientIp, clientPort);
}
Session.NEXUS.put(session, ctx);
processRetainedWill(session);
final Session sessionFinal = session;
final MqttConnAckMessage acceptMsg = MqttMessageFactory.connack(MqttConnectReturnCode.CONNECTION_ACCEPTED,
sessionPresent); // [MQTT-3.1.4-4]
final String log = acceptMsg.toString();
session.send(acceptMsg, f -> { // [MQTT-3.2.0-1]
if (!f.isSuccess()) {
logger.error("packet outgoing failed [{}] {}", log, f.cause());
return;
}
ctx.channel().eventLoop().execute(
() -> Plugins.INSTANCE.get(ConnectEventListener.class).connectHandled(new ConnectEventArgs() {
@Override
public String clientId() {
return sessionFinal.clientId();
}
@Override
public IMessage will() {
return sessionFinal.will();
}
@Override
public Boolean cleanSession() {
return sessionFinal.cleanSession();
}
@Override
public MqttConnectReturnCode returnCode() {
return MqttConnectReturnCode.CONNECTION_ACCEPTED;
}
}));
if (!sessionFinal.cleanSession()) {
sessionFinal.completeRemainedMessages(); // [MQTT-4.4.0-1]
}
});
}
@Test
public void testDefaultChannelRead0() throws Exception {
MqttConnAckMessage ret = executeNormalChannelRead0(TestUtil.newClientId(), true, null);
Assert.assertEquals(ret.variableHeader().connectReturnCode(), MqttConnectReturnCode.CONNECTION_ACCEPTED);
}
@Test
public void testCleanSessionWithoutClientIdReturnTrue() throws Exception {
MqttConnAckMessage ret = executeNormalChannelRead0("", true, null);
Assert.assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, ret.variableHeader().connectReturnCode());
}
private static MqttConnAckMessage createConnAckMessage(MqttConnectReturnCode code) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(code, true);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
}
void sendConnack(MqttConnectReturnCode returnCode) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true);
MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
sendToClient(message);
}
@Test
public void cleanSessionOnSameClientIdSession() throws Exception {
String clientId = TestUtil.newClientId();
executeNormalChannelRead0(clientId, true, null);
MqttConnAckMessage ret = executeNormalChannelRead0(clientId, true, null);
Assert.assertEquals(ret.variableHeader().connectReturnCode(), MqttConnectReturnCode.CONNECTION_ACCEPTED);
}