下面列出了怎么用 io.netty.handler.codec.mqtt.MqttConnectReturnCode 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Installs a new set of MQTT servers and registers the exception and endpoint handlers on each.
 * <p>
 * If a non-empty set of servers is already installed, {@code shutdown()} is invoked first.
 * Each incoming endpoint is rejected with {@code CONNECTION_REFUSED_SERVER_UNAVAILABLE} when the
 * connection processor has no downstreams or when its pending count has reached 10240;
 * otherwise it is wrapped in a {@link VertxMqttConnection} and pushed to the sink.
 *
 * @param mqttServer the servers to manage from now on
 */
public void setMqttServer(Collection<io.vertx.mqtt.MqttServer> mqttServer) {
    boolean hasRunningServers = this.mqttServer != null && !this.mqttServer.isEmpty();
    if (hasRunningServers) {
        shutdown();
    }
    this.mqttServer = mqttServer;
    this.mqttServer.forEach(server -> server
        .exceptionHandler(error -> log.error(error.getMessage(), error))
        .endpointHandler(endpoint -> {
            if (!connectionProcessor.hasDownstreams()) {
                // Nobody is consuming connections, so accepting one would leave it unhandled.
                log.info("mqtt server no handler for:[{}]", endpoint.clientIdentifier());
                endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            } else if (connectionProcessor.getPending() >= 10240) {
                // Too many connections are already waiting to be handled.
                log.warn("too many no handle mqtt connection : {}", connectionProcessor.getPending());
                endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            } else {
                sink.next(new VertxMqttConnection(endpoint));
            }
        }));
}
/**
 * Handles a CONNECT message whose user name is used as a device token.
 * <p>
 * Replies with a CONNACK carrying {@code CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD} when the user name
 * is empty, {@code CONNECTION_REFUSED_NOT_AUTHORIZED} when the login fails, and
 * {@code CONNECTION_ACCEPTED} otherwise. The {@code connected} flag is updated accordingly.
 *
 * @param ctx the channel context used to write the CONNACK
 * @param msg the received CONNECT message
 */
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    String token = msg.payload().userName();
    String clientId = msg.payload().clientIdentifier();

    if (StringUtils.isEmpty(token)) {
        ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
        connected = false;
        return;
    }

    boolean loggedIn = deviceSessionCtx.login(new DeviceTokenCredentials(token));
    if (!loggedIn) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
        connected = false;
        return;
    }

    // Register the channel under the client id before acknowledging the connection.
    MemoryMetaPool.registerClienId(clientId, ctx.channel());
    ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
    connected = true;
    checkGatewaySession();
}
/**
 * Processes a connection request, failing fast when the adapter's connection limit is exceeded.
 *
 * @param endpoint the endpoint representing the connecting client
 * @param currentSpan the tracing span to log to
 * @return a failed future carrying {@code CONNECTION_REFUSED_SERVER_UNAVAILABLE} if the limit is exceeded,
 *         otherwise the outcome of the authenticated or unauthenticated connection handling
 */
