下面列出了 io.netty.handler.codec.http.HttpUtil 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Serializes the current resource report to JSON and writes it to the channel as an
 * HTTP 200 response with a JSON content type and an explicit content length.
 *
 * <p>If serialization fails with an {@link IOException}, the failure is logged, the
 * partially filled buffer is released, and a 500 response carrying the error text is sent
 * through {@code writeAndClose} instead.
 *
 * @param channel the channel the response is written to
 */
private void writeResourceReport(Channel channel) {
  ByteBuf content = Unpooled.buffer();
  // try-with-resources closes (and therefore flushes) the writer on both the success and
  // the failure path; the original only closed it when serialization succeeded.
  try (Writer writer = new OutputStreamWriter(new ByteBufOutputStream(content), CharsetUtil.UTF_8)) {
    reportAdapter.toJson(resourceReport.get(), writer);
  } catch (IOException e) {
    LOG.error("error writing resource report", e);
    // The buffer is never handed to the pipeline on this path, so release it here.
    content.release();
    // An exception may carry no message; fall back to its string form rather than pass null.
    String errorText = e.getMessage() == null ? e.toString() : e.getMessage();
    writeAndClose(channel, new DefaultFullHttpResponse(
        HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR,
        Unpooled.copiedBuffer(errorText, StandardCharsets.UTF_8)));
    return;
  }
  FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
  HttpUtil.setContentLength(response, content.readableBytes());
  response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
  channel.writeAndFlush(response);
}
/**
 * Writes (without flushing) an empty-bodied plain-text response for the current request.
 *
 * <p>The status is 200 when {@code currentObj} decoded successfully and 400 otherwise.
 * Content-Length and Connection headers are added only for keep-alive requests.
 *
 * @param currentObj the HTTP object whose decoder result selects the response status
 * @param ctx the context the response is written to
 * @return {@code true} if the request asked for a keep-alive connection
 */
private boolean writeResponse(HttpObject currentObj, ChannelHandlerContext ctx) {
  boolean keepAlive = HttpUtil.isKeepAlive(request);

  // Map the decoder outcome onto the response status.
  HttpResponseStatus status;
  if (currentObj.decoderResult().isSuccess()) {
    status = HttpResponseStatus.OK;
  } else {
    status = HttpResponseStatus.BAD_REQUEST;
  }

  FullHttpResponse reply =
      new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.EMPTY_BUFFER);
  reply.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

  if (!keepAlive) {
    ctx.write(reply);
    return false;
  }

  // A persistent connection needs an explicit length and the keep-alive marker, see
  // http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
  reply.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, reply.content().readableBytes());
  reply.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
  ctx.write(reply);
  return true;
}
/**
 * Unregisters the "noop" model, then registers it again through a JSON request body that
 * asks for one initial worker, and checks the status message returned by the server.
 */
@Test(
    alwaysRun = true,
    dependsOnMethods = {"testSetDefaultVersionNoop"})
public void testLoadModelWithInitialWorkersWithJSONReqBody() throws InterruptedException {
  Channel managementChannel = TestUtils.getManagementChannel(configManager);
  testUnregisterModel("noop", null);

  // Reset the shared result holder and arm a latch for the asynchronous response.
  TestUtils.setResult(null);
  TestUtils.setLatch(new CountDownLatch(1));

  String registrationJson =
      "{'url':'noop.mar', 'model_name':'noop', 'initial_workers':'1', 'synchronous':'true'}";
  DefaultFullHttpRequest registerRequest =
      new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/models");
  registerRequest.headers().add("Content-Type", "application/json");
  registerRequest.content().writeCharSequence(registrationJson, CharsetUtil.UTF_8);
  HttpUtil.setContentLength(registerRequest, registerRequest.content().readableBytes());

  managementChannel.writeAndFlush(registerRequest);
  TestUtils.getLatch().await();

  StatusResponse status = JsonUtils.GSON.fromJson(TestUtils.getResult(), StatusResponse.class);
  Assert.assertEquals(
      status.getStatus(), "Model \"noop\" Version: 1.11 registered with 1 initial workers");
}
/**
 * Posts a small octet-stream payload to {@code /predictions/noop} and expects the
 * result recorded by the test utilities to be "OK".
 */
@Test(
    alwaysRun = true,
    dependsOnMethods = {"testNoopPrediction"})
public void testPredictionsBinary() throws InterruptedException {
  Channel inferenceChannel = TestUtils.getInferenceChannel(configManager);

  // Reset the shared result holder and arm a latch for the asynchronous response.
  TestUtils.setResult(null);
  TestUtils.setLatch(new CountDownLatch(1));

  DefaultFullHttpRequest predictRequest =
      new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/predictions/noop");
  predictRequest.content().writeCharSequence("test", CharsetUtil.UTF_8);
  int bodyLength = predictRequest.content().readableBytes();
  HttpUtil.setContentLength(predictRequest, bodyLength);
  predictRequest
      .headers()
      .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM);

  inferenceChannel.writeAndFlush(predictRequest);
  TestUtils.getLatch().await();

  Assert.assertEquals(TestUtils.getResult(), "OK");
}
/**
 * Posts a JSON payload to {@code /models/noop/invoke} and expects the result recorded
 * by the test utilities to be "OK".
 */
