下面列出了io.netty.channel.ChannelId#reactor.netty.http.server.HttpServer 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private HttpServer createHttpServer() {
HttpServer server = HttpServer.create();
if (this.resourceFactory != null) {
LoopResources resources = this.resourceFactory.getLoopResources();
Assert.notNull(resources,
"No LoopResources: is ReactorResourceFactory not initialized yet?");
server = server.tcpConfiguration((tcpServer) -> tcpServer.runOn(resources)
.addressSupplier(this::getListenAddress));
}
else {
server = server.tcpConfiguration(
(tcpServer) -> tcpServer.addressSupplier(this::getListenAddress));
}
if (getSsl() != null && getSsl().isEnabled()) {
SslServerCustomizer sslServerCustomizer = new SslServerCustomizer(getSsl(),
getHttp2(), getSslStoreProvider());
server = sslServerCustomizer.apply(server);
}
if (getCompression() != null && getCompression().getEnabled()) {
CompressionCustomizer compressionCustomizer = new CompressionCustomizer(
getCompression());
server = compressionCustomizer.apply(server);
}
server = server.protocol(listProtocols()).forwarded(this.useForwardHeaders);
return applyCustomizers(server);
}
@SuppressWarnings("unchecked")
private RSocketFactory.Start<CloseableChannel> createRSocketStarter(HttpHandler httpHandler) {
RSocketFactory.ServerRSocketFactory rSocketFactory = applyCustomizers(RSocketFactory.receive());
HttpServer httpServer = createHttpServer();
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
return rSocketFactory
.acceptor(socketAcceptor)
.transport((ServerTransport) new WebsocketRouteTransport(
httpServer,
r -> r.route(hsr -> !("/" + hsr.path()).equals(path), handlerAdapter),
path
));
}
@Test
public void testIssue282() {
DisposableServer server =
HttpServer.create()
.compress(2048)
.port(0)
.handle((req, res) -> res.sendString(Mono.just("testtesttesttesttest")))
.bindNow();
Mono<String> response =
HttpClient.create()
.port(server.port())
.get()
.uri("/")
.responseContent()
.aggregate()
.asString();
StepVerifier.create(response)
.expectNextMatches("testtesttesttesttest"::equals)
.expectComplete()
.verify();
server.disposeNow();
}
@Test
public void testIssue632() throws Exception {
disposableServer =
HttpServer.create()
.port(0)
.wiretap(true)
.handle((req, res) ->
res.header(HttpHeaderNames.CONNECTION,
HttpHeaderValues.UPGRADE + ", " + HttpHeaderValues.CLOSE))
.bindNow();
assertThat(disposableServer).isNotNull();
CountDownLatch latch = new CountDownLatch(1);
createHttpClientForContextWithPort()
.doOnConnected(conn ->
conn.channel()
.closeFuture()
.addListener(future -> latch.countDown()))
.get()
.uri("/")
.responseContent()
.blockLast(Duration.ofSeconds(30));
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
}
@Test
public void test() {
DisposableServer server = HttpServer.create()
.port(0)
.route(httpServerRoutes -> httpServerRoutes.get(
"/",
(httpServerRequest, httpServerResponse) -> {
return httpServerResponse.sendString(
Mono.error(new IllegalArgumentException("test")));
}))
.bindNow(Duration.ofSeconds(30));
HttpClient client = HttpClient.create()
.port(server.port());
StepVerifier.create(client.get()
.uri("/")
.responseContent()
.asString(StandardCharsets.UTF_8)
.collectList())
.expectNextMatches(List::isEmpty)
.verifyComplete();
server.disposeNow();
}
@Test
public void simpleSubprotocolServerNoSubprotocol() {
httpServer = HttpServer.create()
.port(0)
.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString(Mono.just("test"))))
.wiretap(true)
.bindNow();
StepVerifier.create(
HttpClient.create()
.port(httpServer.port())
.headers(h -> h.add("Authorization", auth))
.websocket(WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build())
.uri("/test")
.handle((i, o) -> i.receive().asString()))
.verifyErrorMessage("Invalid subprotocol. Actual: null. Expected one of: SUBPROTOCOL,OTHER");
}
@Test
public void testResourceUrlSetInResponse() {
disposableServer =
HttpServer.create()
.port(0)
.handle((req, res) -> res.send())
.wiretap(true)
.bindNow();
final String requestUri = "http://localhost:" + disposableServer.port() + "/foo";
StepVerifier.create(
createHttpClientForContextWithAddress()
.get()
.uri(requestUri)
.responseConnection((res, conn) -> Mono.justOrEmpty(res.resourceUrl())))
.expectNext(requestUri)
.expectComplete()
.verify(Duration.ofSeconds(30));
}
@Test
public void shouldUseInvocationContext() throws Exception {
disposableServer = HttpServer.create().port(0)
// this reads the trace context header, b3, returning it in the response
.handle((in, out) -> out
.sendString(Flux.just(in.requestHeaders().get("b3"))))
.bindNow();
String b3SingleHeaderReadByServer;
try (Scope ws = currentTraceContext.newScope(context)) {
b3SingleHeaderReadByServer = httpClient.port(disposableServer.port()).get()
.uri("/").responseContent().aggregate().asString().block();
}
MutableSpan clientSpan = spanHandler.takeRemoteSpan(CLIENT);
assertThat(b3SingleHeaderReadByServer).isEqualTo(context.traceIdString() + "-"
+ clientSpan.id() + "-1-" + context.spanIdString());
}
@Test
public void noSubprotocolSelected() {
httpServer = HttpServer.create()
.port(0)
.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString(
Mono.just("SERVER:" + o.selectedSubprotocol()))))
.wiretap(true)
.bindNow();
List<String> res =
HttpClient.create()
.port(httpServer.port())
.headers(h -> h.add("Authorization", auth))
.websocket()
.uri("/test")
.handle((in, out) -> in.receive()
.asString()
.map(srv -> "CLIENT:" + in.selectedSubprotocol() + "-" + srv))
.log()
.collectList()
.block(Duration.ofSeconds(30));
Assert.assertNotNull(res);
Assert.assertThat(res.get(0), is("CLIENT:null-SERVER:null"));
}
@Test
public void simpleSubprotocolServerNotSupported() {
httpServer = HttpServer.create()
.port(0)
.handle((in, out) -> out.sendWebsocket(
(i, o) -> {
return o.sendString(Mono.just("test"));
},
WebsocketServerSpec.builder().protocols("protoA,protoB").build()))
.wiretap(true)
.bindNow();
StepVerifier.create(
HttpClient.create()
.port(httpServer.port())
.headers(h -> h.add("Authorization", auth))
.websocket(WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build())
.uri("/test")
.handle((i, o) -> i.receive().asString()))
//the SERVER returned null which means that it couldn't select a protocol
.verifyErrorMessage("Invalid subprotocol. Actual: null. Expected one of: SUBPROTOCOL,OTHER");
}
private void doTestIssue444(BiFunction<WebsocketInbound, WebsocketOutbound, Publisher<Void>> fn) {
httpServer =
HttpServer.create()
.host("localhost")
.port(0)
.handle((req, res) -> res.sendWebsocket(fn))
.wiretap(true)
.bindNow();
StepVerifier.create(
HttpClient.create()
.remoteAddress(httpServer::address)
.wiretap(true)
.websocket()
.uri("/")
.handle((i, o) -> i.receiveFrames()
.then()))
.expectComplete()
.verify(Duration.ofSeconds(30));
}
@Test
public void simpleSubprotocolServerSupported() {
httpServer = HttpServer.create()
.port(0)
.handle((in, out) -> out.sendWebsocket(
(i, o) -> o.sendString(Mono.just("test")),
WebsocketServerSpec.builder().protocols("SUBPROTOCOL").build()))
.wiretap(true)
.bindNow();
List<String> res =
HttpClient.create()
.port(httpServer.port())
.wiretap(true)
.headers(h -> h.add("Authorization", auth))
.websocket(WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build())
.uri("/test")
.handle((i, o) -> i.receive().asString())
.log()
.collectList()
.block(Duration.ofSeconds(30));
Assert.assertNotNull(res);
Assert.assertThat(res.get(0), is("test"));
}
@Before
public void setUp() {
httpServer = customizeServerOptions(
HttpServer.create()
.host("127.0.0.1")
.port(0)
.metrics(true, s -> s)
.route(r -> r.post("/1", (req, res) -> res.header("Connection", "close")
.send(req.receive().retain().delayElements(Duration.ofMillis(10))))
.post("/2", (req, res) -> res.header("Connection", "close")
.send(req.receive().retain().delayElements(Duration.ofMillis(10))))));
provider = ConnectionProvider.create("HttpMetricsHandlerTests", 1);
httpClient =
customizeClientOptions(HttpClient.create(provider)
.remoteAddress(() -> disposableServer.address())
.metrics(true, s -> s));
registry = new SimpleMeterRegistry();
Metrics.addRegistry(registry);
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
ApplicationContext context = ((ContextRefreshedEvent) event).getApplicationContext();
if (context != this.context) {
return;
}
if (!ClassUtils.isPresent("org.springframework.http.server.reactive.HttpHandler", null)) {
logger.info("No web server classes found so no server to start");
return;
}
Integer port = Integer.valueOf(context.getEnvironment().resolvePlaceholders("${server.port:${PORT:8080}}"));
String address = context.getEnvironment().resolvePlaceholders("${server.address:0.0.0.0}");
if (port >= 0) {
HttpHandler handler = context.getBean(HttpHandler.class);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer httpServer = HttpServer.create().host(address).port(port).handle(adapter);
Thread thread = new Thread(
() -> httpServer.bindUntilJavaShutdown(Duration.ofSeconds(60), this::callback),
"server-startup");
thread.setDaemon(false);
thread.start();
}
}
@Test
public void testMaxFramePayloadLengthFailed() {
httpServer = HttpServer.create()
.port(0)
.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString(Mono.just("12345678901"))))
.wiretap(true)
.bindNow();
Mono<Void> response = HttpClient.create()
.port(httpServer.port())
.websocket(WebsocketClientSpec.builder().maxFramePayloadLength(10).build())
.handle((in, out) -> in.receive()
.asString()
.map(srv -> srv))
.log()
.then();
StepVerifier.create(response)
.expectError(CorruptedFrameException.class)
.verify(Duration.ofSeconds(30));
}
@Test
public void testIssue614() {
disposableServer =
HttpServer.create()
.port(0)
.route(routes ->
routes.post("/dump", (req, res) -> {
if (req.requestHeaders().contains("Transfer-Encoding")) {
return Mono.error(new Exception("Transfer-Encoding is not expected"));
}
return res.sendString(Mono.just("OK"));
}))
.wiretap(true)
.bindNow();
StepVerifier.create(
createHttpClientForContextWithAddress()
.post()
.uri("/dump")
.sendForm((req, form) -> form.attr("attribute", "value"))
.responseContent()
.aggregate()
.asString())
.expectNext("OK")
.expectComplete()
.verify(Duration.ofSeconds(30));
}
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer httpServer = HttpServer.create().host("localhost").port(this.port);
return httpServer.handle(adapter);
}
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer httpServer = HttpServer.create().host("localhost").port(this.port);
return httpServer.handle(adapter);
}
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer httpServer = HttpServer.create().host("localhost").port(this.port);
return httpServer.handle(adapter);
}
@Test
public void redirectDisabledByDefault() {
DisposableServer server =
HttpServer.create()
.port(0)
.host("localhost")
.wiretap(true)
.route(r -> r.get("/1", (req, res) -> res.sendRedirect("/3"))
.get("/3", (req, res) -> res.status(200)
.sendString(Mono.just("OK"))))
.wiretap(true)
.bindNow();
HttpClientResponse response =
HttpClient.create()
.remoteAddress(server::address)
.wiretap(true)
.get()
.uri("/1")
.response()
.block(Duration.ofSeconds(30));
assertThat(response).isNotNull();
assertThat(response.status()).isEqualTo(HttpResponseStatus.FOUND);
assertThat(response.responseHeaders().get("location")).isEqualTo("/3");
server.disposeNow();
}
@Test
public void testIssue903() throws CertificateException {
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContextBuilder serverCtx = SslContextBuilder.forServer(cert.key(), cert.cert());
DisposableServer server =
HttpServer.create()
.secure(s -> s.sslContext(serverCtx))
.port(0)
.wiretap(true)
.handle((req, resp) -> resp.sendHeaders())
.bindNow();
DefaultPooledConnectionProvider provider = (DefaultPooledConnectionProvider) ConnectionProvider.create("testIssue903", 1);
HttpClient.create(provider)
.port(server.port())
.get()
.uri("/")
.response()
.onErrorResume(e -> Mono.empty())
.block(Duration.ofSeconds(30));
provider.channelPools.forEach((k, v) -> assertThat(v.metrics().acquiredSize()).isEqualTo(0));
provider.disposeLater()
.block(Duration.ofSeconds(30));
server.disposeNow();
}
@Test
public void serverCompressionDisabled() {
HttpServer server = HttpServer.create()
.port(0)
.compress(false);
DisposableServer runningServer =
server.handle((in, out) -> out.sendString(Mono.just("reply")))
.wiretap(true)
.bindNow(Duration.ofSeconds(10));
//don't activate compression on the client options to avoid auto-handling (which removes the header)
HttpClient client = HttpClient.create()
.remoteAddress(runningServer::address)
.wiretap(true);
Tuple2<String, HttpHeaders> resp =
//edit the header manually to attempt to trigger compression on server side
client.headers(h -> h.add("Accept-Encoding", "gzip"))
.get()
.uri("/test")
.response((res, buf) -> buf.asString()
.zipWith(Mono.just(res.responseHeaders())))
.blockFirst();
assertThat(resp).isNotNull();
assertThat(resp.getT2().get("content-encoding")).isNull();
Assert.assertEquals("reply", resp.getT1());
runningServer.dispose();
runningServer.onDispose()
.block();
}
@Test
public void testClientReuseIssue405(){
disposableServer =
HttpServer.create()
.port(0)
.handle((in,out)->out.sendString(Flux.just("hello")))
.wiretap(true)
.bindNow();
ConnectionProvider pool = ConnectionProvider.create("testClientReuseIssue405", 1);
HttpClient httpClient = createHttpClientForContextWithPort(pool);
Mono<String> mono1 =
httpClient.get()
.responseSingle((r, buf) -> buf.asString())
.log("mono1");
Mono<String> mono2 =
httpClient.get()
.responseSingle((r, buf) -> buf.asString())
.log("mono1");
StepVerifier.create(Flux.zip(mono1,mono2))
.expectNext(Tuples.of("hello","hello"))
.expectComplete()
.verify(Duration.ofSeconds(20));
pool.dispose();
}
private void doTestIssue600(boolean withLoop) {
disposableServer =
HttpServer.create()
.port(0)
.handle((req, res) -> res.send(req.receive()
.retain()
.delaySubscription(Duration.ofSeconds(1))))
.wiretap(true)
.bindNow();
ConnectionProvider pool = ConnectionProvider.create("doTestIssue600", 10);
LoopResources loop = LoopResources.create("test", 4, true);
HttpClient client;
if (withLoop) {
client = createHttpClientForContextWithAddress(pool)
.runOn(loop);
}
else {
client = createHttpClientForContextWithAddress(pool);
}
Set<String> threadNames = new ConcurrentSkipListSet<>();
StepVerifier.create(
Flux.range(1,4)
.flatMap(i -> client.request(HttpMethod.GET)
.uri("/")
.send((req, out) -> out.send(Flux.empty()))
.responseContent()
.doFinally(s -> threadNames.add(Thread.currentThread().getName()))))
.expectComplete()
.verify(Duration.ofSeconds(30));
pool.dispose();
loop.dispose();
assertThat(threadNames.size()).isGreaterThan(1);
}
@Test
public void chunkedSendFile() throws URISyntaxException {
Path largeFile = Paths.get(getClass().getResource("/largeFile.txt").toURI());
AtomicReference<String> uploaded = new AtomicReference<>();
disposableServer =
HttpServer.create()
.host("localhost")
.route(r -> r.post("/upload", (req, resp) ->
req.receive()
.aggregate()
.asString(StandardCharsets.UTF_8)
.doOnNext(uploaded::set)
.then(resp.status(201)
.sendString(Mono.just("Received File"))
.then())))
.wiretap(true)
.bindNow();
Tuple2<String, Integer> response =
createHttpClientForContextWithAddress()
.post()
.uri("/upload")
.send((r, out) -> out.sendFile(largeFile))
.responseSingle((res, buf) -> buf.asString()
.zipWith(Mono.just(res.status().code())))
.block(Duration.ofSeconds(30));
assertThat(response).isNotNull();
assertThat(response.getT2()).isEqualTo(201);
assertThat(response.getT1()).isEqualTo("Received File");
assertThat(uploaded.get())
.startsWith("This is an UTF-8 file that is larger than 1024 bytes. " +
"It contains accents like é.")
.contains("1024 mark here -><- 1024 mark here")
.endsWith("End of File");
}
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer httpServer = HttpServer.create().host("localhost").port(this.port);
return httpServer.handle(adapter);
}
@Override
public HttpServer apply(HttpServer httpServer) {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
return httpServer
.tcpConfiguration(tcpServer -> tcpServer.bootstrap(
serverBootstrap -> serverBootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)
));
}
@Bean
public HttpServer httpServer(ApplicationContext context) {
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
return HttpServer.create()
.host("localhost")
.port(this.port)
.handle(adapter);
}
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer httpServer = HttpServer.create().host("localhost").port(this.port);
return httpServer.handle(adapter);
}
@DisplayName("constructor throw NullPointer with null routesBuilder")
@Test
void constructorNullRoutesBuilder() {
assertThatNullPointerException()
.isThrownBy(() -> new WebsocketRouteTransport(HttpServer.create(), null, "/test-path"))
.withMessage("routesBuilder must not be null");
}