下面列出了怎么用 io.netty.handler.codec.http.LastHttpContent 的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public PipelineContinuationBehavior doChannelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof LastHttpContent) {
HttpProcessingState state = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get();
Endpoint<?> endpoint = state.getEndpointForExecution();
RequestInfo reqInfo = state.getRequestInfo();
// Don't bother trying to deserialize until we have an endpoint and the request content has fully arrived
if (endpoint != null && reqInfo.isCompleteRequestWithAllChunks()) {
// Setup the content deserializer if desired
TypeReference<?> contentTypeRef = endpoint.requestContentType();
if (contentTypeRef != null) {
// A non-null TypeReference is available, so deserialization is possible. Retrieve the appropriate
// deserializer and setup the RequestInfo so that it can lazily deserialize when requested.
ObjectMapper deserializer = endpoint.customRequestContentDeserializer(reqInfo);
if (deserializer == null)
deserializer = defaultRequestContentDeserializer;
//noinspection unchecked
reqInfo.setupContentDeserializer(deserializer, contentTypeRef);
}
}
}
return PipelineContinuationBehavior.CONTINUE;
}
@Override
public void sendResponse(FullHttpRequest req, ChannelHandlerContext ctx) throws Exception {
counter.inc();
byte[] content = null;
try(InputStream is = FallbackResponder.class.getClassLoader().getResourceAsStream(resource)) {
content = IOUtils.toByteArray(is);
}
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
HttpHeaders.setContentLength(response, content.length);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_XML_UTF_8.toString());
response.content().writeBytes(content);
ctx.write(response);
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
future.addListener(ChannelFutureListener.CLOSE);
}
/**
* Initialized the internals from a new chunk
*
* @param content
* the new received chunk
* @throws ErrorDataDecoderException
* if there is a problem with the charset decoding or other
* errors
*/
@Override
public HttpPostStandardRequestDecoder offer(HttpContent content) {
checkDestroyed();
// Maybe we should better not copy here for performance reasons but this will need
// more care by the caller to release the content in a correct manner later
// So maybe something to optimize on a later stage
ByteBuf buf = content.content();
if (undecodedChunk == null) {
undecodedChunk = buf.copy();
} else {
undecodedChunk.writeBytes(buf);
}
if (content instanceof LastHttpContent) {
isLastChunk = true;
}
parseBody();
if (undecodedChunk != null && undecodedChunk.writerIndex() > discardThreshold) {
undecodedChunk.discardReadBytes();
}
return this;
}
@Override
public HttpObject serverToProxyResponse(HttpObject httpObject) {
if (httpObject instanceof HttpResponse) {
httpResponse = (HttpResponse) httpObject;
captureContentEncoding(httpResponse);
}
if (httpObject instanceof HttpContent) {
HttpContent httpContent = (HttpContent) httpObject;
storeResponseContent(httpContent);
if (httpContent instanceof LastHttpContent) {
LastHttpContent lastContent = (LastHttpContent) httpContent;
captureTrailingHeaders(lastContent);
captureFullResponseContents();
}
}
return super.serverToProxyResponse(httpObject);
}
@Test
public void testDataIsMultipleOfChunkSize2() throws Exception {
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.POST, "http://localhost");
HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(request, true);
int length = 7943;
char[] array = new char[length];
Arrays.fill(array, 'a');
String longText = new String(array);
encoder.addBodyAttribute("foo", longText);
assertNotNull(encoder.finalizeRequest());
checkNextChunkSize(encoder, 8080);
HttpContent httpContent = encoder.readChunk((ByteBufAllocator) null);
assertTrue("Expected LastHttpContent is not received", httpContent instanceof LastHttpContent);
httpContent.release();
assertTrue("Expected end of input is not receive", encoder.isEndOfInput());
}
@Override
protected boolean handleResponse(ChannelHandlerContext ctx, Object response) throws Exception {
if (response instanceof HttpResponse) {
if (status != null) {
throw new ProxyConnectException(exceptionMessage("too many responses"));
}
status = ((HttpResponse) response).status();
}
boolean finished = response instanceof LastHttpContent;
if (finished) {
if (status == null) {
throw new ProxyConnectException(exceptionMessage("missing response"));
}
if (status.code() != 200) {
throw new ProxyConnectException(exceptionMessage("status: " + status));
}
}
return finished;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//
// Break out LiveHttpResponse and and LastHttpContent handling in separate
// blocks. This way it doesn't require an HttpObjectAggregator in the
// pipeline.
//
if (msg instanceof HttpResponse) {
HttpResponse resp = (HttpResponse) msg;
int code = resp.getStatus().code();
updateHttpResponseCounters(code);
}
if (msg instanceof HttpContent && !firstContentChunkReceived) {
stopAndRecordTimeToFirstByte();
firstContentChunkReceived = true;
}
if (msg instanceof LastHttpContent) {
stopAndRecordLatency();
}
super.channelRead(ctx, msg);
}
@Test
public void downgrade() throws Exception {
setUpServerChannel();
String requestString = "GET / HTTP/1.1\r\n" +
"Host: example.com\r\n\r\n";
ByteBuf inbound = Unpooled.buffer().writeBytes(requestString.getBytes(CharsetUtil.US_ASCII));
assertTrue(channel.writeInbound(inbound));
Object firstInbound = channel.readInbound();
assertTrue(firstInbound instanceof HttpRequest);
HttpRequest request = (HttpRequest) firstInbound;
assertEquals(HttpMethod.GET, request.method());
assertEquals("/", request.uri());
assertEquals(HttpVersion.HTTP_1_1, request.protocolVersion());
assertEquals(new DefaultHttpHeaders().add("Host", "example.com"), request.headers());
((LastHttpContent) channel.readInbound()).release();
assertNull(channel.readInbound());
}
@Test
public void addContentChunk_adds_last_chunk_trailing_headers() {
// given
RequestInfoImpl<?> requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests();
requestInfo.isCompleteRequestWithAllChunks = false;
LastHttpContent lastChunk = new DefaultLastHttpContent(Unpooled.copiedBuffer(UUID.randomUUID().toString(), CharsetUtil.UTF_8));
String headerKey = UUID.randomUUID().toString();
List<String> headerVal = Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString());
lastChunk.trailingHeaders().add(headerKey, headerVal);
// when
requestInfo.addContentChunk(lastChunk);
// then
assertThat(requestInfo.trailingHeaders.names().size(), is(1));
assertThat(requestInfo.trailingHeaders.getAll(headerKey), is(headerVal));
}
@Before
public void beforeMethod() {
channelMock = mock(Channel.class);
ctxMock = mock(ChannelHandlerContext.class);
stateAttributeMock = mock(Attribute.class);
state = new HttpProcessingState();
doReturn(channelMock).when(ctxMock).channel();
doReturn(stateAttributeMock).when(channelMock).attr(ChannelAttributes.HTTP_PROCESSING_STATE_ATTRIBUTE_KEY);
doReturn(state).when(stateAttributeMock).get();
firstChunkMsgMock = mock(HttpRequest.class);
lastChunkMsgMock = mock(LastHttpContent.class);
filter1Mock = mock(RequestAndResponseFilter.class);
filter2Mock = mock(RequestAndResponseFilter.class);
filtersList = Arrays.asList(filter1Mock, filter2Mock);
handlerSpy = spy(new RequestFilterHandler(filtersList));
requestInfoMock = mock(RequestInfo.class);
state.setRequestInfo(requestInfoMock);
}
@Test
public void shouldDecodeAuthFailureBucketConfigResponse() throws Exception {
HttpResponse responseHeader = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
new HttpResponseStatus(401, "Unauthorized"));
HttpContent responseChunk = LastHttpContent.EMPTY_LAST_CONTENT;
BucketConfigRequest requestMock = mock(BucketConfigRequest.class);
requestQueue.add(requestMock);
channel.writeInbound(responseHeader, responseChunk);
assertEquals(1, eventSink.responseEvents().size());
BucketConfigResponse event = (BucketConfigResponse) eventSink.responseEvents().get(0).getMessage();
assertEquals(ResponseStatus.ACCESS_ERROR, event.status());
assertEquals("Unauthorized", event.config());
assertTrue(requestQueue.isEmpty());
}
@Test
public void testDowngradeEndData() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(true));
ByteBuf hello = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
assertTrue(ch.writeInbound(new DefaultHttp2DataFrame(hello, true)));
LastHttpContent content = ch.readInbound();
try {
assertThat(content.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertTrue(content.trailingHeaders().isEmpty());
} finally {
content.release();
}
assertThat(ch.readInbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Test
public void testEncodeEmptyEndAsClient() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(false));
LastHttpContent end = LastHttpContent.EMPTY_LAST_CONTENT;
assertTrue(ch.writeOutbound(end));
Http2DataFrame emptyFrame = ch.readOutbound();
try {
assertThat(emptyFrame.content().readableBytes(), is(0));
assertTrue(emptyFrame.isEndStream());
} finally {
emptyFrame.release();
}
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Test
public void testEncodeDataEndAsClient() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(false));
ByteBuf hello = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
LastHttpContent end = new DefaultLastHttpContent(hello, true);
assertTrue(ch.writeOutbound(end));
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertTrue(dataFrame.isEndStream());
} finally {
dataFrame.release();
}
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject in) {
// Make sure that we increase refCnt because we are going to process it async. The other end has to release
// after processing.
responseParts.offer(ReferenceCountUtil.retain(in));
if (in instanceof HttpResponse && in.decoderResult().isSuccess()) {
isKeepAlive = HttpUtil.isKeepAlive((HttpResponse) in);
} else if (in.decoderResult().isFailure()) {
Throwable cause = in.decoderResult().cause();
if (cause instanceof Exception) {
exception = (Exception) cause;
} else {
exception =
new Exception("Encountered Throwable when trying to decode response. Message: " + cause.getMessage());
}
invokeFutureAndCallback("CommunicationHandler::channelRead0 - decoder failure");
}
if (in instanceof LastHttpContent) {
if (isKeepAlive) {
invokeFutureAndCallback("CommunicationHandler::channelRead0 - last content");
} else {
// if not, the future will be invoked when the channel is closed.
ctx.close();
}
}
}
/** Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND). */
@Test
public void httpErrorsAreSupported() throws IOException {
EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of()));
ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, DIGEST, out);
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpResponse response =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
response.headers().set(HttpHeaders.HOST, "localhost");
response.headers().set(HttpHeaders.CONTENT_LENGTH, 0);
response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ch.writeInbound(response);
ch.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT);
assertThat(writePromise.isDone()).isTrue();
assertThat(writePromise.cause()).isInstanceOf(HttpException.class);
assertThat(((HttpException) writePromise.cause()).response().status())
.isEqualTo(HttpResponseStatus.NOT_FOUND);
// No data should have been written to the OutputStream and it should have been closed.
assertThat(out.size()).isEqualTo(0);
// The caller is responsible for closing the stream.
verify(out, never()).close();
assertThat(ch.isOpen()).isTrue();
}
private void handleUploadMessage(io.netty.handler.codec.http.HttpMessage httpMsg, Message uploadMessage) throws IOException{
if (httpMsg instanceof HttpContent) {
HttpContent chunk = (HttpContent) httpMsg;
decoder.offer(chunk);
try {
while (decoder.hasNext()) {
InterfaceHttpData data = decoder.next();
if (data != null) {
try {
handleUploadFile(data, uploadMessage);
} finally {
data.release();
}
}
}
} catch (EndOfDataDecoderException e1) {
//ignore
}
if (chunk instanceof LastHttpContent) {
resetUpload();
}
}
}
/**
* Sends the provided {@code httpRequest} and verifies that the response is an echo of the {@code restMethod}.
* @param channel the {@link EmbeddedChannel} to send the request over.
* @param httpMethod the {@link HttpMethod} for the request.
* @param restMethod the equivalent {@link RestMethod} for {@code httpMethod}. Used to check for correctness of
* response.
* @param isKeepAlive if the request needs to be keep-alive.
* @throws IOException
*/
private void sendRequestCheckResponse(EmbeddedChannel channel, HttpMethod httpMethod, RestMethod restMethod,
boolean isKeepAlive) throws IOException {
long requestId = REQUEST_ID_GENERATOR.getAndIncrement();
String uri = MockRestRequestService.ECHO_REST_METHOD + requestId;
HttpRequest httpRequest = RestTestUtils.createRequest(httpMethod, uri, null);
HttpUtil.setKeepAlive(httpRequest, isKeepAlive);
channel.writeInbound(httpRequest);
channel.writeInbound(new DefaultLastHttpContent());
HttpResponse response = (HttpResponse) channel.readOutbound();
assertEquals("Unexpected response status", HttpResponseStatus.OK, response.status());
// MockRestRequestService echoes the RestMethod + request id.
String expectedResponse = restMethod.toString() + requestId;
assertEquals("Unexpected content", expectedResponse,
RestTestUtils.getContentString((HttpContent) channel.readOutbound()));
assertTrue("End marker was expected", channel.readOutbound() instanceof LastHttpContent);
}
@Override
public void send(){
if(hasSend){
return ;
}
hasSend = true;
if(channel != null){
DFTcpChannelWrap chWrap = (DFTcpChannelWrap) channel;
chWrap.writeNoFlush(this);
if(_bufContent != null){
response.headers().add(HttpHeaderNames.CONTENT_LENGTH, _bufContent.readableBytes());
chWrap.writeNoFlush(_bufContent);
}
channel.write(LastHttpContent.EMPTY_LAST_CONTENT);
}
}
/**
* Handles a {@link HttpContent}. Checks state and echoes back the content.
* @param httpContent the {@link HttpContent} that needs to be handled.
* @throws Exception
*/
private void handleContent(HttpContent httpContent) throws Exception {
if (request != null) {
boolean isLast = httpContent instanceof LastHttpContent;
ChannelWriteCallback callback = new ChannelWriteCallback(httpContent);
// Retain it here since SimpleChannelInboundHandler would auto release it after the channelRead0.
// And release it in the callback.
callback.setFuture(restResponseChannel.write(httpContent.content().retain(), callback));
writeCallbacksToVerify.add(callback);
if (isLast) {
restResponseChannel.onResponseComplete(null);
assertFalse("Request channel is not closed", request.isOpen());
}
} else {
throw new RestServiceException("Received data without a request", RestServiceErrorCode.InvalidRequestState);
}
}
@Override
public HttpObject serverToProxyResponse(HttpObject httpObject) {
if (httpObject instanceof HttpResponse) {
httpResponse = (HttpResponse) httpObject;
captureContentEncoding(httpResponse);
}
if (httpObject instanceof HttpContent) {
HttpContent httpContent = (HttpContent) httpObject;
storeResponseContent(httpContent);
if (httpContent instanceof LastHttpContent) {
LastHttpContent lastContent = (LastHttpContent) httpContent;
captureTrailingHeaders(lastContent);
captureFullResponseContents();
}
}
return super.serverToProxyResponse(httpObject);
}
/**
* Combines all the parts in {@code contents} into one {@link ByteBuffer}.
* @param contents the content of the response.
* @param expectedContentLength the length of the contents in bytes.
* @return a {@link ByteBuffer} that contains all the data in {@code contents}.
*/
private ByteBuffer getContent(Queue<HttpObject> contents, long expectedContentLength) {
ByteBuffer buffer = ByteBuffer.allocate((int) expectedContentLength);
boolean endMarkerFound = false;
for (HttpObject object : contents) {
assertFalse("There should have been no more data after the end marker was found", endMarkerFound);
HttpContent content = (HttpContent) object;
buffer.put(content.content().nioBuffer());
endMarkerFound = object instanceof LastHttpContent;
content.release();
}
assertEquals("Content length did not match expected", expectedContentLength, buffer.position());
assertTrue("End marker was not found", endMarkerFound);
buffer.flip();
return buffer;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
try {
if (msg instanceof HttpRequest) {
initFileChannel();
} else if (msg instanceof HttpContent) {
if (fileChnl == null) {
initFileChannel();
}
ByteBuf byteBuf = ((HttpContent) msg).content();
writeBytesToFile(byteBuf);
} else if (msg instanceof LastHttpContent) {
if (fileChnl != null && outStream != null) {
fileChnl.close();
outStream.close();
}
ctx.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void channelRead(@Nullable ChannelHandlerContext ctx, @Nullable Object msg) throws Exception {
if (msg == null) {
return;
}
try {
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
incomingMessage += content.content().toString(CharsetUtil.UTF_8);
}
if (msg instanceof LastHttpContent) {
onvifConnection.processReply(incomingMessage);
incomingMessage = "";
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override protected void channelRead0(
ChannelHandlerContext context, HttpObject message) throws Exception {
if (message instanceof HttpResponse) {
receive((HttpResponse) message);
}
if (message instanceof HttpContent) {
receive((HttpContent) message);
if (message instanceof LastHttpContent) {
release(this);
}
}
}
/**
* If the operation completed successfully, a write via the {@link ChunkedWriteHandler} is initiated. Otherwise,
* failure is handled.
* @param future the {@link ChannelFuture} that is being listened on.
*/
@Override
public void operationComplete(ChannelFuture future) {
long writeFinishTime = System.currentTimeMillis();
if (future.isSuccess()) {
if (finalResponseMetadata instanceof LastHttpContent) {
// this is the case if finalResponseMetadata is a FullHttpResponse.
// in this case there is nothing more to write.
if (!writeFuture.isDone()) {
writeFuture.setSuccess();
completeRequest(!HttpUtil.isKeepAlive(finalResponseMetadata), false);
}
} else {
// otherwise there is some content to write.
logger.trace("Starting ChunkedWriteHandler on channel {}", ctx.channel());
writeFuture.addListener(new CallbackInvoker());
ctx.writeAndFlush(new ChunkDispenser(), writeFuture);
}
} else {
handleChannelWriteFailure(future.cause(), true);
}
long responseAfterWriteProcessingTime = System.currentTimeMillis() - writeFinishTime;
long channelWriteTime = writeFinishTime - responseWriteStartTime;
nettyMetrics.channelWriteTimeInMs.update(channelWriteTime);
nettyMetrics.responseMetadataAfterWriteProcessingTimeInMs.update(responseAfterWriteProcessingTime);
if (request != null) {
request.getMetricsTracker().nioMetricsTracker.addToResponseProcessingTime(
channelWriteTime + responseAfterWriteProcessingTime);
}
}
@Override
public HttpObject proxyToClientResponse(HttpObject httpObject) {
if (httpObject instanceof LastHttpContent) {
activityMonitor.requestFinished();
}
return super.proxyToClientResponse(httpObject);
}
@Override
public HttpObject serverToProxyResponse(HttpObject httpObject) {
// if a ServerResponseCaptureFilter is configured, delegate to it to collect the server's response. if it is not
// configured, we still need to capture basic information (timings, HTTP status, etc.), just not content.
if (responseCaptureFilter != null) {
responseCaptureFilter.serverToProxyResponse(httpObject);
}
if (httpObject instanceof HttpResponse) {
HttpResponse httpResponse = (HttpResponse) httpObject;
captureResponse(httpResponse);
}
if (httpObject instanceof HttpContent) {
HttpContent httpContent = (HttpContent) httpObject;
captureResponseSize(httpContent);
}
if (httpObject instanceof LastHttpContent) {
if (dataToCapture.contains(CaptureType.RESPONSE_CONTENT)) {
captureResponseContent(responseCaptureFilter.getHttpResponse(), responseCaptureFilter.getFullResponseContents());
}
harEntry.getResponse().setBodySize((long)(responseBodySize.get()));
}
this.harEntry.setTime(getTotalElapsedTime(this.harEntry.getTimings()));
return super.serverToProxyResponse(httpObject);
}
@Test
public void lastHttpContentReceived_shouldSetAttribute() {
LastHttpContent lastHttpContent = LastHttpContent.EMPTY_LAST_CONTENT;
contentHandler.channelRead(handlerContext, lastHttpContent);
assertThat(channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).get()).isTrue();
}
@Test
public void lastHttpContentObjectDoesNotUpdateCounters() throws Exception {
RequestsToOriginMetricsCollector handler = new RequestsToOriginMetricsCollector(this.originMetrics);
LastHttpContent msg = mock(LastHttpContent.class);
handler.channelRead(this.ctx, msg);
assertThat(this.metricRegistry.meter(name(APP_METRIC_PREFIX, REQUEST_SUCCESS)).getCount(), is(0L));
assertThat(this.metricRegistry.meter(name(APP_METRIC_PREFIX, REQUEST_FAILURE)).getCount(), is(0L));
assertThat(this.metricRegistry.meter(name(ORIGIN_METRIC_PREFIX, REQUEST_SUCCESS)).getCount(), is(0L));
assertThat(this.metricRegistry.meter(name(ORIGIN_METRIC_PREFIX, REQUEST_FAILURE)).getCount(), is(0L));
}