下面列出了 io.netty.handler.codec.http2.DefaultHttp2HeadersFrame #io.netty.handler.codec.http2.Http2DataFrame 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Handles an inbound {@link Http2DataFrame}: forwards a copy of any readable payload down the
 * pipeline, releases the frame, and fires empty trailers when the frame ends the stream.
 *
 * @param ctx the handler context used to propagate the converted objects
 * @param msg the inbound message; it is cast to {@link Http2DataFrame}
 */
final void readDataFrame(ChannelHandlerContext ctx, Object msg) {
    // Tracks what still has to be released if anything below throws.
    Object pendingRelease = msg;
    try {
        final Http2DataFrame frame = (Http2DataFrame) msg;
        if (!frame.content().isReadable()) {
            // Nothing to forward; just drop the frame.
            pendingRelease = release(frame);
        } else {
            // Copy to unpooled memory before passing to the user
            final Buffer copy = allocator.newBuffer(frame.content().readableBytes());
            toByteBuf(copy).writeBytes(frame.content());
            pendingRelease = release(frame);
            ctx.fireChannelRead(copy);
        }
        if (frame.isEndStream()) {
            ctx.fireChannelRead(headersFactory.newEmptyTrailers());
        }
    } finally {
        if (pendingRelease != null) {
            ReferenceCountUtil.release(pendingRelease);
        }
    }
}
/**
 * Records every inbound {@link Http2Frame} in {@code receivedFrames} and answers a data frame
 * that ends the stream with a 204 headers frame. Non-HTTP/2 messages are passed along untouched.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof Http2Frame) {
        final Http2Frame received = (Http2Frame) msg;
        receivedFrames.add(received);
        if (received instanceof Http2DataFrame) {
            final Http2DataFrame lastCandidate = (Http2DataFrame) received;
            if (lastCandidate.isEndStream()) {
                // End of the request body: reply with a header-only 204 on the same stream.
                final Http2HeadersFrame noContent =
                        new DefaultHttp2HeadersFrame(new DefaultHttp2Headers().status("204"), true)
                                .stream(lastCandidate.stream());
                ctx.writeAndFlush(noContent);
            }
        }
        // The frame is released here even though it was added to receivedFrames above.
        ReferenceCountUtil.release(received);
    } else {
        ctx.fireChannelRead(msg);
    }
}
/**
 * On a data frame, either closes the parent channel (when this is the first recorded channel
 * and {@code failOnFirstChannel} is set) or writes an empty data frame followed by an
 * end-of-stream OK headers frame. Other frame types are ignored.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) {
    if (!(frame instanceof Http2DataFrame)) {
        return;
    }

    // Not respond if this is channel 1
    final boolean onFirstChannel = channelIds[0].equals(ctx.channel().parent().id().asShortText());
    if (onFirstChannel && failOnFirstChannel) {
        ctx.channel().parent().close();
        return;
    }

    final DefaultHttp2DataFrame emptyData = new DefaultHttp2DataFrame(false);
    try {
        LOGGER.info(() -> "return empty data " + ctx.channel() + " frame " + frame.getClass());
        final Http2Headers okHeaders = new DefaultHttp2Headers().status(OK.codeAsText());
        ctx.write(emptyData);
        ctx.write(new DefaultHttp2HeadersFrame(okHeaders, true));
        ctx.flush();
    } finally {
        // NOTE(review): the frame was already handed to ctx.write above; presumably this extra
        // release is harmless because the frame carries no payload -- confirm.
        emptyData.release();
    }
}
/**
 * On a data frame, either stays silent (when this is the first recorded channel and
 * {@code notRespondOnFirstChannel} is set) or writes an empty data frame followed by an
 * end-of-stream OK headers frame. Other frame types are ignored.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) {
    if (!(frame instanceof Http2DataFrame)) {
        return;
    }

    // Not respond if this is channel 1
    final boolean onFirstChannel = channelIds[0].equals(ctx.channel().parent().id().asShortText());
    if (onFirstChannel && notRespondOnFirstChannel) {
        LOGGER.info(() -> "This is the first request, not responding" + ctx.channel());
        return;
    }

    final DefaultHttp2DataFrame emptyData = new DefaultHttp2DataFrame(false);
    try {
        LOGGER.info(() -> "return empty data " + ctx.channel() + " frame " + frame.getClass());
        final Http2Headers okHeaders = new DefaultHttp2Headers().status(OK.codeAsText());
        ctx.write(emptyData);
        ctx.write(new DefaultHttp2HeadersFrame(okHeaders, true));
        ctx.flush();
    } finally {
        // NOTE(review): the frame was already handed to ctx.write above; presumably this extra
        // release is harmless because the frame carries no payload -- confirm.
        emptyData.release();
    }
}
/**
 * Updates per-request timing metrics for each response frame and then forwards the frame to
 * the next handler.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) throws Exception {
    // Retained up front because the frame is forwarded at the end of this method.
    ReferenceCountUtil.retain(frame);

    final RequestInfo info = ctx.channel().attr(Http2NetworkClient.REQUEST_INFO).get();
    info.responseFramesCount++;
    final long elapsedMs = System.currentTimeMillis() - info.getStreamSendTime();

    if (frame instanceof Http2HeadersFrame) {
        http2ClientMetrics.http2StreamRoundTripTime.update(elapsedMs);
        info.setStreamHeaderFrameReceiveTime(System.currentTimeMillis());
        logger.debug("Header Frame received. Time from send: {}ms. Request: {}", elapsedMs, info);
    } else if (frame instanceof Http2DataFrame) {
        final int frameSize = ((Http2DataFrame) frame).content().readableBytes();
        logger.debug("Data Frame size: {}. Time from send: {}ms. Request: {}", frameSize, elapsedMs, info);
    }

    final boolean lastDataFrame = frame instanceof Http2DataFrame && ((Http2DataFrame) frame).isEndStream();
    if (lastDataFrame) {
        http2ClientMetrics.http2StreamFirstToLastFrameTime.update(elapsedMs);
        http2ClientMetrics.http2ResponseFrameCount.update(info.responseFramesCount);
        logger.debug("All Frame received. Time from send: {}ms. Request: {}", elapsedMs, info);
    }

    ctx.fireChannelRead(frame);
}
/**
 * Routes headers frames to {@code onHeadersRead} and data frames to {@code onDataRead};
 * everything else is delegated to the superclass.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof Http2HeadersFrame) {
        onHeadersRead(ctx, (Http2HeadersFrame) msg);
        return;
    }
    if (msg instanceof Http2DataFrame) {
        onDataRead(ctx, (Http2DataFrame) msg);
        return;
    }
    super.channelRead(ctx, msg);
}
/**
 * Sends the pre-canned response once a data frame with end-of-stream set arrives; any other
 * data frame is simply released.
 */
