下面列出了如何使用 io.netty.channel.DefaultEventLoopGroup 类的 API 示例代码及写法,您也可以点击链接到 GitHub 查看源代码。
/**
 * Runs once per test class: starts a local (in-VM) server whose child channels
 * release and drop every inbound message, and stores the bound address in
 * {@code localAddr}.
 */
@BeforeClass
public static void init() {
    group = new DefaultEventLoopGroup();

    // Child pipeline: one inbound handler that discards whatever it reads.
    final ChannelInitializer<LocalChannel> discardingInitializer = new ChannelInitializer<LocalChannel>() {
        @Override
        public void initChannel(LocalChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    // Nothing is forwarded; the message is released right here.
                    ReferenceCountUtil.release(msg);
                }
            });
        }
    };

    final ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(group);
    bootstrap.channel(LocalServerChannel.class);
    bootstrap.childHandler(discardingInitializer);

    // Bind to LocalAddress.ANY and remember the address the server channel reports.
    localAddr = (LocalAddress) bootstrap.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
}
/**
 * One-time fixture: binds a local server on {@code LocalAddress.ANY} whose
 * accepted channels simply release each message they read, then keeps the
 * resulting address in {@code localAddr} for the tests.
 */
@BeforeClass
public static void init() {
    group = new DefaultEventLoopGroup();

    // Installed on every accepted channel; adds the "discard" handler to its pipeline.
    final ChannelInitializer<LocalChannel> childSetup = new ChannelInitializer<LocalChannel>() {
        @Override
        public void initChannel(LocalChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    // Discard: release the message instead of passing it on.
                    ReferenceCountUtil.release(msg);
                }
            });
        }
    };

    final ServerBootstrap server = new ServerBootstrap();
    server.group(group);
    server.channel(LocalServerChannel.class);
    server.childHandler(childSetup);

    localAddr = (LocalAddress) server.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
}
/**
 * Checks that the pool hands back the same channel for repeated requests to
 * one address and a distinct channel for a different address.
 */
@Test
public void testPooling()
{
    try (ConnectionPool pool = new ConnectionPool(new TestingConnectionManager(), new DefaultEventLoopGroup(), 10, new Duration(1, MINUTES))) {
        HostAndPort primary = HostAndPort.fromParts("localhost", 1234);
        HostAndPort secondary = HostAndPort.fromParts("localhost", 4567);

        Channel pooled = futureGet(pool.getConnection(PARAMETERS, primary));

        // asking again for the same address must return the very same channel
        assertSame(pooled, futureGet(pool.getConnection(PARAMETERS, primary)));

        // a different address must get a channel of its own
        assertNotSame(pooled, futureGet(pool.getConnection(PARAMETERS, secondary)));

        // after touching the other address, the first one still maps to the original channel
        assertSame(pooled, futureGet(pool.getConnection(PARAMETERS, primary)));
    }
}
/**
 * Checks that once a pooled channel has been closed, the next request for the
 * same address yields a different, open channel.
 */
@Test
public void testConnectionClosed()
{
    try (ConnectionPool pool = new ConnectionPool(new TestingConnectionManager(), new DefaultEventLoopGroup(), 10, new Duration(1, MINUTES))) {
        HostAndPort target = HostAndPort.fromParts("localhost", 1234);

        // obtain a channel, verify it is usable, then close it
        Channel original = futureGet(pool.getConnection(PARAMETERS, target));
        assertTrue(original.isOpen());
        original.close();
        assertFalse(original.isOpen());

        // the pool must not hand the closed channel out again
        Channel replacement = futureGet(pool.getConnection(PARAMETERS, target));
        assertTrue(replacement.isOpen());
        assertNotSame(original, replacement);
    }
}
/**
 * Drives the three-argument {@code stressTest} helper at three ratios against
 * one scheduler, releases everything that was acquired, and verifies that no
 * entry reports active requests afterwards and that the first entry has id 0.
 */
@Test
void stressTest() {
    final EventLoopGroup loops = new DefaultEventLoopGroup(1024);
    final DefaultEventLoopScheduler scheduler =
            new DefaultEventLoopScheduler(loops, GROUP_SIZE, GROUP_SIZE, ImmutableList.of());
    final List<AbstractEventLoopEntry> held = new ArrayList<>();

    for (double ratio : new double[] { 0.8, 0.5, 0.2 }) {
        stressTest(scheduler, held, ratio);
    }

    // Release all acquired entries to make sure activeRequests are all 0.
    for (AbstractEventLoopEntry acquired : held) {
        acquired.release();
    }

    final List<AbstractEventLoopEntry> entries = scheduler.entries(SessionProtocol.HTTP, endpoint, endpoint);
    entries.forEach(entry -> assertThat(entry.activeRequests())
            .withFailMessage("All entries must have 0 activeRequests.")
            .isZero());
    assertThat(entries.get(0).id()).isZero();
}
/**
 * With an SslHandler that reports "h2" as the application protocol, a
 * successful handshake event must leave {@code grpcHandler} installed in the
 * pipeline.
 */