@Test(
    alwaysRun = true,
    dependsOnMethods = {"testInvocationsMultipart"})
public void testModelsInvokeJson() throws InterruptedException {
  Channel inferenceChannel = TestUtils.getInferenceChannel(configManager);

  // Reset the shared result holder and arm a latch for the asynchronous response.
  TestUtils.setResult(null);
  TestUtils.setLatch(new CountDownLatch(1));

  DefaultFullHttpRequest invokeRequest =
      new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/models/noop/invoke");
  invokeRequest.content().writeCharSequence("{\"data\": \"test\"}", CharsetUtil.UTF_8);
  int bodyLength = invokeRequest.content().readableBytes();
  HttpUtil.setContentLength(invokeRequest, bodyLength);
  invokeRequest.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);

  inferenceChannel.writeAndFlush(invokeRequest);
  TestUtils.getLatch().await();

  Assert.assertEquals(TestUtils.getResult(), "OK");
}
/**
 * Issues a {@code GET} on the accounts operation and parses the JSON body into accounts.
 *
 * <p>At most one target header is sent: the account name takes precedence over the
 * account ID, and when both are {@code null} no target header is added.
 *
 * @param accountName if non-null, sent as the target account name header.
 * @param accountId if non-null (and {@code accountName} is null), sent as the target account ID header.
 * @return the set of accounts deserialized from the response body.
 * @throws Exception if sending the request or reading the response fails.
 */
private Set<Account> getAccounts(String accountName, Short accountId) throws Exception {
  HttpHeaders targetHeaders = new DefaultHttpHeaders();
  if (accountName != null) {
    targetHeaders.add(RestUtils.Headers.TARGET_ACCOUNT_NAME, accountName);
  } else if (accountId != null) {
    targetHeaders.add(RestUtils.Headers.TARGET_ACCOUNT_ID, accountId);
  }

  FullHttpRequest accountsRequest = buildRequest(HttpMethod.GET, Operations.ACCOUNTS, targetHeaders, null);
  ResponseParts parts = nettyClient.sendRequest(accountsRequest, null, null).get();

  HttpResponse httpResponse = getHttpResponse(parts);
  assertEquals("Unexpected response status", HttpResponseStatus.OK, httpResponse.status());
  verifyTrackingHeaders(httpResponse);

  // Read exactly Content-Length bytes of body and decode them as UTF-8 JSON.
  long expectedLength = HttpUtil.getContentLength(httpResponse);
  ByteBuffer body = getContent(parts.queue, expectedLength);
  String json = new String(body.array(), StandardCharsets.UTF_8);
  return new HashSet<>(AccountCollectionSerde.fromJson(new JSONObject(json)));
}
/**
 * With {@code decode_input_request} set to "true", loads the "noop-config" model, posts a
 * JSON prediction request, and expects a 200 status whose result does not contain
 * "bytearray". The model is unloaded afterwards.
 */
@Test(
    alwaysRun = true,
    dependsOnMethods = {"testPredictionsValidRequestSize"})
public void testPredictionsDecodeRequest()
    throws InterruptedException, NoSuchFieldException, IllegalAccessException {
  Channel inferenceChannel = TestUtils.getInferenceChannel(configManager);
  Channel managementChannel = TestUtils.getManagementChannel(configManager);

  setConfiguration("decode_input_request", "true");
  loadTests(managementChannel, "noop-v1.0-config-tests.mar", "noop-config");

  // Reset the shared result holder and arm a latch for the asynchronous response.
  TestUtils.setResult(null);
  TestUtils.setLatch(new CountDownLatch(1));

  DefaultFullHttpRequest predictRequest =
      new DefaultFullHttpRequest(
          HttpVersion.HTTP_1_1, HttpMethod.POST, "/predictions/noop-config");
  predictRequest.content().writeCharSequence("{\"data\": \"test\"}", CharsetUtil.UTF_8);
  int bodyLength = predictRequest.content().readableBytes();
  HttpUtil.setContentLength(predictRequest, bodyLength);
  predictRequest.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);

  inferenceChannel.writeAndFlush(predictRequest);
  TestUtils.getLatch().await();

  Assert.assertEquals(TestUtils.getHttpStatus(), HttpResponseStatus.OK);
  Assert.assertFalse(TestUtils.getResult().contains("bytearray"));
  unloadTests(managementChannel, "noop-config");
}
/**
 * With {@code decode_input_request} set to "false", loads the "noop-config" model, posts a
 * JSON prediction request, and expects a 200 status whose result contains "bytearray".
 * The model is unloaded afterwards.
 */