private static void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
    if (!data.isEndStream()) {
        // We do not send back the response to the remote-peer, so we need to release it.
        data.release();
        return;
    }
    sendResponse(ctx, data.content());
}
/**
 * Dispatches inbound messages by frame type: headers frames go to {@code onHeadersRead},
 * data frames go to {@code onDataRead}, and anything else is left to the superclass.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    final boolean isHeaders = msg instanceof Http2HeadersFrame;
    if (isHeaders) {
        onHeadersRead(ctx, (Http2HeadersFrame) msg);
        return;
    }
    if (msg instanceof Http2DataFrame) {
        onDataRead(ctx, (Http2DataFrame) msg);
        return;
    }
    super.channelRead(ctx, msg);
}
/**
 * Sends the pre-canned response once a data frame with end-of-stream set arrives, otherwise
 * releases the frame. In both cases a window update for the frame's flow-controlled bytes is
 * written on the same stream.
 */
private static void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
    final Http2FrameStream frameStream = data.stream();
    if (!data.isEndStream()) {
        // We do not send back the response to the remote-peer, so we need to release it.
        data.release();
    } else {
        sendResponse(ctx, frameStream, data.content());
    }

    // Update the flowcontroller
    final int consumedBytes = data.initialFlowControlledBytes();
    ctx.write(new DefaultHttp2WindowUpdateFrame(consumedBytes).stream(frameStream));
}
/**
 * Hands headers frames to {@code onHeadersRead} and data frames to {@code onDataRead}; all
 * other messages follow the superclass behaviour.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof Http2HeadersFrame) {
        final Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg;
        onHeadersRead(ctx, headersFrame);
        return;
    }
    if (msg instanceof Http2DataFrame) {
        final Http2DataFrame dataFrame = (Http2DataFrame) msg;
        onDataRead(ctx, dataFrame);
        return;
    }
    super.channelRead(ctx, msg);
}
/**
 * Dispatches data, headers and reset frames to their handlers. A headers frame additionally
 * triggers a read on this channel; any other frame type triggers a read on the parent channel.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) throws Exception {
    if (frame instanceof Http2DataFrame) {
        onDataRead((Http2DataFrame) frame, ctx);
        return;
    }
    if (frame instanceof Http2HeadersFrame) {
        onHeadersRead((Http2HeadersFrame) frame, ctx);
        ctx.channel().read();
        return;
    }
    if (frame instanceof Http2ResetFrame) {
        onRstStreamRead((Http2ResetFrame) frame, ctx);
        return;
    }
    // TODO this is related to the inbound window update bug. Revisit
    ctx.channel().parent().read();
}
/**
 * Wraps the payload of a data frame in an HTTP content object and forwards it: a
 * {@code DefaultLastHttpContent} when the frame ends the stream, a {@code DefaultHttpContent}
 * otherwise. The payload is retained before it is forwarded.
 */
