下面列出了org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#org.springframework.web.reactive.function.server.RouterFunctions 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Bean
public RouterFunction<ServerResponse> routeCity(InfluxProxyHandler influxProxyHandler) {
return RouterFunctions
.route(RequestPredicates.path("/query")
.and(RequestPredicates.accept(MediaType.ALL)),
influxProxyHandler::query)
.andRoute(RequestPredicates.POST("/write")
.and(RequestPredicates.accept(MediaType.ALL)),
influxProxyHandler::write)
.andRoute(RequestPredicates.GET("/ping")
.and(RequestPredicates.accept(MediaType.ALL)),
influxProxyHandler::ping)
.andRoute(RequestPredicates.path("/debug/{opt}")
.and(RequestPredicates.accept(MediaType.ALL)),
influxProxyHandler::debug)
.andRoute(RequestPredicates.path("/debug")
.and(RequestPredicates.accept(MediaType.ALL)),
influxProxyHandler::debug)
.andRoute(RequestPredicates.path("/refresh/allBackend")
.and(RequestPredicates.accept(MediaType.ALL)),
influxProxyHandler::refreshAllBackend)
;
}
private RouterFunction<ServerResponse> routingFunction() {
FormHandler formHandler = new FormHandler();
RouterFunction<ServerResponse> restfulRouter = route(GET("/"), serverRequest -> ok().body(Flux.fromIterable(actors), Actor.class)).andRoute(POST("/"), serverRequest -> serverRequest.bodyToMono(Actor.class)
.doOnNext(actors::add)
.then(ok().build()));
return route(GET("/test"), serverRequest -> ok().body(fromObject("helloworld"))).andRoute(POST("/login"), formHandler::handleLogin)
.andRoute(POST("/upload"), formHandler::handleUpload)
.and(RouterFunctions.resources("/files/**", new ClassPathResource("files/")))
.andNest(path("/actor"), restfulRouter)
.filter((request, next) -> {
System.out.println("Before handler invocation: " + request.path());
return next.handle(request);
});
}
@Test
public void exceptionHandler() {
RouterFunction<ServerResponse> routerFunction = RouterFunctions.route()
.GET("/error", request -> Mono.error(new IllegalStateException("boo")))
.build();
new DefaultRouterFunctionSpec(routerFunction)
.handlerStrategies(HandlerStrategies.builder()
.exceptionHandler((exchange, ex) -> {
exchange.getResponse().setStatusCode(HttpStatus.BAD_REQUEST);
return Mono.empty();
})
.build())
.build()
.get()
.uri("/error")
.exchange()
.expectStatus().isBadRequest();
}
@RouterOperations({
@RouterOperation(path = "/hello", operation = @Operation(operationId = "hello", responses = @ApiResponse(responseCode = "200"))),
@RouterOperation(path = "/echo", produces = TEXT_PLAIN_VALUE, operation = @Operation(operationId = "echo", requestBody = @RequestBody(content = @Content(schema = @Schema(type = "string"))),
responses = @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(type = "string"))))),
@RouterOperation(path = "/echo",produces = APPLICATION_JSON_VALUE, operation = @Operation(operationId = "echo", requestBody = @RequestBody(content = @Content(schema = @Schema(type = "string"))),
responses = @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(type = "string"))))),
@RouterOperation(path = "/quotes", produces = APPLICATION_JSON_VALUE, operation = @Operation(operationId = "fetchQuotes", parameters = @Parameter(name = "size", in = ParameterIn.QUERY, schema = @Schema(type = "string")),
responses = @ApiResponse(responseCode = "200", content = @Content(array = @ArraySchema(schema = @Schema(implementation = Quote.class)))))),
@RouterOperation(path = "/quotes", produces = APPLICATION_STREAM_JSON_VALUE, operation = @Operation(operationId = "fetchQuotes",
responses = @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = Quote.class))))) })
@Bean
public RouterFunction<ServerResponse> route(QuoteHandler quoteHandler) {
return RouterFunctions
.route(GET("/hello").and(accept(TEXT_PLAIN)), quoteHandler::hello)
.andRoute(POST("/echo").and(accept(TEXT_PLAIN).and(contentType(TEXT_PLAIN))), quoteHandler::echo)
.andRoute(POST("/echo").and(accept(APPLICATION_JSON).and(contentType(APPLICATION_JSON))), quoteHandler::echo)
.andRoute(GET("/quotes").and(accept(APPLICATION_JSON)), quoteHandler::fetchQuotes)
.andRoute(GET("/quotes").and(accept(APPLICATION_STREAM_JSON)), quoteHandler::streamQuotes);
}
private RouterFunction<ServerResponse> routingFunction() {
FormHandler formHandler = new FormHandler();
RouterFunction<ServerResponse> restfulRouter = route(GET("/"),
serverRequest -> ok().body(Flux.fromIterable(actors), Actor.class)).andRoute(POST("/"),
serverRequest -> serverRequest.bodyToMono(Actor.class)
.doOnNext(actors::add)
.then(ok().build()));
return route(GET("/test"), serverRequest -> ok().body(fromObject("helloworld")))
.andRoute(POST("/login"), formHandler::handleLogin)
.andRoute(POST("/upload"), formHandler::handleUpload)
.and(RouterFunctions.resources("/files/**", new ClassPathResource("files/")))
.andNest(path("/actor"), restfulRouter)
.filter((request, next) -> {
System.out.println("Before handler invocation: " + request.path());
return next.handle(request);
});
}
@Bean
@RouterOperations({ @RouterOperation(path = "/getAllPositions", operation = @Operation(description = "Get all positions", operationId = "findAll",tags = "positions",
responses = @ApiResponse(responseCode = "200", content = @Content(array = @ArraySchema(schema = @Schema(implementation = Position.class)))))),
@RouterOperation(path = "/getPosition/{id}", operation = @Operation(description = "Find all", operationId = "findById", tags = "positions",parameters = @Parameter(name = "id", in = ParameterIn.PATH),
responses = @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = Position.class))))),
@RouterOperation(path = "/createPosition", operation = @Operation(description = "Save position", operationId = "save", tags = "positions",requestBody = @RequestBody(content = @Content(schema = @Schema(implementation = Position.class))),
responses = @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = Position.class))))),
@RouterOperation(path = "/deletePosition/{id}", operation = @Operation(description = "Delete By Id", operationId = "deleteBy",tags = "positions", parameters = @Parameter(name = "id", in = ParameterIn.PATH),
responses = @ApiResponse(responseCode = "200", content = @Content)))})
public RouterFunction<ServerResponse> positionRoute(PositionHandler handler) {
return RouterFunctions
.route(GET("/getAllPositions").and(accept(MediaType.APPLICATION_JSON)), handler::findAll)
.andRoute(GET("/getPosition/{id}").and(accept(MediaType.APPLICATION_STREAM_JSON)), handler::findById)
.andRoute(POST("/createPosition").and(accept(MediaType.APPLICATION_JSON)), handler::save)
.andRoute(DELETE("/deletePosition/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::delete);
}
private RouterFunction<ServerResponse> routingFunction() {
FormHandler formHandler = new FormHandler();
RouterFunction<ServerResponse> restfulRouter = route(GET("/"), serverRequest -> ok().body(Flux.fromIterable(actors), Actor.class)).andRoute(POST("/"), serverRequest -> serverRequest.bodyToMono(Actor.class)
.doOnNext(actors::add)
.then(ok().build()));
return route(GET("/test"), serverRequest -> ok().body(fromObject("helloworld"))).andRoute(POST("/login"), formHandler::handleLogin)
.andRoute(POST("/upload"), formHandler::handleUpload)
.and(RouterFunctions.resources("/files/**", new ClassPathResource("files/")))
.andNest(path("/actor"), restfulRouter)
.filter((request, next) -> {
System.out.println("Before handler invocation: " + request.path());
return next.handle(request);
});
}
@SuppressWarnings("unchecked")
private void setAttributes(Map<String, Object> attributes, ServerRequest serverRequest,
HandlerFunction<?> handlerFunction) {
attributes.put(RouterFunctions.REQUEST_ATTRIBUTE, serverRequest);
attributes.put(BEST_MATCHING_HANDLER_ATTRIBUTE, handlerFunction);
PathPattern matchingPattern =
(PathPattern) attributes.get(RouterFunctions.MATCHING_PATTERN_ATTRIBUTE);
if (matchingPattern != null) {
attributes.put(BEST_MATCHING_PATTERN_ATTRIBUTE, matchingPattern);
}
Map<String, String> uriVariables =
(Map<String, String>) attributes
.get(RouterFunctions.URI_TEMPLATE_VARIABLES_ATTRIBUTE);
if (uriVariables != null) {
attributes.put(URI_TEMPLATE_VARIABLES_ATTRIBUTE, uriVariables);
}
}
@Test
public void exceptionHandler() {
RouterFunction<ServerResponse> routerFunction = RouterFunctions.route()
.GET("/error", request -> Mono.error(new IllegalStateException("boo")))
.build();
new DefaultRouterFunctionSpec(routerFunction)
.handlerStrategies(HandlerStrategies.builder()
.exceptionHandler((exchange, ex) -> {
exchange.getResponse().setStatusCode(HttpStatus.BAD_REQUEST);
return Mono.empty();
})
.build())
.build()
.get()
.uri("/error")
.exchange()
.expectStatus().isBadRequest();
}
@Bean
public RouterFunction<ServerResponse> route(EmployeeHandler handler) {
return RouterFunctions.route(
GET("/employees").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::getAllEmployees)
.andRoute(
GET("/employee/fn/{fn}/ln/{ln}")
.and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::getEmployee)
.andRoute(
PUT("/employee").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::createNewEmployee)
.andRoute(
DELETE("/employee/id/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::deleteEmployee)
.andRoute(
GET("/departments").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
handler::getAllDepartments);
}
@Bean
public RouterFunction<ServerResponse> routerFunction(
ChatHandler chatHandler
) {
return RouterFunctions
.route(
GET("/"),
serverRequest -> ServerResponse
.ok()
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(chatHandler
.messageStream(), Message.class))
.andRoute(
GET("/{user}"),
serverRequest -> {
String user = serverRequest.pathVariable("user");
return ServerResponse
.ok()
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(chatHandler
.messageStreamForUser(user), Message.class);
});
}
public static void main(String... args) {
long start = System.currentTimeMillis();
HttpHandler httpHandler = RouterFunctions.toHttpHandler(routes(
new BCryptPasswordEncoder(18)
));
ReactorHttpHandlerAdapter reactorHttpHandler = new ReactorHttpHandlerAdapter(httpHandler);
DisposableServer server = HttpServer.create()
.host("localhost")
.port(8080)
.handle(reactorHttpHandler)
.bindNow();
LOGGER.debug("Started in " + (System.currentTimeMillis() - start) + " ms");
server.onDispose()
.block();
}
@Bean
RouterFunction<ServerResponse> compositeRoutes(){
RouterFunction<ServerResponse> studentResponse =
RouterFunctions.route(RequestPredicates.
GET("/api/f/composite/getStudent/{rollNo}"),
serverRequest -> {
int rollNo = getInt(serverRequest.pathVariable("rollNo"));
return ServerResponse.ok().
body(studentMongoRepository.
findByRollNo(rollNo), Student.class);
})
.and(
RouterFunctions.route(RequestPredicates.
GET("/api/f/composite/getAllStudent"),
serverRequest ->
ServerResponse.ok().
body(studentMongoRepository.findAll(), Student.class))
);
return studentResponse;
}
@Bean
public RouterFunction routerFunction() {
return RouterFunctions.route(
RequestPredicates.path("/fallback")
.and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), hystrixFallbackHandler)
.andRoute(RequestPredicates.GET("/code")
.and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), imageCodeHandler);
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
HandlerFunction<?> handlerFunction = (HandlerFunction<?>) handler;
ServerRequest request = exchange.getRequiredAttribute(RouterFunctions.REQUEST_ATTRIBUTE);
return handlerFunction.handle(request)
.map(response -> new HandlerResult(handlerFunction, response, HANDLER_FUNCTION_RETURN_TYPE));
}
@Bean
public RouterFunction<?> handler() {
return RouterFunctions.route()
.GET("/sessionClassName", request ->
request.session().flatMap(session -> {
String className = session.getClass().getSimpleName();
return ServerResponse.ok().syncBody(className);
}))
.build();
}
@Before
public void start() throws Exception {
HttpHandler httpHandler = RouterFunctions.toHttpHandler(
route(GET("/test"), request -> ServerResponse.ok().syncBody("It works!")));
this.server = new ReactorHttpServer();
this.server.setHandler(httpHandler);
this.server.afterPropertiesSet();
this.server.start();
this.client = WebTestClient.bindToServer()
.baseUrl("http://localhost:" + this.server.getPort())
.build();
}
@Bean
public RouterFunction routerFunction() {
return RouterFunctions.route(RequestPredicates.GET("/swagger-resources")
.and(RequestPredicates.accept(MediaType.ALL)), swaggerResourceHandler)
.andRoute(RequestPredicates.GET("/swagger-resources/configuration/ui")
.and(RequestPredicates.accept(MediaType.ALL)), swaggerUiHandler)
.andRoute(RequestPredicates.GET("/swagger-resources/configuration/security")
.and(RequestPredicates.accept(MediaType.ALL)), swaggerSecurityHandler);
}
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
/**
* 按照异常类型进行处理
*/
ResultBody resultBody;
ServerHttpRequest request = exchange.getRequest();
if("/favicon.ico".equals(exchange.getRequest().getURI().getPath())){
return Mono.empty();
}
if (ex instanceof NotFoundException) {
resultBody = ResultBody.failed().code(ErrorCode.SERVICE_UNAVAILABLE.getCode()).msg(ErrorCode.SERVICE_UNAVAILABLE.getMessage()).httpStatus(HttpStatus.SERVICE_UNAVAILABLE.value()).path(request.getURI().getPath());
log.error("==> 错误解析:{}", resultBody);
} else {
resultBody = OpenGlobalExceptionHandler.resolveException((Exception) ex, exchange.getRequest().getURI().getPath());
}
/**
* 参考AbstractErrorWebExceptionHandler
*/
if (exchange.getResponse().isCommitted()) {
return Mono.error(ex);
}
exceptionHandlerResult.set(resultBody);
ServerRequest newRequest = ServerRequest.create(exchange, this.messageReaders);
return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse).route(newRequest)
.switchIfEmpty(Mono.error(ex))
.flatMap((handler) -> handler.handle(newRequest))
.flatMap((response) -> {
return write(exchange, response,ex);
});
}
private static RouterFunction<?> routingFunction() {
return route(GET("/test"), serverRequest -> ok().body(fromObject("helloworld"))).andRoute(POST("/login"), serverRequest -> serverRequest.body(toFormData())
.map(MultiValueMap::toSingleValueMap)
.map(formData -> {
System.out.println("form data: " + formData.toString());
if ("baeldung".equals(formData.get("user")) && "you_know_what_to_do".equals(formData.get("token"))) {
return ok().body(Mono.just("welcome back!"), String.class)
.block();
}
return ServerResponse.badRequest()
.build()
.block();
}))
.andRoute(POST("/upload"), serverRequest -> serverRequest.body(toDataBuffers())
.collectList()
.map(dataBuffers -> {
AtomicLong atomicLong = new AtomicLong(0);
dataBuffers.forEach(d -> atomicLong.addAndGet(d.asByteBuffer()
.array().length));
System.out.println("data length:" + atomicLong.get());
return ok().body(fromObject(atomicLong.toString()))
.block();
}))
.and(RouterFunctions.resources("/files/**", new ClassPathResource("files/")))
.andNest(path("/actor"), route(GET("/"), serverRequest -> ok().body(Flux.fromIterable(actors), Actor.class)).andRoute(POST("/"), serverRequest -> serverRequest.bodyToMono(Actor.class)
.doOnNext(actors::add)
.then(ok().build())))
.filter((request, next) -> {
System.out.println("Before handler invocation: " + request.path());
return next.handle(request);
});
}
@Bean
public RouterFunction<ServerResponse> routerFunction() {
return RouterFunctions.route()
.GET("/exception", request -> {
throw new UnsupportedOperationException("Error");
})
.build();
}
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
HandlerFunction<?> handlerFunction = (HandlerFunction<?>) handler;
ServerRequest request = exchange.getRequiredAttribute(RouterFunctions.REQUEST_ATTRIBUTE);
return handlerFunction.handle(request)
.map(response -> new HandlerResult(handlerFunction, response, HANDLER_FUNCTION_RETURN_TYPE));
}
@Bean
public RouterFunction<?> handler() {
return RouterFunctions.route()
.GET("/sessionClassName", request ->
request.session().flatMap(session -> {
String className = session.getClass().getSimpleName();
return ServerResponse.ok().syncBody(className);
}))
.build();
}
@Before
public void start() throws Exception {
HttpHandler httpHandler = RouterFunctions.toHttpHandler(
route(GET("/test"), request -> ServerResponse.ok().syncBody("It works!")));
this.server = new ReactorHttpServer();
this.server.setHandler(httpHandler);
this.server.afterPropertiesSet();
this.server.start();
this.client = WebTestClient.bindToServer()
.baseUrl("http://localhost:" + this.server.getPort())
.build();
}
/**
* Configure routes via {@link RouterFunctions.Builder}.
* @see org.springframework.fu.jafu.BeanDefinitionDsl#bean(Class, BeanDefinitionCustomizer...)
*/
public WebFluxServerDsl router(Consumer<RouterFunctions.Builder> routerDsl) {
RouterFunctions.Builder builder = RouterFunctions.route();
context.registerBean(BeanDefinitionReaderUtils.uniqueBeanName(RouterFunction.class.getName(), context), RouterFunction.class, () -> {
routerDsl.accept(builder);
return builder.build();
});
return this;
}
/**
* 暴露6200管理端口
*/
@Override
public void run(ApplicationArguments args) throws Exception {
RouterFunction manageRouterFunction = RouterFunctions
.route(RequestPredicates.GET("/ok"), req -> {
return ServerResponse.ok()
.syncBody("online");
})
.andRoute(RequestPredicates.GET("/cmd/{line}"), request -> {
String pathVar = request.pathVariable("line");
try {
if ("down".equals(pathVar)) {
refuseRequestService.setRefuseRequest(true);
} else if ("up".equals(pathVar)) {
refuseRequestService.setRefuseRequest(false);
}
return ServerResponse.ok().body(Mono.just("ok"), String.class);
} catch (Throwable e) {
LOGGER.error("{} request err!", pathVar, e);
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
});
HttpHandler handler = RouterFunctions.toHttpHandler(manageRouterFunction);
int managePort = serverPort - 1000;
ReactorHttpHandlerAdapter inboundAdapter = new ReactorHttpHandlerAdapter(handler);
// manage port
HttpServer.create().port(managePort).handle(inboundAdapter).bind();
// HttpServer.create(managePort).newHandler(inboundAdapter).block();
LOGGER.info("management services run on port:{}", managePort);
}
@Bean
public RouterFunction<ServerResponse> route(HelloHandler hander) {
/**
* RouterFunctions:对请求路由处理类,即将请求路由到处理器。
* RouterFunctions.route(RequestPredicate, HandlerFunction) 方法,对应的入参是请求参数和处理函数,如果请求匹配,就调用对应的处理器函数。
* RequestPredicates.GET: 是将一个 GET 请求 /webflux/hello 路由到处理器 helloHandler 的 hello 方法上。跟 Spring MVC 模式下的 HandleMapping 的作用类似。
*/
RouterFunction<ServerResponse> routerFunction = RouterFunctions.route(
RequestPredicates.GET("/webflux/hello").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
hander::hello);
return routerFunction;
}
@Bean
public RouterFunction<ServerResponse> routerFunction(
SensorReadingRepository sensorReadingRepository
) {
return RouterFunctions
.route(
GET("/"),
serverRequest -> ServerResponse
.ok()
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(sensorReadingRepository.findBy(), SensorsReadings.class));
}
@Bean
public RouterFunction<ServerResponse> validationsRouter(@Autowired CustomRequestEntityValidationHandler dryHandler,
@Autowired FunctionalHandler complexHandler,
@Autowired OtherEntityValidationHandler otherHandler,
@Autowired AnnotatedRequestEntityValidationHandler annotatedEntityHandler) {
return RouterFunctions.route(RequestPredicates.POST("/complex-handler-functional-validation"), complexHandler::handleRequest)
.andRoute(RequestPredicates.POST("/dry-functional-validation"), dryHandler::handleRequest)
.andRoute(RequestPredicates.POST("/other-dry-functional-validation"), otherHandler::handleRequest)
.andRoute(RequestPredicates.POST("/annotated-functional-validation"), annotatedEntityHandler::handleRequest);
}
@Override
public void initialize(GenericApplicationContext applicationContext) {
var environment = applicationContext.getEnvironment();
if (!environment.acceptsProfiles(Profiles.of("exporter"))) {
return;
}
applicationContext.registerBean(MetricsCollector.class);
applicationContext.registerBean("prometheus", RouterFunction.class, () -> {
var metricsCollector = applicationContext.getBean(MetricsCollector.class);
return RouterFunctions.route()
.GET("/prometheus", __ -> {
return metricsCollector.collect()
.collectList()
.flatMap(metrics -> {
try {
var writer = new StringWriter();
TextFormat.write004(writer, Collections.enumeration(metrics));
return ServerResponse.ok()
.contentType(MediaType.valueOf(TextFormat.CONTENT_TYPE_004))
.bodyValue(writer.toString());
} catch (IOException e) {
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
});
})
.build();
});
}