下面列出了org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#org.springframework.web.reactive.function.server.RequestPredicates 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Registers the functional routes backed by {@link InfluxProxyHandler}.
 *
 * <p>Routes are tried in the order they are added. Every route additionally
 * requires the request to accept {@link MediaType#ALL}. {@code /write} is
 * limited to POST and {@code /ping} to GET; the other paths are matched by
 * path only.
 *
 * @param influxProxyHandler handler whose methods serve the individual routes
 * @return the composed router function
 */
@Bean
public RouterFunction<ServerResponse> routeCity(InfluxProxyHandler influxProxyHandler) {
    RouterFunction<ServerResponse> routes = RouterFunctions.route(
            RequestPredicates.path("/query").and(RequestPredicates.accept(MediaType.ALL)),
            influxProxyHandler::query);

    routes = routes.andRoute(
            RequestPredicates.POST("/write").and(RequestPredicates.accept(MediaType.ALL)),
            influxProxyHandler::write);
    routes = routes.andRoute(
            RequestPredicates.GET("/ping").and(RequestPredicates.accept(MediaType.ALL)),
            influxProxyHandler::ping);

    // The parameterised debug path is registered ahead of the bare "/debug" path.
    routes = routes.andRoute(
            RequestPredicates.path("/debug/{opt}").and(RequestPredicates.accept(MediaType.ALL)),
            influxProxyHandler::debug);
    routes = routes.andRoute(
            RequestPredicates.path("/debug").and(RequestPredicates.accept(MediaType.ALL)),
            influxProxyHandler::debug);

    routes = routes.andRoute(
            RequestPredicates.path("/refresh/allBackend").and(RequestPredicates.accept(MediaType.ALL)),
            influxProxyHandler::refreshAllBackend);
    return routes;
}
/**
 * Declares the employee and department routes served by {@link EmployeeHandler}.
 *
 * <p>Every route requires the request to accept {@code application/json}.
 * The individual routes are combined in declaration order.
 *
 * @param handler handler whose methods serve the individual routes
 * @return the composed router function
 */
@Bean
public RouterFunction<ServerResponse> route(EmployeeHandler handler) {
    RouterFunction<ServerResponse> listEmployees = RouterFunctions.route(
            GET("/employees").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::getAllEmployees);
    RouterFunction<ServerResponse> findEmployee = RouterFunctions.route(
            GET("/employee/fn/{fn}/ln/{ln}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::getEmployee);
    RouterFunction<ServerResponse> createEmployee = RouterFunctions.route(
            PUT("/employee").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::createNewEmployee);
    RouterFunction<ServerResponse> deleteEmployee = RouterFunctions.route(
            DELETE("/employee/id/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::deleteEmployee);
    RouterFunction<ServerResponse> listDepartments = RouterFunctions.route(
            GET("/departments").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::getAllDepartments);

    return listEmployees
            .and(findEmployee)
            .and(createEmployee)
            .and(deleteEmployee)
            .and(listDepartments);
}
/**
 * Composes two student read routes into a single router function:
 * a lookup by roll number and a listing of all students.
 *
 * @return the router function that tries the single-student route first and
 *         the all-students route second
 */
@Bean
RouterFunction<ServerResponse> compositeRoutes() {
    // GET by roll number: the path variable is converted with getInt before the repository lookup.
    RouterFunction<ServerResponse> singleStudent = RouterFunctions.route(
            RequestPredicates.GET("/api/f/composite/getStudent/{rollNo}"),
            serverRequest -> {
                int rollNo = getInt(serverRequest.pathVariable("rollNo"));
                return ServerResponse.ok()
                        .body(studentMongoRepository.findByRollNo(rollNo), Student.class);
            });

    // GET all students straight from the repository.
    RouterFunction<ServerResponse> allStudents = RouterFunctions.route(
            RequestPredicates.GET("/api/f/composite/getAllStudent"),
            serverRequest -> ServerResponse.ok()
                    .body(studentMongoRepository.findAll(), Student.class));

    return singleStudent.and(allStudents);
}
/**
 * Routes span submissions to {@code zipkinController}; no read operations are routed here.
 *
 * <p>Within {@code POST} requests:
 * <ul>
 *   <li>JSON content is accepted on {@code /api/v1/spans} (v1 decoder) and
 *       {@code /api/v2/spans} (v2 decoder);</li>
 *   <li>Thrift and Protobuf content types are matched by content type only.</li>
 * </ul>
 * Anything that matches none of the above is passed to {@code zipkinController::unmatched}.
 *
 * @param zipkinController controller that decodes and stores the spans
 * @param metersProvider   source of the per-format span counters
 * @return the composed router function
 */
@Bean
public RouterFunction<ServerResponse> myRoutes(ZipkinController zipkinController, MetersProvider metersProvider) {
    var counterJsonV1 = metersProvider.getSpansCounter("http", "jsonv1");
    var counterJsonV2 = metersProvider.getSpansCounter("http", "jsonv2");
    var counterThrift = metersProvider.getSpansCounter("http", "thrift");
    var counterProtobuf = metersProvider.getSpansCounter("http", "protobuf");

    // JSON submissions are distinguished by API version path.
    RouterFunction<ServerResponse> jsonSpanRoutes =
            route(path("/api/v1/spans"),
                    request -> zipkinController.addSpans(request, SpanBytesDecoder.JSON_V1, counterJsonV1))
            .andRoute(path("/api/v2/spans"),
                    request -> zipkinController.addSpans(request, SpanBytesDecoder.JSON_V2, counterJsonV2));

    // Binary encodings are selected purely by content type.
    RouterFunction<ServerResponse> postRoutes =
            nest(contentType(APPLICATION_JSON), jsonSpanRoutes)
            .andRoute(contentType(APPLICATION_THRIFT),
                    request -> zipkinController.addSpans(request, SpanBytesDecoder.THRIFT, counterThrift))
            .andRoute(contentType(APPLICATION_PROTOBUF),
                    request -> zipkinController.addSpans(request, SpanBytesDecoder.PROTO3, counterProtobuf));

    return nest(method(HttpMethod.POST), postRoutes)
            .andRoute(RequestPredicates.all(), zipkinController::unmatched);
}
/**
 * Registers the fallback route and the image-code route.
 *
 * <p>{@code /fallback} is matched by path for any HTTP method, {@code /code}
 * only for GET; both require the request to accept {@code text/plain}.
 *
 * @return the composed router function
 */
@Bean
public RouterFunction routerFunction() {
    return RouterFunctions
            .route(RequestPredicates.path("/fallback").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
                    hystrixFallbackHandler)
            .and(RouterFunctions.route(
                    RequestPredicates.GET("/code").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
                    imageCodeHandler));
}
/**
 * Translates an unhandled exception into a small JSON error response.
 *
 * <p>The status and message are chosen by exception type, the JSON text is
 * stored in {@code exceptionHandlerResult} for {@code renderErrorResponse},
 * and the rendered response is written to the exchange. If the response is
 * already committed, the original error is propagated instead.
 *
 * @param serverWebExchange the current exchange
 * @param throwable         the error to render
 * @return completion signal of the response write, or the original error
 */
@Override
public Mono<Void> handle(ServerWebExchange serverWebExchange, Throwable throwable) {
    // Choose status and message by exception type.
    HttpStatus httpStatus;
    String body;
    if (throwable instanceof NotFoundException) {
        httpStatus = HttpStatus.NOT_FOUND;
        body = "Service Not Found";
    } else if (throwable instanceof ResponseStatusException) {
        ResponseStatusException responseStatusException = (ResponseStatusException) throwable;
        httpStatus = responseStatusException.getStatus();
        body = responseStatusException.getMessage();
    } else {
        httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
        body = "Internal Server Error";
    }
    // Assemble the result; the message is escaped so that quotes or control
    // characters in an exception message cannot produce malformed JSON.
    Map<String, Object> result = new HashMap<>(2, 1);
    result.put("httpStatus", httpStatus);
    String msg = "{\"code\":" + httpStatus.value() + ",\"message\": \"" + escapeJsonText(body) + "\"}";
    result.put("body", msg);
    // Error log.
    ServerHttpRequest request = serverWebExchange.getRequest();
    log.error("[全局系统异常]\r\n请求路径:{}\r\n异常记录:{}", request.getPath(), throwable.getMessage());
    if (serverWebExchange.getResponse().isCommitted()) {
        // Nothing more can be written; let the error propagate.
        return Mono.error(throwable);
    }
    exceptionHandlerResult.set(result);
    ServerRequest newRequest = ServerRequest.create(serverWebExchange, this.messageReaders);
    return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse).route(newRequest)
            .switchIfEmpty(Mono.error(throwable))
            .flatMap((handler) -> handler.handle(newRequest))
            .flatMap((response) -> write(serverWebExchange, response));
}

/**
 * Escapes text for embedding inside a double-quoted JSON string.
 *
 * @param text raw text; {@code null} is rendered as the literal text {@code null},
 *             matching plain string concatenation
 * @return the text with quotes, backslashes and control characters escaped
 */
private static String escapeJsonText(String text) {
    if (text == null) {
        return "null";
    }
    StringBuilder escaped = new StringBuilder(text.length() + 16);
    for (int i = 0; i < text.length(); i++) {
        char c = text.charAt(i);
        switch (c) {
            case '"':
                escaped.append("\\\"");
                break;
            case '\\':
                escaped.append("\\\\");
                break;
            case '\n':
                escaped.append("\\n");
                break;
            case '\r':
                escaped.append("\\r");
                break;
            case '\t':
                escaped.append("\\t");
                break;
            case '\b':
                escaped.append("\\b");
                break;
            case '\f':
                escaped.append("\\f");
                break;
            default:
                if (c < 0x20) {
                    // Remaining control characters must use the \\uXXXX form.
                    escaped.append(String.format("\\u%04x", (int) c));
                } else {
                    escaped.append(c);
                }
        }
    }
    return escaped.toString();
}
/**
 * Registers the three Swagger resource endpoints, all GET-only and accepting any media type.
 *
 * @return the composed router function
 */
@Bean
public RouterFunction routerFunction() {
    return RouterFunctions
            .route(RequestPredicates.GET("/swagger-resources").and(RequestPredicates.accept(MediaType.ALL)),
                    swaggerResourceHandler)
            .and(RouterFunctions.route(
                    RequestPredicates.GET("/swagger-resources/configuration/ui")
                            .and(RequestPredicates.accept(MediaType.ALL)),
                    swaggerUiHandler))
            .and(RouterFunctions.route(
                    RequestPredicates.GET("/swagger-resources/configuration/security")
                            .and(RequestPredicates.accept(MediaType.ALL)),
                    swaggerSecurityHandler));
}
/**
 * Converts an unhandled error into a {@code ResultBody} response.
 *
 * <p>Requests for {@code /favicon.ico} are completed silently. A
 * {@code NotFoundException} is reported as "service unavailable"; every
 * other error is resolved through {@code OpenGlobalExceptionHandler}.
 * If the response is already committed, the original error is propagated.
 *
 * @param exchange the current exchange
 * @param ex       the error to render
 * @return completion signal of the response write, or the original error
 */
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
    ServerHttpRequest request = exchange.getRequest();
    String path = request.getURI().getPath();
    if ("/favicon.ico".equals(path)) {
        return Mono.empty();
    }
    // Choose the result body by exception type.
    ResultBody resultBody;
    if (ex instanceof NotFoundException) {
        resultBody = ResultBody.failed()
                .code(ErrorCode.SERVICE_UNAVAILABLE.getCode())
                .msg(ErrorCode.SERVICE_UNAVAILABLE.getMessage())
                .httpStatus(HttpStatus.SERVICE_UNAVAILABLE.value())
                .path(path);
        log.error("==> 错误解析:{}", resultBody);
    } else {
        // A Throwable is not necessarily an Exception; wrap it instead of casting blindly,
        // so that an Error does not surface as a ClassCastException that hides the real cause.
        Exception resolvable = (ex instanceof Exception) ? (Exception) ex : new Exception(ex.getMessage(), ex);
        resultBody = OpenGlobalExceptionHandler.resolveException(resolvable, path);
    }
    // Modelled on AbstractErrorWebExceptionHandler.
    if (exchange.getResponse().isCommitted()) {
        return Mono.error(ex);
    }
    exceptionHandlerResult.set(resultBody);
    ServerRequest newRequest = ServerRequest.create(exchange, this.messageReaders);
    return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse).route(newRequest)
            .switchIfEmpty(Mono.error(ex))
            .flatMap((handler) -> handler.handle(newRequest))
            .flatMap((response) -> write(exchange, response, ex));
}
/**
 * Starts a small management HTTP server on a separate port.
 *
 * <p>The port is derived as {@code serverPort - 1000} (the original note
 * mentions port 6200). Two endpoints are exposed:
 * <ul>
 *   <li>{@code GET /ok} answers {@code online};</li>
 *   <li>{@code GET /cmd/{line}} switches request refusal on for {@code down}
 *       and off for {@code up}, answering {@code ok}; any other value leaves
 *       the state untouched and still answers {@code ok}.</li>
 * </ul>
 *
 * @param args application arguments (not used)
 * @throws Exception if the management server cannot be bound
 */
@Override
public void run(ApplicationArguments args) throws Exception {
    RouterFunction<ServerResponse> manageRouterFunction = RouterFunctions
            .route(RequestPredicates.GET("/ok"), req -> ServerResponse.ok().syncBody("online"))
            .andRoute(RequestPredicates.GET("/cmd/{line}"), request -> {
                String pathVar = request.pathVariable("line");
                try {
                    if ("down".equals(pathVar)) {
                        refuseRequestService.setRefuseRequest(true);
                    } else if ("up".equals(pathVar)) {
                        refuseRequestService.setRefuseRequest(false);
                    }
                    return ServerResponse.ok().body(Mono.just("ok"), String.class);
                } catch (Throwable e) {
                    LOGGER.error("{} request err!", pathVar, e);
                    return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
                }
            });
    HttpHandler handler = RouterFunctions.toHttpHandler(manageRouterFunction);
    int managePort = serverPort - 1000;
    ReactorHttpHandlerAdapter inboundAdapter = new ReactorHttpHandlerAdapter(handler);
    // bind() only returns a lazy Mono; without a subscriber the server is never started.
    // bindNow() subscribes and waits until the port is bound, failing loudly if it cannot be.
    HttpServer.create().port(managePort).handle(inboundAdapter).bindNow();
    LOGGER.info("management services run on port:{}", managePort);
}
/**
 * Routes {@code GET /webflux/hello} (accepting {@code text/plain}) to
 * {@code HelloHandler.hello}.
 *
 * <p>{@code RouterFunctions.route(RequestPredicate, HandlerFunction)} pairs a
 * request predicate with a handler function: when the request matches, the
 * handler is invoked. This plays a role similar to a handler mapping in
 * Spring MVC.
 *
 * @param hander handler providing the {@code hello} method
 * @return the router function for the hello endpoint
 */
@Bean
public RouterFunction<ServerResponse> route(HelloHandler hander) {
    return RouterFunctions.route(
            RequestPredicates.GET("/webflux/hello")
                    .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
            hander::hello);
}
/**
 * Routes {@code GET /api/f/combine/getStudent/{rollNo}} to a repository lookup by roll number.
 *
 * @return the router function for the single-student endpoint
 */
@Bean
RouterFunction<ServerResponse> returnStudentWithCombineFun() {
    return RouterFunctions.route(
            RequestPredicates.GET("/api/f/combine/getStudent/{rollNo}"),
            serverRequest -> {
                // The path variable is converted with getInt before querying the repository.
                int rollNo = getInt(serverRequest.pathVariable("rollNo"));
                return ServerResponse.ok()
                        .body(studentMongoRepository.findByRollNo(rollNo), Student.class);
            });
}
/**
 * Routes {@code GET /api/f/combine/getAllStudent} to a listing of all students from the repository.
 *
 * @return the router function for the all-students endpoint
 */
@Bean
RouterFunction<ServerResponse> returnAllStudentWithCombineFun() {
    return RouterFunctions.route(
            RequestPredicates.GET("/api/f/combine/getAllStudent"),
            serverRequest -> ServerResponse.ok()
                    .body(studentMongoRepository.findAll(), Student.class));
}
/**
 * Declares the graph-algorithm HTTP routes served by {@link GraphAlgorithmHandler}.
 *
 * <p>The routes are grouped for readability but are still evaluated in the
 * same order as they are listed here.
 *
 * @param graphAlgorithmHandler handler whose methods serve the individual routes
 * @return the composed router function
 */
@Bean
protected RouterFunction<ServerResponse> routingFunction(GraphAlgorithmHandler graphAlgorithmHandler) {
    // Graph loading and preparation.
    RouterFunction<ServerResponse> graphSetup =
            route(RequestPredicates.POST("/import"), graphAlgorithmHandler::importGraph)
            .andRoute(RequestPredicates.POST("/prepare"), graphAlgorithmHandler::prepareGraph);

    // Pregel job configuration, execution and inspection.
    RouterFunction<ServerResponse> jobRoutes =
            route(RequestPredicates.POST("/pregel"), graphAlgorithmHandler::configure)
            .andRoute(RequestPredicates.POST("/pregel/{id}"), graphAlgorithmHandler::run)
            .andRoute(RequestPredicates.GET("/pregel/{id}"), graphAlgorithmHandler::state)
            .andRoute(RequestPredicates.GET("/pregel/{id}/configs"), graphAlgorithmHandler::configs);

    // Result retrieval and filtering.
    RouterFunction<ServerResponse> resultRoutes =
            route(RequestPredicates.GET("/pregel/{id}/result"), graphAlgorithmHandler::result)
            .andRoute(RequestPredicates.POST("/pregel/{id}/result"), graphAlgorithmHandler::filterResult);

    return graphSetup
            .and(jobRoutes)
            .and(resultRoutes)
            .andRoute(RequestPredicates.DELETE("/pregel/{id}"), graphAlgorithmHandler::delete);
}
/**
 * Routes {@code GET /hello} (accepting {@code text/plain}) to {@code CityHandler.helloCity}.
 *
 * @param cityHandler handler providing the {@code helloCity} method
 * @return the router function for the hello endpoint
 */
@Bean
public RouterFunction<ServerResponse> routeCity(CityHandler cityHandler) {
    RouterFunction<ServerResponse> helloCityRoute = RouterFunctions.route(
            RequestPredicates.GET("/hello").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
            cityHandler::helloCity);
    return helloCityRoute;
}
/**
 * Routes {@code GET /hello} (accepting {@code text/plain}) to {@code HelloComponent.hello}.
 *
 * @param greetingHandler component providing the {@code hello} method
 * @return the router function for the hello endpoint
 */
@Bean
public RouterFunction<ServerResponse> route(HelloComponent greetingHandler) {
    RouterFunction<ServerResponse> helloRoute = RouterFunctions.route(
            RequestPredicates.GET("/hello")
                    .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
            greetingHandler::hello);
    return helloRoute;
}
/**
 * Builds the API routes nested under {@code API_PATH}.
 *
 * <p>Requests accepting {@code application/json} are routed to the location
 * GET and POST handlers; anything else under {@code API_PATH} is passed to
 * {@code errorHandler::notFound}.
 *
 * @param apiHandler   handler for the location endpoints
 * @param errorHandler handler for requests that match no location endpoint
 * @return the composed router function
 */
static RouterFunction<?> doRoute(final ApiHandler apiHandler, final ErrorHandler errorHandler) {
    final RouterFunction<?> locationRoutes =
            route(GET(LOCATION_WITH_ADDRESS_PATH), apiHandler::getLocation)
            .andRoute(POST(LOCATION_PATH), apiHandler::postLocation);
    final RouterFunction<?> notFoundFallback =
            route(RequestPredicates.all(), errorHandler::notFound);

    return nest(path(API_PATH),
            nest(accept(APPLICATION_JSON), locationRoutes).andOther(notFoundFallback));
}
/**
 * Test route: any request whose path is {@code /testfun} is answered with 200 and the body {@code hello}.
 *
 * @return the router function for the test endpoint
 */
@Bean
public RouterFunction<ServerResponse> testFunRouterFunction() {
    return RouterFunctions.route(
            RequestPredicates.path("/testfun"),
            request -> ServerResponse.ok()
                    .body(BodyInserters.fromObject("hello")));
}
/**
 * Test route: any request whose path is {@code /testfun} is answered with 200 and the body {@code hello}.
 *
 * @return the router function for the test endpoint
 */
@Bean
public RouterFunction<ServerResponse> testFunRouterFunction() {
    return RouterFunctions.route(
            RequestPredicates.path("/testfun"),
            request -> ServerResponse.ok()
                    .body(BodyInserters.fromValue("hello")));
}
/**
 * Test route standing in for the gateway-requests metrics endpoint: any request to
 * {@code /actuator/metrics/gateway.requests} is answered with 200 and a fixed body.
 *
 * @return the router function for the fake metrics endpoint
 */
@Bean
public RouterFunction<ServerResponse> testWhenMetricPathIsNotMeet() {
    return RouterFunctions.route(
            RequestPredicates.path("/actuator/metrics/gateway.requests"),
            request -> ServerResponse.ok()
                    .body(BodyInserters.fromValue(HELLO_FROM_FAKE_ACTUATOR_METRICS_GATEWAY_REQUESTS)));
}
/**
 * Declares three test routes.
 *
 * <ul>
 *   <li>{@code GET /route}, accepting {@code text/plain}, handled by {@code testHandler.route};</li>
 *   <li>{@code POST /route2}, with JSON content type, handled by {@code testHandler.route2};</li>
 *   <li>{@code HEAD /route3}, answered with an empty 200 response.</li>
 * </ul>
 *
 * @param testHandler handler for the first two routes
 * @return the composed router function
 */
@Bean
public RouterFunction<ServerResponse> route(TestHandler testHandler) {
    RouterFunction<ServerResponse> getRoute = RouterFunctions.route(
            RequestPredicates.GET("/route").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
            testHandler::route);
    RouterFunction<ServerResponse> postRoute = RouterFunctions.route(
            RequestPredicates.POST("/route2").and(RequestPredicates.contentType(MediaType.APPLICATION_JSON)),
            testHandler::route2);
    RouterFunction<ServerResponse> headRoute = RouterFunctions.route(
            RequestPredicates.HEAD("/route3"),
            request -> ServerResponse.ok().build());

    return getRoute.and(postRoute).and(headRoute);
}
/**
 * Routes {@code GET /hello} (accepting {@code text/plain}) to {@code GreetingHandler.hello}.
 *
 * @param greetingHandler handler providing the {@code hello} method
 * @return the router function for the hello endpoint
 */
@Bean
public RouterFunction<ServerResponse> route(GreetingHandler greetingHandler) {
    RouterFunction<ServerResponse> greetingRoute = RouterFunctions.route(
            RequestPredicates.GET("/hello").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
            greetingHandler::hello);
    return greetingRoute;
}
/**
 * Routes {@code GET /users} to a listing of all users.
 *
 * <p>On every request the repository is queried and its result is exposed as a {@code Flux}.
 *
 * @param userRepository repository supplying the users
 * @return the router function for the users endpoint
 */
@Bean
@Autowired
public RouterFunction<ServerResponse> listUsers(UserRepository userRepository) {
    return RouterFunctions.route(
            RequestPredicates.GET("/users"),
            request -> ServerResponse.ok()
                    .body(Flux.fromIterable(userRepository.list()), User.class));
}
/**
 * Declares the four POST endpoints that demonstrate functional request validation,
 * each delegating to the {@code handleRequest} method of its own handler.
 *
 * @param dryHandler             handler for {@code /dry-functional-validation}
 * @param complexHandler         handler for {@code /complex-handler-functional-validation}
 * @param otherHandler           handler for {@code /other-dry-functional-validation}
 * @param annotatedEntityHandler handler for {@code /annotated-functional-validation}
 * @return the composed router function
 */
@Bean
public RouterFunction<ServerResponse> validationsRouter(@Autowired CustomRequestEntityValidationHandler dryHandler,
        @Autowired FunctionalHandler complexHandler,
        @Autowired OtherEntityValidationHandler otherHandler,
        @Autowired AnnotatedRequestEntityValidationHandler annotatedEntityHandler) {
    RouterFunction<ServerResponse> complexRoute = RouterFunctions.route(
            RequestPredicates.POST("/complex-handler-functional-validation"),
            complexHandler::handleRequest);
    RouterFunction<ServerResponse> dryRoute = RouterFunctions.route(
            RequestPredicates.POST("/dry-functional-validation"),
            dryHandler::handleRequest);
    RouterFunction<ServerResponse> otherRoute = RouterFunctions.route(
            RequestPredicates.POST("/other-dry-functional-validation"),
            otherHandler::handleRequest);
    RouterFunction<ServerResponse> annotatedRoute = RouterFunctions.route(
            RequestPredicates.POST("/annotated-functional-validation"),
            annotatedEntityHandler::handleRequest);

    return complexRoute.and(dryRoute).and(otherRoute).and(annotatedRoute);
}
/**
 * Translates an unhandled exception into a JSON error response.
 *
 * <p>The HTTP status and the serialized {@code ResponseMessage} body are
 * chosen by exception type and stored in {@code exceptionHandlerResult}
 * for {@code renderErrorResponse}. If the response is already committed,
 * the original error is propagated instead.
 *
 * @param exchange the current exchange
 * @param ex       the error to render
 * @return completion signal of the response write, or the original error
 */
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
    // Choose status and body by exception type.
    HttpStatus httpStatus;
    String body;
    if (ex instanceof MyException) {
        httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
        body = JSONObject.toJSONString(ResponseMessage.error(((MyException) ex).getCode(), ex.getMessage()));
    } else if (ex instanceof NotFoundException) {
        httpStatus = HttpStatus.NOT_FOUND;
        body = JSONObject.toJSONString(ResponseMessage.error(404, ex.getMessage()));
    } else if (ex instanceof ResponseStatusException) {
        ResponseStatusException responseStatusException = (ResponseStatusException) ex;
        httpStatus = responseStatusException.getStatus();
        // NOTE(review): the body code stays 500 even when the HTTP status differs - confirm this is intended.
        body = JSONObject.toJSONString(ResponseMessage.error(500, ex.getMessage()));
    } else {
        httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
        body = JSONObject.toJSONString(ResponseMessage.error(500, ex.getMessage()));
    }
    // Assemble the result; the body can be replaced with a custom JSON body.
    Map<String, Object> result = new HashMap<>(2, 1);
    result.put("httpStatus", httpStatus);
    result.put("body", body);
    // Record the error. The throwable is passed as the trailing argument so the stack trace
    // goes through the logger instead of being printed to stderr with printStackTrace().
    ServerHttpRequest request = exchange.getRequest();
    log.error("[全局异常处理]异常请求路径:{},记录异常信息:{}",
            request.getPath().pathWithinApplication().value(), ex.getMessage(), ex);
    // Modelled on AbstractErrorWebExceptionHandler.
    if (exchange.getResponse().isCommitted()) {
        return Mono.error(ex);
    }
    exceptionHandlerResult.set(result);
    ServerRequest newRequest = ServerRequest.create(exchange, this.messageReaders);
    return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse).route(newRequest)
            .switchIfEmpty(Mono.error(ex))
            .flatMap((handler) -> handler.handle(newRequest))
            .flatMap((response) -> write(exchange, response));
}
/**
 * Sends every request to {@code renderErrorResponse}, the JSON rendering method.
 *
 * @param errorAttributes error attributes supplied by the framework (not used here)
 * @return a router function matching all requests
 */
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
    RouterFunction<ServerResponse> jsonErrorRoute =
            RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
    return jsonErrorRoute;
}
/**
 * Maps an unhandled gateway error to a {@code ResponseBean} and writes it as the response.
 *
 * <p>The message, key code and message code start from generic defaults and
 * are overridden according to the exception type. If the response is already
 * committed, the original error is propagated instead.
 *
 * @param exchange the current exchange
 * @param ex       the error to render
 * @return completion signal of the response write, or the original error
 */
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
    // Defaults, used for any exception type that is not handled below.
    String message = "Internal Server Error";
    int keyCode = ApiMsg.KEY_UNKNOWN;
    int msgCode = ApiMsg.ERROR;

    if (ex instanceof NotFoundException) {
        // Service unavailable; the default message is kept.
        keyCode = ApiMsg.KEY_SERVICE;
        msgCode = ApiMsg.UNAVAILABLE;
    } else if (ex instanceof ResponseStatusException) {
        // Service responded with an error.
        message = ex.getMessage();
        keyCode = ApiMsg.KEY_SERVICE;
        msgCode = ApiMsg.ERROR;
    } else if (ex instanceof InvalidValidateCodeException) {
        // Wrong validation code.
        message = ex.getMessage();
        keyCode = ApiMsg.KEY_VALIDATE_CODE;
    } else if (ex instanceof ValidateCodeExpiredException) {
        // Expired validation code.
        message = ex.getMessage();
        keyCode = ApiMsg.KEY_VALIDATE_CODE;
        msgCode = ApiMsg.EXPIRED;
    } else if (ex instanceof InvalidAccessTokenException) {
        // Invalid token.
        message = ex.getMessage();
        keyCode = ApiMsg.KEY_TOKEN;
        msgCode = ApiMsg.INVALID;
    }

    // Build the response payload.
    ResponseBean<String> payload = new ResponseBean<>(message, keyCode, msgCode);

    // Record the error.
    ServerHttpRequest request = exchange.getRequest();
    log.error("GatewayExceptionHandler: {}, error: {}", request.getPath(), ex.getMessage());

    if (exchange.getResponse().isCommitted()) {
        return Mono.error(ex);
    }

    exceptionHandlerResult.set(payload);
    ServerRequest errorRequest = ServerRequest.create(exchange, this.messageReaders);
    return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse)
            .route(errorRequest)
            .switchIfEmpty(Mono.error(ex))
            .flatMap(handlerFunction -> handlerFunction.handle(errorRequest))
            .flatMap(serverResponse -> write(exchange, serverResponse));
}
/**
 * Registers the {@code /fallback} route (any HTTP method, accepting {@code text/plain}).
 *
 * @return the router function for the fallback endpoint
 */