@Test
public void clientTlsHandler_userEventTriggeredSslEvent_supportedProtocolH2() throws Exception {
    // Stub SslHandler whose negotiated application protocol is always "h2".
    SslHandler h2SslHandler = new SslHandler(engine, false) {
        @Override
        public String applicationProtocol() {
            return "h2";
        }
    };
    ClientTlsHandler handler =
            new ClientTlsHandler(grpcHandler, sslContext, "authority", new DefaultEventLoopGroup(1));

    pipeline.addLast(handler);
    // Swap the SslHandler currently in the pipeline for the stub.
    pipeline.replace(SslHandler.class, null, h2SslHandler);
    pipeline.fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
    channelHandlerCtx = pipeline.context(handler);

    pipeline.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);

    assertNotNull(pipeline.context(grpcHandler));
}
/**
 * Per-test fixture: initialises the Mockito mocks, sets the in-flight window
 * size to 5, and prepares an embedded channel tagged with client id "client"
 * that has a fresh {@code OrderedTopicHandler} at the head of its pipeline.
 */
@Before
public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE = 5;

    channel = new EmbeddedChannel();
    channel.attr(ChannelAttributes.CLIENT_ID).getAndSet("client");

    // NOTE(review): this group is local to setUp and is never shut down here — confirm that is intended.
    final DefaultEventLoopGroup singleLoopGroup = new DefaultEventLoopGroup(1);
    singleLoopGroup.register(channel);

    orderedTopicHandler = new OrderedTopicHandler();
    channel.pipeline().addFirst(orderedTopicHandler);
}
/**
 * Caches two addresses for "netty.io" (with the numeric arguments 1 and 10000,
 * presumably TTLs in seconds — confirm against DefaultDnsCache) and checks,
 * from a task scheduled one second later on the event loop, that a lookup
 * returns {@code null}.
 */
@Test
public void testExpire() throws Throwable {
    EventLoopGroup loops = new DefaultEventLoopGroup(1);
    try {
        EventLoop eventLoop = loops.next();
        final DefaultDnsCache dnsCache = new DefaultDnsCache();
        dnsCache.cache("netty.io", null, NetUtil.LOCALHOST, 1, eventLoop);
        dnsCache.cache("netty.io", null, NetUtil.LOCALHOST6, 10000, eventLoop);

        // Runs on the event loop; an assertion failure is returned rather than thrown
        // so that it can be rethrown on the test thread below.
        Callable<Throwable> delayedLookup = new Callable<Throwable>() {
            @Override
            public Throwable call() throws Exception {
                try {
                    Assert.assertNull(dnsCache.get("netty.io", null));
                } catch (Throwable problem) {
                    return problem;
                }
                return null;
            }
        };

        Throwable outcome = eventLoop.schedule(delayedLookup, 1, TimeUnit.SECONDS).get();
        if (outcome != null) {
            throw outcome;
        }
    } finally {
        loops.shutdownGracefully();
    }
}
/**
 * Connects 100 clients, one after another, to the same local address and
 * checks that after every run both handlers have counted
 * {@code messageCountPerRun} more messages.
 *
 * <p>The event loop groups and the bound server channel are released in a
 * {@code finally} block so that they do not outlive the test, whether it
 * passes, fails or times out.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@Test(timeout = 15000)
public void testSocketReuse() throws InterruptedException {
    // Keep a reference to every group so each one can be shut down at the end.
    DefaultEventLoopGroup bossGroup = new DefaultEventLoopGroup();
    DefaultEventLoopGroup workerGroup = new DefaultEventLoopGroup();
    DefaultEventLoopGroup clientGroup = new DefaultEventLoopGroup();
    Channel serverChannel = null;
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        LocalHandler serverHandler = new LocalHandler("SERVER");
        serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(LocalServerChannel.class)
                .childHandler(serverHandler);
        Bootstrap clientBootstrap = new Bootstrap();
        LocalHandler clientHandler = new LocalHandler("CLIENT");
        clientBootstrap
                .group(clientGroup)
                .channel(LocalChannel.class)
                .remoteAddress(new LocalAddress(LOCAL_CHANNEL)).handler(clientHandler);
        serverChannel = serverBootstrap.bind(new LocalAddress(LOCAL_CHANNEL)).sync().channel();
        int count = 100;
        for (int i = 1; i < count + 1; i ++) {
            Channel ch = clientBootstrap.connect().sync().channel();
            // SPIN until we get what we are looking for.
            int target = i * messageCountPerRun;
            while (serverHandler.count.get() != target || clientHandler.count.get() != target) {
                Thread.sleep(50);
            }
            close(ch, clientHandler);
        }
        assertEquals(count * 2 * messageCountPerRun, serverHandler.count.get() +
                clientHandler.count.get());
    } finally {
        // Unbind the shared local address first, then stop the event loops.
        if (serverChannel != null) {
            serverChannel.close();
        }
        clientGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}
/**
 * Closing a {@code Server} built with a caller-supplied event loop must leave
 * that event loop running, while the server's own timer must reject new
 * timeouts afterwards.
 *
 * <p>The event loop is shut down in a {@code finally} block so that it is
 * released even when one of the assertions fails.
 */
