下面列出了 io.netty.handler.codec.http2.DefaultHttp2Connection #io.netty.handler.codec.http2.Http2Stream 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Server-side forceful shutdown: invokes {@code close(ctx, promise)} and then walks every active
 * stream, reporting the command's status to its server transport state (when one is attached),
 * resetting it with {@code CANCEL}, and closing the stream.
 *
 * @param ctx the handler context used for resetting streams and creating promises
 * @param msg the command carrying the status to report
 * @param promise the promise handed to {@code close}
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  close(ctx, promise);
  final Http2StreamVisitor closeEachStream = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyServerStream.TransportState state = serverStream(stream);
      if (state != null) {
        state.transportReportStatus(msg.getStatus());
        resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
      }
      // Streams without an attached transport state are still closed.
      stream.close();
      return true;
    }
  };
  connection().forEachActiveStream(closeEachStream);
}
/**
 * Client-side forceful shutdown. The channel itself is not closed here (close() is already
 * called by NettyClientTransport); this only reports the command's status to each active client
 * stream, resets it with {@code CANCEL}, closes it, and finally completes {@code promise}.
 *
 * @param ctx the handler context used for resetting streams and creating promises
 * @param msg the command carrying the status to report
 * @param promise marked successful once all active streams were visited
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  final Http2StreamVisitor cleanUpStream = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyClientStream.TransportState state = clientStream(stream);
      if (state != null) {
        state.transportReportStatus(msg.getStatus(), true, new Metadata());
        resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
      }
      stream.close();
      return true;
    }
  };
  connection().forEachActiveStream(cleanUpStream);
  promise.setSuccess();
}
/**
 * Reacts to a received GOAWAY: notifies the lifecycle manager of the shutdown and then fails and
 * closes every active stream whose id is greater than the last stream id known by the peer.
 * Streams at or below that id are left untouched.
 *
 * @param status the status passed to the lifecycle manager's shutdown notification
 * @throws RuntimeException wrapping any {@link Http2Exception} raised while visiting streams
 */
private void goingAway(Status status) {
  lifecycleManager.notifyShutdown(status);
  // The status reported to streams is whatever the lifecycle manager settled on.
  final Status goAwayStatus = lifecycleManager.getShutdownStatus();
  final int lastKnownStream = connection().local().lastStreamKnownByPeer();
  final Http2StreamVisitor failUnknownStreams = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      if (stream.id() <= lastKnownStream) {
        return true;
      }
      NettyClientStream.TransportState state = clientStream(stream);
      if (state != null) {
        state.transportReportStatus(goAwayStatus, RpcProgress.REFUSED, false, new Metadata());
      }
      stream.close();
      return true;
    }
  };
  try {
    connection().forEachActiveStream(failUnknownStreams);
  } catch (Http2Exception e) {
    throw new RuntimeException(e);
  }
}
/**
 * With auto-tuning enabled, records a max-window worth of data since the last ping, delivers the
 * ping ack, and expects the connection's initial window size to equal the max window.
 */
@Test
public void windowShouldNotExceedMaxWindowSize() throws Exception {
  manualSetUp();
  makeStream();
  AbstractNettyHandler nettyHandler = (AbstractNettyHandler) handler();
  nettyHandler.setAutoTuneFlowControl(true);
  Http2Stream connStream = connection().connectionStream();
  Http2LocalFlowController flowController = connection().local().flowController();

  int windowCap = nettyHandler.flowControlPing().maxWindow();
  nettyHandler.flowControlPing().setDataSizeSincePing(windowCap);
  long pingPayload = nettyHandler.flowControlPing().payload();
  channelRead(pingFrame(true, pingPayload));

  assertEquals(windowCap, flowController.initialWindowSize(connStream));
}
/**
 * Records statistics for a positive-sized write and queues the bytes on the stream's
 * {@code DataRefresher}, then always forwards the call to the delegate.
 */
