下面列出了 io.netty.channel.local.LocalAddress 类的 API 使用示例代码及写法，您也可以点击链接到 GitHub 查看源代码。
/**
 * Binds a local server on {@code groupB}, then connects a local client on {@code groupA} that is
 * configured with a {@code TestAddressResolverGroup(true)} resolver, and waits for the connect
 * to complete. The test passes when {@code sync()} does not throw.
 */
@Test
public void testAsyncResolutionSuccess() throws Exception {
    // Client side: local channel with the test resolver group installed.
    final Bootstrap client = new Bootstrap()
            .group(groupA)
            .channel(LocalChannel.class)
            .resolver(new TestAddressResolverGroup(true))
            .handler(dummyHandler);

    // Server side: local server channel bound to an automatically assigned address.
    final ServerBootstrap server = new ServerBootstrap()
            .group(groupB)
            .channel(LocalServerChannel.class)
            .childHandler(dummyHandler);
    SocketAddress serverAddress = server.bind(LocalAddress.ANY).sync().channel().localAddress();

    // Connect to the server using the asynchronous resolver.
    client.connect(serverAddress).sync();
}
/**
 * Connects a local client configured with a {@code TestAddressResolverGroup(false)} resolver to
 * a freshly bound local server and checks that the connect future completes within ten seconds,
 * carries an {@link UnknownHostException}, and leaves the channel closed.
 */
@Test
public void testAsyncResolutionFailure() throws Exception {
    // Client side: local channel with the test resolver group installed.
    final Bootstrap client = new Bootstrap()
            .group(groupA)
            .channel(LocalChannel.class)
            .resolver(new TestAddressResolverGroup(false))
            .handler(dummyHandler);

    // Server side: local server channel bound to an automatically assigned address.
    final ServerBootstrap server = new ServerBootstrap()
            .group(groupB)
            .channel(LocalServerChannel.class)
            .childHandler(dummyHandler);
    SocketAddress serverAddress = server.bind(LocalAddress.ANY).sync().channel().localAddress();

    // Connect to the server using the asynchronous resolver.
    ChannelFuture connectAttempt = client.connect(serverAddress);

    // Should fail with the UnknownHostException.
    boolean completedInTime = connectAttempt.await(10000);
    assertThat(completedInTime, is(true));
    assertThat(connectAttempt.cause(), is(instanceOf(UnknownHostException.class)));
    assertThat(connectAttempt.channel().isOpen(), is(false));
}
/**
 * Uses a channel factory whose {@code newChannel()} always throws and checks that the failure
 * is reported through the connect future: the future completes within ten seconds, its cause is
 * the very exception thrown by the factory, and it still exposes a non-null channel.
 */
@Test
public void testChannelFactoryFailureNotifiesPromise() throws Exception {
    final RuntimeException crash = new RuntimeException("newChannel crash");
    final ChannelFactory<Channel> crashingFactory = new ChannelFactory<Channel>() {
        @Override
        public Channel newChannel() {
            throw crash;
        }
    };

    final Bootstrap bootstrap = new Bootstrap();
    bootstrap.handler(dummyHandler);
    bootstrap.group(groupA);
    bootstrap.channelFactory(crashingFactory);

    ChannelFuture connectAttempt = bootstrap.connect(LocalAddress.ANY);

    // Should fail with the RuntimeException.
    boolean completedInTime = connectAttempt.await(10000);
    assertThat(completedInTime, is(true));
    assertThat(connectAttempt.cause(), sameInstance((Throwable) crash));
    assertThat(connectAttempt.channel(), is(not(nullValue())));
}
/**
 * Builds a cluster whose peer info sets {@code protocol_versions} to {@code [5]} and checks that
 * the frame codecs of the node, the data center and the cluster all report only version 5.
 */
@Test
public void shouldInheritClusterOverride() {
  ClusterSpec clusterSpec =
      ClusterSpec.builder().withPeerInfo("protocol_versions", Lists.newArrayList(5)).build();
  BoundCluster boundCluster = new BoundCluster(clusterSpec, 0L, null);
  BoundDataCenter boundDc = new BoundDataCenter(boundCluster);

  LocalAddress nodeAddress = new LocalAddress(UUID.randomUUID().toString());
  NodeSpec nodeSpec = NodeSpec.builder().withName("node0").withId(0L).build();
  BoundNode boundNode =
      new BoundNode(
          nodeAddress,
          nodeSpec,
          Collections.emptyMap(),
          boundCluster,
          boundDc,
          null,
          timer,
          null, // channel reference only needed for closing, not useful in context of this test.
          false);

  assertThat(boundNode.getFrameCodec().getSupportedProtocolVersions()).containsOnly(5);
  assertThat(boundDc.getFrameCodec().getSupportedProtocolVersions()).containsOnly(5);
  assertThat(boundCluster.getFrameCodec().getSupportedProtocolVersions()).containsOnly(5);
}
/**
 * Builds a cluster configured with Cassandra version {@code 2.1.17} and checks that the frame
 * codecs of the node, the data center and the cluster all report only protocol version 3.
 */
