下面列出了 io.netty.handler.codec.http2.Http2Frame 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Records every inbound HTTP/2 frame and answers the end of a request with an empty 204.
 *
 * <p>Messages that are not {@link Http2Frame}s are passed to the next handler untouched.
 * Each frame is appended to {@code receivedFrames}; when the frame is a data frame flagged
 * end-of-stream, a headers-only response with status {@code 204} (itself end-of-stream) is
 * written and flushed on the same stream. The frame is released before returning.
 *
 * @param ctx the handler context used for forwarding and for writing the response
 * @param msg the inbound message
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof Http2Frame) {
        Http2Frame inbound = (Http2Frame) msg;
        receivedFrames.add(inbound);

        boolean requestFinished = inbound instanceof Http2DataFrame
                && ((Http2DataFrame) inbound).isEndStream();
        if (requestFinished) {
            // Headers-only reply: the 'true' flag marks these headers as the end of the stream.
            ctx.writeAndFlush(
                    new DefaultHttp2HeadersFrame(new DefaultHttp2Headers().status("204"), true)
                            .stream(((Http2DataFrame) inbound).stream()));
        }

        ReferenceCountUtil.release(inbound);
    } else {
        ctx.fireChannelRead(msg);
    }
}
/**
 * Responds to inbound data frames, optionally closing the first connection instead.
 *
 * <p>Frames other than {@link Http2DataFrame} are ignored. For a data frame, when the parent
 * channel's short id equals {@code channelIds[0]} and {@code failOnFirstChannel} is set, the
 * parent channel is closed. Otherwise an empty data frame followed by an end-of-stream
 * headers frame with status OK is written and flushed.
 *
 * @param ctx   the handler context of the channel the frame arrived on
 * @param frame the inbound HTTP/2 frame
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) {
    if (!(frame instanceof Http2DataFrame)) {
        return;
    }

    // Not respond if this is channel 1
    boolean onFirstChannel = channelIds[0].equals(ctx.channel().parent().id().asShortText());
    if (onFirstChannel && failOnFirstChannel) {
        ctx.channel().parent().close();
        return;
    }

    DefaultHttp2DataFrame emptyData = new DefaultHttp2DataFrame(false);
    try {
        LOGGER.info(() -> "return empty data " + ctx.channel() + " frame " + frame.getClass());
        Http2Headers okHeaders = new DefaultHttp2Headers().status(OK.codeAsText());
        ctx.write(emptyData);
        // The headers frame carries end-of-stream (second constructor argument).
        ctx.write(new DefaultHttp2HeadersFrame(okHeaders, true));
        ctx.flush();
    } finally {
        // Drop this handler's reference to the locally created data frame.
        emptyData.release();
    }
}
/**
 * Responds to inbound data frames, optionally staying silent on the first connection.
 *
 * <p>Frames other than {@link Http2DataFrame} are ignored. For a data frame, when the parent
 * channel's short id equals {@code channelIds[0]} and {@code notRespondOnFirstChannel} is set,
 * the request is only logged and no response is written. Otherwise an empty data frame
 * followed by an end-of-stream headers frame with status OK is written and flushed.
 *
 * @param ctx   the handler context of the channel the frame arrived on
 * @param frame the inbound HTTP/2 frame
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) {
    if (!(frame instanceof Http2DataFrame)) {
        return;
    }

    // Not respond if this is channel 1
    boolean onFirstChannel = channelIds[0].equals(ctx.channel().parent().id().asShortText());
    if (onFirstChannel && notRespondOnFirstChannel) {
        LOGGER.info(() -> "This is the first request, not responding" + ctx.channel());
        return;
    }

    DefaultHttp2DataFrame emptyData = new DefaultHttp2DataFrame(false);
    try {
        LOGGER.info(() -> "return empty data " + ctx.channel() + " frame " + frame.getClass());
        Http2Headers okHeaders = new DefaultHttp2Headers().status(OK.codeAsText());
        ctx.write(emptyData);
        // The headers frame carries end-of-stream (second constructor argument).
        ctx.write(new DefaultHttp2HeadersFrame(okHeaders, true));
        ctx.flush();
    } finally {
        // Drop this handler's reference to the locally created data frame.
        emptyData.release();
    }
}
/**
 * Updates per-request HTTP/2 client metrics for an inbound frame, then forwards the frame.
 *
 * <p>The {@link RequestInfo} stored on the channel under
 * {@code Http2NetworkClient.REQUEST_INFO} has its frame counter incremented for every frame.
 * A headers frame records the stream round-trip time and its receive timestamp; a data frame
 * is logged, and a data frame flagged end-of-stream additionally records the
 * first-to-last-frame time and the total response frame count.
 *
 * <p>The frame is retained only at the moment it is handed to the next handler. Retaining
 * earlier would leave an extra reference behind if anything above threw before the hand-off.
 * NOTE(review): this assumes the enclosing handler auto-releases the frame after this method
 * returns (as {@code SimpleChannelInboundHandler} does by default) — confirm.
 *
 * @param ctx   the handler context of the stream channel
 * @param frame the inbound HTTP/2 frame
 * @throws Exception if metric recording or forwarding fails
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) throws Exception {
    RequestInfo requestInfo = ctx.channel().attr(Http2NetworkClient.REQUEST_INFO).get();
    requestInfo.responseFramesCount++;

    // Read the clock once so the logged elapsed time and the stored timestamp agree.
    long now = System.currentTimeMillis();
    long time = now - requestInfo.getStreamSendTime();

    if (frame instanceof Http2HeadersFrame) {
        http2ClientMetrics.http2StreamRoundTripTime.update(time);
        requestInfo.setStreamHeaderFrameReceiveTime(now);
        logger.debug("Header Frame received. Time from send: {}ms. Request: {}", time, requestInfo);
    } else if (frame instanceof Http2DataFrame) {
        Http2DataFrame dataFrame = (Http2DataFrame) frame;
        logger.debug("Data Frame size: {}. Time from send: {}ms. Request: {}",
            dataFrame.content().readableBytes(), time, requestInfo);
        if (dataFrame.isEndStream()) {
            http2ClientMetrics.http2StreamFirstToLastFrameTime.update(time);
            http2ClientMetrics.http2ResponseFrameCount.update(requestInfo.responseFramesCount);
            logger.debug("All Frame received. Time from send: {}ms. Request: {}", time, requestInfo);
        }
    }

    // Take the extra reference for the downstream handler as the very last step.
    ctx.fireChannelRead(ReferenceCountUtil.retain(frame));
}
/**
 * Increments a counter tagged with the frame's name and, where available, its error code.
 *
 * <p>The counter is looked up in {@code registry} under {@code counterName} with the tags
 * {@code id}, {@code frame} and {@code error_code}. The error code is taken from reset and
 * go-away frames; every other frame type is tagged with {@code -1}.
 *
 * @param registry    the registry the counter is obtained from
 * @param counterName the name of the counter
 * @param metricId    the value of the {@code id} tag
 * @param frame       the frame being counted
 */
