io.netty.util.concurrent.DefaultEventExecutor 类的源码示例

下面列出了 io.netty.util.concurrent.DefaultEventExecutor 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: reactor-netty   文件: TcpServerTests.java
/**
 * Checks that closing a {@link ChannelGroup} closes the server-side connection
 * that was registered in it.
 *
 * <p>A server is bound on an ephemeral port and adds every accepted channel to
 * the group. One client connects; once that connection has been observed the
 * server is disposed, the group is closed, and the test waits for the
 * server-side connection's {@code onDispose()} signal.
 *
 * @throws Exception if the test thread is interrupted while waiting
 */
@Test
public void testChannelGroupClosesAllConnections() throws Exception {
	MonoProcessor<Void> serverConnDisposed = MonoProcessor.create();

	// Keep a handle on the executor backing the group so it can be shut down
	// in the finally block instead of being left running after the test.
	DefaultEventExecutor groupExecutor = new DefaultEventExecutor();
	try {
		ChannelGroup group = new DefaultChannelGroup(groupExecutor);

		CountDownLatch latch = new CountDownLatch(1);

		DisposableServer boundServer =
				TcpServer.create()
				         .port(0)
				         .doOnConnection(c -> {
				             // Forward this connection's dispose signal to the processor
				             c.onDispose()
				              .subscribe(serverConnDisposed);
				             group.add(c.channel());
				             latch.countDown();
				         })
				         .wiretap(true)
				         .bindNow();

		TcpClient.create()
		         .remoteAddress(boundServer::address)
		         .wiretap(true)
		         .connect()
		         .subscribe();

		// Wait until the server has seen the client connection
		assertTrue(latch.await(30, TimeUnit.SECONDS));

		boundServer.disposeNow();

		FutureMono.from(group.close())
		          .block(Duration.ofSeconds(30));

		serverConnDisposed.block(Duration.ofSeconds(5));
	}
	finally {
		// Non-blocking: only requests termination of the executor
		groupExecutor.shutdownGracefully();
	}
}
 
/**
 * Benchmarks {@code execute(...)} against a {@code DefaultSingleThreadExecutor}
 * that wraps a Netty {@code DefaultEventExecutor}.
 *
 * @throws InterruptedException propagated from {@code execute(...)}
 */
@Benchmark
public void nettyDefaultEventExecutor() throws InterruptedException {
    // NOTE(review): the boolean flag is presumably "daemon" - confirm against NamedThreadFactory
    NamedThreadFactory threadFactory = new NamedThreadFactory("netty_executor", true);
    DefaultEventExecutor nettyExecutor = new DefaultEventExecutor(threadFactory);
    execute(new DefaultSingleThreadExecutor(nettyExecutor));
}
 
源代码3 项目: drift   文件: TestDriftNettyMethodInvoker.java
/**
 * Returns a fresh promise obtained from a newly created
 * {@code DefaultEventExecutor}. Nothing in this method completes the promise,
 * and neither argument is used.
 *
 * @param connectionParameters ignored
 * @param address ignored
 * @return a new, not-yet-completed channel future
 */
@Override
public Future<Channel> getConnection(ConnectionParameters connectionParameters, HostAndPort address)
{
    DefaultEventExecutor executor = new DefaultEventExecutor();
    return executor.newPromise();
}
 
源代码4 项目: reactor-netty   文件: TcpServerTests.java
/**
 * Checks that closing a {@link ChannelGroup} containing a server-side channel
 * drives that connection to the {@code DISCONNECTING} state.
 *
 * <p>The server's child observer adds the channel to the group on
 * {@code CONNECTED} and counts down one latch per observed state. The test
 * waits for {@code CONNECTED} and {@code CONFIGURED}, closes the group, and
 * then expects {@code DISCONNECTING} before disposing the server.
 *
 * @throws Exception if the test thread is interrupted while waiting
 */