private Future<Device> handleConnectionRequest(final MqttEndpoint endpoint, final Span currentSpan) {
    // the ConnectionLimitManager is null in some unit tests
    final boolean limitExceeded = getConnectionLimitManager() != null
            && getConnectionLimitManager().isLimitExceeded();

    if (limitExceeded) {
        currentSpan.log("connection limit exceeded, reject connection request");
        metrics.reportConnectionAttempt(ConnectionAttemptOutcome.ADAPTER_CONNECTION_LIMIT_EXCEEDED);
        final MqttConnectionException refusal = new MqttConnectionException(
                MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
        return Future.failedFuture(refusal);
    }

    if (getConfig().isAuthenticationRequired()) {
        return handleEndpointConnectionWithAuthentication(endpoint, currentSpan);
    }
    return handleEndpointConnectionWithoutAuthentication(endpoint);
}
/**
 * Maps a failure to the MQTT return code to send in the CONNACK.
 * <ul>
 * <li>An {@code MqttConnectionException} yields the code it carries.</li>
 * <li>A {@code ServiceInvocationException} with status 401 or 404 yields
 * {@code CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD}, with status 503
 * {@code CONNECTION_REFUSED_SERVER_UNAVAILABLE}.</li>
 * <li>Everything else yields {@code CONNECTION_REFUSED_NOT_AUTHORIZED}.</li>
 * </ul>
 *
 * @param e the cause of the failed connection attempt
 * @return the return code matching the failure
 */
private static MqttConnectReturnCode getConnectReturnCode(final Throwable e) {
    if (e instanceof MqttConnectionException) {
        return ((MqttConnectionException) e).code();
    }
    if (!(e instanceof ServiceInvocationException)) {
        return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
    }

    final int status = ((ServiceInvocationException) e).getErrorCode();
    if (status == HttpURLConnection.HTTP_UNAUTHORIZED || status == HttpURLConnection.HTTP_NOT_FOUND) {
        return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
    }
    if (status == HttpURLConnection.HTTP_UNAVAILABLE) {
        return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
    }
    return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
}
/**
 * Checks that the adapter refuses a device's connection attempt while it is not connected to all of the
 * services it depends on.
 */
@Test
public void testEndpointHandlerFailsWithoutDownstreamConnections() {

    // GIVEN an adapter lacking a connection to at least one required service
    final MqttServer mqttServer = getMqttServer(false);
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> protocolAdapter = getAdapter(mqttServer);

    // WHEN a client attempts to connect
    final MqttEndpoint clientEndpoint = mockEndpoint();
    protocolAdapter.handleEndpointConnection(clientEndpoint);

    // THEN the request is turned down as "server unavailable"
    verify(clientEndpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
/**
 * Checks that a device is not allowed to connect when the adapter has been disabled for the tenant
 * the device belongs to.
 */
@Test
public void testEndpointHandlerRejectsDeviceOfDisabledTenant() {

    // GIVEN an adapter ...
    final MqttServer mqttServer = getMqttServer(false);

    // ... that has been switched off for tenant "my-tenant"
    final TenantObject tenantWithDisabledAdapter = TenantObject.from("my-tenant", true);
    tenantWithDisabledAdapter.addAdapter(new Adapter(ADAPTER_TYPE).setEnabled(Boolean.FALSE));
    when(tenantClient.get(eq("my-tenant"), (SpanContext) any()))
            .thenReturn(Future.succeededFuture(tenantWithDisabledAdapter));
    when(authHandler.authenticateDevice(any(MqttContext.class)))
            .thenReturn(Future.succeededFuture(new DeviceUser("my-tenant", "4711")));
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> protocolAdapter = getAdapter(mqttServer);
    forceClientMocksToConnected();

    // WHEN a device belonging to "my-tenant" attempts to connect
    final MqttEndpoint clientEndpoint = mockEndpoint();
    protocolAdapter.handleEndpointConnection(clientEndpoint);

    // THEN the attempt is rejected as "not authorized"
    verify(clientEndpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
}
/**
 * Checks that a device presenting valid credentials is still refused when no registration
 * information exists for it.
 */
@Test
public void testAuthenticatedMqttAdapterRejectsConnectionForNonExistingDevice() {

    // GIVEN an adapter ...
    final MqttServer mqttServer = getMqttServer(false);
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> protocolAdapter = getAdapter(mqttServer);
    forceClientMocksToConnected();

    // ... whose authentication succeeds for device 9999 ...
    when(authHandler.authenticateDevice(any(MqttContext.class)))
            .thenReturn(Future.succeededFuture(new DeviceUser("DEFAULT_TENANT", "9999")));

    // ... while the registration assertion for that device fails with 404
    final ClientErrorException unknownDevice = new ClientErrorException(
            HttpURLConnection.HTTP_NOT_FOUND, "device unknown or disabled");
    when(regClient.assertRegistration(eq("9999"), (String) any(), (SpanContext) any()))
            .thenReturn(Future.failedFuture(unknownDevice));

    // WHEN the device attempts to connect with valid credentials
    final MqttEndpoint clientEndpoint = getMqttEndpointAuthenticated();
    protocolAdapter.handleEndpointConnection(clientEndpoint);

    // THEN authentication has been attempted ...
    verify(authHandler).authenticateDevice(any(MqttContext.class));
    // ... yet the connection is refused
    verify(clientEndpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
}
/**
 * Checks that a connection attempt is rejected once the tenant's connection limit has been reached.
 */
@Test
public void testConnectionsLimitExceeded() {

    // GIVEN an adapter that requires devices to authenticate
    final MqttServer mqttServer = getMqttServer(false);
    config.setAuthenticationRequired(true);
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> protocolAdapter = getAdapter(mqttServer);
    forceClientMocksToConnected();

    // AND a tenant whose connection limit is reported as reached
    when(authHandler.authenticateDevice(any(MqttContext.class)))
            .thenReturn(Future.succeededFuture(new DeviceUser("DEFAULT_TENANT", "4711")));
    when(resourceLimitChecks.isConnectionLimitReached(any(TenantObject.class), any(SpanContext.class)))
            .thenReturn(Future.succeededFuture(Boolean.TRUE));

    // WHEN a device attempts to establish a connection
    final MqttEndpoint clientEndpoint = getMqttEndpointAuthenticated();
    protocolAdapter.handleEndpointConnection(clientEndpoint);

    // THEN the adapter has attempted to authenticate the device
    verify(authHandler).authenticateDevice(any(MqttContext.class));
    // AND the connection request has been rejected
    verify(clientEndpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
}
/**
 * Checks that a connection attempt is rejected once the tenant's connection duration limit has been reached.
 */
@Test
public void testConnectionDurationLimitExceeded() {

    // GIVEN an adapter that requires devices to authenticate
    final MqttServer mqttServer = getMqttServer(false);
    config.setAuthenticationRequired(true);
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> protocolAdapter = getAdapter(mqttServer);
    forceClientMocksToConnected();

    // AND a tenant whose connection duration limit is reported as reached
    when(authHandler.authenticateDevice(any(MqttContext.class)))
            .thenReturn(Future.succeededFuture(new DeviceUser("DEFAULT_TENANT", "4711")));
    when(resourceLimitChecks.isConnectionDurationLimitReached(any(TenantObject.class), any(SpanContext.class)))
            .thenReturn(Future.succeededFuture(Boolean.TRUE));

    // WHEN a device attempts to establish a connection
    final MqttEndpoint clientEndpoint = getMqttEndpointAuthenticated();
    protocolAdapter.handleEndpointConnection(clientEndpoint);

    // THEN the adapter has attempted to authenticate the device
    verify(authHandler).authenticateDevice(any(MqttContext.class));
    // AND the connection request has been rejected
    verify(clientEndpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
}
/**
 * Checks that an unknown device cannot connect when the trust anchor of its tenant has
 * auto-provisioning switched off.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsIfAutoProvisioningIsDisabled(final VertxTestContext ctx) {

    // GIVEN a tenant whose trust anchor does not permit auto-provisioning
    helper.getCertificate(deviceCert.certificatePath())
        .compose(cert -> {
            final var tenantConfig = Tenants.createTenantForTrustAnchor(cert);
            tenantConfig.getTrustedCertificateAuthorities().get(0).setAutoProvisioningEnabled(false);
            return helper.registry.addTenant(tenantId, tenantConfig);
        })
        // WHEN an unknown device connects to the adapter with a client certificate
        // issued under the trust anchor registered for that tenant
        .compose(ok -> connectToAdapter(deviceCert))
        .onComplete(ctx.failing(t -> {
            // THEN the connection is refused
            ctx.verify(() -> {
                assertThat(t).isInstanceOf(MqttConnectionException.class);
                final MqttConnectionException refusal = (MqttConnectionException) t;
                assertThat(refusal.code())
                    .isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            });
            ctx.completeNow();
        }));
}
/**
 * Checks that a device for which neither registration information nor credentials
 * are on record cannot connect.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsForNonExistingDevice(final VertxTestContext ctx) {

    // GIVEN an adapter
    // WHEN an unknown device attempts to connect
    final String unknownUser = IntegrationTestSupport.getUsername("non-existing", Constants.DEFAULT_TENANT);
    connectToAdapter(unknownUser, "secret")
        .onComplete(ctx.failing(t -> {
            // THEN the connection is refused
            ctx.verify(() -> {
                assertThat(t).isInstanceOf(MqttConnectionException.class);
                final MqttConnectionException refusal = (MqttConnectionException) t;
                assertThat(refusal.code())
                    .isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            });
            ctx.completeNow();
        }));
}
/**
 * Checks that a device authenticating with a client certificate cannot connect when neither
 * registration information nor credentials are on record for it.
 *
 * @param ctx The test context
 */
@Test
public void testConnectX509FailsForNonExistingDevice(final VertxTestContext ctx) {

    // GIVEN an adapter
    // WHEN an unknown device attempts to connect using its certificate
    connectToAdapter(deviceCert)
        .onComplete(ctx.failing(t -> {
            // THEN the connection is refused
            ctx.verify(() -> {
                assertThat(t).isInstanceOf(MqttConnectionException.class);
                final MqttConnectionException refusal = (MqttConnectionException) t;
                assertThat(refusal.code())
                    .isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            });
            ctx.completeNow();
        }));
}
/**
 * Checks that a registered device presenting a wrong password cannot connect.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsForWrongCredentials(final VertxTestContext ctx) {

    // GIVEN a registered device
    final Tenant tenantConfig = new Tenant();

    helper.registry
        .addDeviceForTenant(tenantId, tenantConfig, deviceId, password)
        // WHEN the device attempts to connect with a password other than the registered one
        .compose(ok -> {
            final String username = IntegrationTestSupport.getUsername(deviceId, tenantId);
            return connectToAdapter(username, "wrong password");
        })
        .onComplete(ctx.failing(t -> {
            // THEN the connection is refused
            ctx.verify(() -> {
                assertThat(t).isInstanceOf(MqttConnectionException.class);
                final MqttConnectionException refusal = (MqttConnectionException) t;
                assertThat(refusal.code())
                    .isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            });
            ctx.completeNow();
        }));
}
/**
 * Checks that a device cannot connect when the MQTT adapter has been disabled for the
 * tenant the device belongs to.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsForDisabledAdapter(final VertxTestContext ctx) {

    // GIVEN a tenant for which the MQTT adapter is disabled
    final Tenant tenantConfig = new Tenant();
    tenantConfig.addAdapterConfig(new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false));

    helper.registry
        .addDeviceForTenant(tenantId, tenantConfig, deviceId, password)
        // WHEN a device of that tenant attempts to connect to the adapter
        .compose(ok -> {
            final String username = IntegrationTestSupport.getUsername(deviceId, tenantId);
            return connectToAdapter(username, password);
        })
        .onComplete(ctx.failing(t -> {
            // THEN the connection is refused with a NOT_AUTHORIZED code
            ctx.verify(() -> {
                assertThat(t).isInstanceOf(MqttConnectionException.class);
                final MqttConnectionException refusal = (MqttConnectionException) t;
                assertThat(refusal.code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            });
            ctx.completeNow();
        }));
}
/**
 * Checks that a device authenticating with a client certificate cannot connect when the MQTT
 * adapter has been disabled for the tenant the device belongs to.
 *
 * @param ctx The test context
 */
@Test
public void testConnectX509FailsForDisabledAdapter(final VertxTestContext ctx) {

    // GIVEN a tenant created for the certificate's trust anchor, with the MQTT adapter disabled
    helper.getCertificate(deviceCert.certificatePath())
        .compose(cert -> {
            final var tenantConfig = Tenants.createTenantForTrustAnchor(cert);
            tenantConfig.addAdapterConfig(
                    new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false));
            return helper.registry.addDeviceForTenant(tenantId, tenantConfig, deviceId, cert);
        })
        // WHEN a device of that tenant attempts to connect to the adapter
        .compose(ok -> connectToAdapter(deviceCert))
        .onComplete(ctx.failing(t -> {
            // THEN the connection is refused with a NOT_AUTHORIZED code
            ctx.verify(() -> {
                assertThat(t).isInstanceOf(MqttConnectionException.class);
                final MqttConnectionException refusal = (MqttConnectionException) t;
                assertThat(refusal.code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            });
            ctx.completeNow();
        }));
}
/**
 * Checks that a device belonging to a disabled tenant cannot connect.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsForDisabledTenant(final VertxTestContext ctx) {

    // GIVEN a disabled tenant for which the MQTT adapter is enabled
    final Tenant disabledTenant = new Tenant();
    disabledTenant.setEnabled(false);

    helper.registry
        .addDeviceForTenant(tenantId, disabledTenant, deviceId, password)
        // WHEN a device of that tenant attempts to connect
        .compose(ok -> {
            final String username = IntegrationTestSupport.getUsername(deviceId, tenantId);
            return connectToAdapter(username, password);
        })
        .onComplete(ctx.failing(t -> {
            // THEN the connection is refused with a NOT_AUTHORIZED code
            ctx.verify(() -> {
                assertThat(t).isInstanceOf(MqttConnectionException.class);
                final MqttConnectionException refusal = (MqttConnectionException) t;
                assertThat(refusal.code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            });
            ctx.completeNow();
        }));
}
/**
 * Checks that a device authenticating with a client certificate cannot connect when it
 * belongs to a disabled tenant.
 *
 * @param ctx The test context
 */
@Test
public void testConnectX509FailsForDisabledTenant(final VertxTestContext ctx) {

    // GIVEN a disabled tenant for which the MQTT adapter is enabled
    helper.getCertificate(deviceCert.certificatePath())
        .compose(cert -> {
            final var disabledTenant = Tenants.createTenantForTrustAnchor(cert);
            disabledTenant.setEnabled(false);
            return helper.registry.addDeviceForTenant(tenantId, disabledTenant, deviceId, cert);
        })
        // WHEN a device of that tenant attempts to connect using its certificate
        .compose(ok -> connectToAdapter(deviceCert))
        .onComplete(ctx.failing(t -> {
            // THEN the connection is refused with a NOT_AUTHORIZED code
            ctx.verify(() -> {
                assertThat(t).isInstanceOf(MqttConnectionException.class);
                final MqttConnectionException refusal = (MqttConnectionException) t;
                assertThat(refusal.code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            });
            ctx.completeNow();
        }));
}
/**
 * Generates a client identifier of the form {@code Lannister_<id>}, or refuses the connection.
 * <p>
 * An identifier is only generated when {@code cleanSession} is set and the
 * {@code mqttserver.acceptEmptyClientId} setting (default {@code true}) allows it. In every other
 * case a {@code CONNECTION_REFUSED_IDENTIFIER_REJECTED} response is sent and {@code null} is returned.
 *
 * @param ctx the channel context used to send the refusal
 * @param cleanSession the clean-session flag of the connection being processed
 * @return the generated client identifier, or {@code null} if the connection was refused
 */
private String generateClientId(ChannelHandlerContext ctx, boolean cleanSession) {
    if (!cleanSession) {
        sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED); // [MQTT-3.1.3-8]
        return null;
    }

    if (!Settings.INSTANCE.getBoolean("mqttserver.acceptEmptyClientId", true)) {
        sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
        return null;
    }

    // [MQTT-3.1.3-6],[MQTT-3.1.3-7]
    String uniqueSuffix = Long.toString(ClusterDataFactory.INSTANCE.createIdGenerator("clientIdGenerator").newId());
    return "Lannister_" + uniqueSuffix;
}
/**
 * Verifies that a CONNECT is answered with {@code CONNECTION_REFUSED_SERVER_UNAVAILABLE} while the
 * registered {@code ServiceChecker} reports the service as unavailable.
 */
@Test
public void testCONNECTION_REFUSED_SERVER_UNAVAILABLE() throws Exception {
    // Install a checker that always reports "unavailable"; the value returned by put() is kept
    // so that the previous registration can be put back afterwards.
    ServiceChecker prev = Plugins.INSTANCE.put(ServiceChecker.class, new ServiceChecker() {
        @Override
        public Plugin clone() {
            return this;
        }

        @Override
        public boolean isServiceAvailable() {
            return false;
        }
    });

    try {
        MqttConnAckMessage ret = executeNormalChannelRead0(TestUtil.newClientId(), true, null);

        // JUnit expects (expected, actual); this order keeps the failure message accurate.
        Assert.assertEquals(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE,
                ret.variableHeader().connectReturnCode());
    } finally {
        // Restore the previous plugin even if the assertion fails, so other tests are unaffected.
        Plugins.INSTANCE.put(ServiceChecker.class, prev);
    }
}
/**
 * Verifies that a CONNECT is answered with {@code CONNECTION_REFUSED_NOT_AUTHORIZED} while the
 * registered {@code Authorizer} denies every client.
 */
@Test
public void testCONNECTION_REFUSED_NOT_AUTHORIZED() throws Exception {
    // Install an authorizer that denies everyone; the value returned by put() is kept
    // so that the previous registration can be put back afterwards.
    Authorizer prev = Plugins.INSTANCE.put(Authorizer.class, new Authorizer() {
        @Override
        public Plugin clone() {
            return this;
        }

        @Override
        public boolean isAuthorized(String clientId, String username) {
            return false;
        }
    });

    try {
        MqttConnAckMessage ret = executeNormalChannelRead0(TestUtil.newClientId(), true, null);

        // JUnit expects (expected, actual); this order keeps the failure message accurate.
        Assert.assertEquals(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED,
                ret.variableHeader().connectReturnCode());
    } finally {
        // Restore the previous plugin even if the assertion fails, so other tests are unaffected.
        Plugins.INSTANCE.put(Authorizer.class, prev);
    }
}
/**
 * Connects an MQTT client and verifies that the HTTP sessions API, queried with
 * {@code filter=live}, responds with status OK and a result of length 1; then disconnects the client.
 */
@Test
public void testLive() throws Exception {
    ConnectOptions options = new ConnectOptions();
    options.clientId(TestUtil.newClientId());

    MqttClient client = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());
    MqttConnectReturnCode ret = client.connectOptions(options).connect();

    Assert.assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, ret);
    Assert.assertTrue(client.isConnected());

    HttpClient httpClient = new HttpClient(
            "http://localhost:" + Settings.INSTANCE.httpPort() + "/api/sessions?filter=live");
    HttpResponse res = httpClient.get();

    Assert.assertEquals(HttpResponseStatus.OK, res.status());
    // Integer.valueOf replaces the deprecated Integer(int) constructor; the comparison is unchanged.
    Assert.assertEquals(Integer.valueOf(1), JsonPath.read(res.content().toString(CharsetUtil.UTF_8), "$.length()"));

    client.disconnect(true);
    Assert.assertFalse(client.isConnected());
}
/**
 * Connects a client with a will message and a non-clean session, checks that the session holds the
 * will, and verifies that the session's will is {@code null} after the client disconnects.
 */
@Test
public void testWillToNullOnNormalDisconnect() throws Exception {
    String willTopic = "will";
    String message = "ASTALAVISTA";
    String clientId = TestUtil.newClientId();

    Message will = new Message(-1, willTopic, null, message.getBytes(CharsetUtil.UTF_8), MqttQoS.AT_LEAST_ONCE,
            false);

    ConnectOptions options = new ConnectOptions();
    options.clientId(clientId);
    options.will(will);
    options.cleanSession(false);

    MqttClient client0 = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());
    MqttConnectReturnCode ret = client0.connectOptions(options).connect();

    Assert.assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, ret);
    Assert.assertTrue(client0.isConnected());

    // While connected, the session must carry the will for the configured topic.
    boolean willStored = Session.NEXUS.get(clientId).will() != null
            && Session.NEXUS.get(clientId).will().topicName().equals(willTopic);
    Assert.assertTrue(willStored);

    client0.disconnect(true);

    // Give the server a moment to process the disconnect before inspecting the session.
    Thread.sleep(100);

    Assert.assertNull(Session.NEXUS.get(clientId).will());
}
/**
 * Handles the CONNACK received from the server: records whether the connection was accepted and
 * settles the connect promise accordingly.
 *
 * @param msg connection response message
 */
private void handleConnack(MqttConnAckMessage msg) {

    final boolean accepted = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED;
    synchronized (this) {
        this.isConnected = accepted;
    }

    Promise<MqttConnAckMessage> promise = connectPromise();

    if (!accepted) {
        // Any code other than CONNECTION_ACCEPTED fails the promise with that code.
        MqttConnectionException refusal = new MqttConnectionException(msg.code());
        log.error(String.format("Connection refused by the server - code: %s", msg.code()));
        promise.fail(refusal);
        return;
    }
    promise.complete(msg);
}
/**
 * Expects a connect attempt with user name and password set to fail on the client side with
 * {@code REASON_CODE_FAILED_AUTHENTICATION} when the expected return code is
 * {@code CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD}.
 */
@Test
public void refusedBadUsernamePassword(TestContext context) {

    this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;

    try {
        MemoryPersistence store = new MemoryPersistence();
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setUserName("wrong_username");
        connectOptions.setPassword("wrong_password".toCharArray());

        String serverUri = String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT);
        MqttClient client = new MqttClient(serverUri, "12345", store);
        client.connect(connectOptions);

        // Reaching this point means the connect unexpectedly succeeded.
        context.fail();
    } catch (MqttException e) {
        boolean failedAuthentication = e.getReasonCode() == MqttException.REASON_CODE_FAILED_AUTHENTICATION;
        context.assertTrue(failedAuthentication);
    }
}
/**
 * Expects a connect attempt using MQTT version 3.1 to fail on the client side with
 * {@code REASON_CODE_INVALID_PROTOCOL_VERSION} when the expected return code is
 * {@code CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION}.
 */
@Test
public void refusedUnacceptableProtocolVersion(TestContext context) {

    this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;

    try {
        MemoryPersistence store = new MemoryPersistence();
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        // request the old 3.1 protocol version
        connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);

        String serverUri = String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT);
        MqttClient client = new MqttClient(serverUri, "12345", store);
        client.connect(connectOptions);

        // Reaching this point means the connect unexpectedly succeeded.
        context.fail();
    } catch (MqttException e) {
        boolean invalidProtocolVersion = e.getReasonCode() == MqttException.REASON_CODE_INVALID_PROTOCOL_VERSION;
        context.assertTrue(invalidProtocolVersion);
    }
}
/**
 * Connects a client and then expects a further {@code accept} call on the endpoint to throw an
 * {@link IllegalStateException}.
 */