protected static void incrementCounter(Registry registry, String counterName, String metricId, Http2Frame frame)
{
    // -1 marks frame types that do not carry an error code.
    long errorCode = -1;
    if (frame instanceof Http2ResetFrame) {
        errorCode = ((Http2ResetFrame) frame).errorCode();
    }
    else if (frame instanceof Http2GoAwayFrame) {
        errorCode = ((Http2GoAwayFrame) frame).errorCode();
    }

    String frameName = frame.name();
    String errorCodeTag = Long.toString(errorCode);
    registry.counter(counterName, "id", metricId, "frame", frameName, "error_code", errorCodeTag)
            .increment();
}
/**
 * Dispatches an inbound HTTP/2 frame to the callback matching its type.
 *
 * <p>Data frames go to {@code onDataRead}, headers frames to {@code onHeadersRead} (followed
 * by a read request on this channel), and reset frames to {@code onRstStreamRead}. For any
 * other frame type a read is requested on the parent channel.
 *
 * @param ctx   the handler context of the stream channel
 * @param frame the inbound HTTP/2 frame
 * @throws Exception if one of the callbacks fails
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) throws Exception {
    if (frame instanceof Http2DataFrame) {
        onDataRead((Http2DataFrame) frame, ctx);
        return;
    }
    if (frame instanceof Http2HeadersFrame) {
        onHeadersRead((Http2HeadersFrame) frame, ctx);
        ctx.channel().read();
        return;
    }
    if (frame instanceof Http2ResetFrame) {
        onRstStreamRead((Http2ResetFrame) frame, ctx);
        return;
    }
    // TODO this is related to the inbound window update bug. Revisit
    ctx.channel().parent().read();
}
/**
 * Consumes inbound data frames while simulating a server that is slow to read.
 *
 * <p>Messages that are not {@link Http2Frame}s are forwarded to the next handler. Frames
 * other than {@link Http2DataFrame} are dropped without further action. For a data frame:
 * the frame is released; if it is flagged end-of-stream, a headers-only {@code 204} response
 * is written and flushed; while {@code sleeps} is positive it is decremented and the calling
 * thread sleeps for 500 ms; finally a window update for the frame's initial flow-controlled
 * byte count is written and flushed on the same stream.
 *
 * <p>If the sleep is interrupted, the thread's interrupt status is restored rather than being
 * silently discarded, and the window update is still sent.
 *
 * @param ctx the handler context used for forwarding and for writing responses
 * @param msg the inbound message
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (!(msg instanceof Http2Frame)) {
        ctx.fireChannelRead(msg);
        return;
    }
    Http2Frame frame = (Http2Frame) msg;
    if (frame instanceof Http2DataFrame) {
        Http2DataFrame dataFrame = (Http2DataFrame) frame;
        // NOTE(review): the frame is released before its accessors are read below; this
        // presumably relies on those accessors not touching the released content — confirm.
        ReferenceCountUtil.release(frame);
        if (dataFrame.isEndStream()) {
            Http2HeadersFrame respHeaders = new DefaultHttp2HeadersFrame(
                    new DefaultHttp2Headers().status("204"), true)
                    .stream(dataFrame.stream());
            ctx.writeAndFlush(respHeaders);
        }
        if (sleeps > 0) {
            --sleeps;
            // Simulate a server that's slow to read data. Since our
            // window size is equal to the max frame size, the client
            // shouldn't be able to send more data until we update our
            // window
            try {
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                // Keep the interruption visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
        ctx.writeAndFlush(new DefaultHttp2WindowUpdateFrame(dataFrame.initialFlowControlledBytes())
                .stream(dataFrame.stream()));
    }
}
/**
 * Starts an open-ended streamed response once the request body has been fully received.
 *
 * <p>Only a {@link Http2DataFrame} flagged end-of-stream triggers a response: a headers frame
 * with status {@code 200} (not end-of-stream) is written and flushed, then a data frame
 * containing {@code "hello"} is written and flushed immediately and every 2 seconds after.
 * None of the data frames is flagged end-of-stream.
 *
 * <p>The periodic task is cancelled when this channel closes, so it does not keep running on
 * the executor after the channel is gone. The payload is encoded explicitly as UTF-8 instead
 * of depending on the platform default charset.
 *
 * @param ctx        the handler context of the channel the frame arrived on
 * @param http2Frame the inbound HTTP/2 frame
 * @throws Exception declared by the overridden method; not thrown by this body directly
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame http2Frame) throws Exception {
    if (!(http2Frame instanceof Http2DataFrame)) {
        return;
    }
    Http2DataFrame dataFrame = (Http2DataFrame) http2Frame;
    if (!dataFrame.isEndStream()) {
        return;
    }

    Http2Headers headers = new DefaultHttp2Headers().status("200");
    ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, false));

    // Fully qualified JDK names keep this block independent of the file's import list.
    java.util.concurrent.ScheduledFuture<?> periodicWrite = ctx.executor().scheduleAtFixedRate(() -> {
        byte[] payload = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        DefaultHttp2DataFrame respData = new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(payload), false);
        ctx.writeAndFlush(respData);
    }, 0, 2, TimeUnit.SECONDS);

    // Stop producing data once the channel is closed.
    ctx.channel().closeFuture().addListener(closed -> periodicWrite.cancel(false));
}
/**
 * Counts inbound HTTP/2 frames before handing the message to the superclass.
 *
 * <p>When {@code msg} is an {@link Http2Frame}, {@code incrementCounter} is invoked for it.
 * The message is always passed to {@code super.channelRead}, even if counting throws.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception if counting or the superclass handling fails
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
    if (!(msg instanceof Http2Frame)) {
        super.channelRead(ctx, msg);
        return;
    }

    try {
        incrementCounter(registry, frameCounterName, metricId, (Http2Frame) msg);
    }
    finally {
        // Forward regardless of whether the metric update succeeded.
        super.channelRead(ctx, msg);
    }
}
/**
 * Passes an outbound message to the superclass and then counts it if it is an HTTP/2 frame.
 *
 * <p>{@code super.write} is invoked first; {@code incrementCounter} runs afterwards and only
 * for messages that are {@link Http2Frame}s.
 *
 * @param ctx     the handler context
 * @param msg     the outbound message
 * @param promise the promise associated with the write
 * @throws Exception if the superclass write or counting fails
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
{
    super.write(ctx, msg, promise);

    if (!(msg instanceof Http2Frame)) {
        return;
    }
    Http2Frame outbound = (Http2Frame) msg;
    incrementCounter(registry, frameCounterName, metricId, outbound);
}
/**
 * Verifies that a client built without an explicit HTTP/2 configuration advertises
 * {@code DEFAULT_INIT_WINDOW_SIZE} in every SETTINGS frame the test server receives.
 *
 * <p>The server's stream handler collects received frames into a queue; after a single GET
 * request completes, at least one settings frame must have been captured.
 */