@Test
public void shouldInheritClusterOverrideFromCassandraVersion() {
  ClusterSpec clusterSpec = ClusterSpec.builder().withCassandraVersion("2.1.17").build();
  BoundCluster boundCluster = new BoundCluster(clusterSpec, 0L, null);
  BoundDataCenter boundDc = new BoundDataCenter(boundCluster);

  LocalAddress nodeAddress = new LocalAddress(UUID.randomUUID().toString());
  NodeSpec nodeSpec = NodeSpec.builder().withName("node0").withId(0L).build();
  BoundNode boundNode =
      new BoundNode(
          nodeAddress,
          nodeSpec,
          Collections.emptyMap(),
          boundCluster,
          boundDc,
          null,
          timer,
          null, // channel reference only needed for closing, not useful in context of this test.
          false);

  assertThat(boundNode.getFrameCodec().getSupportedProtocolVersions()).containsOnly(3);
  assertThat(boundDc.getFrameCodec().getSupportedProtocolVersions()).containsOnly(3);
  assertThat(boundCluster.getFrameCodec().getSupportedProtocolVersions()).containsOnly(3);
}
/**
 * Gives a node both a Cassandra version ({@code 2.1.17}) and an explicit
 * {@code protocol_versions} peer-info entry of {@code [4]}, then checks that the node's frame
 * codec reports only version 4.
 */
@Test
public void testShouldUseProtocolVersionOverride() {
  BoundCluster boundCluster = new BoundCluster(ClusterSpec.builder().build(), 0L, null);
  BoundDataCenter boundDc = new BoundDataCenter(boundCluster);

  LocalAddress nodeAddress = new LocalAddress(UUID.randomUUID().toString());
  NodeSpec nodeSpec =
      NodeSpec.builder()
          .withName("node0")
          .withId(0L)
          .withCassandraVersion("2.1.17")
          .withPeerInfo("protocol_versions", Lists.newArrayList(4))
          .build();
  BoundNode boundNode =
      new BoundNode(
          nodeAddress,
          nodeSpec,
          Collections.emptyMap(),
          boundCluster,
          boundDc,
          null,
          timer,
          null, // channel reference only needed for closing, not useful in context of this test.
          false);

  assertThat(boundNode.getFrameCodec().getSupportedProtocolVersions()).containsOnly(4);
}
/**
 * Shared check: builds a node with the given Cassandra version under a default cluster and
 * asserts that its frame codec supports exactly the expected protocol versions.
 *
 * @param cassandraVersion Cassandra version string assigned to the node spec
 * @param expectedProtocolVersions the only protocol versions the node's codec may report
 */
public void testProtocolVersionForCassandraVersion(
    String cassandraVersion, Integer... expectedProtocolVersions) {
  BoundCluster boundCluster = new BoundCluster(ClusterSpec.builder().build(), 0L, null);
  BoundDataCenter boundDc = new BoundDataCenter(boundCluster);

  LocalAddress nodeAddress = new LocalAddress(UUID.randomUUID().toString());
  NodeSpec nodeSpec =
      NodeSpec.builder().withName("node0").withId(0L).withCassandraVersion(cassandraVersion).build();
  BoundNode boundNode =
      new BoundNode(
          nodeAddress,
          nodeSpec,
          Collections.emptyMap(),
          boundCluster,
          boundDc,
          null,
          timer,
          null, // channel reference only needed for closing, not useful in context of this test.
          false);

  assertThat(boundNode.getFrameCodec().getSupportedProtocolVersions())
      .containsOnly(expectedProtocolVersions);
}
public TestServer(
EventLoopGroup eventLoopGroup,
LocalAddress localAddress,
ImmutableList<? extends ChannelHandler> handlers) {
// Creates ChannelInitializer with handlers specified
ChannelInitializer<LocalChannel> serverInitializer =
new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) {
for (ChannelHandler handler : handlers) {
ch.pipeline().addLast(handler);
}
}
};
// Sets up serverBootstrap with specified initializer, eventLoopGroup, and using
// LocalServerChannel class
ServerBootstrap serverBootstrap =
new ServerBootstrap()
.group(eventLoopGroup)
.channel(LocalServerChannel.class)
.childHandler(serverInitializer);
ChannelFuture unusedFuture = serverBootstrap.bind(localAddress).syncUninterruptibly();
}
/**
 * Serves a self-signed certificate to a client whose {@code SslClientInitializer} is built with
 * an empty list and two {@code null} trailing arguments, and checks that the client fails with a
 * {@link CertPathBuilderException}, the server with an {@link SSLException}, and that the client
 * channel ends up inactive.
 */