@Test
public void connectionAlreadyAccepted(TestContext context) throws Exception {

    this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;

    MemoryPersistence store = new MemoryPersistence();
    String serverUri = String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT);
    MqttClient client = new MqttClient(serverUri, "12345", store);
    client.connect();

    try {
        // accepting again after the client has connected must not be possible
        this.endpoint.accept(false);
        context.fail();
    } catch (IllegalStateException expected) {
        // this is the outcome the test is looking for
    }
}
/**
 * Expects a connect attempt with an empty client identifier and clean session disabled (MQTT 3.1.1) to
 * fail on the client side with {@code REASON_CODE_INVALID_CLIENT_ID}, and {@code rejection} to be set.
 */
@Test
public void refusedClientIdZeroBytes(TestContext context) {

    this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;

    try {
        MemoryPersistence store = new MemoryPersistence();
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setCleanSession(false);
        connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

        String serverUri = String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT);
        MqttClient client = new MqttClient(serverUri, "", store);
        client.connect(connectOptions);

        // Reaching this point means the connect unexpectedly succeeded.
        context.fail();
    } catch (MqttException e) {
        boolean invalidClientId = e.getReasonCode() == MqttException.REASON_CODE_INVALID_CLIENT_ID;
        context.assertTrue(invalidClientId);
        context.assertNotNull(rejection);
    }
}
/**
 * Rejects the underlying endpoint with the given return code and then calls {@code complete()}.
 * Does nothing when this connection is already marked as closed.
 *
 * @param code the return code to reject the connection with
 */
