下面列出了怎么用io.netty.channel.ChannelPromise的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Encodes outgoing {@code RpcData} messages into a freshly allocated buffer and writes
 * (and flushes) that buffer downstream. Every other message type is delegated to the
 * parent handler unchanged.
 *
 * @param ctx     handler context used to allocate the buffer and perform the write
 * @param msg     outbound message
 * @param promise promise passed along with the downstream write
 * @throws Exception if encoding or the delegated write fails
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (!(msg instanceof RpcData)) {
        super.write(ctx, msg, promise);
        return;
    }

    RpcData data = (RpcData) msg;
    String connId = getConnId(ctx);
    String key = getKey(connId);

    // Only requests on a connection that has a key are flagged as encrypted.
    boolean enc = key != null && isRequest(data.getMeta());
    if (enc) {
        ReflectionUtils.updateEncrypt(data.getMeta(), 1);
    }

    // NOTE(review): the extra 16 bytes are presumably headroom for the encrypted form - confirm.
    int size = codec.getSize(data);
    if (enc) {
        size += 16;
    }

    ByteBuf out = ctx.alloc().buffer(size);
    boolean encoded = false;
    try {
        codec.encode(data, out, key);
        encoded = true;
    } finally {
        if (!encoded) {
            // Encoding failed, so the buffer is never handed downstream; release it here
            // to avoid leaking it.
            out.release();
        }
    }
    ctx.writeAndFlush(out, promise);
}
/**
 * Polling in-flight messages for a client that has one PUBLISH and one PUBREL stored must
 * take two message ids, fire the PUBLISH through the pipeline, register an inactive-channel
 * callback and write the PUBREL to the channel.
 */
