下面列出了 io.netty.util.concurrent.DefaultEventExecutor 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that closing a {@link ChannelGroup} which holds the server-side channel
 * disposes that connection: after {@code group.close()} completes, the server
 * connection's {@code onDispose()} signal must arrive within five seconds.
 *
 * @throws Exception if the thread is interrupted while waiting on the latch
 */
@Test
public void testChannelGroupClosesAllConnections() throws Exception {
    MonoProcessor<Void> serverConnDisposed = MonoProcessor.create();

    // Keep a handle on the executor backing the channel group so that its
    // worker thread can be released when the test finishes (pass or fail).
    DefaultEventExecutor groupExecutor = new DefaultEventExecutor();
    try {
        ChannelGroup group = new DefaultChannelGroup(groupExecutor);
        CountDownLatch latch = new CountDownLatch(1);

        DisposableServer boundServer =
                TcpServer.create()
                         .port(0)
                         .doOnConnection(c -> {
                             // Forward the connection's disposal signal to the processor
                             // that is awaited at the end of the test.
                             c.onDispose()
                              .subscribe(serverConnDisposed);
                             group.add(c.channel());
                             latch.countDown();
                         })
                         .wiretap(true)
                         .bindNow();

        TcpClient.create()
                 .remoteAddress(boundServer::address)
                 .wiretap(true)
                 .connect()
                 .subscribe();

        // Wait until the server has seen the client connection.
        assertTrue(latch.await(30, TimeUnit.SECONDS));

        boundServer.disposeNow();

        FutureMono.from(group.close())
                  .block(Duration.ofSeconds(30));

        serverConnDisposed.block(Duration.ofSeconds(5));
    }
    finally {
        // Non-blocking: only initiates the shutdown of the group's executor.
        groupExecutor.shutdownGracefully();
    }
}
/**
 * Benchmark entry point that runs {@code execute(...)} against a
 * {@code DefaultSingleThreadExecutor} wrapping a Netty {@link DefaultEventExecutor}.
 * The Netty executor is built with a {@code NamedThreadFactory} named
 * {@code "netty_executor"}.
 *
 * @throws InterruptedException propagated from {@code execute(...)}
 */
@Benchmark
public void nettyDefaultEventExecutor() throws InterruptedException {
    // NOTE(review): the boolean flag presumably marks the threads as daemon - confirm
    // against NamedThreadFactory.
    NamedThreadFactory threadFactory = new NamedThreadFactory("netty_executor", true);
    DefaultEventExecutor nettyExecutor = new DefaultEventExecutor(threadFactory);
    execute(new DefaultSingleThreadExecutor(nettyExecutor));
}
/**
 * Returns a new promise obtained from a freshly created {@link DefaultEventExecutor}.
 * Both arguments are ignored, and nothing in this method completes the promise.
 *
 * @param connectionParameters unused
 * @param address unused
 * @return a promise created by a new {@link DefaultEventExecutor}
 */
@Override
public Future<Channel> getConnection(ConnectionParameters connectionParameters, HostAndPort address)
{
    DefaultEventExecutor promiseOwner = new DefaultEventExecutor();
    return promiseOwner.newPromise();
}
/**
 * Checks the server-side connection state notifications: a child connection must
 * report {@code CONNECTED} and {@code CONFIGURED} after the client connects, and
 * {@code DISCONNECTING} once the {@link ChannelGroup} holding its channel is closed.
 *
 * @throws Exception if the thread is interrupted while waiting on a latch
 */
