下面列出了 io.netty.channel.embedded.EmbeddedChannel#readInbound() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Feeds a line to a delimiter-based decoder in two pieces and checks that the
 * emitted frames are the complete lines without their {@code \r\n} terminators.
 */
@Test
public void testIncompleteLinesStrippedDelimiters() {
    final Charset charset = Charset.defaultCharset();
    DelimiterBasedFrameDecoder lineDecoder =
            new DelimiterBasedFrameDecoder(8192, true, Delimiters.lineDelimiter());
    EmbeddedChannel channel = new EmbeddedChannel(lineDecoder);

    // No delimiter has been written yet, so no frame is expected.
    channel.writeInbound(Unpooled.copiedBuffer("Test", charset));
    assertNull(channel.readInbound());

    // Completes the first line and supplies a whole second line.
    channel.writeInbound(Unpooled.copiedBuffer("Line\r\ng\r\n", charset));

    ByteBuf firstFrame = channel.readInbound();
    assertEquals("TestLine", firstFrame.toString(charset));

    ByteBuf secondFrame = channel.readInbound();
    assertEquals("g", secondFrame.toString(charset));

    assertNull(channel.readInbound());
    channel.finish();

    firstFrame.release();
    secondFrame.release();
}
/**
 * Builds a {@link SocksCmdResponse} from the given parameters, writes it into an
 * {@link EmbeddedChannel} holding a {@link SocksCmdResponseDecoder} and verifies
 * the decoded message.
 *
 * @param cmdStatus   status expected on the decoded response
 * @param addressType address type of the response; for {@code UNKNOWN} an
 *                    {@link UnknownSocksResponse} is expected instead
 * @param host        host expected on the decoded response; not checked when {@code null}
 * @param port        port expected on the decoded response
 */
private static void testSocksCmdResponseDecoderWithDifferentParams(
        SocksCmdStatus cmdStatus, SocksAddressType addressType, String host, int port) {
    logger.debug("Testing cmdStatus: " + cmdStatus + " addressType: " + addressType);
    SocksResponse msg = new SocksCmdResponse(cmdStatus, addressType, host, port);
    SocksCmdResponseDecoder decoder = new SocksCmdResponseDecoder();
    EmbeddedChannel embedder = new EmbeddedChannel(decoder);
    SocksCommonTestUtils.writeMessageIntoEmbedder(embedder, msg);

    if (addressType == SocksAddressType.UNKNOWN) {
        assertTrue(embedder.readInbound() instanceof UnknownSocksResponse);
    } else {
        msg = embedder.readInbound();
        SocksCmdResponse decoded = (SocksCmdResponse) msg;
        // assertEquals takes (expected, actual); keeping that order makes failure
        // messages report the right value as "expected".
        assertEquals(cmdStatus, decoded.cmdStatus());
        if (host != null) {
            assertEquals(host, decoded.host());
        }
        assertEquals(port, decoded.port());
    }

    // Exactly one message must have been produced.
    assertNull(embedder.readInbound());
}
/**
 * Writes an HTTP/2 headers frame carrying {@code content-length: 0} and checks that
 * it is read back as an HTTP/1.1 {@link HttpRequest} that is neither a
 * {@link FullHttpRequest} nor marked as chunked.
 */
@Test
public void testDowngradeHeadersWithContentLength() throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(true));

    Http2Headers http2Headers = new DefaultHttp2Headers();
    http2Headers.path("/");
    http2Headers.method("GET");
    http2Headers.setInt("content-length", 0);
    assertTrue(channel.writeInbound(new DefaultHttp2HeadersFrame(http2Headers)));

    HttpRequest downgraded = channel.readInbound();
    assertThat(downgraded.uri(), is("/"));
    assertThat(downgraded.method(), is(HttpMethod.GET));
    assertThat(downgraded.protocolVersion(), is(HttpVersion.HTTP_1_1));
    assertFalse(downgraded instanceof FullHttpRequest);
    assertFalse(HttpUtil.isTransferEncodingChunked(downgraded));

    // Nothing besides the request object should have been queued.
    assertThat(channel.readInbound(), is(nullValue()));
    assertFalse(channel.finish());
}
/**
 * Marshals {@code testObject} into a byte array, writes those bytes to the decoder
 * channel in a single call and expects an equal object to be read back.
 */
