下面列出了io.netty.channel.embedded.EmbeddedChannel#readOutbound ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* This unit test case ensures that {@link FileRegionEncoder} indeed wraps {@link FileRegion} to
* {@link ByteBuf}.
* @throws IOException if there is an error.
*/
@Test
public void testEncode() throws IOException {
FileRegionEncoder fileRegionEncoder = new FileRegionEncoder();
EmbeddedChannel channel = new EmbeddedChannel(fileRegionEncoder);
File file = File.createTempFile(UUID.randomUUID().toString(), ".data");
file.deleteOnExit();
Random random = new Random(System.currentTimeMillis());
int dataLength = 1 << 10;
byte[] data = new byte[dataLength];
random.nextBytes(data);
write(file, data);
FileRegion fileRegion = new DefaultFileRegion(file, 0, dataLength);
Assert.assertEquals(0, fileRegion.transfered());
Assert.assertEquals(dataLength, fileRegion.count());
Assert.assertTrue(channel.writeOutbound(fileRegion));
ByteBuf out = (ByteBuf) channel.readOutbound();
byte[] arr = new byte[out.readableBytes()];
out.getBytes(0, arr);
Assert.assertArrayEquals("Data should be identical", data, arr);
}
@Before
public void prepare () throws PropertyVetoException, SQLException {
node12.name = "LNPayment12";
node21.name = "LNPayment21";
node23.name = "LNPayment23";
node32.name = "LNPayment32";
node12.pubKeyClient = node2.pubKeyServer;
node21.pubKeyClient = node1.pubKeyServer;
node23.pubKeyClient = node3.pubKeyServer;
node32.pubKeyClient = node2.pubKeyServer;
processor12 = new LNPaymentProcessorImpl(contextFactory1, dbHandler1, node12);
processor21 = new LNPaymentProcessorImpl(contextFactory2, dbHandler2, node21);
processor23 = new LNPaymentProcessorImpl(contextFactory2, dbHandler2, node23);
processor32 = new LNPaymentProcessorImpl(contextFactory3, dbHandler3, node32);
channel12 = new EmbeddedChannel(new ProcessorHandler(processor12, "LNPayment12"));
channel21 = new EmbeddedChannel(new ProcessorHandler(processor21, "LNPayment21"));
channel23 = new EmbeddedChannel(new ProcessorHandler(processor23, "LNPayment23"));
channel32 = new EmbeddedChannel(new ProcessorHandler(processor32, "LNPayment32"));
Message m = (Message) channel21.readOutbound();
assertNull(m);
}
@Test
public void testEncodeMultipleMessages() throws Exception {
final EmbeddedChannel channel = new EmbeddedChannel(new GelfTcpFrameDelimiterEncoder());
final byte[] message1 = "Test1".getBytes(StandardCharsets.UTF_8);
final byte[] message2 = "Test2".getBytes(StandardCharsets.UTF_8);
assertTrue(channel.writeOutbound(Unpooled.wrappedBuffer(message1)));
assertTrue(channel.writeOutbound(Unpooled.wrappedBuffer(message2)));
assertTrue(channel.finish());
final ByteBuf outboundBuffer1 = (ByteBuf) channel.readOutbound();
final byte[] bytes1 = outboundBuffer1.array();
assertEquals(bytes1[bytes1.length - 1], (byte) 0);
assertEquals(bytes1.length, message1.length + 1);
final ByteBuf outboundBuffer2 = (ByteBuf) channel.readOutbound();
final byte[] bytes2 = outboundBuffer2.array();
assertEquals(bytes2[bytes2.length - 1], (byte) 0);
assertEquals(bytes2.length, message2.length + 1);
assertNull(channel.readOutbound());
}
@Test
public void selectBucketShouldCompleteConnectFuture() throws Exception {
// Register the Handler
KeyValueSelectBucketHandler handler = new KeyValueSelectBucketHandler("bucket", true);
EmbeddedChannel channel = new EmbeddedChannel(handler);
ChannelFuture connectFuture = channel.connect(new InetSocketAddress("127.0.0.1", 11210));
// Make sure the handler sends the select bucket command the right way
BinaryMemcacheRequest request = (BinaryMemcacheRequest) channel.readOutbound();
assertEquals(OPCODE, request.getOpcode());
assertEquals("bucket".length(), request.getKeyLength());
assertEquals("bucket".length(), request.getTotalBodyLength());
assertEquals("bucket", new String(request.getKey(), CharsetUtil.UTF_8));
// now fake a "not found" response
FullBinaryMemcacheResponse response = new DefaultFullBinaryMemcacheResponse(
new byte[] {}, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER
);
response.setStatus((byte) 0x00); // not found error
channel.writeInbound(response);
assertTrue(connectFuture.isDone());
assertTrue(connectFuture.isSuccess());
assertTrue(channel.pipeline().toMap().isEmpty()); // handler removes itself
}
@Test
public void extraHeadersAreIncluded() throws Exception {
URI uri = new URI("http://does.not.exist:8080/foo");
ImmutableList<Entry<String, String>> remoteHeaders =
ImmutableList.of(
Maps.immutableEntry("key1", "value1"), Maps.immutableEntry("key2", "value2"));
EmbeddedChannel ch =
new EmbeddedChannel(new HttpDownloadHandler(/* credentials= */ null, remoteHeaders));
DownloadCommand cmd =
new DownloadCommand(uri, /* casDownload= */ true, DIGEST, new ByteArrayOutputStream());
ChannelPromise writePromise = ch.newPromise();
ch.writeOneOutbound(cmd, writePromise);
HttpRequest request = ch.readOutbound();
assertThat(request.headers().get("key1")).isEqualTo("value1");
assertThat(request.headers().get("key2")).isEqualTo("value2");
}
@Before
public void prepare () throws PropertyVetoException, SQLException {
Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
node1.isServer = false;
node2.isServer = true;
contextFactory1 = new MockContextFactory(serverObject1);
contextFactory2 = new MockContextFactory(serverObject2);
node1.ephemeralKeyClient = node2.ephemeralKeyServer;
node2.ephemeralKeyClient = node1.ephemeralKeyServer;
node1.ecdhKeySet = ECDH.getSharedSecret(node1.ephemeralKeyServer, node1.ephemeralKeyClient);
node2.ecdhKeySet = ECDH.getSharedSecret(node2.ephemeralKeyServer, node2.ephemeralKeyClient);
channel1 = new EmbeddedChannel(new ProcessorHandler(contextFactory1.getAuthenticationProcessor(node1), "Encryption1"));
channel2 = new EmbeddedChannel(new ProcessorHandler(contextFactory2.getAuthenticationProcessor(node2), "Encryption2"));
Message m = (Message) channel2.readOutbound();
assertNull(m);
}
private static HttpResponse simpleRequest(final CorsConfig config,
final String origin,
final String requestHeaders,
final HttpMethod method) {
final EmbeddedChannel channel = new EmbeddedChannel(new CorsHandler(config), new EchoHandler());
final FullHttpRequest httpRequest = createHttpRequest(method);
if (origin != null) {
httpRequest.headers().set(ORIGIN, origin);
}
if (requestHeaders != null) {
httpRequest.headers().set(ACCESS_CONTROL_REQUEST_HEADERS, requestHeaders);
}
assertThat(channel.writeInbound(httpRequest), is(false));
return (HttpResponse) channel.readOutbound();
}
private static void assertCloseWebSocketFrame(EmbeddedChannel channel) {
ByteBuf buf = (ByteBuf) channel.readOutbound();
Assert.assertEquals(2, buf.readableBytes());
Assert.assertEquals((byte) 0xFF, buf.readByte());
Assert.assertEquals((byte) 0x00, buf.readByte());
buf.release();
}
private static ByteBuf readAll(EmbeddedChannel channel) {
ByteBuf buf = Unpooled.buffer();
ByteBuf read;
while ((read = channel.readOutbound()) != null) {
buf.writeBytes(read);
read.release();
}
return buf;
}
@Test
public void mapsUnrecoverableInternalErrorsToInternalServerError500ResponseCode() {
HttpHandler handler = (request, context) -> {
throw new RuntimeException("Forced exception for testing");
};
EmbeddedChannel channel = buildEmbeddedChannel(handlerWithMocks(handler));
channel.writeInbound(httpRequestAsBuf(GET, "http://foo.com/"));
DefaultHttpResponse response = (DefaultHttpResponse) channel.readOutbound();
assertThat(response.status(), is(io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR));
verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), any(LiveHttpRequest.class));
verify(errorListener, only()).proxyErrorOccurred(any(LiveHttpRequest.class), any(InetSocketAddress.class), eq(INTERNAL_SERVER_ERROR), any(RuntimeException.class));
}
@Test
public void testCompressedFrame() {
EmbeddedChannel encoderChannel = new EmbeddedChannel(new PerFrameDeflateEncoder(9, 15, false));
EmbeddedChannel decoderChannel = new EmbeddedChannel(
ZlibCodecFactory.newZlibDecoder(ZlibWrapper.NONE));
// initialize
byte[] payload = new byte[300];
random.nextBytes(payload);
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(true,
WebSocketExtension.RSV3, Unpooled.wrappedBuffer(payload));
// execute
encoderChannel.writeOutbound(frame);
BinaryWebSocketFrame compressedFrame = encoderChannel.readOutbound();
// test
assertNotNull(compressedFrame);
assertNotNull(compressedFrame.content());
assertTrue(compressedFrame instanceof BinaryWebSocketFrame);
assertEquals(WebSocketExtension.RSV1 | WebSocketExtension.RSV3, compressedFrame.rsv());
decoderChannel.writeInbound(compressedFrame.content());
decoderChannel.writeInbound(DeflateDecoder.FRAME_TAIL);
ByteBuf uncompressedPayload = decoderChannel.readInbound();
assertEquals(300, uncompressedPayload.readableBytes());
byte[] finalPayload = new byte[300];
uncompressedPayload.readBytes(finalPayload);
assertTrue(Arrays.equals(finalPayload, payload));
uncompressedPayload.release();
}
@Test
public void mapsWrappedBadRequestExceptionToBadRequest400ResponseCode() {
EmbeddedChannel channel = buildEmbeddedChannel(handlerWithMocks());
String badUri = "/no5_such3_file7.pl?\"><script>alert(73541);</script>56519<script>alert(1)</script>0e134";
channel.writeInbound(httpMessageToBytes(httpRequest(GET, badUri)));
DefaultHttpResponse response = (DefaultHttpResponse) channel.readOutbound();
assertThat(response.getStatus(), is(io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST));
verify(responseEnhancer).enhance(nullable(LiveHttpResponse.Transformer.class), eq(null));
verify(errorListener, only()).proxyErrorOccurred(eq(BAD_REQUEST), nullable(DecoderException.class));
}
private static void assertEncodedResponse(EmbeddedChannel ch) {
Object o = ch.readOutbound();
assertThat(o, is(instanceOf(HttpResponse.class)));
HttpResponse res = (HttpResponse) o;
assertThat(res, is(not(instanceOf(HttpContent.class))));
assertThat(res.headers().get(Names.TRANSFER_ENCODING), is("chunked"));
assertThat(res.headers().get(Names.CONTENT_LENGTH), is(nullValue()));
assertThat(res.headers().get(Names.CONTENT_ENCODING), is("gzip"));
}
@Test
public void testChunkedContentWithTrailingHeader() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new TestEncoder());
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"));
HttpResponse res = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
res.headers().set(Names.TRANSFER_ENCODING, Values.CHUNKED);
ch.writeOutbound(res);
assertEncodedResponse(ch);
ch.writeOutbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[3])));
ch.writeOutbound(new DefaultHttpContent(Unpooled.wrappedBuffer(new byte[2])));
LastHttpContent content = new DefaultLastHttpContent(Unpooled.wrappedBuffer(new byte[1]));
content.trailingHeaders().set("X-Test", "Netty");
ch.writeOutbound(content);
HttpContent chunk;
chunk = (HttpContent) ch.readOutbound();
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("3"));
chunk.release();
chunk = (HttpContent) ch.readOutbound();
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("2"));
chunk.release();
chunk = (HttpContent) ch.readOutbound();
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("1"));
assertThat(chunk, is(instanceOf(HttpContent.class)));
chunk.release();
chunk = (HttpContent) ch.readOutbound();
assertThat(chunk.content().isReadable(), is(false));
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
assertEquals("Netty", ((LastHttpContent) chunk).trailingHeaders().get("X-Test"));
chunk.release();
assertThat(ch.readOutbound(), is(nullValue()));
}
private void testEmptyContents(boolean chunked, boolean trailers) throws Exception {
HttpRequestEncoder encoder = new HttpRequestEncoder();
EmbeddedChannel channel = new EmbeddedChannel(encoder);
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
if (chunked) {
HttpUtil.setTransferEncodingChunked(request, true);
}
assertTrue(channel.writeOutbound(request));
ByteBuf contentBuffer = Unpooled.buffer();
assertTrue(channel.writeOutbound(new DefaultHttpContent(contentBuffer)));
ByteBuf lastContentBuffer = Unpooled.buffer();
LastHttpContent last = new DefaultLastHttpContent(lastContentBuffer);
if (trailers) {
last.trailingHeaders().set("X-Netty-Test", "true");
}
assertTrue(channel.writeOutbound(last));
// Ensure we only produce ByteBuf instances.
ByteBuf head = channel.readOutbound();
assertTrue(head.release());
ByteBuf content = channel.readOutbound();
content.release();
ByteBuf lastContent = channel.readOutbound();
lastContent.release();
assertFalse(channel.finish());
}
@Test
public void contentLengthNoTrailersHeaderWhiteSpaceEncodedWithValidationOff() {
EmbeddedChannel channel = newEmbeddedChannel();
byte[] content = new byte[128];
ThreadLocalRandom.current().nextBytes(content);
Buffer buffer = allocator.wrap(content);
HttpRequestMetaData request = newRequestMetaData(HTTP_1_1, GET, "/some/path?foo=bar&baz=yyy",
new DefaultHttpHeadersFactory(false, false).newHeaders());
request.headers()
.add(" " + CONNECTION + " ", " " + KEEP_ALIVE)
.add(" " + USER_AGENT + " ", " unit-test ")
.add(CONTENT_LENGTH, valueOf(content.length));
channel.writeOutbound(request);
channel.writeOutbound(buffer.duplicate());
channel.writeOutbound(EmptyHttpHeaders.INSTANCE);
ByteBuf byteBuf = channel.readOutbound();
String actualMetaData = byteBuf.toString(US_ASCII);
byteBuf.release();
assertTrue("unexpected metadata: " + actualMetaData, actualMetaData.contains(
"GET /some/path?foo=bar&baz=yyy HTTP/1.1" + "\r\n"));
assertTrue("unexpected metadata: " + actualMetaData, actualMetaData.contains(
" " + CONNECTION + " : " + KEEP_ALIVE + "\r\n"));
assertTrue("unexpected metadata: " + actualMetaData, actualMetaData.contains(
" " + USER_AGENT + " : unit-test " + "\r\n"));
assertTrue("unexpected metadata: " + actualMetaData, actualMetaData.contains(
CONTENT_LENGTH + ": " + valueOf(buffer.readableBytes()) + "\r\n"));
assertTrue("unexpected metadata: " + actualMetaData, actualMetaData.endsWith("\r\n" + "\r\n"));
byteBuf = channel.readOutbound();
assertEquals(buffer.toNioBuffer(), byteBuf.nioBuffer());
byteBuf.release();
consumeEmptyBufferFromTrailers(channel);
assertFalse(channel.finishAndReleaseAll());
}
@Test
public void testCompatibleExtensionTogetherSuccess() {
// initialize
when(mainHandshakerMock.handshakeExtension(webSocketExtensionDataMatcher("main"))).
thenReturn(mainExtensionMock);
when(mainHandshakerMock.handshakeExtension(webSocketExtensionDataMatcher("fallback"))).
thenReturn(null);
when(fallbackHandshakerMock.handshakeExtension(webSocketExtensionDataMatcher("fallback"))).
thenReturn(fallbackExtensionMock);
when(fallbackHandshakerMock.handshakeExtension(webSocketExtensionDataMatcher("main"))).
thenReturn(null);
when(mainExtensionMock.rsv()).thenReturn(WebSocketExtension.RSV1);
when(mainExtensionMock.newReponseData()).thenReturn(
new WebSocketExtensionData("main", Collections.<String, String>emptyMap()));
when(mainExtensionMock.newExtensionEncoder()).thenReturn(new DummyEncoder());
when(mainExtensionMock.newExtensionDecoder()).thenReturn(new DummyDecoder());
when(fallbackExtensionMock.rsv()).thenReturn(WebSocketExtension.RSV2);
when(fallbackExtensionMock.newReponseData()).thenReturn(
new WebSocketExtensionData("fallback", Collections.<String, String>emptyMap()));
when(fallbackExtensionMock.newExtensionEncoder()).thenReturn(new Dummy2Encoder());
when(fallbackExtensionMock.newExtensionDecoder()).thenReturn(new Dummy2Decoder());
// execute
EmbeddedChannel ch = new EmbeddedChannel(new WebSocketServerExtensionHandler(
mainHandshakerMock, fallbackHandshakerMock));
HttpRequest req = newUpgradeRequest("main, fallback");
ch.writeInbound(req);
HttpResponse res = newUpgradeResponse(null);
ch.writeOutbound(res);
HttpResponse res2 = ch.readOutbound();
List<WebSocketExtensionData> resExts = WebSocketExtensionUtil.extractExtensions(
res2.headers().get(HttpHeaderNames.SEC_WEBSOCKET_EXTENSIONS));
// test
assertEquals(2, resExts.size());
assertEquals("main", resExts.get(0).name());
assertEquals("fallback", resExts.get(1).name());
assertNotNull(ch.pipeline().get(DummyDecoder.class));
assertNotNull(ch.pipeline().get(DummyEncoder.class));
assertNotNull(ch.pipeline().get(Dummy2Decoder.class));
assertNotNull(ch.pipeline().get(Dummy2Encoder.class));
verify(mainHandshakerMock).handshakeExtension(webSocketExtensionDataMatcher("main"));
verify(mainHandshakerMock).handshakeExtension(webSocketExtensionDataMatcher("fallback"));
verify(fallbackHandshakerMock).handshakeExtension(webSocketExtensionDataMatcher("fallback"));
verify(mainExtensionMock, times(2)).rsv();
verify(mainExtensionMock).newReponseData();
verify(mainExtensionMock).newExtensionEncoder();
verify(mainExtensionMock).newExtensionDecoder();
verify(fallbackExtensionMock, times(2)).rsv();
verify(fallbackExtensionMock).newReponseData();
verify(fallbackExtensionMock).newExtensionEncoder();
verify(fallbackExtensionMock).newExtensionDecoder();
}
@Test
public void testEncode() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new MessageFragmenter()
);
// arg1
byte[] arg1Bytes = new byte[CallFrame.MAX_ARG1_LENGTH];
new Random().nextBytes(arg1Bytes);
ByteBuf arg1 = Unpooled.wrappedBuffer(arg1Bytes);
// arg2
byte[] arg2Bytes = new byte[BUFFER_SIZE];
new Random().nextBytes(arg2Bytes);
ByteBuf arg2 = Unpooled.wrappedBuffer(arg2Bytes);
// arg 3
byte[] arg3Bytes = new byte[BUFFER_SIZE];
new Random().nextBytes(arg3Bytes);
ByteBuf arg3 = Unpooled.wrappedBuffer(arg3Bytes);
RawRequest rawRequest = new RawRequest.Builder("some-service", arg1)
.setArg2(arg2)
.setArg3(arg3)
.setId(0)
.setTimeout(100)
.build();
channel.writeOutbound(rawRequest);
for (int i = 0; i < 4; i++) {
CallFrame req = (CallFrame) MessageCodec.decode(
MessageCodec.decode(
(ByteBuf) channel.readOutbound()
)
);
req.release();
assertNotNull(req);
}
ByteBuf buf = channel.readOutbound();
assertNull(buf);
rawRequest.release();
}
/**
* Tests the common workflow of the {@link NettyResponseChannel} i.e., add some content to response body via
* {@link NettyResponseChannel#write(ByteBuffer, Callback)} and then complete the response.
* <p/>
* These responses have the header Content-Length set.
* @throws Exception
*/
@Test
public void responsesWithContentLengthTest() throws Exception {
EmbeddedChannel channel = createEmbeddedChannel();
MockNettyMessageProcessor processor = channel.pipeline().get(MockNettyMessageProcessor.class);
final int ITERATIONS = 10;
for (int i = 0; i < ITERATIONS; i++) {
boolean isKeepAlive = i != (ITERATIONS - 1);
HttpHeaders httpHeaders = new DefaultHttpHeaders();
httpHeaders.set(MockNettyMessageProcessor.CHUNK_COUNT_HEADER_NAME, i);
HttpRequest httpRequest =
RestTestUtils.createRequest(HttpMethod.POST, TestingUri.ResponseWithContentLength.toString(), httpHeaders);
HttpUtil.setKeepAlive(httpRequest, isKeepAlive);
channel.writeInbound(httpRequest);
verifyCallbacks(processor);
// first outbound has to be response.
HttpResponse response = channel.readOutbound();
assertEquals("Unexpected response status", HttpResponseStatus.OK, response.status());
long contentLength = HttpUtil.getContentLength(response, -1);
assertEquals("Unexpected Content-Length", MockNettyMessageProcessor.CHUNK.length * i, contentLength);
if (contentLength == 0) {
// special case. Since Content-Length is set, the response should be an instance of FullHttpResponse.
assertTrue("Response not instance of FullHttpResponse", response instanceof FullHttpResponse);
} else {
HttpContent httpContent = null;
for (int j = 0; j < i; j++) {
httpContent = channel.readOutbound();
byte[] returnedContent = new byte[httpContent.content().readableBytes()];
httpContent.content().readBytes(returnedContent);
httpContent.release();
assertArrayEquals("Content does not match with expected content", MockNettyMessageProcessor.CHUNK,
returnedContent);
}
// When we know the content-length, the last httpContent would be an instance of LastHttpContent and there is no
// empty last http content following it.
// the last HttpContent should also be an instance of LastHttpContent
assertTrue("The last part of the content is not LastHttpContent", httpContent instanceof LastHttpContent);
}
assertEquals("Unexpected channel state on the server", isKeepAlive, channel.isActive());
}
}
public void sendMessage (EmbeddedChannel from, EmbeddedChannel to) {
Message m = (Message) from.readOutbound();
to.writeInbound(m);
}