@Test(
    alwaysRun = true,
    dependsOnMethods = {"testPredictionsDecodeRequest"})
public void testPredictionsDoNotDecodeRequest()
    throws InterruptedException, NoSuchFieldException, IllegalAccessException {
  Channel inferenceChannel = TestUtils.getInferenceChannel(configManager);
  Channel managementChannel = TestUtils.getManagementChannel(configManager);

  setConfiguration("decode_input_request", "false");
  loadTests(managementChannel, "noop-v1.0-config-tests.mar", "noop-config");

  // Reset the shared result holder and arm a latch for the asynchronous response.
  TestUtils.setResult(null);
  TestUtils.setLatch(new CountDownLatch(1));

  DefaultFullHttpRequest predictRequest =
      new DefaultFullHttpRequest(
          HttpVersion.HTTP_1_1, HttpMethod.POST, "/predictions/noop-config");
  predictRequest.content().writeCharSequence("{\"data\": \"test\"}", CharsetUtil.UTF_8);
  int bodyLength = predictRequest.content().readableBytes();
  HttpUtil.setContentLength(predictRequest, bodyLength);
  predictRequest.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);

  inferenceChannel.writeAndFlush(predictRequest);
  TestUtils.getLatch().await();

  Assert.assertEquals(TestUtils.getHttpStatus(), HttpResponseStatus.OK);
  Assert.assertTrue(TestUtils.getResult().contains("bytearray"));
  unloadTests(managementChannel, "noop-config");
}
/**
 * Verifies that a request to the {@code TestingUri.Close} URI yields a 500 response that is
 * not keep-alive, and that the embedded channel is closed once its outbound data is drained.
 * This exercises {@link NettyResponseChannel#close()}.
 */
@Test
public void closeTest() {
  // Per the original author's note, the request is keep-alive by default.
  HttpRequest closeRequest = createRequestWithHeaders(HttpMethod.GET, TestingUri.Close.toString());
  EmbeddedChannel embeddedChannel = createEmbeddedChannel();
  embeddedChannel.writeInbound(closeRequest);

  HttpResponse httpResponse = (HttpResponse) embeddedChannel.readOutbound();
  assertEquals("Unexpected response status", HttpResponseStatus.INTERNAL_SERVER_ERROR, httpResponse.status());
  assertFalse("Inconsistent value for Connection header", HttpUtil.isKeepAlive(httpResponse));

  // Discard whatever outbound content is still queued on the channel.
  Object leftover = embeddedChannel.readOutbound();
  while (leftover != null) {
    leftover = embeddedChannel.readOutbound();
  }
  assertFalse("Channel should be closed", embeddedChannel.isOpen());
}
/**
 * Returns the inbound object stream, first answering an {@code Expect: 100-continue}
 * request when one is present.
 *
 * <p>When the request expects a continue, a deferred future writes and flushes
 * {@code CONTINUE} unless headers have already been sent, and the parent's stream is
 * chained after it. Otherwise the parent's stream is returned directly.
 */
@Override
public Flux<?> receiveObject() {
  if (!HttpUtil.is100ContinueExpected(nettyRequest)) {
    return super.receiveObject();
  }
  return FutureMono.deferFuture(() -> {
        if (hasSentHeaders()) {
          // Response headers are already out; nothing more to send before reading.
          return channel().newSucceededFuture();
        }
        return channel().writeAndFlush(CONTINUE);
      })
      .thenMany(super.receiveObject());
}
/**
 * Sends {@code res} to the client and closes the connection once the write completes.
 *
 * <p>For a non-200 response with an empty body, the status line text is used as the body.
 * The Content-Length header is always set from the final body size, and the
 * {@code Connection: close} header is always added. This method closes the connection
 * regardless of the request's keep-alive preference; the original code had two identical
 * branches for that check, which are collapsed here.
 *
 * @param ctx the context used to write the response
 * @param req the originating request (not consulted, kept for signature compatibility)
 * @param res the response to send; its content and headers are modified in place
 */
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
  // Generate an error page if the response status code is not OK (200) and no body was supplied.
  boolean isError = res.status().code() != HttpResponseStatus.OK.code();
  if (isError && res.content().readableBytes() == 0) {
    ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
    try {
      res.content().writeBytes(buf);
    } finally {
      // Release the temporary buffer even if the copy fails.
      buf.release();
    }
  }
  HttpUtil.setContentLength(res, res.content().readableBytes());

  // Always announce and perform the close after the response is flushed.
  res.headers().set(CONNECTION, CLOSE);
  ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
}
/**
 * Writes {@code res} to the client and closes the connection after the write completes.
 *
 * <p>A non-200 response that has no body gets the status text as its body. Content-Length
 * is set from the resulting body size and {@code Connection: close} is always added. The
 * original keep-alive check had two identical branches, so it is removed here; the
 * connection is closed whatever the request asked for.
 *
 * @param ctx the context used to write the response
 * @param req the originating request (not consulted, kept for signature compatibility)
 * @param res the response to send; its content and headers are modified in place
 */