@Test
public void testSimpleUnmarshalling() throws IOException {
    MarshallerFactory factory = createMarshallerFactory();
    MarshallingConfiguration config = createMarshallingConfig();
    EmbeddedChannel channel = new EmbeddedChannel(createDecoder(Integer.MAX_VALUE));

    // Produce the serialized form of the test object.
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    Marshaller marshaller = factory.createMarshaller(config);
    marshaller.start(Marshalling.createByteOutput(sink));
    marshaller.writeObject(testObject);
    marshaller.finish();
    marshaller.close();

    channel.writeInbound(input(sink.toByteArray()));
    assertTrue(channel.finish());

    String decoded = channel.readInbound();
    assertEquals(testObject, decoded);
    assertNull(channel.readInbound());
}
/**
 * Encodes a binary memcache request, loops every encoded buffer back into the
 * inbound side of the same channel and releases the request that is decoded.
 */
@Test
public void shouldRetainCurrentMessageWhenSendingItOut() {
    channel = new EmbeddedChannel(
            new BinaryMemcacheRequestEncoder(),
            new BinaryMemcacheRequestDecoder());

    ByteBuf key = Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8);
    ByteBuf extras = Unpooled.copiedBuffer("extras", CharsetUtil.UTF_8);
    BinaryMemcacheRequest outgoing = new DefaultBinaryMemcacheRequest(key, extras);
    assertTrue(channel.writeOutbound(outgoing));

    // Drain the outbound queue, feeding each encoded buffer straight back in.
    ByteBuf encoded;
    while ((encoded = channel.readOutbound()) != null) {
        channel.writeInbound(encoded);
    }

    BinaryMemcacheRequest decoded = channel.readInbound();
    decoded.release();
    // tearDown will call "channel.finish()"
}
/**
 * Delivers a two-line SMTP response split across four writes and checks that a
 * single response with code 200 and the details "Hello" and "Ok" is decoded.
 */
@Test
public void testDecodeTwoLineResponseChunked() {
    EmbeddedChannel ch = newChannel();

    // Only the final write, which supplies the closing CRLF, yields a message.
    assertFalse(ch.writeInbound(newBuffer("200-")));
    assertFalse(ch.writeInbound(newBuffer("Hello\r\n2")));
    assertFalse(ch.writeInbound(newBuffer("00 Ok")));
    assertTrue(ch.writeInbound(newBuffer("\r\n")));
    assertTrue(ch.finish());

    SmtpResponse decoded = ch.readInbound();
    assertEquals(200, decoded.code());

    List<CharSequence> details = decoded.details();
    assertEquals(2, details.size());
    assertEquals("Hello", details.get(0).toString());
    assertEquals("Ok", details.get(1).toString());

    assertNull(ch.readInbound());
}
/**
 * Decodes one small CONNECT frame with a single header and a short body, then
 * checks the command, header names and body of the resulting {@link Frame}.
 */
@Test
public void testDecodeFrame() {
    EmbeddedChannel channel = new EmbeddedChannel(new StompFrameDecoder());
    final byte[] body = "This is the body.".getBytes(UTF_8);

    // Wire layout: command, one header line, blank line, body, NUL terminator.
    ByteBuf wire = Unpooled.buffer();
    wire.writeBytes(Command.CONNECT.name().getBytes(UTF_8));
    wire.writeByte(LINE_FEED_CHAR);
    wire.writeBytes("header1:value1".getBytes(UTF_8));
    wire.writeByte(LINE_FEED_CHAR);
    wire.writeByte(LINE_FEED_CHAR);
    wire.writeBytes(body);
    wire.writeByte(NULL_CHAR);
    channel.writeInbound(wire);

    Object decoded = channel.readInbound();
    assertNotNull(decoded);
    assertTrue(decoded instanceof Frame);

    Frame frame = (Frame) decoded;
    assertEquals(Command.CONNECT, frame.getCommand());
    assertEquals(1, frame.getHeaders().getHeaderNames().size());
    assertTrue(frame.getHeaders().getHeaderNames().contains("header1"));
    assertArrayEquals(body, frame.getBody());
}
/**
 * Feeds the first 47 bytes of {@code content} to an {@link HttpResponseDecoder} in
 * pieces of at most {@code fragmentSize} bytes, then the remainder in one write,
 * and verifies the decoded response and its trailing headers.
 *
 * @param content      raw response bytes; the first 47 bytes are treated as the header
 * @param fragmentSize maximum number of header bytes per write
 */
