io.netty.channel.ChannelId#reactor.netty.http.server.HttpServer源码实例Demo

下面列出了 io.netty.channel.ChannelId#reactor.netty.http.server.HttpServer 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

/**
 * Assembles the Reactor Netty {@link HttpServer} used by this factory.
 *
 * <p>The listen address is always supplied through {@code getListenAddress}; when a
 * resource factory is configured its {@code LoopResources} are used as well. SSL and
 * compression customizers are applied only when the corresponding settings are
 * present and enabled, then the protocols and forwarded-header handling are set and
 * the registered customizers are applied.
 *
 * @return the fully customized server
 */
private HttpServer createHttpServer() {
	HttpServer httpServer = HttpServer.create();
	if (this.resourceFactory == null) {
		// No shared resources configured: only the listen address is set.
		httpServer = httpServer.tcpConfiguration(
				(tcp) -> tcp.addressSupplier(this::getListenAddress));
	}
	else {
		LoopResources loopResources = this.resourceFactory.getLoopResources();
		Assert.notNull(loopResources,
				"No LoopResources: is ReactorResourceFactory not initialized yet?");
		httpServer = httpServer.tcpConfiguration(
				(tcp) -> tcp.runOn(loopResources)
						.addressSupplier(this::getListenAddress));
	}
	if (getSsl() != null && getSsl().isEnabled()) {
		httpServer = new SslServerCustomizer(getSsl(), getHttp2(),
				getSslStoreProvider()).apply(httpServer);
	}
	if (getCompression() != null && getCompression().getEnabled()) {
		httpServer = new CompressionCustomizer(getCompression()).apply(httpServer);
	}
	httpServer = httpServer.protocol(listProtocols());
	httpServer = httpServer.forwarded(this.useForwardHeaders);
	return applyCustomizers(httpServer);
}
 
/**
 * Builds an RSocket starter whose transport shares one HTTP server between RSocket
 * and regular HTTP handling: requests whose path differs from {@code path} are routed
 * to the given {@code httpHandler}, and {@code path} is handed to the
 * {@link WebsocketRouteTransport}.
 *
 * @param httpHandler handler for requests not addressed to {@code path}
 * @return the starter produced by the customized server RSocket factory
 */
@SuppressWarnings("unchecked")
private RSocketFactory.Start<CloseableChannel> createRSocketStarter(HttpHandler httpHandler) {
	RSocketFactory.ServerRSocketFactory customizedFactory = applyCustomizers(RSocketFactory.receive());

	HttpServer server = createHttpServer();
	ReactorHttpHandlerAdapter httpAdapter = new ReactorHttpHandlerAdapter(httpHandler);

	return customizedFactory
		.acceptor(socketAcceptor)
		.transport((ServerTransport) new WebsocketRouteTransport(
			server,
			routes -> routes.route(request -> {
				// Everything except the configured path goes to the HTTP handler.
				String requestPath = "/" + request.path();
				return !requestPath.equals(path);
			}, httpAdapter),
			path
		));
}
 
/**
 * Binds a server configured with {@code compress(2048)} and checks that the client
 * receives the short string body exactly as it was sent.
 */
@Test
public void testIssue282() {
	DisposableServer server =
			HttpServer.create()
			          .compress(2048)
			          .port(0)
			          .handle((req, res) -> res.sendString(Mono.just("testtesttesttesttest")))
			          .bindNow();

	try {
		Mono<String> response =
				HttpClient.create()
				          .port(server.port())
				          .get()
				          .uri("/")
				          .responseContent()
				          .aggregate()
				          .asString();

		StepVerifier.create(response)
		            .expectNextMatches("testtesttesttesttest"::equals)
		            .expectComplete()
		            .verify();
	}
	finally {
		// Dispose the server even when the verification above fails.
		server.disposeNow();
	}
}
 
源代码4 项目: reactor-netty   文件: HttpClientTest.java
/**
 * The server answers with a {@code Connection} header combining the upgrade and
 * close values; the test waits for the client channel's close future to fire.
 *
 * @throws Exception if waiting on the latch is interrupted
 */