@Test
public void testTryWithResourcesShouldCloseAllClustersButNotEventLoopIfProvided()
    throws Exception {
  EventLoopGroup eventLoop = new DefaultEventLoopGroup();
  try {
    BoundCluster cluster;
    MockClient client;
    try (Server server =
        Server.builder()
            .withAddressResolver(localAddressResolver)
            .withEventLoopGroup(eventLoop, LocalServerChannel.class)
            .build()) {
      cluster = server.register(ClusterSpec.builder().withNodes(5));
      BoundNode node = cluster.node(0);
      SocketAddress address = node.getAddress();
      client = new MockClient(eventLoop);
      client.connect(address);
    }
    // event loop should not have been closed.
    assertThat(eventLoop.isShutdown()).isFalse();
    // timer should have been stopped since a custom one was not provided.
    try {
      cluster
          .getServer()
          .timer
          .newTimeout(
              timeout -> {
                // noop
              },
              1,
              TimeUnit.SECONDS);
      fail("Expected IllegalStateException");
    } catch (IllegalStateException ise) {
      // expected
    }
  } finally {
    eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS);
  }
}
/**
 * Closing a {@code Server} built with a caller-supplied event loop and timer
 * must leave both running: the event loop is not shut down and the timer
 * still accepts a new timeout.
 *
 * <p>Both resources are released in a {@code finally} block so that they do
 * not leak when an assertion fails.
 */
@Test
public void testTryWithResourcesShouldCloseAllClustersButNotEventLoopAndTimerIfProvided()
    throws Exception {
  EventLoopGroup eventLoop = new DefaultEventLoopGroup();
  Timer timer = new HashedWheelTimer();
  try {
    BoundCluster cluster;
    MockClient client;
    try (Server server =
        Server.builder()
            .withAddressResolver(localAddressResolver)
            .withTimer(timer)
            .withEventLoopGroup(eventLoop, LocalServerChannel.class)
            .build()) {
      cluster = server.register(ClusterSpec.builder().withNodes(5));
      BoundNode node = cluster.node(0);
      SocketAddress address = node.getAddress();
      client = new MockClient(eventLoop);
      client.connect(address);
    }
    // event loop should not have been closed.
    assertThat(eventLoop.isShutdown()).isFalse();
    // timer should not have been stopped since a custom one was provided;
    // scheduling a timeout must therefore succeed without throwing.
    cluster
        .getServer()
        .timer
        .newTimeout(
            timeout -> {
              // noop
            },
            1,
            TimeUnit.SECONDS);
  } finally {
    eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS);
    timer.stop();
  }
}
/**
 * Creates a client from the given configuration.
 *
 * <p>Reads the callback pool size, retry count and retry delay from the
 * configuration, creates the callback executor and the network/local event
 * loop groups, and finally selects the client-side metadata provider that
 * matches the configured mode (local/standalone: static address; cluster:
 * ZooKeeper).
 *
 * <p>If selecting or constructing the metadata provider fails, the executor
 * and event loop groups created earlier in this constructor are shut down
 * before the failure is rethrown, so a failed construction does not leak them.
 *
 * @param configuration source of all client settings
 * @param statsLogger parent stats logger; the client uses its "hdbclient" scope
 * @throws IllegalStateException if the configured mode is not one of the known modes
 */