private static void testLastResponseWithTrailingHeaderFragmented(byte[] content, int fragmentSize) {
    EmbeddedChannel channel = new EmbeddedChannel(new HttpResponseDecoder());
    int headerLength = 47;

    // Deliver the header portion fragment by fragment.
    int offset = 0;
    while (offset < headerLength) {
        int length = offset + fragmentSize > headerLength ? headerLength - offset : fragmentSize;
        // A message is expected only from the write that completes the header.
        boolean completesHeader = offset + length == headerLength;
        assertEquals(completesHeader, channel.writeInbound(Unpooled.wrappedBuffer(content, offset, length)));
        offset += length;
    }

    // Everything after the header goes in as a single buffer.
    channel.writeInbound(Unpooled.wrappedBuffer(content, headerLength, content.length - headerLength));

    HttpResponse response = channel.readInbound();
    assertThat(response.protocolVersion(), sameInstance(HttpVersion.HTTP_1_1));
    assertThat(response.status(), is(HttpResponseStatus.OK));

    LastHttpContent last = channel.readInbound();
    assertThat(last.content().isReadable(), is(false));

    HttpHeaders trailers = last.trailingHeaders();
    assertEquals(1, trailers.names().size());
    List<String> cookies = trailers.getAll(of("Set-Cookie"));
    assertEquals(2, cookies.size());
    assertTrue(cookies.contains("t1=t1v1"));
    assertTrue(cookies.contains("t2=t2v2; Expires=Wed, 09-Jun-2021 10:18:14 GMT"));
    last.release();

    assertThat(channel.finish(), is(false));
    assertThat(channel.readInbound(), is(nullValue()));
}
/**
 * Writes {@code header} to an {@link HttpResponseDecoder} in pieces of at most
 * {@code fragmentSize} bytes, followed by a 10-byte body delivered as two 5-byte
 * writes, and verifies the response and both content chunks.
 *
 * @param header       raw response header bytes
 * @param fragmentSize maximum number of header bytes per write
 */
private static void testResponseWithContentLengthFragmented(byte[] header, int fragmentSize) {
    EmbeddedChannel channel = new EmbeddedChannel(new HttpResponseDecoder());

    // Deliver the header fragment by fragment.
    int offset = 0;
    while (offset < header.length) {
        int length = offset + fragmentSize > header.length ? header.length - offset : fragmentSize;
        channel.writeInbound(Unpooled.wrappedBuffer(header, offset, length));
        offset += length;
    }

    // Body bytes are simply 0..9.
    byte[] body = new byte[10];
    for (int idx = 0; idx < body.length; idx++) {
        body[idx] = (byte) idx;
    }
    int half = body.length / 2;
    channel.writeInbound(Unpooled.wrappedBuffer(body, 0, half));
    channel.writeInbound(Unpooled.wrappedBuffer(body, 5, half));

    HttpResponse response = channel.readInbound();
    assertThat(response.protocolVersion(), sameInstance(HttpVersion.HTTP_1_1));
    assertThat(response.status(), is(HttpResponseStatus.OK));

    HttpContent first = channel.readInbound();
    assertThat(first.content().readableBytes(), is(5));
    assertEquals(Unpooled.wrappedBuffer(body, 0, 5), first.content());
    first.release();

    LastHttpContent last = channel.readInbound();
    assertEquals(5, last.content().readableBytes());
    assertEquals(Unpooled.wrappedBuffer(body, 5, 5), last.content());
    last.release();

    assertThat(channel.finish(), is(false));
    assertThat(channel.readInbound(), is(nullValue()));
}
/**
 * Sends a request through an {@link HttpClientUpgradeHandler}, answers it with a
 * plain {@code 200 OK} instead of an upgrade response, and verifies that the
 * rejection event is fired, the handler removes itself from the pipeline and the
 * response objects are passed on unchanged.
 */
