下面列出了org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#reactor.netty.DisposableServer 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static void main(String... args) {
long start = System.currentTimeMillis();
HttpHandler httpHandler = RouterFunctions.toHttpHandler(routes(
new BCryptPasswordEncoder(18)
));
ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler);
DisposableServer server = HttpServer.create()
.host("localhost")
.port(8080)
.handle(reactorHttpHandler)
.bindNow();
LOGGER.debug("Started in " + (System.currentTimeMillis() - start) + " ms");
server.onDispose()
.block();
}
@Test(timeout = 2000)
public void startAndAwait() throws InterruptedException {
AtomicReference<DisposableServer> conn = new AtomicReference<>();
CountDownLatch startLatch = new CountDownLatch(1);
Thread t = new Thread(() -> TcpServer.create()
.handle((in, out) -> out.sendString(Mono.just("foo")))
.bindUntilJavaShutdown(Duration.ofMillis(200),
c -> {
conn.set(c);
startLatch.countDown();
}));
t.start();
//let the server initialize
startLatch.await();
//check nothing happens for 200ms
t.join(200);
Assertions.assertThat(t.isAlive()).isTrue();
//check that stopping the bnc stops the server
conn.get().disposeNow();
t.join();
Assertions.assertThat(t.isAlive()).isFalse();
}
@Override
public void executeApp() throws Exception {
int port = getAvailablePort();
DisposableServer httpServer = HttpServer.create()
.host("localhost")
.port(port)
.handle(new ReactorHttpHandlerAdapter(new MyHttpHandler()))
.bind()
.block();
WebClient client = WebClient.create("http://localhost:" + port);
client.get()
.uri("/webflux/abc")
.retrieve()
.bodyToMono(String.class)
.block();
httpServer.dispose();
}
@Test
public void testReconnectWhenDisconnected() throws Exception {
DisposableServer server =
TcpServer.create()
.port(0)
.wiretap(true)
.handle((req, res) -> res.sendString(Mono.just("test")))
.bindNow();
final CountDownLatch latch = new CountDownLatch(1);
TcpClient client =
TcpClient.create()
.port(echoServerPort)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100)
.handle((in, out) -> out.withConnection(Connection::dispose))
.wiretap(true);
connect(client, true, latch);
assertTrue(latch.await(30, TimeUnit.SECONDS));
server.disposeNow();
}
@Test
public void getContextAddressAndHost() {
DisposableServer c = new TcpServer() {
@Override
public TcpServerConfig configuration() {
return null;
}
@Override
protected TcpServer duplicate() {
return null;
}
@Override
public Mono<? extends DisposableServer> bind() {
return Mono.just(NEVER_STOP_SERVER);
}
}.bindNow();
assertThat(c).isSameAs(NEVER_STOP_SERVER);
assertThat(c.port()).isEqualTo(((InetSocketAddress) NEVER_STOP_CONTEXT.address()).getPort());
assertThat(c.host()).isEqualTo(((InetSocketAddress) NEVER_STOP_CONTEXT.address()).getHostString());
}
@Test
public void test() {
DisposableServer server = HttpServer.create()
.port(0)
.route(httpServerRoutes -> httpServerRoutes.get(
"/",
(httpServerRequest, httpServerResponse) -> {
return httpServerResponse.sendString(
Mono.error(new IllegalArgumentException("test")));
}))
.bindNow(Duration.ofSeconds(30));
HttpClient client = HttpClient.create()
.port(server.port());
StepVerifier.create(client.get()
.uri("/")
.responseContent()
.asString(StandardCharsets.UTF_8)
.collectList())
.expectNextMatches(List::isEmpty)
.verifyComplete();
server.disposeNow();
}
@Test
public void compressionActivatedOnClientAddsHeader() {
AtomicReference<String> zip = new AtomicReference<>("fail");
HttpServer server = HttpServer.create()
.port(0)
.compress(true);
DisposableServer runningServer =
server.handle((in, out) -> out.sendString(Mono.just("reply")))
.wiretap(true)
.bindNow(Duration.ofSeconds(10));
HttpClient.create()
.remoteAddress(runningServer::address)
.wiretap(true)
.compress(true)
.headers(h -> zip.set(h.get("accept-encoding")))
.get()
.uri("/test")
.responseContent()
.blockLast();
assertThat(zip.get()).isEqualTo("gzip");
runningServer.dispose();
runningServer.onDispose()
.block();
}
@Test
public void testIssue282() {
DisposableServer server =
HttpServer.create()
.compress(2048)
.port(0)
.handle((req, res) -> res.sendString(Mono.just("testtesttesttesttest")))
.bindNow();
Mono<String> response =
HttpClient.create()
.port(server.port())
.get()
.uri("/")
.responseContent()
.aggregate()
.asString();
StepVerifier.create(response)
.expectNextMatches("testtesttesttesttest"::equals)
.expectComplete()
.verify();
server.disposeNow();
}
@Test
public void startRouterAndAwait() throws InterruptedException {
ExecutorService ex = Executors.newSingleThreadExecutor();
AtomicReference<DisposableServer> ref = new AtomicReference<>();
Future<?> f = ex.submit(() ->
HttpServer.create()
.port(0)
.route(routes -> routes.get("/hello", (req, resp) -> resp.sendString(Mono.just("hello!"))))
.wiretap(true)
.bindUntilJavaShutdown(Duration.ofSeconds(2), ref::set)
);
//if the server cannot be started, a ExecutionException will be thrown instead
assertThatExceptionOfType(TimeoutException.class)
.isThrownBy(() -> f.get(1, TimeUnit.SECONDS));
//the router is not done and is still blocking the thread
assertThat(f.isDone()).isFalse();
assertThat(ref.get()).withFailMessage("Server is not initialized after 1s").isNotNull();
//shutdown the router to unblock the thread
ref.get().disposeNow();
Thread.sleep(100);
assertThat(f.isDone()).isTrue();
}
private void redirectTests(String url) {
AtomicInteger counter = new AtomicInteger(1);
DisposableServer server =
HttpServer.create()
.port(0)
.handle((req, res) -> {
if (req.uri().contains("/login") &&
req.method().equals(HttpMethod.POST) &&
counter.getAndDecrement() > 0) {
return res.sendRedirect(url);
}
else {
return res.status(200)
.send();
}
})
.wiretap(true)
.bindNow();
ConnectionProvider pool = ConnectionProvider.create("redirectTests", 1);
HttpClient client =
HttpClient.create(pool)
.remoteAddress(server::address);
try {
Flux.range(0, 100)
.concatMap(i -> client.followRedirect(true)
.post()
.uri("/login")
.responseContent()
.then())
.blockLast(Duration.ofSeconds(30));
}
finally {
server.disposeNow();
}
}
/**
* This ensures metrics and tracing get reliable timing. {@link HttpClient#doOnResponse} should
* return when headers are received. Blocking client usage should therefore finish after that
* timestamp.
*/
@Test
public void redirect_onResponseBeforeBlockCompletes() throws Exception {
DisposableServer server =
HttpServer.create()
.port(0)
.host("localhost")
.wiretap(true)
.route(r -> r.get("/1", (req, res) -> res.sendRedirect("/3"))
.get("/3", (req, res) -> res.status(200)
.sendString(Mono.just("OK"))))
.bindNow();
BlockingQueue<Long> doOnResponseNanos = new LinkedBlockingQueue<>();
Long responseNanos =
HttpClient.create()
.remoteAddress(server::address)
.wiretap(true)
.followRedirect(true)
.doOnResponse((r, c) -> doOnResponseNanos.add(System.nanoTime()))
.get()
.uri("/1")
.response().map(r -> System.nanoTime())
.block(Duration.ofSeconds(30));
assertThat(responseNanos).isGreaterThan(doOnResponseNanos.poll(5, TimeUnit.SECONDS));
server.disposeNow();
}
@Override
public Mono<? extends DisposableServer> bind() {
if (config.sslProvider != null && config.sslProvider.getDefaultConfigurationType() == null) {
config.sslProvider = SslProvider.updateDefaultConfiguration(config.sslProvider, SslProvider.DefaultConfigurationType.TCP);
}
return super.bind();
}
@Override
public Mono<? extends DisposableServer> bind() {
if (config.sslProvider != null) {
if (config.sslProvider.getDefaultConfigurationType() == null) {
HttpServer dup = duplicate();
HttpServerConfig _config = dup.configuration();
if ((_config._protocols & HttpServerConfig.h2) == HttpServerConfig.h2) {
_config.sslProvider = SslProvider.updateDefaultConfiguration(_config.sslProvider,
SslProvider.DefaultConfigurationType.H2);
}
else {
_config.sslProvider = SslProvider.updateDefaultConfiguration(_config.sslProvider,
SslProvider.DefaultConfigurationType.TCP);
}
return dup.bind();
}
if ((config._protocols & HttpServerConfig.h2c) == HttpServerConfig.h2c) {
return Mono.error(new IllegalArgumentException(
"Configured H2 Clear-Text protocol with TLS. " +
"Use the non Clear-Text H2 protocol via " +
"HttpServer#protocol or disable TLS via HttpServer#noSSL())"));
}
}
else {
if ((config._protocols & HttpServerConfig.h2) == HttpServerConfig.h2) {
return Mono.error(new IllegalArgumentException(
"Configured H2 protocol without TLS. " +
"Use a Clear-Text H2 protocol via HttpServer#protocol or configure TLS " +
"via HttpServer#secure"));
}
}
return super.bind();
}
@Test
public void testHttp2Redirect() throws Exception {
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContextBuilder serverCtx = SslContextBuilder.forServer(cert.certificate(), cert.privateKey());
SslContextBuilder clientCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE);
DisposableServer server =
HttpServer.create()
.port(0)
.host("localhost")
.wiretap(true)
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(serverCtx))
.route(r -> r.get("/1", (req, res) -> res.sendRedirect("/3"))
.get("/3", (req, res) -> res.status(200)
.sendString(Mono.just("OK"))))
.wiretap(true)
.bindNow();
Tuple2<String, Integer> response =
HttpClient.create()
.remoteAddress(server::address)
.wiretap(true)
.followRedirect(true)
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(clientCtx))
.get()
.uri("/1")
.responseSingle((res, bytes) -> bytes.asString()
.zipWith(Mono.just(res.status().code())))
.block(Duration.ofSeconds(30));
assertThat(response).isNotNull();
assertThat(response.getT2()).isEqualTo(200);
assertThat(response.getT1()).isEqualTo("OK");
server.disposeNow();
}
@Override
public void onStateChange(Connection connection, State newState) {
if (newState == State.CONNECTED) {
if (doOnBound != null) {
doOnBound.accept((DisposableServer) connection);
}
if (doOnUnbound != null) {
connection.channel()
.closeFuture()
.addListener(f -> doOnUnbound.accept((DisposableServer) connection));
}
}
}
private void failOnClientServerError(
int serverStatus, String serverSubprotocol, String clientSubprotocol) {
DisposableServer httpServer =
HttpServer.create()
.port(0)
.route(routes ->
routes.post("/login", (req, res) -> res.status(serverStatus).sendHeaders())
.get("/ws", (req, res) -> {
int token = Integer.parseInt(req.requestHeaders().get("Authorization"));
if (token >= 400) {
return res.status(token).send();
}
return res.sendWebsocket((i, o) -> o.sendString(Mono.just("test")),
WebsocketServerSpec.builder().protocols(serverSubprotocol).build());
}))
.wiretap(true)
.bindNow();
Flux<String> response =
HttpClient.create()
.port(httpServer.port())
.wiretap(true)
.headersWhen(h -> login(httpServer.port()).map(token -> h.set("Authorization", token)))
.websocket(WebsocketClientSpec.builder().protocols(clientSubprotocol).build())
.uri("/ws")
.handle((i, o) -> i.receive().asString())
.log()
.switchIfEmpty(Mono.error(new Exception()));
StepVerifier.create(response)
.expectError(WebSocketHandshakeException.class)
.verify();
httpServer.disposeNow();
}
/**
* Start the server in a blocking fashion, and wait for it to finish initializing
* or the provided startup timeout expires. The returned {@link DisposableServer}
* offers simple server API, including to {@link DisposableServer#disposeNow()}
* shut it down in a blocking fashion.
*
* @param timeout max startup timeout
* @return a {@link DisposableServer}
*/
public final DisposableServer bindNow(Duration timeout) {
Objects.requireNonNull(timeout, "timeout");
try {
return Objects.requireNonNull(bind().block(timeout), "aborted");
}
catch (IllegalStateException e) {
if (e.getMessage()
.contains("blocking read")) {
throw new IllegalStateException(getClass().getSimpleName() + " couldn't be started within " + timeout.toMillis() + "ms");
}
throw e;
}
}
/**
* Start the server in a fully blocking fashion, not only waiting for it to initialize
* but also blocking during the full lifecycle of the server. Since most
* servers will be long-lived, this is more adapted to running a server out of a main
* method, only allowing shutdown of the servers through {@code sigkill}.
* <p>
* Note: {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added by
* this method in order to properly disconnect the server upon receiving a
* {@code sigkill} signal.</p>
*
* @param timeout a timeout for server shutdown
* @param onStart an optional callback on server start
*/
public final void bindUntilJavaShutdown(Duration timeout, @Nullable Consumer<DisposableServer> onStart) {
Objects.requireNonNull(timeout, "timeout");
DisposableServer facade = Objects.requireNonNull(bindNow(), "facade");
if (onStart != null) {
onStart.accept(facade);
}
Runtime.getRuntime()
.addShutdownHook(new Thread(() -> facade.disposeNow(timeout)));
facade.onDispose()
.block();
}
@Test
public void testIssue606() {
final int serverPort = SocketUtils.findAvailableTcpPort();
DisposableServer server =
HttpServer.create()
.port(serverPort)
.host("localhost")
.handle((req, res) -> res.sendRedirect("http://localhost:" + serverPort))
.wiretap(true)
.bindNow();
AtomicInteger followRedirects = new AtomicInteger(0);
HttpClient.create()
.remoteAddress(server::address)
.wiretap(true)
.followRedirect((req, res) -> {
boolean result = req.redirectedFrom().length < 4;
if (result) {
followRedirects.getAndIncrement();
}
return result;
})
.get()
.uri("/")
.responseContent()
.blockLast();
server.disposeNow();
assertThat(followRedirects.get()).isEqualTo(4);
}
@Test
public void testIssue1012() throws Exception {
DisposableServer server =
HttpServer.create()
.port(0)
.wiretap(true)
.route(r -> r.get("/1", (req, resp) -> resp.sendString(Mono.just("testIssue1012")))
.get("/2", (req, res) -> Mono.error(new RuntimeException("testIssue1012"))))
.bindNow();
DefaultPooledConnectionProvider provider = (DefaultPooledConnectionProvider) ConnectionProvider.create("testIssue1012", 1);
CountDownLatch latch = new CountDownLatch(1);
HttpClient client =
HttpClient.create(provider)
.port(server.port())
.wiretap(true)
.doOnConnected(conn -> conn.channel().closeFuture().addListener(f -> latch.countDown()));
client.get()
.uri("/1")
.responseContent()
.aggregate()
.block(Duration.ofSeconds(30));
client.get()
.uri("/2")
.responseContent()
.aggregate()
.onErrorResume(e -> Mono.empty())
.block(Duration.ofSeconds(30));
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
provider.channelPools.forEach((k, v) -> assertThat(v.metrics().acquiredSize()).isEqualTo(0));
provider.disposeLater()
.block(Duration.ofSeconds(30));
server.disposeNow();
}
@Test
public void testSslEngineClosed() throws Exception {
DisposableServer server =
HttpServer.create()
.port(0)
.wiretap(true)
.handle((req, res) -> res.sendString(Mono.just("test")))
.bindNow();
SslContext ctx = SslContextBuilder.forClient()
.sslProvider(SslProvider.JDK)
.build();
HttpClient client =
HttpClient.create()
.port(server.port())
.secure(spec -> spec.sslContext(ctx))
.wiretap(true);
// Connection close happens after `Channel connected`
// Re-acquiring is not possible
// The SSLException will be propagated
doTestSslEngineClosed(client, new AtomicInteger(0), SSLException.class, "SSLEngine is closing/closed");
// Connection close happens between `Initialized pipeline` and `Channel connected`
// Re-acquiring
// Connection close happens after `Channel connected`
// The SSLException will be propagated, Reactor Netty re-acquire only once
doTestSslEngineClosed(client, new AtomicInteger(1), SSLException.class, "SSLEngine is closing/closed");
// Connection close happens between `Initialized pipeline` and `Channel connected`
// Re-acquiring
// Connection close happens between `Initialized pipeline` and `Channel connected`
// The IOException will be propagated, Reactor Netty re-acquire only once
doTestSslEngineClosed(client, new AtomicInteger(2), IOException.class, "Error while acquiring from");
server.disposeNow();
}
@Bean
public DisposableServer disposableServer(ApplicationContext context) {
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context)
.build();
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer httpServer = HttpServer.create();
httpServer.host("localhost");
httpServer.port(8080);
return httpServer.handle(adapter).bindNow();
}
@Test
public void testSslConfigurationProtocolHttp11_2() {
DisposableServer disposableServer =
server.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(builder))
.protocol(HttpProtocol.HTTP11)
.bindNow();
assertTrue(protocols.isEmpty());
assertTrue(OpenSsl.isAvailable() ? sslContext instanceof OpenSslContext :
sslContext instanceof JdkSslContext);
disposableServer.disposeNow();
}
@Test
public void testProtocolH2SslConfiguration() {
DisposableServer disposableServer =
server.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(builder))
.bindNow();
assertEquals(2, protocols.size());
assertTrue(protocols.contains("h2"));
assertTrue(io.netty.handler.ssl.SslProvider.isAlpnSupported(io.netty.handler.ssl.SslProvider.OPENSSL) ?
sslContext instanceof OpenSslContext :
sslContext instanceof JdkSslContext);
disposableServer.disposeNow();
}
@Test
public void testSslConfigurationProtocolH2_1() {
DisposableServer disposableServer =
server.secure(spec -> spec.sslContext(builder))
.protocol(HttpProtocol.H2)
.bindNow();
assertEquals(2, protocols.size());
assertTrue(protocols.contains("h2"));
assertTrue(io.netty.handler.ssl.SslProvider.isAlpnSupported(io.netty.handler.ssl.SslProvider.OPENSSL) ?
sslContext instanceof OpenSslContext :
sslContext instanceof JdkSslContext);
disposableServer.disposeNow();
}
@Test
public void testSslConfigurationProtocolH2_2() {
DisposableServer disposableServer =
server.protocol(HttpProtocol.HTTP11)
.secure(spec -> spec.sslContext(builder))
.protocol(HttpProtocol.H2)
.bindNow();
assertEquals(2, protocols.size());
assertTrue(protocols.contains("h2"));
assertTrue(io.netty.handler.ssl.SslProvider.isAlpnSupported(io.netty.handler.ssl.SslProvider.OPENSSL) ?
sslContext instanceof OpenSslContext :
sslContext instanceof JdkSslContext);
disposableServer.disposeNow();
}
@Test(timeout = 10000)
public void testHang() {
DisposableServer httpServer =
HttpServer.create()
.port(0)
.host("0.0.0.0")
.route(r -> r.get("/data", (request, response) -> response.send(Mono.empty())))
.wiretap(true)
.bindNow();
assertNotNull(httpServer);
httpServer.disposeNow();
}
@Test
public void exposesRemoteAddress() throws InterruptedException {
final int port = SocketUtils.findAvailableTcpPort();
final CountDownLatch latch = new CountDownLatch(1);
DisposableServer server = TcpServer.create()
.port(port)
.handle((in, out) -> {
in.withConnection(c -> {
InetSocketAddress addr = (InetSocketAddress) c.address();
assertNotNull("remote address is not null", addr.getAddress());
latch.countDown();
});
return Flux.never();
})
.wiretap(true)
.bindNow();
assertNotNull(server);
Connection client = TcpClient.create().port(port)
.handle((in, out) -> out.sendString(Flux.just("Hello World!")))
.wiretap(true)
.connectNow();
assertNotNull(client);
assertTrue("Latch was counted down", latch.await(5, TimeUnit.SECONDS));
client.disposeNow();
server.disposeNow();
}
@Test
public void testIssue462() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
DisposableServer server = TcpServer.create()
.port(0)
.handle((in, out) -> {
in.receive()
.log("channel")
.subscribe(trip -> countDownLatch.countDown());
return Flux.never();
})
.wiretap(true)
.bindNow();
assertNotNull(server);
System.out.println("PORT +" + server.port());
Connection client = TcpClient.create()
.port(server.port())
.handle((in, out) -> out.sendString(Flux.just("test")))
.wiretap(true)
.connectNow();
assertNotNull(client);
assertThat("Latch was counted down", countDownLatch.await(5, TimeUnit.SECONDS));
client.disposeNow();
server.disposeNow();
}
@Test
public void testChannelGroupClosesAllConnections() throws Exception {
MonoProcessor<Void> serverConnDisposed = MonoProcessor.create();
ChannelGroup group = new DefaultChannelGroup(new DefaultEventExecutor());
CountDownLatch latch = new CountDownLatch(1);
DisposableServer boundServer =
TcpServer.create()
.port(0)
.doOnConnection(c -> {
c.onDispose()
.subscribe(serverConnDisposed);
group.add(c.channel());
latch.countDown();
})
.wiretap(true)
.bindNow();
TcpClient.create()
.remoteAddress(boundServer::address)
.wiretap(true)
.connect()
.subscribe();
assertTrue(latch.await(30, TimeUnit.SECONDS));
boundServer.disposeNow();
FutureMono.from(group.close())
.block(Duration.ofSeconds(30));
serverConnDisposed.block(Duration.ofSeconds(5));
}