private void onDataRead(Http2DataFrame dataFrame, ChannelHandlerContext ctx) throws Http2Exception {
    final ByteBuf payload = dataFrame.content();
    payload.retain();

    final Object httpContent;
    if (dataFrame.isEndStream()) {
        httpContent = new DefaultLastHttpContent(payload);
    } else {
        httpContent = new DefaultHttpContent(payload);
    }
    ctx.fireChannelRead(httpContent);
}
/**
 * Simulates a server that is slow to read request data.
 *
 * <p>Each data frame is released, answered with a 204 headers frame if it ends the stream,
 * optionally followed by a 500 ms pause (while {@code sleeps} is positive), and finally
 * acknowledged with a window update for the frame's flow-controlled bytes. Messages that are
 * not {@link Http2Frame}s are passed to the next handler; HTTP/2 frames other than data frames
 * are ignored.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (!(msg instanceof Http2Frame)) {
        ctx.fireChannelRead(msg);
        return;
    }
    Http2Frame frame = (Http2Frame) msg;
    if (frame instanceof Http2DataFrame) {
        Http2DataFrame dataFrame = (Http2DataFrame) frame;
        // The payload itself is never inspected; only frame metadata is read after this point.
        ReferenceCountUtil.release(frame);
        if (dataFrame.isEndStream()) {
            Http2HeadersFrame respHeaders = new DefaultHttp2HeadersFrame(
                    new DefaultHttp2Headers().status("204"), true)
                    .stream(dataFrame.stream());
            ctx.writeAndFlush(respHeaders);
        }
        if (sleeps > 0) {
            --sleeps;
            // Simulate a server that's slow to read data. Since our
            // window size is equal to the max frame size, the client
            // shouldn't be able to send more data until we update our
            // window
            try {
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                // Do not swallow the interrupt: restore the flag so the owning thread
                // can still observe that it was asked to stop.
                Thread.currentThread().interrupt();
            }
        }
        ctx.writeAndFlush(new DefaultHttp2WindowUpdateFrame(dataFrame.initialFlowControlledBytes())
                .stream(dataFrame.stream()));
    }
}
/**
 * When the request's final data frame arrives, writes 200 response headers (without ending
 * the stream) and then schedules a task on the context's executor that writes a "hello" data
 * frame every two seconds, starting immediately.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame http2Frame) throws Exception {
    if (!(http2Frame instanceof Http2DataFrame)) {
        return;
    }
    if (!((Http2DataFrame) http2Frame).isEndStream()) {
        return;
    }

    final Http2Headers okHeaders = new DefaultHttp2Headers().status("200");
    ctx.writeAndFlush(new DefaultHttp2HeadersFrame(okHeaders, false));

    // The periodic task is never cancelled here.
    final Runnable sendChunk = () -> {
        final DefaultHttp2DataFrame chunk =
                new DefaultHttp2DataFrame(Unpooled.wrappedBuffer("hello".getBytes()), false);
        ctx.writeAndFlush(chunk);
    };
    ctx.executor().scheduleAtFixedRate(sendChunk, 0, 2, TimeUnit.SECONDS);
}
/**
 * Feeds outbound HTTP/2 frames into {@code accessLog}: headers frames contribute the status,
 * data frames contribute their readable byte count. When the frame ends the stream, the access
 * log entry is emitted once the write completes successfully.
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    boolean endOfStream = false;

    if (msg instanceof Http2HeadersFrame) {
        final Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg;
        final Http2Headers responseHeaders = headersFrame.headers();
        endOfStream = headersFrame.isEndStream();
        accessLog.status(responseHeaders.status()).chunked(true);
    }

    if (msg instanceof Http2DataFrame) {
        final Http2DataFrame dataFrame = (Http2DataFrame) msg;
        endOfStream = dataFrame.isEndStream();
        accessLog.increaseContentLength(dataFrame.content().readableBytes());
    }

    if (!endOfStream) {
        //"FutureReturnValueIgnored" this is deliberate
        ctx.write(msg, promise);
        return;
    }

    // Last frame of the stream: log only after the write has actually succeeded.
    ctx.write(msg, promise.unvoid())
       .addListener(writeResult -> {
           if (writeResult.isSuccess()) {
               accessLog.log();
           }
       });
}
/**
 * Converts an inbound {@code Http2Response} into a {@code Response} and registers it with the
 * channel's {@code Http2MessageSession}.
 *
 * <p>Three payload shapes are handled:
 * <ul>
 *   <li>{@code Http2Headers} with end-of-stream set and neither a method nor a status: treated
 *       as a trailing segment of the session's current response;</li>
 *   <li>any other {@code Http2Headers}: wrapped via {@code wrapHeaders};</li>
 *   <li>{@code Http2DataFrame}: wrapped as a data segment of the session's current response.</li>
 * </ul>
 *
 * @param ctx the handler context the session and stream mapper are looked up from
 * @param msg the inbound HTTP/2 response message
 * @return the wrapped response, or {@code null} when there is no current response for the
 *     stream or the payload type is not recognised
 */