@Test
public void testUpgradeRejected() {
    HttpClientUpgradeHandler.SourceCodec sourceCodec = new FakeSourceCodec();
    HttpClientUpgradeHandler.UpgradeCodec upgradeCodec = new FakeUpgradeCodec();
    HttpClientUpgradeHandler handler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 1024);
    UserEventCatcher catcher = new UserEventCatcher();
    EmbeddedChannel channel = new EmbeddedChannel(catcher);
    channel.pipeline().addFirst("upgrade", handler);

    assertTrue(
            channel.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "netty.io")));
    FullHttpRequest request = channel.readOutbound();

    // The outgoing request must carry exactly the two upgrade-related headers.
    assertEquals(2, request.headers().size());
    assertTrue(request.headers().contains(HttpHeaderNames.UPGRADE, "fancyhttp", false));
    assertTrue(request.headers().contains("connection", "upgrade", false));
    assertTrue(request.release());
    assertEquals(HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_ISSUED, catcher.getUserEvent());

    // Reply with a regular 200 OK. An unused local that built a 101 Switching
    // Protocols response was removed here because it was never written.
    assertTrue(channel.writeInbound(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)));
    assertTrue(channel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT));

    assertEquals(HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED, catcher.getUserEvent());
    assertNull(channel.pipeline().get("upgrade"));

    HttpResponse response = channel.readInbound();
    assertEquals(HttpResponseStatus.OK, response.status());

    LastHttpContent last = channel.readInbound();
    assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, last);
    assertFalse(last.release());
    assertFalse(channel.finish());
}
/**
 * Sends a chunked request (headers, two content chunks and a trailer) through an
 * {@link HttpObjectAggregator} and verifies that one aggregated
 * {@link FullHttpRequest} is produced with the combined content length, the
 * original header and the trailing header.
 */
@Test
public void testAggregateWithTrailer() {
    HttpObjectAggregator aggr = new HttpObjectAggregator(1024 * 1024);
    EmbeddedChannel embedder = new EmbeddedChannel(aggr);

    HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
            HttpMethod.GET, "http://localhost");
    HttpHeaders.setHeader(message, "X-Test", true);
    HttpHeaders.setTransferEncodingChunked(message);

    HttpContent chunk1 = new DefaultHttpContent(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII));
    HttpContent chunk2 = new DefaultHttpContent(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII));
    LastHttpContent trailer = new DefaultLastHttpContent();
    trailer.trailingHeaders().set("X-Trailer", true);

    assertFalse(embedder.writeInbound(message));
    assertFalse(embedder.writeInbound(chunk1));
    assertFalse(embedder.writeInbound(chunk2));

    // this should trigger a channelRead event so return true
    assertTrue(embedder.writeInbound(trailer));
    assertTrue(embedder.finish());

    FullHttpRequest aggregatedMessage = (FullHttpRequest) embedder.readInbound();
    assertNotNull(aggregatedMessage);

    assertEquals(chunk1.content().readableBytes() + chunk2.content().readableBytes(),
            HttpHeaders.getContentLength(aggregatedMessage));
    // assertEquals takes (expected, actual); the expected literal goes first so a
    // failure message reports the values the right way round.
    assertEquals(Boolean.TRUE.toString(), aggregatedMessage.headers().get("X-Test"));
    assertEquals(Boolean.TRUE.toString(), aggregatedMessage.trailingHeaders().get("X-Trailer"));
    checkContentBuffer(aggregatedMessage);
    assertNull(embedder.readInbound());
}
/**
 * Feeds an HTML body with no status line or headers to an
 * {@link HttpResponseDecoder} and verifies that a single failed "999" response is
 * produced and that nothing further is emitted afterwards.
 */
@Test
public void testGarbageHeaders() {
    // A response without headers - from https://github.com/netty/netty/issues/2103
    // Encode with an explicit charset so the bytes do not depend on the platform default.
    byte[] data = ("<html>\r\n" +
            "<head><title>400 Bad Request</title></head>\r\n" +
            "<body bgcolor=\"white\">\r\n" +
            "<center><h1>400 Bad Request</h1></center>\r\n" +
            "<hr><center>nginx/1.1.19</center>\r\n" +
            "</body>\r\n" +
            "</html>\r\n").getBytes(CharsetUtil.US_ASCII);

    EmbeddedChannel ch = new EmbeddedChannel(new HttpResponseDecoder());
    ch.writeInbound(Unpooled.wrappedBuffer(data));

    // Garbage input should generate the 999 Unknown response.
    HttpResponse res = ch.readInbound();
    assertThat(res.protocolVersion(), sameInstance(HttpVersion.HTTP_1_0));
    assertThat(res.status().code(), is(999));
    assertThat(res.decoderResult().isFailure(), is(true));
    assertThat(res.decoderResult().isFinished(), is(true));
    assertThat(ch.readInbound(), is(nullValue()));

    // More garbage should not generate anything (i.e. the decoder discards anything beyond this point.)
    ch.writeInbound(Unpooled.wrappedBuffer(data));
    assertThat(ch.readInbound(), is(nullValue()));

    // Closing the connection should not generate anything since the protocol has been violated.
    ch.finish();
    assertThat(ch.readInbound(), is(nullValue()));
}
/**
 * Tests decoding a frame that is too long because of the header length.
 *
 * <p>Two headers are built, each with a value of {@code maxFrameSize / 2}
 * characters, so that together they exceed the decoder's configured maximum. The
 * decoded frame is expected to carry the {@code HEADER_BAD_REQUEST} header.
 */
