类 io.netty.handler.codec.mqtt.MqttConnectReturnCode 源码实例Demo

下面列出了怎么用 io.netty.handler.codec.mqtt.MqttConnectReturnCode 的API类实例代码及写法,或者点击链接到github查看源代码。


public void setMqttServer(Collection<io.vertx.mqtt.MqttServer> mqttServer) {
    if (this.mqttServer != null && !this.mqttServer.isEmpty()) {
        shutdown();
    }
    this.mqttServer = mqttServer;
    for (io.vertx.mqtt.MqttServer server : this.mqttServer) {
        server
            .exceptionHandler(error -> {
                log.error(error.getMessage(), error);
            })
            .endpointHandler(endpoint -> {
                if (!connectionProcessor.hasDownstreams()) {
                    log.info("mqtt server no handler for:[{}]", endpoint.clientIdentifier());
                    endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                    return;
                }
                if (connectionProcessor.getPending() >= 10240) {
                    log.warn("too many no handle mqtt connection : {}", connectionProcessor.getPending());
                    endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                    return;
                }
                sink.next(new VertxMqttConnection(endpoint));
            });
    }
}
 

private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
  String userName = msg.payload().userName();
  String clientIdentifier = msg.payload().clientIdentifier();
  if (StringUtils.isEmpty(userName)) {
    // ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
    // ctx.close();
    ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
    connected = false;
  } else {
    boolean login = deviceSessionCtx.login(new DeviceTokenCredentials(userName));
    if (!login) {
      ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
      connected = false;
    } else {
      MemoryMetaPool.registerClienId(clientIdentifier, ctx.channel());

      ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
      connected = true;
      checkGatewaySession();
    }
    // }
  }

}
 