private static void sendHttpResponse(ChannelHandlerContext ctx,
    FullHttpRequest req, FullHttpResponse res) {
  int statusCode = res.status().code();
  // Generate an error page if the response status code is not OK (200) and no body was supplied.
  if (statusCode != HttpResponseStatus.OK.code()
      && res.content().readableBytes() == 0) {
    ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
        CharsetUtil.UTF_8);
    try {
      res.content().writeBytes(buf);
    } finally {
      // Release the temporary buffer even if the copy fails.
      buf.release();
    }
  }
  HttpUtil.setContentLength(res, res.content().readableBytes());

  // Send the response, then close the connection unconditionally.
  res.headers().set(CONNECTION, CLOSE);
  ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
}
/**
 * Posts a zero-filled octet-stream body of 10385760 bytes to {@code /predictions/noop}
 * and expects the recorded HTTP status to be 200.
 *
 * @param channel the channel the request is sent on
 */
private void testPredictionsValidRequestSize(Channel channel) throws InterruptedException {
  // Reset the shared result holder and arm a latch for the asynchronous response.
  result = null;
  latch = new CountDownLatch(1);

  DefaultFullHttpRequest largeRequest =
      new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/predictions/noop");
  largeRequest.content().writeZero(10385760);
  int bodyLength = largeRequest.content().readableBytes();
  HttpUtil.setContentLength(largeRequest, bodyLength);
  largeRequest
      .headers()
      .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM);

  channel.writeAndFlush(largeRequest);
  latch.await();

  Assert.assertEquals(httpStatus, HttpResponseStatus.OK);
}
/**
 * Writes and flushes {@code res}, closing the connection afterwards when the request is
 * not keep-alive or the response status is not 200.
 *
 * <p>For a non-200 response the status text is appended to the body and Content-Length is
 * set from the resulting body size.
 *
 * @param ctx the context whose channel receives the response
 * @param req the originating request, consulted for keep-alive
 * @param res the response to send; modified in place for non-200 statuses
 */
private static void sendHttpResponse(
    ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
  boolean notOk = res.status().code() != 200;
  if (notOk) {
    // Use the status line text as a minimal error page.
    ByteBuf statusText = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
    res.content().writeBytes(statusText);
    statusText.release();
    HttpUtil.setContentLength(res, res.content().readableBytes());
  }

  ChannelFuture writeFuture = ctx.channel().writeAndFlush(res);
  boolean closeAfterWrite = !HttpUtil.isKeepAlive(req) || notOk;
  if (closeAfterWrite) {
    writeFuture.addListener(ChannelFutureListener.CLOSE);
  }
}
/**
 * Answers a full HTTP/1.x request with a plain-text body built from
 * {@code HelloWorldHttp2Handler.RESPONSE_BYTES} plus the request's protocol version and
 * the {@code establishApproach} value.
 *
 * <p>A {@code 100 Continue} is written first when the request expects it. The response is
 * written without flushing; for non-keep-alive requests the channel is closed when the
 * write completes.
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
  if (HttpUtil.is100ContinueExpected(req)) {
    ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
  }
  boolean keepAlive = HttpUtil.isKeepAlive(req);

  // Assemble the body: fixed greeting followed by a protocol/approach suffix.
  ByteBuf body = ctx.alloc().buffer();
  body.writeBytes(HelloWorldHttp2Handler.RESPONSE_BYTES.duplicate());
  String suffix = " - via " + req.protocolVersion() + " (" + establishApproach + ")";
  ByteBufUtil.writeAscii(body, suffix);

  FullHttpResponse reply = new DefaultFullHttpResponse(HTTP_1_1, OK, body);
  reply.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
  reply.headers().setInt(CONTENT_LENGTH, reply.content().readableBytes());

  if (keepAlive) {
    reply.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    ctx.write(reply);
    return;
  }
  ctx.write(reply).addListener(ChannelFutureListener.CLOSE);
}
/**
 * Sets the response's Content-Length and schedules the write on the context's executor
 * after {@code latency} milliseconds.
 *
 * <p>When the delayed task runs, a keep-alive request gets a {@code Connection: keep-alive}
 * header and the response is flushed; otherwise the response is flushed and the channel is
 * closed when the write completes.
 *
 * @param ctx the context used for scheduling and writing
 * @param streamId not used by this implementation
 * @param latency delay in milliseconds before the response is written
 * @param response the response to send
 * @param request the originating request, consulted for keep-alive
 */
