类io.netty.channel.ChannelPromise源码实例Demo

下面列出了 io.netty.channel.ChannelPromise 类的 API 用法示例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: krpc   文件: TransportBase.java
/**
 * Encodes outbound {@link RpcData} messages into a freshly allocated {@link ByteBuf} and
 * flushes them; any other message type is delegated to the superclass unchanged.
 *
 * <p>When a key is available for the connection and the message meta is a request, the meta is
 * flagged as encrypted and 16 extra bytes are reserved in the output buffer
 * (presumably room for cipher padding; confirm against the codec).
 *
 * @param ctx     handler context used to allocate the buffer and write downstream
 * @param msg     outbound message
 * @param promise promise completed by the downstream write
 * @throws Exception if encoding or the delegated write fails
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (msg instanceof RpcData) {
        RpcData data = (RpcData) msg;

        String connId = getConnId(ctx);
        String key = getKey(connId);
        boolean enc = false;
        if( key != null && isRequest(data.getMeta()) ) {
            ReflectionUtils.updateEncrypt(data.getMeta(),1);
            enc = true;
        }

        int size = enc ? codec.getSize(data) + 16 : codec.getSize(data);
        ByteBuf out = ctx.alloc().buffer(size);

        // Ownership of the buffer only passes to the pipeline once writeAndFlush is called,
        // so release it here if encoding fails; otherwise it would leak.
        boolean encoded = false;
        try {
            codec.encode(data, out, key);
            encoded = true;
        } finally {
            if (!encoded) {
                out.release();
            }
        }
        ctx.writeAndFlush(out, promise);
    } else {
        super.write(ctx, msg, promise);
    }
}
 
/**
 * Polls in-flight messages for a client whose persistence returns one PUBLISH and one PUBREL,
 * then verifies how each of them is dispatched.
 */
@Test
public void test_inflight_messages() throws NoMessageIdAvailableException {
    // The message id pool hands back exactly the ids that are requested.
    when(messageIDPool.takeIfAvailable(1)).thenReturn(1);
    when(messageIDPool.takeIfAvailable(2)).thenReturn(2);
    // Persistence reports two in-flight entries: a PUBLISH (id 1) and a PUBREL (id 2).
    when(clientQueuePersistence.readInflight(eq("client"), anyLong(), anyInt()))
            .thenReturn(Futures.immediateFuture(ImmutableList.of(createPublish(1), new PUBREL(2))));

    // Stub the channel as active, with a mock promise and an in-flight counter starting at 0.
    when(channel.isActive()).thenReturn(true);
    when(channel.newPromise()).thenReturn(mock(ChannelPromise.class));
    when(channel.attr(ChannelAttributes.IN_FLIGHT_MESSAGES)).thenReturn(new TestChannelAttribute<>(new AtomicInteger(0)));

    publishPollService.pollInflightMessages("client", channel);

    // Both message ids are taken from the pool.
    verify(messageIDPool, times(2)).takeIfAvailable(anyInt());
    // The PUBLISH is fired as a user event; the PUBREL is written to the channel.
    verify(pipeline, times(1)).fireUserEventTriggered(any(PUBLISH.class));
    verify(channelInactiveHandler, times(1)).addCallback(anyString(), any(ChannelInactiveHandler.ChannelInactiveCallback.class));
    verify(channel).writeAndFlush(any(PubrelWithFuture.class));
}
 
