org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#reactor.netty.DisposableServer源码实例Demo

下面列出了org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#reactor.netty.DisposableServer 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

public static void main(String... args) {
    long start = System.currentTimeMillis();
    HttpHandler httpHandler = RouterFunctions.toHttpHandler(routes(
        new BCryptPasswordEncoder(18)
    ));
    ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler);

    DisposableServer server = HttpServer.create()
                                        .host("localhost")
                                        .port(8080)
                                        .handle(reactorHttpHandler)
                                        .bindNow();

    LOGGER.debug("Started in " + (System.currentTimeMillis() - start) + " ms");

    server.onDispose()
          .block();
}
 
源代码2 项目: reactor-netty   文件: TcpServerTests.java
@Test(timeout = 2000)
public void startAndAwait() throws InterruptedException {
	AtomicReference<DisposableServer> conn = new AtomicReference<>();
	CountDownLatch startLatch = new CountDownLatch(1);

	Thread t = new Thread(() -> TcpServer.create()
	                                     .handle((in, out) -> out.sendString(Mono.just("foo")))
	                                     .bindUntilJavaShutdown(Duration.ofMillis(200),
	                                                            c -> {
	                                                                  conn.set(c);
	                                                                  startLatch.countDown();
	                                                            }));
	t.start();
	//let the server initialize
	startLatch.await();

	//check nothing happens for 200ms
	t.join(200);
	Assertions.assertThat(t.isAlive()).isTrue();

	//check that stopping the bnc stops the server
	conn.get().disposeNow();
	t.join();
	Assertions.assertThat(t.isAlive()).isFalse();
}
 
源代码3 项目: glowroot   文件: WebFluxIT.java
@Override
public void executeApp() throws Exception {
    int port = getAvailablePort();
    DisposableServer httpServer = HttpServer.create()
            .host("localhost")
            .port(port)
            .handle(new ReactorHttpHandlerAdapter(new MyHttpHandler()))
            .bind()
            .block();

    WebClient client = WebClient.create("http://localhost:" + port);
    client.get()
            .uri("/webflux/abc")
            .retrieve()
            .bodyToMono(String.class)
            .block();

    httpServer.dispose();
}
 
源代码4 项目: reactor-netty   文件: TcpClientTests.java
@Test
public void testReconnectWhenDisconnected() throws Exception {
	DisposableServer server =
			TcpServer.create()
			         .port(0)
			         .wiretap(true)
			         .handle((req, res) -> res.sendString(Mono.just("test")))
			         .bindNow();

	final CountDownLatch latch = new CountDownLatch(1);

	TcpClient  client =
			TcpClient.create()
			         .port(echoServerPort)
			         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100)
			         .handle((in, out) -> out.withConnection(Connection::dispose))
			         .wiretap(true);

	connect(client, true, latch);

	assertTrue(latch.await(30, TimeUnit.SECONDS));

	server.disposeNow();
}
 
源代码5 项目: reactor-netty   文件: BlockingConnectionTest.java
@Test
public void getContextAddressAndHost() {
	DisposableServer c = new TcpServer() {

		@Override
		public TcpServerConfig configuration() {
			return null;
		}

		@Override
		protected TcpServer duplicate() {
			return null;
		}

		@Override
		public Mono<? extends DisposableServer> bind() {
			return Mono.just(NEVER_STOP_SERVER);
		}
	}.bindNow();

	assertThat(c).isSameAs(NEVER_STOP_SERVER);
	assertThat(c.port()).isEqualTo(((InetSocketAddress) NEVER_STOP_CONTEXT.address()).getPort());
	assertThat(c.host()).isEqualTo(((InetSocketAddress) NEVER_STOP_CONTEXT.address()).getHostString());
}
 