public HDBClient(ClientConfiguration configuration, StatsLogger statsLogger) {
    this.configuration = configuration;
    this.statsLogger = statsLogger.scope("hdbclient");
    int corePoolSize = configuration.getInt(ClientConfiguration.PROPERTY_CLIENT_CALLBACKS, ClientConfiguration.PROPERTY_CLIENT_CALLBACKS_DEFAULT);
    this.maxOperationRetryCount = configuration.getInt(ClientConfiguration.PROPERTY_MAX_OPERATION_RETRY_COUNT, ClientConfiguration.PROPERTY_MAX_OPERATION_RETRY_COUNT_DEFAULT);
    this.operationRetryDelay = configuration.getInt(ClientConfiguration.PROPERTY_OPERATION_RETRY_DELAY, ClientConfiguration.PROPERTY_OPERATION_RETRY_DELAY_DEFAULT);
    this.thredpool = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE,
            120L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            (Runnable r) -> {
                // Daemon threads, so that callback workers never keep the JVM alive.
                Thread t = new FastThreadLocalThread(r, "hdb-client");
                t.setDaemon(true);
                return t;
            });
    this.networkGroup = NetworkUtils.isEnableEpoolNative() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
    this.localEventsGroup = new DefaultEventLoopGroup();
    String mode = configuration.getString(ClientConfiguration.PROPERTY_MODE, ClientConfiguration.PROPERTY_MODE_LOCAL);
    try {
        switch (mode) {
            case ClientConfiguration.PROPERTY_MODE_LOCAL:
            case ClientConfiguration.PROPERTY_MODE_STANDALONE:
                this.clientSideMetadataProvider = new StaticClientSideMetadataProvider(
                        configuration.getString(ClientConfiguration.PROPERTY_SERVER_ADDRESS, ClientConfiguration.PROPERTY_SERVER_ADDRESS_DEFAULT),
                        configuration.getInt(ClientConfiguration.PROPERTY_SERVER_PORT, ClientConfiguration.PROPERTY_SERVER_PORT_DEFAULT),
                        configuration.getBoolean(ClientConfiguration.PROPERTY_SERVER_SSL, ClientConfiguration.PROPERTY_SERVER_SSL_DEFAULT)
                );
                break;
            case ClientConfiguration.PROPERTY_MODE_CLUSTER:
                this.clientSideMetadataProvider = new ZookeeperClientSideMetadataProvider(
                        configuration.getString(ClientConfiguration.PROPERTY_ZOOKEEPER_ADDRESS, ClientConfiguration.PROPERTY_ZOOKEEPER_ADDRESS_DEFAULT),
                        configuration.getInt(ClientConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT, ClientConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT_DEFAULT),
                        configuration.getString(ClientConfiguration.PROPERTY_ZOOKEEPER_PATH, ClientConfiguration.PROPERTY_ZOOKEEPER_PATH_DEFAULT)
                );
                break;
            default:
                throw new IllegalStateException(mode);
        }
    } catch (RuntimeException | Error failure) {
        // The caller never receives this instance, so nobody else could release these.
        this.localEventsGroup.shutdownGracefully();
        this.networkGroup.shutdownGracefully();
        this.thredpool.shutdown();
        throw failure;
    }
}
/**
 * With both numeric scheduler arguments set to 0 and no per-endpoint
 * functions, ten acquisitions for one endpoint must leave exactly one entry
 * in the (initially empty) entry list.
 */
@Test
void defaultMaxNumEventLoopsEqualsOne() {
    final DefaultEventLoopScheduler scheduler =
            new DefaultEventLoopScheduler(new DefaultEventLoopGroup(7), 0, 0, ImmutableList.of());

    // The list is fetched once and observed before and after acquiring.
    final List<AbstractEventLoopEntry> entriesForA = scheduler.entries(SessionProtocol.H1C, endpointA, endpointA);
    assertThat(entriesForA).hasSize(0);

    acquireTenEntries(scheduler, SessionProtocol.H1C, endpointA, endpointA);
    assertThat(entriesForA).hasSize(1);
}
/**
 * Builds a scheduler with a single per-endpoint function (3 for
 * {@code endpointA}, -1 for everything else) and delegates the verification to
 * {@code checkMaxNumEventLoops}.
 */
@Test
void singleEndpointMaxNumEventLoops() {
    final DefaultEventLoopScheduler scheduler = new DefaultEventLoopScheduler(
            new DefaultEventLoopGroup(7), 4, 5,
            ImmutableList.of(endpoint -> endpoint.equals(endpointA) ? 3 : -1));
    checkMaxNumEventLoops(scheduler, endpointA, endpointB);
}
/**
 * Shared verification for scheduler tests: builds a scheduler over a group
 * created with 7 event loops and, for endpointA, endpointB and endpointC in
 * turn, checks which event loop index is handed out first, lets
 * {@code checkNextEventLoopIdx} verify the following acquisitions, and then
 * confirms that the next acquisition returns the first event loop again.
 *
 * <p>The statements are order-dependent: every {@code acquireEntry} call
 * advances the scheduler's state.
 *
 * @param maxNumEventLoopsFunctions per-endpoint functions passed to the scheduler
 * @param maxNumEventLoops value passed as both numeric scheduler arguments
 */