@Override
protected void sendResponse(final ChannelHandlerContext ctx, String streamId, int latency,
    final FullHttpResponse response, final FullHttpRequest request) {
  HttpUtil.setContentLength(response, response.content().readableBytes());

  Runnable delayedWrite = new Runnable() {
    @Override
    public void run() {
      if (!isKeepAlive(request)) {
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        return;
      }
      response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
      ctx.writeAndFlush(response);
    }
  };
  ctx.executor().schedule(delayedWrite, latency, TimeUnit.MILLISECONDS);
}
/**
 * Sends {@code res} on the context's channel and closes the connection afterwards unless
 * the request is keep-alive and the response status is 200.
 *
 * <p>Non-200 responses get the status text appended to the body, with Content-Length set
 * from the resulting body size.
 *
 * @param ctx the context whose channel receives the response
 * @param req the originating request, consulted for keep-alive
 * @param res the response to send; modified in place for non-200 statuses
 */
private static void sendHttpResponse(
    ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
  final int okCode = 200;

  if (res.status().code() != okCode) {
    // Fill the body with the status line text as a minimal error page.
    ByteBuf errorPage = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
    res.content().writeBytes(errorPage);
    errorPage.release();
    int bodyLength = res.content().readableBytes();
    HttpUtil.setContentLength(res, bodyLength);
  }

  ChannelFuture pendingWrite = ctx.channel().writeAndFlush(res);
  if (HttpUtil.isKeepAlive(req) && res.status().code() == okCode) {
    // Persistent connection and a successful response: leave the channel open.
    return;
  }
  pendingWrite.addListener(ChannelFutureListener.CLOSE);
}
/**
 * Dispatches an incoming HTTP request to the context handler registered for its URI.
 *
 * <p>A {@code 100 Continue} is written first when the request expects it. If no handler
 * is found for the URI, a {@link RuntimeException} wrapping a {@code Fault} is thrown.
 *
 * @param ctx the channel handler context
 * @param msg the inbound message; cast to {@code HttpRequest} without a type check
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  HttpRequest httpRequest = (HttpRequest) msg;
  if (HttpUtil.is100ContinueExpected(httpRequest)) {
    ctx.write(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
  }

  // Resolve the handler from the request URI.
  NettyHttpContextHandler contextHandler = pipelineFactory.getNettyHttpHandler(httpRequest.uri());
  if (contextHandler == null) {
    throw new RuntimeException(
        new Fault(new Message("NO_NETTY_SERVLET_HANDLER_FOUND", LOG, httpRequest.uri())));
  }
  handleHttpServletRequest(ctx, httpRequest, contextHandler);
}
/**
 * A short-circuiting CORS handler rejects a request from a non-configured origin with
 * 403; with no Connection header on the request, the response is keep-alive and the
 * channel stays open.
 */
@Test
public void shortCircuitWithoutConnectionShouldStayOpen() {
  CorsConfig corsConfig = forOrigin("http://localhost:8080").shortCircuit().build();
  EmbeddedChannel embedded = new EmbeddedChannel(new CorsHandler(corsConfig));

  // Origin differs from the configured one.
  FullHttpRequest corsRequest = createHttpRequest(GET);
  corsRequest.headers().set(ORIGIN, "http://localhost:8888");

  assertThat(embedded.writeInbound(corsRequest), is(false));
  HttpResponse rejected = embedded.readOutbound();
  assertThat(HttpUtil.isKeepAlive(rejected), is(true));

  assertThat(embedded.isOpen(), is(true));
  assertThat(rejected.status(), is(FORBIDDEN));
  assertThat(ReferenceCountUtil.release(rejected), is(true));
  assertThat(embedded.finish(), is(false));
}
/**
 * A short-circuiting CORS handler rejects a request from a non-configured origin with
 * 403; when the request carries {@code Connection: close}, the response is not keep-alive
 * and the channel is closed.
 */
@Test
public void shortCircuitWithConnectionCloseShouldClose() {
  CorsConfig corsConfig = forOrigin("http://localhost:8080").shortCircuit().build();
  EmbeddedChannel embedded = new EmbeddedChannel(new CorsHandler(corsConfig));

  // Origin differs from the configured one, and the client asks to close.
  FullHttpRequest corsRequest = createHttpRequest(GET);
  corsRequest.headers().set(ORIGIN, "http://localhost:8888");
  corsRequest.headers().set(CONNECTION, CLOSE);

  assertThat(embedded.writeInbound(corsRequest), is(false));
  HttpResponse rejected = embedded.readOutbound();
  assertThat(HttpUtil.isKeepAlive(rejected), is(false));

  assertThat(embedded.isOpen(), is(false));
  assertThat(rejected.status(), is(FORBIDDEN));
  assertThat(ReferenceCountUtil.release(rejected), is(true));
  assertThat(embedded.finish(), is(false));
}
/**
 * Translates HTTP/2 headers and adds the result to an HTTP/1.x header container.
 *
 * <p>After translation the {@code Transfer-Encoding} and {@code Trailer} headers are removed
 * from {@code outputHeaders}. For non-trailer headers, the stream ID extension header and
 * the keep-alive setting are also applied.
 *
 * @param streamId The stream associated with {@code inputHeaders}.
 * @param inputHeaders The HTTP/2 headers to convert.
 * @param outputHeaders The object which will contain the resulting HTTP/1.x headers.
 * @param httpVersion What HTTP/1.x version {@code outputHeaders} should be treated as when doing the conversion.
 * @param isTrailer {@code true} if {@code outputHeaders} should be treated as trailing headers.
 * {@code false} otherwise.
 * @param isRequest {@code true} if the {@code outputHeaders} will be used in a request message.
 * {@code false} for response message.
 * @throws Http2Exception If a header cannot be translated; any other failure raised during
 * translation is wrapped in a stream error with {@code PROTOCOL_ERROR}.
 */