源代码6 项目: reactor-netty   文件: HttpErrorTests.java
@Test
public void test() {
	DisposableServer server = HttpServer.create()
	                              .port(0)
	                              .route(httpServerRoutes -> httpServerRoutes.get(
			                                "/",
			                                (httpServerRequest, httpServerResponse) -> {
				                                return httpServerResponse.sendString(
						                                Mono.error(new IllegalArgumentException("test")));
			                                }))
	                                    .bindNow(Duration.ofSeconds(30));

	HttpClient client = HttpClient.create()
	                              .port(server.port());

	StepVerifier.create(client.get()
	                             .uri("/")
	                             .responseContent()
	                             .asString(StandardCharsets.UTF_8)
	                             .collectList())
	            .expectNextMatches(List::isEmpty)
	            .verifyComplete();

	server.disposeNow();
}
 
@Test
public void compressionActivatedOnClientAddsHeader() {
	AtomicReference<String> zip = new AtomicReference<>("fail");

	HttpServer server = HttpServer.create()
	                              .port(0)
	                              .compress(true);
	DisposableServer runningServer =
			server.handle((in, out) -> out.sendString(Mono.just("reply")))
			      .wiretap(true)
			      .bindNow(Duration.ofSeconds(10));
	HttpClient.create()
	          .remoteAddress(runningServer::address)
	          .wiretap(true)
	          .compress(true)
	          .headers(h -> zip.set(h.get("accept-encoding")))
	          .get()
	          .uri("/test")
	          .responseContent()
	          .blockLast();

	assertThat(zip.get()).isEqualTo("gzip");
	runningServer.dispose();
	runningServer.onDispose()
			.block();
}
 
@Test
public void testIssue282() {
	DisposableServer server =
			HttpServer.create()
			          .compress(2048)
			          .port(0)
			          .handle((req, res) -> res.sendString(Mono.just("testtesttesttesttest")))
			          .bindNow();

	Mono<String> response =
			HttpClient.create()
			          .port(server.port())
			          .get()
			          .uri("/")
			          .responseContent()
			          .aggregate()
			          .asString();

	StepVerifier.create(response)
	            .expectNextMatches("testtesttesttesttest"::equals)
	            .expectComplete()
	            .verify();

	server.disposeNow();
}
 
源代码9 项目: reactor-netty   文件: HttpServerTests.java
@Test
public void startRouterAndAwait() throws InterruptedException {
	ExecutorService ex = Executors.newSingleThreadExecutor();
	AtomicReference<DisposableServer> ref = new AtomicReference<>();

	Future<?> f = ex.submit(() ->
		    HttpServer.create()
		              .port(0)
		              .route(routes -> routes.get("/hello", (req, resp) -> resp.sendString(Mono.just("hello!"))))
		              .wiretap(true)
		              .bindUntilJavaShutdown(Duration.ofSeconds(2), ref::set)
	);

	//if the server cannot be started, a ExecutionException will be thrown instead
	assertThatExceptionOfType(TimeoutException.class)
			.isThrownBy(() -> f.get(1, TimeUnit.SECONDS));

	//the router is not done and is still blocking the thread
	assertThat(f.isDone()).isFalse();
	assertThat(ref.get()).withFailMessage("Server is not initialized after 1s").isNotNull();

	//shutdown the router to unblock the thread
	ref.get().disposeNow();
	Thread.sleep(100);
	assertThat(f.isDone()).isTrue();
}
 
源代码10 项目: reactor-netty   文件: HttpRedirectTest.java
private void redirectTests(String url) {
	AtomicInteger counter = new AtomicInteger(1);
	DisposableServer server =
			HttpServer.create()
			          .port(0)
			          .handle((req, res) -> {
			              if (req.uri().contains("/login") &&
			                      req.method().equals(HttpMethod.POST) &&
			                      counter.getAndDecrement() > 0) {
			                  return res.sendRedirect(url);
			              }
			              else {
			                  return res.status(200)
			                            .send();
			              }
			          })
			          .wiretap(true)
			          .bindNow();

	ConnectionProvider pool = ConnectionProvider.create("redirectTests", 1);

	HttpClient client =
			HttpClient.create(pool)
			          .remoteAddress(server::address);

	try {
		Flux.range(0, 100)
		    .concatMap(i -> client.followRedirect(true)
		                          .post()
		                          .uri("/login")
		                          .responseContent()
		                          .then())
		    .blockLast(Duration.ofSeconds(30));
	}
	finally {
		server.disposeNow();
	}
}
 