private static void checkEventLoopAssignedSequentially(
List<ToIntFunction<Endpoint>> maxNumEventLoopsFunctions, int maxNumEventLoops) {
final EventLoopGroup group = new DefaultEventLoopGroup(7);
// Snapshot of the group's event loops, used to translate an event loop into its index.
final List<EventLoop> eventLoops = Streams.stream(group)
.map(EventLoop.class::cast)
.collect(toImmutableList());
final DefaultEventLoopScheduler s = new DefaultEventLoopScheduler(group, maxNumEventLoops,
maxNumEventLoops,
maxNumEventLoopsFunctions);
// endpointA
EventLoop firstEventLoop = acquireEntry(s, endpointA).get();
int firstEventLoopIdx = findIndex(eventLoops, firstEventLoop);
assertThat(firstEventLoopIdx).isIn(0, 1);
// NOTE(review): the last two arguments look like an index offset and a count — confirm against checkNextEventLoopIdx.
checkNextEventLoopIdx(s, eventLoops, endpointA, firstEventLoopIdx, 0, 2);
// After one circle, the next event loop is the first one.
assertThat(firstEventLoop).isSameAs(acquireEntry(s, endpointA).get());
// endpointB
firstEventLoop = acquireEntry(s, endpointB).get();
firstEventLoopIdx = findIndex(eventLoops, firstEventLoop);
assertThat(firstEventLoopIdx).isIn(2, 3, 4);
checkNextEventLoopIdx(s, eventLoops, endpointB, firstEventLoopIdx, 2, 3);
// After one circle, the next event loop is the first one.
assertThat(firstEventLoop).isSameAs(acquireEntry(s, endpointB).get());
// endpointC
firstEventLoop = acquireEntry(s, endpointC).get();
firstEventLoopIdx = findIndex(eventLoops, firstEventLoop);
assertThat(firstEventLoopIdx).isIn(0, 1, 2, 5, 6);
checkNextEventLoopIdx(s, eventLoops, endpointC, firstEventLoopIdx, 5, 5);
// After one circle, the next event loop is the first one.
assertThat(firstEventLoop).isSameAs(acquireEntry(s, endpointC).get());
}
/**
 * With a client SSL context that advertises "managed_mtls" and "h2" via ALPN
 * and an SslHandler that reports "managed_mtls", a successful handshake event
 * must leave {@code grpcHandler} installed in the pipeline.
 */
@Test
public void clientTlsHandler_userEventTriggeredSslEvent_supportedProtocolCustom()
    throws Exception {
    // Stub SslHandler whose negotiated application protocol is always "managed_mtls".
    SslHandler customProtocolSslHandler = new SslHandler(engine, false) {
        @Override
        public String applicationProtocol() {
            return "managed_mtls";
        }
    };
    DefaultEventLoopGroup executorGroup = new DefaultEventLoopGroup(1);

    // Rebuild the shared sslContext so that it uses the custom ALPN protocol list.
    File certFile = TestUtils.loadCert("client.pem");
    File keyFile = TestUtils.loadCert("client.key");
    ApplicationProtocolConfig alpnConfig = new ApplicationProtocolConfig(
            ApplicationProtocolConfig.Protocol.ALPN,
            ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
            ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
            Arrays.asList("managed_mtls", "h2"));
    sslContext = GrpcSslContexts.forClient()
            .keyManager(certFile, keyFile)
            .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
            .applicationProtocolConfig(alpnConfig)
            .build();

    ClientTlsHandler handler = new ClientTlsHandler(grpcHandler, sslContext, "authority", executorGroup);
    pipeline.addLast(handler);
    pipeline.replace(SslHandler.class, null, customProtocolSslHandler);
    pipeline.fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
    channelHandlerCtx = pipeline.context(handler);

    pipeline.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);

    assertNotNull(pipeline.context(grpcHandler));
}
/**
 * With an SslHandler that reports the unknown protocol "badproto", a
 * successful handshake event must surface an error through the pipeline and
 * must not install {@code grpcHandler}.
 */
@Test
public void clientTlsHandler_userEventTriggeredSslEvent_unsupportedProtocol() throws Exception {
    // Stub SslHandler that claims an application protocol the client does not support.
    SslHandler badProtocolSslHandler = new SslHandler(engine, false) {
        @Override
        public String applicationProtocol() {
            return "badproto";
        }
    };
    ClientTlsHandler handler =
            new ClientTlsHandler(grpcHandler, sslContext, "authority", new DefaultEventLoopGroup(1));
    pipeline.addLast(handler);

    // Tail handler that remembers the exception travelling down the pipeline.
    final AtomicReference<Throwable> captured = new AtomicReference<>();
    pipeline.addLast(new ChannelInboundHandlerAdapter() {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            captured.set(cause);
        }
    });

    pipeline.replace(SslHandler.class, null, badProtocolSslHandler);
    pipeline.fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
    channelHandlerCtx = pipeline.context(handler);
    pipeline.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);

    // Bad protocol was specified, so there should be an error, (normally handled by WBAEH)
    assertThat(captured.get()).hasMessageThat().contains("Unable to find compatible protocol");
    assertNull(pipeline.context(grpcHandler));
}
/**
 * Runs once per test class: assigns the static {@code group} field a new
 * {@code DefaultEventLoopGroup} constructed with 2 as its thread-count argument.
 */