private Response wrapResponse(ChannelHandlerContext ctx, Http2Response msg) {
log.debug("wrapResponse msg={}", msg);
final Response response;
Http2MessageSession session = Http2MessageSession.lazyCreateSession(ctx);
// Stream id as reported by the client stream mapper for this inbound message.
int streamId =
Http2ClientStreamMapper.http2ClientStreamMapper(ctx).inboundStreamId(msg.streamId, msg.eos);
if (msg.payload instanceof Http2Headers) {
Http2Headers headers = (Http2Headers) msg.payload;
// End-of-stream headers carrying neither :method nor :status are handled as trailers.
if (msg.eos && headers.method() == null && headers.status() == null) {
// NOTE(review): this lookup uses msg.streamId while the data-frame branch below uses the
// mapped streamId -- confirm which id currentResponse() is keyed by.
response =
session
.currentResponse(msg.streamId)
.map(
resp ->
session.onInboundResponse(
new SegmentedResponseData(
resp, new Http2SegmentedData(headers, streamId))))
.orElse(null);
} else {
response = wrapHeaders(headers, streamId, msg.eos);
session.onInboundResponse(response);
}
} else if (msg.payload instanceof Http2DataFrame) {
Http2DataFrame frame = (Http2DataFrame) msg.payload;
// Attach the frame content to the response currently tracked for this stream, if any.
response =
session
.currentResponse(streamId)
.map(
resp ->
session.onInboundResponse(
new SegmentedResponseData(
resp, new Http2SegmentedData(frame.content(), msg.eos, streamId))))
.orElse(null);
} else {
// TODO(CK): throw an exception?
response = null;
}
return response;
}
/**
 * Writes a {@code Request} as HTTP/2 messages: headers only for non-full requests and for full
 * requests without a body, or headers followed by a single end-of-stream data frame for full
 * requests with a body.
 *
 * @param ctx the handler context to write through
 * @param request the request to translate
 * @param promise completed when the write (or, for a request with a body, both writes) completes
 */