private Future<Device> handleConnectionRequest(final MqttEndpoint endpoint, final Span currentSpan) {

        // the ConnectionLimitManager is null in some unit tests
        if (getConnectionLimitManager() != null && getConnectionLimitManager().isLimitExceeded()) {
            currentSpan.log("connection limit exceeded, reject connection request");
            metrics.reportConnectionAttempt(ConnectionAttemptOutcome.ADAPTER_CONNECTION_LIMIT_EXCEEDED);
            return Future.failedFuture(new MqttConnectionException(
                    MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
        }

        if (getConfig().isAuthenticationRequired()) {
            return handleEndpointConnectionWithAuthentication(endpoint, currentSpan);
        } else {
            return handleEndpointConnectionWithoutAuthentication(endpoint);
        }
    }
 

private static MqttConnectReturnCode getConnectReturnCode(final Throwable e) {

        if (e instanceof MqttConnectionException) {
            return ((MqttConnectionException) e).code();
        } else if (e instanceof ServiceInvocationException) {
            switch (((ServiceInvocationException) e).getErrorCode()) {
            case HttpURLConnection.HTTP_UNAUTHORIZED:
            case HttpURLConnection.HTTP_NOT_FOUND:
                return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
            case HttpURLConnection.HTTP_UNAVAILABLE:
                return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
            default:
                return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
            }
        } else {
            return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
        }
    }
 

/**
 * Verifies that a connection attempt from a device is refused if the adapter is not connected to all of the
 * services it depends on.
 */
@Test
public void testEndpointHandlerFailsWithoutDownstreamConnections() {

    // GIVEN an adapter that is not connected to
    // all of its required services
    final MqttServer server = getMqttServer(false);
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> adapter = getAdapter(server);

    // WHEN a client tries to connect
    final MqttEndpoint endpoint = mockEndpoint();
    adapter.handleEndpointConnection(endpoint);

    // THEN the connection request is rejected
    verify(endpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
 

/**
 * Verifies that an adapter rejects a connection attempt from a device that belongs to a tenant for which the
 * adapter is disabled.
 */
@Test
public void testEndpointHandlerRejectsDeviceOfDisabledTenant() {

    // GIVEN an adapter
    final MqttServer server = getMqttServer(false);
    // which is disabled for tenant "my-tenant"
    final TenantObject myTenantConfig = TenantObject.from("my-tenant", true);
    myTenantConfig.addAdapter(new Adapter(ADAPTER_TYPE).setEnabled(Boolean.FALSE));
    when(tenantClient.get(eq("my-tenant"), (SpanContext) any())).thenReturn(Future.succeededFuture(myTenantConfig));
    when(authHandler.authenticateDevice(any(MqttContext.class)))
            .thenReturn(Future.succeededFuture(new DeviceUser("my-tenant", "4711")));
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> adapter = getAdapter(server);
    forceClientMocksToConnected();

    // WHEN a device of "my-tenant" tries to connect
    final MqttEndpoint endpoint = mockEndpoint();
    adapter.handleEndpointConnection(endpoint);

    // THEN the connection is not established
    verify(endpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
}
 

/**
 * Verifies that unregistered devices with valid credentials cannot establish connection.
 */
@Test
public void testAuthenticatedMqttAdapterRejectsConnectionForNonExistingDevice() {

    // GIVEN an adapter
    final MqttServer server = getMqttServer(false);
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> adapter = getAdapter(server);
    forceClientMocksToConnected();
    // which is connected to a Credentials service that has credentials on record for device 9999
    when(authHandler.authenticateDevice(any(MqttContext.class)))
            .thenReturn(Future.succeededFuture(new DeviceUser("DEFAULT_TENANT", "9999")));
    // but for which no registration information is available
    when(regClient.assertRegistration(eq("9999"), (String) any(), (SpanContext) any()))
            .thenReturn(Future.failedFuture(new ClientErrorException(
                    HttpURLConnection.HTTP_NOT_FOUND, "device unknown or disabled")));

    // WHEN a device tries to connect with valid credentials
    final MqttEndpoint endpoint = getMqttEndpointAuthenticated();
    adapter.handleEndpointConnection(endpoint);

    // THEN the device's credentials are verified successfully
    verify(authHandler).authenticateDevice(any(MqttContext.class));
    // but the connection is refused
    verify(endpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
}
 

/**
 * Verifies that the connection is rejected due to the limit exceeded.
 */
@Test
public void testConnectionsLimitExceeded() {

    // GIVEN an adapter requiring devices to authenticate endpoint
    final MqttServer server = getMqttServer(false);
    config.setAuthenticationRequired(true);
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> adapter = getAdapter(server);
    forceClientMocksToConnected();

    // WHEN a device tries to establish a connection
    when(authHandler.authenticateDevice(any(MqttContext.class)))
            .thenReturn(Future.succeededFuture(new DeviceUser("DEFAULT_TENANT", "4711")));
    when(resourceLimitChecks.isConnectionLimitReached(any(TenantObject.class), any(SpanContext.class)))
            .thenReturn(Future.succeededFuture(Boolean.TRUE));
    final MqttEndpoint endpoint = getMqttEndpointAuthenticated();
    adapter.handleEndpointConnection(endpoint);

    // THEN the adapter has tried to authenticate the device
    verify(authHandler).authenticateDevice(any(MqttContext.class));
    // THEN the connection request is rejected
    verify(endpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
}
 

/**
 * Verifies that the connection is rejected due to the connection duration limit exceeded.
 */
@Test
public void testConnectionDurationLimitExceeded() {

    // GIVEN an adapter requiring devices to authenticate endpoint
    final MqttServer server = getMqttServer(false);
    config.setAuthenticationRequired(true);
    final AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> adapter = getAdapter(server);
    forceClientMocksToConnected();

    // WHEN a device tries to establish a connection
    when(authHandler.authenticateDevice(any(MqttContext.class)))
            .thenReturn(Future.succeededFuture(new DeviceUser("DEFAULT_TENANT", "4711")));
    when(resourceLimitChecks.isConnectionDurationLimitReached(any(TenantObject.class), any(SpanContext.class)))
            .thenReturn(Future.succeededFuture(Boolean.TRUE));
    final MqttEndpoint endpoint = getMqttEndpointAuthenticated();
    adapter.handleEndpointConnection(endpoint);

    // THEN the adapter has tried to authenticate the device
    verify(authHandler).authenticateDevice(any(MqttContext.class));
    // THEN the connection request is rejected
    verify(endpoint).reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
}
 
源代码10 项目: hono   文件: MqttConnectionIT.java

/**
 * Verifies that the adapter rejects connection attempts from an unknown device for which auto-provisioning is
 * disabled.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsIfAutoProvisioningIsDisabled(final VertxTestContext ctx) {

    // GIVEN a tenant configured with a trust anchor that does not allow auto-provisioning
    // WHEN an unknown device tries to connect
    helper.getCertificate(deviceCert.certificatePath())
            .compose(cert -> {
                final var tenant = Tenants.createTenantForTrustAnchor(cert);
                tenant.getTrustedCertificateAuthorities().get(0).setAutoProvisioningEnabled(false);
                return helper.registry.addTenant(tenantId, tenant);
            })
            // WHEN a unknown device tries to connect to the adapter
            // using a client certificate with the trust anchor registered for the device's tenant
            .compose(ok -> connectToAdapter(deviceCert))
            .onComplete(ctx.failing(t -> {
                // THEN the connection is refused
                ctx.verify(() -> {
                    assertThat(t).isInstanceOf(MqttConnectionException.class);
                    assertThat(((MqttConnectionException) t).code())
                            .isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
                });
                ctx.completeNow();
            }));
}
 
源代码11 项目: hono   文件: MqttConnectionIT.java

/**
 * Verifies that the adapter rejects connection attempts from unknown devices
 * for which neither registration information nor credentials are on record.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsForNonExistingDevice(final VertxTestContext ctx) {

    // GIVEN an adapter
    // WHEN an unknown device tries to connect
    connectToAdapter(IntegrationTestSupport.getUsername("non-existing", Constants.DEFAULT_TENANT), "secret")
    .onComplete(ctx.failing(t -> {
        // THEN the connection is refused
        ctx.verify(() -> {
            assertThat(t).isInstanceOf(MqttConnectionException.class);
            assertThat(((MqttConnectionException) t).code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        });
        ctx.completeNow();
    }));
}
 
源代码12 项目: hono   文件: MqttConnectionIT.java

/**
 * Verifies that the adapter rejects connection attempts from unknown devices
 * trying to authenticate using a client certificate but for which neither
 * registration information nor credentials are on record.
 *
 * @param ctx The test context
 */
@Test
public void testConnectX509FailsForNonExistingDevice(final VertxTestContext ctx) {

    // GIVEN an adapter
    // WHEN an unknown device tries to connect
    connectToAdapter(deviceCert)
    .onComplete(ctx.failing(t -> {
        // THEN the connection is refused
        ctx.verify(() -> {
            assertThat(t).isInstanceOf(MqttConnectionException.class);
            assertThat(((MqttConnectionException) t).code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        });
        ctx.completeNow();
    }));
}
 
源代码13 项目: hono   文件: MqttConnectionIT.java

/**
 * Verifies that the adapter rejects connection attempts from devices
 * using wrong credentials.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsForWrongCredentials(final VertxTestContext ctx) {

    // GIVEN a registered device
    final Tenant tenant = new Tenant();

    helper.registry
            .addDeviceForTenant(tenantId, tenant, deviceId, password)
    // WHEN the device tries to connect using a wrong password
    .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), "wrong password"))
    .onComplete(ctx.failing(t -> {
        // THEN the connection is refused
        ctx.verify(() -> {
            assertThat(t).isInstanceOf(MqttConnectionException.class);
            assertThat(((MqttConnectionException) t).code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        });
        ctx.completeNow();
    }));
}
 
源代码14 项目: hono   文件: MqttConnectionIT.java

/**
 * Verifies that the adapter rejects connection attempts from devices belonging to a tenant for which the MQTT
 * adapter has been disabled.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsForDisabledAdapter(final VertxTestContext ctx) {

    final Tenant tenant = new Tenant();
    tenant.addAdapterConfig(new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false));

    helper.registry
            .addDeviceForTenant(tenantId, tenant, deviceId, password)
            // WHEN a device that belongs to the tenant tries to connect to the adapter
            .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
            .onComplete(ctx.failing(t -> {
                // THEN the connection is refused with a NOT_AUTHORIZED code
                ctx.verify(() -> {
                    assertThat(t).isInstanceOf(MqttConnectionException.class);
                    assertThat(((MqttConnectionException) t).code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
                });
                ctx.completeNow();
            }));
}
 
源代码15 项目: hono   文件: MqttConnectionIT.java

/**
 * Verifies that the adapter rejects connection attempts from devices
 * using a client certificate which belong to a tenant for which the
 * MQTT adapter has been disabled.
 *
 * @param ctx The test context
 */
@Test
public void testConnectX509FailsForDisabledAdapter(final VertxTestContext ctx) {
    helper.getCertificate(deviceCert.certificatePath())
    .compose(cert -> {
                final var tenant = Tenants.createTenantForTrustAnchor(cert);
                tenant.addAdapterConfig(new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false));
                return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert);
    })
    // WHEN a device that belongs to the tenant tries to connect to the adapter
    .compose(ok -> connectToAdapter(deviceCert))
    .onComplete(ctx.failing(t -> {
        // THEN the connection is refused with a NOT_AUTHORIZED code
        ctx.verify(() -> {
            assertThat(t).isInstanceOf(MqttConnectionException.class);
            assertThat(((MqttConnectionException) t).code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
        });
        ctx.completeNow();
    }));
}
 
源代码16 项目: hono   文件: MqttConnectionIT.java

/**
 * Verifies that the adapter rejects connection attempts from devices that belong to a disabled tenant.
 *
 * @param ctx The test context
 */
@Test
public void testConnectFailsForDisabledTenant(final VertxTestContext ctx) {

    // Given a disabled tenant for which the MQTT adapter is enabled
    final Tenant tenant = new Tenant();
    tenant.setEnabled(false);

    helper.registry
        .addDeviceForTenant(tenantId, tenant, deviceId, password)
        .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
        .onComplete(ctx.failing(t -> {
            // THEN the connection is refused with a NOT_AUTHORIZED code
            ctx.verify(() -> {
                assertThat(t).isInstanceOf(MqttConnectionException.class);
                assertThat(((MqttConnectionException) t).code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            });
            ctx.completeNow();
        }));
}
 
源代码17 项目: hono   文件: MqttConnectionIT.java

/**
 * Verifies that the adapter rejects connection attempts from devices
 * using a client certificate that belong to a disabled tenant.
 *
 * @param ctx The test context
 */
@Test
public void testConnectX509FailsForDisabledTenant(final VertxTestContext ctx) {

    // Given a disabled tenant for which the MQTT adapter is enabled
    helper.getCertificate(deviceCert.certificatePath())
    .compose(cert -> {
                final var tenant = Tenants.createTenantForTrustAnchor(cert);
                tenant.setEnabled(false);
                return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert);
    })
    .compose(ok -> connectToAdapter(deviceCert))
    .onComplete(ctx.failing(t -> {
        // THEN the connection is refused with a NOT_AUTHORIZED code
        ctx.verify(() -> {
            assertThat(t).isInstanceOf(MqttConnectionException.class);
            assertThat(((MqttConnectionException) t).code()).isEqualTo(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
        });
        ctx.completeNow();
    }));
}
 
源代码18 项目: lannister   文件: ConnectReceiver.java

private String generateClientId(ChannelHandlerContext ctx, boolean cleanSession) {
	if (cleanSession) {
		if (Settings.INSTANCE.getBoolean("mqttserver.acceptEmptyClientId", true)) {
			return "Lannister_"
					+ Long.toString(ClusterDataFactory.INSTANCE.createIdGenerator("clientIdGenerator").newId()); // [MQTT-3.1.3-6],[MQTT-3.1.3-7]
		}
		else {
			sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
			return null;
		}
	}
	else {
		sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED); // [MQTT-3.1.3-8]
		return null;
	}
}
 
源代码19 项目: lannister   文件: ConnectReceiverTest.java

@Test
public void testCONNECTION_REFUSED_SERVER_UNAVAILABLE() throws Exception {
	ServiceChecker prev = Plugins.INSTANCE.put(ServiceChecker.class, new ServiceChecker() {
		@Override
		public Plugin clone() {
			return this;
		}

		@Override
		public boolean isServiceAvailable() {
			return false;
		}
	});

	MqttConnAckMessage ret = executeNormalChannelRead0(TestUtil.newClientId(), true, null);

	Assert.assertEquals(ret.variableHeader().connectReturnCode(),
			MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);

	Plugins.INSTANCE.put(ServiceChecker.class, prev);
}
 
源代码20 项目: lannister   文件: ConnectReceiverTest.java

@Test
public void testCONNECTION_REFUSED_NOT_AUTHORIZED() throws Exception {
	Authorizer prev = Plugins.INSTANCE.put(Authorizer.class, new Authorizer() {
		@Override
		public Plugin clone() {
			return this;
		}

		@Override
		public boolean isAuthorized(String clientId, String username) {
			return false;
		}
	});

	MqttConnAckMessage ret = executeNormalChannelRead0(TestUtil.newClientId(), true, null);

	Assert.assertEquals(ret.variableHeader().connectReturnCode(),
			MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);

	Plugins.INSTANCE.put(Authorizer.class, prev);
}
 
源代码21 项目: lannister   文件: SessionsTest.java

@Test
public void testLive() throws Exception {
	ConnectOptions options = new ConnectOptions();
	options.clientId(TestUtil.newClientId());

	MqttClient client = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());
	MqttConnectReturnCode ret = client.connectOptions(options).connect();

	Assert.assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, ret);

	Assert.assertTrue(client.isConnected());

	HttpClient httpClient = new HttpClient(
			"http://localhost:" + Settings.INSTANCE.httpPort() + "/api/sessions?filter=live");
	HttpResponse res = httpClient.get();

	Assert.assertEquals(HttpResponseStatus.OK, res.status());
	Assert.assertEquals(new Integer(1), JsonPath.read(res.content().toString(CharsetUtil.UTF_8), "$.length()"));

	client.disconnect(true);

	Assert.assertFalse(client.isConnected());
}
 
源代码22 项目: lannister   文件: WillTest.java

@Test
public void testWillToNullOnNormalDisconnect() throws Exception {
	String willTopic = "will";
	String message = "ASTALAVISTA";

	String clientId = TestUtil.newClientId();
	ConnectOptions options = new ConnectOptions();
	options.clientId(clientId);
	options.will(
			new Message(-1, willTopic, null, message.getBytes(CharsetUtil.UTF_8), MqttQoS.AT_LEAST_ONCE, false));
	options.cleanSession(false);

	MqttClient client0 = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());
	MqttConnectReturnCode ret = client0.connectOptions(options).connect();

	Assert.assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, ret);
	Assert.assertTrue(client0.isConnected());

	Assert.assertTrue(Session.NEXUS.get(clientId).will() != null
			&& Session.NEXUS.get(clientId).will().topicName().equals(willTopic));

	client0.disconnect(true);

	Thread.sleep(100);
	Assert.assertNull(Session.NEXUS.get(clientId).will());
}
 
源代码23 项目: vertx-mqtt   文件: MqttClientImpl.java

/**
 * Used for calling the connect handler when the server replies to the request
 *
 * @param msg  connection response message
 */
private void handleConnack(MqttConnAckMessage msg) {

  synchronized (this) {
    this.isConnected = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED;
  }

  Promise<MqttConnAckMessage> promise = connectPromise();
  if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
    promise.complete(msg);
  } else {
    MqttConnectionException exception = new MqttConnectionException(msg.code());
    log.error(String.format("Connection refused by the server - code: %s", msg.code()));
    promise.fail(exception);
  }
}
 

@Test
public void refusedBadUsernamePassword(TestContext context) {

  this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;

  try {
    MemoryPersistence persistence = new MemoryPersistence();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName("wrong_username");
    options.setPassword("wrong_password".toCharArray());
    MqttClient client = new MqttClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence);
    client.connect(options);
    context.fail();
  } catch (MqttException e) {
    context.assertTrue(e.getReasonCode() == MqttException.REASON_CODE_FAILED_AUTHENTICATION);
  }
}
 

@Test
public void refusedUnacceptableProtocolVersion(TestContext context) {

  this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;

  try {
    MemoryPersistence persistence = new MemoryPersistence();
    MqttConnectOptions options = new MqttConnectOptions();
    // trying the old 3.1
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
    MqttClient client = new MqttClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence);
    client.connect(options);
    context.fail();
  } catch (MqttException e) {
    context.assertTrue(e.getReasonCode() == MqttException.REASON_CODE_INVALID_PROTOCOL_VERSION);
  }
}
 

