下面列出了 io.netty.handler.codec.http2.DefaultHttp2HeadersFrame #io.netty.handler.codec.http2.DefaultHttp2DataFrame 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) {
if (frame instanceof Http2DataFrame) {
// Not respond if this is channel 1
if (channelIds[0].equals(ctx.channel().parent().id().asShortText()) && failOnFirstChannel) {
ctx.channel().parent().close();
} else {
DefaultHttp2DataFrame dataFrame = new DefaultHttp2DataFrame(false);
try {
LOGGER.info(() -> "return empty data " + ctx.channel() + " frame " + frame.getClass());
Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
ctx.write(dataFrame);
ctx.write(new DefaultHttp2HeadersFrame(headers, true));
ctx.flush();
} finally {
dataFrame.release();
}
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) {
if (frame instanceof Http2DataFrame) {
// Not respond if this is channel 1
if (channelIds[0].equals(ctx.channel().parent().id().asShortText()) && notRespondOnFirstChannel) {
LOGGER.info(() -> "This is the first request, not responding" + ctx.channel());
} else {
DefaultHttp2DataFrame dataFrame = new DefaultHttp2DataFrame(false);
try {
LOGGER.info(() -> "return empty data " + ctx.channel() + " frame " + frame.getClass());
Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
ctx.write(dataFrame);
ctx.write(new DefaultHttp2HeadersFrame(headers, true));
ctx.flush();
} finally {
dataFrame.release();
}
}
}
}
private void writeContent(ChannelHandlerContext ctx, SegmentedData data, ChannelPromise promise) {
Headers trailingHeaders = data.trailingHeaders();
boolean hasTrailing = trailingHeaders != null && trailingHeaders.size() > 0;
boolean dataEos = data.endOfMessage() && !hasTrailing;
int streamId = data.streamId();
Http2Request request =
Http2Request.build(streamId, new DefaultHttp2DataFrame(data.content(), dataEos), dataEos);
if (hasTrailing) {
Http2Headers headers = trailingHeaders.http2Headers();
Http2Request last = Http2Request.build(streamId, headers, true);
PromiseCombiner combiner = new PromiseCombiner();
combiner.add(ctx.write(request, ctx.newPromise()));
combiner.add(ctx.write(last, ctx.newPromise()));
combiner.finish(promise);
} else {
ctx.write(request, promise);
}
}
@Override
public int onDataRead(
ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
if (isServer) {
ctx.fireChannelRead(
Http2Request.build(
streamId,
new DefaultHttp2DataFrame(data.retain(), endOfStream, padding),
endOfStream));
} else {
ctx.fireChannelRead(
Http2Response.build(
streamId,
new DefaultHttp2DataFrame(data.retain(), endOfStream, padding),
endOfStream));
}
return data.readableBytes() + padding;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Http2HeadersFrame) {
Http2HeadersFrame msgHeader = (Http2HeadersFrame) msg;
if (msgHeader.isEndStream()) {
ByteBuf content = ctx.alloc()
.buffer();
content.writeBytes(RESPONSE_BYTES.duplicate());
Http2Headers headers = new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText());
ctx.write(new DefaultHttp2HeadersFrame(headers).stream(msgHeader.stream()));
ctx.write(new DefaultHttp2DataFrame(content, true).stream(msgHeader.stream()));
}
} else {
super.channelRead(ctx, msg);
}
}
/**
* Sends a "Hello World" DATA frame to the client.
*/
private static void sendResponse(ChannelHandlerContext ctx, ByteBuf payload) {
// Send a frame for the response status
Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
ctx.write(new DefaultHttp2HeadersFrame(headers));
ctx.write(new DefaultHttp2DataFrame(payload, true));
}
/**
* Sends a "Hello World" DATA frame to the client.
*/
private static void sendResponse(ChannelHandlerContext ctx, Http2FrameStream stream, ByteBuf payload) {
// Send a frame for the response status
Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
ctx.write(new DefaultHttp2HeadersFrame(headers).stream(stream));
ctx.write(new DefaultHttp2DataFrame(payload, true).stream(stream));
}
final void writeBuffer(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf byteBuf = toByteBufNoThrow((Buffer) msg);
if (byteBuf == null) {
promise.setFailure(new IllegalArgumentException("unsupported Buffer type:" + msg));
ctx.close();
} else {
ctx.write(new DefaultHttp2DataFrame(byteBuf.retain(), false), promise);
}
}
final void writeTrailers(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// For H2 we don't need to notify protocolPayloadEndOutboundSuccess(ctx); the codecs takes care of half-closure
closeHandler.protocolPayloadEndOutbound(ctx);
HttpHeaders h1Headers = (HttpHeaders) msg;
Http2Headers h2Headers = h1HeadersToH2Headers(h1Headers);
if (h2Headers.isEmpty()) {
ctx.write(new DefaultHttp2DataFrame(EMPTY_BUFFER, true), promise);
} else {
ctx.write(new DefaultHttp2HeadersFrame(h2Headers, true), promise);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame http2Frame) throws Exception {
if (http2Frame instanceof Http2DataFrame) {
Http2DataFrame dataFrame = (Http2DataFrame) http2Frame;
if (dataFrame.isEndStream()) {
Http2Headers headers = new DefaultHttp2Headers().status("200");
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, false));
ctx.executor().scheduleAtFixedRate(() -> {
DefaultHttp2DataFrame respData = new DefaultHttp2DataFrame(Unpooled.wrappedBuffer("hello".getBytes()), false);
ctx.writeAndFlush(respData);
}, 0, 2, TimeUnit.SECONDS);
}
}
}
private void writeRequest(ChannelHandlerContext ctx, Request request, ChannelPromise promise) {
/*
// TOOD(CK): define ACCEPT?
if (!response.headers().contains(HttpHeaderNames.CONTENT_TYPE)) {
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
}
*/
Http2Headers headers = request.headers().http2Headers();
headers.authority(request.host()).method(request.method().asciiName()).path(request.path());
int streamId = request.streamId();
if (request instanceof FullRequest) {
if (request.body().readableBytes() > 0) {
PromiseCombiner combiner = new PromiseCombiner();
combiner.add(ctx.write(Http2Request.build(streamId, headers, false), ctx.newPromise()));
Http2DataFrame data = new DefaultHttp2DataFrame(request.body(), true);
combiner.add(ctx.write(Http2Request.build(streamId, data, true), ctx.newPromise()));
combiner.finish(promise);
} else {
ctx.write(Http2Request.build(streamId, headers, true), promise);
}
} else {
ctx.write(Http2Request.build(streamId, headers, false), promise);
}
}
/**
* Handles conversion of {@link Send} to HTTP/2 frames.
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!ctx.channel().isOpen()) {
logger.debug("Channel closed when write. Channel: {}", ctx.channel());
promise.setFailure(new ChannelException("Channel has been closed when write."));
}
if (!(msg instanceof Send)) {
ctx.write(msg, promise);
return;
}
Send send = (Send) msg;
Http2Headers http2Headers;
if (forServer) {
logger.trace("Write content to channel as server {}", ctx.channel());
http2Headers = new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText());
} else {
logger.trace("Write content to channel as client {}", ctx.channel());
http2Headers = new DefaultHttp2Headers().method(HttpMethod.POST.asciiName()).scheme("https").path("/");
}
DefaultHttp2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(http2Headers, false);
ctx.write(headersFrame);
// TODO: Use {@link RetainingAsyncWritableChannel} after writeTo(AsyncWritableChannel channel, Callback<Long> callback) is fully implemented.
ByteBufChannel byteBufChannel = new ByteBufChannel();
try {
send.writeTo(byteBufChannel);
} catch (IOException e) {
promise.setFailure(e);
return;
}
DefaultHttp2DataFrame dataFrame = new DefaultHttp2DataFrame(byteBufChannel.getBuf(), true);
// Caller should call writeAndFlush().
ctx.write(dataFrame, promise);
}
private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) {
ctx.write(new DefaultHttp2DataFrame(data.content().retainedDuplicate(), data.isEndStream()));
}
@Test
public void testStreamingRequest() throws Exception {
outputReceived = new CountDownLatch(3);
Http2Headers headers = new DefaultHttp2Headers().method("POST").path("/");
Http2Request requestIn = Http2Request.build(1, headers, false);
ByteBuf body1 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body1");
Http2Request content = Http2Request.build(1, new DefaultHttp2DataFrame(body1, false), false);
ByteBuf body2 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body2");
Http2Request lastContent = Http2Request.build(1, new DefaultHttp2DataFrame(body2, true), true);
channel.writeInbound(requestIn);
channel.writeInbound(content);
channel.writeInbound(lastContent);
channel.runPendingTasks(); // blocks
Uninterruptibles.awaitUninterruptibly(outputReceived);
Request requestOut = requests.remove(0);
assertNotNull(requestOut);
assertTrue(requestOut instanceof SegmentedRequest);
assertEquals("h2", requestOut.version());
assertEquals(HttpMethod.POST, requestOut.method());
assertEquals("/", requestOut.path());
assertFalse(requestOut.hasBody());
assertNotNull(requestOut.body());
assertEquals(0, requestOut.body().readableBytes());
Request bodyOut1 = requests.remove(0);
assertNotNull(bodyOut1);
assertTrue(bodyOut1 instanceof SegmentedRequestData);
assertEquals("h2", bodyOut1.version());
assertEquals(HttpMethod.POST, bodyOut1.method());
assertEquals("/", bodyOut1.path());
assertFalse(bodyOut1.hasBody());
assertNotNull(bodyOut1.body());
assertNotNull(((SegmentedRequestData) bodyOut1).content());
assertEquals(body1, ((SegmentedRequestData) bodyOut1).content());
assertFalse(bodyOut1.endOfMessage());
Request bodyOut2 = requests.remove(0);
assertNotNull(bodyOut2);
assertTrue(bodyOut2 instanceof SegmentedRequestData);
assertEquals("h2", bodyOut2.version());
assertEquals(HttpMethod.POST, bodyOut2.method());
assertEquals("/", bodyOut2.path());
assertFalse(bodyOut2.hasBody());
assertNotNull(bodyOut2.body());
assertNotNull(((SegmentedRequestData) bodyOut2).content());
assertEquals(body2, ((SegmentedRequestData) bodyOut2).content());
assertTrue(bodyOut2.endOfMessage());
}
@Test
public void testStreamingRequestWithTrailingHeaders() {
outputReceived = new CountDownLatch(4);
Http2Headers headers = new DefaultHttp2Headers().method("POST").path("/");
Http2Request requestIn = Http2Request.build(1, headers, false);
ByteBuf body1 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body1");
Http2Request content = Http2Request.build(1, new DefaultHttp2DataFrame(body1, false), false);
ByteBuf body2 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body2");
Http2Request lastContent = Http2Request.build(1, new DefaultHttp2DataFrame(body2, true), false);
Http2Headers trailers = new DefaultHttp2Headers().set("foo", "bar");
Http2Request lastHeaders = Http2Request.build(1, trailers, true);
channel.writeInbound(requestIn);
channel.writeInbound(content);
channel.writeInbound(lastContent);
channel.writeInbound(lastHeaders);
channel.runPendingTasks(); // blocks
Uninterruptibles.awaitUninterruptibly(outputReceived);
Request requestOut = requests.remove(0);
assertNotNull(requestOut);
assertTrue(requestOut instanceof SegmentedRequest);
assertEquals("h2", requestOut.version());
assertEquals(HttpMethod.POST, requestOut.method());
assertEquals("/", requestOut.path());
assertFalse(requestOut.hasBody());
assertNotNull(requestOut.body());
assertEquals(0, requestOut.body().readableBytes());
Request bodyOut1 = requests.remove(0);
assertNotNull(bodyOut1);
assertTrue(bodyOut1 instanceof SegmentedRequestData);
assertEquals("h2", bodyOut1.version());
assertEquals(HttpMethod.POST, bodyOut1.method());
assertEquals("/", bodyOut1.path());
assertFalse(bodyOut1.hasBody());
assertNotNull(bodyOut1.body());
assertNotNull(((SegmentedRequestData) bodyOut1).content());
assertEquals(body1, ((SegmentedRequestData) bodyOut1).content());
assertFalse(bodyOut1.endOfMessage());
Request bodyOut2 = requests.remove(0);
assertNotNull(bodyOut2);
assertTrue(bodyOut2 instanceof SegmentedRequestData);
assertEquals("h2", bodyOut2.version());
assertEquals(HttpMethod.POST, bodyOut2.method());
assertEquals("/", bodyOut2.path());
assertFalse(bodyOut2.hasBody());
assertNotNull(bodyOut2.body());
assertNotNull(((SegmentedRequestData) bodyOut2).content());
assertEquals(body2, ((SegmentedRequestData) bodyOut2).content());
assertFalse(bodyOut2.endOfMessage());
Request trailersOut = requests.remove(0);
assertNotNull(trailersOut);
assertTrue(trailersOut instanceof SegmentedRequestData);
assertEquals("h2", trailersOut.version());
assertEquals(HttpMethod.POST, trailersOut.method());
assertEquals("/", trailersOut.path());
assertFalse(trailersOut.hasBody());
assertNotNull(trailersOut.body());
assertEquals(0, trailersOut.body().readableBytes());
assertEquals(1, ((SegmentedRequestData) trailersOut).trailingHeaders().size());
assertEquals("bar", ((SegmentedRequestData) trailersOut).trailingHeaders().get("foo"));
assertTrue(trailersOut.endOfMessage());
}
@Test
public void testStreamingResponse() throws Exception {
outputReceived = new CountDownLatch(3);
Http2Headers headers = new DefaultHttp2Headers().status("200");
Http2Response responseIn = Http2Response.build(1, headers, false);
ByteBuf body1 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body1");
Http2Response content = Http2Response.build(1, new DefaultHttp2DataFrame(body1, false), false);
ByteBuf body2 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body2");
Http2Response lastContent =
Http2Response.build(1, new DefaultHttp2DataFrame(body2, true), true);
channel.writeInbound(responseIn);
channel.writeInbound(content);
channel.writeInbound(lastContent);
channel.runPendingTasks(); // blocks
Uninterruptibles.awaitUninterruptibly(outputReceived);
SegmentedResponse responseOut = (SegmentedResponse) responses.remove(0);
assertTrue(responseOut != null);
assertEquals("h2", responseOut.version());
assertEquals(HttpResponseStatus.OK, responseOut.status());
assertFalse(responseOut.hasBody());
assertFalse(responseOut.body() == null);
assertEquals(0, responseOut.body().readableBytes());
SegmentedResponseData bodyOut1 = (SegmentedResponseData) responses.remove(0);
assertTrue(bodyOut1 != null);
assertEquals("h2", responseOut.version());
assertEquals(HttpResponseStatus.OK, responseOut.status());
assertFalse(bodyOut1.hasBody());
assertFalse(bodyOut1.body() == null);
assertFalse(bodyOut1.content() == null);
assertEquals(body1, bodyOut1.content());
assertFalse(bodyOut1.endOfMessage());
SegmentedResponseData bodyOut2 = (SegmentedResponseData) responses.remove(0);
assertTrue(bodyOut2 != null);
assertEquals("h2", responseOut.version());
assertEquals(HttpResponseStatus.OK, responseOut.status());
assertFalse(bodyOut2.hasBody());
assertFalse(bodyOut2.body() == null);
assertFalse(bodyOut2.content() == null);
assertEquals(body2, bodyOut2.content());
assertTrue(bodyOut2.endOfMessage());
}
@Test
public void testStreamingResponseWithTrailingHeaders() throws Exception {
outputReceived = new CountDownLatch(3);
Http2Headers headers = new DefaultHttp2Headers().status("200");
Http2Response responseIn = Http2Response.build(1, headers, false);
ByteBuf body1 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body1");
Http2Response content = Http2Response.build(1, new DefaultHttp2DataFrame(body1, false), false);
ByteBuf body2 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body2");
Http2Response lastContent =
Http2Response.build(1, new DefaultHttp2DataFrame(body2, false), false);
Http2Response trailers =
Http2Response.build(1, new DefaultHttp2Headers().set("foo", "bar"), true);
channel.writeInbound(responseIn);
channel.writeInbound(content);
channel.writeInbound(lastContent);
channel.writeInbound(trailers);
channel.runPendingTasks(); // blocks
Uninterruptibles.awaitUninterruptibly(outputReceived);
SegmentedResponse responseOut = (SegmentedResponse) responses.remove(0);
assertTrue(responseOut != null);
assertEquals("h2", responseOut.version());
assertEquals(HttpResponseStatus.OK, responseOut.status());
assertFalse(responseOut.hasBody());
assertFalse(responseOut.body() == null);
assertEquals(0, responseOut.body().readableBytes());
SegmentedResponseData bodyOut1 = (SegmentedResponseData) responses.remove(0);
assertTrue(bodyOut1 != null);
assertEquals("h2", responseOut.version());
assertEquals(HttpResponseStatus.OK, responseOut.status());
assertFalse(bodyOut1.hasBody());
assertFalse(bodyOut1.body() == null);
assertFalse(bodyOut1.content() == null);
assertEquals(body1, bodyOut1.content());
assertFalse(bodyOut1.endOfMessage());
SegmentedResponseData bodyOut2 = (SegmentedResponseData) responses.remove(0);
assertTrue(bodyOut2 != null);
assertEquals("h2", responseOut.version());
assertEquals(HttpResponseStatus.OK, responseOut.status());
assertFalse(bodyOut2.hasBody());
assertFalse(bodyOut2.body() == null);
assertFalse(bodyOut2.content() == null);
assertEquals(body2, bodyOut2.content());
assertFalse(bodyOut2.endOfMessage());
SegmentedResponseData trailersOut = (SegmentedResponseData) responses.remove(0);
assertTrue(trailersOut != null);
assertEquals("h2", trailersOut.version());
assertEquals(HttpResponseStatus.OK, trailersOut.status());
assertFalse(trailersOut.hasBody());
assertFalse(trailersOut.body() == null);
assertEquals(0, trailersOut.body().readableBytes());
assertEquals(1, trailersOut.trailingHeaders().size());
assertEquals("bar", trailersOut.trailingHeaders().get("foo"));
assertTrue(trailersOut.endOfMessage());
}
@Test
public void testGrpcServer() throws Exception {
final Http2Headers cannedHeaders = new DefaultHttp2Headers();
cannedHeaders
.status("200")
.add("content-type", "application/grpc")
.add("grpc-encoding", "identity")
.add("grpc-accept-encoding", "gzip");
final Http2Headers cannedTrailers = new DefaultHttp2Headers().add("grpc-status", "0");
ByteBuf buf =
Unpooled.copiedBuffer(ByteBufUtil.decodeHexDump("000000000d0a0b48656c6c6f20776f726c64"));
final Http2DataFrame cannedData = new DefaultHttp2DataFrame(buf.retain(), false);
XioServerBootstrap bootstrap =
XioServerBootstrap.fromConfig("xio.testGrpcServer")
.addToPipeline(
new SmartHttpPipeline() {
@Override
public ChannelHandler getApplicationRouter() {
return new PipelineRouter(
ImmutableMap.of(),
new PipelineRequestHandler() {
@Override
public void handle(
ChannelHandlerContext ctx, Request request, RouteState route) {
if (request instanceof SegmentedRequestData) {
SegmentedRequestData streaming = (SegmentedRequestData) request;
if (streaming.endOfMessage()) {
ctx.write(Http2Response.build(request.streamId(), cannedHeaders));
ctx.write(
Http2Response.build(request.streamId(), cannedData, false));
ctx.write(
Http2Response.build(request.streamId(), cannedTrailers, true));
}
}
}
});
}
});
XioServer xioServer = bootstrap.build();
GrpcClient client = GrpcClient.run(xioServer.getPort());
HelloReply response = client.greet("world");
assertEquals("Hello world", response.getMessage());
client.shutdown();
xioServer.close();
}