下面列出了 io.netty.handler.codec.http2.Http2SettingsFrame 类的 API 使用示例代码及写法;也可以点击链接到 GitHub 查看源代码。
/**
 * Dispatches inbound connection-level HTTP/2 frames.
 *
 * <p>SETTINGS, GOAWAY and PING frames are consumed by this handler and are not propagated further;
 * every other message is forwarded to the next handler in the pipeline.
 *
 * @param ctx the context of this handler
 * @param msg the inbound message
 */
@Override
public final void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof Http2SettingsFrame) {
        // The subclass decides whether the acknowledgement is written right now.
        final boolean ackNow = ackSettings(ctx, (Http2SettingsFrame) msg);
        if (ackNow) {
            ctx.writeAndFlush(Http2SettingsAckFrame.INSTANCE);
        }
        return;
    }

    if (msg instanceof Http2GoAwayFrame) {
        // The frame is not passed on, so drop our reference before doing anything else.
        ((Http2GoAwayFrame) msg).release();
        parentContext.onClosing.onComplete();
        // Start the graceful close (without a timeout) so the socket gets closed once the streams that
        // are still open have finished. The MultiplexCodec may simulate a GOAWAY when stream IDs run out,
        // so the peer cannot be relied upon to close the transport.
        parentContext.keepAliveManager.initiateGracefulClose(parentContext.onClosing::onComplete);
        return;
    }

    if (msg instanceof Http2PingFrame) {
        parentContext.keepAliveManager.pingReceived((Http2PingFrame) msg);
        return;
    }

    ctx.fireChannelRead(msg);
}
/**
 * Inspects a received SETTINGS frame and publishes the peer's maximum concurrent streams value, if any.
 *
 * <p>When the frame carries no max-concurrent-streams setting nothing is published and {@code true} is
 * returned. Otherwise a {@code MaxConcurrencyConsumableEvent} is emitted on {@code maxConcurrencyProcessor}
 * and {@code false} is returned.
 * NOTE(review): the acknowledgement is presumably written later, once the event is consumed - confirm
 * against the event implementation.
 *
 * @param ctx the channel context the frame was read on
 * @param settingsFrame the received SETTINGS frame
 * @return {@code true} if the frame carried no max-concurrent-streams value, {@code false} if an event
 * was published instead
 */
@Override
boolean ackSettings(final ChannelHandlerContext ctx, final Http2SettingsFrame settingsFrame) {
    final Long maxConcurrentStreams = settingsFrame.settings().maxConcurrentStreams();
    if (maxConcurrentStreams == null) {
        return true;
    }
    // SETTINGS_MAX_CONCURRENT_STREAMS is an unsigned 32-bit value, so it can exceed Integer.MAX_VALUE.
    // Clamp instead of truncating with intValue(), which would wrap large values into a negative limit.
    final int clampedMaxConcurrentStreams = (int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE);
    maxConcurrencyProcessor.onNext(new MaxConcurrencyConsumableEvent(
            clampedMaxConcurrentStreams, ctx.channel()));
    return false;
}
/**
 * Feeds the handler a SETTINGS frame advertising 50 concurrent streams and expects that value to be
 * stored on the channel (presumably because it is below the client's own limit), with the protocol
 * future completed as HTTP/2.
 */
@Test
public void channelRead_useServerMaxStreams() {
    final long advertisedByServer = 50L;

    handler.channelRead0(context, http2SettingsFrame(advertisedByServer));

    assertThat(channel.attr(MAX_CONCURRENT_STREAMS).get()).isEqualTo(advertisedByServer);
    assertThat(protocolCompletableFuture).isDone();
    assertThat(protocolCompletableFuture.join()).isEqualTo(Protocol.HTTP2);
}
/**
 * Feeds the handler a SETTINGS frame advertising 10000 concurrent streams and expects the client's own
 * limit to be stored on the channel instead, with the protocol future completed as HTTP/2.
 */