@Override
public void write(Http2Stream stream, int numBytes) {
    if (numBytes > 0) {
        // Hand the bytes to the refresher so they can be given back to the stream
        // at the end of the iteration.
        dataRefresher(stream).add(numBytes);

        counters.numWrites++;
        counters.totalBytes += numBytes;
        counters.minWriteSize = Math.min(counters.minWriteSize, numBytes);
        counters.maxWriteSize = Math.max(counters.maxWriteSize, numBytes);
    }
    delegate.write(stream, numBytes);
}
/**
 * Appends the readable bytes of a DATA frame to a per-stream aggregation buffer (created on
 * first use and stored under {@code messageKey}). On end of stream, the headers cached under
 * {@code headerKey} and the aggregated body are handed to {@code handleRequest}.
 *
 * @return the number of readable bytes in {@code data} plus {@code padding}
 */
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) {
    final int payloadLength = data.readableBytes();
    final Http2Stream stream = connection().stream(streamId);

    ByteBuf aggregated = stream.getProperty(messageKey);
    if (aggregated == null) {
        aggregated = ctx.alloc().buffer();
        stream.setProperty(messageKey, aggregated);
    }
    // Index-based copy: the reader index of `data` is left where it was.
    aggregated.writeBytes(data, data.readerIndex(), payloadLength);

    if (endOfStream) {
        // The request headers were cached on the stream earlier.
        Http2Headers headers = stream.getProperty(headerKey);
        handleRequest(ctx, streamId, headers, aggregated);
    }
    return payloadLength + padding;
}
/**
 * By default, connection window size is a constant value:
 * connectionWindowSize = 65535 + (configureInitialWindowSize - 65535) * 2.
 * See https://github.com/netty/netty/blob/5c458c9a98d4d3d0345e58495e017175156d624f/codec-http2/src/main/java/io/netty
 * /handler/codec/http2/Http2FrameCodec.java#L255
 * We should expand the connection window so that the window size is proportional to the number of concurrent
 * streams within the connection.
 * Note that when {@code WINDOW_UPDATE} will be sent depends on the processedWindow in DefaultHttp2LocalFlowController.
 *
 * <p>The work is submitted through {@code doInEventLoop} with the parent channel's event loop. Both the
 * {@code HTTP2_CONNECTION} and {@code HTTP2_INITIAL_WINDOW_SIZE} channel attributes must be present; a failure
 * to increment the window is logged as a warning and not rethrown.
 *
 * @param parentChannel the channel whose connection-level window should be grown by its initial window size
 */
private void tryExpandConnectionWindow(Channel parentChannel) {
    doInEventLoop(parentChannel.eventLoop(), () -> {
        Http2Connection http2Connection = parentChannel.attr(HTTP2_CONNECTION).get();
        Integer initialWindowSize = parentChannel.attr(HTTP2_INITIAL_WINDOW_SIZE).get();

        Validate.notNull(http2Connection, "http2Connection should not be null on channel " + parentChannel);
        // Validate the window-size attribute itself; otherwise a missing attribute would only surface
        // as an unboxing NullPointerException inside incrementWindowSize below.
        Validate.notNull(initialWindowSize, "initialWindowSize should not be null on channel " + parentChannel);

        Http2Stream connectionStream = http2Connection.connectionStream();
        log.debug(() -> "Expanding connection window size for " + parentChannel + " by " + initialWindowSize);
        try {
            Http2LocalFlowController localFlowController = http2Connection.local().flowController();
            localFlowController.incrementWindowSize(connectionStream, initialWindowSize);
        } catch (Http2Exception e) {
            log.warn(() -> "Failed to increment windowSize of connection " + parentChannel, e);
        }
    });
}
/**
 * Writes a plain-text error response (headers followed by a single end-of-stream DATA frame)
 * on the given stream.
 *
 * @param ctx the channel handler context used for writing
 * @param streamId the stream to respond on
 * @param status the HTTP status to send
 * @param content the response body, or {@code null} to use the UTF-8 bytes of {@code status.toString()}
 */