@Test
public void test_inflight_messages() throws NoMessageIdAvailableException {
    // Fixtures shared by the stubs below.
    final String clientId = "client";
    final ChannelPromise writePromise = mock(ChannelPromise.class);
    final AtomicInteger inFlightCounter = new AtomicInteger(0);

    // Message id pool hands back exactly the id that was asked for.
    when(messageIDPool.takeIfAvailable(1)).thenReturn(1);
    when(messageIDPool.takeIfAvailable(2)).thenReturn(2);

    // Persistence returns one PUBLISH and one PUBREL as in-flight messages.
    when(clientQueuePersistence.readInflight(eq(clientId), anyLong(), anyInt()))
            .thenReturn(Futures.immediateFuture(ImmutableList.of(createPublish(1), new PUBREL(2))));

    // Channel is active and starts with zero in-flight messages.
    when(channel.isActive()).thenReturn(true);
    when(channel.newPromise()).thenReturn(writePromise);
    when(channel.attr(ChannelAttributes.IN_FLIGHT_MESSAGES))
            .thenReturn(new TestChannelAttribute<>(inFlightCounter));

    publishPollService.pollInflightMessages(clientId, channel);

    verify(messageIDPool, times(2)).takeIfAvailable(anyInt());
    verify(pipeline).fireUserEventTriggered(any(PUBLISH.class));
    verify(channelInactiveHandler)
            .addCallback(anyString(), any(ChannelInactiveHandler.ChannelInactiveCallback.class));
    verify(channel).writeAndFlush(any(PubrelWithFuture.class));
}
/**
 * Dispatches a command written by the stream to its dedicated handler method.
 *
 * <p>Any message that is not one of the known command types is released, fails the promise
 * and is reported by throwing an {@link AssertionError}.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
        throws Exception {
    if (msg instanceof SendGrpcFrameCommand) {
        sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
        return;
    }
    if (msg instanceof SendResponseHeadersCommand) {
        sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
        return;
    }
    if (msg instanceof CancelServerStreamCommand) {
        cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
        return;
    }
    if (msg instanceof ForcefulCloseCommand) {
        forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
        return;
    }

    // Unknown command type: build the error first (it reads the message's class), then
    // release the message, fail the promise and propagate the error.
    AssertionError unexpectedType =
            new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
    ReferenceCountUtil.release(msg);
    promise.setFailure(unexpectedType);
    throw unexpectedType;
}
/**
 * Records the outgoing response and accumulates its body size on the channel's request
 * state, then forwards the message to the parent handler.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
{
    final RequestState requestState = ctx.channel().attr(ATTR_REQ_STATE).get();

    // A new response resets the byte counter.
    if (msg instanceof HttpResponse) {
        requestState.response = (HttpResponse) msg;
        requestState.responseBodySize = 0;
    }

    // Deliberately not an else-branch: a single message may be both a response and content.
    if (msg instanceof HttpContent) {
        final HttpContent chunk = (HttpContent) msg;
        requestState.responseBodySize += chunk.content().readableBytes();
    }

    super.write(ctx, msg, promise);
}
/**
 * Completes {@code promise} from the outcome of the two shutdown futures.
 *
 * <p>An output-shutdown failure takes precedence; if the input shutdown failed as well, that
 * second failure is only logged at debug level. The promise succeeds only when neither
 * future carries a cause.
 */
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
                                 ChannelFuture shutdownInputFuture,
                                 ChannelPromise promise) {
    final Throwable outputFailure = shutdownOutputFuture.cause();
    final Throwable inputFailure = shutdownInputFuture.cause();

    if (outputFailure == null && inputFailure == null) {
        promise.setSuccess();
        return;
    }
    if (outputFailure == null) {
        promise.setFailure(inputFailure);
        return;
    }

    // Output failed; an additional input failure is suppressed but still logged.
    if (inputFailure != null) {
        logger.debug("Exception suppressed because a previous exception occurred.",
                inputFailure);
    }
    promise.setFailure(outputFailure);
}
/**
 * Intercepts packets on their way to the client.
 *
 * <p>A server-info packet is turned into a {@code PingEvent} that is offered to every
 * registered listener; unless the event is cancelled, a packet rebuilt from the reply is
 * written instead of the original. A pong packet is dropped when the stored event has its
 * pong cancelled. Everything else is passed through untouched.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (msg instanceof PacketStatusOutServerInfo) {
        PingReply reply = ServerInfoPacketHandler.constructReply((PacketStatusOutServerInfo) msg, ctx);
        this.event = new PingEvent(reply);
        for (PingListener listener : PingAPI.getListeners()) {
            listener.onPing(event);
        }
        if (this.event.isCancelled()) {
            // NOTE(review): the promise is left uncompleted when the packet is dropped.
            return;
        }
        super.write(ctx, ServerInfoPacketHandler.constructPacket(reply), promise);
        return;
    }

    boolean pongSuppressed = msg instanceof PacketStatusOutPong
            && this.event != null
            && this.event.isPongCancelled();
    if (!pongSuppressed) {
        super.write(ctx, msg, promise);
    }
}
/**
 * Schedules the bind on the channel's event loop.
 *
 * <p>The task binds only if registration succeeded, closing the channel if the bind itself
 * fails; otherwise the promise is failed with the registration's cause.
 */
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method runs before channelRegistered() is triggered, so the bind is deferred to
    // the event loop to let user handlers set up the pipeline in channelRegistered() first.
    final Runnable bindTask = new Runnable() {
        @Override
        public void run() {
            if (!regFuture.isSuccess()) {
                promise.setFailure(regFuture.cause());
                return;
            }
            channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    };
    channel.eventLoop().execute(bindTask);
}
/**
 * Binds an additional local address on the underlying channel, always on the event loop.
 *
 * <p>When called from another thread the call re-submits itself to the event loop. The
 * promise is completed with the outcome of the bind and is also the returned future.
 */