@Test
public void testFailure_defaultTrustManager_rejectSelfSignedCert() throws Exception {
  SelfSignedCaCertificate serverIdentity = SelfSignedCaCertificate.create(SSL_HOST);
  LocalAddress address =
      new LocalAddress("DEFAULT_TRUST_MANAGER_REJECT_SELF_SIGNED_CERT_" + sslProvider);

  nettyRule.setUpServer(
      address, getServerHandler(false, serverIdentity.key(), serverIdentity.cert()));

  SslClientInitializer<LocalChannel> clientInitializer =
      new SslClientInitializer<>(
          sslProvider, hostProvider, portProvider, ImmutableList.of(), null, null);
  nettyRule.setUpClient(address, clientInitializer);

  // The connection is now terminated, both the client side and the server side should get
  // exceptions.
  nettyRule.assertThatClientRootCause().isInstanceOf(CertPathBuilderException.class);
  nettyRule.assertThatServerRootCause().isInstanceOf(SSLException.class);
  assertThat(nettyRule.getClientChannel().isActive()).isFalse();
}
/**
 * Sets up a server channel bound to the given local address.
 *
 * <p>May only be called once. Each accepted channel gets the supplied handlers followed by a
 * newly created {@code EchoHandler}, and is remembered as {@code serverChannel}.
 *
 * @param localAddress local address the server binds to
 * @param handlers handlers installed ahead of the echo handler
 */
public void setUpServer(LocalAddress localAddress, ChannelHandler... handlers) {
  checkState(echoHandler == null, "Can't call setUpServer twice");
  echoHandler = new EchoHandler();

  ServerBootstrap serverBootstrap =
      new ServerBootstrap()
          .group(eventLoopGroup)
          .channel(LocalServerChannel.class)
          .childHandler(
              new ChannelInitializer<LocalChannel>() {
                @Override
                protected void initChannel(LocalChannel ch) {
                  // Caller-supplied handlers go first.
                  ch.pipeline().addLast(handlers);
                  // The echo handler goes last so it logs the incoming message and sends it back.
                  ch.pipeline().addLast(echoHandler);
                  serverChannel = ch;
                }
              });

  ChannelFuture unusedFuture = serverBootstrap.bind(localAddress).syncUninterruptibly();
}
/**
 * Sets up a client channel connecting to the given local address.
 *
 * <p>Requires {@code setUpServer} to have run first and may only be called once. The channel's
 * pipeline holds the supplied handler followed by a newly created {@code DumpHandler}; the
 * connected channel is stored in {@code clientChannel}.
 *
 * @param localAddress local address of the server to connect to
 * @param handler handler installed ahead of the dump handler
 */
void setUpClient(LocalAddress localAddress, ChannelHandler handler) {
  checkState(echoHandler != null, "Must call setUpServer before setUpClient");
  checkState(dumpHandler == null, "Can't call setUpClient twice");
  dumpHandler = new DumpHandler();

  Bootstrap clientBootstrap =
      new Bootstrap()
          .group(eventLoopGroup)
          .channel(LocalChannel.class)
          .handler(
              new ChannelInitializer<LocalChannel>() {
                @Override
                protected void initChannel(LocalChannel ch) throws Exception {
                  // Caller-supplied handler goes first.
                  ch.pipeline().addLast(handler);
                  // The dump handler goes last so it logs the incoming message.
                  ch.pipeline().addLast(dumpHandler);
                }
              });

  clientChannel = clientBootstrap.connect(localAddress).syncUninterruptibly().channel();
}
/**
 * Connects a client presenting its own self-signed certificate to a server using another
 * self-signed certificate, checks that messages flow, and verifies which certificates the
 * client-side SSL session sees locally and from its peer.
 */