private void writeErrorResponse(ChannelHandlerContext ctx, int streamId, HttpResponseStatus status,
                                @Nullable ByteBuf content) throws Http2Exception {
    final ByteBuf body;
    if (content == null) {
        body = Unpooled.wrappedBuffer(status.toString().getBytes(StandardCharsets.UTF_8));
    } else {
        body = content;
    }

    writer.writeHeaders(
            ctx, streamId,
            new DefaultHttp2Headers(false)
                    .status(status.codeAsText())
                    .set(HttpHeaderNames.CONTENT_TYPE, MediaType.PLAIN_TEXT_UTF_8.toString())
                    .setInt(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes()),
            0, false, ctx.voidPromise());
    writer.writeData(ctx, streamId, body, 0, true, ctx.voidPromise());

    final Http2Stream target = writer.connection().stream(streamId);
    if (target == null) {
        return;
    }
    if (writer.flowController().hasFlowControlled(target)) {
        // Flush the flow-controlled error response so that it is sent
        // before an RST_STREAM frame.
        writer.flowController().writePendingBytes();
    }
}
/**
 * Tells whether the stream with the given {@code streamId} exists and is in a state that is
 * treated as writable here: {@code RESERVED_LOCAL}, {@code OPEN} or {@code HALF_CLOSED_REMOTE}.
 * A stream that has not been created yet yields {@code false}.
 */
protected final boolean isStreamPresentAndWritable(int streamId) {
    final Http2Stream stream = encoder.connection().stream(streamId);
    if (stream != null) {
        switch (stream.state()) {
            case RESERVED_LOCAL:
            case OPEN:
            case HALF_CLOSED_REMOTE:
                return true;
            default:
                // The response has been sent already.
                break;
        }
    }
    return false;
}
/**
 * Server-side forceful shutdown with PerfMark instrumentation: invokes
 * {@code super.close(ctx, promise)}, then for every active stream reports the command's status
 * to its server transport state (inside a PerfMark task linked to the command), resets it with
 * {@code CANCEL}, and closes the stream.
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  super.close(ctx, promise);
  final Http2StreamVisitor abortEachStream = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyServerStream.TransportState state = serverStream(stream);
      if (state == null) {
        // Nothing to report; just close the stream.
        stream.close();
        return true;
      }
      PerfMark.startTask("NettyServerHandler.forcefulClose", state.tag());
      PerfMark.linkIn(msg.getLink());
      try {
        state.transportReportStatus(msg.getStatus());
        resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
      } finally {
        PerfMark.stopTask("NettyServerHandler.forcefulClose", state.tag());
      }
      stream.close();
      return true;
    }
  };
  connection().forEachActiveStream(abortEachStream);
}
/**
 * Client-side forceful shutdown with PerfMark instrumentation. The channel is not closed here
 * (close() is already called by NettyClientTransport); each active stream is visited inside a
 * PerfMark task, has the command's status reported and a {@code CANCEL} reset issued when a
 * client transport state is attached, and is then closed. {@code promise} is completed last.
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  final Http2StreamVisitor cleanUpStream = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyClientStream.TransportState state = clientStream(stream);
      // Use the stream's own tag when available, otherwise a freshly created one.
      final Tag tag;
      if (state == null) {
        tag = PerfMark.createTag();
      } else {
        tag = state.tag();
      }
      PerfMark.startTask("NettyClientHandler.forcefulClose", tag);
      PerfMark.linkIn(msg.getLink());
      try {
        if (state != null) {
          state.transportReportStatus(msg.getStatus(), true, new Metadata());
          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
        }
        stream.close();
        return true;
      } finally {
        PerfMark.stopTask("NettyClientHandler.forcefulClose", tag);
      }
    }
  };
  connection().forEachActiveStream(cleanUpStream);
  promise.setSuccess();
}
/**
 * With auto-tuning enabled, feeds a max-window worth of data into the flow-control ping
 * bookkeeping, delivers the ping ack, and expects the connection's initial window size to equal
 * the max window.
 */
