下面列出了io.netty.channel.embedded.EmbeddedChannel#writeOutbound ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* If the length of the content is unknown, {@link HttpContentEncoder} should not skip encoding the content
* even if the actual length is turned out to be 0.
*/
@Test
public void testEmptySplitContent() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new TestEncoder());
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"));
ch.writeOutbound(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
assertEncodedResponse(ch);
ch.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT);
HttpContent chunk = (HttpContent) ch.readOutbound();
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("0"));
assertThat(chunk, is(instanceOf(HttpContent.class)));
chunk.release();
chunk = (HttpContent) ch.readOutbound();
assertThat(chunk.content().isReadable(), is(false));
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
chunk.release();
assertThat(ch.readOutbound(), is(nullValue()));
}
@Test
public void testServerWindowSizeSuccess() {
EmbeddedChannel ch = new EmbeddedChannel(new WebSocketServerExtensionHandler(
new PerMessageDeflateServerExtensionHandshaker(6, true, 15, false, false)));
HttpRequest req = newUpgradeRequest(PERMESSAGE_DEFLATE_EXTENSION + "; " + SERVER_MAX_WINDOW + "=10");
ch.writeInbound(req);
HttpResponse res = newUpgradeResponse(null);
ch.writeOutbound(res);
HttpResponse res2 = ch.readOutbound();
List<WebSocketExtensionData> exts = WebSocketExtensionUtil.extractExtensions(
res2.headers().get(HttpHeaderNames.SEC_WEBSOCKET_EXTENSIONS));
Assert.assertEquals(PERMESSAGE_DEFLATE_EXTENSION, exts.get(0).name());
Assert.assertEquals("10", exts.get(0).parameters().get(SERVER_MAX_WINDOW));
Assert.assertNotNull(ch.pipeline().get(PerMessageDeflateDecoder.class));
Assert.assertNotNull(ch.pipeline().get(PerMessageDeflateEncoder.class));
}
@Test
public void variableWithTrailers() {
EmbeddedChannel channel = newEmbeddedChannel();
byte[] content = new byte[128];
ThreadLocalRandom.current().nextBytes(content);
Buffer buffer = DEFAULT_ALLOCATOR.wrap(content);
HttpHeaders trailers = DefaultHttpHeadersFactory.INSTANCE.newTrailers();
trailers.add("TrailerStatus", "good");
HttpResponseMetaData response = newResponseMetaData(HTTP_1_1, OK, INSTANCE.newHeaders());
response.headers()
.add(CONNECTION, KEEP_ALIVE)
.add(SERVER, "unit-test");
channel.writeOutbound(response);
channel.writeOutbound(buffer.duplicate());
channel.writeOutbound(trailers);
verifyHttpResponse(channel, buffer, TransferEncoding.Variable, false);
// The trailers will just not be encoded if the transfer encoding is not set correctly.
assertFalse(channel.finishAndReleaseAll());
}
@Test
public void testEmptyFullContentWithTrailer() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpContentCompressor());
ch.writeInbound(newRequest());
FullHttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
res.trailingHeaders().set(of("X-Test"), of("Netty"));
ch.writeOutbound(res);
Object o = ch.readOutbound();
assertThat(o, is(instanceOf(FullHttpResponse.class)));
res = (FullHttpResponse) o;
assertThat(res.headers().get(HttpHeaderNames.TRANSFER_ENCODING), is(nullValue()));
// Content encoding shouldn't be modified.
assertThat(res.headers().get(HttpHeaderNames.CONTENT_ENCODING), is(nullValue()));
assertThat(res.content().readableBytes(), is(0));
assertThat(res.content().toString(CharsetUtil.US_ASCII), is(""));
assertEquals("Netty", res.trailingHeaders().get(of("X-Test")));
assertThat(ch.readOutbound(), is(nullValue()));
}
@Test
public void test_disconnect_after_receiving_to_10_publishes_than_sending_10_pubrecs_with_success_pub() {
final EmbeddedChannel channel = new EmbeddedChannel(TestMqttDecoder.create(), flowControlHandler);
final PUBLISHFactory.Mqtt5Builder builder = new PUBLISHFactory.Mqtt5Builder();
builder.withTopic("topic").withQoS(QoS.EXACTLY_ONCE).withUserProperties(Mqtt5UserProperties.NO_USER_PROPERTIES)
.withPayload(new byte[0]).withHivemqId("hivemqId1");
channel.attr(ChannelAttributes.CLIENT_RECEIVE_MAXIMUM).set(10);
for (int i = 0; i < 10; i++) {
channel.writeInbound(builder.build());
}
for (int i = 0; i < 10; i++) {
channel.writeOutbound(new PUBREC(i, Mqtt5PubRecReasonCode.SUCCESS, null, Mqtt5UserProperties.NO_USER_PROPERTIES));
}
channel.writeInbound(builder.build());
verify(eventLog).clientWasDisconnected(channel, "Sent too many concurrent PUBLISH messages");
assertFalse(channel.isOpen());
assertFalse(channel.isActive());
}
@Test
public void chunkedNoTrailers() {
EmbeddedChannel channel = newEmbeddedChannel();
byte[] content = new byte[128];
ThreadLocalRandom.current().nextBytes(content);
Buffer buffer = DEFAULT_ALLOCATOR.wrap(content);
HttpResponseMetaData response = newResponseMetaData(HTTP_1_1, OK, INSTANCE.newHeaders());
response.headers()
.add(CONNECTION, KEEP_ALIVE)
.add(SERVER, "unit-test")
.add(TRANSFER_ENCODING, CHUNKED);
channel.writeOutbound(response);
channel.writeOutbound(buffer.duplicate());
channel.writeOutbound(EmptyHttpHeaders.INSTANCE);
verifyHttpResponse(channel, buffer, TransferEncoding.Chunked, false);
assertFalse(channel.finishAndReleaseAll());
}
/**
* If the length of the content is 0 for sure, {@link HttpContentEncoder} should skip encoding.
*/
@Test
public void testEmptyFullContent() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new TestEncoder());
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"));
FullHttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
ch.writeOutbound(res);
Object o = ch.readOutbound();
assertThat(o, is(instanceOf(FullHttpResponse.class)));
res = (FullHttpResponse) o;
assertThat(res.headers().get(HttpHeaderNames.TRANSFER_ENCODING), is(nullValue()));
// Content encoding shouldn't be modified.
assertThat(res.headers().get(HttpHeaderNames.CONTENT_ENCODING), is(nullValue()));
assertThat(res.content().readableBytes(), is(0));
assertThat(res.content().toString(CharsetUtil.US_ASCII), is(""));
res.release();
assertThat(ch.readOutbound(), is(nullValue()));
}
@Test
public void test_disconnected_after_not_authorized() throws Exception {
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Mqtt3ConnackEncoder());
embeddedChannel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv3_1);
embeddedChannel.writeOutbound(new CONNACK(Mqtt3ConnAckReturnCode.REFUSED_NOT_AUTHORIZED));
assertEquals(false, embeddedChannel.isActive());
}
private Batch decodeCompressedBatch(Batch batch) {
EmbeddedChannel channel = new EmbeddedChannel(new CompressedBatchEncoder(), new BeatsParser());
channel.writeOutbound(batch);
Object o = channel.readOutbound();
channel.writeInbound(o);
return (Batch) channel.readInbound();
}
@Test
public void successCountersUpdatedWhenFirstChunkReceived() {
EmbeddedChannel channel = buildEmbeddedChannel();
//
// Send out a HttpRequest in outbound direction, towards origins:
//
HttpRequest request = httpRequest(GET, "http://www.hotels.com/foo/bar/request");
channel.writeOutbound(request);
assertThat(grabSentBytes(channel).isPresent(), is(true));
ByteBuf response = httpResponseAsBuf(OK, STOCK_BODY).retain();
int len = response.writerIndex() - response.readerIndex();
//
// Send the response in two chunks. The success counters are updated immediately when
// the first chunk is received:
//
channel.writeInbound(response.slice(0, 100));
assertThat(this.metricRegistry.meter(name(APP_METRIC_PREFIX, REQUEST_SUCCESS)).getCount(), is(1L));
assertThat(this.metricRegistry.meter(name(ORIGIN_METRIC_PREFIX, REQUEST_SUCCESS)).getCount(), is(1L));
//
// Send the next chunk. Demonstrate that counters remain unchanged. This is to ensure
// they don't get incremented twice:
//
channel.writeInbound(response.slice(100, len - 100));
assertThat(this.metricRegistry.meter(name(APP_METRIC_PREFIX, REQUEST_SUCCESS)).getCount(), is(1L));
assertThat(this.metricRegistry.meter(name(ORIGIN_METRIC_PREFIX, REQUEST_SUCCESS)).getCount(), is(1L));
}
@Test
public void testCompressedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(new PerFrameDeflateEncoder(9, 15, false));
EmbeddedChannel decoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibDecoder(ZlibWrapper.NONE));
// initialize
byte[] payload = new byte[300];
random.nextBytes(payload);
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(true,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload));
// execute
encoderChannel.writeOutbound(frame);
BinaryWebSocketFrame compressedFrame = encoderChannel.readOutbound();
// test
assertNotNull(compressedFrame);
assertNotNull(compressedFrame.content());
assertTrue(compressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame.rsv());
decoderChannel.writeInbound(compressedFrame.content());
decoderChannel.writeInbound(DeflateDecoder.FRAME_TAIL);
ByteBuf uncompressedPayload = decoderChannel.readInbound();
assertEquals(300, uncompressedPayload.readableBytes());
byte[] finalPayload = new byte[300];
uncompressedPayload.readBytes(finalPayload);
assertTrue(Arrays.equals(finalPayload, payload));
uncompressedPayload.release();
}
public static List<ByteBuf> encodeResponse(HttpResponse response) {
EmbeddedChannel channel = new EmbeddedChannel();
channel.pipeline().addLast("http response encoder", new HttpResponseEncoder());
channel.writeOutbound(response);
return extractBuffers(channel);
}
@Test
public void testAdjustedLengthLessThanZero() throws Exception {
final EmbeddedChannel ch = new EmbeddedChannel(new LengthFieldPrepender(4, -2));
try {
ch.writeOutbound(msg);
fail(EncoderException.class.getSimpleName() + " must be raised.");
} catch (EncoderException e) {
// Expected
}
}
@Test
public void test100Continue() throws Exception {
FullHttpRequest request = newRequest();
HttpUtil.set100ContinueExpected(request, true);
EmbeddedChannel ch = new EmbeddedChannel(new HttpContentCompressor());
ch.writeInbound(request);
FullHttpResponse continueResponse = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);
ch.writeOutbound(continueResponse);
FullHttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
res.trailingHeaders().set(of("X-Test"), of("Netty"));
ch.writeOutbound(res);
Object o = ch.readOutbound();
assertThat(o, is(instanceOf(FullHttpResponse.class)));
res = (FullHttpResponse) o;
assertSame(continueResponse, res);
res.release();
o = ch.readOutbound();
assertThat(o, is(instanceOf(FullHttpResponse.class)));
res = (FullHttpResponse) o;
assertThat(res.headers().get(HttpHeaderNames.TRANSFER_ENCODING), is(nullValue()));
// Content encoding shouldn't be modified.
assertThat(res.headers().get(HttpHeaderNames.CONTENT_ENCODING), is(nullValue()));
assertThat(res.content().readableBytes(), is(0));
assertThat(res.content().toString(CharsetUtil.US_ASCII), is(""));
assertEquals("Netty", res.trailingHeaders().get(of("X-Test")));
assertThat(ch.readOutbound(), is(nullValue()));
}
@Test
public void testChunkedContent() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpContentCompressor());
ch.writeInbound(newRequest());
HttpResponse res = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
res.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
ch.writeOutbound(res);
assertEncodedResponse(ch);
ch.writeOutbound(new DefaultHttpContent(Unpooled.copiedBuffer("Hell", CharsetUtil.US_ASCII)));
ch.writeOutbound(new DefaultHttpContent(Unpooled.copiedBuffer("o, w", CharsetUtil.US_ASCII)));
ch.writeOutbound(new DefaultLastHttpContent(Unpooled.copiedBuffer("orld", CharsetUtil.US_ASCII)));
HttpContent chunk;
chunk = ch.readOutbound();
assertThat(ByteBufUtil.hexDump(chunk.content()), is("1f8b0800000000000000f248cdc901000000ffff"));
chunk.release();
chunk = ch.readOutbound();
assertThat(ByteBufUtil.hexDump(chunk.content()), is("cad7512807000000ffff"));
chunk.release();
chunk = ch.readOutbound();
assertThat(ByteBufUtil.hexDump(chunk.content()), is("ca2fca4901000000ffff"));
chunk.release();
chunk = ch.readOutbound();
assertThat(ByteBufUtil.hexDump(chunk.content()), is("0300c2a99ae70c000000"));
assertThat(chunk, is(instanceOf(HttpContent.class)));
chunk.release();
chunk = ch.readOutbound();
assertThat(chunk.content().isReadable(), is(false));
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
chunk.release();
assertThat(ch.readOutbound(), is(nullValue()));
}
@Test
public void testChunkedContentWithTrailingHeader() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new TestEncoder());
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"));
HttpResponse res = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
res.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
ch.writeOutbound(res);
assertEncodedResponse(ch);
ch.writeOutbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[3])));
ch.writeOutbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[2])));
LastHttpContent content = new DefaultLastHttpContent(Unpooled.wrappedBuffer(new byte[1]));
content.trailingHeaders().set(of("X-Test"), of("Netty"));
ch.writeOutbound(content);
HttpContent chunk;
chunk = ch.readOutbound();
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("3"));
chunk.release();
chunk = ch.readOutbound();
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("2"));
chunk.release();
chunk = ch.readOutbound();
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("1"));
assertThat(chunk, is(instanceOf(HttpContent.class)));
chunk.release();
chunk = ch.readOutbound();
assertThat(chunk.content().isReadable(), is(false));
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
assertEquals("Netty", ((LastHttpContent) chunk).trailingHeaders().get(of("X-Test")));
chunk.release();
assertThat(ch.readOutbound(), is(nullValue()));
}
@Test
public void variableNoTrailersNoContent() {
EmbeddedChannel channel = newEmbeddedChannel();
HttpResponseMetaData response = newResponseMetaData(HTTP_1_1, OK, INSTANCE.newHeaders());
response.headers()
.add(CONNECTION, KEEP_ALIVE)
.add(SERVER, "unit-test");
channel.writeOutbound(response);
channel.writeOutbound(EMPTY_BUFFER.duplicate());
channel.writeOutbound(EmptyHttpHeaders.INSTANCE);
verifyHttpResponse(channel, EMPTY_BUFFER, TransferEncoding.Variable, false);
assertFalse(channel.finishAndReleaseAll());
}
public static TFrame encodeDecode(TFrame frame) {
EmbeddedChannel channel = new EmbeddedChannel(
new TChannelLengthFieldBasedFrameDecoder(),
new TFrameCodec()
);
channel.writeOutbound(frame);
channel.writeInbound(channel.readOutbound());
return channel.readInbound();
}
@Test
public void testConnect200Response() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new TestEncoder());
HttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.CONNECT, "google.com:80");
ch.writeInbound(req);
HttpResponse res = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
res.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
ch.writeOutbound(res);
ch.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT);
assertEmptyResponse(ch);
}
@Test
public void testMultiCompressedPayloadWithinFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibEncoder(ZlibWrapper.NONE, 9, 15, 8));
EmbeddedChannel decoderChannel = new EmbeddedChannel(new PerMessageDeflateDecoder(false));
// initialize
byte[] payload1 = new byte[100];
random.nextBytes(payload1);
byte[] payload2 = new byte[100];
random.nextBytes(payload2);
encoderChannel.writeOutbound(Unpooled.wrappedBuffer(payload1));
ByteBuf compressedPayload1 = encoderChannel.readOutbound();
encoderChannel.writeOutbound(Unpooled.wrappedBuffer(payload2));
ByteBuf compressedPayload2 = encoderChannel.readOutbound();
BinaryWebSocketFrame compressedFrame = new BinaryWebSocketFrame(true,
WebSocketExtension.RSV1 | WebSocketExtension.RSV3,
Unpooled.wrappedBuffer(
compressedPayload1,
compressedPayload2.slice(0, compressedPayload2.readableBytes() - 4)));
// execute
decoderChannel.writeInbound(compressedFrame);
BinaryWebSocketFrame uncompressedFrame = decoderChannel.readInbound();
// test
assertNotNull(uncompressedFrame);
assertNotNull(uncompressedFrame.content());
assertTrue(uncompressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV3, uncompressedFrame.rsv());
assertEquals(200, uncompressedFrame.content().readableBytes());
byte[] finalPayload1 = new byte[100];
uncompressedFrame.content().readBytes(finalPayload1);
assertTrue(Arrays.equals(finalPayload1, payload1));
byte[] finalPayload2 = new byte[100];
uncompressedFrame.content().readBytes(finalPayload2);
assertTrue(Arrays.equals(finalPayload2, payload2));
uncompressedFrame.release();
}