源代码3 项目: grpc-nebula-java   文件: NettyServerHandler.java
/**
 * Dispatches a command written by a stream to the matching handler method. A message of any
 * other type is released, fails the promise, and causes an {@link AssertionError} to be thrown.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    throws Exception {
  if (msg instanceof SendGrpcFrameCommand) {
    sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
    return;
  }
  if (msg instanceof SendResponseHeadersCommand) {
    sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
    return;
  }
  if (msg instanceof CancelServerStreamCommand) {
    cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
    return;
  }
  if (msg instanceof ForcefulCloseCommand) {
    forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
    return;
  }

  // Nothing matched: build the error before releasing the message, then fail loudly.
  String typeName = msg.getClass().getName();
  AssertionError failure = new AssertionError("Write called for unexpected type: " + typeName);
  ReferenceCountUtil.release(msg);
  promise.setFailure(failure);
  throw failure;
}
 
源代码4 项目: zuul   文件: AccessLogChannelHandler.java
/**
 * Records response details on the per-channel request state before delegating the write:
 * an {@link HttpResponse} resets the tracked response and body size, and every
 * {@link HttpContent} adds its readable byte count to the body size.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
{
    final RequestState requestState = ctx.channel().attr(ATTR_REQ_STATE).get();

    if (msg instanceof HttpResponse) {
        requestState.response = (HttpResponse) msg;
        requestState.responseBodySize = 0;
    }

    // A message may be both a response and content, so this check is independent of the first.
    if (msg instanceof HttpContent) {
        final HttpContent chunk = (HttpContent) msg;
        requestState.responseBodySize += chunk.content().readableBytes();
    }

    super.write(ctx, msg, promise);
}
 
源代码5 项目: netty-4.1.22   文件: NioSocketChannel.java
/**
 * Completes {@code promise} from the outcome of the two shutdown futures. An output failure
 * takes precedence; an input failure that occurs alongside it is only logged at debug level.
 */
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
                                 ChannelFuture shutdownInputFuture,
                                 ChannelPromise promise) {
    final Throwable outputFailure = shutdownOutputFuture.cause();
    final Throwable inputFailure = shutdownInputFuture.cause();

    if (outputFailure == null && inputFailure == null) {
        promise.setSuccess();
        return;
    }
    if (outputFailure == null) {
        promise.setFailure(inputFailure);
        return;
    }
    // Output failed: report it, and mention the input failure only in the log.
    if (inputFailure != null) {
        logger.debug("Exception suppressed because a previous exception occurred.",
                inputFailure);
    }
    promise.setFailure(outputFailure);
}
 
源代码6 项目: PingAPI   文件: DuplexHandler.java
/**
 * Intercepts outgoing packets before they continue down the pipeline.
 * A server-info packet is converted to a {@link PingReply}, offered to every registered
 * {@link PingListener} through a new {@link PingEvent}, and rebuilt from the reply unless
 * the event was cancelled. A pong packet is dropped when the stored event reports the pong
 * as cancelled. Everything else is forwarded untouched.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
	if(msg instanceof PacketStatusOutServerInfo) {
		PingReply pingReply = ServerInfoPacketHandler.constructReply((PacketStatusOutServerInfo) msg, ctx);
		this.event = new PingEvent(pingReply);
		for(PingListener pingListener : PingAPI.getListeners()) {
			pingListener.onPing(this.event);
		}
		if(this.event.isCancelled()) {
			return;
		}
		super.write(ctx, ServerInfoPacketHandler.constructPacket(pingReply), promise);
		return;
	}
	boolean pongSuppressed = msg instanceof PacketStatusOutPong
			&& this.event != null
			&& this.event.isPongCancelled();
	if(pongSuppressed) {
		return;
	}
	super.write(ctx, msg, promise);
}
 
源代码7 项目: arcusplatform   文件: AbstractBootstrap.java
/**
 * Schedules the bind on the channel's event loop. The task binds only if registration
 * succeeded (closing the channel if the bind fails); otherwise it fails {@code promise}
 * with the registration cause.
 */
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Deferring to the event loop gives
    // user handlers a chance to set up the pipeline in their channelRegistered() implementation.
    final Runnable bindTask = new Runnable() {
        @Override
        public void run() {
            if (!regFuture.isSuccess()) {
                promise.setFailure(regFuture.cause());
                return;
            }
            ChannelFuture bindFuture = channel.bind(localAddress, promise);
            bindFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    };
    channel.eventLoop().execute(bindTask);
}
 