@Test
public void testIssue632() throws Exception {
	disposableServer =
			HttpServer.create()
			          .port(0)
			          .wiretap(true)
			          .handle((request, response) -> {
			              String connectionValue =
			                      HttpHeaderValues.UPGRADE + ", " + HttpHeaderValues.CLOSE;
			              return response.header(HttpHeaderNames.CONNECTION, connectionValue);
			          })
			          .bindNow();
	assertThat(disposableServer).isNotNull();

	// Counted down once the client channel's close future completes.
	CountDownLatch channelClosed = new CountDownLatch(1);
	createHttpClientForContextWithPort()
	        .doOnConnected(connection ->
	                connection.channel()
	                          .closeFuture()
	                          .addListener(done -> channelClosed.countDown()))
	        .get()
	        .uri("/")
	        .responseContent()
	        .blockLast(Duration.ofSeconds(30));

	boolean closedInTime = channelClosed.await(30, TimeUnit.SECONDS);
	assertThat(closedInTime).isTrue();
}
 
源代码5 项目: reactor-netty   文件: HttpErrorTests.java
/**
 * The route for "/" sends a string publisher that errors immediately; the test
 * expects the client to observe an empty list of body chunks followed by completion.
 */
@Test
public void test() {
	DisposableServer server = HttpServer.create()
	                              .port(0)
	                              .route(httpServerRoutes -> httpServerRoutes.get(
			                                "/",
			                                (httpServerRequest, httpServerResponse) -> {
				                                return httpServerResponse.sendString(
						                                Mono.error(new IllegalArgumentException("test")));
			                                }))
	                                    .bindNow(Duration.ofSeconds(30));

	try {
		HttpClient client = HttpClient.create()
		                              .port(server.port());

		StepVerifier.create(client.get()
		                             .uri("/")
		                             .responseContent()
		                             .asString(StandardCharsets.UTF_8)
		                             .collectList())
		            .expectNextMatches(List::isEmpty)
		            .expectComplete()
		            // Bounded wait so a broken connection cannot hang the build.
		            .verify(Duration.ofSeconds(30));
	}
	finally {
		// Dispose the server even when the verification above fails.
		server.disposeNow();
	}
}
 
源代码6 项目: reactor-netty   文件: WebsocketTest.java
/**
 * The server upgrades to websocket without declaring any subprotocol while the
 * client requests "SUBPROTOCOL,OTHER"; the client is expected to fail with the
 * invalid-subprotocol error message.
 */
@Test
public void simpleSubprotocolServerNoSubprotocol() {
	httpServer = HttpServer.create()
	                       .port(0)
	                       .handle((request, response) ->
	                               response.sendWebsocket((wsIn, wsOut) -> wsOut.sendString(Mono.just("test"))))
	                       .wiretap(true)
	                       .bindNow();

	HttpClient client = HttpClient.create()
	                              .port(httpServer.port())
	                              .headers(headers -> headers.add("Authorization", auth));
	WebsocketClientSpec clientSpec = WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build();

	StepVerifier.create(
			client.websocket(clientSpec)
			      .uri("/test")
			      .handle((wsIn, wsOut) -> wsIn.receive().asString()))
	            .verifyErrorMessage("Invalid subprotocol. Actual: null. Expected one of: SUBPROTOCOL,OTHER");
}
 
源代码7 项目: reactor-netty   文件: HttpClientTest.java
/**
 * Checks that the response seen by the client exposes the absolute URL that was
 * requested through {@code resourceUrl()}.
 */
@Test
public void testResourceUrlSetInResponse() {
	disposableServer =
			HttpServer.create()
			          .port(0)
			          .handle((request, response) -> response.send())
			          .wiretap(true)
			          .bindNow();

	int boundPort = disposableServer.port();
	final String expectedUrl = "http://localhost:" + boundPort + "/foo";

	StepVerifier.create(
	        createHttpClientForContextWithAddress()
	                .get()
	                .uri(expectedUrl)
	                .responseConnection((response, connection) ->
	                        Mono.justOrEmpty(response.resourceUrl())))
	            .expectNext(expectedUrl)
	            .expectComplete()
	            .verify(Duration.ofSeconds(30));
}
 
/**
 * Issues a request inside a scope opened for {@code context} and checks that the
 * "b3" header echoed by the server combines the context's trace id, the client
 * span's id, the "-1-" marker and the context's span id.
 *
 * @throws Exception if the test infrastructure fails
 */