源代码11 项目: reactor-netty   文件: HttpRedirectTest.java
/**
 * This ensures metrics and tracing get reliable timing. {@link HttpClient#doOnResponse} should
 * return when headers are received. Blocking client usage should therefore finish after that
 * timestamp.
 */
@Test
public void redirect_onResponseBeforeBlockCompletes() throws Exception {
	DisposableServer server =
			HttpServer.create()
			          .port(0)
			          .host("localhost")
			          .wiretap(true)
			          .route(r -> r.get("/1", (req, res) -> res.sendRedirect("/3"))
			                       .get("/3", (req, res) -> res.status(200)
			                                                   .sendString(Mono.just("OK"))))
			          .bindNow();

	BlockingQueue<Long> doOnResponseNanos = new LinkedBlockingQueue<>();
	Long responseNanos =
			HttpClient.create()
			          .remoteAddress(server::address)
			          .wiretap(true)
			          .followRedirect(true)
			          .doOnResponse((r, c) -> doOnResponseNanos.add(System.nanoTime()))
			          .get()
			          .uri("/1")
			          .response().map(r -> System.nanoTime())
			          .block(Duration.ofSeconds(30));

	assertThat(responseNanos).isGreaterThan(doOnResponseNanos.poll(5, TimeUnit.SECONDS));

	server.disposeNow();
}
 
源代码12 项目: reactor-netty   文件: TcpServerBind.java
@Override
public Mono<? extends DisposableServer> bind() {
	if (config.sslProvider != null && config.sslProvider.getDefaultConfigurationType() == null) {
		config.sslProvider = SslProvider.updateDefaultConfiguration(config.sslProvider, SslProvider.DefaultConfigurationType.TCP);
	}
	return super.bind();
}
 
源代码13 项目: reactor-netty   文件: HttpServerBind.java
@Override
public Mono<? extends DisposableServer> bind() {
	if (config.sslProvider != null) {
		if (config.sslProvider.getDefaultConfigurationType() == null) {
			HttpServer dup = duplicate();
			HttpServerConfig _config = dup.configuration();
			if ((_config._protocols & HttpServerConfig.h2) == HttpServerConfig.h2) {
				_config.sslProvider = SslProvider.updateDefaultConfiguration(_config.sslProvider,
						SslProvider.DefaultConfigurationType.H2);
			}
			else {
				_config.sslProvider = SslProvider.updateDefaultConfiguration(_config.sslProvider,
						SslProvider.DefaultConfigurationType.TCP);
			}
			return dup.bind();
		}
		if ((config._protocols & HttpServerConfig.h2c) == HttpServerConfig.h2c) {
			return Mono.error(new IllegalArgumentException(
					"Configured H2 Clear-Text protocol with TLS. " +
							"Use the non Clear-Text H2 protocol via " +
							"HttpServer#protocol or disable TLS via HttpServer#noSSL())"));
		}
	}
	else {
		if ((config._protocols & HttpServerConfig.h2) == HttpServerConfig.h2) {
			return Mono.error(new IllegalArgumentException(
					"Configured H2 protocol without TLS. " +
							"Use a Clear-Text H2 protocol via HttpServer#protocol or configure TLS " +
							"via HttpServer#secure"));
		}
	}
	return super.bind();
}
 