@Test
public void testSuccess_trustAnyClientCert() throws Exception {
  SelfSignedCaCertificate serverIdentity = SelfSignedCaCertificate.create(SSL_HOST);
  LocalAddress address = new LocalAddress("TRUST_ANY_CLIENT_CERT_" + sslProvider);
  nettyRule.setUpServer(
      address, getServerHandler(true, false, serverIdentity.key(), serverIdentity.cert()));

  SelfSignedCaCertificate clientIdentity = SelfSignedCaCertificate.create();
  nettyRule.setUpClient(
      address,
      getClientHandler(serverIdentity.cert(), clientIdentity.key(), clientIdentity.cert()));

  SSLSession clientSession =
      setUpSslChannel(nettyRule.getClientChannel(), serverIdentity.cert());
  nettyRule.assertThatMessagesWork();

  // Verify that the SSL session gets the client cert. Note that this SslSession is for the client
  // channel, therefore its local certificates are the remote certificates of the SslSession for
  // the server channel, and vice versa.
  assertThat(clientSession.getLocalCertificates()).asList().containsExactly(clientIdentity.cert());
  assertThat(clientSession.getPeerCertificates()).asList().containsExactly(serverIdentity.cert());
}
/**
 * Connects a client whose self-signed certificate is created with dates of two days ago and one
 * day ago, then delegates to {@code verifySslExcpetion} expecting a
 * {@link CertificateExpiredException} from the server channel's client-certificate promise.
 */
@Test
public void testFailure_clientCertExpired() throws Exception {
  SelfSignedCaCertificate serverIdentity = SelfSignedCaCertificate.create(SSL_HOST);
  LocalAddress address = new LocalAddress("CLIENT_CERT_EXPIRED_" + sslProvider);
  nettyRule.setUpServer(
      address, getServerHandler(true, true, serverIdentity.key(), serverIdentity.cert()));

  // Both dates lie in the past.
  Date twoDaysAgo = Date.from(Instant.now().minus(Duration.ofDays(2)));
  Date oneDayAgo = Date.from(Instant.now().minus(Duration.ofDays(1)));
  SelfSignedCaCertificate clientIdentity =
      SelfSignedCaCertificate.create("CLIENT", twoDaysAgo, oneDayAgo);
  nettyRule.setUpClient(
      address,
      getClientHandler(serverIdentity.cert(), clientIdentity.key(), clientIdentity.cert()));

  verifySslExcpetion(
      nettyRule.getServerChannel(),
      channel -> channel.attr(CLIENT_CERTIFICATE_PROMISE_KEY).get().get(),
      CertificateExpiredException.class);
}
/**
 * Connects a client whose self-signed certificate is created with dates of one day and two days
 * in the future, then delegates to {@code verifySslExcpetion} expecting a
 * {@link CertificateNotYetValidException} from the server channel's client-certificate promise.
 *
 * <p>The local address name is specific to this test so the server binding cannot collide with
 * the binding made by the expired-certificate test.
 */
@Test
public void testFailure_clientCertNotYetValid() throws Exception {
  SelfSignedCaCertificate serverSsc = SelfSignedCaCertificate.create(SSL_HOST);
  // Previously reused the "CLIENT_CERT_EXPIRED_" name of the sibling test (copy-paste slip).
  LocalAddress localAddress = new LocalAddress("CLIENT_CERT_NOT_YET_VALID_" + sslProvider);
  nettyRule.setUpServer(
      localAddress, getServerHandler(true, true, serverSsc.key(), serverSsc.cert()));
  // Both dates lie in the future.
  SelfSignedCaCertificate clientSsc =
      SelfSignedCaCertificate.create(
          "CLIENT",
          Date.from(Instant.now().plus(Duration.ofDays(1))),
          Date.from(Instant.now().plus(Duration.ofDays(2))));
  nettyRule.setUpClient(
      localAddress, getClientHandler(serverSsc.cert(), clientSsc.key(), clientSsc.cert()));
  verifySslExcpetion(
      nettyRule.getServerChannel(),
      channel -> channel.attr(CLIENT_CERTIFICATE_PROMISE_KEY).get().get(),
      CertificateNotYetValidException.class);
}
/**
 * Connects a client that supplies no certificate or key, checks that messages flow, and verifies
 * that the client-side SSL session has no local certificates while its peer certificate is the
 * server's.
 */
@Test
public void testSuccess_doesNotRequireClientCert() throws Exception {
  SelfSignedCaCertificate serverIdentity = SelfSignedCaCertificate.create(SSL_HOST);
  LocalAddress address = new LocalAddress("DOES_NOT_REQUIRE_CLIENT_CERT_" + sslProvider);

  nettyRule.setUpServer(
      address, getServerHandler(false, false, serverIdentity.key(), serverIdentity.cert()));
  nettyRule.setUpClient(address, getClientHandler(serverIdentity.cert(), null, null));

  SSLSession clientSession =
      setUpSslChannel(nettyRule.getClientChannel(), serverIdentity.cert());
  nettyRule.assertThatMessagesWork();

  // Verify that the SSL session does not contain any client cert. Note that this SslSession is
  // for the client channel, therefore its local certificates are the remote certificates of the
  // SslSession for the server channel, and vice versa.
  assertThat(clientSession.getLocalCertificates()).isNull();
  assertThat(clientSession.getPeerCertificates()).asList().containsExactly(serverIdentity.cert());
}
/**
 * Connects a client that supplies no certificate or key and checks that the server fails with
 * an {@link SSLHandshakeException}, the client with an {@link SSLException}, and that the client
 * channel ends up inactive.
 */