@Test
public void windowShouldNotExceedMaxWindowSize() throws Exception {
  manualSetUp();
  makeStream();
  AbstractNettyHandler nettyHandler = (AbstractNettyHandler) handler();
  nettyHandler.setAutoTuneFlowControl(true);
  Http2Stream connStream = connection().connectionStream();
  Http2LocalFlowController flowController = connection().local().flowController();

  int windowCap = nettyHandler.flowControlPing().maxWindow();
  nettyHandler.flowControlPing().setDataSizeAndSincePing(windowCap);
  long pingPayload = nettyHandler.flowControlPing().payload();
  channelRead(pingFrame(true, pingPayload));

  assertEquals(windowCap, flowController.initialWindowSize(connStream));
}
/**
 * Attaches the underlying Netty {@link Http2Stream} to this stream. May only be done once and
 * must be called in the context of the transport thread.
 *
 * @param http2Stream the stream to attach; must not be {@code null}
 */
public void setHttp2Stream(Http2Stream http2Stream) {
  Http2Stream attached = checkNotNull(http2Stream, "http2Stream");
  checkState(this.http2Stream == null, "Can only set http2Stream once");
  this.http2Stream = attached;

  // The stream is initialized now, so the listener's onReady callback can fire if appropriate.
  onStreamAllocated();
  getTransportTracer().reportLocalStreamStarted();
}
/**
 * Creates the transport state for a server stream.
 *
 * @param handler the owning handler; must not be {@code null}
 * @param eventLoop the event loop stored for this stream (not null-checked here)
 * @param http2Stream the underlying HTTP/2 stream; must not be {@code null}
 * @param maxMessageSize forwarded to the superclass constructor
 * @param statsTraceCtx forwarded to the superclass constructor
 * @param transportTracer forwarded to the superclass constructor
 */
public TransportState(
    NettyServerHandler handler,
    EventLoop eventLoop,
    Http2Stream http2Stream,
    int maxMessageSize,
    StatsTraceContext statsTraceCtx,
    TransportTracer transportTracer) {
  super(maxMessageSize, statsTraceCtx, transportTracer);
  // Validate in the same order as before: stream first, then handler.
  checkNotNull(http2Stream, "http2Stream");
  checkNotNull(handler, "handler");
  this.http2Stream = http2Stream;
  this.handler = handler;
  this.eventLoop = eventLoop;
}
/**
 * Invoked when the channel becomes inactive. Notifies the keep-alive and max-idle managers of
 * the termination, cancels the max-connection-age monitor, and reports {@code UNAVAILABLE} to
 * every active server stream. {@code super.channelInactive} is always invoked afterwards.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  try {
    if (keepAliveManager != null) {
      keepAliveManager.onTransportTermination();
    }
    if (maxConnectionIdleManager != null) {
      maxConnectionIdleManager.onTransportTermination();
    }
    if (maxConnectionAgeMonitor != null) {
      maxConnectionAgeMonitor.cancel(false);
    }

    final Status terminated =
        Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
    // Every stream that is still active gets the termination status reported.
    final Http2StreamVisitor reportTermination = new Http2StreamVisitor() {
      @Override
      public boolean visit(Http2Stream stream) throws Http2Exception {
        NettyServerStream.TransportState state = serverStream(stream);
        if (state != null) {
          state.transportReportStatus(terminated);
        }
        return true;
      }
    };
    connection().forEachActiveStream(reportTermination);
  } finally {
    super.channelInactive(ctx);
  }
}
/**
 * Hands {@code bytes} processed bytes of {@code http2Stream} back to the decoder's flow
 * controller.
 *
 * @throws RuntimeException wrapping any {@link Http2Exception} raised by the flow controller
 */
void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
  try {
    decoder().flowController().consumeBytes(http2Stream, bytes);
  } catch (Http2Exception cause) {
    // Callers are not expected to deal with the checked exception.
    throw new RuntimeException(cause);
  }
}
/**
 * Looks up the stream with the given id on the connection.
 *
 * @return the stream, never {@code null}
 * @throws AssertionError if the connection has no stream with that id
 */