private void writeRequest(ChannelHandlerContext ctx, Request request, ChannelPromise promise) {
    /*
      // TODO(CK): define ACCEPT?
      if (!response.headers().contains(HttpHeaderNames.CONTENT_TYPE)) {
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
      }
    */

    final Http2Headers h2Headers = request.headers().http2Headers();
    h2Headers.authority(request.host()).method(request.method().asciiName()).path(request.path());
    final int streamId = request.streamId();

    if (!(request instanceof FullRequest)) {
        // Not a full request: send headers and leave the stream open.
        ctx.write(Http2Request.build(streamId, h2Headers, false), promise);
        return;
    }

    if (request.body().readableBytes() <= 0) {
        // Full request without a body: the headers end the stream.
        ctx.write(Http2Request.build(streamId, h2Headers, true), promise);
        return;
    }

    // Full request with a body: headers first, then one data frame that ends the stream.
    final PromiseCombiner bothWrites = new PromiseCombiner();
    bothWrites.add(ctx.write(Http2Request.build(streamId, h2Headers, false), ctx.newPromise()));
    final Http2DataFrame bodyFrame = new DefaultHttp2DataFrame(request.body(), true);
    bothWrites.add(ctx.write(Http2Request.build(streamId, bodyFrame, true), ctx.newPromise()));
    bothWrites.finish(promise);
}
/**
 * Writes outbound {@code Http2Response} messages through {@code writeHeaders} or
 * {@code writeData}, depending on the payload type. Messages that are none of
 * {@code Http2DataFrame}, {@code Http2Headers}, {@code Http2Request} or {@code Http2Response}
 * are forwarded unchanged.
 *
 * <p>NOTE(review): a message that passes the type filter but is not an {@code Http2Response}
 * with an {@code Http2Headers} or {@code Http2DataFrame} payload falls through the end of this
 * method: nothing is written and {@code promise} is left untouched. Confirm whether that is
 * intended.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
// not an h2 frame, forward the write
if (!(msg instanceof Http2DataFrame
|| msg instanceof Http2Headers
|| msg instanceof Http2Request
|| msg instanceof Http2Response)) {
ctx.write(msg, promise);
return;
}
if (msg instanceof Http2Response) {
Http2Response response = (Http2Response) msg;
// Headers payload: written on the response's stream with its end-of-stream flag.
if (response.payload instanceof Http2Headers) {
writeHeaders(
ctx, (Http2Headers) response.payload, response.eos, promise, response.streamId);
return;
}
// Data payload: written on the response's stream.
if (response.payload instanceof Http2DataFrame) {
writeData(ctx, (Http2DataFrame) response.payload, promise, response.streamId);
return;
}
}
}
/**
 * Verifies that a full response with a body is emitted as a headers message (not end of
 * stream) followed by a data message that ends the stream, both on stream 1.
 */
@Test
public void testFullResponseWithBody() throws Exception {
    outputReceived = new CountDownLatch(2);

    // Arrange: one inbound GET and the full response that answers it.
    final ByteBuf expectedBody = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "response");
    final Http2Headers getHeaders = new DefaultHttp2Headers().method("GET").path("/");
    final Http2Request inboundRequest = Http2Request.build(1, getHeaders, true);
    final FullResponse outboundResponse = ResponseBuilders.newOk().streamId(1).body(expectedBody).build();

    // Act
    channel.writeInbound(inboundRequest);
    channel.runPendingTasks(); // blocks
    channel.writeOutbound(outboundResponse);
    channel.runPendingTasks(); // blocks

    Uninterruptibles.awaitUninterruptibly(outputReceived);

    // Assert: first message carries the 200 headers.
    final Http2Response headersMessage = responses.remove(0);
    assertNotNull(headersMessage);
    assertTrue(headersMessage.payload instanceof Http2Headers);
    final Http2Headers headersOut = (Http2Headers) headersMessage.payload;
    assertEquals("200", headersOut.status().toString());
    assertFalse(headersMessage.eos);
    assertEquals(1, headersMessage.streamId);

    // Assert: second message carries the body and ends the stream.
    final Http2Response bodyMessage = responses.remove(0);
    assertNotNull(bodyMessage);
    assertTrue(bodyMessage.payload instanceof Http2DataFrame);
    final Http2DataFrame dataOut = (Http2DataFrame) bodyMessage.payload;
    assertEquals(expectedBody, dataOut.content());
    assertTrue(bodyMessage.eos);
    assertEquals(1, bodyMessage.streamId);
}
/**
 * Verifies that a full POST request with a body is emitted as a headers message (not end of
 * stream) followed by a data message that ends the stream.
 */