@Test
public void channelRead_useClientMaxStreams() {
    final long advertisedByServer = 10000L;

    handler.channelRead0(context, http2SettingsFrame(advertisedByServer));

    assertThat(channel.attr(MAX_CONCURRENT_STREAMS).get()).isEqualTo(clientMaxStreams);
    assertThat(protocolCompletableFuture).isDone();
    assertThat(protocolCompletableFuture.join()).isEqualTo(Protocol.HTTP2);
}
/**
 * Builds a stub SETTINGS frame whose settings carry only the given max-concurrent-streams value.
 * A fresh {@link Http2Settings} instance is created on every {@code settings()} call.
 *
 * @param serverMaxStreams the max-concurrent-streams value the stub frame reports
 * @return a frame named {@code "test"} reporting that value
 */
private Http2SettingsFrame http2SettingsFrame(long serverMaxStreams) {
    return new Http2SettingsFrame() {
        @Override
        public String name() {
            return "test";
        }

        @Override
        public Http2Settings settings() {
            // The setter is fluent and returns the settings instance it was invoked on.
            return new Http2Settings().maxConcurrentStreams(serverMaxStreams);
        }
    };
}
/**
 * Waits for the first SETTINGS frame: once it arrives the connection is reported as
 * {@code CONFIGURED}, and both the reactive bridge handler and this handler are removed from the
 * pipeline. The SETTINGS frame itself is not propagated; any other message is forwarded unchanged.
 *
 * @param ctx the context of this handler
 * @param msg the inbound message
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final boolean isSettings = msg instanceof Http2SettingsFrame;
    if (!isSettings) {
        ctx.fireChannelRead(msg);
        return;
    }

    sendNewState(Connection.from(ctx.channel()), ConnectionObserver.State.CONFIGURED);
    // Remove the bridge first, then this handler itself.
    ctx.pipeline().remove(NettyPipeline.ReactiveBridge);
    ctx.pipeline().remove(this);
}
/**
 * Server-side settings hook: performs no work and always returns {@code false}.
 *
 * @param ctx the channel context the frame was read on (unused)
 * @param settingsFrame the received SETTINGS frame (unused)
 * @return always {@code false}
 */
@Override
boolean ackSettings(final ChannelHandlerContext ctx, final Http2SettingsFrame settingsFrame) {
// The server side does not need to ACK the settings asynchronously because there is no need to coordinate
// the maximum concurrent streams value with the application.
// NOTE(review): returning false presumably relies on the codec acknowledging SETTINGS automatically on
// the server side - confirm against the codec configuration.
return false;
}
/**
 * Records the effective max-concurrent-streams limit on the channel and completes the protocol future.
 *
 * <p>The effective limit is the smaller of the client's configured limit and the value advertised in
 * the SETTINGS frame; a frame without that setting is treated as advertising {@link Long#MAX_VALUE}.
 *
 * @param ctx the context of this handler (unused; the {@code channel} field is used instead)
 * @param msg the received SETTINGS frame
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame msg) {
    final Long advertised = msg.settings().maxConcurrentStreams();
    final long serverMaxStreams = advertised != null ? advertised : Long.MAX_VALUE;
    final long effectiveMaxStreams = Math.min(clientMaxStreams, serverMaxStreams);

    channel.attr(MAX_CONCURRENT_STREAMS).set(effectiveMaxStreams);
    channel.attr(PROTOCOL_FUTURE).get().complete(Protocol.HTTP2);
}
/**
 * Executes a GET against the local test H2 server with a client that has no explicit HTTP/2
 * configuration, then checks that at least one SETTINGS frame was received and that every received
 * SETTINGS frame carries {@code DEFAULT_INIT_WINDOW_SIZE} as its initial window size.
 */