private Http2Stream requireHttp2Stream(int streamId) {
  Http2Stream found = connection().stream(streamId);
  if (found != null) {
    return found;
  }
  // This should never happen.
  throw new AssertionError("Stream does not exist: " + streamId);
}
/**
 * Gives {@code bytes} processed bytes of {@code stream} back to the decoder's flow controller.
 *
 * @throws RuntimeException wrapping any {@link Http2Exception} raised by the flow controller
 */
void returnProcessedBytes(Http2Stream stream, int bytes) {
  try {
    decoder().flowController().consumeBytes(stream, bytes);
  } catch (Http2Exception cause) {
    // Surface the checked exception as unchecked, keeping it as the cause.
    throw new RuntimeException(cause);
  }
}
/**
 * Invoked when the channel becomes inactive. Notifies the lifecycle manager of the shutdown,
 * cancels the outstanding ping, reports the shutdown status to every active client stream, and
 * notifies the lifecycle manager of termination. {@code super.channelInactive} and the
 * keep-alive termination callback run afterwards regardless of failures.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  try {
    logger.fine("Network channel is closed");
    Status closedStatus = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
    lifecycleManager.notifyShutdown(closedStatus);
    try {
      cancelPing(lifecycleManager.getShutdownThrowable());
      // Tell the application layer about the shutdown for every stream that is still open.
      final Http2StreamVisitor reportShutdown = new Http2StreamVisitor() {
        @Override
        public boolean visit(Http2Stream stream) throws Http2Exception {
          NettyClientStream.TransportState state = clientStream(stream);
          if (state != null) {
            state.transportReportStatus(
                lifecycleManager.getShutdownStatus(), false, new Metadata());
          }
          return true;
        }
      };
      connection().forEachActiveStream(reportShutdown);
    } finally {
      lifecycleManager.notifyTerminated(closedStatus);
    }
  } finally {
    // Close any open streams
    super.channelInactive(ctx);
    if (keepAliveManager != null) {
      keepAliveManager.onTransportTermination();
    }
  }
}
/**
 * Returns the connection's stream for {@code streamId}.
 *
 * @throws AssertionError if no such stream exists on the connection
 */
private Http2Stream requireHttp2Stream(int streamId) {
  Http2Stream found = connection().stream(streamId);
  if (found != null) {
    return found;
  }
  // This should never happen.
  throw new AssertionError("Stream does not exist: " + streamId);
}
/**
 * Grows the connection-level window to {@code initialConnectionWindow} when the channel is
 * active and a positive target is configured. After the increment the target is set to
 * {@code -1}, so later calls do nothing, and the context is flushed.
 */
private void sendInitialConnectionWindow() throws Http2Exception {
  if (!ctx.channel().isActive() || initialConnectionWindow <= 0) {
    return;
  }
  Http2Stream connStream = connection().connectionStream();
  int current = connection().local().flowController().windowSize(connStream);
  // Only the difference between the target and the current window is added.
  int increment = initialConnectionWindow - current;
  decoder().flowController().incrementWindowSize(connStream, increment);
  initialConnectionWindow = -1;
  ctx.flush();
}
/**
 * Configures a 1 MiB flow-control window before setup and expects both the current and the
 * initial window size of the connection stream to match it.
 */
@Test
public void connectionWindowShouldBeOverridden() throws Exception {
  flowControlWindow = 1048576; // 1MiB
  manualSetUp();

  Http2Stream connStream = connection().connectionStream();
  Http2LocalFlowController flowController = connection().local().flowController();
  int initialSize = flowController.initialWindowSize(connStream);
  int currentSize = flowController.windowSize(connStream);

  assertEquals(flowControlWindow, currentSize);
  assertEquals(flowControlWindow, initialSize);
}
/**
 * Configures a 1 MiB flow-control window before setup and expects both the current and the
 * initial window size of the connection stream to match it.
 */