源代码8 项目: netty-4.1.22   文件: NioSctpServerChannel.java
/**
 * Binds an additional address on the underlying channel. When called off the event loop the
 * call is re-submitted to it; on the event loop the bind runs directly and its outcome is
 * reported through {@code promise}, which is also the return value.
 */
@Override
public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
    if (!eventLoop().inEventLoop()) {
        // Hop onto the event loop and retry there.
        eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                bindAddress(localAddress, promise);
            }
        });
        return promise;
    }

    try {
        javaChannel().bindAddress(localAddress);
        promise.setSuccess();
    } catch (Throwable cause) {
        promise.setFailure(cause);
    }
    return promise;
}
 
源代码9 项目: PingAPI   文件: DuplexHandler.java
/**
 * Intercepts outgoing packets before they continue down the pipeline.
 * A server-info packet is converted to a {@link PingReply}, offered to every registered
 * {@link PingListener} through a new {@link PingEvent}, and rebuilt from the reply unless
 * the event was cancelled. A pong packet is dropped when the stored event reports the pong
 * as cancelled. Everything else is forwarded untouched.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
	if(msg instanceof PacketStatusOutServerInfo) {
		PingReply pingReply = ServerInfoPacketHandler.constructReply((PacketStatusOutServerInfo) msg, ctx);
		this.event = new PingEvent(pingReply);
		for(PingListener pingListener : PingAPI.getListeners()) {
			pingListener.onPing(this.event);
		}
		if(this.event.isCancelled()) {
			return;
		}
		super.write(ctx, ServerInfoPacketHandler.constructPacket(pingReply), promise);
		return;
	}
	boolean pongSuppressed = msg instanceof PacketStatusOutPong
			&& this.event != null
			&& this.event.isPongCancelled();
	if(pongSuppressed) {
		return;
	}
	super.write(ctx, msg, promise);
}
 
源代码10 项目: netty-4.1.22   文件: AbstractEpollStreamChannel.java
/**
 * Queues a splice of up to {@code len} bytes from this {@link AbstractEpollStreamChannel} into
 * {@code ch}. Passing {@link Integer#MAX_VALUE} splices until the {@link ChannelFuture} is
 * cancelled or fails.
 *
 * Requirements:
 * <ul>
 *   <li>both channels must be registered to the same {@link EventLoop}; otherwise an
 *   {@link IllegalArgumentException} is thrown.</li>
 *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for both this
 *   channel and the target; otherwise an {@link IllegalStateException} is thrown.</li>
 * </ul>
 *
 */
public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
                                    final ChannelPromise promise) {
    if (ch.eventLoop() != eventLoop()) {
        throw new IllegalArgumentException("EventLoops are not the same.");
    }
    if (len < 0) {
        throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
    }
    final boolean bothLevelTriggered = ch.config().getEpollMode() == EpollMode.LEVEL_TRIGGERED
            && config().getEpollMode() == EpollMode.LEVEL_TRIGGERED;
    if (!bothLevelTriggered) {
        throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
    }
    checkNotNull(promise, "promise");

    if (isOpen()) {
        addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
        // Re-check after queuing in case the channel closed in the meantime.
        failSpliceIfClosed(promise);
    } else {
        promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
    }
    return promise;
}
 
源代码11 项目: bazel   文件: HttpUploadHandlerTest.java
/**
 * Drives one upload through the embedded channel and checks the outbound PUT request, the
 * chunked body, and that the write promise completes and the channel stays open once the
 * given response status is received.
 */