@Test
public void shouldUseInvocationContext() throws Exception {
	disposableServer = HttpServer.create().port(0)
			// echo the incoming trace context header, b3, back as the response body
			.handle((request, response) -> {
				String b3 = request.requestHeaders().get("b3");
				return response.sendString(Flux.just(b3));
			})
			.bindNow();

	String echoedB3;
	try (Scope ws = currentTraceContext.newScope(context)) {
		echoedB3 = httpClient.port(disposableServer.port())
				.get()
				.uri("/")
				.responseContent()
				.aggregate()
				.asString()
				.block();
	}

	MutableSpan clientSpan = spanHandler.takeRemoteSpan(CLIENT);

	String expectedB3 = context.traceIdString() + "-"
			+ clientSpan.id() + "-1-" + context.spanIdString();
	assertThat(echoedB3).isEqualTo(expectedB3);
}
 
源代码9 项目: reactor-netty   文件: WebsocketTest.java
/**
 * Neither side requests a subprotocol; each side reports its
 * {@code selectedSubprotocol()} and the combined message is expected to be
 * "CLIENT:null-SERVER:null".
 */
@Test
public void noSubprotocolSelected() {
	httpServer = HttpServer.create()
	                       .port(0)
	                       .handle((request, response) -> response.sendWebsocket((wsIn, wsOut) -> {
	                           String serverMessage = "SERVER:" + wsOut.selectedSubprotocol();
	                           return wsOut.sendString(Mono.just(serverMessage));
	                       }))
	                       .wiretap(true)
	                       .bindNow();

	HttpClient client = HttpClient.create()
	                              .port(httpServer.port())
	                              .headers(headers -> headers.add("Authorization", auth));

	List<String> messages =
			client.websocket()
			      .uri("/test")
			      .handle((in, out) -> in.receive()
			                             .asString()
			                             .map(srv -> "CLIENT:" + in.selectedSubprotocol() + "-" + srv))
			      .log()
			      .collectList()
			      .block(Duration.ofSeconds(30));

	Assert.assertNotNull(messages);
	Assert.assertThat(messages.get(0), is("CLIENT:null-SERVER:null"));
}
 
源代码10 项目: reactor-netty   文件: WebsocketTest.java
/**
 * The server offers "protoA,protoB" while the client asks for "SUBPROTOCOL,OTHER";
 * the client is expected to fail with the invalid-subprotocol error message.
 */
@Test
public void simpleSubprotocolServerNotSupported() {
	WebsocketServerSpec serverSpec = WebsocketServerSpec.builder().protocols("protoA,protoB").build();
	httpServer = HttpServer.create()
	                       .port(0)
	                       .handle((request, response) -> response.sendWebsocket(
	                               (wsIn, wsOut) -> wsOut.sendString(Mono.just("test")),
	                               serverSpec))
	                       .wiretap(true)
	                       .bindNow();

	WebsocketClientSpec clientSpec = WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build();
	HttpClient client = HttpClient.create()
	                              .port(httpServer.port())
	                              .headers(headers -> headers.add("Authorization", auth));

	StepVerifier.create(
			client.websocket(clientSpec)
			      .uri("/test")
			      .handle((wsIn, wsOut) -> wsIn.receive().asString()))
	            // "Actual: null" in the message: no subprotocol was selected by the server
	            .verifyErrorMessage("Invalid subprotocol. Actual: null. Expected one of: SUBPROTOCOL,OTHER");
}
 
源代码11 项目: reactor-netty   文件: WebsocketTest.java
/**
 * Binds a websocket server driven by {@code fn} and verifies that a client which
 * only drains incoming frames completes within 30 seconds.
 *
 * @param fn the server-side websocket handler under test
 */
private void doTestIssue444(BiFunction<WebsocketInbound, WebsocketOutbound, Publisher<Void>> fn) {
	httpServer =
			HttpServer.create()
			          .host("localhost")
			          .port(0)
			          .handle((request, response) -> response.sendWebsocket(fn))
			          .wiretap(true)
			          .bindNow();

	HttpClient client = HttpClient.create()
	                              .remoteAddress(httpServer::address)
	                              .wiretap(true);

	StepVerifier.create(
			client.websocket()
			      .uri("/")
			      .handle((wsIn, wsOut) -> wsIn.receiveFrames()
			                                   .then()))
			    .expectComplete()
			    .verify(Duration.ofSeconds(30));
}
 