@Test
public void testIssue688() throws Exception {
    CountDownLatch connected = new CountDownLatch(1);
    CountDownLatch configured = new CountDownLatch(1);
    CountDownLatch disconnected = new CountDownLatch(1);

    // Keep a handle on the executor backing the channel group so that its
    // worker thread can be released when the test finishes (pass or fail).
    DefaultEventExecutor groupExecutor = new DefaultEventExecutor();
    try {
        ChannelGroup group = new DefaultChannelGroup(groupExecutor);

        DisposableServer server =
                TcpServer.create()
                         .port(0)
                         .childObserve((connection, newState) -> {
                             if (newState == ConnectionObserver.State.CONNECTED) {
                                 // Track the channel so that group.close() below closes it.
                                 group.add(connection.channel());
                                 connected.countDown();
                             }
                             else if (newState == ConnectionObserver.State.CONFIGURED) {
                                 configured.countDown();
                             }
                             else if (newState == ConnectionObserver.State.DISCONNECTING) {
                                 disconnected.countDown();
                             }
                         })
                         .wiretap(true)
                         .bindNow();

        TcpClient.create()
                 .remoteAddress(server::address)
                 .wiretap(true)
                 .connect()
                 .subscribe();

        assertTrue(connected.await(30, TimeUnit.SECONDS));
        assertTrue(configured.await(30, TimeUnit.SECONDS));

        FutureMono.from(group.close())
                  .block(Duration.ofSeconds(30));

        assertTrue(disconnected.await(30, TimeUnit.SECONDS));

        server.disposeNow();
    }
    finally {
        // Non-blocking: only initiates the shutdown of the group's executor.
        groupExecutor.shutdownGracefully();
    }
}
/**
 * Checks graceful shutdown of a TCP server that has a channel group registered:
 * two clients connect, the server is disposed while each handler is still delaying
 * its {@code "delay1000"} reply by one second, and both replies must still be
 * received before the connections are disposed.
 *
 * @throws Exception if the thread is interrupted while waiting on a latch
 */
@Test
public void testGracefulShutdown() throws Exception {
    CountDownLatch latch1 = new CountDownLatch(2);
    CountDownLatch latch2 = new CountDownLatch(2);

    LoopResources loop = LoopResources.create("testGracefulShutdown");

    // Keep a handle on the executor backing the channel group so that its
    // worker thread can be released when the test finishes (pass or fail).
    DefaultEventExecutor groupExecutor = new DefaultEventExecutor();
    try {
        DisposableServer disposableServer =
                TcpServer.create()
                         .port(0)
                         .runOn(loop)
                         .doOnConnection(c -> {
                             // latch2 counts completed disposals, latch1 counts accepted connections.
                             c.onDispose().subscribe(null, null, latch2::countDown);
                             latch1.countDown();
                         })
                         // Register a channel group, when invoking disposeNow()
                         // the implementation will wait for the active requests to finish
                         .channelGroup(new DefaultChannelGroup(groupExecutor))
                         .handle((in, out) -> out.sendString(Mono.just("delay1000")
                                                                 .delayElement(Duration.ofSeconds(1))))
                         .wiretap(true)
                         .bindNow(Duration.ofSeconds(30));

        TcpClient client = TcpClient.create()
                                    .remoteAddress(disposableServer::address)
                                    .wiretap(true);

        MonoProcessor<String> result = MonoProcessor.create();
        Flux.merge(client.connect(), client.connect())
            .flatMap(conn ->
                    conn.inbound()
                        .receive()
                        .asString())
            .collect(Collectors.joining())
            .subscribe(result);

        assertTrue(latch1.await(30, TimeUnit.SECONDS));

        // Stop accepting incoming requests, wait at most 3s for the active requests to finish
        disposableServer.disposeNow();

        // Dispose the event loop
        loop.disposeLater()
            .block(Duration.ofSeconds(30));

        assertTrue(latch2.await(30, TimeUnit.SECONDS));

        StepVerifier.create(result)
                    .expectNext("delay1000delay1000")
                    .verifyComplete();
    }
    finally {
        // Non-blocking: only initiates the shutdown of the group's executor.
        groupExecutor.shutdownGracefully();
    }
}
/**
 * Checks that closing a {@link ChannelGroup} holding the client-side channels
 * disposes all of them, including connections whose responses are still pending
 * ({@code /never}, {@code /delay10}, {@code /delay1}).
 *
 * @throws Exception if the thread is interrupted while waiting on a latch
 */
