下面列出了 io.netty.channel.embedded.EmbeddedChannel#writeInbound() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Verifies that the underlying network channel ends up closed when
 * {@link NettyResponseChannel#close()} is called.
 */
@Test
public void closeTest() {
    // The request is keep-alive by default.
    HttpRequest closeRequest = createRequestWithHeaders(HttpMethod.GET, TestingUri.Close.toString());
    EmbeddedChannel embeddedChannel = createEmbeddedChannel();
    embeddedChannel.writeInbound(closeRequest);

    // The first outbound message is the response head: it must report an error and not be keep-alive.
    HttpResponse head = (HttpResponse) embeddedChannel.readOutbound();
    assertEquals("Unexpected response status", HttpResponseStatus.INTERNAL_SERVER_ERROR, head.status());
    assertFalse("Inconsistent value for Connection header", HttpUtil.isKeepAlive(head));

    // Keep reading until nothing is left in the outbound queue.
    Object pending;
    do {
        pending = embeddedChannel.readOutbound();
    } while (pending != null);

    assertFalse("Channel should be closed", embeddedChannel.isOpen());
}
/**
 * Checks that the decoder emits exactly one {@link LastHttpContent} after an invalid chunk is
 * received, and nothing further once the connection is closed.
 */
@Test
public void testGarbageChunk() {
    EmbeddedChannel ch = new EmbeddedChannel(new HttpResponseDecoder());
    String malformedResponse =
            "HTTP/1.1 200 OK\r\n" +
            "Transfer-Encoding: chunked\r\n\r\n" +
            "NOT_A_CHUNK_LENGTH\r\n";
    ch.writeInbound(Unpooled.copiedBuffer(malformedResponse, CharsetUtil.US_ASCII));

    // The first decoded message is the response head.
    assertThat(ch.readInbound(), is(instanceOf(HttpResponse.class)));

    // Next comes the last chunk, which must carry a failed decoder result.
    LastHttpContent failedChunk = ch.readInbound();
    boolean decodeFailed = failedChunk.decoderResult().isFailure();
    assertThat(decodeFailed, is(true));
    failedChunk.release();

    // Nothing else is produced by the decoder ...
    assertThat(ch.readInbound(), is(nullValue()));

    // ... not even when the connection is closed.
    assertThat(ch.finish(), is(false));
}
/**
 * Writes a full response with an explicit Content-Length through {@link HttpContentCompressor}
 * and checks the encoded output: two content chunks with the expected hex dumps followed by an
 * empty {@link LastHttpContent}.
 */
@Test
public void testFullContent() throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(new HttpContentCompressor());
    channel.writeInbound(newRequest());

    FullHttpResponse response = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
            Unpooled.copiedBuffer("Hello, World", CharsetUtil.US_ASCII));
    response.headers().set(Names.CONTENT_LENGTH, response.content().readableBytes());
    channel.writeOutbound(response);

    assertEncodedResponse(channel);

    // First encoded chunk.
    HttpContent chunk = (HttpContent) channel.readOutbound();
    String firstHex = ByteBufUtil.hexDump(chunk.content());
    assertThat(firstHex, is("1f8b0800000000000000f248cdc9c9d75108cf2fca4901000000ffff"));
    chunk.release();

    // Second encoded chunk.
    chunk = (HttpContent) channel.readOutbound();
    String secondHex = ByteBufUtil.hexDump(chunk.content());
    assertThat(secondHex, is("0300c6865b260c000000"));
    chunk.release();

    // The terminating content carries no bytes.
    LastHttpContent terminator = (LastHttpContent) channel.readOutbound();
    assertThat(terminator.content().readableBytes(), is(0));
    terminator.release();

    assertThat(channel.readOutbound(), is(nullValue()));
}
/**
 * Feeds a "101 WebSocket Protocol Handshake" response followed by 16 payload bytes into an
 * {@link HttpResponseDecoder} and checks that the response head is decoded first and the
 * trailing 16 bytes are then delivered as a single {@link HttpContent}.
 */