@Test
public void testIssue688() throws Exception {
	CountDownLatch connected = new CountDownLatch(1);
	CountDownLatch configured = new CountDownLatch(1);
	CountDownLatch disconnected = new CountDownLatch(1);

	// Keep a handle on the executor backing the group so it can be shut down
	// in the finally block instead of being left running after the test.
	DefaultEventExecutor groupExecutor = new DefaultEventExecutor();
	try {
		ChannelGroup group = new DefaultChannelGroup(groupExecutor);

		DisposableServer server =
				TcpServer.create()
				         .port(0)
				         .childObserve((connection, newState) -> {
				             if (newState == ConnectionObserver.State.CONNECTED) {
				                 group.add(connection.channel());
				                 connected.countDown();
				             }
				             else if (newState == ConnectionObserver.State.CONFIGURED) {
				                 configured.countDown();
				             }
				             else if (newState == ConnectionObserver.State.DISCONNECTING) {
				                 disconnected.countDown();
				             }
				         })
				         .wiretap(true)
				         .bindNow();

		TcpClient.create()
		         .remoteAddress(server::address)
		         .wiretap(true)
		         .connect()
		         .subscribe();

		assertTrue(connected.await(30, TimeUnit.SECONDS));

		assertTrue(configured.await(30, TimeUnit.SECONDS));

		// Closing the group is what should trigger DISCONNECTING below
		FutureMono.from(group.close())
		          .block(Duration.ofSeconds(30));

		assertTrue(disconnected.await(30, TimeUnit.SECONDS));

		server.disposeNow();
	}
	finally {
		// Non-blocking: only requests termination of the executor
		groupExecutor.shutdownGracefully();
	}
}
 
源代码5 项目: reactor-netty   文件: TcpServerTests.java
/**
 * Exercises server shutdown while two TCP requests are still in flight.
 *
 * <p>The server answers every connection with {@code "delay1000"} after a
 * one-second delay and has a channel group registered. Two clients connect;
 * once both connections are observed, the server and then the event loop are
 * disposed. Both connections must be disposed and the two delayed payloads
 * must still arrive, concatenated.
 *
 * @throws Exception if the test thread is interrupted while waiting
 */
@Test
public void testGracefulShutdown() throws Exception {
	CountDownLatch connectionsAccepted = new CountDownLatch(2);
	CountDownLatch connectionsDisposed = new CountDownLatch(2);
	Duration halfMinute = Duration.ofSeconds(30);
	LoopResources loop = LoopResources.create("testGracefulShutdown");

	TcpServer serverSpec =
			TcpServer.create()
			         .port(0)
			         .runOn(loop)
			         .doOnConnection(conn -> {
			             conn.onDispose().subscribe(null, null, connectionsDisposed::countDown);
			             connectionsAccepted.countDown();
			         })
			         // With a channel group registered, disposeNow() waits
			         // for the active requests to finish
			         .channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()))
			         .handle((in, out) -> {
			             Mono<String> delayedReply = Mono.just("delay1000")
			                                             .delayElement(Duration.ofSeconds(1));
			             return out.sendString(delayedReply);
			         })
			         .wiretap(true);
	DisposableServer disposableServer = serverSpec.bindNow(halfMinute);

	TcpClient client = TcpClient.create()
	                            .remoteAddress(disposableServer::address)
	                            .wiretap(true);

	MonoProcessor<String> joinedReplies = MonoProcessor.create();
	Flux.merge(client.connect(), client.connect())
	    .flatMap(conn -> conn.inbound().receive().asString())
	    .collect(Collectors.joining())
	    .subscribe(joinedReplies);

	boolean bothAccepted = connectionsAccepted.await(30, TimeUnit.SECONDS);
	assertTrue(bothAccepted);

	// Stop accepting incoming requests, wait at most 3s for the active requests to finish
	disposableServer.disposeNow();

	// Dispose the event loop
	loop.disposeLater().block(halfMinute);

	boolean bothDisposed = connectionsDisposed.await(30, TimeUnit.SECONDS);
	assertTrue(bothDisposed);

	StepVerifier.create(joinedReplies)
	            .expectNext("delay1000delay1000")
	            .verifyComplete();
}
 
源代码6 项目: reactor-netty   文件: HttpClientTest.java
/**
 * Checks that closing a {@link ChannelGroup} of client channels disposes every
 * client connection, including ones whose responses have not completed.
 *
 * <p>The server exposes {@code /never}, {@code /delay10} and {@code /delay1}.
 * One request is issued per route and each connected channel is added to the
 * group. After all three connections are established, the group is closed and
 * the connection provider disposed; all three {@code onDispose()} signals must
 * complete within 30 seconds.
 *
 * @throws Exception if the test thread is interrupted while waiting
 */