public static void addHttp2ToHttpHeaders(int streamId, Http2Headers inputHeaders, HttpHeaders outputHeaders,
    HttpVersion httpVersion, boolean isTrailer, boolean isRequest) throws Http2Exception {
  Http2ToHttpHeaderTranslator headerTranslator =
      new Http2ToHttpHeaderTranslator(streamId, outputHeaders, isRequest);
  try {
    for (Entry<CharSequence, CharSequence> header : inputHeaders) {
      headerTranslator.translate(header);
    }
  } catch (Http2Exception http2Failure) {
    // Already the right exception type; propagate unchanged.
    throw http2Failure;
  } catch (Throwable cause) {
    throw streamError(streamId, PROTOCOL_ERROR, cause, "HTTP/2 to HTTP/1.x headers conversion error");
  }

  outputHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING);
  outputHeaders.remove(HttpHeaderNames.TRAILER);
  if (isTrailer) {
    return;
  }
  outputHeaders.setInt(ExtensionHeaderNames.STREAM_ID.text(), streamId);
  HttpUtil.setKeepAlive(outputHeaders, httpVersion, true);
}
/**
 * Handles completion of the response write.
 *
 * <p>On success the request is completed, closing the connection when the final response
 * metadata is not keep-alive; on failure the write failure handler is invoked. Write time
 * and post-write processing time are then recorded in the Netty metrics and, when a
 * request is present, added to its response processing time.
 *
 * @param future the completed write future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
  long writeFinishTime = System.currentTimeMillis();
  long channelWriteTime = writeFinishTime - responseWriteStartTime;

  if (!future.isSuccess()) {
    handleChannelWriteFailure(future.cause(), true);
  } else {
    boolean closeConnection = !HttpUtil.isKeepAlive(finalResponseMetadata);
    completeRequest(closeConnection, true);
  }

  // Time spent in the completion/failure handling above.
  long afterWriteProcessingTime = System.currentTimeMillis() - writeFinishTime;
  nettyMetrics.channelWriteTimeInMs.update(channelWriteTime);
  nettyMetrics.responseMetadataAfterWriteProcessingTimeInMs.update(afterWriteProcessingTime);

  if (request == null) {
    return;
  }
  request.getMetricsTracker().nioMetricsTracker.addToResponseProcessingTime(
      channelWriteTime + afterWriteProcessingTime);
}
/**
 * Writes an HTTP/2 headers frame with {@code content-length: 0} into a server-side codec
 * and checks that it is read back as a non-full, non-chunked HTTP/1.1 GET request for "/",
 * with nothing else left inbound.
 */
@Test
public void testDowngradeHeadersWithContentLength() throws Exception {
  EmbeddedChannel channel = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(true));

  Http2Headers http2Headers = new DefaultHttp2Headers();
  http2Headers.path("/");
  http2Headers.method("GET");
  http2Headers.setInt("content-length", 0);
  DefaultHttp2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(http2Headers);

  assertTrue(channel.writeInbound(headersFrame));

  HttpRequest downgraded = channel.readInbound();
  assertThat(downgraded.uri(), is("/"));
  assertThat(downgraded.method(), is(HttpMethod.GET));
  assertThat(downgraded.protocolVersion(), is(HttpVersion.HTTP_1_1));
  assertFalse(downgraded instanceof FullHttpRequest);
  assertFalse(HttpUtil.isTransferEncodingChunked(downgraded));

  // Nothing further should have been produced.
  assertThat(channel.readInbound(), is(nullValue()));
  assertFalse(channel.finish());
}
/**
 * Unregisters the model, then registers "noop_v0.1" through a JSON request body that asks
 * for one initial worker, and expects the status "Workers scaled".
 *
 * @param channel the channel the requests are sent on
 */