@Override
public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
    if (!eventLoop().inEventLoop()) {
        // Hop onto the event loop and retry there.
        eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                bindAddress(localAddress, promise);
            }
        });
        return promise;
    }

    try {
        javaChannel().bindAddress(localAddress);
        promise.setSuccess();
    } catch (Throwable t) {
        promise.setFailure(t);
    }
    return promise;
}
/**
 * Listens for outgoing packets by overriding {@code write()}.
 *
 * <p>Server-info packets are converted into a {@code PingEvent}, shown to all listeners and,
 * unless cancelled, replaced by a packet rebuilt from the reply. Pong packets are swallowed
 * when the stored event has its pong cancelled. All other messages are forwarded as-is.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    final boolean isServerInfo = msg instanceof PacketStatusOutServerInfo;
    if (!isServerInfo) {
        if (msg instanceof PacketStatusOutPong
                && this.event != null
                && this.event.isPongCancelled()) {
            // Pong was cancelled by a listener of the preceding ping: drop it.
            return;
        }
        super.write(ctx, msg, promise);
        return;
    }

    PacketStatusOutServerInfo packet = (PacketStatusOutServerInfo) msg;
    PingReply reply = ServerInfoPacketHandler.constructReply(packet, ctx);
    this.event = new PingEvent(reply);
    for (PingListener listener : PingAPI.getListeners()) {
        listener.onPing(event);
    }
    if (!this.event.isCancelled()) {
        super.write(ctx, ServerInfoPacketHandler.constructPacket(reply), promise);
    }
}
/**
 * Splices up to {@code len} bytes from this {@link AbstractEpollStreamChannel} into another
 * {@link AbstractEpollStreamChannel}. Passing {@link Integer#MAX_VALUE} keeps splicing until
 * the returned {@link ChannelFuture} is cancelled or fails.
 *
 * <p>Requirements:
 * <ul>
 *   <li>Both channels must be registered to the same {@link EventLoop}; otherwise an
 *       {@link IllegalArgumentException} is thrown.</li>
 *   <li>{@code len} must not be negative; otherwise an {@link IllegalArgumentException}
 *       is thrown.</li>
 *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED}
 *       for both this channel and the target; otherwise an {@link IllegalStateException}
 *       is thrown.</li>
 * </ul>
 *
 * @param ch      the channel to splice into
 * @param len     the number of bytes to splice
 * @param promise the promise notified about the outcome; it is also the returned future
 * @return {@code promise}
 */
public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
                                    final ChannelPromise promise) {
    if (ch.eventLoop() != eventLoop()) {
        throw new IllegalArgumentException("EventLoops are not the same.");
    }
    if (len < 0) {
        throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
    }
    final boolean bothLevelTriggered = ch.config().getEpollMode() == EpollMode.LEVEL_TRIGGERED
            && config().getEpollMode() == EpollMode.LEVEL_TRIGGERED;
    if (!bothLevelTriggered) {
        throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
    }
    checkNotNull(promise, "promise");

    if (isOpen()) {
        addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
        failSpliceIfClosed(promise);
    } else {
        promise.tryFailure(SPLICE_TO_CLOSED_CHANNEL_EXCEPTION);
    }
    return promise;
}
/**
 * Drives a five-byte upload through the embedded channel and checks the outbound PUT
 * request, the chunked body, and that the write promise completes and the channel stays
 * open once a keep-alive response with the given status is received.
 */
