下面列出了怎么用io.netty.channel.EventLoopGroup的API类实例代码及写法,或者点击链接到github查看源代码。
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpCorsServerInitializer(sslCtx));
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Test
public void testServer() throws InterruptedException {
EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup wokerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, wokerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(PORT))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully().sync();
wokerGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
SelfSignedCertificate ssc = new SelfSignedCertificate();
SslContext sslCtx = SslContextGMBuilder.forServer(ENC_CERT, ENC_KEY, SIGN_CERT, SIGN_KEY, null)
/* 默认协商出来的是ECDHE_SM4_SM3算法,所以必须是双向SSL,并且客户端和服务端必须要有加密证书和签名证书 */
.clientAuth(ClientAuth.REQUIRE)
.build();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new SecureChatServerInitializer(sslCtx));
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public BridgeServer(
BridgeServerConfig serverConfig,
InetSocketAddress socketBindAddress,
ChannelInitializer<SocketChannel> channelInitializer,
EventLoopGroup bossGroup,
EventLoopGroup workerGroup,
Map<ChannelOption<?>, Object> childChannelOptions,
Map<ChannelOption<?>, Object> parentChannelOptions,
BridgeServerEventLoopProvider eventLoopProvider
) {
this.runner = new ServerRunner(
serverConfig,
socketBindAddress,
channelInitializer,
bossGroup,
workerGroup,
childChannelOptions,
parentChannelOptions,
eventLoopProvider
);
}
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new TcpRttDecoder())
.addLast(new TcpRttServerHandler());
}
}).childOption(ChannelOption.TCP_NODELAY, true);
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
group.shutdownGracefully();
}
}
public UserServer(BootStrapContext context, BufferAllocator allocator, EventLoopGroup eventLoopGroup,
UserWorker worker) throws DrillbitStartupException {
super(UserRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
allocator.getAsByteBufAllocator(),
eventLoopGroup);
this.config = new UserConnectionConfig(allocator, context, new UserServerRequestHandler(worker));
this.sslChannel = null;
try {
this.sslConfig = new SSLConfigBuilder()
.config(context.getConfig())
.mode(SSLConfig.Mode.SERVER)
.initializeSSLContext(true)
.validateKeyStore(true)
.build();
} catch (DrillException e) {
throw new DrillbitStartupException(e.getMessage(), e.getCause());
}
this.userWorker = worker;
// Initialize Singleton instance of UserRpcMetrics.
((UserRpcMetrics)UserRpcMetrics.getInstance()).initialize(config.isEncryptionEnabled(), allocator);
}
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group).channel(OioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static ChannelConfiguration clientConfig(EventLoopGroup workerGroup) {
EventLoopGroup parent = workerGroup;
if (parent instanceof EventLoop) {
parent = ((EventLoop) workerGroup).parent();
}
Class<? extends Channel> channelClass;
if (parent instanceof EpollEventLoopGroup) {
channelClass = EpollSocketChannel.class;
} else if (parent instanceof NioEventLoopGroup) {
channelClass = NioSocketChannel.class;
} else {
throw new RuntimeException("Unsupported EventLoopGroup " + workerGroup.getClass());
}
return new ChannelConfiguration(workerGroup, channelClass);
}
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TelnetServerInitializer(sslCtx));
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerV1Handler());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
}
finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
EventLoopGroup loopGroup = new NioEventLoopGroup();
try {
ChannelFuture f = new Bootstrap().group(loopGroup)
.channel(NioSctpChannel.class)
// set SCTP option
.option(SctpChannelOption.SCTP_NODELAY, true)
.handler(new ChannelInitializer<SctpChannel>() {
@Override
public void initChannel(SctpChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new SimpleSctpClientHandler());
}
}).connect(HOST, PORT).sync();
f.channel().closeFuture().sync();
} finally {
loopGroup.shutdownGracefully();
}
}
private void startGetLatency() {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.DEBUG));
p.addLast(new NettySimpleMessageHandler());
p.addLast(new ReceiveMessageHandler(runId, offset));
}
});
b.connect(dest);
}
private void doConnect(EventLoopGroup loupGroup, Class<? extends SocketChannel> serverChannelClass, boolean isEpoll)
throws InterruptedException {
final Bootstrap bootstrap = new Bootstrap();
if (isEpoll) {
bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
}
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 256 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 256 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, //
new WriteBufferWaterMark(1024 * 1024, 2048 * 1024));
bootstrap.group(loupGroup);
bootstrap.channel(serverChannelClass);
bootstrap.handler(new BenchmarkChannelInitializer(futureContainer));
for (int i = 0; i < CONNECT_COUNT; i++) {
channels[i] = bootstrap.connect(host, port).sync().channel();
queues[i] = new MpscAtomicArrayQueue<>(4 * 1024);
}
}
static AsyncFuture<Server> setup(
final AsyncFramework async, final CollectdChannelHandler handler, final InetAddress host,
final int port
) {
final EventLoopGroup group = new NioEventLoopGroup();
final Bootstrap b = new Bootstrap();
b
.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(handler);
final ResolvableFuture<Server> future = async.future();
b.bind(host, port).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
future.resolve(new Server(async, f.channel()));
} else {
future.fail(
f.cause() != null ? f.cause() : new RuntimeException("Failed to bind"));
}
});
return future;
}
public static void main(String[] args) throws Exception {
EventLoopGroup mainLoop = new NioEventLoopGroup(1);
EventLoopGroup workerLoop = new NioEventLoopGroup();
try {
ChannelFuture f = new ServerBootstrap().group(mainLoop, workerLoop)
.channel(NioSctpServerChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SctpChannel>() {
@Override
public void initChannel(SctpChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new SimpleSctpServerHandler());
}
}).bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
mainLoop.shutdownGracefully();
workerLoop.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
EventLoopGroup workerGroup = new EpollEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(EpollServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
}
finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void newClientBootstrap(String host, int port, ChannelInitializer<SocketChannel> initializer){
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
ChannelFuture f = b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(initializer)
.connect(host, port).sync();
f.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new SecureChatClientInitializer(sslMode));
// Start the connection attempt.
Channel ch = b.connect(host, port).sync().channel();
// Read commands from the stdin.
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(
System.in));
for (;;) {
String line = in.readLine();
if (line == null) {
break;
}
// Sends the received line to the server.
lastWriteFuture = ch.writeAndFlush(line + "\r\n");
// If user typed the 'bye' command, wait until the server closes
// the connection.
if ("bye".equals(line.toLowerCase())) {
ch.closeFuture().sync();
break;
}
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} finally {
// The connection is closed automatically on shutdown.
group.shutdownGracefully();
}
}
@Test
public void testReRegister() {
EventLoopGroup group1 = new LocalEventLoopGroup();
EventLoopGroup group2 = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
cb.group(group1)
.channel(LocalChannel.class)
.handler(new TestHandler());
sb.group(group2)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new TestHandler());
}
});
// Start server
final Channel sc = sb.bind(addr).syncUninterruptibly().channel();
// Connect to the server
final Channel cc = cb.connect(addr).syncUninterruptibly().channel();
cc.deregister().syncUninterruptibly();
// Change event loop group.
group2.register(cc).syncUninterruptibly();
cc.close().syncUninterruptibly();
sc.close().syncUninterruptibly();
}
private static void checkEventLoopAssignedSequentially(
List<ToIntFunction<Endpoint>> maxNumEventLoopsFunctions, int maxNumEventLoops) {
final EventLoopGroup group = new DefaultEventLoopGroup(7);
final List<EventLoop> eventLoops = Streams.stream(group)
.map(EventLoop.class::cast)
.collect(toImmutableList());
final DefaultEventLoopScheduler s = new DefaultEventLoopScheduler(group, maxNumEventLoops,
maxNumEventLoops,
maxNumEventLoopsFunctions);
// endpointA
EventLoop firstEventLoop = acquireEntry(s, endpointA).get();
int firstEventLoopIdx = findIndex(eventLoops, firstEventLoop);
assertThat(firstEventLoopIdx).isIn(0, 1);
checkNextEventLoopIdx(s, eventLoops, endpointA, firstEventLoopIdx, 0, 2);
// After one circle, the next event loop is the first one.
assertThat(firstEventLoop).isSameAs(acquireEntry(s, endpointA).get());
// endpointB
firstEventLoop = acquireEntry(s, endpointB).get();
firstEventLoopIdx = findIndex(eventLoops, firstEventLoop);
assertThat(firstEventLoopIdx).isIn(2, 3, 4);
checkNextEventLoopIdx(s, eventLoops, endpointB, firstEventLoopIdx, 2, 3);
// After one circle, the next event loop is the first one.
assertThat(firstEventLoop).isSameAs(acquireEntry(s, endpointB).get());
// endpointC
firstEventLoop = acquireEntry(s, endpointC).get();
firstEventLoopIdx = findIndex(eventLoops, firstEventLoop);
assertThat(firstEventLoopIdx).isIn(0, 1, 2, 5, 6);
checkNextEventLoopIdx(s, eventLoops, endpointC, firstEventLoopIdx, 5, 5);
// After one circle, the next event loop is the first one.
assertThat(firstEventLoop).isSameAs(acquireEntry(s, endpointC).get());
}
public void bind(ServiceConfig serviceConfig) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(this.rpcServerInitializer)
.childOption(ChannelOption.SO_KEEPALIVE,true)
;
try {
ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();
RpcURL url=new RpcURL();
url.setHost(serviceConfig.getHost());
url.setPort(serviceConfig.getPort());
url.setRegistryHost(serviceConfig.getRegistryHost());
url.setRegistryPort(serviceConfig.getRegistryPort());
RegistryService registryService=new ConsulRegistryService();
registryService.register(url);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RpcException(e);
}
}
finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioSctpServerChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SctpChannel>() {
@Override
public void initChannel(SctpChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new SctpEchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public void run() throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (2)
b.group(workerGroup)
.channel(NioSocketChannel.class) // (3)
.handler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FrameExtractor());
ch.pipeline().addLast(new FrameMaker());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(clientHandler);
}
})
.option(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.connect("localhost", port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port){
return new Bootstrap().group(loopGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(handler)
.bind(port);
}
protected EventLoopGroup newIoEventGroup() {
NamedThreadFactory threadFactory = new NamedThreadFactory(config.getIoThreadName());
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup(config.getIoThread(), threadFactory);
} else {
return new NioEventLoopGroup(config.getIoThread(), threadFactory);
}
}
private void connect() {
// configuring the NIO thread groups of the server
EventLoopGroup group = new NioEventLoopGroup(1);
// Bootstrap is an assistant class of Netty for setting up client
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.
weakCachingConcurrentResolver(this.getClass().getClassLoader())));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new DataGeneratorClientHandler());
}
});
while (true) {
try {
Thread.sleep(1000);
channel = bootstrap.connect(host, port).sync().channel();
break;
} catch (Exception e) {
logger.info("\n\tData generator client startup fail!");
}
}
logger.info("\n\tData generator client startup successful!");
}
public static void main(String[] args) {
final EventLoopGroup ioGroup = new NioEventLoopGroup(1);
//start listen
Bootstrap boot = new Bootstrap();
boot.group(ioGroup)
.option(ChannelOption.SO_BROADCAST, false)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_SNDBUF, 1024*10)
.option(ChannelOption.SO_RCVBUF, 1024*10)
.channel(NioDatagramChannel.class)
.handler(new UdpHandlerTestClient());
try{
ChannelFuture future = boot.bind(0).sync();
channel = future.channel();
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> f) throws Exception {
boolean isDone = f.isDone();
boolean isSucc = f.isSuccess();
boolean isCancel = f.isCancelled();
if(isDone && isSucc){ //listen
log.I("Init udp succ");
}else{
//shutdown io group
ioGroup.shutdownGracefully();
}
}
});
}catch(Throwable e){
e.printStackTrace();
}
//start loop
ExecutorService thPool = Executors.newFixedThreadPool(1);
thPool.submit(new UdpTestClientLoop());
}
@Test
public void assertEventLoopsAndChannelType_allProvided() {
EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class);
builder.bossEventLoopGroup(mockEventLoopGroup);
builder.workerEventLoopGroup(mockEventLoopGroup);
builder.channelType(LocalServerChannel.class);
builder.assertEventLoopsAndChannelType();
}
@Test
public void testTryWithResourcesShouldCloseAllClustersButNotEventLoopIfProvided()
throws Exception {
EventLoopGroup eventLoop = new DefaultEventLoopGroup();
BoundCluster cluster;
MockClient client;
try (Server server =
Server.builder()
.withAddressResolver(localAddressResolver)
.withEventLoopGroup(eventLoop, LocalServerChannel.class)
.build()) {
cluster = server.register(ClusterSpec.builder().withNodes(5));
BoundNode node = cluster.node(0);
SocketAddress address = node.getAddress();
client = new MockClient(eventLoop);
client.connect(address);
}
// event loop should not have been closed.
assertThat(eventLoop.isShutdown()).isFalse();
// timer should have since a custom one was not provided.
try {
cluster
.getServer()
.timer
.newTimeout(
timeout -> {
// noop
},
1,
TimeUnit.SECONDS);
fail("Expected IllegalStateException");
} catch (IllegalStateException ise) {
// expected
}
eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}