@Test
public void testWebSocketResponse() {
    // Encode with an explicit charset: the no-argument getBytes() uses the platform default
    // charset, which would make the wire bytes depend on the machine running the test.
    byte[] data = ("HTTP/1.1 101 WebSocket Protocol Handshake\r\n" +
            "Upgrade: WebSocket\r\n" +
            "Connection: Upgrade\r\n" +
            "Sec-WebSocket-Origin: http://localhost:8080\r\n" +
            "Sec-WebSocket-Location: ws://localhost/some/path\r\n" +
            "\r\n" +
            "1234567812345678").getBytes(java.nio.charset.StandardCharsets.US_ASCII);
    EmbeddedChannel ch = new EmbeddedChannel(new HttpResponseDecoder());
    ch.writeInbound(Unpooled.wrappedBuffer(data));

    // Response head.
    HttpResponse res = ch.readInbound();
    assertThat(res.protocolVersion(), sameInstance(HttpVersion.HTTP_1_1));
    assertThat(res.status(), is(HttpResponseStatus.SWITCHING_PROTOCOLS));

    // The 16 bytes after the blank line are handed over as content.
    HttpContent content = ch.readInbound();
    assertThat(content.content().readableBytes(), is(16));
    content.release();

    // Closing the channel yields no further inbound messages.
    assertThat(ch.finish(), is(false));
    assertThat(ch.readInbound(), is(nullValue()));
}
/**
 * Sends an upgrade request asking for the permessage-deflate extension with
 * {@code SERVER_MAX_WINDOW=10} and checks that the upgrade response echoes that extension and
 * parameter, and that the deflate decoder and encoder are present in the pipeline afterwards.
 */
@Test
public void testServerWindowSizeSuccess() {
    PerMessageDeflateServerExtensionHandshaker handshaker =
            new PerMessageDeflateServerExtensionHandshaker(6, true, 15, false, false);
    EmbeddedChannel channel = new EmbeddedChannel(new WebSocketServerExtensionHandler(handshaker));

    HttpRequest upgradeRequest =
            newUpgradeRequest(PERMESSAGE_DEFLATE_EXTENSION + "; " + SERVER_MAX_WINDOW + "=10");
    channel.writeInbound(upgradeRequest);

    HttpResponse upgradeResponse = newUpgradeResponse(null);
    channel.writeOutbound(upgradeResponse);

    // Inspect the extensions header of the response that actually left the pipeline.
    HttpResponse written = channel.readOutbound();
    List<WebSocketExtensionData> negotiated = WebSocketExtensionUtil.extractExtensions(
            written.headers().get(HttpHeaderNames.SEC_WEBSOCKET_EXTENSIONS));

    Assert.assertEquals(PERMESSAGE_DEFLATE_EXTENSION, negotiated.get(0).name());
    Assert.assertEquals("10", negotiated.get(0).parameters().get(SERVER_MAX_WINDOW));

    Assert.assertNotNull(channel.pipeline().get(PerMessageDeflateDecoder.class));
    Assert.assertNotNull(channel.pipeline().get(PerMessageDeflateEncoder.class));
}
/**
 * Posts a JSONP polling request with a body and an Origin header through the handler under test
 * and checks that a {@link Packet} carrying that origin and the JSON payload is read inbound.
 */
@Test
public void testChannelReadPacket() throws Exception {
    ByteBuf body = Unpooled.copiedBuffer("d=3:::{\"greetings\":\"Hello World!\"}", CharsetUtil.UTF_8);
    HttpRequest pollingRequest =
            new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/socket.io/1/jsonp-polling", body);
    String origin = "http://localhost:8080";
    pollingRequest.headers().add(HttpHeaderNames.ORIGIN, origin);

    EmbeddedChannel embedded = new EmbeddedChannel(new LastOutboundHandler(), jsonpPollingHandler);
    embedded.writeInbound(pollingRequest);

    Object decoded = embedded.readInbound();
    assertTrue(decoded instanceof Packet);

    Packet dataPacket = (Packet) decoded;
    assertEquals(origin, dataPacket.getOrigin());
    assertEquals("{\"greetings\":\"Hello World!\"}", dataPacket.getData().toString(CharsetUtil.UTF_8));

    embedded.finish();
}
/**
 * Decodes a response whose header value is followed by whitespace before the line break and
 * checks that the decoded header value has no trailing space. It then verifies that subsequent
 * bytes arrive as content and that closing the channel produces an empty {@link LastHttpContent}.
 */