@Test
public void connectionWindowShouldBeOverridden() throws Exception {
  flowControlWindow = 1048576; // 1MiB
  setUp();

  Http2Stream connStream = connection().connectionStream();
  Http2LocalFlowController flowController = connection().local().flowController();
  int initialSize = flowController.initialWindowSize(connStream);
  int currentSize = flowController.windowSize(connStream);

  assertEquals(flowControlWindow, currentSize);
  assertEquals(flowControlWindow, initialSize);
  // Also pinned against the literal, independent of the field's value.
  assertEquals(1048576, currentSize);
}
/**
 * Builds the {@link NettyClientHandler} under test on a fresh client-side connection. A stream
 * with id {@code streamId - 2} is created and closed first, and stopwatches are driven by a
 * ticker that reads the test's {@code nanoTime} field.
 */
@Override
protected NettyClientHandler newHandler() throws Http2Exception {
  Http2Connection http2Connection = new DefaultHttp2Connection(false);

  // Create and close a stream previous to the nextStreamId.
  http2Connection.local().createStream(streamId - 2, true).close();

  final Ticker fakeTicker = new Ticker() {
    @Override
    public long read() {
      return nanoTime;
    }
  };
  Supplier<Stopwatch> stopwatches = new Supplier<Stopwatch>() {
    @Override
    public Stopwatch get() {
      return Stopwatch.createUnstarted(fakeTicker);
    }
  };

  return NettyClientHandler.newHandler(
      http2Connection,
      frameReader(),
      frameWriter(),
      lifecycleManager,
      mockKeepAliveManager,
      flowControlWindow,
      maxHeaderListSize,
      stopwatches,
      tooManyPingsRunnable,
      transportTracer,
      Attributes.EMPTY,
      "someauthority");
}
/**
 * With auto-tuning enabled, feeds 41 data frames of identical size into stream 3, delivers the
 * flow-control ping ack, and expects (a) the bytes counted since the ping to equal the total
 * received and (b) the connection's initial window size to be twice that total.
 *
 * <p>The template buffer is only ever {@code copy()}-ed into frames, so it is released in a
 * {@code finally} block instead of being leaked.
 */
@Test
public void windowUpdateMatchesTarget() throws Exception {
  manualSetUp();
  Http2Stream connectionStream = connection().connectionStream();
  Http2LocalFlowController localFlowController = connection().local().flowController();
  makeStream();
  AbstractNettyHandler handler = (AbstractNettyHandler) handler();
  handler.setAutoTuneFlowControl(true);

  ByteBuf data = ctx().alloc().buffer(1024);
  try {
    while (data.isWritable()) {
      data.writeLong(1111);
    }
    int length = data.readableBytes();
    ByteBuf frame = dataFrame(3, false, data.copy());
    channelRead(frame);
    int accumulator = length;
    // 40 is arbitrary, any number large enough to trigger a window update would work
    for (int i = 0; i < 40; i++) {
      channelRead(dataFrame(3, false, data.copy()));
      accumulator += length;
    }
    long pingData = handler.flowControlPing().payload();
    channelRead(pingFrame(true, pingData));

    assertEquals(accumulator, handler.flowControlPing().getDataSincePing());
    assertEquals(2 * accumulator, localFlowController.initialWindowSize(connectionStream));
  } finally {
    // Every frame got its own copy, so the template is still owned by this test.
    data.release();
  }
}
/**
 * Writes the payload immediately, calling {@code payload.write} repeatedly until the payload
 * reports a size of zero or less. At least one write is always issued.
 */
@Override
public void addFlowControlled(Http2Stream stream, FlowControlled payload) {
    // The size is deliberately not checked up front: a Headers payload returns 0 all the time
    // and still has to be written once.
    while (true) {
        payload.write(ctx, MAX_INITIAL_WINDOW_SIZE);
        if (payload.size() <= 0) {
            break;
        }
    }
}
/**
 * Per-trial benchmark setup: creates the connection, picks the byte distributor for the
 * configured {@code algorithm}, installs a remote flow controller that wraps it in a
 * {@code ByteCounter}, builds and activates the connection handler on an embedded context, and
 * finally creates {@code numStreams} local streams pre-loaded with data.
 */