@BeforeClass
public static void beforeClass() {
group = new DefaultEventLoopGroup(2);
}
/**
 * Starts a local TLS server behind an {@code SniHandler}, connects a TLS
 * client that requests the host name "sni.netty.io", and asserts that the
 * server-side mapping received exactly that host name.
 *
 * <p>The channels, the event loop group and the self-signed certificate
 * created for the run are all released in the {@code finally} block.
 *
 * @param sslClientProvider SSL provider used to build the client context
 * @param sslServerProvider SSL provider used to build the server context
 * @throws Exception if the certificate or an SSL context cannot be created
 */
private static void testSniClient(SslProvider sslClientProvider, SslProvider sslServerProvider) throws Exception {
    final String sniHost = "sni.netty.io";
    LocalAddress address = new LocalAddress("test");
    EventLoopGroup group = new DefaultEventLoopGroup(1);
    // Declared outside the try block so the finally block can delete it.
    SelfSignedCertificate cert = null;
    Channel sc = null;
    Channel cc = null;
    try {
        cert = new SelfSignedCertificate();
        final SslContext sslServerContext = SslContextBuilder.forServer(cert.key(), cert.cert())
                .sslProvider(sslServerProvider).build();
        // Completed by the server with the host name handed to the SNI mapping.
        final Promise<String> promise = group.next().newPromise();
        ServerBootstrap sb = new ServerBootstrap();
        sc = sb.group(group).channel(LocalServerChannel.class).childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addFirst(new SniHandler(new Mapping<String, SslContext>() {
                    @Override
                    public SslContext map(String input) {
                        promise.setSuccess(input);
                        return sslServerContext;
                    }
                }));
            }
        }).bind(address).syncUninterruptibly().channel();
        SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
                .sslProvider(sslClientProvider).build();
        Bootstrap cb = new Bootstrap();
        cc = cb.group(group).channel(LocalChannel.class).handler(new SslHandler(
                sslContext.newEngine(ByteBufAllocator.DEFAULT, sniHost, -1)))
                .connect(address).syncUninterruptibly().channel();
        Assert.assertEquals(sniHost, promise.syncUninterruptibly().getNow());
    } finally {
        if (cc != null) {
            cc.close().syncUninterruptibly();
        }
        if (sc != null) {
            sc.close().syncUninterruptibly();
        }
        group.shutdownGracefully();
        // Remove the files that back the self-signed certificate.
        if (cert != null) {
            cert.delete();
        }
    }
}
/**
 * Runs one OCSP-enabled TLS handshake between a local server and a local
 * client and waits up to ten seconds for {@code latch} to reach zero.
 *
 * <p>Resources are released innermost-first: channels, event loop group,
 * client context, server context, and finally the self-signed certificate.
 *
 * @param sslProvider provider used for both the server and the client context
 * @param latch counted down elsewhere; the handshake fails if it times out
 * @param serverHandler handler passed to {@code newServer}
 * @param response bytes passed to {@code newServer}
 * @param clientHandler handler passed to {@code newClient}
 * @param callback OCSP callback passed to {@code newClient}
 * @throws Exception if certificate, context or channel setup fails
 */
private static void handshake(SslProvider sslProvider, CountDownLatch latch, ChannelHandler serverHandler,
        byte[] response, ChannelHandler clientHandler, OcspClientCallback callback) throws Exception {
    SelfSignedCertificate selfSigned = new SelfSignedCertificate();
    try {
        SslContext serverContext = SslContextBuilder
                .forServer(selfSigned.certificate(), selfSigned.privateKey())
                .sslProvider(sslProvider)
                .enableOcsp(true)
                .build();
        try {
            SslContext clientContext = SslContextBuilder
                    .forClient()
                    .sslProvider(sslProvider)
                    .enableOcsp(true)
                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
                    .build();
            try {
                EventLoopGroup loops = new DefaultEventLoopGroup();
                try {
                    // A random suffix keeps the local address distinct between invocations.
                    LocalAddress endpoint = new LocalAddress("handshake-" + Math.random());
                    Channel serverChannel = newServer(loops, endpoint, serverContext, response, serverHandler);
                    Channel clientChannel = newClient(loops, endpoint, clientContext, callback, clientHandler);
                    try {
                        boolean finishedInTime = latch.await(10L, TimeUnit.SECONDS);
                        assertTrue("Something went wrong.", finishedInTime);
                    } finally {
                        clientChannel.close().syncUninterruptibly();
                        serverChannel.close().syncUninterruptibly();
                    }
                } finally {
                    loops.shutdownGracefully(1L, 1L, TimeUnit.SECONDS);
                }
            } finally {
                ReferenceCountUtil.release(clientContext);
            }
        } finally {
            ReferenceCountUtil.release(serverContext);
        }
    } finally {
        selfSigned.delete();
    }
}
/**
 * Runs once per test class: creates the three static event loop groups used by
 * the tests — {@code group1} and {@code group2} constructed with argument 2,
 * and {@code sharedGroup} constructed with argument 1.
 */