@Test
public void testFailure_requireClientCertificate() throws Exception {
  SelfSignedCaCertificate serverIdentity = SelfSignedCaCertificate.create(SSL_HOST);
  LocalAddress address = new LocalAddress("REQUIRE_CLIENT_CERT_" + sslProvider);

  nettyRule.setUpServer(
      address, getServerHandler(true, false, serverIdentity.key(), serverIdentity.cert()));
  // No client cert/private key used: both trailing arguments are null.
  nettyRule.setUpClient(address, getClientHandler(serverIdentity.cert(), null, null));

  // When the server rejects the client during handshake due to lack of client certificate, both
  // should throw exceptions.
  nettyRule.assertThatServerRootCause().isInstanceOf(SSLHandshakeException.class);
  nettyRule.assertThatClientRootCause().isInstanceOf(SSLException.class);
  assertThat(nettyRule.getClientChannel().isActive()).isFalse();
}
/**
 * Serves a certificate issued for {@code wrong.com} and checks that the client fails with a
 * {@link CertificateException} whose message mentions {@code SSL_HOST}, the server fails with an
 * {@link SSLException}, and the client channel ends up inactive.
 */
@Test
public void testFailure_wrongHostnameInCertificate() throws Exception {
  SelfSignedCaCertificate serverIdentity = SelfSignedCaCertificate.create("wrong.com");
  LocalAddress address = new LocalAddress("WRONG_HOSTNAME_" + sslProvider);
  nettyRule.setUpServer(
      address, getServerHandler(true, false, serverIdentity.key(), serverIdentity.cert()));

  SelfSignedCaCertificate clientIdentity = SelfSignedCaCertificate.create();
  nettyRule.setUpClient(
      address,
      getClientHandler(serverIdentity.cert(), clientIdentity.key(), clientIdentity.cert()));

  // When the client rejects the server cert due to wrong hostname, both the server and the client
  // throw exceptions.
  nettyRule.assertThatClientRootCause().isInstanceOf(CertificateException.class);
  nettyRule.assertThatClientRootCause().hasMessageThat().contains(SSL_HOST);
  nettyRule.assertThatServerRootCause().isInstanceOf(SSLException.class);
  assertThat(nettyRule.getClientChannel().isActive()).isFalse();
}
/**
 * Registers a local channel whose pipeline holds a {@code WriteBufferingAndExceptionHandler},
 * issues a write before connecting to the address {@code "bogus"}, and checks that the write
 * future fails with a {@link ConnectException} mentioning "connection refused".
 */
@Test
public void connectionFailuresPropagated() throws Exception {
  WriteBufferingAndExceptionHandler handler =
      new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
  Bootstrap bootstrap =
      new Bootstrap().channel(LocalChannel.class).handler(handler).group(group);

  ChannelFuture registration = bootstrap.register();
  chan = registration.channel();
  registration.sync();

  // Write before connect. In the event connect fails, the pipeline is torn down and the handler
  // won't be able to fail the writes with the correct exception.
  ChannelFuture pendingWrite = chan.writeAndFlush(new Object());
  chan.connect(new LocalAddress("bogus"));

  try {
    pendingWrite.sync();
    fail();
  } catch (Exception failure) {
    assertThat(failure).isInstanceOf(ConnectException.class);
    assertThat(failure).hasMessageThat().contains("connection refused");
  }
}
/**
 * Returns a Netty server builder bound to the local address {@code "in-process-1"}, using a
 * local server channel, a 65 KiB flow-control window and the interop maximum message size.
 */
@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
  LocalAddress serverAddress = new LocalAddress("in-process-1");
  return NettyServerBuilder.forAddress(serverAddress)
      .flowControlWindow(65 * 1024)
      .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
      .channelType(LocalServerChannel.class);
}
/**
 * Builds a plaintext channel to the local address {@code "in-process-1"} over a local channel
 * type, with a 65 KiB flow-control window and the interop maximum message size. The client
 * census stats module is installed through {@code TestingAccessor} before the channel is built.
 */
@Override
protected ManagedChannel createChannel() {
  LocalAddress serverAddress = new LocalAddress("in-process-1");
  NettyChannelBuilder channelBuilder =
      NettyChannelBuilder.forAddress(serverAddress)
          .negotiationType(NegotiationType.PLAINTEXT)
          .channelType(LocalChannel.class)
          .flowControlWindow(65 * 1024)
          .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);

  io.grpc.internal.TestingAccessor.setStatsImplementation(
      channelBuilder, createClientCensusStatsModule());

  return channelBuilder.build();
}
/**
 * Installs a {@code ChannelInitializer} that adds a counting handler and then fires
 * {@code channelRegistered} itself from inside {@code initChannel}. After binding, the test
 * expects {@code initChannel} to have run once and the counting handler to have seen
 * {@code channelRegistered} twice.
 */