@Test
public void testDecodeFrame_LongHeaders() {
    int maxFrameSize = 2 * 1024;

    // Fill each header's own value. Previously the second loop appended to header1
    // by mistake, leaving header2 with an empty value.
    StringBuilder header1 = new StringBuilder("header1:");
    StringBuilder header2 = new StringBuilder("header2:");
    for (int i = 0; i < maxFrameSize / 2; ++i) {
        header1.append('a');
        header2.append('b');
    }

    EmbeddedChannel ec =
            new EmbeddedChannel(new StompFrameDecoder(maxFrameSize));
    final byte[] body = "This is the body.".getBytes(UTF_8);

    // Wire layout: command, two header lines, blank line, body, NUL terminator.
    ByteBuf buf = Unpooled.buffer();
    buf.writeBytes(Command.CONNECT.name().getBytes(UTF_8));
    buf.writeByte(LINE_FEED_CHAR);
    buf.writeBytes(header1.toString().getBytes(UTF_8));
    buf.writeByte(LINE_FEED_CHAR);
    buf.writeBytes(header2.toString().getBytes(UTF_8));
    buf.writeByte(LINE_FEED_CHAR);
    buf.writeByte(LINE_FEED_CHAR);
    buf.writeBytes(body);
    buf.writeByte(NULL_CHAR);
    ec.writeInbound(buf);

    Object actual = ec.readInbound();
    assertNotNull(actual);
    assertTrue(actual instanceof Frame);
    Frame actualFrame = (Frame) actual;
    assertTrue(actualFrame.getHeaders().getHeaderNames().contains(
            StompFrameDecoder.HEADER_BAD_REQUEST));
}
/**
 * Sends a HEAD request through an {@link HttpServerCodec}, replies with a chunked
 * response and checks the encoded output: the header block followed by an empty
 * buffer.
 */
@Test
public void testChunkedHeadResponse() {
    EmbeddedChannel channel = new EmbeddedChannel(new HttpServerCodec());

    // Send the request headers.
    assertTrue(channel.writeInbound(Unpooled.copiedBuffer(
            "HEAD / HTTP/1.1\r\n\r\n", CharsetUtil.UTF_8)));

    HttpRequest headRequest = channel.readInbound();
    assertEquals(HttpMethod.HEAD, headRequest.method());

    LastHttpContent requestEnd = channel.readInbound();
    assertFalse(requestEnd.content().isReadable());
    requestEnd.release();

    // Answer with a response flagged as chunked, then the terminating content.
    HttpResponse reply = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    HttpUtil.setTransferEncodingChunked(reply, true);
    assertTrue(channel.writeOutbound(reply));
    assertTrue(channel.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT));
    assertTrue(channel.finish());

    ByteBuf encodedHeaders = channel.readOutbound();
    assertEquals("HTTP/1.1 200 OK\r\ntransfer-encoding: chunked\r\n\r\n",
            encodedHeaders.toString(CharsetUtil.US_ASCII));
    encodedHeaders.release();

    ByteBuf encodedEnd = channel.readOutbound();
    assertFalse(encodedEnd.isReadable());
    encodedEnd.release();

    assertFalse(channel.finishAndReleaseAll());
}
/**
 * Finishes an encoder channel without writing any payload, pipes whatever it
 * emitted into a decoder channel and checks that the decoder produces no output.
 *
 * @param encoderWrapper wrapper passed to {@code createEncoder}
 * @param decoderWrapper wrapper passed to {@code createDecoder}
 */
