下面列出了 io.netty.handler.codec.http2.Http2ServerUpgradeCodec #io.netty.handler.codec.http2.Http2CodecUtil 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected void manualSetUp() throws Exception {
assertNull("manualSetUp should not run more than once", handler());
initChannel(new GrpcHttp2ServerHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));
// replace the keepAliveManager with spyKeepAliveManager
spyKeepAliveManager =
mock(KeepAliveManager.class, delegatesTo(handler().getKeepAliveManagerForTest()));
handler().setKeepAliveManagerForTest(spyKeepAliveManager);
// Simulate receipt of the connection preface
handler().handleProtocolNegotiationCompleted(Attributes.EMPTY, /*securityInfo=*/ null);
channelRead(Http2CodecUtil.connectionPrefaceBuf());
// Simulate receipt of initial remote settings.
ByteBuf serializedSettings = serializeSettings(new Http2Settings());
channelRead(serializedSettings);
}
@Test
public void transportTracer_windowUpdate_local() throws Exception {
manualSetUp();
TransportStats before = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
assertEquals(flowControlWindow, before.localFlowControlWindow);
// If the window size is below a certain threshold, netty will wait to apply the update.
// Use a large increment to be sure that it exceeds the threshold.
connection().local().flowController().incrementWindowSize(
connection().connectionStream(), 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE);
TransportStats after = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, after.remoteFlowControlWindow);
assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE,
connection().local().flowController().windowSize(connection().connectionStream()));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (needsToFilterUpgradeResponse && msg instanceof HttpResponse) {
needsToFilterUpgradeResponse = false;
final HttpResponse res = (HttpResponse) msg;
if (res.status().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code()) {
final HttpHeaders headers = res.headers();
if (!headers.contains(HttpHeaderNames.UPGRADE)) {
headers.set(HttpHeaderNames.UPGRADE,
Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME);
}
}
if (!needsToFilterUpgradeRequest) {
ctx.pipeline().remove(this);
}
}
ctx.fireChannelRead(msg);
}
private Http2Settings http2Settings() {
final Http2Settings settings = new Http2Settings();
final int initialWindowSize = config.http2InitialStreamWindowSize();
if (initialWindowSize != Http2CodecUtil.DEFAULT_WINDOW_SIZE) {
settings.initialWindowSize(initialWindowSize);
}
final int maxFrameSize = config.http2MaxFrameSize();
if (maxFrameSize != Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE) {
settings.maxFrameSize(maxFrameSize);
}
// Not using the value greater than 2^31-1 because some HTTP/2 client implementations use a signed
// 32-bit integer to represent an HTTP/2 SETTINGS parameter value.
settings.maxConcurrentStreams(Math.min(config.http2MaxStreamsPerConnection(), Integer.MAX_VALUE));
settings.maxHeaderListSize(config.http2MaxHeaderListSize());
return settings;
}
private void writeHeaders(
ChannelHandlerContext ctx,
Http2Headers headers,
boolean eos,
ChannelPromise promise,
int currentStreamId)
throws Exception {
encoder()
.writeHeaders(
ctx,
currentStreamId,
headers,
0,
Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT,
false,
0,
eos,
promise);
}
@Override
protected void manualSetUp() throws Exception {
assertNull("manualSetUp should not run more than once", handler());
initChannel(new GrpcHttp2ServerHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));
// replace the keepAliveManager with spyKeepAliveManager
spyKeepAliveManager =
mock(KeepAliveManager.class, delegatesTo(handler().getKeepAliveManagerForTest()));
handler().setKeepAliveManagerForTest(spyKeepAliveManager);
// Simulate receipt of the connection preface
handler().handleProtocolNegotiationCompleted(Attributes.EMPTY, /*securityInfo=*/ null);
channelRead(Http2CodecUtil.connectionPrefaceBuf());
// Simulate receipt of initial remote settings.
ByteBuf serializedSettings = serializeSettings(new Http2Settings());
channelRead(serializedSettings);
}
@Test
public void transportTracer_windowUpdate_local() throws Exception {
manualSetUp();
TransportStats before = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
assertEquals(flowControlWindow, before.localFlowControlWindow);
// If the window size is below a certain threshold, netty will wait to apply the update.
// Use a large increment to be sure that it exceeds the threshold.
connection().local().flowController().incrementWindowSize(
connection().connectionStream(), 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE);
TransportStats after = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, after.remoteFlowControlWindow);
assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE,
connection().local().flowController().windowSize(connection().connectionStream()));
}
@Test
public void transportReadyDelayedUntilConnectionPreface() throws Exception {
initChannel(new GrpcHttp2ServerHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));
handler().handleProtocolNegotiationCompleted(Attributes.EMPTY, /*securityInfo=*/ null);
verify(transportListener, never()).transportReady(any(Attributes.class));
// Simulate receipt of the connection preface
channelRead(Http2CodecUtil.connectionPrefaceBuf());
channelRead(serializeSettings(new Http2Settings()));
verify(transportListener).transportReady(any(Attributes.class));
}
@Test
public void transportTracer_windowSizeDefault() throws Exception {
manualSetUp();
TransportStats transportStats = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, transportStats.remoteFlowControlWindow);
assertEquals(flowControlWindow, transportStats.localFlowControlWindow);
}
@Test
public void transportTracer_windowSize() throws Exception {
flowControlWindow = 1024 * 1024;
manualSetUp();
TransportStats transportStats = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, transportStats.remoteFlowControlWindow);
assertEquals(flowControlWindow, transportStats.localFlowControlWindow);
}
@Test
public void transportTracer_windowUpdate_remote() throws Exception {
manualSetUp();
TransportStats before = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.localFlowControlWindow);
ByteBuf serializedSettings = windowUpdate(0, 1000);
channelRead(serializedSettings);
TransportStats after = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000,
after.remoteFlowControlWindow);
assertEquals(flowControlWindow, after.localFlowControlWindow);
}
@Override
public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(
Http2MultiplexCodecBuilder.forServer(new HelloWorldHttp2Handler()).build());
} else {
return null;
}
}
@Override
public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(new HelloWorldHttp2HandlerBuilder().build());
} else {
return null;
}
}
@Override
public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(
Http2FrameCodecBuilder.forServer().build(), new HelloWorldHttp2Handler());
} else {
return null;
}
}
@Override
public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(new HelloWorldHttp2HandlerBuilder().build());
} else {
return null;
}
}
@Override
@Nullable
public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
return new Http2ServerUpgradeCodec(http2FrameCodec, new Http2MultiplexHandler(this));
}
else {
return null;
}
}
@Test
public void headerTableSize() {
builder.headerTableSize(123);
Http2SettingsSpec spec = builder.build();
assertThat(spec.headerTableSize()).isEqualTo(123);
assertThat(spec.initialWindowSize()).isNull();
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.pushEnabled()).isNull();
}
@Test
public void initialWindowSize() {
builder.initialWindowSize(123);
Http2SettingsSpec spec = builder.build();
assertThat(spec.headerTableSize()).isNull();
assertThat(spec.initialWindowSize()).isEqualTo(123);
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.pushEnabled()).isNull();
}
@Test
public void maxConcurrentStreams() {
builder.maxConcurrentStreams(123);
Http2SettingsSpec spec = builder.build();
assertThat(spec.headerTableSize()).isNull();
assertThat(spec.initialWindowSize()).isNull();
assertThat(spec.maxConcurrentStreams()).isEqualTo(123);
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.pushEnabled()).isNull();
}
@Test
public void maxFrameSize() {
builder.maxFrameSize(16384);
Http2SettingsSpec spec = builder.build();
assertThat(spec.headerTableSize()).isNull();
assertThat(spec.initialWindowSize()).isNull();
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isEqualTo(16384);
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.pushEnabled()).isNull();
}
@Test
public void pushEnabled() {
builder.pushEnabled(true);
Http2SettingsSpec spec = builder.build();
assertThat(spec.headerTableSize()).isNull();
assertThat(spec.initialWindowSize()).isNull();
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.pushEnabled()).isTrue();
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < Http2CodecUtil.FRAME_HEADER_LENGTH) {
return;
}
handledResponse = true;
final ChannelPipeline p = ctx.pipeline();
if (!isSettingsFrame(in)) { // The first frame must be a settings frame.
// Http2ConnectionHandler sent the connection preface, but the server responded with
// something else, which means the server does not support HTTP/2.
SessionProtocolNegotiationCache.setUnsupported(remoteAddress(ctx), H2C);
if (httpPreference == HttpPreference.HTTP2_REQUIRED) {
finishWithNegotiationFailure(
ctx, H2C, H1C, "received a non-HTTP/2 response for the HTTP/2 connection preface");
} else {
// We can silently retry with H1C.
retryWithH1C(ctx);
}
// We are going to close the connection really soon, so we don't need the response.
in.skipBytes(in.readableBytes());
} else {
// The server responded with a non-HTTP/1 response. Continue treating the connection as HTTP/2.
finishSuccessfully(p, H2C);
}
p.remove(this);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof UpgradeEvent) {
// Generate the initial Http2Settings frame,
// so that the next handler knows the protocol upgrade occurred as well.
ctx.fireChannelRead(DEFAULT_HTTP2_SETTINGS);
// Continue handling the upgrade request after the upgrade is complete.
final FullHttpRequest nettyReq = ((UpgradeEvent) evt).upgradeRequest();
// Remove the headers related with the upgrade.
nettyReq.headers().remove(HttpHeaderNames.CONNECTION);
nettyReq.headers().remove(HttpHeaderNames.UPGRADE);
nettyReq.headers().remove(Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER);
if (logger.isDebugEnabled()) {
logger.debug("{} Handling the pre-upgrade request ({}): {} {} {} ({}B)",
ctx.channel(), ((UpgradeEvent) evt).protocol(),
nettyReq.method(), nettyReq.uri(), nettyReq.protocolVersion(),
nettyReq.content().readableBytes());
}
channelRead(ctx, nettyReq);
channelReadComplete(ctx);
return;
}
ctx.fireUserEventTriggered(evt);
}
private static ClientFactory newClientFactory() {
return ClientFactory.builder()
.useHttp2Preface(true)
// Set the window size to the HTTP/2 default values to simplify the traffic.
.http2InitialConnectionWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE)
.http2InitialStreamWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE)
.workerGroup(eventLoop.get(), false)
.build();
}
private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
// Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
stream.transportHeadersReceived(headers, endStream);
}
if (keepAliveManager != null) {
keepAliveManager.onDataReceived();
}
}
@Test
public void transportReadyDelayedUntilConnectionPreface() throws Exception {
initChannel(new GrpcHttp2ServerHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));
handler().handleProtocolNegotiationCompleted(Attributes.EMPTY, /*securityInfo=*/ null);
verify(transportListener, never()).transportReady(any(Attributes.class));
// Simulate receipt of the connection preface
channelRead(Http2CodecUtil.connectionPrefaceBuf());
channelRead(serializeSettings(new Http2Settings()));
verify(transportListener).transportReady(any(Attributes.class));
}
@Test
public void transportTracer_windowSizeDefault() throws Exception {
manualSetUp();
TransportStats transportStats = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, transportStats.remoteFlowControlWindow);
assertEquals(flowControlWindow, transportStats.localFlowControlWindow);
}
@Test
public void transportTracer_windowSize() throws Exception {
flowControlWindow = 1024 * 1024;
manualSetUp();
TransportStats transportStats = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, transportStats.remoteFlowControlWindow);
assertEquals(flowControlWindow, transportStats.localFlowControlWindow);
}
@Test
public void transportTracer_windowUpdate_remote() throws Exception {
manualSetUp();
TransportStats before = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.localFlowControlWindow);
ByteBuf serializedSettings = windowUpdate(0, 1000);
channelRead(serializedSettings);
TransportStats after = transportTracer.getStats();
assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000,
after.remoteFlowControlWindow);
assertEquals(flowControlWindow, after.localFlowControlWindow);
}
@Test
public void maxFrameSize() throws Exception {
try (ServerSocket ss = new ServerSocket(0);
ClientFactory clientFactory =
ClientFactory.builder()
.useHttp2Preface(true)
// Set the window size to the HTTP/2 default values to simplify the traffic.
.http2InitialConnectionWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE)
.http2InitialStreamWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE)
.http2MaxFrameSize(DEFAULT_MAX_FRAME_SIZE * 2) // == 16384 * 2
.build()) {
final int port = ss.getLocalPort();
final WebClient client = WebClient.builder("http://127.0.0.1:" + port)
.factory(clientFactory)
.build();
client.get("/").aggregate();
try (Socket s = ss.accept()) {
final InputStream in = s.getInputStream();
final BufferedOutputStream bos = new BufferedOutputStream(s.getOutputStream());
readBytes(in, connectionPrefaceBuf().capacity()); // Read the connection preface and discard it.
// Read a SETTINGS frame and validate it.
assertSettingsFrameOfMaxFrameSize(in);
sendEmptySettingsAndAckFrame(bos);
readBytes(in, 9); // Read a SETTINGS_ACK frame and discard it.
readHeadersFrame(in); // Read a HEADERS frame and discard it.
sendHeaderFrame(bos);
////////////////////////////////////////
// Transmission of data gets started. //
////////////////////////////////////////
// Send a DATA frame that indicates sending data as much as 0x8000 for stream id 03.
bos.write(new byte[] { 0x00, (byte) 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03 });
bos.write(EMPTY_DATA);
bos.write(EMPTY_DATA);
bos.flush();
readBytes(in, 13); // Read a WINDOW_UPDATE frame for connection and discard it.
readBytes(in, 13); // Read a WINDOW_UPDATE frame for stream id 03 and discard it.
// Send a DATA frame that exceed MAX_FRAME_SIZE by 1.
bos.write(new byte[] { 0x00, (byte) 0x80, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03 });
bos.flush(); // Triggers the client to send a GOAWAY frame for the connection.
// The client send a GOAWAY frame and the server read it.
final ByteBuf buffer = readGoAwayFrame(in);
final DefaultHttp2FrameReader frameReader = new DefaultHttp2FrameReader();
final CountDownLatch latch = new CountDownLatch(1);
frameReader.readFrame(null, buffer, new Http2EventAdapter() {
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData)
throws Http2Exception {
assertThat(lastStreamId).isZero(); // 0: connection error
assertThat(errorCode).isEqualTo(Http2Error.FRAME_SIZE_ERROR.code());
latch.countDown();
}
});
latch.await();
buffer.release();
// Client should disconnect after receiving a GOAWAY frame.
assertThat(in.read()).isEqualTo(-1);
}
}
}