@Test
public void testLastResponseWithHeaderRemoveTrailingSpaces() {
    EmbeddedChannel channel = new EmbeddedChannel(new HttpResponseDecoder());
    String head = "HTTP/1.1 200 OK\r\nX-Header: h2=h2v2; Expires=Wed, 09-Jun-2021 10:18:14 GMT \r\n\r\n";
    channel.writeInbound(Unpooled.copiedBuffer(head, CharsetUtil.US_ASCII));

    HttpResponse response = channel.readInbound();
    assertThat(response.protocolVersion(), sameInstance(HttpVersion.HTTP_1_1));
    assertThat(response.status(), is(HttpResponseStatus.OK));
    // The expected value ends right after "GMT".
    assertThat(response.headers().get(of("X-Header")), is("h2=h2v2; Expires=Wed, 09-Jun-2021 10:18:14 GMT"));
    assertThat(channel.readInbound(), is(nullValue()));

    // Any bytes written after the head are surfaced as content.
    channel.writeInbound(Unpooled.wrappedBuffer(new byte[1024]));
    HttpContent body = channel.readInbound();
    assertThat(body.content().readableBytes(), is(1024));
    body.release();

    // Closing the channel yields the terminating, empty content.
    assertThat(channel.finish(), is(true));
    LastHttpContent terminator = channel.readInbound();
    assertThat(terminator.content().isReadable(), is(false));
    terminator.release();

    assertThat(channel.readInbound(), is(nullValue()));
}
/**
 * Feeds {@code "first\r\nsecond\nthird"} into a {@link DelimiterBasedFrameDecoder} using line
 * delimiters and expects exactly two frames, "first" and "second", before the channel is finished.
 */
@Test
public void testDecode() throws Exception {
    DelimiterBasedFrameDecoder lineDecoder =
            new DelimiterBasedFrameDecoder(8192, true, Delimiters.lineDelimiter());
    EmbeddedChannel channel = new EmbeddedChannel(lineDecoder);
    channel.writeInbound(Unpooled.copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII));

    ByteBuf firstFrame = channel.readInbound();
    assertEquals("first", firstFrame.toString(CharsetUtil.US_ASCII));

    ByteBuf secondFrame = channel.readInbound();
    assertEquals("second", secondFrame.toString(CharsetUtil.US_ASCII));

    // "third" is not followed by a delimiter in the input, and no third frame is expected.
    assertNull(channel.readInbound());

    channel.finish();

    // Release whatever finish() may have pushed inbound, then the two frames read above.
    ReferenceCountUtil.release(channel.readInbound());
    firstFrame.release();
    secondFrame.release();
}
/**
 * Sends a body-less GET to the JSONP polling path with an Origin header and checks that a
 * {@link ConnectPacket} is read inbound with the JSONP polling transport type, the request
 * origin and a null remote address.
 */
@Test
public void testChannelReadConnect() throws Exception {
    HttpRequest connectRequest =
            new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/socket.io/1/jsonp-polling");
    String origin = "http://localhost:8080";
    connectRequest.headers().add(HttpHeaderNames.ORIGIN, origin);

    EmbeddedChannel embedded = new EmbeddedChannel(new LastOutboundHandler(), jsonpPollingHandler);
    embedded.writeInbound(connectRequest);

    Object decoded = embedded.readInbound();
    assertTrue(decoded instanceof ConnectPacket);

    ConnectPacket connectPacket = (ConnectPacket) decoded;
    assertEquals(TransportType.JSONP_POLLING, connectPacket.getTransportType());
    assertEquals(origin, connectPacket.getOrigin());
    Assert.assertNull(connectPacket.getRemoteAddress());

    embedded.finish();
}
/**
 * Registers a handler named "json" through {@link HttpClientOperations#addHandler} and writes a
 * {@link DefaultLastHttpContent} inbound. Expects the first pipeline entry to be
 * "json$extractor", and the inbound queue to hold a {@link ByteBuf} followed by a
 * {@link LastHttpContent} and nothing else.
 */