private void uploadsShouldWork(boolean casUpload, EmbeddedChannel ch, HttpResponseStatus status)
    throws Exception {
  final byte[] payloadBytes = new byte[] {1, 2, 3, 4, 5};
  ByteArrayInputStream payload = new ByteArrayInputStream(payloadBytes);
  ChannelPromise uploadPromise = ch.newPromise();
  UploadCommand command = new UploadCommand(CACHE_URI, casUpload, "abcdef", payload, 5);
  ch.writeOneOutbound(command, uploadPromise);

  // First outbound message: the PUT request with keep-alive.
  HttpRequest putRequest = ch.readOutbound();
  assertThat(putRequest.method()).isEqualTo(HttpMethod.PUT);
  assertThat(putRequest.headers().get(HttpHeaders.CONNECTION))
      .isEqualTo(HttpHeaderValues.KEEP_ALIVE.toString());

  // Second outbound message: the chunked body carrying all five bytes.
  HttpChunkedInput body = ch.readOutbound();
  assertThat(body.readChunk(ByteBufAllocator.DEFAULT).content().readableBytes()).isEqualTo(5);

  FullHttpResponse serverResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
  serverResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
  ch.writeInbound(serverResponse);

  assertThat(uploadPromise.isDone()).isTrue();
  assertThat(ch.isOpen()).isTrue();
}
 
源代码12 项目: PingAPI   文件: DuplexHandler.java
/**
 * Intercepts outgoing packets before they continue down the pipeline.
 * A server-info packet is converted to a {@link PingReply}, offered to every registered
 * {@link PingListener} through a new {@link PingEvent}, and rebuilt from the reply unless
 * the event was cancelled. A pong packet is dropped when the stored event reports the pong
 * as cancelled. Everything else is forwarded untouched.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
	if(msg instanceof PacketStatusOutServerInfo) {
		PingReply pingReply = ServerInfoPacketHandler.constructReply((PacketStatusOutServerInfo) msg, ctx);
		this.event = new PingEvent(pingReply);
		for(PingListener pingListener : PingAPI.getListeners()) {
			pingListener.onPing(this.event);
		}
		if(this.event.isCancelled()) {
			return;
		}
		super.write(ctx, ServerInfoPacketHandler.constructPacket(pingReply), promise);
		return;
	}
	boolean pongSuppressed = msg instanceof PacketStatusOutPong
			&& this.event != null
			&& this.event.isPongCancelled();
	if(pongSuppressed) {
		return;
	}
	super.write(ctx, msg, promise);
}
 
源代码13 项目: qpid-jms   文件: NettyServer.java
/**
 * Wraps outbound {@link ByteBuf} payloads in WebSocket frames when running as a WebSocket
 * server. With fragmented writes enabled the payload is split in half: the first half is
 * copied into a non-final binary frame and flushed, and the remainder follows as a final
 * continuation frame tied to {@code promise}. All other messages pass through unchanged.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    LOG.trace("NettyServerHandler: Channel write: {}", msg);

    if (!isWebSocketServer() || !(msg instanceof ByteBuf)) {
        ctx.write(msg, promise);
        return;
    }

    if (!isFragmentWrites()) {
        ctx.write(new BinaryWebSocketFrame((ByteBuf) msg), promise);
        return;
    }

    ByteBuf payload = (ByteBuf) msg;
    int start = payload.readerIndex();
    int firstLength = payload.readableBytes()/2;

    ByteBuf head = payload.copy(start, firstLength);
    LOG.trace("NettyServerHandler: Part1: {}", head);
    // Advance the original buffer so it now represents only the second half.
    payload.readerIndex(start + firstLength);
    LOG.trace("NettyServerHandler: Part2: {}", payload);

    ctx.writeAndFlush(new BinaryWebSocketFrame(false, 0, head));
    ctx.write(new ContinuationWebSocketFrame(true, 0, payload), promise);
}
 
源代码14 项目: ipmi4j   文件: IpmiCodec.java
/**
 * Encodes the outbound {@link Packet} and writes the result downstream. Any failure is
 * reported through {@code promise} rather than thrown, and the original message is always
 * released afterwards.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    try {
        Object encoded = encode(ctx, (Packet) msg);
        ctx.write(encoded, promise);
    } catch (Throwable failure) {
        // https://github.com/netty/netty/issues/3060 - exception not reported by pipeline.
        promise.tryFailure(failure);
    } finally {
        // The message isn't reference counted today, but it might become so.
        ReferenceCountUtil.release(msg);
    }
}
 
/**
 * Writes {@code infoHeaderCount} informational header frames on stream 6, followed by two
 * further header writes with empty headers, and verifies which of them reach the frame writer.
 *
 * @param eos             end-of-stream flag passed to the last header write; the test expects
 *                        that write to succeed exactly when this is {@code true}
 * @param infoHeaderCount number of informational header frames written first
 */