@Override
public void reject(MqttConnectReturnCode code) {
    if (!closed) {
        endpoint.reject(code);
        complete();
    }
}
/**
 * Processes a CONNECT message and builds the CONNACK to reply with.
 * <p>
 * The connection is refused with {@code CONNECTION_REFUSED_IDENTIFIER_REJECTED} when the session's
 * client id is blank, and with {@code CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD} when the auth
 * service does not verify the supplied user name and password. Otherwise any existing session for the
 * client id is removed, the new session is stored, and {@code CONNECTION_ACCEPTED} is returned.
 *
 * @param msg the received CONNECT message
 * @param sessionData the session context of the connecting client
 * @return the CONNACK message to send back (session-present flag always {@code false})
 */
public MqttMessage processConnect(MqttConnectMessage msg, SessionContext sessionData) {
    // The flag bits of a CONNACK fixed header are reserved and must be 0, so the QoS must be
    // AT_MOST_ONCE; any other QoS sets reserved bits in the first header byte.
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);

    String clientId = sessionData.getClientId();
    if (StringUtils.isBlank(clientId)) {
        log.error("clientId is empty, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
    }

    // verify userName and password
    String username = msg.payload().userName();
    byte[] passwordBytes = msg.payload().passwordInBytes();
    String password = passwordBytes == null ? null : new String(passwordBytes, StandardCharsets.UTF_8);
    if (!this.authService.verifyUserName(username, password)) {
        log.error("verify account failed, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
    }

    // A newer connection with the same client id replaces the older session.
    if (this.sessionStore.existSession(clientId)) {
        log.info("exist client id, force to delete the older");
        this.sessionStore.removeSession(clientId);
    }

    // store new session
    this.sessionStore.addSession(clientId, sessionData);
    log.info("MQTT connected, clientId: {}", clientId);

    return MqttMessageFactory.newMessage(fixedHeader,
            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false), null);
}
/**
 * Sends the connect response for the given channel via {@code sendProcess} and optionally closes the
 * channel afterwards.
 * <p>
 * NOTE(review): {@code sessionPresent} is not used; {@code false} is always passed as the third
 * argument of {@code sendBackConnect}. Confirm whether that is intended.
 *
 * @param channel the channel to respond on
 * @param code the connect return code to send
 * @param sessionPresent currently ignored
 * @param isCLose whether to close the channel after the response has been handed to {@code sendProcess}
 */
private void writeBackConnect(Channel channel, MqttConnectReturnCode code, Boolean sessionPresent,
        boolean isCLose) {
    sendProcess.sendBackConnect(channel, code, false);
    if (!isCLose) {
        return;
    }
    channel.close();
}