private void uploadsShouldWork(boolean casUpload, EmbeddedChannel ch, HttpResponseStatus status)
        throws Exception {
    // Send the upload command.
    final byte[] payload = {1, 2, 3, 4, 5};
    final ByteArrayInputStream data = new ByteArrayInputStream(payload);
    final ChannelPromise writePromise = ch.newPromise();
    final UploadCommand upload = new UploadCommand(CACHE_URI, casUpload, "abcdef", data, 5);
    ch.writeOneOutbound(upload, writePromise);

    // First outbound message: the PUT request with keep-alive.
    HttpRequest request = ch.readOutbound();
    assertThat(request.method()).isEqualTo(HttpMethod.PUT);
    final String keepAlive = HttpHeaderValues.KEEP_ALIVE.toString();
    assertThat(request.headers().get(HttpHeaders.CONNECTION)).isEqualTo(keepAlive);

    // Second outbound message: the chunked body carrying all five bytes.
    HttpChunkedInput content = ch.readOutbound();
    assertThat(content.readChunk(ByteBufAllocator.DEFAULT).content().readableBytes()).isEqualTo(5);

    // Feed the server response back in.
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    ch.writeInbound(response);

    assertThat(writePromise.isDone()).isTrue();
    assertThat(ch.isOpen()).isTrue();
}
/**
 * The {@code write()} method sends packets to the client; it is overridden here in order to
 * listen for outgoing packets.
 *
 * <p>Server-info packets produce a {@code PingEvent} that is dispatched to all listeners and,
 * if not cancelled, written as a packet rebuilt from the reply. Pong packets are dropped when
 * the stored event has its pong cancelled. Any other message is forwarded unchanged.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (msg instanceof PacketStatusOutServerInfo) {
        writeServerInfo:
        {
            PacketStatusOutServerInfo packet = (PacketStatusOutServerInfo) msg;
            PingReply reply = ServerInfoPacketHandler.constructReply(packet, ctx);
            this.event = new PingEvent(reply);
            for (PingListener listener : PingAPI.getListeners()) {
                listener.onPing(event);
            }
            if (this.event.isCancelled()) {
                break writeServerInfo;
            }
            super.write(ctx, ServerInfoPacketHandler.constructPacket(reply), promise);
        }
        return;
    }

    if (msg instanceof PacketStatusOutPong) {
        boolean pongCancelled = this.event != null && this.event.isPongCancelled();
        if (pongCancelled) {
            return;
        }
    }
    super.write(ctx, msg, promise);
}
/**
 * Wraps outgoing {@code ByteBuf}s in WebSocket frames when acting as a WebSocket server.
 *
 * <p>With fragmented writes enabled the payload is split in half: the first half is copied
 * into a non-final binary frame that is written and flushed immediately, and the remainder
 * goes out as the final continuation frame tied to {@code promise}. Everything else is
 * passed straight through.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    LOG.trace("NettyServerHandler: Channel write: {}", msg);

    if (!isWebSocketServer() || !(msg instanceof ByteBuf)) {
        ctx.write(msg, promise);
        return;
    }

    final ByteBuf payload = (ByteBuf) msg;
    if (!isFragmentWrites()) {
        ctx.write(new BinaryWebSocketFrame(payload), promise);
        return;
    }

    // Copy the first half out, then advance the original past it so it holds the rest.
    final int start = payload.readerIndex();
    final int half = payload.readableBytes() / 2;
    final ByteBuf firstHalf = payload.copy(start, half);
    LOG.trace("NettyServerHandler: Part1: {}", firstHalf);
    payload.readerIndex(start + half);
    LOG.trace("NettyServerHandler: Part2: {}", payload);

    ctx.writeAndFlush(new BinaryWebSocketFrame(false, 0, firstHalf));
    ctx.write(new ContinuationWebSocketFrame(true, 0, payload), promise);
}
/**
 * Encodes the outgoing {@code Packet} and writes the encoded form downstream.
 *
 * <p>Any failure (including a failed cast) is reported through the promise instead of being
 * thrown, and the original message is always released.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    try {
        final Packet packet = (Packet) msg;
        ctx.write(encode(ctx, packet), promise);
    } catch (Throwable failure) {
        // https://github.com/netty/netty/issues/3060 - exception not reported by pipeline.
        promise.tryFailure(failure);
    } finally {
        // The message is not reference-counted today, but releasing stays correct if it
        // ever becomes so.
        ReferenceCountUtil.release(msg);
    }
}
/**
 * Writes {@code infoHeaderCount} informational header frames, then two empty header frames
 * on stream 6 while the remote flow controller reports pending flow-controlled data.
 *
 * <p>The last write must complete immediately and succeed exactly when {@code eos} is true.
 * The informational and first empty headers are expected at the writer directly; the second
 * empty headers are expected there only when {@code eos} is set.
 */
private void infoHeadersAndTrailersWithData(boolean eos, int infoHeaderCount) {
    writeAllFlowControlledFrames();
    final int streamId = 6;
    final Http2Headers infoHeaders = informationalHeaders();

    int remaining = infoHeaderCount;
    while (remaining > 0) {
        encoder.writeHeaders(ctx, streamId, infoHeaders, 0, false, newPromise());
        remaining--;
    }

    final Http2Stream stream = connection.stream(streamId);
    when(remoteFlow.hasFlowControlled(eq(stream))).thenReturn(true);

    final ChannelPromise headersPromise = newPromise();
    encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, false, headersPromise);

    final ChannelPromise trailersPromise = newPromise();
    final ChannelFuture trailersFuture =
            encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, eos, trailersPromise);
    assertTrue(trailersFuture.isDone());
    assertEquals(eos, trailersFuture.isSuccess());

    verify(writer, times(infoHeaderCount)).writeHeaders(eq(ctx), eq(streamId), eq(infoHeaders), eq(0),
            eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), any(ChannelPromise.class));
    verify(writer).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
            eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(headersPromise));
    if (!eos) {
        return;
    }
    verify(writer).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
            eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(trailersPromise));
}
/**
 * Logs the disconnect at the configured level (when that level is enabled) and then
 * delegates to the parent handler.
 */
