下面列出了io.netty.channel.embedded.EmbeddedChannel#finish ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testChannelReadPacket() throws Exception {
ByteBuf content = Unpooled.copiedBuffer("d=3:::{\"greetings\":\"Hello World!\"}", CharsetUtil.UTF_8);
HttpRequest request =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/socket.io/1/jsonp-polling", content);
String origin = "http://localhost:8080";
request.headers().add(HttpHeaderNames.ORIGIN, origin);
LastOutboundHandler lastOutboundHandler = new LastOutboundHandler();
EmbeddedChannel channel = new EmbeddedChannel(lastOutboundHandler, jsonpPollingHandler);
channel.writeInbound(request);
Object object = channel.readInbound();
assertTrue(object instanceof Packet);
Packet packet = (Packet) object;
assertEquals(origin, packet.getOrigin());
assertEquals("{\"greetings\":\"Hello World!\"}", packet.getData().toString(CharsetUtil.UTF_8));
channel.finish();
}
@Test
public void testMultipleLinesStrippedDelimiters() {
EmbeddedChannel ch = new EmbeddedChannel(new DelimiterBasedFrameDecoder(8192, true,
Delimiters.lineDelimiter()));
ch.writeInbound(Unpooled.copiedBuffer("TestLine\r\ng\r\n", Charset.defaultCharset()));
ByteBuf buf = ch.readInbound();
assertEquals("TestLine", buf.toString(Charset.defaultCharset()));
ByteBuf buf2 = ch.readInbound();
assertEquals("g", buf2.toString(Charset.defaultCharset()));
assertNull(ch.readInbound());
ch.finish();
buf.release();
buf2.release();
}
@Test
public void testPrematureClosureWithChunkedEncoding1() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpResponseDecoder());
ch.writeInbound(
Unpooled.copiedBuffer("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n", CharsetUtil.US_ASCII));
// Read the response headers.
HttpResponse res = (HttpResponse) ch.readInbound();
assertThat(res.getProtocolVersion(), sameInstance(HttpVersion.HTTP_1_1));
assertThat(res.getStatus(), is(HttpResponseStatus.OK));
assertThat(res.headers().get(Names.TRANSFER_ENCODING), is("chunked"));
assertThat(ch.readInbound(), is(nullValue()));
// Close the connection without sending anything.
ch.finish();
// The decoder should not generate the last chunk because it's closed prematurely.
assertThat(ch.readInbound(), is(nullValue()));
}
@Test
public void testLineProtocol() {
EmbeddedChannel ch = new EmbeddedChannel(new LineDecoder());
// Ordinary input
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 'A' }));
assertNull(ch.readInbound());
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 'B' }));
assertNull(ch.readInbound());
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 'C' }));
assertNull(ch.readInbound());
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { '\n' }));
assertEquals(Unpooled.wrappedBuffer(new byte[] { 'A', 'B', 'C' }), ch.readInbound());
// Truncated input
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 'A' }));
assertNull(ch.readInbound());
ch.finish();
assertNull(ch.readInbound());
}
@Test
public void testDecode() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(
new DelimiterBasedFrameDecoder(8192, true, Delimiters.lineDelimiter()));
ch.writeInbound(Unpooled.copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII));
ByteBuf buf = ch.readInbound();
assertEquals("first", buf.toString(CharsetUtil.US_ASCII));
ByteBuf buf2 = ch.readInbound();
assertEquals("second", buf2.toString(CharsetUtil.US_ASCII));
assertNull(ch.readInbound());
ch.finish();
ReferenceCountUtil.release(ch.readInbound());
buf.release();
buf2.release();
}
@Test
public void testChannelReadConnectWithClientIpInHeader() throws Exception {
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/socket.io/1/xhr-polling");
String origin = "http://localhost:8080";
request.headers().add(HttpHeaderNames.ORIGIN, origin);
request.headers().add(X_FORWARDED_FOR, "1.2.3.4");
LastOutboundHandler lastOutboundHandler = new LastOutboundHandler();
EmbeddedChannel channel = new EmbeddedChannel(lastOutboundHandler, xhrPollingHandler);
channel.writeInbound(request);
Object object = channel.readInbound();
assertTrue(object instanceof ConnectPacket);
ConnectPacket packet = (ConnectPacket) object;
Assert.assertEquals(TransportType.XHR_POLLING, packet.getTransportType());
Assert.assertEquals(origin, packet.getOrigin());
Assert.assertEquals("/1.2.3.4:0", packet.getRemoteAddress().toString());
channel.finish();
}
@Test
public void testNonFlashPolicy() throws Exception {
LastOutboundHandler lastOutboundHandler = new LastOutboundHandler();
EmbeddedChannel channel = new EmbeddedChannel(lastOutboundHandler, flashPolicyHandler);
String message = "{ \"friends\": [" +
" {" +
" \"id\": 0," +
" \"name\": \"Vargas Cochran\"" +
" }," +
" {" +
" \"id\": 1," +
" \"name\": \"Gould Marsh\"" +
" }," +
" {" +
" \"id\": 2," +
" \"name\": \"Vaughn Contreras\"" +
" }" +
" ]," +
" \"greeting\": \"Hello, Kinney Warren! You have 5 unread messages.\"," +
" \"favoriteFruit\": \"banana\"}";
channel.writeInbound(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
ByteBuf byteBuf = channel.readInbound();
assertEquals(message, releaseLater(byteBuf).toString(CharsetUtil.UTF_8));
channel.finish();
}
@Test
public void test() {
String writeData = "test";
EmbeddedChannel embeddedChannel = new EmbeddedChannel(new StripDecoder());
ByteBuf request = Unpooled.wrappedBuffer(writeData.getBytes());
embeddedChannel.writeInbound(request);
ByteBuf response = (ByteBuf) embeddedChannel.readOutbound();
assertEquals("a" + writeData + "a", response.toString(Charset.defaultCharset()));
embeddedChannel.finish();
}
/**
* Testcase for https://github.com/netty/netty/issues/433
*/
@Test
public void testUnfinishedChunkedHttpRequestIsLastFlag() throws Exception {
int maxChunkSize = 2000;
HttpServerCodec httpServerCodec = new HttpServerCodec(1000, 1000, maxChunkSize);
EmbeddedChannel decoderEmbedder = new EmbeddedChannel(httpServerCodec);
int totalContentLength = maxChunkSize * 5;
decoderEmbedder.writeInbound(Unpooled.copiedBuffer(
"PUT /test HTTP/1.1\r\n" +
"Content-Length: " + totalContentLength + "\r\n" +
"\r\n", CharsetUtil.UTF_8));
int offeredContentLength = (int) (maxChunkSize * 2.5);
decoderEmbedder.writeInbound(prepareDataChunk(offeredContentLength));
decoderEmbedder.finish();
HttpMessage httpMessage = (HttpMessage) decoderEmbedder.readInbound();
assertNotNull(httpMessage);
boolean empty = true;
int totalBytesPolled = 0;
for (;;) {
HttpContent httpChunk = (HttpContent) decoderEmbedder.readInbound();
if (httpChunk == null) {
break;
}
empty = false;
totalBytesPolled += httpChunk.content().readableBytes();
assertFalse(httpChunk instanceof LastHttpContent);
httpChunk.release();
}
assertFalse(empty);
assertEquals(offeredContentLength, totalBytesPolled);
}
@Test
public void testBadResponse() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpResponseDecoder(), new HttpObjectAggregator(1024 * 1024));
ch.writeInbound(Unpooled.copiedBuffer("HTTP/1.0 BAD_CODE Bad Server\r\n", CharsetUtil.UTF_8));
Object inbound = ch.readInbound();
assertThat(inbound, is(instanceOf(FullHttpResponse.class)));
assertTrue(((DecoderResultProvider) inbound).decoderResult().isFailure());
assertNull(ch.readInbound());
ch.finish();
}
@Test
public void testChannelReadWrongPath() throws Exception {
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/wrongPath/");
LastOutboundHandler lastOutboundHandler = new LastOutboundHandler();
EmbeddedChannel channel = new EmbeddedChannel(lastOutboundHandler, webSocketHandler);
channel.writeInbound(request);
Object object = channel.readInbound();
assertTrue(object instanceof HttpRequest);
assertEquals(request, object);
channel.finish();
}
private static void safeClose(EmbeddedChannel ch) {
ch.finish();
for (;;) {
ByteBuf m = ch.readOutbound();
if (m == null) {
break;
}
m.release();
}
}
@Test
public void testDecodeWithoutStrip() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new LineBasedFrameDecoder(8192, false, false));
ch.writeInbound(copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII));
assertEquals("first\r\n", releaseLater((ByteBuf) ch.readInbound()).toString(CharsetUtil.US_ASCII));
assertEquals("second\n", releaseLater((ByteBuf) ch.readInbound()).toString(CharsetUtil.US_ASCII));
assertNull(ch.readInbound());
ch.finish();
ReferenceCountUtil.release(ch.readInbound());
}
@Test
public void testChannelReadNonHttp() throws Exception {
LastOutboundHandler lastOutboundHandler = new LastOutboundHandler();
EmbeddedChannel channel = new EmbeddedChannel(lastOutboundHandler, xhrPollingHandler);
channel.writeInbound(Unpooled.EMPTY_BUFFER);
Object object = channel.readInbound();
assertTrue(object instanceof ByteBuf);
Assert.assertEquals(Unpooled.EMPTY_BUFFER, object);
channel.finish();
}
@Test
public void testMultipleLinesStrippedDelimiters() {
EmbeddedChannel ch = new EmbeddedChannel(new DelimiterBasedFrameDecoder(8192, true,
Delimiters.lineDelimiter()));
ch.writeInbound(Unpooled.copiedBuffer("TestLine\r\ng\r\n", Charset.defaultCharset()));
assertEquals("TestLine", releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
assertEquals("g", releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
assertNull(ch.readInbound());
ch.finish();
}
@Test(expected = IllegalStateException.class)
public void testFlushViaException() {
final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount, false);
// Simulate read loop;
channel.pipeline().fireChannelRead(1L);
assertEquals(0, flushCount.get());
assertNull(channel.readOutbound());
channel.pipeline().fireExceptionCaught(new IllegalStateException());
assertEquals(1, flushCount.get());
assertEquals(1L, channel.readOutbound());
assertNull(channel.readOutbound());
channel.finish();
}
@Test
public void test100Continue() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpServerCodec(), new HttpObjectAggregator(1024));
// Send the request headers.
ch.writeInbound(Unpooled.copiedBuffer(
"PUT /upload-large HTTP/1.1\r\n" +
"Expect: 100-continue\r\n" +
"Content-Length: 1\r\n\r\n", CharsetUtil.UTF_8));
// Ensure the aggregator generates nothing.
assertThat(ch.readInbound(), is(nullValue()));
// Ensure the aggregator writes a 100 Continue response.
ByteBuf continueResponse = (ByteBuf) ch.readOutbound();
assertThat(continueResponse.toString(CharsetUtil.UTF_8), is("HTTP/1.1 100 Continue\r\n\r\n"));
continueResponse.release();
// But nothing more.
assertThat(ch.readOutbound(), is(nullValue()));
// Send the content of the request.
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 42 }));
// Ensure the aggregator generates a full request.
FullHttpRequest req = (FullHttpRequest) ch.readInbound();
assertThat(req.headers().get(CONTENT_LENGTH), is("1"));
assertThat(req.content().readableBytes(), is(1));
assertThat(req.content().readByte(), is((byte) 42));
req.release();
// But nothing more.
assertThat(ch.readInbound(), is(nullValue()));
// Send the actual response.
FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED);
res.content().writeBytes("OK".getBytes(CharsetUtil.UTF_8));
res.headers().set(CONTENT_LENGTH, 2);
ch.writeOutbound(res);
// Ensure the encoder handles the response after handling 100 Continue.
ByteBuf encodedRes = (ByteBuf) ch.readOutbound();
assertThat(encodedRes.toString(CharsetUtil.UTF_8), is("HTTP/1.1 201 Created\r\nContent-Length: 2\r\n\r\nOK"));
encodedRes.release();
ch.finish();
}
@Test
public void test100Continue() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpServerCodec(), new HttpObjectAggregator(1024));
// Send the request headers.
ch.writeInbound(Unpooled.copiedBuffer(
"PUT /upload-large HTTP/1.1\r\n" +
"Expect: 100-continue\r\n" +
"Content-Length: 1\r\n\r\n", CharsetUtil.UTF_8));
// Ensure the aggregator generates nothing.
assertThat(ch.readInbound(), is(nullValue()));
// Ensure the aggregator writes a 100 Continue response.
ByteBuf continueResponse = ch.readOutbound();
assertThat(continueResponse.toString(CharsetUtil.UTF_8), is("HTTP/1.1 100 Continue\r\n\r\n"));
continueResponse.release();
// But nothing more.
assertThat(ch.readOutbound(), is(nullValue()));
// Send the content of the request.
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 42 }));
// Ensure the aggregator generates a full request.
FullHttpRequest req = ch.readInbound();
assertThat(req.headers().get(HttpHeaderNames.CONTENT_LENGTH), is("1"));
assertThat(req.content().readableBytes(), is(1));
assertThat(req.content().readByte(), is((byte) 42));
req.release();
// But nothing more.
assertThat(ch.readInbound(), is(nullValue()));
// Send the actual response.
FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED);
res.content().writeBytes("OK".getBytes(CharsetUtil.UTF_8));
res.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 2);
ch.writeOutbound(res);
// Ensure the encoder handles the response after handling 100 Continue.
ByteBuf encodedRes = ch.readOutbound();
assertThat(encodedRes.toString(CharsetUtil.UTF_8),
is("HTTP/1.1 201 Created\r\n" + HttpHeaderNames.CONTENT_LENGTH + ": 2\r\n\r\nOK"));
encodedRes.release();
ch.finish();
}
@Test
public void testSinglePacket() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(makeNetflowDecoder());
byte[] bytes = get10V5MessagesBytes();
ch.writeInbound(Unpooled.wrappedBuffer(bytes));
List<Record> records = collect10NetflowV5MessagesFromChannel(ch, bytes.length);
ch.finish();
NetflowTestUtil.assertRecordsForTenPackets(records);
}
@Test
public void decode2() throws Exception {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf duplicate = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(new FixLengthDecoder(3));
Assert.assertFalse(channel.writeInbound(buf.readBytes(2)));
Assert.assertTrue(channel.writeInbound(buf.readBytes(7)));
channel.finish();
ByteBuf read = channel.readInbound();
Assert.assertEquals(duplicate.readSlice(3), read);
read = channel.readInbound();
Assert.assertEquals(duplicate.readSlice(3), read);
read = channel.readInbound();
Assert.assertEquals(duplicate.readSlice(3), read);
read.release();
}