源代码12 项目: reactor-netty   文件: WebsocketTest.java
/**
 * The server offers "SUBPROTOCOL" and the client asks for "SUBPROTOCOL,OTHER";
 * the client is expected to receive the server's "test" message first.
 */
@Test
public void simpleSubprotocolServerSupported() {
	WebsocketServerSpec serverSpec = WebsocketServerSpec.builder().protocols("SUBPROTOCOL").build();
	httpServer = HttpServer.create()
	                       .port(0)
	                       .handle((request, response) -> response.sendWebsocket(
	                               (wsIn, wsOut) -> wsOut.sendString(Mono.just("test")),
	                               serverSpec))
	                       .wiretap(true)
	                       .bindNow();

	WebsocketClientSpec clientSpec = WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build();
	HttpClient client = HttpClient.create()
	                              .port(httpServer.port())
	                              .wiretap(true)
	                              .headers(headers -> headers.add("Authorization", auth));

	List<String> messages =
			client.websocket(clientSpec)
			      .uri("/test")
			      .handle((wsIn, wsOut) -> wsIn.receive().asString())
			      .log()
			      .collectList()
			      .block(Duration.ofSeconds(30));

	Assert.assertNotNull(messages);
	Assert.assertThat(messages.get(0), is("test"));
}
 
源代码13 项目: reactor-netty   文件: HttpMetricsHandlerTests.java
/**
 * Prepares the fixtures for each test: a metrics-enabled server with two POST
 * routes ("/1" and "/2") that echo the request body with a 10 ms delay per element
 * and a "Connection: close" header, a metrics-enabled client backed by a dedicated
 * connection provider, and a fresh {@link SimpleMeterRegistry} added to
 * {@link Metrics}.
 */
@Before
public void setUp() {
	HttpServer baseServer =
			HttpServer.create()
			          .host("127.0.0.1")
			          .port(0)
			          .metrics(true, s -> s)
			          .route(routes -> routes
			                  .post("/1", (request, response) ->
			                          response.header("Connection", "close")
			                                  .send(request.receive().retain().delayElements(Duration.ofMillis(10))))
			                  .post("/2", (request, response) ->
			                          response.header("Connection", "close")
			                                  .send(request.receive().retain().delayElements(Duration.ofMillis(10)))));
	httpServer = customizeServerOptions(baseServer);

	provider = ConnectionProvider.create("HttpMetricsHandlerTests", 1);
	HttpClient baseClient =
			HttpClient.create(provider)
			          // resolved lazily from the disposableServer field on each call
			          .remoteAddress(() -> disposableServer.address())
			          .metrics(true, s -> s);
	httpClient = customizeClientOptions(baseClient);

	registry = new SimpleMeterRegistry();
	Metrics.addRegistry(registry);
}
 
/**
 * Starts a Reactor Netty server for this listener's application context once that
 * context has been refreshed.
 *
 * <p>Events other than {@link ContextRefreshedEvent}, and refresh events that belong
 * to a different context, are ignored. Nothing is started when the reactive
 * {@code HttpHandler} class is not on the classpath or when the resolved port is
 * negative. The host and port are resolved from {@code server.address} and
 * {@code server.port} (falling back to {@code PORT}, then 8080).
 *
 * @param event the published application event
 * @throws NumberFormatException if the resolved port is not a valid integer
 */
@Override
public void onApplicationEvent(ApplicationEvent event) {
	// Guard the cast: any other event type would cause a ClassCastException.
	if (!(event instanceof ContextRefreshedEvent)) {
		return;
	}
	ApplicationContext context = ((ContextRefreshedEvent) event).getApplicationContext();
	if (context != this.context) {
		return;
	}
	if (!ClassUtils.isPresent("org.springframework.http.server.reactive.HttpHandler", null)) {
		logger.info("No web server classes found so no server to start");
		return;
	}
	int port = Integer.parseInt(context.getEnvironment().resolvePlaceholders("${server.port:${PORT:8080}}"));
	String address = context.getEnvironment().resolvePlaceholders("${server.address:0.0.0.0}");
	if (port >= 0) {
		HttpHandler handler = context.getBean(HttpHandler.class);
		ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
		HttpServer httpServer = HttpServer.create().host(address).port(port).handle(adapter);
		// Bind on a dedicated non-daemon thread; bindUntilJavaShutdown is called there.
		Thread thread = new Thread(
				() -> httpServer.bindUntilJavaShutdown(Duration.ofSeconds(60), this::callback),
				"server-startup");
		thread.setDaemon(false);
		thread.start();
	}
}
 
