Code examples for io.netty.channel.ChannelOutboundHandlerAdapter

The following examples show how to use the io.netty.channel.ChannelOutboundHandlerAdapter API in practice. Each snippet is taken from an open-source project; the full source can be viewed in the corresponding project on GitHub.
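Before diving into the project snippets, here is a minimal, self-contained sketch of the basic pattern they all rely on: subclass ChannelOutboundHandlerAdapter, override one of the outbound callbacks (write() in this case), and forward the event so the rest of the pipeline is unaffected. The class and variable names are illustrative only and do not come from any of the projects below; an EmbeddedChannel is used purely to exercise the handler.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;

import java.util.ArrayList;
import java.util.List;

public class OutboundWriteInterceptionSketch {

    public static void main(String[] args) {
        final List<Object> seen = new ArrayList<>();

        // Install a single outbound handler that observes every write and passes it on unchanged.
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                seen.add(msg);                  // record the outbound message
                super.write(ctx, msg, promise); // forward it down the pipeline
            }
        });

        channel.writeOutbound(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));

        // The message passed through the handler and can be read back from the embedded channel.
        ByteBuf written = channel.readOutbound();
        System.out.println("intercepted " + seen.size() + " message(s), payload: "
                + written.toString(CharsetUtil.UTF_8));
        written.release();
    }
}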

Example 1 (Project: netty-4.1.22, File: EmbeddedChannelTest.java)
@Test
public void testFlushOutbound() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            latch.countDown();
        }
    });

    channel.flushOutbound();

    if (!latch.await(1L, TimeUnit.SECONDS)) {
        fail("Nobody called #flush() in time.");
    }
}
 
Example 2 (File: WriteStreamSubscriberFutureListenersTest.java)
public WriteStreamSubscriberFutureListenersTest() {
    listeners = new LinkedBlockingQueue<>();
    channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
                throws Exception {
            ChannelFutureListener listener = mock(ChannelFutureListener.class);
            listeners.add(listener);
            promise.addListener(listener);
            super.write(ctx, msg, promise);
        }
    });
    WriteDemandEstimator estimator = WriteDemandEstimators.newDefaultEstimator();
    TestCompletableSubscriber completableSubscriber = new TestCompletableSubscriber();
    subscriber = new WriteStreamSubscriber(channel, estimator, completableSubscriber,
            UNSUPPORTED_PROTOCOL_CLOSE_HANDLER);
    TestSubscription subscription = new TestSubscription();
    subscriber.onSubscribe(subscription);
    assertThat("No items requested.", subscription.requested(), greaterThan(0L));
}
 
Example 3 (Project: Cleanstone, File: ServerChannelInitializer.java)
@Override
protected void initChannel(Channel channel) {
    // inbound
    channel.pipeline().addLast("identificationHandler", new IdentificationHandler(nettyNetworking));
    channel.pipeline().addLast("encryptionDecoder", new ChannelInboundHandlerAdapter());
    channel.pipeline().addLast("byteStreamDecoder", new ByteStreamDecoder());
    channel.pipeline().addLast("compressionDecoder", new ChannelInboundHandlerAdapter());
    channel.pipeline().addLast("packetDataDecoder", new PacketDataDecoder(nettyNetworking.getProtocol()));
    channel.pipeline().addLast("inboundPacketHandler", new InboundPacketHandler(nettyNetworking));
    // outbound
    channel.pipeline().addFirst("outboundPacketHandler", new OutboundPacketHandler(nettyNetworking));
    channel.pipeline().addFirst("packetEncoder", new PacketEncoder(nettyNetworking.getProtocol()));
    channel.pipeline().addFirst("compressionEncoder", new ChannelOutboundHandlerAdapter());
    channel.pipeline().addFirst("byteStreamEncoder", new ByteStreamEncoder());
    channel.pipeline().addFirst("encryptionEncoder", new ChannelOutboundHandlerAdapter());
}
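
Note that this initializer registers plain ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter instances as named no-op placeholders ("encryptionDecoder", "compressionDecoder", "compressionEncoder", "encryptionEncoder"). Keeping those named slots in the pipeline from the start lets Examples 13 and 14 below swap in the real encryption and compression codecs later via ChannelPipeline.replace(), without re-ordering the pipeline.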
 
Example 4 (Project: hono, File: CommandAndControlMqttIT.java)
private Future<Void> injectMqttClientPubAckBlocker(final AtomicBoolean outboundPubAckBlocked) {
    // The vert.x MqttClient automatically sends a PubAck after having received a Qos 1 Publish message,
    // as of now, there is no configuration option to prevent this (see https://github.com/vert-x3/vertx-mqtt/issues/120).
    // Therefore the underlying NetSocket pipeline is used here to filter out the outbound PubAck messages.
    try {
        final Method connectionMethod = MqttClientImpl.class.getDeclaredMethod("connection");
        connectionMethod.setAccessible(true);
        final NetSocketInternal connection = (NetSocketInternal) connectionMethod.invoke(mqttClient);
        connection.channelHandlerContext().pipeline().addBefore("handler", "OutboundPubAckBlocker",
                new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
                    throws Exception {
                if (outboundPubAckBlocked.get() && msg instanceof io.netty.handler.codec.mqtt.MqttPubAckMessage) {
                    LOGGER.debug("suppressing PubAck, message id: {}", ((MqttPubAckMessage) msg).variableHeader().messageId());
                } else {
                    super.write(ctx, msg, promise);
                }
            }
        });
        return Future.succeededFuture();
    } catch (final Exception e) {
        LOGGER.error("failed to inject PubAck blocking handler");
        return Future.failedFuture(new Exception("failed to inject PubAck blocking handler", e));
    }
}
 
Example 5
private void doTestSslEngineClosed(HttpClient client, AtomicInteger closeCount, Class<? extends Throwable> expectedExc, String expectedMsg) {
	Mono<String> response =
			client.doOnChannelInit(
			            (o, c, address) ->
			                c.pipeline()
			                 .addFirst(new ChannelOutboundHandlerAdapter() {

			                     @Override
			                     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
			                             SocketAddress localAddress, ChannelPromise promise) throws Exception {
			                         super.connect(ctx, remoteAddress, localAddress,
					                         new TestPromise(ctx.channel(), promise, closeCount));
			                     }
			                 }))
			      .get()
			      .uri("/")
			      .responseContent()
			      .aggregate()
			      .asString();

	StepVerifier.create(response)
	            .expectErrorMatches(t -> t.getClass().isAssignableFrom(expectedExc) && t.getMessage().startsWith(expectedMsg))
	            .verify(Duration.ofSeconds(30));
}
 
Example 6 (Project: grpc-java, File: TsiFrameHandlerTest.java)
@Test
public void flushAfterCloseShouldWork() throws InterruptedException {
  ByteBuf msg = Unpooled.copiedBuffer("message after handshake failed", CharsetUtil.UTF_8);
  channel.write(msg);

  channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
      // We have to call flush while doing a close, since close() tears down the pipeline
      // immediately after.
      channel.flush();
      super.close(ctx, promise);
    }
  });

  assertThat(channel.outboundMessages()).isEmpty();

  channel.close().sync();
  Object actual = channel.readOutbound();

  assertWithMessage("pending write should be flushed on close").that(actual).isEqualTo(msg);
  channel.checkException();
}
 
Example 7
@Test(timeout = 5000)
public void test_read_subscribe_context_has_interceptors_timeouts_failure() throws Exception {

    final ClientContextImpl clientContext = new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());

    final List<SubscribeInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();

    clientContext.addSubscribeInboundInterceptor(isolatedInterceptors.get(2));

    channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
    channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);

    final CountDownLatch subackLatch = new CountDownLatch(1);

    channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {

            if (msg instanceof SUBACK && ((SUBACK) msg).getReasonCodes().get(0).equals(Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR)) {
                subackLatch.countDown();
            }

            super.write(ctx, msg, promise);
        }
    });

    when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);

    channel.writeInbound(new SUBSCRIBE(1, new Topic("topic", QoS.AT_LEAST_ONCE, true, true, Mqtt5RetainHandling.SEND, 1)));

    while (subackLatch.getCount() != 0) {
        channel.runPendingTasks();
        channel.runScheduledPendingTasks();
    }
    assertTrue(subackLatch.await(5, TimeUnit.SECONDS));
}
 
Example 8
@Test(timeout = 5000)
public void test_read_subscribe_extension_null() throws Exception {

    final ClientContextImpl clientContext = new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());

    final List<SubscribeInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();

    clientContext.addSubscribeInboundInterceptor(isolatedInterceptors.get(2));

    channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
    channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);

    final CountDownLatch subackLatch = new CountDownLatch(1);

    channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {

            if (msg instanceof SUBACK && ((SUBACK) msg).getReasonCodes().get(0).equals(Mqtt5SubAckReasonCode.GRANTED_QOS_1)) {
                subackLatch.countDown();
            }

            super.write(ctx, msg, promise);
        }
    });

    when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(null);

    channel.writeInbound(new SUBSCRIBE(1, new Topic("topic", QoS.AT_LEAST_ONCE, true, true, Mqtt5RetainHandling.SEND, 1)));

    while (messageAtomicReference.get() == null) {
        channel.runPendingTasks();
        channel.runScheduledPendingTasks();
    }

    final SUBSCRIBE message = (SUBSCRIBE) messageAtomicReference.get();
    assertEquals("topic", message.getTopics().get(0).getTopic());
}
 
Example 9
@Test(timeout = 5000)
public void test_read_publish_context_has_interceptors_timeouts_failure_mqtt5_success_ack() throws Exception {

    final ClientContextImpl clientContext =
            new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());

    final List<PublishInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();

    clientContext.addPublishInboundInterceptor(isolatedInterceptors.get(4));

    channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
    channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);

    final CountDownLatch pubackLatch = new CountDownLatch(1);

    channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {

            if (msg instanceof PUBACK) {
                pubackLatch.countDown();
            }

            super.write(ctx, msg, promise);
        }
    });

    when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);

    channel.writeInbound(TestMessageUtil.createMqtt3Publish("topic", "payload".getBytes(), QoS.AT_LEAST_ONCE));

    while (dropLatch.getCount() != 0) {
        channel.runPendingTasks();
        channel.runScheduledPendingTasks();
    }

    assertTrue(dropLatch.await(5, TimeUnit.SECONDS));
    assertTrue(pubackLatch.await(5, TimeUnit.SECONDS));
}
 
Example 10 (Project: servicetalk, File: DefaultNettyConnectionTest.java)
private static ChannelInitializer trailerProtocolEndEventEmitter() {
    return ch -> ch.pipeline()
            .addLast(new ChannelOutboundHandlerAdapter() {
                @Override
                public void write(final ChannelHandlerContext ctx,
                                  final Object msg,
                                  final ChannelPromise promise) {
                    if (TRAILER.equals(msg)) {
                        ctx.pipeline().fireUserEventTriggered(CloseHandler.ProtocolPayloadEndEvent.OUTBOUND);
                    }
                    ctx.write(msg, promise);
                }
            });
}
 
Example 11 (Project: servicetalk, File: NettyChannelPublisherTest.java)
public void setUp(Predicate<Integer> terminalPredicate) throws Exception {
    channel = new EmbeddedChannel();
    NettyConnection<Integer, Object> connection =
            DefaultNettyConnection.<Integer, Object>initChannel(channel, DEFAULT_ALLOCATOR, immediate(),
                    terminalPredicate, UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, defaultFlushStrategy(), null,
                    channel -> channel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                        @Override
                        public void read(ChannelHandlerContext ctx) throws Exception {
                            readRequested = true;
                            super.read(ctx);
                        }
                    }), OFFLOAD_ALL_STRATEGY, mock(Protocol.class)).toFuture().get();
    publisher = connection.read();
    channel.config().setAutoRead(false);
}
 
Example 12 (Project: servicetalk, File: UdpReporter.java)
private static Bootstrap buildBootstrap(EventLoopGroup group, Codec codec, SocketAddress collectorAddress,
                                        @Nullable String loggerName) {
    if (!(collectorAddress instanceof InetSocketAddress)) {
        throw new IllegalArgumentException("collectorAddress " + collectorAddress +
                " is invalid for UDP");
    }
    return new Bootstrap()
            .group(group)
            .channel(datagramChannel(group))
            .option(RCVBUF_ALLOCATOR, DEFAULT_RECV_BUF_ALLOCATOR)
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(final Channel ch) {
                    if (loggerName != null) {
                        ch.pipeline().addLast(new LoggingHandler(loggerName, LogLevel.TRACE));
                    }
                    ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                        @Override
                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                            if (msg instanceof Span) {
                                byte[] bytes = codec.spanBytesEncoder().encode((Span) msg);
                                ByteBuf buf = ctx.alloc().buffer(bytes.length).writeBytes(bytes);
                                ctx.write(new DatagramPacket(buf, (InetSocketAddress) collectorAddress), promise);
                            } else {
                                ctx.write(msg, promise);
                            }
                        }
                    });
                }
            });
}
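
This reporter illustrates the transform-and-forward variant of the pattern: in write(), each Span is encoded to bytes and wrapped in a DatagramPacket addressed to the collector, while any other message type is passed through to ctx.write() unchanged.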
 
Example 13 (Project: Cleanstone, File: NettyConnection.java)
@Override
public void setEncryptionEnabled(boolean encryptionEnabled) {
    if (isClosed()) return;
    super.setEncryptionEnabled(encryptionEnabled);
    channel.pipeline().replace("encryptionEncoder", "encryptionEncoder",
            encryptionEnabled ? new EncryptionEncoder() : new ChannelOutboundHandlerAdapter());
    channel.pipeline().replace("encryptionDecoder", "encryptionDecoder",
            encryptionEnabled ? new EncryptionDecoder() : new ChannelInboundHandlerAdapter());
}
 
Example 14 (Project: Cleanstone, File: NettyConnection.java)
@Override
public void setCompressionEnabled(boolean compressionEnabled) {
    if (isClosed()) return;
    super.setCompressionEnabled(compressionEnabled);
    channel.pipeline().replace("compressionEncoder", "compressionEncoder",
            compressionEnabled ? new JdkZlibEncoder() : new ChannelOutboundHandlerAdapter());
    channel.pipeline().replace("compressionDecoder", "compressionDecoder",
            compressionEnabled ? new JdkZlibDecoder() : new ChannelInboundHandlerAdapter());
}
 
Example 15
private void trackWritesForUdpClient(StatsdProtocol protocol, AtomicInteger writeCount) {
    if (protocol == StatsdProtocol.UDP) {
        await().until(() -> meterRegistry.client.get() != null);
        ((Connection) meterRegistry.client.get())
                .addHandler(new LoggingHandler("udpclient", LogLevel.INFO))
                .addHandler(new ChannelOutboundHandlerAdapter() {
                    @Override
                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                        writeCount.incrementAndGet();
                        super.write(ctx, msg, promise);
                    }
                });
    }
}
 
Example 16 (Project: riposte, File: VerifyCornerCasesComponentTest.java)
@Test
public void invalid_http_call_that_causes_Netty_DecoderFailure_should_result_in_expected_400_error() throws Exception {
    // given
    // Normal request, but fiddle with the first chunk as it's going out to remove the HTTP version and make it an
    //      invalid HTTP call. This will cause Netty to mark the HttpRequest with a DecoderFailure.
    NettyHttpClientRequestBuilder request = request()
        .withMethod(HttpMethod.GET)
        .withUri(BasicEndpoint.MATCHING_PATH)
        .withPipelineAdjuster(
            p -> p.addFirst(new ChannelOutboundHandlerAdapter() {
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    String msgAsString = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);

                    if (msgAsString.contains("HTTP/1.1")) {
                        msg = Unpooled.copiedBuffer(msgAsString.replace("HTTP/1.1", ""), CharsetUtil.UTF_8);
                    }
                    super.write(ctx, msg, promise);
                }
            })
        );

    // when
    NettyHttpClientResponse response = request.execute(downstreamServerConfig.endpointsPort(), 3000);

    // then
    verifyErrorReceived(response.payload,
                        response.statusCode,
                        new ApiErrorWithMetadata(SampleCoreApiError.MALFORMED_REQUEST,
                                                 Pair.of("cause", "Invalid HTTP request"))
    );
}
 
Example 17 (Project: onos, File: XmppChannelHandler.java)
public XmppChannelHandler(XmppDeviceFactory deviceFactory) {
    ChannelInboundHandlerAdapter inboundHandlerAdapter = new ChannelInboundHandlerAdapter();
    ChannelOutboundHandlerAdapter outboundHandlerAdapter = new ChannelOutboundHandlerAdapter();
    this.init(inboundHandlerAdapter, outboundHandlerAdapter);
    this.state = ChannelState.IDLE;
    this.deviceFactory = deviceFactory;
}
 
Example 18 (Project: zuul, File: SslHandshakeInfoHandlerTest.java)
@Test
public void sslEarlyHandshakeFailure() throws Exception {
    EmbeddedChannel clientChannel = new EmbeddedChannel();
    SSLEngine clientEngine = SslContextBuilder.forClient().build().newEngine(clientChannel.alloc());
    clientChannel.pipeline().addLast(new SslHandler(clientEngine));

    EmbeddedChannel serverChannel = new EmbeddedChannel();
    SelfSignedCertificate cert = new SelfSignedCertificate("localhorse");
    SSLEngine serverEngine = SslContextBuilder.forServer(cert.key(), cert.cert()).build()
            .newEngine(serverChannel.alloc());

    serverChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            // Simulate an early closure from the client.
            ReferenceCountUtil.safeRelease(msg);
            promise.setFailure(new ClosedChannelException());
        }
    });
    serverChannel.pipeline().addLast(new SslHandler(serverEngine));
    serverChannel.pipeline().addLast(new SslHandshakeInfoHandler());

    Object clientHello = clientChannel.readOutbound();
    assertNotNull(clientHello);
    ReferenceCountUtil.retain(clientHello);

    serverChannel.writeInbound(clientHello);

    // Assert that the handler removes itself from the pipeline, since it was torn down.
    assertNull(serverChannel.pipeline().context(SslHandshakeInfoHandler.class));
}
 
Example 19
@Test(timeout = 5000)
public void test_read_subscribe_context_has_interceptors_throws_exception() throws Exception {

    final ClientContextImpl clientContext = new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());

    final List<SubscribeInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();

    clientContext.addSubscribeInboundInterceptor(isolatedInterceptors.get(1));

    channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
    channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);

    final CountDownLatch subackLatch = new CountDownLatch(1);

    channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {

            if (msg instanceof SUBACK && ((SUBACK) msg).getReasonCodes().get(0).equals(Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR)) {
                subackLatch.countDown();
            }

            super.write(ctx, msg, promise);
        }
    });

    when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);

    channel.writeInbound(new SUBSCRIBE(1, new Topic("topic", QoS.AT_LEAST_ONCE, true, true, Mqtt5RetainHandling.SEND, 1)));

    while (subackLatch.getCount() != 0) {
        channel.runPendingTasks();
        channel.runScheduledPendingTasks();
    }

    assertTrue(subackLatch.await(5, TimeUnit.SECONDS));

}
 
Example 20
@Test(timeout = 5000)
public void test_read_subscribe_context_has_interceptors_timeouts_failure_mqtt3() throws Exception {

    final ClientContextImpl clientContext = new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());

    final List<SubscribeInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();

    clientContext.addSubscribeInboundInterceptor(isolatedInterceptors.get(2));

    channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
    channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv3_1_1);

    final CountDownLatch subackLatch = new CountDownLatch(1);

    channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {

            if (msg instanceof SUBACK && ((SUBACK) msg).getReasonCodes().get(0).equals(Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR)) {
                subackLatch.countDown();
            }

            super.write(ctx, msg, promise);
        }
    });


    when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);

    channel.writeInbound(new SUBSCRIBE(1, new Topic("topic", QoS.AT_LEAST_ONCE, true, true, Mqtt5RetainHandling.SEND, 1)));

    while (subackLatch.getCount() != 0) {
        channel.runPendingTasks();
        channel.runScheduledPendingTasks();
    }

    assertTrue(subackLatch.await(5, TimeUnit.SECONDS));
}
 
Example 21
@Test(timeout = 5000)
public void test_read_publish_context_has_interceptors_timeouts_failure_mqtt3_success_ack() throws Exception {

    final ClientContextImpl clientContext =
            new ClientContextImpl(hiveMQExtensions, new ModifiableDefaultPermissionsImpl());

    final List<PublishInboundInterceptor> isolatedInterceptors = getIsolatedInterceptor();

    clientContext.addPublishInboundInterceptor(isolatedInterceptors.get(4));

    channel.attr(ChannelAttributes.PLUGIN_CLIENT_CONTEXT).set(clientContext);
    channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv3_1_1);

    final CountDownLatch pubackLatch = new CountDownLatch(1);

    channel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {

            if (msg instanceof PUBACK) {
                pubackLatch.countDown();
            }

            super.write(ctx, msg, promise);
        }
    });


    when(hiveMQExtensions.getExtensionForClassloader(any(IsolatedPluginClassloader.class))).thenReturn(plugin);

    channel.writeInbound(TestMessageUtil.createMqtt3Publish("topic", "payload".getBytes(), QoS.AT_LEAST_ONCE));

    while (dropLatch.getCount() != 0) {
        channel.runPendingTasks();
        channel.runScheduledPendingTasks();
    }

    assertTrue(dropLatch.await(5, TimeUnit.SECONDS));
    assertTrue(pubackLatch.await(5, TimeUnit.SECONDS));
}
 