@Setup(Level.Trial)
public void setupTrial() throws Exception {
    connection = new DefaultHttp2Connection(false);
    dataRefresherKey = connection.newKey();

    // Select the distributor that backs the flow controller.
    switch (algorithm) {
        case WFQ:
            distributor = new WeightedFairQueueByteDistributor(connection, 0);
            break;
        case UNIFORM:
            distributor = new UniformStreamByteDistributor(connection);
            break;
    }
    controller = new DefaultHttp2RemoteFlowController(connection, new ByteCounter(distributor));
    connection.remote().flowController(controller);

    Http2ConnectionHandler connectionHandler = new Http2ConnectionHandlerBuilder()
        .encoderEnforceMaxConcurrentStreams(false).validateHeaders(false)
        .frameListener(new Http2FrameAdapter())
        .connection(connection)
        .build();
    ctx = new EmbeddedChannelWriteReleaseHandlerContext(PooledByteBufAllocator.DEFAULT, connectionHandler) {
        @Override
        protected void handleException(Throwable t) {
            handleUnexpectedException(t);
        }
    };
    connectionHandler.handlerAdded(ctx);
    connectionHandler.channelActive(ctx);

    // Create the streams, each initialized with MAX_INT bytes.
    for (int index = 0; index < numStreams; index++) {
        Http2Stream created = connection.local().createStream(toStreamId(index), false);
        addData(created, Integer.MAX_VALUE);
        created.setProperty(dataRefresherKey, new DataRefresher(created));
    }
}
/**
 * Drops the entries keyed by the removed stream's id from both {@code requests} and
 * {@code handlers}.
 */
@Override
public void onStreamRemoved(Http2Stream stream) {
  final int streamId = stream.id();
  requests.remove(streamId);
  handlers.remove(streamId);
}
/**
 * Notifies the GOAWAY handler of the closed stream and closes the matching response, if any.
 * Without a received GOAWAY the response is closed with {@code ClosedStreamException}. After a
 * GOAWAY, streams above the last stream id known by the peer are closed with an
 * {@code UnprocessedRequestException} and all others with {@code ClosedStreamException}.
 */
@Override
public void onStreamClosed(Http2Stream stream) {
    goAwayHandler.onStreamClosed(channel(), stream);

    final HttpResponseWrapper res = getResponse(streamIdToId(stream.id()), true);
    if (res == null) {
        return;
    }

    if (!goAwayHandler.receivedGoAway()) {
        res.close(ClosedStreamException.get());
        return;
    }

    final int lastStreamId = conn.local().lastStreamKnownByPeer();
    if (stream.id() <= lastStreamId) {
        res.close(ClosedStreamException.get());
    } else {
        res.close(UnprocessedRequestException.of(GoAwayReceivedException.get()));
    }

    // Send a GOAWAY frame if the connection has been scheduled for disconnection and
    // it did not receive or send a GOAWAY frame.
    // NOTE(review): this point is only reached after receivedGoAway() returned true above, so
    // the `!goAwayHandler.receivedGoAway()` term looks like it can never hold here and the
    // close() below appears unreachable — confirm the intended placement of this check.
    if (needsToDisconnectNow() && !goAwayHandler.sentGoAway() && !goAwayHandler.receivedGoAway()) {
        channel().close();
    }
}
/**
 * Notifies the GOAWAY handler of the closed stream, removes the request registered under the
 * stream's id and, when one was registered, closes it with {@code ClosedStreamException}.
 */
@Override
public void onStreamClosed(Http2Stream stream) {
    goAwayHandler.onStreamClosed(channel, stream);

    final DecodedHttpRequest removed = requests.remove(stream.id());
    if (removed == null) {
        return;
    }
    // Ignored if the stream has already been closed.
    removed.close(ClosedStreamException.get());
}