@Test
public void execute_noExplicitValueSet_sendsDefaultValueInSettings() throws InterruptedException {
    ConcurrentLinkedQueue<Http2Frame> receivedFrames = new ConcurrentLinkedQueue<>();
    server = new TestH2Server(() -> new StreamHandler(receivedFrames));
    server.init();

    netty = NettyNioAsyncHttpClient.builder().protocol(Protocol.HTTP2).build();

    SdkHttpFullRequest httpRequest = SdkHttpFullRequest.builder()
                                                       .method(SdkHttpMethod.GET)
                                                       .protocol("http")
                                                       .host("localhost")
                                                       .port(server.port())
                                                       .build();

    // The response itself is irrelevant here; only the frames seen by the server matter.
    SdkAsyncHttpResponseHandler ignoringHandler = new SdkAsyncHttpResponseHandler() {
        @Override
        public void onHeaders(SdkHttpResponse headers) {
        }

        @Override
        public void onStream(Publisher<ByteBuffer> stream) {
        }

        @Override
        public void onError(Throwable error) {
        }
    };

    AsyncExecuteRequest req = AsyncExecuteRequest.builder()
                                                 .requestContentPublisher(new EmptyPublisher())
                                                 .request(httpRequest)
                                                 .responseHandler(ignoringHandler)
                                                 .build();

    netty.execute(req).join();

    List<Http2Settings> receivedSettings = receivedFrames.stream()
                                                         .filter(Http2SettingsFrame.class::isInstance)
                                                         .map(Http2SettingsFrame.class::cast)
                                                         .map(Http2SettingsFrame::settings)
                                                         .collect(Collectors.toList());

    assertThat(receivedSettings.size()).isGreaterThan(0);
    receivedSettings.forEach(
        s -> assertThat(s.initialWindowSize()).isEqualTo(DEFAULT_INIT_WINDOW_SIZE));
}
/**
 * Runs one GET request with the given initial window size configured on the client and
 * asserts that every SETTINGS frame received by the test server carries the expected value.
 *
 * @param builderSetterValue the value passed to {@code Http2Configuration.initialWindowSize}
 * @param settingsFrameValue the initial window size expected in each received settings frame
 */