@Test
public void connectionAlreadyAccepted(TestContext context) throws Exception {

  this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_ACCEPTED;

  MemoryPersistence persistence = new MemoryPersistence();
  MqttClient client = new MqttClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "12345", persistence);
  client.connect();

  try {
    // try to accept a connection already accepted
    this.endpoint.accept(false);
    context.fail();
  } catch (IllegalStateException e) {
    // Ok
  }
}
 

@Test
public void refusedClientIdZeroBytes(TestContext context) {

  this.expectedReturnCode = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;

  try {
    MemoryPersistence persistence = new MemoryPersistence();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(false);
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
    MqttClient client = new MqttClient(String.format("tcp://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT), "", persistence);
    client.connect(options);
    context.fail();
  } catch (MqttException e) {
    context.assertTrue(e.getReasonCode() == MqttException.REASON_CODE_INVALID_CLIENT_ID);
    context.assertNotNull(rejection);
  }
}
 

@Override
public void reject(MqttConnectReturnCode code) {
    if (closed) {
        return;
    }
    endpoint.reject(code);
    complete();
}
 
源代码29 项目: WeEvent   文件: Connect.java

public MqttMessage processConnect(MqttConnectMessage msg, SessionContext sessionData) {
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);

    String clientId = sessionData.getClientId();
    if (StringUtils.isBlank(clientId)) {
        log.error("clientId is empty, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
    }

    // verify userName and password
    String username = msg.payload().userName();
    String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), StandardCharsets.UTF_8);
    if (!this.authService.verifyUserName(username, password)) {
        log.error("verify account failed, reject");
        return MqttMessageFactory.newMessage(fixedHeader,
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
    }

    if (this.sessionStore.existSession(clientId)) {
        log.info("exist client id, force to delete the older");
        this.sessionStore.removeSession(clientId);
    }

    // store new session
    this.sessionStore.addSession(clientId, sessionData);

    log.info("MQTT connected, clientId: {}", clientId);
    return MqttMessageFactory.newMessage(fixedHeader,
            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false), null);
}
 

/**
 *  P - B, S - B
 * @param channel
 * @param code
 * @param sessionPresent
 * @param isCLose
 */
private void writeBackConnect(Channel channel, MqttConnectReturnCode code, Boolean sessionPresent,
		boolean isCLose) {
	this.sendProcess.sendBackConnect(channel, code, false);
	if (isCLose) {
		channel.close();
	}
}
 
 类所在包
 同包方法