@Test
public void testChannelInitializerReentrance() {
    final AtomicInteger registeredCount = new AtomicInteger(0);
    final AtomicInteger initCount = new AtomicInteger(0);

    final ChannelInboundHandlerAdapter countingHandler = new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            registeredCount.incrementAndGet();
        }
    };
    final ChannelInitializer<Channel> reentrantInitializer = new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            initCount.incrementAndGet();
            ch.pipeline().addLast(countingHandler);
            // Fire the event again from within the initializer itself.
            ch.pipeline().fireChannelRegistered();
        }
    };

    client.handler(reentrantInitializer).localAddress(LocalAddress.ANY);
    Channel bound = client.bind().syncUninterruptibly().channel();
    try {
        // Execute some task on the EventLoop and wait until its done to be sure all handlers are added to the
        // pipeline.
        Runnable noop = new Runnable() {
            @Override
            public void run() {
                // NOOP
            }
        };
        bound.eventLoop().submit(noop).syncUninterruptibly();

        assertEquals(1, initCount.get());
        assertEquals(2, registeredCount.get());
    } finally {
        bound.close().syncUninterruptibly();
    }
}
/**
 * Tests that a channel which has become unhealthy is not offered back to the pool.
 *
 * <p>A released open channel is expected to be handed out again; once that channel has been
 * closed and released, the next acquire is expected to yield a different channel.
 *
 * @throws Exception
 */
@Test
public void testUnhealthyChannelIsNotOffered() throws Exception {
    EventLoopGroup group = new LocalEventLoopGroup();
    LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);

    Bootstrap clientBootstrap = new Bootstrap()
            .remoteAddress(addr)
            .group(group)
            .channel(LocalChannel.class);

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(group);
    serverBootstrap.channel(LocalServerChannel.class);
    serverBootstrap.childHandler(new ChannelInitializer<LocalChannel>() {
        @Override
        public void initChannel(LocalChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
        }
    });

    // Start server
    Channel serverChannel = serverBootstrap.bind(addr).syncUninterruptibly().channel();

    ChannelPoolHandler poolHandler = new CountingChannelPoolHandler();
    ChannelPool pool = new SimpleChannelPool(clientBootstrap, poolHandler);

    Channel first = pool.acquire().syncUninterruptibly().getNow();
    pool.release(first).syncUninterruptibly();
    Channel reacquired = pool.acquire().syncUninterruptibly().getNow();
    // A healthy channel that was released must be the one handed out again.
    assertSame(first, reacquired);

    first.close().syncUninterruptibly();
    pool.release(first).syncUninterruptibly();
    Channel replacement = pool.acquire().syncUninterruptibly().getNow();
    // The closed channel is no longer healthy, so it must not be acquired again.
    assertNotSame(first, replacement);

    serverChannel.close().syncUninterruptibly();
    replacement.close().syncUninterruptibly();
    group.shutdownGracefully();
}
/**
 * Tests releasing an unhealthy channel to a pool created with its final constructor argument
 * set to {@code false} (no health validation on release, per the test name).
 *
 * <p>The release of the closed channel is expected to succeed, and the following acquire is
 * expected to return a different channel.
 *
 * @throws Exception
 */
@Test
public void testUnhealthyChannelIsOfferedWhenNoHealthCheckRequested() throws Exception {
    EventLoopGroup group = new LocalEventLoopGroup();
    LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);

    Bootstrap clientBootstrap = new Bootstrap()
            .remoteAddress(addr)
            .group(group)
            .channel(LocalChannel.class);

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(group);
    serverBootstrap.channel(LocalServerChannel.class);
    serverBootstrap.childHandler(new ChannelInitializer<LocalChannel>() {
        @Override
        public void initChannel(LocalChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
        }
    });

    // Start server
    Channel serverChannel = serverBootstrap.bind(addr).syncUninterruptibly().channel();

    ChannelPoolHandler poolHandler = new CountingChannelPoolHandler();
    ChannelPool pool =
            new SimpleChannelPool(clientBootstrap, poolHandler, ChannelHealthChecker.ACTIVE, false);

    Channel closedChannel = pool.acquire().syncUninterruptibly().getNow();
    closedChannel.close().syncUninterruptibly();
    Future<Void> releaseOutcome =
            pool.release(closedChannel, closedChannel.eventLoop().<Void>newPromise()).syncUninterruptibly();
    assertThat(releaseOutcome.isSuccess(), CoreMatchers.is(true));

    Channel freshChannel = pool.acquire().syncUninterruptibly().getNow();
    // The newly acquired channel must differ from the closed one, i.e. it was not pulled from the pool.
    assertNotSame(closedChannel, freshChannel);

    serverChannel.close().syncUninterruptibly();
    freshChannel.close().syncUninterruptibly();
    group.shutdownGracefully();
}
/**
 * Exercises the bookkeeping of an {@code AbstractChannelPoolMap}: the map starts empty, creates
 * a pool on first {@code get}, returns the same pool on a second {@code get}, and is empty again
 * after {@code remove} (which succeeds once and then reports {@code false}).
 *
 * <p>No server is bound in this test, so the final {@code acquire()} is expected to fail with a
 * {@link ConnectException}, which the {@code @Test(expected = ...)} annotation asserts.
 *
 * <p>The event loop group created here is shut down in a {@code finally} block so its threads
 * are released even though the method always exits by exception.
 */