@Test
public void addNamedDecoderReplaysLastHttp() {
    ByteBuf json = Unpooled.copiedBuffer("{\"foo\":1}", CharsetUtil.UTF_8);
    EmbeddedChannel embedded = new EmbeddedChannel();

    HttpClientOperations operations = new HttpClientOperations(() -> embedded,
            ConnectionObserver.emptyListener(),
            ClientCookieEncoder.STRICT, ClientCookieDecoder.STRICT);
    operations.addHandler("json", new JsonObjectDecoder());

    embedded.writeInbound(new DefaultLastHttpContent(json));

    MatcherAssert.assertThat(embedded.pipeline().names().iterator().next(), is("json$extractor"));

    // First message: the extracted bytes.
    Object message = embedded.readInbound();
    MatcherAssert.assertThat(message, instanceOf(ByteBuf.class));
    ((ByteBuf) message).release();

    // Second message: the last-content marker.
    message = embedded.readInbound();
    MatcherAssert.assertThat(message, instanceOf(LastHttpContent.class));
    ((LastHttpContent) message).release();

    MatcherAssert.assertThat(embedded.readInbound(), nullValue());
}
/**
 * Passes an OPTIONS (preflight) request through a {@link CorsHandler} and checks that the
 * request's reference count is zero afterwards.
 */
@Test
public void preflightRequestShouldReleaseRequest() {
    CorsConfig corsConfig = CorsConfig.withOrigin("http://localhost:8888")
            .preflightResponseHeader("CustomHeader", Arrays.asList("value1", "value2"))
            .build();
    EmbeddedChannel ch = new EmbeddedChannel(new CorsHandler(corsConfig));

    FullHttpRequest preflight = optionsRequest("http://localhost:8888", "content-type, xheader1");
    ch.writeInbound(preflight);

    int remainingReferences = preflight.refCnt();
    assertThat(remainingReferences, is(0));
}
/**
 * Writes {@code input} inbound and requires the write to fail with a {@link CodecException}
 * whose exact class is {@link TooLongFrameException}.
 *
 * @param ch    channel the oversized frame is written to
 * @param input buffer holding the oversized frame
 */
@Override
protected void onTooBigFrame(EmbeddedChannel ch, ByteBuf input) {
    CodecException thrown = null;
    try {
        ch.writeInbound(input);
    } catch (CodecException e) {
        thrown = e;
    }
    // A write that completes normally means the size limit was not enforced.
    if (thrown == null) {
        fail();
    }
    assertEquals(TooLongFrameException.class, thrown.getClass());
}
/**
 * Feeds {@code "first\r\nsecond\nthird"} into a {@link LineBasedFrameDecoder} constructed with
 * {@code (8192, true, false)} and expects exactly two frames, "first" and "second".
 */
@Test
public void testDecodeWithStrip() throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(new LineBasedFrameDecoder(8192, true, false));
    channel.writeInbound(copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII));

    ByteBuf firstLine = releaseLater((ByteBuf) channel.readInbound());
    assertEquals("first", firstLine.toString(CharsetUtil.US_ASCII));

    ByteBuf secondLine = releaseLater((ByteBuf) channel.readInbound());
    assertEquals("second", secondLine.toString(CharsetUtil.US_ASCII));

    // No third frame is expected for the unterminated "third".
    assertNull(channel.readInbound());

    channel.finish();
    // Release anything finish() may have pushed inbound.
    ReferenceCountUtil.release(channel.readInbound());
}
/**
 * Queues an empty-JSON 200 response on the test channel, executes a send-message request with
 * two variables and validates the request body against the "variables" key.
 */