源代码14 项目: reactor-netty   文件: HttpRedirectTest.java
@Test
public void testHttp2Redirect() throws Exception {
	SelfSignedCertificate cert = new SelfSignedCertificate();
	SslContextBuilder serverCtx = SslContextBuilder.forServer(cert.certificate(), cert.privateKey());
	SslContextBuilder clientCtx = SslContextBuilder.forClient()
	                                               .trustManager(InsecureTrustManagerFactory.INSTANCE);
	DisposableServer server =
			HttpServer.create()
			          .port(0)
			          .host("localhost")
			          .wiretap(true)
			          .protocol(HttpProtocol.H2)
			          .secure(spec -> spec.sslContext(serverCtx))
			          .route(r -> r.get("/1", (req, res) -> res.sendRedirect("/3"))
			                       .get("/3", (req, res) -> res.status(200)
			                                                   .sendString(Mono.just("OK"))))
			          .wiretap(true)
			          .bindNow();

	Tuple2<String, Integer> response =
			HttpClient.create()
			          .remoteAddress(server::address)
			          .wiretap(true)
			          .followRedirect(true)
			          .protocol(HttpProtocol.H2)
			          .secure(spec -> spec.sslContext(clientCtx))
			          .get()
			          .uri("/1")
			          .responseSingle((res, bytes) -> bytes.asString()
			                                               .zipWith(Mono.just(res.status().code())))
			          .block(Duration.ofSeconds(30));

	assertThat(response).isNotNull();
	assertThat(response.getT2()).isEqualTo(200);
	assertThat(response.getT1()).isEqualTo("OK");

	server.disposeNow();
}
 
源代码15 项目: reactor-netty   文件: ServerTransportConfig.java
@Override
public void onStateChange(Connection connection, State newState) {
	if (newState == State.CONNECTED) {
		if (doOnBound != null) {
			doOnBound.accept((DisposableServer) connection);
		}
		if (doOnUnbound != null) {
			connection.channel()
			          .closeFuture()
			          .addListener(f -> doOnUnbound.accept((DisposableServer) connection));
		}
	}
}
 
private void failOnClientServerError(
		int serverStatus, String serverSubprotocol, String clientSubprotocol) {
	DisposableServer httpServer =
			HttpServer.create()
			          .port(0)
			          .route(routes ->
			              routes.post("/login", (req, res) -> res.status(serverStatus).sendHeaders())
			                    .get("/ws", (req, res) -> {
			                        int token = Integer.parseInt(req.requestHeaders().get("Authorization"));
			                        if (token >= 400) {
			                            return res.status(token).send();
			                        }
			                        return res.sendWebsocket((i, o) -> o.sendString(Mono.just("test")),
			                                WebsocketServerSpec.builder().protocols(serverSubprotocol).build());
			                    }))
			          .wiretap(true)
			          .bindNow();

	Flux<String> response =
			HttpClient.create()
			          .port(httpServer.port())
			          .wiretap(true)
			          .headersWhen(h -> login(httpServer.port()).map(token -> h.set("Authorization", token)))
			          .websocket(WebsocketClientSpec.builder().protocols(clientSubprotocol).build())
			          .uri("/ws")
			          .handle((i, o) -> i.receive().asString())
			          .log()
			          .switchIfEmpty(Mono.error(new Exception()));

	StepVerifier.create(response)
	            .expectError(WebSocketHandshakeException.class)
	            .verify();

	httpServer.disposeNow();
}
 
源代码17 项目: reactor-netty   文件: ServerTransport.java
/**
 * Start the server in a blocking fashion, and wait for it to finish initializing
 * or the provided startup timeout expires. The returned {@link DisposableServer}
 * offers simple server API, including to {@link DisposableServer#disposeNow()}
 * shut it down in a blocking fashion.
 *
 * @param timeout max startup timeout
 * @return a {@link DisposableServer}
 */
public final DisposableServer bindNow(Duration timeout) {
	Objects.requireNonNull(timeout, "timeout");
	try {
		return Objects.requireNonNull(bind().block(timeout), "aborted");
	}
	catch (IllegalStateException e) {
		if (e.getMessage()
		     .contains("blocking read")) {
			throw new IllegalStateException(getClass().getSimpleName() + " couldn't be started within " + timeout.toMillis() + "ms");
		}
		throw e;
	}
}
 