@Override
public void disconnect(ChannelHandlerContext ctx,
                       ChannelPromise promise) throws Exception {
    final boolean levelEnabled = logger.isEnabled(internalLevel);
    if (levelEnabled) {
        logger.log(internalLevel, format(ctx, "DISCONNECT()"));
    }

    super.disconnect(ctx, promise);
}
/**
 * Writes {@code msg} downstream, wrapping it in a WebSocket frame first when it is a
 * {@code String} (text frame) or a {@code byte[]} (binary frame). Any other message is
 * written unchanged.
 */
private void wsWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    final Object outbound;
    if (msg instanceof String) {
        outbound = new TextWebSocketFrame((String) msg);
    } else if (msg instanceof byte[]) {
        outbound = new BinaryWebSocketFrame(Unpooled.wrappedBuffer((byte[]) msg));
    } else {
        outbound = msg;
    }
    ctx.write(outbound, promise);
}
/**
 * Attaches a listener that logs failed writes, then delegates the write to the parent
 * handler.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    final GenericFutureListener<Future<? super Void>> failureLogger =
            new GenericFutureListener<Future<? super Void>>() {
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        return;
                    }
                    logger.error("write data to client fail ", future.cause());
                }
            };
    promise.addListener(failureLogger);

    super.write(ctx, msg, promise);
}
/**
 * Starts a client span for every outgoing {@code HttpRequest} and stores it on the channel.
 *
 * <p>Non-request messages are written through untouched. For requests, the span is named
 * after the HTTP method, linked to any parent context found in the request headers, and its
 * context is injected into those headers unless the request carries the
 * {@code amz-sdk-invocation-id} header. If the downstream write throws, the span is tagged
 * with the error and finished before the exception is rethrown.
 */
@Override
public void write(final ChannelHandlerContext context, final Object message, final ChannelPromise promise) {
    if (message instanceof HttpRequest) {
        final HttpRequest request = (HttpRequest)message;
        final String methodName = request.method().name();
        final Tracer tracer = GlobalTracer.get();

        final SpanBuilder builder = tracer.buildSpan(methodName)
            .withTag(Tags.COMPONENT, "netty")
            .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CLIENT)
            .withTag(Tags.HTTP_METHOD, methodName)
            .withTag(Tags.HTTP_URL, request.uri());

        final SpanContext parentContext = tracer.extract(Builtin.HTTP_HEADERS, new NettyExtractAdapter(request.headers()));
        if (parentContext != null) {
            builder.asChildOf(parentContext);
        }

        final Span span = builder.start();
        try (final Scope scope = tracer.activateSpan(span)) {
            // AWS calls are often signed, so adding headers would break the signature.
            final boolean awsSigned = request.headers().contains("amz-sdk-invocation-id");
            if (!awsSigned) {
                tracer.inject(span.context(), Builtin.HTTP_HEADERS, new NettyInjectAdapter(request.headers()));
            }

            context.channel().attr(TracingClientChannelInboundHandlerAdapter.CLIENT_ATTRIBUTE_KEY).set(span);
            try {
                context.write(message, promise);
            }
            catch (final Throwable t) {
                OpenTracingApiUtil.setErrorTag(span, t);
                span.finish();
                throw t;
            }
        }
        return;
    }

    context.write(message, promise);
}
/**
 * Remembers the caller's promise and issues the connect downstream with a fresh promise.
 *
 * <p>Only a failure of the downstream connect is propagated back to the original promise,
 * and only if that promise is not already done; success is not propagated here.
 */
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) throws Exception {
    originalPromise = promise;

    final GenericFutureListener<Future<Void>> failurePropagator = new GenericFutureListener<Future<Void>>() {
        @Override
        public void operationComplete(Future<Void> future) throws Exception {
            if (future.isSuccess() || originalPromise.isDone()) {
                return;
            }
            originalPromise.setFailure(future.cause());
        }
    };

    final ChannelPromise downPromise = ctx.newPromise();
    downPromise.addListener(failurePropagator);
    ctx.connect(remoteAddress, localAddress, downPromise);
}
/**
 * Deregisters the channel.
 *
 * <p>Does nothing if the promise can no longer be made uncancellable. If the channel is
 * registered, it is marked as unregistered, the promise succeeds and
 * {@code channelUnregistered} is fired on the pipeline; otherwise the promise fails with an
 * {@link IllegalStateException}.
 *
 * @param promise the promise notified about the outcome
 */