private void testLoadModelWithInitialWorkersWithJSONReqBody(Channel channel)
    throws InterruptedException {
  testUnregisterModel(channel);

  // Reset the shared result holder and arm a latch for the asynchronous response.
  result = null;
  latch = new CountDownLatch(1);

  String registrationJson =
      "{'url':'noop-v0.1', 'model_name':'noop_v0.1', 'initial_workers':'1', 'synchronous':'true'}";
  DefaultFullHttpRequest registerRequest =
      new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/models");
  registerRequest.headers().add("Content-Type", "application/json");
  registerRequest.content().writeCharSequence(registrationJson, CharsetUtil.UTF_8);
  HttpUtil.setContentLength(registerRequest, registerRequest.content().readableBytes());

  channel.writeAndFlush(registerRequest);
  latch.await();

  StatusResponse status = JsonUtils.GSON.fromJson(result, StatusResponse.class);
  Assert.assertEquals(status.getStatus(), "Workers scaled");
}
/**
 * Writes an end-of-stream HTTP/2 headers frame with status 200 into a client-side codec
 * and checks that it is read back as an empty, non-chunked HTTP/1.1 full response with no
 * trailing headers, with nothing else left inbound.
 */
@Test
public void testDecodeFullResponseHeaders() throws Exception {
  EmbeddedChannel channel = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(false));

  Http2Headers http2Headers = new DefaultHttp2Headers();
  http2Headers.scheme(HttpScheme.HTTP.name());
  http2Headers.status(HttpResponseStatus.OK.codeAsText());
  DefaultHttp2HeadersFrame endOfStreamFrame = new DefaultHttp2HeadersFrame(http2Headers, true);

  assertTrue(channel.writeInbound(endOfStreamFrame));

  FullHttpResponse decoded = channel.readInbound();
  try {
    assertThat(decoded.status(), is(HttpResponseStatus.OK));
    assertThat(decoded.protocolVersion(), is(HttpVersion.HTTP_1_1));
    assertThat(decoded.content().readableBytes(), is(0));
    assertTrue(decoded.trailingHeaders().isEmpty());
    assertFalse(HttpUtil.isTransferEncodingChunked(decoded));
  } finally {
    // The full response is reference counted; release it whatever the assertions do.
    decoded.release();
  }

  assertThat(channel.readInbound(), is(nullValue()));
  assertFalse(channel.finish());
}
/**
 * Replies to a full HTTP/1.x request with a plain-text greeting made of
 * {@code HelloWorldHttp2Handler.RESPONSE_BYTES}, the request's protocol version, and the
 * {@code establishApproach} value.
 *
 * <p>Writes {@code 100 Continue} first if the request expects it. The response is written
 * without flushing; when the request is not keep-alive the channel is closed once the
 * write completes.
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
  if (HttpUtil.is100ContinueExpected(req)) {
    ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
  }
  final boolean persistent = HttpUtil.isKeepAlive(req);

  ByteBuf payload = ctx.alloc().buffer();
  payload.writeBytes(HelloWorldHttp2Handler.RESPONSE_BYTES.duplicate());
  ByteBufUtil.writeAscii(payload, " - via " + req.protocolVersion() + " (" + establishApproach + ")");

  FullHttpResponse helloResponse = new DefaultFullHttpResponse(HTTP_1_1, OK, payload);
  helloResponse.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
  int payloadLength = helloResponse.content().readableBytes();
  helloResponse.headers().setInt(CONTENT_LENGTH, payloadLength);

  if (persistent) {
    helloResponse.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
  }
  ChannelFuture writeFuture = ctx.write(helloResponse);
  if (!persistent) {
    // Client did not ask for a persistent connection: close after the write.
    writeFuture.addListener(ChannelFutureListener.CLOSE);
  }
}
/**
 * Answers every {@code HttpRequest} with a fixed plain-text body taken from {@code CONTENT}.
 *
 * <p>Writes {@code 100 Continue} first if the request expects it. The response is written
 * without flushing; when the request is not keep-alive the channel is closed once the
 * write completes. Messages that are not an {@code HttpRequest} are ignored.
 *
 * @param handlerContext the channel handler context
 * @param message the inbound message
 */