@Test
public void testBodyVariableSerialisation() throws Exception {
    initTestClient();
    EmbeddedChannel testChannel = createTestChannel();

    // The canned response is written before the request is executed.
    DefaultFullHttpResponse cannedResponse = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("{}", ARIEncoder.ENCODING));
    testChannel.writeInbound(cannedResponse);

    EndpointsSendMessagePutRequest_impl_ari_6_0_0 sendMessage =
            new EndpointsSendMessagePutRequest_impl_ari_6_0_0("to", "from");
    sendMessage.setHttpClient(client);
    sendMessage
            .addVariables("key1", "val1")
            .addVariables("key2", "val2")
            .execute();

    validateBody(testChannel, "variables");
}
/**
 * Runs the shared {@code anyNotIdle} scenario with an action that writes an inbound message,
 * using {@link IdleStateEvent#FIRST_READER_IDLE_STATE_EVENT} as the expected event.
 */
@Test
public void testReaderNotIdle() throws Exception {
    // NOTE(review): arguments presumably mean a 1-second reader idle time with writer/all-idle
    // disabled; confirm against TestableIdleStateHandler's constructor.
    TestableIdleStateHandler handler = new TestableIdleStateHandler(
            false, 1L, 0L, 0L, TimeUnit.SECONDS);

    anyNotIdle(handler, new Action() {
        @Override
        public void run(EmbeddedChannel ch) throws Exception {
            ch.writeInbound("Hello, World!");
        }
    }, IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT);
}
/**
 * Writes the same 1 MiB random buffer inbound 4096 times and asserts after every write that no
 * exception was recorded on the channel and that nothing is readable inbound.
 *
 * @param ch channel expected to silently discard all inbound traffic
 */
private void ensureInboundTrafficDiscarded(EmbeddedChannel ch) {
    byte[] noise = new byte[1048576];
    rnd.nextBytes(noise);
    ByteBuf payload = Unpooled.wrappedBuffer(noise);

    int remainingWrites = 4096;
    while (remainingWrites-- > 0) {
        // Reset reader/writer indices so the whole array is written again.
        payload.setIndex(0, noise.length);
        // retain() per write; presumably the pipeline releases each written message, leaving
        // the final release() below to drop the original reference.
        ch.writeInbound(payload.retain());
        ch.checkException();
        assertNull(ch.readInbound());
    }

    payload.release();
}
/**
 * (Re)builds the test fixture: a spied frame writer, the frame codec built from
 * {@code frameCodecBuilder}, and an {@link EmbeddedChannel} containing the codec followed by a
 * {@link LastInboundHandler}. It then drives the initial settings exchange and asserts that a
 * settings frame reached the inbound handler.
 *
 * @param frameCodecBuilder builder for the codec under test; the spied frame writer, a
 *        TRACE-level frame logger and {@code initialRemoteSettings} are applied to it here
 * @param initialRemoteSettings settings passed to the builder's {@code initialSettings(...)} and
 *        also delivered to the frame listener via {@code onSettingsRead}
 * @throws Exception if any step of the setup fails
 */