private void infoHeadersAndTrailersWithData(boolean eos, int infoHeaderCount) {
    writeAllFlowControlledFrames();
    final int streamId = 6;
    Http2Headers infoHeaders = informationalHeaders();
    for (int i = 0; i < infoHeaderCount; ++i) {
        encoder.writeHeaders(ctx, streamId, infoHeaders, 0, false, newPromise());
    }

    // Make the remote flow controller report pending flow-controlled frames for this stream.
    Http2Stream stream = connection.stream(streamId);
    when(remoteFlow.hasFlowControlled(eq(stream))).thenReturn(true);

    // First write with empty headers, not end-of-stream.
    ChannelPromise promise2 = newPromise();
    encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, false, promise2);

    // Second write with empty headers; completes immediately, succeeding only when eos is set.
    ChannelPromise promise3 = newPromise();
    ChannelFuture future = encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, eos, promise3);
    assertTrue(future.isDone());
    assertEquals(eos, future.isSuccess());

    verify(writer, times(infoHeaderCount)).writeHeaders(eq(ctx), eq(streamId), eq(infoHeaders), eq(0),
            eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), any(ChannelPromise.class));
    verify(writer, times(1)).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
            eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise2));
    // The end-of-stream header frame is only expected at the writer when eos was requested.
    if (eos) {
        verify(writer, times(1)).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
                eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise3));
    }
}
 
源代码16 项目: netty4.0.27Learn   文件: LoggingHandler.java
/**
 * Logs a "DISCONNECT()" event at the configured level, if that level is enabled, and then
 * delegates the disconnect to the superclass.
 */
@Override
public void disconnect(ChannelHandlerContext ctx,
        ChannelPromise promise) throws Exception {
    final boolean loggingEnabled = logger.isEnabled(internalLevel);
    if (loggingEnabled) {
        logger.log(internalLevel, format(ctx, "DISCONNECT()"));
    }

    super.disconnect(ctx, promise);
}
 
源代码17 项目: hxy-socket   文件: MsgOutboundHandler.java
/**
 * Writes {@code msg} downstream, wrapping a {@code String} in a text WebSocket frame and a
 * {@code byte[]} in a binary WebSocket frame; any other message is written as-is.
 */
private void wsWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    final Object outbound;
    if (msg instanceof String) {
        outbound = new TextWebSocketFrame((String) msg);
    } else if (msg instanceof byte[]) {
        outbound = new BinaryWebSocketFrame(Unpooled.wrappedBuffer((byte[]) msg));
    } else {
        outbound = msg;
    }
    ctx.write(outbound, promise);
}
 
源代码18 项目: openzaly   文件: MessageEncoder.java
/**
 * Attaches a listener to {@code promise} that logs an error when the write fails, then
 * delegates the write to the superclass.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
	final GenericFutureListener<Future<? super Void>> failureLogger = new GenericFutureListener<Future<? super Void>>() {

		public void operationComplete(Future<? super Void> completed) throws Exception {
			if (completed.isSuccess()) {
				return;
			}
			logger.error("write data to client fail ", completed.cause());
		}
	};
	promise.addListener(failureLogger);

	super.write(ctx, msg, promise);
}
 
/**
 * Starts a client-side tracing span for every outbound {@link HttpRequest} and forwards the
 * message. Messages that are not {@code HttpRequest}s are forwarded without tracing.
 *
 * <p>The span is stored on the channel under
 * {@code TracingClientChannelInboundHandlerAdapter.CLIENT_ATTRIBUTE_KEY}. This method finishes
 * the span only when the write throws; NOTE(review): on success the span is presumably
 * finished by the inbound handler that reads that attribute (confirm).
 */
