下面列出了org.springframework.http.server.reactive.HttpHandler#org.springframework.web.reactive.function.server.ServerResponse 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void write_works_as_expected() {
// given
ServerResponse responseMock = mock(ServerResponse.class);
HttpHeaders responseHeadersMock = mock(HttpHeaders.class);
MediaType expectedContentTypeMock = mock(MediaType.class);
doReturn(responseHeadersMock).when(responseMock).headers();
doReturn(expectedContentTypeMock).when(responseHeadersMock).getContentType();
// when
handlerSpy.write(serverWebExchangeMock, responseMock);
// then
verify(serverHttpResponseHeadersMock).setContentType(expectedContentTypeMock);
ArgumentCaptor<ResponseContext> responseContextArgumentCaptor = ArgumentCaptor.forClass(ResponseContext.class);
verify(responseMock).writeTo(eq(serverWebExchangeMock), responseContextArgumentCaptor.capture());
ResponseContext responseContext = responseContextArgumentCaptor.getValue();
assertThat(responseContext.messageWriters()).isEqualTo(messageWriters);
assertThat(responseContext.viewResolvers()).isEqualTo(viewResolvers);
}
@Override
public Mono<ServerResponse> handle(ServerRequest serverRequest) {
//生成验证码
String text = producer.createText();
BufferedImage image = producer.createImage(text);
//保存验证码信息
String randomStr = serverRequest.queryParam("randomStr").get();
redisTemplate.opsForValue().set(DEFAULT_CODE_KEY + randomStr, text, 120, TimeUnit.SECONDS);
// 转换流信息写出
FastByteArrayOutputStream os = new FastByteArrayOutputStream();
try {
ImageIO.write(image, "jpeg", os);
} catch (IOException e) {
log.error("ImageIO write err", e);
return Mono.error(e);
}
return ServerResponse
.status(HttpStatus.OK)
.contentType(MediaType.IMAGE_JPEG)
.body(BodyInserters.fromResource(new ByteArrayResource(os.toByteArray())));
}
public Mono<ServerResponse> update(ServerRequest req) {
return Mono
.zip(
(data) -> {
Post p = (Post) data[0];
Post p2 = (Post) data[1];
p.setTitle(p2.getTitle());
p.setContent(p2.getContent());
return p;
},
this.posts.findById(req.pathVariable("id")),
req.bodyToMono(Post.class)
)
.cast(Post.class)
.flatMap(post -> this.posts.save(post))
.flatMap(post -> ServerResponse.noContent().build());
}
@Override
protected Mono<ServerResponse> processBody(OtherEntity validBody, ServerRequest originalRequest) {
String responseBody = String.format("Other object with item %s and quantity %s!", validBody.getItem(), validBody.getQuantity());
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(responseBody), String.class);
}
@Bean
RouterFunction<ServerResponse> routes(WebClient client) {
return route(GET("/reservations/names"), r -> {
Publisher<String> map = client
.get()
.uri("http://reservation-service/reservations")
.retrieve()
.bodyToFlux(Reservation.class)
.map(Reservation::getEmail);
Publisher<String> fallback =
HystrixCommands
.from(map)
.fallback(Mono.just("EEK!"))
.commandName("fallback")
.eager()
.build();
return ServerResponse.ok().body(fallback, String.class);
});
}
@Bean
protected RouterFunction<ServerResponse> routingFunction(GraphAlgorithmHandler graphAlgorithmHandler) {
return route(RequestPredicates.POST("/import"), graphAlgorithmHandler::importGraph)
.andRoute(RequestPredicates.POST("/prepare"), graphAlgorithmHandler::prepareGraph)
.andRoute(RequestPredicates.POST("/pregel"), graphAlgorithmHandler::configure)
.andRoute(RequestPredicates.POST("/pregel/{id}"), graphAlgorithmHandler::run)
.andRoute(RequestPredicates.GET("/pregel/{id}"), graphAlgorithmHandler::state)
.andRoute(RequestPredicates.GET("/pregel/{id}/configs"), graphAlgorithmHandler::configs)
.andRoute(RequestPredicates.GET("/pregel/{id}/result"), graphAlgorithmHandler::result)
.andRoute(RequestPredicates.POST("/pregel/{id}/result"), graphAlgorithmHandler::filterResult)
.andRoute(RequestPredicates.DELETE("/pregel/{id}"), graphAlgorithmHandler::delete);
}
@Bean
public RouterFunction<ServerResponse> employeeServiceBox() {
return route(GET("/listFluxEmps"), dataHandler::empList)
.andRoute(GET("/selectEmpById/{id}"), dataHandler::chooseEmpById)
.andRoute(POST("/selectFluxEmps"), dataHandler::chooseFluxEmps)
.andRoute(POST("/saveEmp"), dataHandler::saveEmployeeMono)
.andRoute(GET("/avgAgeEmps"), dataHandler::averageAge)
.andRoute(GET("/totalAgeEmps"), dataHandler::totalAge)
.andRoute(GET("/countEmps"), dataHandler::countEmps)
.andRoute(GET("/countPerDept/{deptid}"), dataHandler::countEmpsPerDept)
.andRoute(GET("/selectEmpValidAge/{age}"), dataHandler::chooseFluxEmpsValidAge);
}
public Mono<ServerResponse> create(ServerRequest request) {
return request
.bodyToMono(Order.class)
.flatMap(orderRepository::save)
.flatMap(o ->
ServerResponse.created(URI.create("/orders/" + o.getId()))
.build()
);
}
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable ex) {
if (acceptsHtml(exchange)) {
return htmlErrorResponse(ex);
}
// JSON result by default.
return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(fromObject(buildErrorResult(ex)));
}
@Override
protected Mono<ServerResponse> doHandle(ServerRequest request, RawMetric rm) {
Map<String, String> extraTags = rm.getExtraTags();
MetricImporterUtils.resolveExtraTagsFromRequestHeaders(request, extraTags);
try {
MetricImporterUtils.validExtraTags(extraTags);
} catch (IllegalArgumentException e) {
return error(e.getMessage());
}
rm.setSourceType(SourceType.OPENTSDB);
rm.setPushMode(true);
super.fire(rm);
return success();
}
public Mono<ServerResponse> countEmpsPerDept(ServerRequest req) {
Function<Employee, Integer> ages = (emp) -> emp.getAge();
Function<Employee, Mono<Integer>> flatMapAge = (emp) -> Mono.just(emp).map(ages);
Mono<Integer> count = Flux.fromIterable(employeeServiceImpl.findAllEmps()).filter((emp) -> emp.getDeptid().equals(Integer.parseInt(req.pathVariable("deptid"))))
.flatMap(flatMapAge)
.reduce((total, increment) -> total + increment);
return ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(count, Integer.class)
.switchIfEmpty(ServerResponse.notFound().build());
}
@Bean
public RouterFunction<ServerResponse> subnetHandlerRouting(SubnetWebHandlers subnetWebHandlers) {
return
route(GET("/project/{projectId}/subnets/{subnetId}"), subnetWebHandlers::getSubnet)
.andRoute(POST("/project/{projectId}/subnets").and(accept(APPLICATION_JSON)).and(contentType(APPLICATION_JSON)), subnetWebHandlers::createSubnet)
.andRoute(PUT("/project/{projectId}/subnets/{subnetId}").and(accept(APPLICATION_JSON)).and(contentType(APPLICATION_JSON)), subnetWebHandlers::updateSubnet)
.andRoute(DELETE("/project/{projectId}/subnets/{subnetId}"), subnetWebHandlers::deleteSubnet)
.andRoute(GET("/project/{projectId}/subnets"), subnetWebHandlers::getSubnets);
// .addRoute(POST("/project/{projectId}/subnets"), subnetWebHandlers::bulkCreateSubnet);
}
@Override
protected Mono<ServerResponse> onValidationErrors(Errors errors, CustomRequestEntity invalidBody, final ServerRequest request) {
return ServerResponse.badRequest()
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(String.format("Custom message showing the errors: %s", errors.getAllErrors()
.toString())), String.class);
}
/**
* PUT a User
*/
public Mono<ServerResponse> putUser(ServerRequest request) {
// parse id from path-variable
long customerId = Long.valueOf(request.pathVariable("id"));
// get customer data from request object
Mono<User> customer = request.bodyToMono(User.class);
// get customer from repository
Mono<User> responseMono = customerRepository.putUser(customerId, customer);
// build response
return responseMono
.flatMap(cust -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromObject(cust)));
}
/**
* monoRouterFunction.
* @param bookHandler a {@link BookHandler} object.
* @return a {@link org.springframework.web.reactive.function.server.RouterFunction}
* object.
*/
@Bean
public RouterFunction<ServerResponse> monoRouterFunction(BookHandler bookHandler) {
return route(GET("/api/book").and(accept(MediaType.APPLICATION_JSON)), request -> bookHandler.getAll())
.andRoute(GET("/api/book/{id}").and(accept(MediaType.APPLICATION_JSON)), bookHandler::getBook)
.andRoute(POST("/api/book/post").and(accept(MediaType.APPLICATION_JSON)), bookHandler::postBook)
.andRoute(PUT("/api/book/put/{id}").and(accept(MediaType.APPLICATION_JSON)), bookHandler::putBook)
.andRoute(DELETE("/api/book/delete/{id}").and(accept(MediaType.APPLICATION_JSON)),
bookHandler::deleteBook);
}
@Bean
public RouterFunction<ServerResponse> departmentServiceBox(){
return route(GET("/listFluxDepts"), dataHandler::deptList)
.andRoute(GET("/selectDeptById/{id}"), dataHandler::chooseDeptById)
.andRoute(POST("/selectFluxDepts"), dataHandler::chooseFluxDepts)
.andRoute(POST("/saveFluxDept"), dataHandler::saveDepartmentMono)
.andRoute(GET("/countFluxDepts"), dataHandler::countDepts);
}
public Mono<ServerResponse> countLogins(ServerRequest req) {
Mono<Long> count = Flux.fromIterable(logindetailsServiceImpl.findAllLogindetails())
.count();
TotalUsers countEmp = new TotalUsers();
countEmp.setCount(count.block());
Mono<TotalUsers> monoCntLogins = Mono.justOrEmpty(countEmp);
return ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(monoCntLogins, TotalUsers.class)
.switchIfEmpty(ServerResponse.notFound().build());
}
/**
* Handle the given request.
*
* @param request the request to handler
* @return the response
*/
@Override
public Mono<ServerResponse> handle(ServerRequest request) {
return ServerResponse.status(HttpStatus.OK)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(UiConfigurationBuilder.builder().build()));
}
Mono<ServerResponse> handleLogin(ServerRequest request) {
return request.body(toFormData())
.map(MultiValueMap::toSingleValueMap)
.filter(formData -> "baeldung".equals(formData.get("user")))
.filter(formData -> "you_know_what_to_do".equals(formData.get("token")))
.flatMap(formData -> ok().body(Mono.just("welcome back!"), String.class))
.switchIfEmpty(ServerResponse.badRequest()
.build());
}
public Mono<ServerResponse> getSubnet(ServerRequest request) {
UUID projectId = UUID.fromString(request.pathVariable("projectId"));
UUID subnetId = UUID.fromString(request.pathVariable("subnetId"));
Mono<SubnetWebJson> subnetInfo = serviceProxy.findSubnetById(projectId, subnetId);
return subnetInfo.flatMap(od -> ServerResponse.ok()
.contentType(APPLICATION_JSON)
.body(fromObject(od)))
.onErrorResume(SubnetNotFoundException.class, e -> ServerResponse.notFound().build());
}
/**
* 参考AbstractErrorWebExceptionHandler
*
* @param exchange
* @param response
* @return
*/
private Mono<? extends Void> write(ServerWebExchange exchange,
ServerResponse response,Throwable ex) {
exchange.getResponse().getHeaders()
.setContentType(response.headers().getContentType());
// 保存日志
accessLogService.sendLog(exchange, (Exception) ex);
return response.writeTo(exchange, new ResponseContext());
}
@Bean
RouterFunction<ServerResponse> githubRoutes(final GithubProperties props,
final WebClient githubWebClient) {
return
route(GET("/github/props/manual"),
request -> ok().body(Mono.just(
singletonMap("github",
singletonMap("token", props.getToken()))),
Map.class))
.andRoute(GET("/github/props/**"),
request -> ok().body(Mono.just(props), GithubProperties.class))
.andRoute(GET("/github/search/users/{username}"), // ?page=1&size=2
request -> ok().body(githubWebClient.get()
.uri(
"/search/users?q={q}&page={page}&per_page={per_page}",
HashMap.of(
"q", request.pathVariable("username"),
"page", request.queryParam("page").orElse("0"),
"per_page", request.queryParam("size").orElse("3")
).toJavaMap()
)
.exchange()
.subscribeOn(Schedulers.elastic())
.flatMapMany(response -> response.bodyToFlux(Map.class)),
Map.class))
.andRoute(GET("/**"),
request -> ok().body(Mono.just(singletonMap("result", "TODO")),
Map.class))
;
}
private RouterFunction<ServerResponse> routingFunction() {
return route(GET("/p?ths"), serverRequest -> ok().body(fromObject("/p?ths"))).andRoute(GET("/test/{*id}"), serverRequest -> ok().body(fromObject(serverRequest.pathVariable("id"))))
.andRoute(GET("/*card"), serverRequest -> ok().body(fromObject("/*card path was accessed")))
.andRoute(GET("/{var1}_{var2}"), serverRequest -> ok().body(fromObject(serverRequest.pathVariable("var1") + " , " + serverRequest.pathVariable("var2"))))
.andRoute(GET("/{baeldung:[a-z]+}"), serverRequest -> ok().body(fromObject("/{baeldung:[a-z]+} was accessed and baeldung=" + serverRequest.pathVariable("baeldung"))))
.and(RouterFunctions.resources("/files/{*filepaths}", new ClassPathResource("files/")));
}
public final Mono<ServerResponse> handleRequest(final ServerRequest request) {
return request.bodyToMono(this.validationClass)
.flatMap(body -> {
Errors errors = new BeanPropertyBindingResult(body, this.validationClass.getName());
this.validator.validate(body, errors);
if (errors == null || errors.getAllErrors()
.isEmpty()) {
return processBody(body, request);
} else {
return onValidationErrors(errors, body, request);
}
});
}
public Mono<ServerResponse> countDepts(ServerRequest req) {
Mono<Long> count = Flux.fromIterable(departmentServiceImpl.findAllDepts())
.count();
CountDept countDept = new CountDept();
countDept.setCount(count.block());
Mono<CountDept> monoCntDept = Mono.justOrEmpty(countDept);
return ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(monoCntDept, CountDept.class)
.switchIfEmpty(ServerResponse.notFound().build());
}
private static ApplicationContext applicationContext(WebFilter... webFilters) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
for(WebFilter filter : webFilters) {
context.registerBean(WebFilter.class, () -> filter);
}
context.registerBean("webHandler", DispatcherHandler.class, () -> new DispatcherHandler());
context.registerBean(HandlerMapping.class, () -> RouterFunctions.toHandlerMapping(request -> Mono.just(r -> ServerResponse.ok().build())));
context.registerBean(HandlerAdapter.class, () -> new HandlerFunctionAdapter());
context.registerBean(HandlerResultHandler.class, () -> new ServerResponseResultHandler());
context.refresh();
return context;
}
@Bean
RouterFunction<ServerResponse> routes(final Flux<ServerSentEvent<Map>> processor,
final Consumer<String> distributeEvent) {
final ParameterizedTypeReference<Map<String, String>> type
= new ParameterizedTypeReference<Map<String, String>>() {};
return
route(GET("/**"),
request -> ok().contentType(request.headers().accept().contains(TEXT_EVENT_STREAM)
? TEXT_EVENT_STREAM : APPLICATION_STREAM_JSON)
.body(processor.map(s -> s), ServerSentEvent.class))
.andRoute(POST("/**"),
request -> accepted().body(request.bodyToMono(type)
.map(map -> map.getOrDefault("message", ""))
.map(String::valueOf)
.map(String::trim)
.filter(s -> s.length() > 0)
.doOnNext(distributeEvent)
.map(m -> format("message '%s' accepted.", m))
.map(message -> singletonMap("response", message))
.subscribeOn(Schedulers.elastic())
.flatMap(Mono::just), Map.class))
;
}
@Bean
public RouterFunction<ServerResponse> testWhenMetricPathIsNotMeet() {
RouterFunction<ServerResponse> route = RouterFunctions.route(
RequestPredicates.path("/actuator/metrics/gateway.requests"),
request -> ServerResponse.ok().body(BodyInserters
.fromValue(HELLO_FROM_FAKE_ACTUATOR_METRICS_GATEWAY_REQUESTS)));
return route;
}
@Bean
RouterFunction<ServerResponse> routes(final UserHandlers handlers) {
return route(GET("/api/v1/users/**"),
handlers::streamUsers)
.andRoute(POST("/api/v1/users/**"),
handlers::saveUser)
;
}
@Bean
public RouterFunction<ServerResponse> loginServiceBox() {
return route(GET("/listFluxLogins"), loginHandler::loginDetailsList)
.andRoute(GET("/selectLoginById/{id}"), loginHandler::loginDetailsById)
.andRoute(POST("/selectFluxLogins"), loginHandler::chooseFluxLoginDetails)
.andRoute(POST("/saveLogin"), loginHandler::saveLogindetailsMono)
.andRoute(GET("/totalLogins"), loginHandler::countLogins);
}