@BeforeClass
public static void beforeClass() {
group1 = new DefaultEventLoopGroup(2);
group2 = new DefaultEventLoopGroup(2);
sharedGroup = new DefaultEventLoopGroup(1);
}
/**
 * Passing a {@code DefaultEventLoopGroup} to the single-argument
 * {@code SdkEventLoopGroup.create} must be rejected with an
 * {@code IllegalArgumentException}.
 */
@Test(expected = IllegalArgumentException.class)
public void notProvidingChannelFactory_unknownEventLoopGroup() {
    DefaultEventLoopGroup unrecognizedGroup = new DefaultEventLoopGroup();
    SdkEventLoopGroup.create(unrecognizedGroup);
}
/**
 * Runs before each test method: resets the {@code workAroundIssue277} flag to
 * {@code false} and assigns {@code eventLoop} a new {@code DefaultEventLoopGroup}.
 */
@BeforeMethod
public void startEventLoop() {
workAroundIssue277 = false;
eventLoop = new DefaultEventLoopGroup();
}
/**
 * Runs before each test method: assigns {@code eventLoop} a new
 * {@code DefaultEventLoopGroup}.
 */
@BeforeMethod
public void startEventLoop() {
eventLoop = new DefaultEventLoopGroup();
}
/**
 * Before every test method, replaces {@code eventLoop} with a newly created
 * {@code DefaultEventLoopGroup}.
 */
@BeforeMethod
public void startEventLoop() {
eventLoop = new DefaultEventLoopGroup();
}
/**
 * Exercises a scheduler configured with two per-endpoint functions and checks,
 * endpoint by endpoint and protocol by protocol, how many entries each entry
 * list holds before and after ten acquisitions.
 *
 * <p>The assertions are order-dependent: entry lists are fetched once and
 * observed again after later acquisitions, and several lookups are expected
 * to return the very same list instance.
 */
@Test
void maxNumEventLoopsFunction() {
final EventLoopGroup group = new DefaultEventLoopGroup(7);
final List<ToIntFunction<Endpoint>> maxNumEventLoopsFunctions = ImmutableList.of(
// First function: host "a" -> 2 for port 80, 3 for port 8443, 1 otherwise; any other host -> -1.
endpoint -> {
if ("a".equals(endpoint.host())) {
if (endpoint.hasPort()) {
final int port = endpoint.port();
if (port == 80) {
return 2;
}
if (port == 8443) {
return 3;
}
}
return 1;
}
return -1;
},
// Second function: 4 for endpointB80, 5 for endpointB443, -1 for everything else.
endpoint -> {
if (endpoint.equals(endpointB80)) {
return 4;
}
if (endpoint.equals(endpointB443)) {
return 5;
}
return -1;
});
final DefaultEventLoopScheduler s = new DefaultEventLoopScheduler(group, 7, 7,
maxNumEventLoopsFunctions);
final List<AbstractEventLoopEntry> entries1 = s.entries(SessionProtocol.H1C, endpointA, endpointA);
assertThat(entries1).hasSize(0);
acquireTenEntries(s, SessionProtocol.H1C, endpointA, endpointA);
assertThat(entries1).hasSize(2);
final List<AbstractEventLoopEntry> entries2 = s.entries(SessionProtocol.H1C, endpointA80, endpointA80);
assertThat(entries2).hasSize(2);
final List<AbstractEventLoopEntry> entries3 =
s.entries(SessionProtocol.H1C, endpointA443, endpointA443);
assertThat(entries3).hasSize(0);
acquireTenEntries(s, SessionProtocol.H1C, endpointA443, endpointA443);
assertThat(entries3).hasSize(1); // Fallback to "a.com"
final List<AbstractEventLoopEntry> entries4 =
s.entries(SessionProtocol.H1C, endpointA8443, endpointA8443);
assertThat(entries4).hasSize(0);
acquireTenEntries(s, SessionProtocol.H1C, endpointA8443, endpointA8443);
assertThat(entries4).hasSize(3); // Matches the port == 8443 branch of the first function
// Clear text SessionProtocols.
final List<AbstractEventLoopEntry> bComClearText =
s.entries(SessionProtocol.H1C, endpointB80, endpointB80);
assertThat(bComClearText).hasSize(0);
acquireTenEntries(s, SessionProtocol.H1C, endpointB, endpointB);
assertThat(bComClearText).hasSize(4); // Fallback to "b.com:80"
final List<AbstractEventLoopEntry> entries5 = s.entries(SessionProtocol.H1C, endpointB, endpointB);
assertThat(bComClearText).isSameAs(entries5);
final List<AbstractEventLoopEntry> entries6 = s.entries(SessionProtocol.H2C, endpointB, endpointB);
acquireTenEntries(s, SessionProtocol.H2C, endpointB, endpointB);
assertThat(bComClearText).hasSize(4);
final List<AbstractEventLoopEntry> entries7 = s.entries(SessionProtocol.HTTP, endpointB, endpointB);
assertThat(entries6).isSameAs(entries7);
// TLS SessionProtocols.
final List<AbstractEventLoopEntry> bComTls = s.entries(SessionProtocol.H1, endpointB443, endpointB443);
assertThat(bComTls).hasSize(0);
acquireTenEntries(s, SessionProtocol.H1, endpointB, endpointB);
assertThat(bComTls).hasSize(5); // Fallback to "b.com:443"
final List<AbstractEventLoopEntry> entries8 = s.entries(SessionProtocol.H1, endpointB, endpointB);
assertThat(bComTls).isSameAs(entries8);
final List<AbstractEventLoopEntry> entries9 = s.entries(SessionProtocol.H2, endpointB, endpointB);
acquireTenEntries(s, SessionProtocol.H2, endpointB, endpointB);
assertThat(entries9).hasSize(5);
final List<AbstractEventLoopEntry> entries10 = s.entries(SessionProtocol.HTTPS, endpointB, endpointB);
assertThat(entries9).isSameAs(entries10);
final List<AbstractEventLoopEntry> entries11 =
s.entries(SessionProtocol.H1, endpointB8443, endpointB8443);
assertThat(entries11).hasSize(
1); // One entry is pushed when eventLoops.size() == maxNumEventLoops
acquireTenEntries(s, SessionProtocol.H1, endpointB8443, endpointB8443);
assertThat(entries11).hasSize(7); // No match
}
/**
 * Supplies the event loop group for this implementation.
 *
 * @return a new {@code DefaultEventLoopGroup}; a fresh instance is created on every call
 */