@Test(expected = ConnectException.class)
public void testMap() throws Exception {
    EventLoopGroup group = new LocalEventLoopGroup();
    try {
        LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
        final Bootstrap cb = new Bootstrap();
        cb.remoteAddress(addr);
        cb.group(group)
          .channel(LocalChannel.class);

        AbstractChannelPoolMap<EventLoop, SimpleChannelPool> poolMap =
                new AbstractChannelPoolMap<EventLoop, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(EventLoop key) {
                return new SimpleChannelPool(cb.clone(key), new TestChannelPoolHandler());
            }
        };

        EventLoop loop = group.next();

        // Empty before any pool has been requested.
        assertFalse(poolMap.iterator().hasNext());
        assertEquals(0, poolMap.size());

        // First get creates the pool; a second get returns the same instance.
        SimpleChannelPool pool = poolMap.get(loop);
        assertEquals(1, poolMap.size());
        assertTrue(poolMap.iterator().hasNext());
        assertSame(pool, poolMap.get(loop));

        // Removal succeeds exactly once and leaves the map empty.
        assertTrue(poolMap.remove(loop));
        assertFalse(poolMap.remove(loop));
        assertFalse(poolMap.iterator().hasNext());
        assertEquals(0, poolMap.size());

        // Expected to throw ConnectException (see @Test annotation).
        pool.acquire().syncUninterruptibly();
    } finally {
        // Release the group's threads; the pending exception, if any, still propagates.
        group.shutdownGracefully();
    }
}
/**
 * Tests that the acquiredChannelCount is not added up several times for the same channel
 * acquire request.
 *
 * <p>Uses a {@code FixedChannelPool} constructed with a limit argument of 1: after the first
 * channel is closed and released, a second acquire must complete and return a different channel.
 *
 * @throws Exception
 */
@Test
public void testAcquireNewConnectionWhen() throws Exception {
    LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);

    Bootstrap clientBootstrap = new Bootstrap()
            .remoteAddress(addr)
            .group(group)
            .channel(LocalChannel.class);

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(group);
    serverBootstrap.channel(LocalServerChannel.class);
    serverBootstrap.childHandler(new ChannelInitializer<LocalChannel>() {
        @Override
        public void initChannel(LocalChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
        }
    });

    // Start server
    Channel serverChannel = serverBootstrap.bind(addr).syncUninterruptibly().channel();

    ChannelPoolHandler poolHandler = new TestChannelPoolHandler();
    ChannelPool pool = new FixedChannelPool(clientBootstrap, poolHandler, 1);

    Channel closedChannel = pool.acquire().syncUninterruptibly().getNow();
    closedChannel.close().syncUninterruptibly();
    pool.release(closedChannel);

    Channel freshChannel = pool.acquire().syncUninterruptibly().getNow();
    assertNotSame(closedChannel, freshChannel);

    serverChannel.close().syncUninterruptibly();
    freshChannel.close().syncUninterruptibly();
}
/**
 * Passes an already-cancelled promise to {@code pipeline.bind(...)} and checks that the returned
 * future reports itself as cancelled.
 */
@Test
public void testCancelBind() throws Exception {
    LocalChannel channel = new LocalChannel();
    ChannelPipeline pipeline = channel.pipeline();
    group.register(channel);

    // Cancel the promise before it is ever handed to the pipeline.
    ChannelPromise cancelled = channel.newPromise();
    assertTrue(cancelled.cancel(false));

    ChannelFuture bindFuture = pipeline.bind(new LocalAddress("test"), cancelled);
    assertTrue(bindFuture.isCancelled());
}
/**
 * Passes an already-cancelled promise to {@code pipeline.connect(...)} and checks that the
 * returned future reports itself as cancelled.
 */