@Override
public void channelRead(final ChannelHandlerContext handlerContext, final Object message) {
  // NOTE(review): ignored messages are neither forwarded nor released here; confirm that is intended.
  if (!(message instanceof HttpRequest)) {
    return;
  }
  final HttpRequest httpRequest = (HttpRequest) message;

  if (HttpUtil.is100ContinueExpected(httpRequest)) {
    handlerContext.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
  }
  final boolean persistent = HttpUtil.isKeepAlive(httpRequest);

  final FullHttpResponse reply =
      new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));
  reply.headers().set(CONTENT_TYPE, "text/plain");
  reply.headers().set(CONTENT_LENGTH, reply.content().readableBytes());

  if (!persistent) {
    handlerContext.write(reply).addListener(ChannelFutureListener.CLOSE);
    return;
  }
  reply.headers().set(CONNECTION, Values.KEEP_ALIVE);
  handlerContext.write(reply);
}
/**
 * Answers every {@code HttpRequest} with a fixed plain-text body taken from
 * {@code HELLO_WORLD}.
 *
 * <p>Writes {@code 100 Continue} first if the request expects it. The response is written
 * without flushing; when the request is not keep-alive the channel is closed once the
 * write completes. Messages that are not an {@code HttpRequest} are ignored.
 *
 * @param ctx the channel handler context
 * @param msg the inbound message
 */
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
  // NOTE(review): other message types are neither forwarded nor released here; confirm that is intended.
  if (msg instanceof HttpRequest) {
    HttpRequest httpRequest = (HttpRequest) msg;
    if (HttpUtil.is100ContinueExpected(httpRequest)) {
      ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
    }
    boolean persistent = HttpUtil.isKeepAlive(httpRequest);

    FullHttpResponse hello =
        new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(HELLO_WORLD));
    hello.headers().set(CONTENT_TYPE, "text/plain");
    hello.headers().set(CONTENT_LENGTH, hello.content().readableBytes());

    if (persistent) {
      hello.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
      ctx.write(hello);
    } else {
      ctx.write(hello).addListener(ChannelFutureListener.CLOSE);
    }
  }
}
/**
 * Sends a region of a file, choosing the framing headers first when they are still open.
 *
 * <p>If headers were already sent, or a Content-Length is already set, the call is
 * delegated unchanged. Otherwise, when the message is not chunked and {@code count} is
 * below {@code Integer.MAX_VALUE}, Content-Length is set to {@code count}; in the
 * remaining case Content-Length and Transfer-Encoding are cleared and chunked transfer
 * encoding is enabled.
 *
 * @param file the file to send; must not be {@code null}
 * @param position the offset passed through to the parent implementation
 * @param count the number of bytes passed through to the parent implementation
 * @return the outbound returned by the parent implementation
 */
@Override
public final NettyOutbound sendFile(Path file, long position, long count) {
  Objects.requireNonNull(file);
  if (hasSentHeaders()) {
    return super.sendFile(file, position, count);
  }

  boolean lengthAlreadySet = HttpUtil.isContentLengthSet(outboundHttpMessage());
  if (!lengthAlreadySet) {
    boolean fitsFixedLength =
        !HttpUtil.isTransferEncodingChunked(outboundHttpMessage()) && count < Integer.MAX_VALUE;
    if (fitsFixedLength) {
      outboundHttpMessage().headers()
          .setInt(HttpHeaderNames.CONTENT_LENGTH, (int) count);
    } else {
      // Fall back to chunked transfer: clear both framing headers, then enable chunking.
      outboundHttpMessage().headers()
          .remove(HttpHeaderNames.CONTENT_LENGTH)
          .remove(HttpHeaderNames.TRANSFER_ENCODING);
      HttpUtil.setTransferEncodingChunked(outboundHttpMessage(), true);
    }
  }
  return super.sendFile(file, position, count);
}
/**
 * Sends an HTTP/1.1 response with the given status and text body.
 *
 * <p>The body is encoded with {@code RpcConstants.DEFAULT_CHARSET}. Content-Type,
 * Content-Length and the keep-alive header are all set before the response is written,
 * so they are part of the message handed to the pipeline. When {@code isKeepAlive} is
 * false, the channel is closed after the write completes. Exceptions thrown while writing
 * are logged and not propagated.
 *
 * @param ctx the context whose channel receives the response
 * @param status the response status
 * @param resultStr the response body text
 * @param isKeepAlive whether the connection should be kept open after the response
 * @return the size of the encoded body in bytes
 */
protected int sendHttp1Response(ChannelHandlerContext ctx, HttpResponseStatus status, String resultStr,
                                boolean isKeepAlive) {
  ByteBuf content = Unpooled.copiedBuffer(resultStr, RpcConstants.DEFAULT_CHARSET);
  // Capture the size up front: once the buffer is handed to the pipeline its reader index
  // is no longer ours to inspect.
  int contentLength = content.readableBytes();

  FullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, status, content);
  res.headers().set(CONTENT_TYPE, "text/html; charset=" + RpcConstants.DEFAULT_CHARSET.displayName());
  HttpUtil.setContentLength(res, contentLength);
  // Headers must be final before the write is issued; the original set this afterwards.
  HttpUtil.setKeepAlive(res, isKeepAlive);

  try {
    ChannelFuture f = ctx.channel().writeAndFlush(res);
    if (!isKeepAlive) {
      f.addListener(ChannelFutureListener.CLOSE);
    }
  } catch (Exception e2) {
    LOGGER.warn("Failed to send HTTP response to remote, cause by:", e2);
  }
  return contentLength;
}