@Override
protected EventLoopGroup group() {
return new DefaultEventLoopGroup();
}
/**
 * Connects through a {@code Connector} configured for the local transport and
 * requires the returned future to complete successfully.
 *
 * <p>The result is awaited on the test thread. An assertion thrown inside a
 * future callback runs on the callback's thread and is never seen by the test
 * runner, so the previous callback-based check could not fail the test. The
 * event loop group created for the test is shut down when the test ends.
 *
 * @throws ExecutionException if the connect future completes with a failure
 */
@Test
public void testConnect() throws ExecutionException {
    EventLoopGroup group = new DefaultEventLoopGroup();
    try {
        Connector connector =
            new Connector(address) {
                @Override
                protected List<Map.Entry<String, ChannelHandler>> payloadHandlers() {
                    return Arrays.asList();
                }
                @Override
                protected EventLoopGroup group() {
                    return group;
                }
                @Override
                protected Class<? extends Channel> channel() {
                    return LocalChannel.class;
                }
            };
        ListenableFuture<Channel> future = connector.connect();
        // Blocks until the future completes; a failed connect surfaces here as ExecutionException.
        Uninterruptibles.getUninterruptibly(future);
    } finally {
        group.shutdownGracefully();
    }
}
/**
 * Connects a local client whose pipeline starts with a
 * {@code WaitUntilActiveHandler}, fires the default negotiation event, and
 * checks that the next handler receives that event enriched with the local
 * address, the remote address and {@code SecurityLevel.NONE}.
 *
 * <p>The server channel and the event loop group created here are released in
 * a {@code finally} block so that a failing assertion does not leak them.
 */
@Test
public void waitUntilActiveHandler_firesNegotiation() throws Exception {
    EventLoopGroup elg = new DefaultEventLoopGroup(1);
    Channel s = null;
    try {
        SocketAddress addr = new LocalAddress("addr");
        final AtomicReference<Object> event = new AtomicReference<>();
        // Records the user event it receives and then closes the channel, which ends the test's wait.
        ChannelHandler next = new ChannelInboundHandlerAdapter() {
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                event.set(evt);
                ctx.close();
            }
        };
        s = new ServerBootstrap()
            .childHandler(new ChannelInboundHandlerAdapter())
            .group(elg)
            .channel(LocalServerChannel.class)
            .bind(addr)
            .sync()
            .channel();
        // NOTE(review): the client runs on the enclosing class's `group`, not on `elg` — confirm this is intended.
        Channel c = new Bootstrap()
            .handler(new WaitUntilActiveHandler(next))
            .channel(LocalChannel.class).group(group)
            .connect(addr)
            .sync()
            .channel();
        c.pipeline().fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
        // Captured before waiting for the channel to close.
        SocketAddress localAddr = c.localAddress();
        ProtocolNegotiationEvent expectedEvent = ProtocolNegotiationEvent.DEFAULT
            .withAttributes(
                Attributes.newBuilder()
                    .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, localAddr)
                    .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, addr)
                    .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
                    .build());
        c.closeFuture().sync();
        assertThat(event.get()).isInstanceOf(ProtocolNegotiationEvent.class);
        ProtocolNegotiationEvent actual = (ProtocolNegotiationEvent) event.get();
        assertThat(actual).isEqualTo(expectedEvent);
    } finally {
        if (s != null) {
            s.close();
        }
        elg.shutdownGracefully();
    }
}