@Test
public void testChannelGroupClosesAllConnections() throws Exception {
	disposableServer =
			HttpServer.create()
			          .port(0)
			          .route(r -> r.get("/never",
			                  (req, res) -> res.sendString(Mono.never()))
			              .get("/delay10",
			                  (req, res) -> res.sendString(Mono.just("test")
			                                                   .delayElement(Duration.ofSeconds(10))))
			              .get("/delay1",
			                  (req, res) -> res.sendString(Mono.just("test")
			                                                   .delayElement(Duration.ofSeconds(1)))))
			          .wiretap(true)
			          .bindNow(Duration.ofSeconds(30));

	ConnectionProvider connectionProvider =
			ConnectionProvider.create("testChannelGroupClosesAllConnections", Integer.MAX_VALUE);

	// Keep a handle on the executor backing the group so it can be shut down
	// in the finally block instead of being left running after the test.
	DefaultEventExecutor groupExecutor = new DefaultEventExecutor();
	try {
		ChannelGroup group = new DefaultChannelGroup(groupExecutor);

		// latch1: connections established; latch2: connections disposed
		CountDownLatch latch1 = new CountDownLatch(3);
		CountDownLatch latch2 = new CountDownLatch(3);

		HttpClient client = createHttpClientForContextWithAddress(connectionProvider);

		Flux.just("/never", "/delay10", "/delay1")
		    .flatMap(s ->
		            client.doOnConnected(c -> {
		                          c.onDispose()
		                           .subscribe(null, null, latch2::countDown);
		                          group.add(c.channel());
		                          latch1.countDown();
		                      })
		                  .get()
		                  .uri(s)
		                  .responseContent()
		                  .aggregate()
		                  .asString())
		    .subscribe();

		assertThat(latch1.await(30, TimeUnit.SECONDS)).isTrue();

		Mono.whenDelayError(FutureMono.from(group.close()), connectionProvider.disposeLater())
		    .block(Duration.ofSeconds(30));

		assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue();
	}
	finally {
		// Non-blocking: only requests termination of the executor
		groupExecutor.shutdownGracefully();
	}
}
 
源代码7 项目: reactor-netty   文件: HttpServerTests.java
/**
 * Exercises server shutdown while two HTTP requests are still in flight.
 *
 * <p>The server exposes {@code /delay500} and {@code /delay1000}, each replying
 * with its own name after the corresponding delay, and has a channel group
 * registered. After both connections are observed, the server and then the
 * event loop are disposed. Both connections must be disposed and both response
 * bodies must still arrive, concatenated in completion order.
 *
 * @throws Exception if the test thread is interrupted while waiting
 */
@Test
public void testGracefulShutdown() throws Exception {
	CountDownLatch connectionsAccepted = new CountDownLatch(2);
	CountDownLatch connectionsDisposed = new CountDownLatch(2);
	Duration halfMinute = Duration.ofSeconds(30);
	LoopResources loop = LoopResources.create("testGracefulShutdown");

	HttpServer serverSpec =
			HttpServer.create()
			          .port(0)
			          .runOn(loop)
			          .doOnConnection(conn -> {
			              conn.onDispose().subscribe(null, null, connectionsDisposed::countDown);
			              connectionsAccepted.countDown();
			          })
			          // With a channel group registered, disposeNow() waits
			          // for the active requests to finish
			          .channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()))
			          .route(routes ->
			                  routes.get("/delay500",
			                            (req, res) -> res.sendString(Mono.just("delay500")
			                                                             .delayElement(Duration.ofMillis(500))))
			                        .get("/delay1000",
			                            (req, res) -> res.sendString(Mono.just("delay1000")
			                                                             .delayElement(Duration.ofSeconds(1)))))
			          .wiretap(true);
	disposableServer = serverSpec.bindNow(halfMinute);

	HttpClient client = HttpClient.create()
	                              .remoteAddress(disposableServer::address)
	                              .wiretap(true);

	MonoProcessor<String> joinedBodies = MonoProcessor.create();
	Flux.just("/delay500", "/delay1000")
	    .flatMap(path -> client.get()
	                           .uri(path)
	                           .responseContent()
	                           .aggregate()
	                           .asString())
	    .collect(Collectors.joining())
	    .subscribe(joinedBodies);

	boolean bothAccepted = connectionsAccepted.await(30, TimeUnit.SECONDS);
	assertThat(bothAccepted).isTrue();

	// Stop accepting incoming requests, wait at most 3s for the active requests to finish
	disposableServer.disposeNow();

	// Dispose the event loop
	loop.disposeLater().block(halfMinute);

	boolean bothDisposed = connectionsDisposed.await(30, TimeUnit.SECONDS);
	assertThat(bothDisposed).isTrue();

	StepVerifier.create(joinedBodies)
	            .expectNext("delay500delay1000")
	            .verifyComplete();
}
 
 类所在包
 同包方法