源代码18 项目: reactor-netty   文件: ServerTransport.java
/**
 * Start the server in a fully blocking fashion, not only waiting for it to initialize
 * but also blocking during the full lifecycle of the server. Since most
 * servers will be long-lived, this is more adapted to running a server out of a main
 * method, only allowing shutdown of the servers through {@code sigkill}.
 * <p>
 * Note: {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added by
 * this method in order to properly disconnect the server upon receiving a
 * {@code sigkill} signal.</p>
 *
 * @param timeout a timeout for server shutdown
 * @param onStart an optional callback on server start
 */
public final void bindUntilJavaShutdown(Duration timeout, @Nullable Consumer<DisposableServer> onStart) {
	Objects.requireNonNull(timeout, "timeout");
	DisposableServer facade = Objects.requireNonNull(bindNow(), "facade");

	if (onStart != null) {
		onStart.accept(facade);
	}

	Runtime.getRuntime()
	       .addShutdownHook(new Thread(() -> facade.disposeNow(timeout)));

	facade.onDispose()
	      .block();
}
 
源代码19 项目: reactor-netty   文件: HttpRedirectTest.java
@Test
public void testIssue606() {
	final int serverPort = SocketUtils.findAvailableTcpPort();

	DisposableServer server =
			HttpServer.create()
			          .port(serverPort)
			          .host("localhost")
			          .handle((req, res) -> res.sendRedirect("http://localhost:" + serverPort))
			          .wiretap(true)
			          .bindNow();

	AtomicInteger followRedirects = new AtomicInteger(0);
	HttpClient.create()
	          .remoteAddress(server::address)
	          .wiretap(true)
	          .followRedirect((req, res) -> {
	              boolean result = req.redirectedFrom().length < 4;
	              if (result) {
	                  followRedirects.getAndIncrement();
	              }
	              return result;
	          })
	          .get()
	          .uri("/")
	          .responseContent()
	          .blockLast();

	server.disposeNow();

	assertThat(followRedirects.get()).isEqualTo(4);
}
 
@Test
public void testIssue1012() throws Exception {
	DisposableServer server =
			HttpServer.create()
			          .port(0)
			          .wiretap(true)
			          .route(r -> r.get("/1", (req, resp) -> resp.sendString(Mono.just("testIssue1012")))
			                       .get("/2", (req, res) -> Mono.error(new RuntimeException("testIssue1012"))))
			          .bindNow();

	DefaultPooledConnectionProvider provider = (DefaultPooledConnectionProvider) ConnectionProvider.create("testIssue1012", 1);
	CountDownLatch latch = new CountDownLatch(1);
	HttpClient client =
			HttpClient.create(provider)
			          .port(server.port())
			          .wiretap(true)
			          .doOnConnected(conn -> conn.channel().closeFuture().addListener(f -> latch.countDown()));

	client.get()
	      .uri("/1")
	      .responseContent()
	      .aggregate()
	      .block(Duration.ofSeconds(30));

	client.get()
	      .uri("/2")
	      .responseContent()
	      .aggregate()
	      .onErrorResume(e -> Mono.empty())
	      .block(Duration.ofSeconds(30));

	assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();

	provider.channelPools.forEach((k, v) -> assertThat(v.metrics().acquiredSize()).isEqualTo(0));

	provider.disposeLater()
			.block(Duration.ofSeconds(30));
	server.disposeNow();
}
 