@Test
public void testCancelConnect() throws Exception {
    LocalChannel channel = new LocalChannel();
    ChannelPipeline pipeline = channel.pipeline();
    group.register(channel);

    // Cancel the promise before it is ever handed to the pipeline.
    ChannelPromise cancelled = channel.newPromise();
    assertTrue(cancelled.cancel(false));

    ChannelFuture connectFuture = pipeline.connect(new LocalAddress("test"), cancelled);
    assertTrue(connectFuture.isCancelled());
}
/**
 * Binds a local server, connects a client, and appends an outbound handler to the client
 * pipeline that flushes from inside {@code write()} and closes the channel once the write
 * promise completes. After writing one buffer and waiting for the channel to close, the test
 * asserts that the recorded log is exactly {@code WRITE, FLUSH, CLOSE}.
 */
@Test
public void testCloseInFlush() throws Exception {
LocalAddress addr = new LocalAddress("testCloseInFlush");
ServerBootstrap sb = getLocalServerBootstrap();
// The server channel itself is not needed afterwards; only the completed bind matters.
sb.bind(addr).sync().channel();
Bootstrap cb = getLocalClientBootstrap();
// NOTE(review): presumably limits which events the test's logging records - confirm in setInterest.
setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION);
Channel clientChannel = cb.connect(addr).sync().channel();
clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// Register the close-on-completion listener before the write is passed on.
promise.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
ctx.channel().close();
}
});
super.write(ctx, msg, promise);
// Flush from within write(), which is the situation this test is named after.
ctx.channel().flush();
}
});
// write() alone (no explicit flush by the test); the handler above performs the flush.
clientChannel.write(createTestBuf(2000)).sync();
clientChannel.closeFuture().sync();
assertLog("WRITE\nFLUSH\nCLOSE\n");
}
/**
 * Submits 1024 rounds of cross-group binds: tasks running on {@code groupA} bind through the
 * bootstrap that belongs to {@code groupB}, and vice versa. The test passes when every submitted
 * task completes within the ten-second timeout.
 */
@Test(timeout = 10000)
public void testBindDeadLock() throws Exception {
    final Bootstrap bootstrapA = new Bootstrap()
            .group(groupA)
            .channel(LocalChannel.class)
            .handler(dummyHandler);
    final Bootstrap bootstrapB = new Bootstrap()
            .group(groupB)
            .channel(LocalChannel.class)
            .handler(dummyHandler);

    // Each task binds through the bootstrap of the *other* group.
    final Runnable bindThroughB = new Runnable() {
        @Override
        public void run() {
            bootstrapB.bind(LocalAddress.ANY);
        }
    };
    final Runnable bindThroughA = new Runnable() {
        @Override
        public void run() {
            bootstrapA.bind(LocalAddress.ANY);
        }
    };

    List<Future<?>> pendingTasks = new ArrayList<Future<?>>();
    for (int round = 0; round < 1024; round++) {
        pendingTasks.add(groupA.next().submit(bindThroughB));
        pendingTasks.add(groupB.next().submit(bindThroughA));
    }

    // Wait for every submitted task to finish.
    for (Future<?> task : pendingTasks) {
        task.sync();
    }
}
/**
 * Submits 1024 rounds of cross-group connects: tasks running on {@code groupA} connect through
 * the bootstrap that belongs to {@code groupB}, and vice versa. The test passes when every
 * submitted task completes within the ten-second timeout.
 */
@Test(timeout = 10000)
public void testConnectDeadLock() throws Exception {
    final Bootstrap bootstrapA = new Bootstrap()
            .group(groupA)
            .channel(LocalChannel.class)
            .handler(dummyHandler);
    final Bootstrap bootstrapB = new Bootstrap()
            .group(groupB)
            .channel(LocalChannel.class)
            .handler(dummyHandler);

    // Each task connects through the bootstrap of the *other* group.
    final Runnable connectThroughB = new Runnable() {
        @Override
        public void run() {
            bootstrapB.connect(LocalAddress.ANY);
        }
    };
    final Runnable connectThroughA = new Runnable() {
        @Override
        public void run() {
            bootstrapA.connect(LocalAddress.ANY);
        }
    };

    List<Future<?>> pendingTasks = new ArrayList<Future<?>>();
    for (int round = 0; round < 1024; round++) {
        pendingTasks.add(groupA.next().submit(connectThroughB));
        pendingTasks.add(groupB.next().submit(connectThroughA));
    }

    // Wait for every submitted task to finish.
    for (Future<?> task : pendingTasks) {
        task.sync();
    }
}