@Test
public void testChannelGroupClosesAllConnections() throws Exception {
    disposableServer =
            HttpServer.create()
                      .port(0)
                      .route(r -> r.get("/never",
                                        (req, res) -> res.sendString(Mono.never()))
                                   .get("/delay10",
                                        (req, res) -> res.sendString(Mono.just("test")
                                                                         .delayElement(Duration.ofSeconds(10))))
                                   .get("/delay1",
                                        (req, res) -> res.sendString(Mono.just("test")
                                                                         .delayElement(Duration.ofSeconds(1)))))
                      .wiretap(true)
                      .bindNow(Duration.ofSeconds(30));

    ConnectionProvider connectionProvider =
            ConnectionProvider.create("testChannelGroupClosesAllConnections", Integer.MAX_VALUE);

    // Keep a handle on the executor backing the channel group so that its
    // worker thread can be released when the test finishes (pass or fail).
    DefaultEventExecutor groupExecutor = new DefaultEventExecutor();
    try {
        ChannelGroup group = new DefaultChannelGroup(groupExecutor);

        // latch1 counts established connections, latch2 counts completed disposals.
        CountDownLatch latch1 = new CountDownLatch(3);
        CountDownLatch latch2 = new CountDownLatch(3);

        HttpClient client = createHttpClientForContextWithAddress(connectionProvider);

        Flux.just("/never", "/delay10", "/delay1")
            .flatMap(s ->
                    client.doOnConnected(c -> {
                              c.onDispose()
                               .subscribe(null, null, latch2::countDown);
                              group.add(c.channel());
                              latch1.countDown();
                          })
                          .get()
                          .uri(s)
                          .responseContent()
                          .aggregate()
                          .asString())
            .subscribe();

        assertThat(latch1.await(30, TimeUnit.SECONDS)).isTrue();

        Mono.whenDelayError(FutureMono.from(group.close()), connectionProvider.disposeLater())
            .block(Duration.ofSeconds(30));

        assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue();
    }
    finally {
        // Non-blocking: only initiates the shutdown of the group's executor.
        groupExecutor.shutdownGracefully();
    }
}
/**
 * Checks graceful shutdown of an HTTP server that has a channel group registered:
 * two requests ({@code /delay500} and {@code /delay1000}) are in flight when the
 * server is disposed, and both delayed responses must still be delivered before
 * the connections are disposed.
 *
 * @throws Exception if the thread is interrupted while waiting on a latch
 */
@Test
public void testGracefulShutdown() throws Exception {
    CountDownLatch latch1 = new CountDownLatch(2);
    CountDownLatch latch2 = new CountDownLatch(2);

    LoopResources loop = LoopResources.create("testGracefulShutdown");

    // Keep a handle on the executor backing the channel group so that its
    // worker thread can be released when the test finishes (pass or fail).
    DefaultEventExecutor groupExecutor = new DefaultEventExecutor();
    try {
        disposableServer =
                HttpServer.create()
                          .port(0)
                          .runOn(loop)
                          .doOnConnection(c -> {
                              // latch2 counts completed disposals, latch1 counts accepted connections.
                              c.onDispose().subscribe(null, null, latch2::countDown);
                              latch1.countDown();
                          })
                          // Register a channel group, when invoking disposeNow()
                          // the implementation will wait for the active requests to finish
                          .channelGroup(new DefaultChannelGroup(groupExecutor))
                          .route(r -> r.get("/delay500", (req, res) -> res.sendString(Mono.just("delay500")
                                                                                          .delayElement(Duration.ofMillis(500))))
                                       .get("/delay1000", (req, res) -> res.sendString(Mono.just("delay1000")
                                                                                           .delayElement(Duration.ofSeconds(1)))))
                          .wiretap(true)
                          .bindNow(Duration.ofSeconds(30));

        HttpClient client = HttpClient.create()
                                      .remoteAddress(disposableServer::address)
                                      .wiretap(true);

        MonoProcessor<String> result = MonoProcessor.create();
        Flux.just("/delay500", "/delay1000")
            .flatMap(s ->
                    client.get()
                          .uri(s)
                          .responseContent()
                          .aggregate()
                          .asString())
            .collect(Collectors.joining())
            .subscribe(result);

        assertThat(latch1.await(30, TimeUnit.SECONDS)).isTrue();

        // Stop accepting incoming requests, wait at most 3s for the active requests to finish
        disposableServer.disposeNow();

        // Dispose the event loop
        loop.disposeLater()
            .block(Duration.ofSeconds(30));

        assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue();

        StepVerifier.create(result)
                    .expectNext("delay500delay1000")
                    .verifyComplete();
    }
    finally {
        // Non-blocking: only initiates the shutdown of the group's executor.
        groupExecutor.shutdownGracefully();
    }
}