源代码15 项目: reactor-netty   文件: WebsocketTest.java
/**
 * The server sends an 11-character websocket message while the client spec sets
 * {@code maxFramePayloadLength(10)}; the client is expected to fail with a
 * {@link CorruptedFrameException}.
 */
@Test
public void testMaxFramePayloadLengthFailed() {
	httpServer = HttpServer.create()
	                       .port(0)
	                       .handle((request, response) ->
	                               response.sendWebsocket((wsIn, wsOut) -> wsOut.sendString(Mono.just("12345678901"))))
	                       .wiretap(true)
	                       .bindNow();

	WebsocketClientSpec limitedSpec = WebsocketClientSpec.builder().maxFramePayloadLength(10).build();

	Mono<Void> response = HttpClient.create()
	                                .port(httpServer.port())
	                                .websocket(limitedSpec)
	                                .handle((wsIn, wsOut) -> wsIn.receive()
	                                                             .asString()
	                                                             .map(srv -> srv))
	                                .log()
	                                .then();

	StepVerifier.create(response)
	            .expectError(CorruptedFrameException.class)
	            .verify(Duration.ofSeconds(30));
}
 
源代码16 项目: reactor-netty   文件: HttpClientTest.java
/**
 * Posts a form to "/dump"; the server errors if the request carries a
 * "Transfer-Encoding" header and otherwise answers "OK", which the client is
 * expected to receive.
 */
@Test
public void testIssue614() {
	disposableServer =
			HttpServer.create()
			          .port(0)
			          .route(routes ->
			              routes.post("/dump", (request, response) -> {
			                  boolean hasTransferEncoding =
			                          request.requestHeaders().contains("Transfer-Encoding");
			                  if (!hasTransferEncoding) {
			                      return response.sendString(Mono.just("OK"));
			                  }
			                  return Mono.error(new Exception("Transfer-Encoding is not expected"));
			              }))
			          .wiretap(true)
			          .bindNow();

	StepVerifier.create(
			createHttpClientForContextWithAddress()
			        .post()
			        .uri("/dump")
			        .sendForm((request, form) -> form.attr("attribute", "value"))
			        .responseContent()
			        .aggregate()
			        .asString())
			    .expectNext("OK")
			    .expectComplete()
			    .verify(Duration.ofSeconds(30));
}
 
源代码17 项目: spring-reactive-sample   文件: Application.java
/**
 * Declares the Reactor Netty server bean for the "default" profile, serving the
 * {@link HttpHandler} built from the given application context on
 * {@code localhost} and the configured port.
 *
 * @param context application context the web handler is built from
 * @return the configured (not yet bound) server
 */
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
    HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(context).build();
    return HttpServer.create()
            .host("localhost")
            .port(this.port)
            .handle(new ReactorHttpHandlerAdapter(httpHandler));
}
 
源代码18 项目: spring-reactive-sample   文件: Application.java
/**
 * Declares the Reactor Netty server bean for the "default" profile, serving the
 * {@link HttpHandler} built from the given application context on
 * {@code localhost} and the configured port.
 *
 * @param context application context the web handler is built from
 * @return the configured (not yet bound) server
 */
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
    HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(context).build();
    return HttpServer.create()
            .host("localhost")
            .port(this.port)
            .handle(new ReactorHttpHandlerAdapter(httpHandler));
}
 
源代码19 项目: spring-reactive-sample   文件: Application.java
/**
 * Declares the Reactor Netty server bean for the "default" profile, serving the
 * {@link HttpHandler} built from the given application context on
 * {@code localhost} and the configured port.
 *
 * @param context application context the web handler is built from
 * @return the configured (not yet bound) server
 */
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
    HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(context).build();
    return HttpServer.create()
            .host("localhost")
            .port(this.port)
            .handle(new ReactorHttpHandlerAdapter(httpHandler));
}
 