@Override
public void write(final ChannelHandlerContext context, final Object message, final ChannelPromise promise) {
  if (!(message instanceof HttpRequest)) {
    context.write(message, promise);
    return;
  }

  // Build a span named after the HTTP method and tagged with the method and URI.
  final HttpRequest request = (HttpRequest)message;
  final Tracer tracer = GlobalTracer.get();
  final SpanBuilder builder = tracer
    .buildSpan(request.method().name())
    .withTag(Tags.COMPONENT, "netty")
    .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CLIENT)
    .withTag(Tags.HTTP_METHOD, request.method().name())
    .withTag(Tags.HTTP_URL, request.uri());

  // If the request headers already carry a span context, make the new span its child.
  final SpanContext parentContext = tracer.extract(Builtin.HTTP_HEADERS, new NettyExtractAdapter(request.headers()));

  if (parentContext != null)
    builder.asChildOf(parentContext);

  final Span span = builder.start();
  // The scope is closed by try-with-resources as soon as the write call returns or throws.
  try (final Scope scope = tracer.activateSpan(span)) {
    // AWS calls are often signed, so we can't add headers without breaking
    // the signature.
    if (!request.headers().contains("amz-sdk-invocation-id")) {
      tracer.inject(span.context(), Builtin.HTTP_HEADERS, new NettyInjectAdapter(request.headers()));
    }

    context.channel().attr(TracingClientChannelInboundHandlerAdapter.CLIENT_ATTRIBUTE_KEY).set(span);
    try {
      context.write(message, promise);
    }
    catch (final Throwable t) {
      // Mark the span as failed and finish it here, since the write did not go through.
      OpenTracingApiUtil.setErrorTag(span, t);
      span.finish();
      throw t;
    }
  }
}
 
/**
 * Remembers the caller's promise and connects downstream with a separate promise. If the
 * downstream connect fails while the caller's promise is still pending, the failure is
 * propagated to it; a successful connect does not complete the caller's promise here.
 */
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) throws Exception {
    originalPromise = promise;

    final ChannelPromise downPromise = ctx.newPromise();
    final GenericFutureListener<Future<Void>> failureRelay = new GenericFutureListener<Future<Void>>() {
        @Override
        public void operationComplete(Future<Void> result) throws Exception {
            if (result.isSuccess() || originalPromise.isDone()) {
                return;
            }
            originalPromise.setFailure(result.cause());
        }
    };
    downPromise.addListener(failureRelay);

    ctx.connect(remoteAddress, localAddress, downPromise);
}
 
源代码21 项目: netty-4.1.22   文件: Http2MultiplexCodec.java
/**
 * Deregisters this channel. If the promise can no longer be made uncancellable, nothing is
 * done. A registered channel is marked unregistered, the promise succeeds, and
 * {@code channelUnregistered} is fired; an unregistered channel fails the promise with an
 * {@link IllegalStateException}.
 *
 * @param promise promise notified of the outcome
 */
@Override
public void deregister(ChannelPromise promise) {
    if (!promise.setUncancellable()) {
        return;
    }
    if (registered) {
        // Clear the flag so a second deregister is rejected and the channel no longer
        // reports itself as registered.
        registered = false;
        promise.setSuccess();
        pipeline().fireChannelUnregistered();
    } else {
        promise.setFailure(new IllegalStateException("Not registered"));
    }
}
 
/**
 * Writing a SETTINGS frame through the encoder must hand the same settings and promise to the
 * frame writer.
 */
@Test
public void settingsWriteShouldNotUpdateSettings() throws Exception {
    final Http2Settings outgoing = new Http2Settings();
    outgoing.initialWindowSize(100);
    outgoing.maxConcurrentStreams(1000);
    outgoing.headerTableSize(2000);

    final ChannelPromise writePromise = newPromise();
    encoder.writeSettings(ctx, outgoing, writePromise);

    verify(writer).writeSettings(eq(ctx), eq(outgoing), eq(writePromise));
}
 
