下面列出了怎么用io.netty.util.concurrent.PromiseCombiner的API类实例代码及写法,或者点击链接到github查看源代码。
private static void encodeChunkedContent(ChannelHandlerContext ctx, Buffer msg, long contentLength,
PromiseCombiner promiseCombiner) {
if (contentLength > 0) {
String lengthHex = toHexString(contentLength);
ByteBuf buf = ctx.alloc().directBuffer(lengthHex.length() + 2);
buf.writeCharSequence(lengthHex, US_ASCII);
writeShortBE(buf, CRLF_SHORT);
promiseCombiner.add(ctx.write(buf));
promiseCombiner.add(ctx.write(encodeAndRetain(msg)));
promiseCombiner.add(ctx.write(CRLF_BUF.duplicate()));
} else {
assert contentLength == 0;
// Need to produce some output otherwise an
// IllegalStateException will be thrown
promiseCombiner.add(ctx.write(encodeAndRetain(msg)));
}
}
private Future<?> doClose() {
EventLoop closeEventLoop = eventLoopGroup.next();
Promise<?> closeFinishedPromise = closeEventLoop.newPromise();
doInEventLoop(closeEventLoop, () -> {
Promise<Void> releaseAllChannelsPromise = closeEventLoop.newPromise();
PromiseCombiner promiseCombiner = new PromiseCombiner(closeEventLoop);
// Create a copy of the connections to remove while we close them, in case closing updates the original list.
List<MultiplexedChannelRecord> channelsToRemove = new ArrayList<>(connections);
for (MultiplexedChannelRecord channel : channelsToRemove) {
promiseCombiner.add(closeAndReleaseParent(channel.getConnection()));
}
promiseCombiner.finish(releaseAllChannelsPromise);
releaseAllChannelsPromise.addListener(f -> {
connectionPool.close();
closeFinishedPromise.setSuccess(null);
});
});
return closeFinishedPromise;
}
@Override
public void sendAsync(List<Append> appends, CompletedCallback callback) {
Channel ch;
try {
checkClientConnectionClosed();
ch = nettyHandler.getChannel();
} catch (ConnectionFailedException e) {
callback.complete(new ConnectionFailedException("Connection to " + connectionName + " is not established."));
return;
}
PromiseCombiner combiner = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);
for (Append append : appends) {
combiner.add(ch.write(append));
}
ch.flush();
ChannelPromise promise = ch.newPromise();
promise.addListener(future -> {
nettyHandler.setRecentMessage();
Throwable cause = future.cause();
callback.complete(cause == null ? null : new ConnectionFailedException(cause));
});
combiner.finish(promise);
}
private void writeContent(ChannelHandlerContext ctx, SegmentedData data, ChannelPromise promise) {
Headers trailingHeaders = data.trailingHeaders();
boolean hasTrailing = trailingHeaders != null && trailingHeaders.size() > 0;
boolean dataEos = data.endOfMessage() && !hasTrailing;
int streamId = data.streamId();
Http2Request request =
Http2Request.build(streamId, new DefaultHttp2DataFrame(data.content(), dataEos), dataEos);
if (hasTrailing) {
Http2Headers headers = trailingHeaders.http2Headers();
Http2Request last = Http2Request.build(streamId, headers, true);
PromiseCombiner combiner = new PromiseCombiner();
combiner.add(ctx.write(request, ctx.newPromise()));
combiner.add(ctx.write(last, ctx.newPromise()));
combiner.finish(promise);
} else {
ctx.write(request, promise);
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof Message) {
Message message = (Message) msg;
Request request = message.getRequest();
if (request.expectsResponse()) {
setRequest(ctx.channel().attr(KEY).get(), request);
}
PromiseCombiner combiner = new PromiseCombiner();
combiner.add(write(ctx, message.getPayload()));
combiner.add(write(ctx, message));
combiner.finish(promise);
} else {
throw new RuntimeException("Only Message objects can be written to ClientCodec");
}
}
private Future<?> doClose() {
EventLoop closeEventLoop = eventLoopGroup.next();
Promise<?> closeFinishedPromise = closeEventLoop.newPromise();
NettyUtils.doInEventLoop(closeEventLoop, () -> {
Promise<Void> releaseAllChannelsPromise = closeEventLoop.newPromise();
PromiseCombiner promiseCombiner = new PromiseCombiner(closeEventLoop);
// Create a copy of the connections to remove while we close them, in case closing updates the original list.
List<MultiplexedChannelRecord> channelsToRemove = new ArrayList<>(parentConnections);
for (MultiplexedChannelRecord channel : channelsToRemove) {
promiseCombiner.add(closeAndReleaseParent(channel.getParentChannel()));
}
promiseCombiner.finish(releaseAllChannelsPromise);
releaseAllChannelsPromise.addListener(f -> {
parentConnectionPool.close();
closeFinishedPromise.setSuccess(null);
});
});
return closeFinishedPromise;
}
private void writeRequest(ChannelHandlerContext ctx, Request request, ChannelPromise promise) {
/*
// TOOD(CK): define ACCEPT?
if (!response.headers().contains(HttpHeaderNames.CONTENT_TYPE)) {
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
}
*/
Http2Headers headers = request.headers().http2Headers();
headers.authority(request.host()).method(request.method().asciiName()).path(request.path());
int streamId = request.streamId();
if (request instanceof FullRequest) {
if (request.body().readableBytes() > 0) {
PromiseCombiner combiner = new PromiseCombiner();
combiner.add(ctx.write(Http2Request.build(streamId, headers, false), ctx.newPromise()));
Http2DataFrame data = new DefaultHttp2DataFrame(request.body(), true);
combiner.add(ctx.write(Http2Request.build(streamId, data, true), ctx.newPromise()));
combiner.finish(promise);
} else {
ctx.write(Http2Request.build(streamId, headers, true), promise);
}
} else {
ctx.write(Http2Request.build(streamId, headers, false), promise);
}
}
@Override
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
final boolean endOfStream, ChannelPromise promise) {
final Http2Stream stream = connection().stream(streamId);
final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
if (channel == null) {
// The compressor may be null if no compatible encoding type was found in this stream's headers
return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
}
try {
// The channel will release the buffer after being written
channel.writeOutbound(data);
ByteBuf buf = nextReadableBuf(channel);
if (buf == null) {
if (endOfStream) {
if (channel.finish()) {
buf = nextReadableBuf(channel);
}
return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
true, promise);
}
// END_STREAM is not set and the assumption is data is still forthcoming.
promise.setSuccess();
return promise;
}
PromiseCombiner combiner = new PromiseCombiner();
for (;;) {
ByteBuf nextBuf = nextReadableBuf(channel);
boolean compressedEndOfStream = nextBuf == null && endOfStream;
if (compressedEndOfStream && channel.finish()) {
nextBuf = nextReadableBuf(channel);
compressedEndOfStream = nextBuf == null;
}
ChannelPromise bufPromise = ctx.newPromise();
combiner.add(bufPromise);
super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
if (nextBuf == null) {
break;
}
padding = 0; // Padding is only communicated once on the first iteration
buf = nextBuf;
}
combiner.finish(promise);
} catch (Throwable cause) {
promise.tryFailure(cause);
} finally {
if (endOfStream) {
cleanup(stream, channel);
}
}
return promise;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof HttpMetaData) {
if (state != ST_INIT) {
throw new IllegalStateException("unexpected message type: " + simpleClassName(msg));
}
T metaData = castMetaData(msg);
// We prefer a direct allocation here because it is expected the resulted encoded Buffer will be written
// to a socket. In order to do the write to the socket the memory typically needs to be allocated in direct
// memory and will be copied to direct memory if not. Using a direct buffer will avoid the copy.
ByteBuf byteBuf = ctx.alloc().directBuffer((int) headersEncodedSizeAccumulator);
Buffer stBuf = newBufferFrom(byteBuf);
// Encode the message.
encodeInitialLine(stBuf, metaData);
state = isContentAlwaysEmpty(metaData) ? ST_CONTENT_ALWAYS_EMPTY :
isTransferEncodingChunked(metaData.headers()) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;
sanitizeHeadersBeforeEncode(metaData, state == ST_CONTENT_ALWAYS_EMPTY);
encodeHeaders(metaData.headers(), byteBuf, stBuf);
writeShortBE(byteBuf, CRLF_SHORT);
closeHandler.protocolPayloadBeginOutbound(ctx);
if (shouldClose(metaData)) {
closeHandler.protocolClosingOutbound(ctx);
}
headersEncodedSizeAccumulator = HEADERS_WEIGHT_NEW * padSizeForAccumulation(byteBuf.readableBytes()) +
HEADERS_WEIGHT_HISTORICAL * headersEncodedSizeAccumulator;
ctx.write(byteBuf, promise);
} else if (msg instanceof Buffer) {
final Buffer stBuffer = (Buffer) msg;
if (stBuffer.readableBytes() == 0) {
// Bypass the encoder in case of an empty buffer, so that the following idiom works:
//
// ch.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
//
// See https://github.com/netty/netty/issues/2983 for more information.
// We can directly write EMPTY_BUFFER here because there is no need to worry about the buffer being
// already released.
ctx.write(EMPTY_BUFFER, promise);
} else {
switch (state) {
case ST_INIT:
throw new IllegalStateException("unexpected message type: " + simpleClassName(msg));
case ST_CONTENT_NON_CHUNK:
final long contentLength = calculateContentLength(stBuffer);
if (contentLength > 0) {
ctx.write(encodeAndRetain(stBuffer), promise);
break;
}
// fall-through!
case ST_CONTENT_ALWAYS_EMPTY:
// Need to produce some output otherwise an IllegalStateException will be thrown as we did
// not write anything Its ok to just write an EMPTY_BUFFER as if there are reference count
// issues these will be propagated as the caller of the encodeAndRetain(...) method will
// release the original buffer. Writing an empty buffer will not actually write anything on
// the wire, so if there is a user error with msg it will not be visible externally
ctx.write(EMPTY_BUFFER, promise);
break;
case ST_CONTENT_CHUNK:
PromiseCombiner promiseCombiner = new PromiseCombiner();
encodeChunkedContent(ctx, stBuffer, calculateContentLength(stBuffer), promiseCombiner);
promiseCombiner.finish(promise);
break;
default:
throw new Error();
}
}
} else if (msg instanceof HttpHeaders) {
closeHandler.protocolPayloadEndOutbound(ctx);
promise.addListener(f -> {
if (f.isSuccess()) {
// Only writes of the last payload that have been successfully written and flushed should emit
// events. A CloseHandler should not get into a completed state on a failed write so that it can
// abort and close the Channel when appropriate.
closeHandler.protocolPayloadEndOutboundSuccess(ctx);
}
});
final int oldState = state;
state = ST_INIT;
if (oldState == ST_CONTENT_CHUNK) {
encodeAndWriteTrailers(ctx, (HttpHeaders) msg, promise);
} else {
ctx.write(EMPTY_BUFFER, promise);
}
}
}
/**
* Note this blocks until all the channels have finished closing.
*/
public void gracefullyShutdownClientChannels()
{
LOG.warn("Gracefully shutting down all client channels");
try {
// Mark all active connections to be closed after next response sent.
LOG.warn("Flagging CLOSE_AFTER_RESPONSE on " + channels.size() + " client channels.");
// Pick some arbitrary executor.
PromiseCombiner closeAfterPromises = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);
for (Channel channel : channels)
{
ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL);
ChannelPromise closePromise = channel.pipeline().newPromise();
channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise);
// TODO(carl-mastrangelo): remove closePromise, since I don't think it's needed. Need to verify.
closeAfterPromises.add(channel.closeFuture());
}
// Wait for all of the attempts to close connections gracefully, or max of 30 secs each.
Promise<Void> combinedCloseAfterPromise = executor.newPromise();
closeAfterPromises.finish(combinedCloseAfterPromise);
combinedCloseAfterPromise.await(30, TimeUnit.SECONDS);
// Close all of the remaining active connections.
LOG.warn("Closing remaining active client channels.");
List<ChannelFuture> forceCloseFutures = new ArrayList<>();
channels.forEach(channel -> {
if (channel.isActive()) {
ChannelFuture f = channel.pipeline().close();
forceCloseFutures.add(f);
}
});
LOG.warn("Waiting for " + forceCloseFutures.size() + " client channels to be closed.");
PromiseCombiner closePromisesCombiner = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);
closePromisesCombiner.addAll(forceCloseFutures.toArray(new ChannelFuture[0]));
Promise<Void> combinedClosePromise = executor.newPromise();
closePromisesCombiner.finish(combinedClosePromise);
combinedClosePromise.await(5, TimeUnit.SECONDS);
LOG.warn(forceCloseFutures.size() + " client channels closed.");
}
catch (InterruptedException ie) {
LOG.warn("Interrupted while shutting down client channels");
}
}