源代码20 项目: reactor-netty   文件: HttpRedirectTest.java
/**
 * Requests "/1", which the server redirects to "/3", with a client that has no
 * redirect handling configured, and expects to see the raw 302 response with its
 * "location" header rather than the "/3" body.
 */
@Test
public void redirectDisabledByDefault() {
	DisposableServer server =
			HttpServer.create()
			          .port(0)
			          .host("localhost")
			          .wiretap(true)
			          .route(r -> r.get("/1", (req, res) -> res.sendRedirect("/3"))
			                       .get("/3", (req, res) -> res.status(200)
			                                                   .sendString(Mono.just("OK"))))
			          .bindNow();

	try {
		HttpClientResponse response =
				HttpClient.create()
				          .remoteAddress(server::address)
				          .wiretap(true)
				          .get()
				          .uri("/1")
				          .response()
				          .block(Duration.ofSeconds(30));

		assertThat(response).isNotNull();
		assertThat(response.status()).isEqualTo(HttpResponseStatus.FOUND);
		assertThat(response.responseHeaders().get("location")).isEqualTo("/3");
	}
	finally {
		// Dispose the server even when an assertion above fails.
		server.disposeNow();
	}
}
 
/**
 * Sends a plain-text request to a TLS-secured server through a pooled provider of
 * size 1, swallows the resulting error, and asserts that every channel pool then
 * reports zero acquired connections.
 *
 * @throws CertificateException if the self-signed certificate cannot be generated
 */
@Test
public void testIssue903() throws CertificateException {
	SelfSignedCertificate cert = new SelfSignedCertificate();
	try {
		SslContextBuilder serverCtx = SslContextBuilder.forServer(cert.key(), cert.cert());
		DisposableServer server =
				HttpServer.create()
				          .secure(s -> s.sslContext(serverCtx))
				          .port(0)
				          .wiretap(true)
				          .handle((req, resp) -> resp.sendHeaders())
				          .bindNow();
		try {
			DefaultPooledConnectionProvider provider =
					(DefaultPooledConnectionProvider) ConnectionProvider.create("testIssue903", 1);
			try {
				HttpClient.create(provider)
				          .port(server.port())
				          .get()
				          .uri("/")
				          .response()
				          // the request error itself is not what this test checks
				          .onErrorResume(e -> Mono.empty())
				          .block(Duration.ofSeconds(30));

				provider.channelPools.forEach((k, v) -> assertThat(v.metrics().acquiredSize()).isEqualTo(0));
			}
			finally {
				provider.disposeLater()
				        .block(Duration.ofSeconds(30));
			}
		}
		finally {
			server.disposeNow();
		}
	}
	finally {
		// Remove the temporary key/certificate files created by SelfSignedCertificate.
		cert.delete();
	}
}
 
/**
 * With {@code compress(false)} on the server, a request that advertises
 * "Accept-Encoding: gzip" is expected to get an uncompressed "reply" body and no
 * "content-encoding" response header.
 */
@Test
public void serverCompressionDisabled() {
	HttpServer server = HttpServer.create()
	                              .port(0)
	                              .compress(false);

	DisposableServer runningServer =
			server.handle((in, out) -> out.sendString(Mono.just("reply")))
			      .wiretap(true)
			      .bindNow(Duration.ofSeconds(10));

	try {
		//don't activate compression on the client options to avoid auto-handling (which removes the header)
		HttpClient client = HttpClient.create()
				                      .remoteAddress(runningServer::address)
				                      .wiretap(true);
		Tuple2<String, HttpHeaders> resp =
				//edit the header manually to attempt to trigger compression on server side
				client.headers(h -> h.add("Accept-Encoding", "gzip"))
				      .get()
				      .uri("/test")
				      .response((res, buf) -> buf.asString()
				                                 .zipWith(Mono.just(res.responseHeaders())))
				      // bounded wait so a stalled exchange cannot hang the build
				      .blockFirst(Duration.ofSeconds(30));

		assertThat(resp).isNotNull();
		assertThat(resp.getT2().get("content-encoding")).isNull();
		assertThat(resp.getT1()).isEqualTo("reply");
	}
	finally {
		// Dispose the server even when an assertion above fails.
		runningServer.dispose();
		runningServer.onDispose()
		            .block(Duration.ofSeconds(30));
	}
}
 