源代码23 项目: servicetalk   文件: AbstractH2DuplexHandler.java
/**
 * Converts the given HTTP/1 trailers to HTTP/2 headers and writes the end-of-stream frame:
 * an empty data frame when there are no trailers, otherwise a headers frame carrying them.
 */
final void writeTrailers(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // For H2 we don't need to notify protocolPayloadEndOutboundSuccess(ctx); the codecs takes care of half-closure
    closeHandler.protocolPayloadEndOutbound(ctx);

    final Http2Headers trailers = h1HeadersToH2Headers((HttpHeaders) msg);
    final Object endFrame = trailers.isEmpty()
            ? new DefaultHttp2DataFrame(EMPTY_BUFFER, true)
            : new DefaultHttp2HeadersFrame(trailers, true);
    ctx.write(endFrame, promise);
}
 
/**
 * Closes the underlying channel with the supplied promise. If the close call throws, the
 * promise is failed and the exception is passed to {@code handleException}. The same promise
 * is returned in either case.
 */
@Override
public final ChannelFuture close(ChannelPromise promise) {
    try {
        channel().close(promise);
        return promise;
    } catch (Exception closeFailure) {
        promise.setFailure(closeFailure);
        handleException(closeFailure);
    }
    return promise;
}
 
/**
 * Fails the pending connect promise with {@code cause} and then runs {@code closeIfClosed()}.
 * A {@code null} promise means the connect was already closed via cancellation and notified,
 * so nothing is done.
 */
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
    if (promise != null) {
        // Use tryFailure() instead of setFailure() to avoid the race against cancel().
        promise.tryFailure(cause);
        closeIfClosed();
    }
}
 
源代码26 项目: grpc-java   文件: NettyClientHandlerTest.java
/**
 * Cancels the same stream twice with different statuses: the first cancel must write an
 * RST_STREAM with the CANCEL error code, and the second cancel must still succeed.
 */
@Test
public void cancelTwiceDifferentReasons() throws Exception {
  createStream();

  // First cancellation produces the RST_STREAM frame.
  cancelStream(Status.DEADLINE_EXCEEDED);
  verifyWrite().writeRstStream(eq(ctx()), eq(3), eq(Http2Error.CANCEL.code()),
      any(ChannelPromise.class));

  // Second cancellation with a different reason.
  ChannelFuture secondCancel = cancelStream(Status.CANCELLED);
  assertTrue(secondCancel.isSuccess());
}
 
源代码27 项目: Sentinel   文件: NettyTransportClient.java
/**
 * Sends a cluster request over the channel and blocks until the matching response arrives
 * or the configured request timeout elapses.
 *
 * @param request the request to send; its id is overwritten with a newly obtained id
 * @return the response recorded for this request id
 * @throws SentinelClusterException if the client is not ready, the request is invalid, the
 *         wait times out, or no response value is recorded for the request id
 * @throws Exception if waiting on the promise is interrupted or the send otherwise fails
 */
@Override
public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
    if (!isReady()) {
        throw new SentinelClusterException(ClusterErrorMessages.CLIENT_NOT_READY);
    }
    if (!validRequest(request)) {
        throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST);
    }
    int xid = getCurrentId();
    try {
        request.setId(xid);

        // Register the promise BEFORE writing: otherwise a response that arrives quickly could
        // be processed before the holder knows this xid, and the call would wait for the full
        // timeout and fail.
        ChannelPromise promise = channel.newPromise();
        TokenClientPromiseHolder.putPromise(xid, promise);

        channel.writeAndFlush(request);

        if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
            throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
        }

        SimpleEntry<ChannelPromise, ClusterResponse> entry = TokenClientPromiseHolder.getEntry(xid);
        if (entry == null || entry.getValue() == null) {
            // Should not go through here.
            throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
        }
        return entry.getValue();
    } finally {
        // Always drop the holder entry, whether the call succeeded, timed out or threw.
        TokenClientPromiseHolder.remove(xid);
    }
}
 