@Test
public void testFullRequestWithBody() throws Exception {
    outputReceived = new CountDownLatch(1);

    // Arrange
    final ByteBuf expectedBody = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body");
    final FullRequest outboundRequest =
            RequestBuilders.newPost("/").host("localhost").body(expectedBody).build();

    // Act
    channel.writeOutbound(outboundRequest);
    channel.runPendingTasks(); // blocks

    Uninterruptibles.awaitUninterruptibly(outputReceived);

    // Assert: headers message
    final Http2Request headersMessage = (Http2Request) requests.remove(0);
    assertTrue(headersMessage != null);
    assertTrue(headersMessage.payload instanceof Http2Headers);
    final Http2Headers headersOut = (Http2Headers) headersMessage.payload;
    assertEquals("POST", headersOut.method().toString());
    assertEquals("/", headersOut.path());
    assertFalse(headersMessage.eos);

    // Assert: data message
    final Http2Request bodyMessage = (Http2Request) requests.remove(0);
    assertTrue(bodyMessage != null);
    assertTrue(bodyMessage.payload instanceof Http2DataFrame);
    final Http2DataFrame dataOut = (Http2DataFrame) bodyMessage.payload;
    assertEquals(expectedBody, dataOut.content());
    assertTrue(bodyMessage.eos);
}
/**
 * Routes a request by payload type: headers go to {@code handleHeaders}; a data frame is
 * retained and then passed to {@code handleData}. Other payload types are ignored.
 */
@Override
public void handle(Http2Request request, ChannelHandlerContext ctx) {
    final Object payload = request.payload;
    if (payload instanceof Http2Headers) {
        handleHeaders(ctx, request);
        return;
    }
    if (payload instanceof Http2DataFrame) {
        final Http2DataFrame dataFrame = (Http2DataFrame) payload;
        ReferenceCountUtil.retain(dataFrame);
        handleData(ctx, request);
    }
}
/**
 * Tells whether the given message is a headers or data frame with the end-of-stream flag set.
 *
 * @param msg the message to inspect
 * @return {@code true} if {@code msg} is an end-of-stream headers or data frame, otherwise
 *     {@code false}
 */