@Bean
public RouterFunction routerFunction() {
    RouterFunction fallbackRoute = RouterFunctions.route(
            RequestPredicates.path("/fallback")
                    .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
            hystrixFallbackHandler);
    return fallbackRoute;
}
/**
 * Uses the JSON rendering method, {@code renderErrorResponse}, for every request.
 *
 * @param errorAttributes error attributes supplied by the framework (not used here)
 * @return a router function matching all requests
 */
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
    RouterFunction<ServerResponse> renderAsJson = RouterFunctions.route(
            RequestPredicates.all(),
            this::renderErrorResponse);
    return renderAsJson;
}
/**
 * Selects {@code renderErrorResponse}, the JSON rendering method, as the handler for all requests.
 *
 * @param errorAttributes error attributes supplied by the framework (not used here)
 * @return a router function matching all requests
 */
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
    RouterFunction<ServerResponse> catchAllRoute =
            RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
    return catchAllRoute;
}
/**
 * Registers the {@code GET /code} route (accepting {@code text/plain}).
 *
 * @return the router function for the image-code endpoint
 */
@Bean
public RouterFunction routerFunction() {
    RouterFunction imageCodeRoute = RouterFunctions.route(
            RequestPredicates.GET("/code")
                    .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
            imageCodeHandler);
    return imageCodeRoute;
}