源代码23 项目: reactor-netty   文件: HttpClientTest.java
/**
 * Subscribes two GET requests built from the same client, backed by a connection
 * provider of size 1, and expects both to yield "hello".
 */
@Test
public void testClientReuseIssue405(){
	disposableServer =
			HttpServer.create()
			          .port(0)
			          .handle((in,out)->out.sendString(Flux.just("hello")))
			          .wiretap(true)
			          .bindNow();

	ConnectionProvider pool = ConnectionProvider.create("testClientReuseIssue405", 1);
	try {
		HttpClient httpClient = createHttpClientForContextWithPort(pool);

		Mono<String> mono1 =
				httpClient.get()
				          .responseSingle((r, buf) -> buf.asString())
				          .log("mono1");

		// Distinct log category so the two requests can be told apart in the output.
		Mono<String> mono2 =
				httpClient.get()
				          .responseSingle((r, buf) -> buf.asString())
				          .log("mono2");

		StepVerifier.create(Flux.zip(mono1,mono2))
		            .expectNext(Tuples.of("hello","hello"))
		            .expectComplete()
		            .verify(Duration.ofSeconds(20));
	}
	finally {
		// Dispose the pool even when the verification above fails.
		pool.dispose();
	}
}
 
源代码24 项目: reactor-netty   文件: HttpClientTest.java
/**
 * Fires four concurrent GET requests against an echo server that delays its
 * subscription by one second, records the name of the thread on which each response
 * terminates, and asserts that more than one thread was involved.
 *
 * @param withLoop whether the client runs on the dedicated {@code LoopResources}
 *                 created by this method instead of its default resources
 */
private void doTestIssue600(boolean withLoop) {
	disposableServer =
			HttpServer.create()
			          .port(0)
			          .handle((req, res) -> res.send(req.receive()
			                                            .retain()
			                                            .delaySubscription(Duration.ofSeconds(1))))
			          .wiretap(true)
			          .bindNow();

	ConnectionProvider pool = ConnectionProvider.create("doTestIssue600", 10);
	LoopResources loop = LoopResources.create("test", 4, true);
	Set<String> threadNames = new ConcurrentSkipListSet<>();
	try {
		HttpClient client;
		if (withLoop) {
			client = createHttpClientForContextWithAddress(pool)
			            .runOn(loop);
		}
		else {
			client = createHttpClientForContextWithAddress(pool);
		}

		StepVerifier.create(
				Flux.range(1,4)
				    .flatMap(i -> client.request(HttpMethod.GET)
				                        .uri("/")
				                        .send((req, out) -> out.send(Flux.empty()))
				                        .responseContent()
				                        .doFinally(s -> threadNames.add(Thread.currentThread().getName()))))
		            .expectComplete()
		            .verify(Duration.ofSeconds(30));
	}
	finally {
		// Release pool and event loops even when the verification above fails.
		pool.dispose();
		loop.dispose();
	}

	assertThat(threadNames.size()).isGreaterThan(1);
}
 
源代码25 项目: reactor-netty   文件: HttpClientTest.java
/**
 * Uploads the classpath resource {@code /largeFile.txt} with {@code sendFile} and
 * checks both the server's 201 "Received File" answer and the content the server
 * aggregated from the request body.
 *
 * @throws URISyntaxException if the resource URL cannot be converted to a URI
 */