@Test
public void testSslEngineClosed() throws Exception {
	DisposableServer server =
			HttpServer.create()
			          .port(0)
			          .wiretap(true)
			          .handle((req, res) -> res.sendString(Mono.just("test")))
			          .bindNow();
	SslContext ctx = SslContextBuilder.forClient()
	                                  .sslProvider(SslProvider.JDK)
	                                  .build();
	HttpClient client =
			HttpClient.create()
			          .port(server.port())
			          .secure(spec -> spec.sslContext(ctx))
			          .wiretap(true);

	// Connection close happens after `Channel connected`
	// Re-acquiring is not possible
	// The SSLException will be propagated
	doTestSslEngineClosed(client, new AtomicInteger(0), SSLException.class, "SSLEngine is closing/closed");

	// Connection close happens between `Initialized pipeline` and `Channel connected`
	// Re-acquiring
	// Connection close happens after `Channel connected`
	// The SSLException will be propagated, Reactor Netty re-acquire only once
	doTestSslEngineClosed(client, new AtomicInteger(1), SSLException.class, "SSLEngine is closing/closed");

	// Connection close happens between `Initialized pipeline` and `Channel connected`
	// Re-acquiring
	// Connection close happens between `Initialized pipeline` and `Channel connected`
	// The IOException will be propagated, Reactor Netty re-acquire only once
	doTestSslEngineClosed(client, new AtomicInteger(2), IOException.class, "Error while acquiring from");

	server.disposeNow();
}
 
源代码22 项目: tutorials   文件: SpringSecurity5Application.java
@Bean
public DisposableServer disposableServer(ApplicationContext context) {
    HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context)
            .build();
    ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
    HttpServer httpServer = HttpServer.create();
    httpServer.host("localhost");
    httpServer.port(8080);
    return httpServer.handle(adapter).bindNow();
}
 
源代码23 项目: reactor-netty   文件: SslProviderTests.java
@Test
public void testSslConfigurationProtocolHttp11_2() {
	DisposableServer disposableServer =
			server.protocol(HttpProtocol.H2)
			      .secure(spec -> spec.sslContext(builder))
			      .protocol(HttpProtocol.HTTP11)
			      .bindNow();
	assertTrue(protocols.isEmpty());
	assertTrue(OpenSsl.isAvailable() ? sslContext instanceof OpenSslContext :
	                                   sslContext instanceof JdkSslContext);
	disposableServer.disposeNow();
}
 
源代码24 项目: reactor-netty   文件: SslProviderTests.java
@Test
public void testProtocolH2SslConfiguration() {
	DisposableServer disposableServer =
			server.protocol(HttpProtocol.H2)
			      .secure(spec -> spec.sslContext(builder))
			      .bindNow();
	assertEquals(2, protocols.size());
	assertTrue(protocols.contains("h2"));
	assertTrue(io.netty.handler.ssl.SslProvider.isAlpnSupported(io.netty.handler.ssl.SslProvider.OPENSSL) ?
	                                       sslContext instanceof OpenSslContext :
	                                       sslContext instanceof JdkSslContext);
	disposableServer.disposeNow();
}
 
源代码25 项目: reactor-netty   文件: SslProviderTests.java
@Test
public void testSslConfigurationProtocolH2_1() {
	DisposableServer disposableServer =
			server.secure(spec -> spec.sslContext(builder))
			      .protocol(HttpProtocol.H2)
			      .bindNow();
	assertEquals(2, protocols.size());
	assertTrue(protocols.contains("h2"));
	assertTrue(io.netty.handler.ssl.SslProvider.isAlpnSupported(io.netty.handler.ssl.SslProvider.OPENSSL) ?
	                                       sslContext instanceof OpenSslContext :
	                                       sslContext instanceof JdkSslContext);
	disposableServer.disposeNow();
}
 
源代码26 项目: reactor-netty   文件: SslProviderTests.java
@Test
public void testSslConfigurationProtocolH2_2() {
	DisposableServer disposableServer =
			server.protocol(HttpProtocol.HTTP11)
			      .secure(spec -> spec.sslContext(builder))
			      .protocol(HttpProtocol.H2)
			      .bindNow();
	assertEquals(2, protocols.size());
	assertTrue(protocols.contains("h2"));
	assertTrue(io.netty.handler.ssl.SslProvider.isAlpnSupported(io.netty.handler.ssl.SslProvider.OPENSSL) ?
	                                       sslContext instanceof OpenSslContext :
	                                       sslContext instanceof JdkSslContext);
	disposableServer.disposeNow();
}
 
