下面列出了怎么用io.netty.channel.local.LocalServerChannel的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testAsyncResolutionSuccess() throws Exception {
final Bootstrap bootstrapA = new Bootstrap();
bootstrapA.group(groupA);
bootstrapA.channel(LocalChannel.class);
bootstrapA.resolver(new TestAddressResolverGroup(true));
bootstrapA.handler(dummyHandler);
final ServerBootstrap bootstrapB = new ServerBootstrap();
bootstrapB.group(groupB);
bootstrapB.channel(LocalServerChannel.class);
bootstrapB.childHandler(dummyHandler);
SocketAddress localAddress = bootstrapB.bind(LocalAddress.ANY).sync().channel().localAddress();
// Connect to the server using the asynchronous resolver.
bootstrapA.connect(localAddress).sync();
}
@Test
public void testAsyncResolutionFailure() throws Exception {
final Bootstrap bootstrapA = new Bootstrap();
bootstrapA.group(groupA);
bootstrapA.channel(LocalChannel.class);
bootstrapA.resolver(new TestAddressResolverGroup(false));
bootstrapA.handler(dummyHandler);
final ServerBootstrap bootstrapB = new ServerBootstrap();
bootstrapB.group(groupB);
bootstrapB.channel(LocalServerChannel.class);
bootstrapB.childHandler(dummyHandler);
SocketAddress localAddress = bootstrapB.bind(LocalAddress.ANY).sync().channel().localAddress();
// Connect to the server using the asynchronous resolver.
ChannelFuture connectFuture = bootstrapA.connect(localAddress);
// Should fail with the UnknownHostException.
assertThat(connectFuture.await(10000), is(true));
assertThat(connectFuture.cause(), is(instanceOf(UnknownHostException.class)));
assertThat(connectFuture.channel().isOpen(), is(false));
}
public TestServer(
EventLoopGroup eventLoopGroup,
LocalAddress localAddress,
ImmutableList<? extends ChannelHandler> handlers) {
// Creates ChannelInitializer with handlers specified
ChannelInitializer<LocalChannel> serverInitializer =
new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) {
for (ChannelHandler handler : handlers) {
ch.pipeline().addLast(handler);
}
}
};
// Sets up serverBootstrap with specified initializer, eventLoopGroup, and using
// LocalServerChannel class
ServerBootstrap serverBootstrap =
new ServerBootstrap()
.group(eventLoopGroup)
.channel(LocalServerChannel.class)
.childHandler(serverInitializer);
ChannelFuture unusedFuture = serverBootstrap.bind(localAddress).syncUninterruptibly();
}
/** Sets up a server channel bound to the given local address. */
public void setUpServer(LocalAddress localAddress, ChannelHandler... handlers) {
checkState(echoHandler == null, "Can't call setUpServer twice");
echoHandler = new EchoHandler();
ChannelInitializer<LocalChannel> serverInitializer =
new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) {
// Add the given handler
ch.pipeline().addLast(handlers);
// Add the "echoHandler" last to log the incoming message and send it back
ch.pipeline().addLast(echoHandler);
serverChannel = ch;
}
};
ServerBootstrap sb =
new ServerBootstrap()
.group(eventLoopGroup)
.channel(LocalServerChannel.class)
.childHandler(serverInitializer);
ChannelFuture unusedFuture = sb.bind(localAddress).syncUninterruptibly();
}
@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder
.forAddress(new LocalAddress("in-process-1"))
.flowControlWindow(65 * 1024)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.channelType(LocalServerChannel.class);
}
private static Channel newServer(EventLoopGroup group, SocketAddress address,
SslContext context, byte[] response, ChannelHandler handler) {
ServerBootstrap bootstrap = new ServerBootstrap()
.channel(LocalServerChannel.class)
.group(group)
.childHandler(newServerHandler(context, response, handler));
return bootstrap.bind(address)
.syncUninterruptibly()
.channel();
}
@Before
public void setUp() {
group = new DefaultEventLoopGroup(1);
server = new ServerBootstrap()
.group(group)
.channel(LocalServerChannel.class)
.localAddress(SERVER_ADDRESS);
client = new Bootstrap()
.group(group)
.channel(LocalChannel.class)
.handler(new ChannelInboundHandlerAdapter());
testHandler = new InspectableHandler();
}
/**
* Tests that if channel was unhealthy it is not offered back to the pool.
*
* @throws Exception
*/
@Test
public void testUnhealthyChannelIsNotOffered() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new SimpleChannelPool(cb, handler);
Channel channel1 = pool.acquire().syncUninterruptibly().getNow();
pool.release(channel1).syncUninterruptibly();
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
//first check that when returned healthy then it actually offered back to the pool.
assertSame(channel1, channel2);
channel1.close().syncUninterruptibly();
pool.release(channel1).syncUninterruptibly();
Channel channel3 = pool.acquire().syncUninterruptibly().getNow();
//channel1 was not healthy anymore so it should not get acquired anymore.
assertNotSame(channel1, channel3);
sc.close().syncUninterruptibly();
channel3.close().syncUninterruptibly();
group.shutdownGracefully();
}
/**
* Tests that if channel was unhealthy it is was offered back to the pool because
* it was requested not to validate channel health on release.
*
* @throws Exception
*/
@Test
public void testUnhealthyChannelIsOfferedWhenNoHealthCheckRequested() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new CountingChannelPoolHandler();
ChannelPool pool = new SimpleChannelPool(cb, handler, ChannelHealthChecker.ACTIVE, false);
Channel channel1 = pool.acquire().syncUninterruptibly().getNow();
channel1.close().syncUninterruptibly();
Future<Void> releaseFuture =
pool.release(channel1, channel1.eventLoop().<Void>newPromise()).syncUninterruptibly();
assertThat(releaseFuture.isSuccess(), CoreMatchers.is(true));
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
//verifying that in fact the channel2 is different that means is not pulled from the pool
assertNotSame(channel1, channel2);
sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
group.shutdownGracefully();
}
/**
* Tests that the acquiredChannelCount is not added up several times for the same channel acquire request.
* @throws Exception
*/
@Test
public void testAcquireNewConnectionWhen() throws Exception {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, 1);
Channel channel1 = pool.acquire().syncUninterruptibly().getNow();
channel1.close().syncUninterruptibly();
pool.release(channel1);
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
assertNotSame(channel1, channel2);
sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
}
ServerBootstrap getLocalServerBootstrap() {
EventLoopGroup serverGroup = new DefaultEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup);
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
}
});
return sb;
}
@Test(timeout = 5000)
public void testHandlerRegister() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
LocalEventLoopGroup group = new LocalEventLoopGroup(1);
try {
ServerBootstrap sb = new ServerBootstrap();
sb.channel(LocalServerChannel.class)
.group(group)
.childHandler(new ChannelInboundHandlerAdapter())
.handler(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
try {
assertTrue(ctx.executor().inEventLoop());
} catch (Throwable cause) {
error.set(cause);
} finally {
latch.countDown();
}
}
});
sb.register().syncUninterruptibly();
latch.await();
assertNull(error.get());
} finally {
group.shutdownGracefully();
}
}
@Test
public void testTryWithResourcesShouldCloseAllClustersButNotEventLoopIfProvided()
throws Exception {
EventLoopGroup eventLoop = new DefaultEventLoopGroup();
BoundCluster cluster;
MockClient client;
try (Server server =
Server.builder()
.withAddressResolver(localAddressResolver)
.withEventLoopGroup(eventLoop, LocalServerChannel.class)
.build()) {
cluster = server.register(ClusterSpec.builder().withNodes(5));
BoundNode node = cluster.node(0);
SocketAddress address = node.getAddress();
client = new MockClient(eventLoop);
client.connect(address);
}
// event loop should not have been closed.
assertThat(eventLoop.isShutdown()).isFalse();
// timer should have since a custom one was not provided.
try {
cluster
.getServer()
.timer
.newTimeout(
timeout -> {
// noop
},
1,
TimeUnit.SECONDS);
fail("Expected IllegalStateException");
} catch (IllegalStateException ise) {
// expected
}
eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
@Test
public void testTryWithResourcesShouldCloseAllClustersButNotEventLoopAndTimerIfProvided()
throws Exception {
EventLoopGroup eventLoop = new DefaultEventLoopGroup();
Timer timer = new HashedWheelTimer();
BoundCluster cluster;
MockClient client;
try (Server server =
Server.builder()
.withAddressResolver(localAddressResolver)
.withTimer(timer)
.withEventLoopGroup(eventLoop, LocalServerChannel.class)
.build()) {
cluster = server.register(ClusterSpec.builder().withNodes(5));
BoundNode node = cluster.node(0);
SocketAddress address = node.getAddress();
client = new MockClient(eventLoop);
client.connect(address);
}
// event loop should not have been closed.
assertThat(eventLoop.isShutdown()).isFalse();
// timer should not have since a custom one was not provided.
cluster
.getServer()
.timer
.newTimeout(
timeout -> {
// noop
},
1,
TimeUnit.SECONDS);
eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS);
timer.stop();
}
ServerBootstrap getLocalServerBootstrap() {
EventLoopGroup serverGroup = new LocalEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup);
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
}
});
return sb;
}
@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder
.forAddress(new LocalAddress("in-process-1"))
.flowControlWindow(65 * 1024)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.channelType(LocalServerChannel.class)
.workerEventLoopGroup(eventLoopGroup)
.bossEventLoopGroup(eventLoopGroup);
}
@Test
public void channelInactiveFailuresPropagated() throws Exception {
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
ChannelFuture wf = chan.writeAndFlush(new Object());
chan.connect(addr);
chan.pipeline().fireChannelInactive();
try {
wf.sync();
fail();
} catch (Exception e) {
Status status = Status.fromThrowable(e);
assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(status.getDescription())
.contains("Connection closed while performing protocol negotiation");
}
}
@Test
public void channelCloseFailuresPropagated() throws Exception {
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
ChannelFuture wf = chan.writeAndFlush(new Object());
chan.connect(addr);
chan.close();
try {
wf.sync();
fail();
} catch (Exception e) {
Status status = Status.fromThrowable(e);
assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(status.getDescription())
.contains("Connection closing while performing protocol negotiation");
}
}
@Test
public void uncaughtExceptionFailuresPropagated() throws Exception {
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
ChannelFuture wf = chan.writeAndFlush(new Object());
chan.connect(addr);
chan.pipeline().fireExceptionCaught(Status.ABORTED.withDescription("zap").asRuntimeException());
try {
wf.sync();
fail();
} catch (Exception e) {
Status status = Status.fromThrowable(e);
assertThat(status.getCode()).isEqualTo(Code.ABORTED);
assertThat(status.getDescription()).contains("zap");
}
}
@Test
public void uncaughtException_closeAtMostOnce() throws Exception {
final AtomicInteger closes = new AtomicInteger();
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelDuplexHandler() {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
closes.getAndIncrement();
// Simulates a loop between this handler and the WriteBufferingAndExceptionHandler.
ctx.fireExceptionCaught(Status.ABORTED.withDescription("zap").asRuntimeException());
super.close(ctx, promise);
}
});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
chan.connect(addr).sync();
chan.close().sync();
assertEquals(1, closes.get());
}
@Test
public void handlerRemovedFailuresPropagated() throws Exception {
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
ctx.pipeline().remove(
ctx.pipeline().context(WriteBufferingAndExceptionHandler.class).name());
}
});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
chan.connect(addr);
ChannelFuture wf = chan.writeAndFlush(new Object());
chan.pipeline().removeFirst();
try {
wf.sync();
fail();
} catch (Exception e) {
Status status = Status.fromThrowable(e);
assertThat(status.getCode()).isEqualTo(Code.INTERNAL);
assertThat(status.getDescription()).contains("Buffer removed");
}
}
@Test
public void uncaughtReadFails() throws Exception {
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
ChannelFuture wf = chan.writeAndFlush(new Object());
chan.connect(addr);
chan.pipeline().fireChannelRead(Unpooled.copiedBuffer(new byte[] {'a'}));
try {
wf.sync();
fail();
} catch (Exception e) {
Status status = Status.fromThrowable(e);
assertThat(status.getCode()).isEqualTo(Code.INTERNAL);
assertThat(status.getDescription()).contains("channelRead() missed");
}
}
@Test
public void waitUntilActiveHandler_channelActive() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
WaitUntilActiveHandler handler =
new WaitUntilActiveHandler(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
assertTrue(ctx.channel().isActive());
latch.countDown();
super.handlerAdded(ctx);
}
});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
assertEquals(1, latch.getCount());
chan.connect(addr).sync();
chan.pipeline().fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
assertNull(chan.pipeline().context(WaitUntilActiveHandler.class));
}
@Test
public void assertEventLoopsAndChannelType_onlyTypeProvided() {
builder.channelType(LocalServerChannel.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(
"All of BossEventLoopGroup, WorkerEventLoopGroup and ChannelType should be provided");
builder.assertEventLoopsAndChannelType();
}
@Test
public void assertEventLoopsAndChannelType_allProvided() {
EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class);
builder.bossEventLoopGroup(mockEventLoopGroup);
builder.workerEventLoopGroup(mockEventLoopGroup);
builder.channelType(LocalServerChannel.class);
builder.assertEventLoopsAndChannelType();
}
private static void testSniClient(SslProvider sslClientProvider, SslProvider sslServerProvider) throws Exception {
final String sniHost = "sni.netty.io";
LocalAddress address = new LocalAddress("test");
EventLoopGroup group = new DefaultEventLoopGroup(1);
Channel sc = null;
Channel cc = null;
try {
SelfSignedCertificate cert = new SelfSignedCertificate();
final SslContext sslServerContext = SslContextBuilder.forServer(cert.key(), cert.cert())
.sslProvider(sslServerProvider).build();
final Promise<String> promise = group.next().newPromise();
ServerBootstrap sb = new ServerBootstrap();
sc = sb.group(group).channel(LocalServerChannel.class).childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addFirst(new SniHandler(new Mapping<String, SslContext>() {
@Override
public SslContext map(String input) {
promise.setSuccess(input);
return sslServerContext;
}
}));
}
}).bind(address).syncUninterruptibly().channel();
SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
.sslProvider(sslClientProvider).build();
Bootstrap cb = new Bootstrap();
cc = cb.group(group).channel(LocalChannel.class).handler(new SslHandler(
sslContext.newEngine(ByteBufAllocator.DEFAULT, sniHost, -1)))
.connect(address).syncUninterruptibly().channel();
Assert.assertEquals(sniHost, promise.syncUninterruptibly().getNow());
} finally {
if (cc != null) {
cc.close().syncUninterruptibly();
}
if (sc != null) {
sc.close().syncUninterruptibly();
}
group.shutdownGracefully();
}
}
@Test
public void testRegisterClusterFailsWhenBindTimesOut() throws Exception {
// Designated address to be slow to bind.
SocketAddress slowAddr = localAddressResolver.get();
// create a bootstrap with a handler that delays binding by 1 second for designated address.
ServerBootstrap serverBootstrap =
new ServerBootstrap()
.group(eventLoop)
.channel(LocalServerChannel.class)
.handler(new SlowBindHandler(slowAddr))
.childHandler(new Server.Initializer());
// Define server with 500ms timeout, which should cause binding of slow address to timeout and
// fail register.
Server flakyServer =
new Server(
localAddressResolver,
eventLoop,
true,
new HashedWheelTimer(),
false,
TimeUnit.NANOSECONDS.convert(500, TimeUnit.MILLISECONDS),
new StubStore(),
false,
serverBootstrap);
// Create a 2 node cluster with 1 node having the slow address.
ClusterSpec cluster = ClusterSpec.builder().build();
DataCenterSpec dc = cluster.addDataCenter().build();
dc.addNode().withAddress(slowAddr).build();
dc.addNode().build();
// Attempt to register which should fail.
try {
flakyServer.register(cluster);
fail();
} catch (Exception e) {
// Expect a timeout exception.
assertThat(e.getCause()).isInstanceOf(TimeoutException.class);
}
}
@Test
public void waitUntilActiveHandler_firesNegotiation() throws Exception {
EventLoopGroup elg = new DefaultEventLoopGroup(1);
SocketAddress addr = new LocalAddress("addr");
final AtomicReference<Object> event = new AtomicReference<>();
ChannelHandler next = new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
event.set(evt);
ctx.close();
}
};
Channel s = new ServerBootstrap()
.childHandler(new ChannelInboundHandlerAdapter())
.group(elg)
.channel(LocalServerChannel.class)
.bind(addr)
.sync()
.channel();
Channel c = new Bootstrap()
.handler(new WaitUntilActiveHandler(next))
.channel(LocalChannel.class).group(group)
.connect(addr)
.sync()
.channel();
c.pipeline().fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
SocketAddress localAddr = c.localAddress();
ProtocolNegotiationEvent expectedEvent = ProtocolNegotiationEvent.DEFAULT
.withAttributes(
Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, localAddr)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, addr)
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
.build());
c.closeFuture().sync();
assertThat(event.get()).isInstanceOf(ProtocolNegotiationEvent.class);
ProtocolNegotiationEvent actual = (ProtocolNegotiationEvent) event.get();
assertThat(actual).isEqualTo(expectedEvent);
s.close();
elg.shutdownGracefully();
}
@Test
public void clientTlsHandler_firesNegotiation() throws Exception {
SelfSignedCertificate cert = new SelfSignedCertificate("authority");
SslContext clientSslContext =
GrpcSslContexts.configure(SslContextBuilder.forClient().trustManager(cert.cert())).build();
SslContext serverSslContext =
GrpcSslContexts.configure(SslContextBuilder.forServer(cert.key(), cert.cert())).build();
FakeGrpcHttp2ConnectionHandler gh = FakeGrpcHttp2ConnectionHandler.newHandler();
ClientTlsProtocolNegotiator pn = new ClientTlsProtocolNegotiator(clientSslContext, null);
WriteBufferingAndExceptionHandler clientWbaeh =
new WriteBufferingAndExceptionHandler(pn.newHandler(gh));
SocketAddress addr = new LocalAddress("addr");
ChannelHandler sh =
ProtocolNegotiators.serverTls(serverSslContext)
.newHandler(FakeGrpcHttp2ConnectionHandler.noopHandler());
WriteBufferingAndExceptionHandler serverWbaeh = new WriteBufferingAndExceptionHandler(sh);
Channel s = new ServerBootstrap()
.childHandler(serverWbaeh)
.group(group)
.channel(LocalServerChannel.class)
.bind(addr)
.sync()
.channel();
Channel c = new Bootstrap()
.handler(clientWbaeh)
.channel(LocalChannel.class)
.group(group)
.register()
.sync()
.channel();
ChannelFuture write = c.writeAndFlush(NettyClientHandler.NOOP_MESSAGE);
c.connect(addr).sync();
write.sync();
boolean completed = gh.negotiated.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
assertTrue("failed to negotiated", write.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
// sync should fail if we are in this block.
write.sync();
throw new AssertionError("neither wrote nor negotiated");
}
c.close();
s.close();
pn.close();
assertThat(gh.securityInfo).isNotNull();
assertThat(gh.securityInfo.tls).isNotNull();
assertThat(gh.attrs.get(GrpcAttributes.ATTR_SECURITY_LEVEL))
.isEqualTo(SecurityLevel.PRIVACY_AND_INTEGRITY);
assertThat(gh.attrs.get(Grpc.TRANSPORT_ATTR_SSL_SESSION)).isInstanceOf(SSLSession.class);
// This is not part of the ClientTls negotiation, but shows that the negotiation event happens
// in the right order.
assertThat(gh.attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)).isEqualTo(addr);
}