下面列出了io.netty.channel.embedded.EmbeddedChannel#finishAndReleaseAll ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void handleRequest(@Nullable ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
EmbeddedChannel http = new EmbeddedChannel(new HttpRequestDecoder());
try {
http.writeInbound(Unpooled.unreleasableBuffer(packet.content()));
http.finish();
while (true) {
Object result = http.readInbound();
if (result == null) {
break;
}
if (result instanceof HttpRequest) {
HttpRequest req = (HttpRequest)result;
switch (req.getMethod().name()) {
case "NOTIFY": handleUpnpNotify(packet, req); break;
case "M-SEARCH": handleUpnpMsearch(packet, req); break;
default: log.debug("unknown upnp message: {}", req.getMethod()); break;
}
}
}
} finally {
http.finishAndReleaseAll();
}
}
private void handleResponse(@Nullable ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
EmbeddedChannel http = new EmbeddedChannel(new HttpResponseDecoder());
try {
http.writeInbound(Unpooled.unreleasableBuffer(packet.content()));
http.finish();
while (true) {
Object result = http.readInbound();
if (result == null) {
break;
}
if (result instanceof HttpResponse) {
HttpResponse res = (HttpResponse)result;
switch (res.getStatus().code()) {
case 200: handleUpnpMsearchResponse(packet, res); break;
default: log.debug("unknown upnp response: {}", res.getStatus().code()); break;
}
}
}
} finally {
http.finishAndReleaseAll();
}
}
private static void testHandshakeTimeout(boolean client) throws Exception {
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setUseClientMode(client);
SslHandler handler = new SslHandler(engine);
handler.setHandshakeTimeoutMillis(1);
EmbeddedChannel ch = new EmbeddedChannel(handler);
try {
while (!handler.handshakeFuture().isDone()) {
Thread.sleep(10);
// We need to run all pending tasks as the handshake timeout is scheduled on the EventLoop.
ch.runPendingTasks();
}
handler.handshakeFuture().syncUninterruptibly();
} finally {
ch.finishAndReleaseAll();
}
}
@Test
public void testNullValuesAddToLength() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel();
Messages.sendDataRow(
channel,
new RowN($(10, null)),
Arrays.asList(PGTypes.get(DataTypes.INTEGER), PGTypes.get(DataTypes.STRING)), null
);
channel.flush();
ByteBuf buffer = channel.readOutbound();
try {
// message type
assertThat((char) buffer.readByte(), is('D'));
// size of the message
assertThat(buffer.readInt(), is(16));
assertThat(buffer.readableBytes(), is(12)); // 16 - INT4 because the size was already read
} finally {
buffer.release();
channel.finishAndReleaseAll();
}
}
@Test
public void testCommandCompleteWithWhitespace() throws Exception {
final EmbeddedChannel channel = new EmbeddedChannel();
try {
final String response = "SELECT 42";
Messages.sendCommandComplete(channel, "Select 1", 42);
verifyResponse(channel, response);
Messages.sendCommandComplete(channel, " Select 1", 42);
verifyResponse(channel, response);
Messages.sendCommandComplete(channel, " Select 1 ", 42);
verifyResponse(channel, response);
Messages.sendCommandComplete(channel, "\n Select 1", 42);
verifyResponse(channel, response);
} finally {
channel.finishAndReleaseAll();
}
}
@Test
public void testEmptyReleasedBufferShouldNotWriteEmptyBufferToChannel() throws Exception {
HttpRequestEncoder encoder = new HttpRequestEncoder();
EmbeddedChannel channel = new EmbeddedChannel(encoder);
ByteBuf buf = Unpooled.buffer();
buf.release();
try {
channel.writeAndFlush(buf).get();
fail();
} catch (ExecutionException e) {
assertThat(e.getCause().getCause(), is(instanceOf(IllegalReferenceCountException.class)));
}
channel.finishAndReleaseAll();
}
@Test
public void testEmptyBufferShouldPassThrough() throws Exception {
HttpRequestEncoder encoder = new HttpRequestEncoder();
EmbeddedChannel channel = new EmbeddedChannel(encoder);
ByteBuf buffer = Unpooled.buffer();
channel.writeAndFlush(buffer).get();
channel.finishAndReleaseAll();
assertEquals(0, buffer.refCnt());
}
@Test (expected = EncoderException.class)
public void encodeNonFullHttpResponse100ContinueIsRejected() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new Http2StreamFrameToHttpObjectCodec(true));
try {
ch.writeOutbound(new DefaultHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
} finally {
ch.finishAndReleaseAll();
}
}
private static void anyNotIdle(TestableIdleStateHandler idleStateHandler,
Action action, Object expected) throws Exception {
final List<Object> events = new ArrayList<Object>();
ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
events.add(evt);
}
};
EmbeddedChannel channel = new EmbeddedChannel(idleStateHandler, handler);
try {
idleStateHandler.tick(1L, TimeUnit.NANOSECONDS);
action.run(channel);
// Advance the ticker by some fraction and run() the task.
// There shouldn't be an IdleStateEvent getting fired because
// we've just performed an action on the channel that is meant
// to reset the idle task.
long delayInNanos = idleStateHandler.delay(TimeUnit.NANOSECONDS);
assertNotEquals(0L, delayInNanos);
idleStateHandler.tickRun(delayInNanos / 2L, TimeUnit.NANOSECONDS);
assertEquals(0, events.size());
// Advance the ticker by the full amount and it should yield
// in an IdleStateEvent.
idleStateHandler.tickRun();
assertEquals(1, events.size());
assertSame(expected, events.get(0));
} finally {
channel.finishAndReleaseAll();
}
}
@Test
public void testNonSslRecord() throws Exception {
SslContext nettyContext = makeSslContext(provider, false);
try {
final AtomicReference<SslHandshakeCompletionEvent> evtRef =
new AtomicReference<SslHandshakeCompletionEvent>();
SniHandler handler = new SniHandler(new DomainNameMappingBuilder<SslContext>(nettyContext).build());
EmbeddedChannel ch = new EmbeddedChannel(handler, new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof SslHandshakeCompletionEvent) {
assertTrue(evtRef.compareAndSet(null, (SslHandshakeCompletionEvent) evt));
}
}
});
try {
byte[] bytes = new byte[1024];
bytes[0] = SslUtils.SSL_CONTENT_TYPE_ALERT;
try {
ch.writeInbound(Unpooled.wrappedBuffer(bytes));
fail();
} catch (DecoderException e) {
assertTrue(e.getCause() instanceof NotSslRecordException);
}
assertFalse(ch.finish());
} finally {
ch.finishAndReleaseAll();
}
assertTrue(evtRef.get().cause() instanceof NotSslRecordException);
} finally {
releaseAll(nettyContext);
}
}
@Test
public void testTruncatedPacket() throws Exception {
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setUseClientMode(false);
EmbeddedChannel ch = new EmbeddedChannel(new SslHandler(engine));
// Push the first part of a 5-byte handshake message.
ch.writeInbound(wrappedBuffer(new byte[]{22, 3, 1, 0, 5}));
// Should decode nothing yet.
assertThat(ch.readInbound(), is(nullValue()));
try {
// Push the second part of the 5-byte handshake message.
ch.writeInbound(wrappedBuffer(new byte[]{2, 0, 0, 1, 0}));
fail();
} catch (DecoderException e) {
// Be sure we cleanup the channel and release any pending messages that may have been generated because
// of an alert.
// See https://github.com/netty/netty/issues/6057.
ch.finishAndReleaseAll();
// The pushed message is invalid, so it should raise an exception if it decoded the message correctly.
assertThat(e.getCause(), is(instanceOf(SSLProtocolException.class)));
}
}
@Test(expected = UnsupportedMessageTypeException.class)
public void testNonByteBufNotPassThrough() throws Exception {
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setUseClientMode(false);
EmbeddedChannel ch = new EmbeddedChannel(new SslHandler(engine));
try {
ch.writeOutbound(new Object());
} finally {
ch.finishAndReleaseAll();
}
}
@Test(expected = DecoderException.class)
public void testNonAsciiServerNameParsing() throws Exception {
SslContext nettyContext = makeSslContext(provider, false);
SslContext leanContext = makeSslContext(provider, false);
SslContext leanContext2 = makeSslContext(provider, false);
try {
DomainNameMapping<SslContext> mapping = new DomainNameMappingBuilder<SslContext>(nettyContext)
.add("*.netty.io", nettyContext)
// input with custom cases
.add("*.LEANCLOUD.CN", leanContext)
// a hostname conflict with previous one, since we are using order-sensitive config,
// the engine won't be used with the handler.
.add("chat4.leancloud.cn", leanContext2)
.build();
SniHandler handler = new SniHandler(mapping);
EmbeddedChannel ch = new EmbeddedChannel(handler);
try {
// hex dump of a client hello packet, which contains an invalid hostname "CHAT4。LEANCLOUD。CN"
String tlsHandshakeMessageHex1 = "16030100";
// part 2
String tlsHandshakeMessageHex = "bd010000b90303a74225676d1814ba57faff3b366" +
"3656ed05ee9dbb2a4dbb1bb1c32d2ea5fc39e0000000100008c0000001700150000164348" +
"415434E380824C45414E434C4F5544E38082434E000b000403000102000a00340032000e0" +
"00d0019000b000c00180009000a0016001700080006000700140015000400050012001300" +
"0100020003000f0010001100230000000d0020001e0601060206030501050205030401040" +
"20403030103020303020102020203000f00010133740000";
// Push the handshake message.
// Decode should fail because of the badly encoded "HostName" string in the SNI extension
// that isn't ASCII as per RFC 6066 - https://tools.ietf.org/html/rfc6066#page-6
ch.writeInbound(Unpooled.wrappedBuffer(StringUtil.decodeHexDump(tlsHandshakeMessageHex1)));
ch.writeInbound(Unpooled.wrappedBuffer(StringUtil.decodeHexDump(tlsHandshakeMessageHex)));
} finally {
ch.finishAndReleaseAll();
}
} finally {
releaseAll(leanContext, leanContext2, nettyContext);
}
}