private void expectCorrectWindowSizeValueTest(Integer builderSetterValue, int settingsFrameValue) throws InterruptedException {
    ConcurrentLinkedQueue<Http2Frame> receivedFrames = new ConcurrentLinkedQueue<>();
    server = new TestH2Server(() -> new StreamHandler(receivedFrames));
    server.init();

    Http2Configuration http2Config = Http2Configuration.builder()
                                                       .initialWindowSize(builderSetterValue)
                                                       .build();
    netty = NettyNioAsyncHttpClient.builder()
                                   .protocol(Protocol.HTTP2)
                                   .http2Configuration(http2Config)
                                   .build();

    SdkHttpFullRequest httpRequest = SdkHttpFullRequest.builder()
                                                       .method(SdkHttpMethod.GET)
                                                       .protocol("http")
                                                       .host("localhost")
                                                       .port(server.port())
                                                       .build();

    // The response itself is irrelevant here; only the frames seen by the server matter.
    SdkAsyncHttpResponseHandler ignoringHandler = new SdkAsyncHttpResponseHandler() {
        @Override
        public void onHeaders(SdkHttpResponse headers) {
        }

        @Override
        public void onStream(Publisher<ByteBuffer> stream) {
        }

        @Override
        public void onError(Throwable error) {
        }
    };

    AsyncExecuteRequest req = AsyncExecuteRequest.builder()
                                                 .requestContentPublisher(new EmptyPublisher())
                                                 .request(httpRequest)
                                                 .responseHandler(ignoringHandler)
                                                 .build();

    netty.execute(req).join();

    List<Http2Settings> receivedSettings = receivedFrames.stream()
                                                         .filter(Http2SettingsFrame.class::isInstance)
                                                         .map(Http2SettingsFrame.class::cast)
                                                         .map(Http2SettingsFrame::settings)
                                                         .collect(Collectors.toList());

    assertThat(receivedSettings.size()).isGreaterThan(0);
    receivedSettings.forEach(
        s -> assertThat(s.initialWindowSize()).isEqualTo(settingsFrameValue));
}
/**
 * Creates a handler bound to the given frame queue.
 *
 * <p>The queue reference is stored as-is (no copy is made), so the caller and this handler
 * share the same queue instance.
 *
 * @param receivedFrames the queue kept in this handler's {@code receivedFrames} field
 */
private StreamHandler(Queue<Http2Frame> receivedFrames) {
this.receivedFrames = receivedFrames;
}