@Test
public void chunkedSendFile() throws URISyntaxException {
	Path sourceFile = Paths.get(getClass().getResource("/largeFile.txt").toURI());
	// Captures the body as received on the server side.
	AtomicReference<String> receivedBody = new AtomicReference<>();

	disposableServer =
			HttpServer.create()
			          .host("localhost")
			          .route(routes -> routes.post("/upload", (request, response) ->
			                  request.receive()
			                         .aggregate()
			                         .asString(StandardCharsets.UTF_8)
			                         .doOnNext(receivedBody::set)
			                         .then(response.status(201)
			                                       .sendString(Mono.just("Received File"))
			                                       .then())))
			          .wiretap(true)
			          .bindNow();

	Tuple2<String, Integer> bodyAndStatus =
			createHttpClientForContextWithAddress()
			        .post()
			        .uri("/upload")
			        .send((request, out) -> out.sendFile(sourceFile))
			        .responseSingle((res, buf) -> buf.asString()
			                                         .zipWith(Mono.just(res.status().code())))
			        .block(Duration.ofSeconds(30));

	assertThat(bodyAndStatus).isNotNull();
	assertThat(bodyAndStatus.getT2()).isEqualTo(201);
	assertThat(bodyAndStatus.getT1()).isEqualTo("Received File");

	String expectedStart = "This is an UTF-8 file that is larger than 1024 bytes. " +
			"It contains accents like é.";
	assertThat(receivedBody.get())
	                   .startsWith(expectedStart)
	                   .contains("1024 mark here -><- 1024 mark here")
	                   .endsWith("End of File");
}
 
源代码26 项目: spring-reactive-sample   文件: Application.java
/**
 * Declares the Reactor Netty server bean for the "default" profile, serving the
 * {@link HttpHandler} built from the given application context on
 * {@code localhost} and the configured port.
 *
 * @param context application context the web handler is built from
 * @return the configured (not yet bound) server
 */
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
    HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(context).build();
    return HttpServer.create()
            .host("localhost")
            .port(this.port)
            .handle(new ReactorHttpHandlerAdapter(httpHandler));
}
 
源代码27 项目: tutorials   文件: CustomNettyWebServerFactory.java
/**
 * Customizes the server so that its bootstrap uses two freshly created NIO event
 * loop groups and {@link NioServerSocketChannel} as the channel type.
 *
 * <p>NOTE(review): new groups are created on every call and this method never
 * shuts them down; confirm that their lifecycle is handled elsewhere.
 *
 * @param httpServer the server to customize
 * @return the customized server
 */
@Override
public HttpServer apply(HttpServer httpServer) {
    EventLoopGroup acceptorGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    return httpServer.tcpConfiguration(tcpServer -> tcpServer.bootstrap(serverBootstrap -> {
        serverBootstrap.group(acceptorGroup, workerGroup);
        return serverBootstrap.channel(NioServerSocketChannel.class);
    }));
}
 
源代码28 项目: spring-reactive-sample   文件: Application.java
/**
 * Declares the Reactor Netty server bean, serving the {@link HttpHandler} built
 * from the given application context on {@code localhost} and the configured port.
 *
 * @param context application context the web handler is built from
 * @return the configured (not yet bound) server
 */
@Bean
public HttpServer httpServer(ApplicationContext context) {
    HttpHandler webHandler = WebHttpHandlerBuilder.applicationContext(context).build();
    ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(webHandler);
    HttpServer server = HttpServer.create().host("localhost").port(this.port);
    return server.handle(handlerAdapter);
}
 
源代码29 项目: spring-reactive-sample   文件: Application.java
/**
 * Declares the Reactor Netty server bean for the "default" profile, serving the
 * {@link HttpHandler} built from the given application context on
 * {@code localhost} and the configured port.
 *
 * @param context application context the web handler is built from
 * @return the configured (not yet bound) server
 */
@Profile("default")
@Bean
public HttpServer nettyHttpServer(ApplicationContext context) {
    HttpHandler httpHandler = WebHttpHandlerBuilder.applicationContext(context).build();
    return HttpServer.create()
            .host("localhost")
            .port(this.port)
            .handle(new ReactorHttpHandlerAdapter(httpHandler));
}
 
源代码30 项目: rsocket-java   文件: WebsocketRouteTransportTest.java
/**
 * Constructing a {@link WebsocketRouteTransport} with a {@code null} routes builder
 * must fail with a {@link NullPointerException} carrying the expected message.
 */
@DisplayName("constructor throw NullPointer with null routesBuilder")
@Test
void constructorNullRoutesBuilder() {
  assertThatNullPointerException()
      .isThrownBy(() -> {
        new WebsocketRouteTransport(HttpServer.create(), null, "/test-path");
      })
      .withMessage("routesBuilder must not be null");
}