@Override
public void deregister(ChannelPromise promise) {
    if (!promise.setUncancellable()) {
        return;
    }
    if (!registered) {
        promise.setFailure(new IllegalStateException("Not registered"));
        return;
    }
    // Clear the flag: previously it was set to true again here, so the channel never left
    // the registered state and could be "deregistered" repeatedly.
    registered = false;
    promise.setSuccess();
    pipeline().fireChannelUnregistered();
}
/**
 * Writing a SETTINGS frame through the encoder must hand the same settings object and
 * promise to the underlying frame writer.
 */
@Test
public void settingsWriteShouldNotUpdateSettings() throws Exception {
    final Http2Settings outgoingSettings = new Http2Settings();
    outgoingSettings.initialWindowSize(100);
    outgoingSettings.maxConcurrentStreams(1000);
    outgoingSettings.headerTableSize(2000);

    final ChannelPromise writePromise = newPromise();
    encoder.writeSettings(ctx, outgoingSettings, writePromise);

    verify(writer).writeSettings(eq(ctx), eq(outgoingSettings), eq(writePromise));
}
/**
 * Converts HTTP/1 style trailers to HTTP/2 headers and writes the end-of-stream frame:
 * an empty data frame when there are no trailers, a headers frame otherwise.
 */
final void writeTrailers(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // For H2 we don't need to notify protocolPayloadEndOutboundSuccess(ctx); the codecs take care of half-closure
    closeHandler.protocolPayloadEndOutbound(ctx);

    final Http2Headers h2Headers = h1HeadersToH2Headers((HttpHeaders) msg);
    final Object endOfStreamFrame = h2Headers.isEmpty()
            ? new DefaultHttp2DataFrame(EMPTY_BUFFER, true)
            : new DefaultHttp2HeadersFrame(h2Headers, true);
    ctx.write(endOfStreamFrame, promise);
}
/**
 * Closes the underlying channel with the given promise.
 *
 * <p>If the close call itself throws, the promise is failed with that exception and the
 * exception is passed to {@code handleException}. The promise is returned in either case.
 */
@Override
public final ChannelFuture close(ChannelPromise promise) {
    try {
        channel().close(promise);
        return promise;
    } catch (Exception closeFailure) {
        promise.setFailure(closeFailure);
        handleException(closeFailure);
        return promise;
    }
}
/**
 * Fails the pending connect promise with {@code cause} and then runs {@code closeIfClosed()}.
 * A {@code null} promise means it was closed via cancellation and has already been
 * notified, so nothing is done.
 */
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
    if (promise != null) {
        // tryFailure() rather than setFailure(): avoids the race against cancel().
        promise.tryFailure(cause);
        closeIfClosed();
    }
}
/**
 * Cancelling the same stream twice with different statuses: the first cancel writes an
 * RST_STREAM with the CANCEL error code, and the second cancel still completes successfully.
 */
@Test
public void cancelTwiceDifferentReasons() throws Exception {
    createStream();

    // First cancellation results in RST_STREAM on stream 3.
    cancelStream(Status.DEADLINE_EXCEEDED);
    verifyWrite().writeRstStream(
            eq(ctx()), eq(3), eq(Http2Error.CANCEL.code()), any(ChannelPromise.class));

    // Second cancellation, with a different reason, must still succeed.
    final ChannelFuture secondCancel = cancelStream(Status.CANCELLED);
    assertTrue(secondCancel.isSuccess());
}
/**
 * Sends a cluster request and blocks until its response is available or the configured
 * request timeout elapses.
 *
 * @param request the request to send; its id is overwritten with a freshly obtained xid
 * @return the response stored in the promise holder for this request's xid
 * @throws SentinelClusterException if the client is not ready, the request is invalid, the
 *         wait times out, or no response is stored for the xid after the wait
 * @throws Exception if waiting on the promise is interrupted or the write fails
 */