源代码28 项目: drift   文件: ResponseOrderingHandler.java
/**
 * Re-enables auto read on the channel whenever a {@link ThriftFrame} is written, then
 * forwards the message downstream.
 */
@Override
public void write(ChannelHandlerContext context, Object message, ChannelPromise promise)
{
    final boolean isThriftFrame = message instanceof ThriftFrame;
    if (isThriftFrame) {
        // always re-enable auto read
        context.channel().config().setAutoRead(true);
    }

    context.write(message, promise);
}
 
源代码29 项目: reactor-netty   文件: AccessLogHandlerH2.java
/**
 * Feeds HTTP/2 response frames into the access log before forwarding them: headers frames
 * set the status and mark the response as chunked, data frames add to the content length.
 * When a frame ends the stream, the access log entry is emitted once the write succeeds.
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
	boolean endOfStream = false;

	if (msg instanceof Http2HeadersFrame) {
		final Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg;
		endOfStream = headersFrame.isEndStream();

		accessLog.status(headersFrame.headers().status())
		         .chunked(true);
	}
	if (msg instanceof Http2DataFrame) {
		final Http2DataFrame dataFrame = (Http2DataFrame) msg;
		endOfStream = dataFrame.isEndStream();

		accessLog.increaseContentLength(dataFrame.content().readableBytes());
	}

	if (!endOfStream) {
		//"FutureReturnValueIgnored" this is deliberate
		ctx.write(msg, promise);
		return;
	}

	// Last frame of the stream: a listener is needed, so the promise must not be void.
	ctx.write(msg, promise.unvoid())
	   .addListener(writeResult -> {
	       if (writeResult.isSuccess()) {
	           accessLog.log();
	       }
	   });
}
 
/**
 * Writes four QoS 1 PUBLISH messages with the in-flight window limited to one and checks that
 * the handler's queue shrinks as PUBACKs are fed back into the pipeline.
 *
 * NOTE(review): this overwrites the static InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE and
 * does not restore it here - confirm that a setup/teardown elsewhere resets it.
 */
@Test(timeout = 4_000)
public void test_remove_messages() throws Exception {
    InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE = 1;

    final PUBLISH publish1 = createPublish("topic", 1, QoS.AT_LEAST_ONCE);
    final PUBLISH publish2 = createPublish("topic", 2, QoS.AT_LEAST_ONCE);
    final PUBLISH publish3 = createPublish("topic", 3, QoS.AT_LEAST_ONCE);
    final PUBLISH publish4 = createPublish("topic", 4, QoS.AT_LEAST_ONCE);

    final ChannelPromise promise1 = channel.newPromise();
    final ChannelPromise promise2 = channel.newPromise();
    final ChannelPromise promise3 = channel.newPromise();
    final ChannelPromise promise4 = channel.newPromise();

    channel.writeAndFlush(publish1, promise1);
    channel.writeAndFlush(publish2, promise2);
    channel.writeAndFlush(publish3, promise3);
    channel.writeAndFlush(publish4, promise4);

    // Once the first write completes, the other three messages are expected to be queued.
    promise1.await();

    assertEquals(3, orderedTopicHandler.queue.size());
    channel.pipeline().fireChannelRead(new PUBACK(1));

    // After PUBACK 1 the second write completes, leaving two messages queued.
    promise2.await();
    assertEquals(2, orderedTopicHandler.queue.size());

    channel.pipeline().fireChannelRead(new PUBACK(2));
    promise3.await();

    channel.pipeline().fireChannelRead(new PUBACK(3));
    promise4.await();


    // By the time the fourth write has completed, the queue must be empty.
    assertTrue(orderedTopicHandler.queue.isEmpty());
}
 
 类所在包
 同包方法