@Test
public void execute_noExplicitValueSet_sendsDefaultValueInSettings() throws InterruptedException {
    ConcurrentLinkedQueue<Http2Frame> framesSeenByServer = new ConcurrentLinkedQueue<>();
    server = new TestH2Server(() -> new StreamHandler(framesSeenByServer));
    server.init();

    netty = NettyNioAsyncHttpClient.builder()
                                   .protocol(Protocol.HTTP2)
                                   .build();

    SdkHttpFullRequest httpRequest = SdkHttpFullRequest.builder()
                                                       .method(SdkHttpMethod.GET)
                                                       .protocol("http")
                                                       .host("localhost")
                                                       .port(server.port())
                                                       .build();

    // The response content is irrelevant here, so every callback is a no-op.
    SdkAsyncHttpResponseHandler ignoringHandler = new SdkAsyncHttpResponseHandler() {
        @Override
        public void onHeaders(SdkHttpResponse headers) {
        }

        @Override
        public void onStream(Publisher<ByteBuffer> stream) {
        }

        @Override
        public void onError(Throwable error) {
        }
    };

    AsyncExecuteRequest executeRequest = AsyncExecuteRequest.builder()
                                                            .requestContentPublisher(new EmptyPublisher())
                                                            .request(httpRequest)
                                                            .responseHandler(ignoringHandler)
                                                            .build();

    netty.execute(executeRequest).join();

    List<Http2Settings> settingsSeen = framesSeenByServer.stream()
                                                         .filter(Http2SettingsFrame.class::isInstance)
                                                         .map(Http2SettingsFrame.class::cast)
                                                         .map(Http2SettingsFrame::settings)
                                                         .collect(Collectors.toList());

    assertThat(settingsSeen).isNotEmpty();
    settingsSeen.forEach(
        settings -> assertThat(settings.initialWindowSize()).isEqualTo(DEFAULT_INIT_WINDOW_SIZE));
}
/**
 * Executes a GET against the local test H2 server with a client whose HTTP/2 configuration sets the
 * given initial window size, then checks that at least one SETTINGS frame was received and that every
 * received SETTINGS frame carries the expected initial window size.
 *
 * @param builderSetterValue the value passed to {@code Http2Configuration.Builder#initialWindowSize}
 * @param settingsFrameValue the initial window size expected in each received SETTINGS frame
 */
private void expectCorrectWindowSizeValueTest(Integer builderSetterValue, int settingsFrameValue) throws InterruptedException {
    ConcurrentLinkedQueue<Http2Frame> framesSeenByServer = new ConcurrentLinkedQueue<>();
    server = new TestH2Server(() -> new StreamHandler(framesSeenByServer));
    server.init();

    netty = NettyNioAsyncHttpClient.builder()
                                   .protocol(Protocol.HTTP2)
                                   .http2Configuration(Http2Configuration.builder()
                                                                         .initialWindowSize(builderSetterValue)
                                                                         .build())
                                   .build();

    SdkHttpFullRequest httpRequest = SdkHttpFullRequest.builder()
                                                       .method(SdkHttpMethod.GET)
                                                       .protocol("http")
                                                       .host("localhost")
                                                       .port(server.port())
                                                       .build();

    // The response content is irrelevant here, so every callback is a no-op.
    SdkAsyncHttpResponseHandler ignoringHandler = new SdkAsyncHttpResponseHandler() {
        @Override
        public void onHeaders(SdkHttpResponse headers) {
        }

        @Override
        public void onStream(Publisher<ByteBuffer> stream) {
        }

        @Override
        public void onError(Throwable error) {
        }
    };

    AsyncExecuteRequest executeRequest = AsyncExecuteRequest.builder()
                                                            .requestContentPublisher(new EmptyPublisher())
                                                            .request(httpRequest)
                                                            .responseHandler(ignoringHandler)
                                                            .build();

    netty.execute(executeRequest).join();

    List<Http2Settings> settingsSeen = framesSeenByServer.stream()
                                                         .filter(Http2SettingsFrame.class::isInstance)
                                                         .map(Http2SettingsFrame.class::cast)
                                                         .map(Http2SettingsFrame::settings)
                                                         .collect(Collectors.toList());

    assertThat(settingsSeen).isNotEmpty();
    settingsSeen.forEach(
        settings -> assertThat(settings.initialWindowSize()).isEqualTo(settingsFrameValue));
}
/**
 * Hook invoked for every received {@link Http2SettingsFrame}; the result decides whether the caller
 * writes the acknowledgement immediately.
 *
 * @param ctx the channel context the frame was read on
 * @param settingsFrame the received SETTINGS frame
 * @return {@code true} if the caller should write and flush a settings ACK frame right away,
 * {@code false} if it should not
 */
abstract boolean ackSettings(ChannelHandlerContext ctx, Http2SettingsFrame settingsFrame);