下面列出了怎么用 io.netty.handler.codec.http.DefaultHttpContent 的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testUpgradeChunk() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(true));
ByteBuf hello = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
HttpContent content = new DefaultHttpContent(hello);
assertTrue(ch.writeOutbound(content));
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertFalse(dataFrame.isEndStream());
} finally {
dataFrame.release();
}
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Test
public void testEncodeChunkAsClient() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(false));
ByteBuf hello = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
HttpContent content = new DefaultHttpContent(hello);
assertTrue(ch.writeOutbound(content));
Http2DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().toString(CharsetUtil.UTF_8), is("hello world"));
assertFalse(dataFrame.isEndStream());
} finally {
dataFrame.release();
}
assertThat(ch.readOutbound(), is(nullValue()));
assertFalse(ch.finish());
}
@Override
public void hookOnNext(ByteBuf chunk) {
HttpObject msg = new DefaultHttpContent(chunk);
channel.writeAndFlush(msg)
.addListener((ChannelFuture future) -> {
request(1);
if (future.isSuccess()) {
future.channel().read();
} else {
String channelIdentifier = String.format("%s -> %s", nettyConnection.channel().localAddress(), nettyConnection.channel().remoteAddress());
LOGGER.error(format("Failed to send request body data. origin=%s connection=%s request=%s",
nettyConnection.getOrigin(), channelIdentifier, request), future.cause());
this.onError(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin()));
}
});
}
@Before
public void setUp() throws Exception {
executeFuture = new CompletableFuture<>();
fullHttpResponse = mock(DefaultHttpContent.class);
when(fullHttpResponse.content()).thenReturn(new EmptyByteBuf(ByteBufAllocator.DEFAULT));
requestContext = new RequestContext(channelPool,
eventLoopGroup,
AsyncExecuteRequest.builder().responseHandler(responseHandler).build(),
null);
channel = new MockChannel();
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));
channel.attr(REQUEST_CONTEXT_KEY).set(requestContext);
channel.attr(EXECUTE_FUTURE_KEY).set(executeFuture);
when(ctx.channel()).thenReturn(channel);
nettyResponseHandler = ResponseHandler.getInstance();
DefaultHttpResponse defaultFullHttpResponse = mock(DefaultHttpResponse.class);
when(defaultFullHttpResponse.headers()).thenReturn(EmptyHttpHeaders.INSTANCE);
when(defaultFullHttpResponse.status()).thenReturn(HttpResponseStatus.CREATED);
when(defaultFullHttpResponse.protocolVersion()).thenReturn(HttpVersion.HTTP_1_1);
nettyResponseHandler.channelRead0(ctx, defaultFullHttpResponse);
}
private List<HttpObject> handleChunkedResponse(int desiredResponseStatusCode, boolean responseShouldBeEmpty) {
HttpResponse firstChunk = new DefaultHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.valueOf(desiredResponseStatusCode)
);
firstChunk.headers()
.set(TRANSFER_ENCODING, CHUNKED)
.set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
.set(SOME_EXPECTED_RESPONSE_HEADER.getKey(), SOME_EXPECTED_RESPONSE_HEADER.getValue());
List<HttpObject> responseChunks = new ArrayList<>();
responseChunks.add(firstChunk);
if (!responseShouldBeEmpty) {
RESPONSE_PAYLOAD_CHUNKS.forEach(chunkData -> responseChunks.add(
new DefaultHttpContent(Unpooled.wrappedBuffer(chunkData.getBytes(CharsetUtil.UTF_8)))
));
}
responseChunks.add(LastHttpContent.EMPTY_LAST_CONTENT);
return responseChunks;
}
@Test
public void addContentChunk_adds_chunk_content_length_to_rawContentLengthInBytes() throws IOException {
// given
RequestInfoImpl<?> requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests();
requestInfo.isCompleteRequestWithAllChunks = false;
String chunk1String = UUID.randomUUID().toString();
String lastChunkString = UUID.randomUUID().toString();
byte[] chunk1Bytes = chunk1String.getBytes();
byte[] lastChunkBytes = lastChunkString.getBytes();
HttpContent chunk1 = new DefaultHttpContent(Unpooled.copiedBuffer(chunk1Bytes));
HttpContent lastChunk = new DefaultLastHttpContent(Unpooled.copiedBuffer(lastChunkBytes));
// when
requestInfo.addContentChunk(chunk1);
requestInfo.addContentChunk(lastChunk);
// then
assertThat(requestInfo.contentChunks.size(), is(2));
assertThat(requestInfo.isCompleteRequestWithAllChunks(), is(true));
assertThat(requestInfo.getRawContentLengthInBytes(), is(chunk1Bytes.length + lastChunkBytes.length));
}
@Test
public void testBuildContent()
throws Exception {
HttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, "www.google.com");
RecordedHttpRequestBuilder recordedHttpRequestBuilder = new RecordedHttpRequestBuilder(nettyRequest);
String charset = "UTF-8";
String str1 = "first content";
HttpContent httpContent1 = new DefaultHttpContent(Unpooled.copiedBuffer(str1.getBytes(charset)));
recordedHttpRequestBuilder.appendHttpContent(httpContent1);
String str2 = "second content";
HttpContent httpContent2 = new DefaultHttpContent(Unpooled.copiedBuffer(str2.getBytes(charset)));
recordedHttpRequestBuilder.appendHttpContent(httpContent2);
String lastStr = "Last chunk";
HttpContent lastContent = new DefaultLastHttpContent(Unpooled.copiedBuffer(lastStr.getBytes(charset)));
recordedHttpRequestBuilder.appendHttpContent(lastContent);
RecordedHttpRequest recordedHttpRequest = recordedHttpRequestBuilder.build();
Assert
.assertEquals((str1 + str2 + lastStr).getBytes(charset), recordedHttpRequest.getHttpBody().getContent(charset));
}
@Test
public void testBuild()
throws IOException {
HttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.GATEWAY_TIMEOUT);
RecordedHttpResponseBuilder recordedHttpResponseBuilder = new RecordedHttpResponseBuilder(httpResponse);
String charset = "UTF-8";
String str1 = "Hello world";
HttpContent httpContent1 = new DefaultHttpContent(Unpooled.copiedBuffer(str1.getBytes(charset)));
recordedHttpResponseBuilder.appendHttpContent(httpContent1);
String str2 = "second content";
HttpContent httpContent2 = new DefaultHttpContent(Unpooled.copiedBuffer(str2.getBytes(charset)));
recordedHttpResponseBuilder.appendHttpContent(httpContent2);
String lastStr = "Last chunk";
HttpContent lastContent = new DefaultLastHttpContent(Unpooled.copiedBuffer(lastStr.getBytes(charset)));
recordedHttpResponseBuilder.appendHttpContent(lastContent);
RecordedHttpResponse recordedHttpResponse = recordedHttpResponseBuilder.build();
Assert.assertEquals(recordedHttpResponse.getStatus(), HttpResponseStatus.GATEWAY_TIMEOUT.code());
Assert.assertEquals((str1 + str2 + lastStr).getBytes(charset),
recordedHttpResponse.getHttpBody().getContent(charset));
}
private ChannelFuture doWriteSplitData(int id, HttpData data, boolean endStream) {
try {
int offset = 0;
int remaining = data.length();
ChannelFuture lastFuture;
for (;;) {
// Ensure an HttpContent does not exceed the maximum length of a cleartext TLS record.
final int chunkSize = Math.min(MAX_TLS_DATA_LENGTH, remaining);
lastFuture = write(id, new DefaultHttpContent(dataChunk(data, offset, chunkSize)), false);
remaining -= chunkSize;
if (remaining == 0) {
break;
}
offset += chunkSize;
}
if (endStream) {
lastFuture = write(id, LastHttpContent.EMPTY_LAST_CONTENT, true);
}
ch.flush();
return lastFuture;
} finally {
ReferenceCountUtil.safeRelease(data);
}
}
@Test
public void shouldDecodeSuccessBucketConfigResponse() throws Exception {
HttpResponse responseHeader = new DefaultHttpResponse(HttpVersion.HTTP_1_1, new HttpResponseStatus(200, "OK"));
HttpContent responseChunk1 = new DefaultHttpContent(Unpooled.copiedBuffer("foo", CHARSET));
HttpContent responseChunk2 = new DefaultLastHttpContent(Unpooled.copiedBuffer("bar", CHARSET));
BucketConfigRequest requestMock = mock(BucketConfigRequest.class);
requestQueue.add(requestMock);
channel.writeInbound(responseHeader, responseChunk1, responseChunk2);
channel.readInbound();
assertEquals(1, eventSink.responseEvents().size());
BucketConfigResponse event = (BucketConfigResponse) eventSink.responseEvents().get(0).getMessage();
assertEquals(ResponseStatus.SUCCESS, event.status());
assertEquals("foobar", event.config());
assertTrue(requestQueue.isEmpty());
}
@Test
public void shouldDecodeListDesignDocumentsResponse() throws Exception {
HttpResponse responseHeader = new DefaultHttpResponse(HttpVersion.HTTP_1_1, new HttpResponseStatus(200, "OK"));
HttpContent responseChunk1 = new DefaultHttpContent(Unpooled.copiedBuffer("foo", CharsetUtil.UTF_8));
HttpContent responseChunk2 = new DefaultLastHttpContent(Unpooled.copiedBuffer("bar", CharsetUtil.UTF_8));
GetDesignDocumentsRequest requestMock = mock(GetDesignDocumentsRequest.class);
requestQueue.add(requestMock);
channel.writeInbound(responseHeader, responseChunk1, responseChunk2);
assertEquals(1, eventSink.responseEvents().size());
GetDesignDocumentsResponse event = (GetDesignDocumentsResponse) eventSink.responseEvents().get(0).getMessage();
assertEquals(ResponseStatus.SUCCESS, event.status());
assertEquals("foobar", event.content());
assertTrue(requestQueue.isEmpty());
}
/**
* Tests that {@link NettyRequest#close()} leaves any added {@link HttpContent} the way it was before it was added.
* (i.e no reference count changes).
* @throws RestServiceException
*/
@Test
public void closeTest() throws RestServiceException {
Channel channel = new MockChannel();
NettyRequest nettyRequest = createNettyRequest(HttpMethod.POST, "/", null, channel);
Queue<HttpContent> httpContents = new LinkedBlockingQueue<HttpContent>();
for (int i = 0; i < 5; i++) {
ByteBuffer content = ByteBuffer.wrap(TestUtils.getRandomBytes(1024));
HttpContent httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(content));
nettyRequest.addContent(httpContent);
httpContents.add(httpContent);
}
closeRequestAndValidate(nettyRequest, channel);
while (httpContents.peek() != null) {
assertEquals("Reference count of http content has changed", 1, httpContents.poll().refCnt());
}
}
/**
* Splits the given {@code contentBytes} into {@code numChunks} chunks and stores them in {@code httpContents}.
* @param contentBytes the content that needs to be split.
* @param numChunks the number of chunks to split {@code contentBytes} into.
* @param httpContents the {@link List<HttpContent>} that will contain all the content in parts.
* @param useCopyForcingByteBuf if {@code true}, uses {@link CopyForcingByteBuf} instead of the default
* {@link ByteBuf}.
*/
private void splitContent(byte[] contentBytes, int numChunks, List<HttpContent> httpContents,
boolean useCopyForcingByteBuf) {
int individualPartSize = contentBytes.length / numChunks;
ByteBuf content;
for (int addedContentCount = 0; addedContentCount < numChunks - 1; addedContentCount++) {
if (useCopyForcingByteBuf) {
content =
CopyForcingByteBuf.wrappedBuffer(contentBytes, addedContentCount * individualPartSize, individualPartSize);
} else {
content = Unpooled.wrappedBuffer(contentBytes, addedContentCount * individualPartSize, individualPartSize);
}
httpContents.add(new DefaultHttpContent(content));
}
if (useCopyForcingByteBuf) {
content =
CopyForcingByteBuf.wrappedBuffer(contentBytes, (numChunks - 1) * individualPartSize, individualPartSize);
} else {
content = Unpooled.wrappedBuffer(contentBytes, (numChunks - 1) * individualPartSize, individualPartSize);
}
httpContents.add(new DefaultLastHttpContent(content));
}
/**
* Does the post test by sending the request and content to {@link NettyMessageProcessor} through an
* {@link EmbeddedChannel} and returns the data stored in the {@link InMemoryRouter} as a result of the post.
* @param postRequest the POST request as a {@link HttpRequest}.
* @param contentToSend the content to be sent as a part of the POST.
* @return the data stored in the {@link InMemoryRouter} as a result of the POST.
* @throws InterruptedException
*/
private ByteBuffer doPostTest(HttpRequest postRequest, List<ByteBuffer> contentToSend) throws InterruptedException {
EmbeddedChannel channel = createChannel();
// POST
notificationSystem.reset();
postRequest.headers().set(RestUtils.Headers.AMBRY_CONTENT_TYPE, "application/octet-stream");
HttpUtil.setKeepAlive(postRequest, false);
channel.writeInbound(postRequest);
if (contentToSend != null) {
for (ByteBuffer content : contentToSend) {
channel.writeInbound(new DefaultHttpContent(Unpooled.wrappedBuffer(content)));
}
channel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT);
}
if (!notificationSystem.operationCompleted.await(1000, TimeUnit.MILLISECONDS)) {
fail("Post did not succeed after 1000ms. There is an error or timeout needs to increase");
}
assertNotNull("Blob id operated on cannot be null", notificationSystem.blobIdOperatedOn);
return router.getActiveBlobs().get(notificationSystem.blobIdOperatedOn).getBlob();
}
private void doFlush(boolean allowWaiting) throws IOException {
Channel ch = ctx.channel();
if (ch == null || !ch.isActive()) {
throw new ClosedChannelException();
}
if (allowWaiting) {
blockUntilReady();
}
ctx.writeAndFlush(new DefaultHttpContent(buf.copy()));
buf.clear();
}
/**
* From the current context (currentBuffer and currentData), returns the next HttpChunk (if possible) trying to get
* sizeleft bytes more into the currentBuffer. This is the Multipart version.从当前上下文(currentBuffer和currentData)中,返回下一个HttpChunk(如果可能的话),尝试将更多的sizeleft字节放入currentBuffer中。这是多部分版本。
*
* @param sizeleft
* the number of bytes to try to get from currentData
* @return the next HttpChunk or null if not enough bytes were found
* @throws ErrorDataEncoderException
* if the encoding is in error
*/
private HttpContent encodeNextChunkMultipart(int sizeleft) throws ErrorDataEncoderException {
if (currentData == null) {
return null;
}
ByteBuf buffer;
if (currentData instanceof InternalAttribute) {
buffer = ((InternalAttribute) currentData).toByteBuf();
currentData = null;
} else {
try {
buffer = ((HttpData) currentData).getChunk(sizeleft);
} catch (IOException e) {
throw new ErrorDataEncoderException(e);
}
if (buffer.capacity() == 0) {
// end for current InterfaceHttpData, need more data
currentData = null;
return null;
}
}
if (currentBuffer == null) {
currentBuffer = buffer;
} else {
currentBuffer = wrappedBuffer(currentBuffer, buffer);
}
if (currentBuffer.readableBytes() < HttpPostBodyUtil.chunkSize) {
currentData = null;
return null;
}
buffer = fillByteBuf();
return new DefaultHttpContent(buffer);
}
private HttpContent lastChunk() {
isLastChunk = true;
if (currentBuffer == null) {
isLastChunkSent = true;
// LastChunk with no more data
return LastHttpContent.EMPTY_LAST_CONTENT;
}
// NextChunk as last non empty from buffer
ByteBuf buffer = currentBuffer;
currentBuffer = null;
return new DefaultHttpContent(buffer);
}
private void onDataRead(Http2DataFrame dataFrame, ChannelHandlerContext ctx) throws Http2Exception {
ByteBuf data = dataFrame.content();
data.retain();
if (!dataFrame.isEndStream()) {
ctx.fireChannelRead(new DefaultHttpContent(data));
} else {
ctx.fireChannelRead(new DefaultLastHttpContent(data));
}
}
@Test
public void addContentChunk_and_getRawConent_and_getRawContentBytes_work_as_expected_for_last_chunk() throws IOException {
// given
RequestInfoImpl<?> requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests();
requestInfo.isCompleteRequestWithAllChunks = false;
String chunk1String = UUID.randomUUID().toString();
String lastChunkString = UUID.randomUUID().toString();
byte[] chunk1Bytes = chunk1String.getBytes();
byte[] lastChunkBytes = lastChunkString.getBytes();
HttpContent chunk1 = new DefaultHttpContent(Unpooled.copiedBuffer(chunk1Bytes));
HttpContent lastChunk = new DefaultLastHttpContent(Unpooled.copiedBuffer(lastChunkBytes));
assertThat(chunk1.refCnt(), is(1));
assertThat(lastChunk.refCnt(), is(1));
assertThat(requestInfo.getRawContentBytes(), nullValue());
assertThat(requestInfo.getRawContent(), nullValue());
// when
requestInfo.addContentChunk(chunk1);
requestInfo.addContentChunk(lastChunk);
// then
assertThat(chunk1.refCnt(), is(2));
assertThat(lastChunk.refCnt(), is(2));
assertThat(requestInfo.contentChunks.size(), is(2));
assertThat(requestInfo.isCompleteRequestWithAllChunks(), is(true));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(chunk1Bytes);
baos.write(lastChunkBytes);
assertThat(requestInfo.getRawContentBytes(), is(baos.toByteArray()));
String rawContentString = requestInfo.getRawContent();
assertThat(requestInfo.getRawContent(), is(chunk1String + lastChunkString));
assertThat(requestInfo.getRawContent() == rawContentString, is(true)); // Verify that the raw content string is cached the first time it's loaded and reused for subsequent calls
assertThat(chunk1.refCnt(), is(1));
assertThat(lastChunk.refCnt(), is(1));
}
@Test
public void addContentChunk_does_not_add_chunk_to_contentChunks_list_if_contentChunksWillBeReleasedExternally_is_true() {
// given
RequestInfoImpl<?> requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests();
requestInfo.isCompleteRequestWithAllChunks = false;
requestInfo.contentChunksWillBeReleasedExternally();
HttpContent chunk = new DefaultHttpContent(Unpooled.copiedBuffer(UUID.randomUUID().toString(), CharsetUtil.UTF_8));
// when
requestInfo.addContentChunk(chunk);
// then
Assertions.assertThat(requestInfo.contentChunks).isEmpty();
}
@Test
@DataProvider(value = {
"[email protected]#$%^&*/?.,<>;:'\"{}[]() | [email protected]#$%^&*/?.,<>;:'\"{}[]() | UTF-8",
"[email protected]#$%^&*/?.,<>;:'\"{}[]() | alsosomeconten[email protected]#$%^&*/?.,<>;:'\"{}[]() | UTF-16",
"[email protected]#$%^&*/?.,<>;:'\"{}[]() | [email protected]#$%^&*/?.,<>;:'\"{}[]() | ISO-8859-1"
}, splitBy = "\\|")
public void convertContentChunksToRawString_and_convertContentChunksToRawBytes_works(String chunk1Base, String chunk2Base, String charsetString) throws IOException {
// given
Charset contentCharset = Charset.forName(charsetString);
String chunk1Content = chunk1Base + "-" + UUID.randomUUID().toString();
String chunk2Content = chunk2Base + "-" + UUID.randomUUID().toString();
byte[] chunk1Bytes = chunk1Content.getBytes(contentCharset);
byte[] chunk2Bytes = chunk2Content.getBytes(contentCharset);
ByteBuf chunk1ByteBuf = Unpooled.copiedBuffer(chunk1Bytes);
ByteBuf chunk2ByteBuf = Unpooled.copiedBuffer(chunk2Bytes);
Collection<HttpContent> chunkCollection = Arrays.asList(new DefaultHttpContent(chunk1ByteBuf), new DefaultHttpContent(chunk2ByteBuf));
// when
String resultString = HttpUtils.convertContentChunksToRawString(contentCharset, chunkCollection);
byte[] resultBytes = HttpUtils.convertContentChunksToRawBytes(chunkCollection);
// then
String expectedResultString = chunk1Content + chunk2Content;
assertThat(resultString, is(expectedResultString));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(chunk1Bytes);
baos.write(chunk2Bytes);
assertThat(resultBytes, is(baos.toByteArray()));
}
@Test
public void convertContentChunksToRawString_and_convertContentChunksToRawBytes_works_with_EmptyByteBuf_chunks() throws IOException {
// given
Charset contentCharset = CharsetUtil.UTF_8;
String chunk1Content = UUID.randomUUID().toString();
String chunk2Content = UUID.randomUUID().toString();
byte[] chunk1Bytes = chunk1Content.getBytes(contentCharset);
byte[] chunk2Bytes = chunk2Content.getBytes(contentCharset);
ByteBuf chunk1ByteBuf = Unpooled.copiedBuffer(chunk1Bytes);
ByteBuf chunk2ByteBuf = Unpooled.copiedBuffer(chunk2Bytes);
Collection<HttpContent> chunkCollection = Arrays.asList(
new DefaultHttpContent(chunk1ByteBuf),
new DefaultHttpContent(new EmptyByteBuf(ByteBufAllocator.DEFAULT)),
new DefaultHttpContent(chunk2ByteBuf),
new DefaultHttpContent(new EmptyByteBuf(ByteBufAllocator.DEFAULT))
);
// when
String resultString = HttpUtils.convertContentChunksToRawString(contentCharset, chunkCollection);
byte[] resultBytes = HttpUtils.convertContentChunksToRawBytes(chunkCollection);
// then
String expectedResultString = chunk1Content + chunk2Content;
assertThat(resultString, is(expectedResultString));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(chunk1Bytes);
baos.write(chunk2Bytes);
assertThat(resultBytes, is(baos.toByteArray()));
}
@Test
public void convertContentChunksToRawBytes_returns_null_if_total_bytes_is_zero() {
// given
Collection<HttpContent> chunkCollection = Arrays.asList(new DefaultHttpContent(new EmptyByteBuf(ByteBufAllocator.DEFAULT)),
new DefaultHttpContent(new EmptyByteBuf(ByteBufAllocator.DEFAULT)));
// when
byte[] resultBytes = HttpUtils.convertContentChunksToRawBytes(chunkCollection);
// then
assertThat(resultBytes, nullValue());
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof ByteBuf) {
super.write(ctx, new DefaultHttpContent((ByteBuf)msg), promise);
}
else {
super.write(ctx, msg, promise);
}
}
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof ByteBuf) {
//"FutureReturnValueIgnored" this is deliberate
ctx.write(new DefaultHttpContent((ByteBuf) msg), promise);
}
else {
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}
}
@Test
public void testIssue825() throws Exception {
disposableServer =
HttpServer.create()
.port(0)
.handle((req, resp) -> resp.sendString(Mono.just("test")))
.wiretap(true)
.bindNow();
DefaultFullHttpRequest request =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
CountDownLatch latch = new CountDownLatch(1);
Connection client =
TcpClient.create()
.port(disposableServer.port())
.handle((in, out) -> {
in.withConnection(x -> x.addHandlerFirst(new HttpClientCodec()))
.receiveObject()
.ofType(DefaultHttpContent.class)
.as(ByteBufFlux::fromInbound)
// ReferenceCounted::release is deliberately invoked
// so that .release() in FluxReceive.drainReceiver will fail
.subscribe(ReferenceCounted::release, t -> latch.countDown(), null);
return out.sendObject(Flux.just(request))
.neverComplete();
})
.wiretap(true)
.connectNow();
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
client.disposeNow();
}
HttpContent buildContent(SegmentedData data) {
if (data.endOfMessage()) {
LastHttpContent last = new DefaultLastHttpContent(data.content());
if (data.trailingHeaders() != null) {
last.trailingHeaders().add(data.trailingHeaders().http1Headers(true, true));
}
// setChannelRequest(ctx, null);
return last;
} else {
return new DefaultHttpContent(data.content());
}
}
@Override
public @Nullable HttpContent readChunk(ByteBufAllocator allocator) throws Exception {
if (hasSentTerminatingChunk) {
return null;
}
ByteBuf nextChunk = readNextChunk();
if (nextChunk != null) {
return new DefaultHttpContent(nextChunk);
}
// chunked transfer encoding must be terminated by a final chunk of length zero
hasSentTerminatingChunk = true;
return LastHttpContent.EMPTY_LAST_CONTENT;
}
@Override
public HttpContent marshallResponsePart(final Object res,
final HttpResponseStatus status,
final boolean rawStream) throws IOException {
final String content = rawStream ?
mapper.writeValueAsString(res) + "<br/>\n" :
ChunkHeader.ELEMENT_HEADER + mapper.writeValueAsString(res) + "\n";
final ByteBuf buf = Unpooled.copiedBuffer(content, CharsetUtil.UTF_8);
return new DefaultHttpContent(buf);
}
@Override
public HttpContent marshallResponsePart(Object message, HttpResponseStatus status, boolean rawStream) throws IOException {
final String content = "data: " + om.writeValueAsString(message) + "\n\n";
final ByteBuf buf = Unpooled.copiedBuffer(content, CharsetUtil.UTF_8);
final DefaultHttpContent defaultHttpContent = new DefaultHttpContent(buf);
return defaultHttpContent;
}