源代码27 项目: reactor-netty   文件: TcpServerTests.java
@Test(timeout = 10000)
public void testHang() {
	DisposableServer httpServer =
			HttpServer.create()
			          .port(0)
			          .host("0.0.0.0")
			          .route(r -> r.get("/data", (request, response) -> response.send(Mono.empty())))
			          .wiretap(true)
			          .bindNow();

	assertNotNull(httpServer);

	httpServer.disposeNow();
}
 
源代码28 项目: reactor-netty   文件: TcpServerTests.java
@Test
public void exposesRemoteAddress() throws InterruptedException {
	final int port = SocketUtils.findAvailableTcpPort();
	final CountDownLatch latch = new CountDownLatch(1);

	DisposableServer server = TcpServer.create()
	                                   .port(port)
	                                   .handle((in, out) -> {

	    in.withConnection(c -> {
	        InetSocketAddress addr = (InetSocketAddress) c.address();
	        assertNotNull("remote address is not null", addr.getAddress());
	        latch.countDown();
	    });

		return Flux.never();
	                                   })
	                                   .wiretap(true)
	                                   .bindNow();

	assertNotNull(server);

	Connection client = TcpClient.create().port(port)
	                             .handle((in, out) -> out.sendString(Flux.just("Hello World!")))
	                             .wiretap(true)
	                             .connectNow();

	assertNotNull(client);

	assertTrue("Latch was counted down", latch.await(5, TimeUnit.SECONDS));

	client.disposeNow();
	server.disposeNow();
}
 
源代码29 项目: reactor-netty   文件: TcpServerTests.java
@Test
public void testIssue462() throws InterruptedException {

	final CountDownLatch countDownLatch = new CountDownLatch(1);

	DisposableServer server = TcpServer.create()
	                                   .port(0)
	                                   .handle((in, out) -> {
	                                       in.receive()
	                                         .log("channel")
	                                         .subscribe(trip -> countDownLatch.countDown());
	                                       return Flux.never();
	                                   })
	                                   .wiretap(true)
	                                   .bindNow();

	assertNotNull(server);

	System.out.println("PORT +" + server.port());

	Connection client = TcpClient.create()
	                             .port(server.port())
	                             .handle((in, out) -> out.sendString(Flux.just("test")))
	                             .wiretap(true)
	                             .connectNow();

	assertNotNull(client);

	assertThat("Latch was counted down", countDownLatch.await(5, TimeUnit.SECONDS));

	client.disposeNow();
	server.disposeNow();
}
 
源代码30 项目: reactor-netty   文件: TcpServerTests.java
@Test
public void testChannelGroupClosesAllConnections() throws Exception {
	MonoProcessor<Void> serverConnDisposed = MonoProcessor.create();

	ChannelGroup group = new DefaultChannelGroup(new DefaultEventExecutor());

	CountDownLatch latch = new CountDownLatch(1);

	DisposableServer boundServer =
			TcpServer.create()
			         .port(0)
			         .doOnConnection(c -> {
			             c.onDispose()
			              .subscribe(serverConnDisposed);
			             group.add(c.channel());
			             latch.countDown();
			         })
			         .wiretap(true)
			         .bindNow();

	TcpClient.create()
	         .remoteAddress(boundServer::address)
	         .wiretap(true)
	         .connect()
	         .subscribe();

	assertTrue(latch.await(30, TimeUnit.SECONDS));

	boundServer.disposeNow();

	FutureMono.from(group.close())
	          .block(Duration.ofSeconds(30));

	serverConnDisposed.block(Duration.ofSeconds(5));
}