private void setUp(Http2FrameCodecBuilder frameCodecBuilder, Http2Settings initialRemoteSettings) throws Exception {
/*
* Some tests call this method twice. Once with JUnit's @Before and once directly to pass special settings.
* This call ensures that in case of two consecutive calls to setUp(), the previous channel is shutdown and
* ByteBufs are released correctly.
*/
tearDown();
// Spy on the writer so the frames written during the handshake can be verified below.
frameWriter = spy(new VerifiableHttp2FrameWriter());
frameCodec = frameCodecBuilder.frameWriter(frameWriter).frameLogger(new Http2FrameLogger(LogLevel.TRACE))
.initialSettings(initialRemoteSettings).build();
// Keep the decoder's internal listener so settings events can be injected directly.
frameListener = ((DefaultHttp2ConnectionDecoder) frameCodec.decoder())
.internalFrameListener();
inboundHandler = new LastInboundHandler();
// Connect first, then add the handlers and fire channelActive explicitly.
channel = new EmbeddedChannel();
channel.connect(new InetSocketAddress(0));
channel.pipeline().addLast(frameCodec);
channel.pipeline().addLast(inboundHandler);
channel.pipeline().fireChannelActive();
http2HandlerCtx = channel.pipeline().context(frameCodec);
// Handshake
// At this point the only interaction with the writer must be the settings frame.
verify(frameWriter).writeSettings(eq(http2HandlerCtx),
anyHttp2Settings(), anyChannelPromise());
verifyNoMoreInteractions(frameWriter);
// Feed the connection preface, then report the settings to the listener directly.
channel.writeInbound(Http2CodecUtil.connectionPrefaceBuf());
frameListener.onSettingsRead(http2HandlerCtx, initialRemoteSettings);
// The settings read above must have been acknowledged.
verify(frameWriter).writeSettingsAck(eq(http2HandlerCtx), anyChannelPromise());
frameListener.onSettingsAckRead(http2HandlerCtx);
// Consume the settings frame from the inbound handler and check it is present.
Http2SettingsFrame settingsFrame = inboundHandler.readInbound();
assertNotNull(settingsFrame);
}
/**
 * Writes {@code input} inbound and requires the write to fail with a {@link CodecException}
 * whose exact class is {@link TooLongFrameException}.
 *
 * @param ch    channel the oversized frame is written to
 * @param input buffer holding the oversized frame
 */
@Override
protected void onTooBigFrame(EmbeddedChannel ch, ByteBuf input) {
    CodecException thrown = null;
    try {
        ch.writeInbound(input);
    } catch (CodecException e) {
        thrown = e;
    }
    // A write that completes normally means the size limit was not enforced.
    if (thrown == null) {
        fail();
    }
    assertEquals(TooLongFrameException.class, thrown.getClass());
}
/**
 * Pushes a handshake message into an {@link SslHandler} in two separate writes and expects the
 * second write to fail with a {@link DecoderException} caused by an
 * {@link SSLProtocolException}.
 */
@Test
public void testTruncatedPacket() throws Exception {
    SSLEngine serverEngine = SSLContext.getDefault().createSSLEngine();
    serverEngine.setUseClientMode(false);
    EmbeddedChannel channel = new EmbeddedChannel(new SslHandler(serverEngine));

    byte[] firstPart = {22, 3, 1, 0, 5};
    byte[] secondPart = {2, 0, 0, 1, 0};

    // First part of a 5-byte handshake message: nothing can be decoded from it yet.
    channel.writeInbound(wrappedBuffer(firstPart));
    assertThat(channel.readInbound(), is(nullValue()));

    try {
        // Second part of the 5-byte handshake message.
        channel.writeInbound(wrappedBuffer(secondPart));
        fail();
    } catch (DecoderException e) {
        // Clean up the channel first and release any pending messages that may have been
        // generated because of an alert.
        // See https://github.com/netty/netty/issues/6057.
        channel.finishAndReleaseAll();

        // The message is invalid, so a correctly decoded message must surface as a protocol error.
        assertThat(e.getCause(), is(instanceOf(SSLProtocolException.class)));
    }
}
/**
 * Writes {@code input} inbound and requires the write to fail with a {@link CodecException}
 * whose exact class is {@link TooLongFrameException}.
 *
 * @param ch    channel the oversized frame is written to
 * @param input buffer holding the oversized frame
 */
@Override
protected void onTooBigFrame(EmbeddedChannel ch, ByteBuf input) {
    CodecException thrown = null;
    try {
        ch.writeInbound(input);
    } catch (CodecException e) {
        thrown = e;
    }
    // A write that completes normally means the size limit was not enforced.
    if (thrown == null) {
        fail();
    }
    assertEquals(TooLongFrameException.class, thrown.getClass());
}