private boolean isEndOfRequestResponse(Object msg)
{
    if (msg instanceof Http2HeadersFrame) {
        final Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg;
        return headersFrame.isEndStream();
    }
    return msg instanceof Http2DataFrame && ((Http2DataFrame) msg).isEndStream();
}
/**
 * Client-side translation of inbound HTTP/2 frames.
 *
 * <p>The first headers frame must carry a status: informational (1xx) responses are dropped,
 * an end-of-stream frame becomes a full response, and any other frame becomes a streaming
 * response. A later headers frame is accepted only with end-of-stream set and is forwarded as
 * converted headers. Data frames go to {@code readDataFrame}; everything else is passed
 * through.
 *
 * @throws IllegalArgumentException if a status is required but missing
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (!(msg instanceof Http2HeadersFrame)) {
        if (msg instanceof Http2DataFrame) {
            readDataFrame(ctx, msg);
        } else {
            ctx.fireChannelRead(msg);
        }
        return;
    }

    final Http2HeadersFrame frame = (Http2HeadersFrame) msg;
    final Http2Headers h2Headers = frame.headers();

    // Stays null when the initial response headers have already been read.
    HttpResponseStatus status = null;
    if (!readHeaders) {
        final CharSequence rawStatus = h2Headers.getAndRemove(STATUS.value());
        if (rawStatus == null) {
            throw new IllegalArgumentException("a response must have " + STATUS + " header");
        }
        status = HttpResponseStatus.of(rawStatus);
        if (status.statusClass().equals(INFORMATIONAL_1XX)) {
            // 1xx "interim responses" are not exposed to the user; they are discarded to make
            // way for the "real" response. Per RFC 7540 section 8.1 a response may be preceded
            // by zero or more HEADERS frames for informational responses, and per RFC 7231
            // section 6.2 a client must be able to parse them and a user agent may ignore
            // unexpected ones.
            // https://tools.ietf.org/html/rfc7540#section-8.1
            // https://tools.ietf.org/html/rfc7231#section-6.2
            return;
        }
        readHeaders = true;
    }

    final boolean endStream = frame.isEndStream();
    if (status == null) {
        if (!endStream) {
            throw new IllegalArgumentException("a response must have " + STATUS + " header");
        }
        ctx.fireChannelRead(h2HeadersToH1HeadersClient(h2Headers, null));
        return;
    }

    if (endStream) {
        fireFullResponse(ctx, h2Headers, status);
        return;
    }

    final StreamingHttpResponse streaming = newResponse(status, HTTP_2_0,
            h2HeadersToH1HeadersClient(h2Headers, status), allocator, headersFactory);
    ctx.fireChannelRead(streaming);
}
/**
 * Server-side translation of inbound HTTP/2 frames.
 *
 * <p>The first headers frame must carry both a method and a path: with end-of-stream set it
 * becomes a full request, otherwise a streaming request. A later headers frame is accepted only
 * with end-of-stream set and is forwarded as converted trailers. Data frames go to
 * {@code readDataFrame}; everything else is passed through.
 *
 * @throws IllegalArgumentException if method and path are required but missing
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (!(msg instanceof Http2HeadersFrame)) {
        if (msg instanceof Http2DataFrame) {
            readDataFrame(ctx, msg);
        } else {
            ctx.fireChannelRead(msg);
        }
        return;
    }

    final Http2HeadersFrame frame = (Http2HeadersFrame) msg;
    final Http2Headers h2Headers = frame.headers();

    // Both stay null when the initial request headers have already been read.
    HttpRequestMethod requestMethod = null;
    String requestPath = null;
    if (!readHeaders) {
        final CharSequence rawMethod = h2Headers.getAndRemove(METHOD.value());
        final CharSequence rawPath = h2Headers.getAndRemove(PATH.value());
        if (rawPath == null || rawMethod == null) {
            throw new IllegalArgumentException("a request must have " + METHOD + " and " +
                    PATH + " headers");
        }
        requestPath = rawPath.toString();
        requestMethod = sequenceToHttpRequestMethod(rawMethod);
        readHeaders = true;
    }

    final boolean endStream = frame.isEndStream();
    if (requestMethod == null) {
        if (!endStream) {
            throw new IllegalArgumentException("a request must have " + METHOD + " and " +
                    PATH + " headers");
        }
        ctx.fireChannelRead(h2TrailersToH1TrailersServer(h2Headers));
        return;
    }

    if (endStream) {
        fireFullRequest(ctx, h2Headers, requestMethod, requestPath);
        return;
    }

    final StreamingHttpRequest streaming = newRequest(requestMethod, requestPath, HTTP_2_0,
            h2HeadersToH1HeadersServer(h2Headers, requestMethod), allocator, headersFactory);
    ctx.fireChannelRead(streaming);
}
/**
 * Releases the given frame and returns {@code null}, so a caller can release the frame and
 * clear its own reference in a single assignment.
 *
 * @param dataFrame the frame to release
 * @return always {@code null}
 */
@Nullable
private static Http2DataFrame release(final Http2DataFrame dataFrame) {
    dataFrame.release();
    final Http2DataFrame nothingLeftToRelease = null;
    return nothingLeftToRelease;
}
/**
 * Echoes a data frame: writes a new frame that wraps a retained duplicate of the incoming
 * content and carries the same end-of-stream flag.
 */
private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) {
    final Http2DataFrame echo =
            new DefaultHttp2DataFrame(data.content().retainedDuplicate(), data.isEndStream());
    ctx.write(echo);
}
/**
 * Creates an {@code Http2Request} whose payload is a data frame.
 *
 * @param streamId the stream id passed to the request
 * @param data the data frame used as payload
 * @param eos the end-of-stream flag passed to the request
 * @return the new request
 */