@Override
public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
    if (!isReady()) {
        throw new SentinelClusterException(ClusterErrorMessages.CLIENT_NOT_READY);
    }
    if (!validRequest(request)) {
        throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST);
    }
    int xid = getCurrentId();
    try {
        request.setId(xid);

        // Register the promise BEFORE writing. If the request were written first, a fast
        // response could arrive while no promise is stored for this xid yet, and the
        // caller would then wait until the timeout. (Presumably the response handler looks
        // the promise up in TokenClientPromiseHolder by xid - confirm against that handler.)
        ChannelPromise promise = channel.newPromise();
        TokenClientPromiseHolder.putPromise(xid, promise);

        channel.writeAndFlush(request);

        if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
            throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
        }

        SimpleEntry<ChannelPromise, ClusterResponse> entry = TokenClientPromiseHolder.getEntry(xid);
        if (entry == null || entry.getValue() == null) {
            // Should not go through here.
            throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
        }
        return entry.getValue();
    } finally {
        // Always drop the holder entry, whether the request succeeded, timed out or failed.
        TokenClientPromiseHolder.remove(xid);
    }
}
/**
 * Forwards the message downstream; when the message is a {@code ThriftFrame}, auto-read is
 * switched back on for the channel first.
 */
@Override
public void write(ChannelHandlerContext context, Object message, ChannelPromise promise)
{
    final boolean isFrame = message instanceof ThriftFrame;
    if (isFrame) {
        // always re-enable auto read
        context.channel().config().setAutoRead(true);
    }

    context.write(message, promise);
}
/**
 * Collects access-log data from outgoing HTTP/2 frames and emits the log entry once the
 * frame that ends the stream has been written successfully.
 *
 * <p>Header frames contribute the status and mark the response as chunked; data frames add
 * their readable bytes to the content length.
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    boolean endOfStream = false;

    if (msg instanceof Http2HeadersFrame) {
        final Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg;
        final Http2Headers headers = headersFrame.headers();
        endOfStream = headersFrame.isEndStream();
        accessLog.status(headers.status())
                 .chunked(true);
    }

    if (msg instanceof Http2DataFrame) {
        final Http2DataFrame dataFrame = (Http2DataFrame) msg;
        endOfStream = dataFrame.isEndStream();
        accessLog.increaseContentLength(dataFrame.content().readableBytes());
    }

    if (!endOfStream) {
        // "FutureReturnValueIgnored": ignoring the returned future is deliberate.
        ctx.write(msg, promise);
        return;
    }

    // Last frame of the stream: log only after it has actually been written.
    ctx.write(msg, promise.unvoid())
       .addListener(future -> {
           if (future.isSuccess()) {
               accessLog.log();
           }
       });
}
/**
 * Writes four QoS 1 publishes with the in-flight window limited to one and checks that the
 * ordered-topic handler's queue shrinks as PUBACKs are read, ending up empty.
 *
 * NOTE(review): presumably each PUBACK lets the next queued publish through, completing
 * its promise - confirm against the ordered-topic handler.
 */
@Test(timeout = 4_000)
public void test_remove_messages() throws Exception {
// Restrict the in-flight window to a single message (this mutates a static setting).
InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE = 1;
final PUBLISH publish1 = createPublish("topic", 1, QoS.AT_LEAST_ONCE);
final PUBLISH publish2 = createPublish("topic", 2, QoS.AT_LEAST_ONCE);
final PUBLISH publish3 = createPublish("topic", 3, QoS.AT_LEAST_ONCE);
final PUBLISH publish4 = createPublish("topic", 4, QoS.AT_LEAST_ONCE);
// One promise per publish so each write can be awaited individually.
final ChannelPromise promise1 = channel.newPromise();
final ChannelPromise promise2 = channel.newPromise();
final ChannelPromise promise3 = channel.newPromise();
final ChannelPromise promise4 = channel.newPromise();
channel.writeAndFlush(publish1, promise1);
channel.writeAndFlush(publish2, promise2);
channel.writeAndFlush(publish3, promise3);
channel.writeAndFlush(publish4, promise4);
// After the first write completes, the other three publishes are expected in the queue.
promise1.await();
assertEquals(3, orderedTopicHandler.queue.size());
// Acknowledge publish 1, wait for the second write, then expect two left in the queue.
channel.pipeline().fireChannelRead(new PUBACK(1));
promise2.await();
assertEquals(2, orderedTopicHandler.queue.size());
channel.pipeline().fireChannelRead(new PUBACK(2));
promise3.await();
channel.pipeline().fireChannelRead(new PUBACK(3));
// Once the fourth write has completed, nothing may remain queued.
promise4.await();
assertTrue(orderedTopicHandler.queue.isEmpty());
}