private void testCompressNone(ZlibWrapper encoderWrapper, ZlibWrapper decoderWrapper) throws Exception {
    EmbeddedChannel chEncoder = new EmbeddedChannel(createEncoder(encoderWrapper));
    EmbeddedChannel chDecoderZlib = new EmbeddedChannel(createDecoder(decoderWrapper));

    try {
        // Closing an encoder channel without writing anything should generate both header and footer.
        assertTrue(chEncoder.finish());

        // Forward every encoded buffer to the decoder.
        ByteBuf deflated;
        while ((deflated = chEncoder.readOutbound()) != null) {
            chDecoderZlib.writeInbound(deflated);
        }

        // Decoder should not generate anything at all.
        boolean producedOutput = false;
        ByteBuf inflated;
        while ((inflated = chDecoderZlib.readInbound()) != null) {
            inflated.release();
            producedOutput = true;
        }

        assertFalse("should decode nothing", producedOutput);
        assertFalse(chDecoderZlib.finish());
    } finally {
        dispose(chEncoder);
        dispose(chDecoderZlib);
    }
}
/**
 * Writes a request line with extra trailing tokens and checks that the decoded
 * request reports a failed {@link DecoderResult}.
 */
@Test
public void testRequestWithBadInitialLine() throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(new HttpRequestDecoder());
    channel.writeInbound(Unpooled.copiedBuffer("GET / HTTP/1.0 with extra\r\n", CharsetUtil.UTF_8));

    HttpRequest decoded = channel.readInbound();
    DecoderResult result = decoded.decoderResult();
    assertFalse(result.isSuccess());
    assertTrue(result.isFailure());

    ensureInboundTrafficDiscarded(channel);
}
/**
 * Writes a request whose second header line is malformed and checks that the
 * decoded request reports failure while still exposing the URI and the header
 * that preceded the bad one.
 */
@Test
public void testRequestWithBadHeader() throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(new HttpRequestDecoder());

    // The request is delivered one line per write.
    channel.writeInbound(Unpooled.copiedBuffer("GET /maybe-something HTTP/1.0\r\n", CharsetUtil.UTF_8));
    channel.writeInbound(Unpooled.copiedBuffer("Good_Name: Good Value\r\n", CharsetUtil.UTF_8));
    channel.writeInbound(Unpooled.copiedBuffer("Bad=Name: Bad Value\r\n", CharsetUtil.UTF_8));
    channel.writeInbound(Unpooled.copiedBuffer("\r\n", CharsetUtil.UTF_8));

    HttpRequest decoded = channel.readInbound();
    DecoderResult result = decoded.decoderResult();
    assertFalse(result.isSuccess());
    assertTrue(result.isFailure());

    assertEquals("Good Value", decoded.headers().get(of("Good_Name")));
    assertEquals("/maybe-something", decoded.uri());

    ensureInboundTrafficDiscarded(channel);
}
/**
 * Decodes a request containing a header with no value and checks that the header
 * is exposed as an empty string.
 */
@Test
public void testEmptyHeaderValue() {
    EmbeddedChannel decoderChannel = new EmbeddedChannel(new HttpRequestDecoder());

    String lineEnd = "\r\n";
    String rawRequest = "GET /some/path HTTP/1.1" + lineEnd
            + "Host: localhost" + lineEnd
            + "EmptyHeader:" + lineEnd
            + lineEnd;
    byte[] rawBytes = rawRequest.getBytes(CharsetUtil.US_ASCII);
    decoderChannel.writeInbound(Unpooled.wrappedBuffer(rawBytes));

    HttpRequest decoded = (HttpRequest) decoderChannel.readInbound();
    assertEquals("", decoded.headers().get("EmptyHeader"));
}
/**
 * Sends a request with an unsupported {@code Expect} header through a request
 * decoder plus {@link HttpObjectAggregator} and verifies the
 * {@code 417 Expectation Failed} reply, then checks whether the channel was closed
 * or is still usable, depending on {@code close}.
 *
 * @param close value passed as the aggregator's second constructor argument; when
 *              {@code true} the channel is expected to be closed afterwards
 */