public static Http2Request<Http2DataFrame> build(int streamId, Http2DataFrame data, boolean eos) {
    final Http2Request<Http2DataFrame> request = new Http2Request<Http2DataFrame>(streamId, data, eos);
    return request;
}
/**
 * Writes the content of a data frame to the given stream through the encoder, with zero
 * padding and the frame's own end-of-stream flag.
 *
 * @param ctx the handler context to write through
 * @param data the frame whose content is written
 * @param promise the promise passed to the encoder for this write
 * @param currentStreamId the id of the stream to write on
 */
private void writeData(
    ChannelHandlerContext ctx, Http2DataFrame data, ChannelPromise promise, int currentStreamId)
    throws Exception {
    final int noPadding = 0;
    encoder().writeData(ctx, currentStreamId, data.content(), noPadding, data.isEndStream(), promise);
}
/**
 * Creates an {@code Http2Response} whose payload is a data frame.
 *
 * @param streamId the stream id passed to the response
 * @param data the data frame used as payload
 * @param eos the end-of-stream flag passed to the response
 * @return the new response
 */
public static Http2Response<Http2DataFrame> build(
    int streamId, Http2DataFrame data, boolean eos) {
    final Http2Response<Http2DataFrame> response = new Http2Response<Http2DataFrame>(streamId, data, eos);
    return response;
}
/**
 * Verifies that a segmented response followed by two data segments is emitted as a headers
 * message and two data messages on stream 1, with only the last one ending the stream.
 */
@Test
public void testStreamingResponse() throws Exception {
    outputReceived = new CountDownLatch(3);

    // Arrange: the inbound GET, the response head and two body segments.
    final Http2Headers getHeaders = new DefaultHttp2Headers().method("GET").path("/");
    final Http2Request inboundRequest = Http2Request.build(1, getHeaders, true);
    final SegmentedResponse responseHead =
        DefaultSegmentedResponse.builder()
            .streamId(1)
            .status(OK)
            .headers(new DefaultHeaders())
            .build();
    final ByteBuf firstBody = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body1");
    final SegmentedData firstSegment =
        DefaultSegmentedData.builder().streamId(1).content(firstBody).endOfMessage(false).build();
    final ByteBuf secondBody = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body2");
    final SegmentedData finalSegment =
        DefaultSegmentedData.builder()
            .content(secondBody)
            .streamId(1)
            .endOfMessage(true)
            .trailingHeaders(new DefaultHeaders())
            .build();

    // Act
    channel.writeInbound(inboundRequest);
    channel.runPendingTasks(); // blocks
    channel.writeOutbound(responseHead);
    channel.writeOutbound(firstSegment);
    channel.writeOutbound(finalSegment);
    channel.runPendingTasks(); // blocks

    Uninterruptibles.awaitUninterruptibly(outputReceived);

    // Assert: headers message
    final Http2Response headersMessage = responses.remove(0);
    assertNotNull(headersMessage);
    assertTrue(headersMessage.payload instanceof Http2Headers);
    final Http2Headers headersOut = (Http2Headers) headersMessage.payload;
    assertEquals("200", headersOut.status().toString());
    assertFalse(headersMessage.eos);
    assertEquals(1, headersMessage.streamId);

    // Assert: first data message keeps the stream open
    final Http2Response firstDataMessage = responses.remove(0);
    assertNotNull(firstDataMessage);
    assertTrue(firstDataMessage.payload instanceof Http2DataFrame);
    final Http2DataFrame firstDataOut = (Http2DataFrame) firstDataMessage.payload;
    assertEquals(firstBody, firstDataOut.content());
    assertFalse(firstDataMessage.eos);
    assertEquals(1, firstDataMessage.streamId);

    // Assert: second data message ends the stream
    final Http2Response lastDataMessage = responses.remove(0);
    assertNotNull(lastDataMessage);
    assertTrue(lastDataMessage.payload instanceof Http2DataFrame);
    final Http2DataFrame lastDataOut = (Http2DataFrame) lastDataMessage.payload;
    assertEquals(secondBody, lastDataOut.content());
    assertTrue(lastDataMessage.eos);
    assertEquals(1, lastDataMessage.streamId);
}