private static void runUnsupportedExceptHeaderExceptionTest(final boolean close) {
    final int maxContentLength = 4;
    final HttpObjectAggregator aggregator = close
            ? new HttpObjectAggregator(maxContentLength, true)
            : new HttpObjectAggregator(maxContentLength);
    final EmbeddedChannel embedder = new EmbeddedChannel(new HttpRequestDecoder(), aggregator);

    assertFalse(embedder.writeInbound(Unpooled.copiedBuffer(
            "GET / HTTP/1.1\r\n" +
            "Expect: chocolate=yummy\r\n" +
            "Content-Length: 100\r\n\r\n", CharsetUtil.US_ASCII)));
    assertNull(embedder.readInbound());

    final FullHttpResponse rejection = embedder.readOutbound();
    assertEquals(HttpResponseStatus.EXPECTATION_FAILED, rejection.status());
    assertEquals("0", rejection.headers().get(HttpHeaderNames.CONTENT_LENGTH));
    rejection.release();

    if (!close) {
        // keep-alive is on by default in HTTP/1.1, so the connection should be still alive
        assertTrue(embedder.isOpen());

        // the decoder should be reset by the aggregator at this point and be able to decode the next request
        assertTrue(embedder.writeInbound(Unpooled.copiedBuffer("GET / HTTP/1.1\r\n\r\n", CharsetUtil.US_ASCII)));

        final FullHttpRequest followUp = embedder.readInbound();
        assertThat(followUp.method(), is(HttpMethod.GET));
        assertThat(followUp.uri(), is("/"));
        assertThat(followUp.content().readableBytes(), is(0));
        followUp.release();
    } else {
        assertFalse(embedder.isOpen());
    }

    assertFalse(embedder.finish());
}
/**
 * Runs an upgrade request/response pair through a
 * {@link WebSocketClientExtensionHandler} configured with a main and a fallback
 * handshaker, where the response selects the "main" extension. Verifies the
 * extensions advertised in the request, the extension in the response, that the
 * extension's encoder and decoder were added to the pipeline, and the expected
 * interactions with the mocks.
 */
@Test
public void testMainSuccess() {
    // initialize
    when(mainHandshakerMock.newRequestData()).
            thenReturn(new WebSocketExtensionData("main", Collections.<String, String>emptyMap()));
    when(mainHandshakerMock.handshakeExtension(any(WebSocketExtensionData.class))).thenReturn(mainExtensionMock);
    when(fallbackHandshakerMock.newRequestData()).
            thenReturn(new WebSocketExtensionData("fallback", Collections.<String, String>emptyMap()));
    when(mainExtensionMock.rsv()).thenReturn(WebSocketExtension.RSV1);
    when(mainExtensionMock.newExtensionEncoder()).thenReturn(new DummyEncoder());
    when(mainExtensionMock.newExtensionDecoder()).thenReturn(new DummyDecoder());

    // execute
    EmbeddedChannel ch = new EmbeddedChannel(new WebSocketClientExtensionHandler(
            mainHandshakerMock, fallbackHandshakerMock));

    HttpRequest req = newUpgradeRequest(null);
    ch.writeOutbound(req);

    HttpRequest req2 = ch.readOutbound();
    List<WebSocketExtensionData> reqExts = WebSocketExtensionUtil.extractExtensions(
            req2.headers().get(HttpHeaderNames.SEC_WEBSOCKET_EXTENSIONS));

    HttpResponse res = newUpgradeResponse("main");
    ch.writeInbound(res);

    HttpResponse res2 = ch.readInbound();
    List<WebSocketExtensionData> resExts = WebSocketExtensionUtil.extractExtensions(
            res2.headers().get(HttpHeaderNames.SEC_WEBSOCKET_EXTENSIONS));

    // test
    assertEquals(2, reqExts.size());
    assertEquals("main", reqExts.get(0).name());
    assertEquals("fallback", reqExts.get(1).name());

    assertEquals(1, resExts.size());
    assertEquals("main", resExts.get(0).name());
    assertTrue(resExts.get(0).parameters().isEmpty());

    assertNotNull(ch.pipeline().get(DummyDecoder.class));
    // Assert on the handler itself. The previous form passed the boolean result of
    // "!= null" to assertNotNull, which is never null and so could never fail.
    assertNotNull(ch.pipeline().get(DummyEncoder.class));

    verify(mainHandshakerMock).newRequestData();
    verify(mainHandshakerMock).handshakeExtension(any(WebSocketExtensionData.class));
    verify(fallbackHandshakerMock).newRequestData();
    verify(mainExtensionMock, atLeastOnce()).rsv();
    verify(mainExtensionMock).newExtensionEncoder();
    verify(mainExtensionMock).newExtensionDecoder();
}