下面列出了org.springframework.http.server.reactive.ServerHttpRequestDecorator#org.springframework.web.reactive.function.server.ServerRequest 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Builds the error attribute map, overriding the entries produced by the superclass.
 *
 * <p>When the request's error is a {@code GlobalException}, the exception's simple class
 * name, message and status are reported. Any other error is reported as a generic
 * {@code SystemException} with status 500.
 *
 * @param request the current request
 * @param includeStackTrace passed through to the superclass implementation
 * @return the attribute map from the superclass with the "exception", "message",
 *         "status" and "error" entries replaced
 */
@Override
public Map<String, Object> getErrorAttributes(ServerRequest request, boolean includeStackTrace) {
    Map<String, Object> map = super.getErrorAttributes(request, includeStackTrace);
    // Look the error up once instead of once for the check and once for the cast.
    Throwable error = getError(request);
    if (error instanceof GlobalException) {
        GlobalException ex = (GlobalException) error;
        map.put("exception", ex.getClass().getSimpleName());
        map.put("message", ex.getMessage());
        map.put("status", ex.getStatus().value());
        map.put("error", ex.getStatus().getReasonPhrase());
        return map;
    }
    map.put("exception", "SystemException");
    map.put("message", "System Error , Check logs!");
    // Stored as a number, like the GlobalException branch above; the previous String "500"
    // gave the same key two different types (and consumers that cast "status" to int fail on it).
    map.put("status", 500);
    map.put("error", " System Error ");
    return map;
}
/**
 * Creates a predicate that tests whether the JSON request body contains the field named by
 * {@code config.getData()}.
 *
 * <p>The parsed body is stored in the exchange attributes under
 * {@code CACHE_REQUEST_BODY_OBJECT_KEY}; when that attribute is already present it is used
 * directly instead of reading the body again.
 *
 * @param config supplies the field name to look for
 * @return the asynchronous predicate
 */
@Override
public AsyncPredicate<ServerWebExchange> applyAsync(final Config config) {
    return exchange -> {
        JsonNode alreadyParsed = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
        if (alreadyParsed != null) {
            // Body was parsed earlier for this exchange: answer from the stored node.
            return Mono.just(alreadyParsed.has(config.getData()));
        }
        return ServerWebExchangeUtils.cacheRequestBodyAndRequest(
                exchange,
                cachedRequest -> ServerRequest
                        .create(exchange.mutate().request(cachedRequest).build(), MESSAGE_READERS)
                        .bodyToMono(JsonNode.class)
                        .doOnNext(json -> exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, json))
                        .map(json -> json.has(config.getData())));
    };
}
/**
 * For Dubbo requests, reads the request body as a string and, when the content type is
 * compatible with JSON, stores it in the exchange attributes under
 * {@code Constants.DUBBO_PARAMS} before continuing the plugin chain. All other requests
 * continue down the chain untouched.
 *
 * @param exchange the current exchange
 * @param chain the plugin chain to continue with
 * @return the result of the remaining chain
 */
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    final ServerHttpRequest request = exchange.getRequest();
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    final boolean dubboCall = Objects.nonNull(soulContext)
            && RpcTypeEnum.DUBBO.getName().equals(soulContext.getRpcType());
    if (!dubboCall) {
        return chain.execute(exchange);
    }
    final MediaType contentType = request.getHeaders().getContentType();
    return ServerRequest.create(exchange, messageReaders)
            .bodyToMono(String.class)
            // A request without a body still has to travel down the chain.
            .switchIfEmpty(Mono.defer(() -> Mono.just("")))
            .flatMap(payload -> {
                if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
                    exchange.getAttributes().put(Constants.DUBBO_PARAMS, payload);
                }
                return chain.execute(exchange);
            });
}
/**
 * Endpoint whose outcome is selected by request headers.
 *
 * <ul>
 *   <li>{@code throw-exception: true} sleeps, then throws synchronously.</li>
 *   <li>{@code return-exception-in-mono: true} returns a Mono that fails after a delay.</li>
 *   <li>Otherwise responds 200 with the fixed payload, delayed by the same amount.</li>
 * </ul>
 *
 * @param request the incoming request
 * @return the (possibly failing) response publisher
 */
Mono<ServerResponse> routerFunctionEndpoint(ServerRequest request) {
    HttpHeaders requestHeaders = request.headers().asHttpHeaders();

    boolean throwOutsideMono = "true".equals(requestHeaders.getFirst("throw-exception"));
    if (throwOutsideMono) {
        sleepThread(SLEEP_TIME_MILLIS);
        throw new RuntimeException("Error thrown in routerFunctionEndpoint(), outside Mono");
    }

    boolean failInsideMono = "true".equals(requestHeaders.getFirst("return-exception-in-mono"));
    if (failInsideMono) {
        Duration pause = Duration.ofMillis(SLEEP_TIME_MILLIS);
        return Mono.delay(pause).map(tick -> {
            throw new RuntimeException("Error thrown in routerFunctionEndpoint(), inside Mono");
        });
    }

    return ServerResponse.ok()
            .syncBody(ROUTER_FUNCTION_ENDPOINT_RESPONSE_PAYLOAD)
            .delayElement(Duration.ofMillis(SLEEP_TIME_MILLIS));
}
/**
 * Updates the post identified by the {@code id} path variable with the title and content
 * from the request body, saves it and answers with 204 No Content.
 *
 * @param req the incoming request; its body is read as a {@code Post}
 * @return the response publisher
 */
public Mono<ServerResponse> update(ServerRequest req) {
    return Mono
            .zip(
                    (pair) -> {
                        // pair[0] is the stored post, pair[1] the post sent by the client.
                        Post stored = (Post) pair[0];
                        Post incoming = (Post) pair[1];
                        stored.setTitle(incoming.getTitle());
                        stored.setContent(incoming.getContent());
                        return stored;
                    },
                    this.posts.findById(req.pathVariable("id")),
                    req.bodyToMono(Post.class))
            .cast(Post.class)
            .flatMap(merged -> this.posts.save(merged))
            .flatMap(saved -> ServerResponse.noContent().build());
}
/**
 * Adds navigation links to a collection: one "child" link per supplied value, followed by
 * the root link and the "self", "items" and "parent" links.
 *
 * @param request    The server request object
 * @param collection The collection metadata whose link list is appended to
 * @param values     A list of unique values in the database for the selected subcataloged field
 */
public void generatePropertyValueLinks(ServerRequest request, CollectionMetadata collection, List<String> values) {
    // One child link per distinct value, rooted at the current request path.
    for (String propertyValue : values) {
        collection.getLinks().add(
                new Link()
                        .href(appendLinkPath(LinksConfigProps.LINK_PREFIX + request.path(), propertyValue))
                        .type(MediaType.APPLICATION_JSON_VALUE)
                        .rel("child"));
    }

    String selfHref = getSelfString(request);
    collection.getLinks().add(ROOT);

    collection.getLinks().add(
            new Link()
                    .href(selfHref)
                    .type(MediaType.APPLICATION_JSON_VALUE)
                    .rel("self"));

    collection.getLinks().add(
            new Link()
                    .href(appendLinkPath(selfHref, "items"))
                    .type(StaccatoMediaType.APPLICATION_GEO_JSON_VALUE)
                    .rel("items"));

    // The parent is the self link with its last path segment removed.
    int lastSlash = selfHref.lastIndexOf("/");
    collection.getLinks().add(
            new Link()
                    .href(selfHref.substring(0, lastSlash))
                    .type(MediaType.APPLICATION_JSON_VALUE)
                    .rel("parent"));
}
/**
 * Captures the request body for Dubbo calls.
 *
 * <p>When the exchange's {@code SoulContext} has the Dubbo RPC type, the body is read as a
 * string (an empty string when there is none) and, if the content type is compatible with
 * JSON, saved in the exchange attributes as {@code Constants.DUBBO_PARAMS}. The chain is
 * then continued. Non-Dubbo requests go straight to the next plugin.
 *
 * @param exchange the current exchange
 * @param chain the plugin chain to continue with
 * @return the result of the remaining chain
 */
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    final ServerHttpRequest httpRequest = exchange.getRequest();
    final SoulContext context = exchange.getAttribute(Constants.CONTEXT);
    if (Objects.isNull(context) || !RpcTypeEnum.DUBBO.getName().equals(context.getRpcType())) {
        return chain.execute(exchange);
    }

    final MediaType declaredType = httpRequest.getHeaders().getContentType();
    final ServerRequest bodyReader = ServerRequest.create(exchange, messageReaders);
    final Mono<String> bodyOrBlank = bodyReader.bodyToMono(String.class)
            .switchIfEmpty(Mono.defer(() -> Mono.just("")));

    return bodyOrBlank.flatMap(text -> {
        if (MediaType.APPLICATION_JSON.isCompatibleWith(declaredType)) {
            exchange.getAttributes().put(Constants.DUBBO_PARAMS, text);
        }
        return chain.execute(exchange);
    });
}
/**
 * Generates a captcha image, stores its text in Redis for 120 seconds under
 * {@code DEFAULT_CODE_KEY + randomStr}, and returns the image as a JPEG response.
 *
 * @param serverRequest the incoming request; must carry a {@code randomStr} query parameter
 * @return 200 with the JPEG image, 400 when {@code randomStr} is missing, or an error
 *         signal when the image cannot be encoded
 */
@Override
public Mono<ServerResponse> handle(ServerRequest serverRequest) {
    // Validate the key first: the original called Optional.get() unchecked, which threw
    // NoSuchElementException when the parameter was absent.
    String randomStr = serverRequest.queryParam("randomStr").orElse(null);
    if (randomStr == null) {
        return ServerResponse.badRequest().build();
    }
    // Generate the captcha
    String text = producer.createText();
    BufferedImage image = producer.createImage(text);
    // Store the captcha text so it can be checked later
    redisTemplate.opsForValue().set(DEFAULT_CODE_KEY + randomStr, text, 120, TimeUnit.SECONDS);
    // Encode the image into an in-memory stream
    FastByteArrayOutputStream os = new FastByteArrayOutputStream();
    try {
        ImageIO.write(image, "jpeg", os);
    } catch (IOException e) {
        log.error("ImageIO write err", e);
        return Mono.error(e);
    }
    return ServerResponse
            .status(HttpStatus.OK)
            .contentType(MediaType.IMAGE_JPEG)
            .body(BodyInserters.fromResource(new ByteArrayResource(os.toByteArray())));
}
/**
 * Streams the events of the movie identified by the {@code id} path variable as
 * {@code TEXT_EVENT_STREAM}.
 *
 * @param request the incoming request
 * @return the response publisher
 */
public Mono<ServerResponse> eventStream(final ServerRequest request) {
    return ServerResponse.ok()
            .contentType(TEXT_EVENT_STREAM)
            .body(movieService.streamEvents(request.pathVariable("id")), MovieEvent.class);
}
/**
 * Verifies that {@code WebfluxUtils.getHeaderValue} returns the value of a header present
 * on the request.
 */
@Test
public void testGetHeader() {
    // Arrange: a mock request carrying a single "app" header.
    final HttpHeaders requestHeaders = new HttpHeaders();
    requestHeaders.add("app", "demo");
    final ServerRequest mockRequest = MockServerRequest.builder()
            .headers(requestHeaders)
            .build();

    // Act + assert.
    Assert.assertEquals("demo", WebfluxUtils.getHeaderValue(mockRequest, "app"));
}
/**
 * Endpoint whose outcome is selected by request headers: it throws an {@code ApiException}
 * synchronously for {@code throw-exception: true}, returns a failed Mono for
 * {@code return-exception-in-mono: true}, and otherwise responds 200 with the fixed payload.
 *
 * @param request the incoming request
 * @return the (possibly failing) response publisher
 */
Mono<ServerResponse> routerFunctionEndpoint(ServerRequest request) {
    HttpHeaders requestHeaders = request.headers().asHttpHeaders();

    boolean throwDirectly = "true".equals(requestHeaders.getFirst("throw-exception"));
    if (throwDirectly) {
        throw new ApiException(ERROR_THROWN_IN_ROUTER_FUNCTION_ENDPOINT);
    }

    boolean failViaMono = "true".equals(requestHeaders.getFirst("return-exception-in-mono"));
    if (failViaMono) {
        ApiException failure = new ApiException(ERROR_RETURNED_IN_ROUTER_FUNCTION_ENDPOINT_MONO);
        return Mono.error(failure);
    }

    return ServerResponse.ok().syncBody(ROUTER_FUNCTION_ENDPOINT_RESPONSE_PAYLOAD);
}
/**
 * Runs {@code getQueries(request)} find-and-update operations, each with two random world
 * numbers, and returns the collected results as a JSON list.
 *
 * @param request the incoming request, used to determine the number of operations
 * @return the response publisher
 */
public Mono<ServerResponse> updates(ServerRequest request) {
    int howMany = getQueries(request);

    Mono<List<World>> updatedWorlds = Flux.range(0, howMany)
            .flatMap(ignored -> dbRepository.findAndUpdateWorld(randomWorldNumber(), randomWorldNumber()))
            .collectList();

    ParameterizedTypeReference<List<World>> listOfWorlds = new ParameterizedTypeReference<List<World>>() {
    };

    return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(updatedWorlds, listOfWorlds);
}
/**
 * Reads the request body as a stream of {@code User} objects, saves each one through
 * {@code users} and streams the saved results back as {@code APPLICATION_STREAM_JSON}.
 *
 * @param request the incoming request
 * @return the response publisher
 */
public Mono<ServerResponse> saveUser(final ServerRequest request) {
    return ServerResponse
            .ok()
            .contentType(APPLICATION_STREAM_JSON)
            .body(
                    request.bodyToFlux(User.class).flatMap(incoming -> users.save(incoming)),
                    User.class);
}
/**
 * Extracts the multipart body and checks that it holds exactly two parts: a file part
 * {@code fooPart} named "foo.txt" and a form field {@code barPart} with value "bar".
 *
 * @param request the incoming multipart request
 * @return 200 OK when the checks pass; an error signal if a check raises an
 *         {@code Exception}
 */
public Mono<ServerResponse> multipartData(ServerRequest request) {
    return request
            .body(BodyExtractors.toMultipartData())
            .flatMap(multipart -> {
                Map<String, Part> byName = multipart.toSingleValueMap();
                try {
                    assertEquals(2, byName.size());
                    FilePart fooPart = (FilePart) byName.get("fooPart");
                    assertEquals("foo.txt", fooPart.filename());
                    FormFieldPart barPart = (FormFieldPart) byName.get("barPart");
                    assertEquals("bar", barPart.value());
                }
                catch (Exception failure) {
                    return Mono.error(failure);
                }
                return ServerResponse.ok().build();
            });
}
/**
 * Responds with the greeting produced by {@code sayHello}; if that pipeline fails, responds
 * (still with 200 and plain text) with a message containing the error's message.
 *
 * @param request the incoming request
 * @return the response publisher
 */
public Mono<ServerResponse> handleRequest3(ServerRequest request) {
    return sayHello(request)
            .flatMap(greeting -> ServerResponse.ok()
                    .contentType(MediaType.TEXT_PLAIN)
                    .syncBody(greeting))
            .onErrorResume(failure -> {
                // Turn the failure into a friendly text answer instead of an error status.
                String apology = "Hi, I looked around for your name but found: " + failure.getMessage();
                return Mono.just(apology)
                        .flatMap(text -> ServerResponse.ok()
                                .contentType(MediaType.TEXT_PLAIN)
                                .syncBody(text));
            });
}
/**
 * Fallback handler: logs the original request URL, when the request carries it as an
 * attribute, and answers with status 500 and a plain-text body.
 *
 * @param serverRequest the request that is being answered by the fallback
 * @return the fallback response
 */
@Override
public Mono<ServerResponse> handle(ServerRequest serverRequest) {
    serverRequest.attribute(GATEWAY_ORIGINAL_REQUEST_URL_ATTR)
            .ifPresent(failedUri -> log.error("网关执行请求:{}失败,hystrix服务降级处理", failedUri));

    return ServerResponse
            .status(HttpStatus.INTERNAL_SERVER_ERROR.value())
            .contentType(MediaType.TEXT_PLAIN)
            .body(BodyInserters.fromObject("服务异常"));
}
/**
 * Responds with a default-built security configuration as JSON.
 *
 * @param request the request to handle
 * @return a 200 response whose body is the object produced by
 *         {@code SecurityConfigurationBuilder.builder().build()}
 */
@Override
public Mono<ServerResponse> handle(ServerRequest request) {
    return ServerResponse
            .ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(SecurityConfigurationBuilder.builder().build()));
}
/**
 * Responds with a map of the available endpoints ("hey", "hoy", "collect"), each expressed
 * as an absolute URL built from the scheme and authority of the incoming request.
 *
 * @param request the incoming request
 * @return the response publisher
 */
public Mono<ServerResponse> falback(ServerRequest request) {
    final URI requestUri = request.uri();
    // scheme://authority of the caller's own request, reused as the base for every link.
    final String root = format("%s://%s", requestUri.getScheme(), requestUri.getAuthority());

    final Map<String, String> endpoints = HashMap
            .of(
                    "hey", format("%s/hey", root),
                    "hoy", format("%s/hoy", root),
                    "collect", format("%s/collect", root))
            .toJavaMap();

    return jsonBuilder.body(Mono.just(endpoints), Map.class);
}
/**
 * Reads the request body, records it on the gateway context and continues the filter chain
 * with a request whose body can be read again.
 *
 * <p>The body is joined into one buffer and copied into a byte array. The request is then
 * decorated so that {@code getBody()} serves those bytes. The body text is stored via
 * {@code gatewayContext.setRequestBody}, and its parsed JSON members are added to
 * {@code gatewayContext.getAllRequestData()}; a parse failure is logged and does not stop
 * the chain.
 *
 * @param exchange the current exchange
 * @param chain the filter chain to continue with
 * @param gatewayContext the context object that receives the body and its parsed members
 * @return completion of the remaining filter chain
 */
private Mono<Void> readBody(ServerWebExchange exchange, WebFilterChain chain, GatewayContext gatewayContext){
    // NOTE(review): if the joined body Mono completes empty, the flatMap below (and with it
    // chain.filter) would not run - confirm callers only invoke this for requests with a body.
    return DataBufferUtils.join(exchange.getRequest().getBody())
            .flatMap(dataBuffer -> {
                /*
                 * read the body Flux<DataBuffer>, and release the buffer
                 * //TODO when SpringCloudGateway Version Release To G.SR2,this can be update with the new version's feature
                 * see PR https://github.com/spring-cloud/spring-cloud-gateway/pull/1095
                 */
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);
                // Each subscription gets a fresh buffer wrapping the same copied bytes.
                Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
                    DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                    DataBufferUtils.retain(buffer);
                    return Mono.just(buffer);
                });
                /*
                 * repackage ServerHttpRequest
                 */
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return cachedFlux;
                    }
                };
                ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
                return ServerRequest.create(mutatedExchange, MESSAGE_READERS)
                        .bodyToMono(String.class)
                        .doOnNext(objectValue -> {
                            gatewayContext.setRequestBody(objectValue);
                            try {
                                gatewayContext.getAllRequestData().setAll(JSONObject.parseObject(objectValue, Map.class));
                                // Only report success when parsing actually succeeded.
                                log.debug("[GatewayContext]Read JsonBody Success");
                            } catch (Exception e) {
                                // Pass the exception as the throwable argument; no "{}" placeholder is needed for it.
                                log.error("[GatewayContext]Read JsonBody error", e);
                            }
                        }).then(chain.filter(mutatedExchange));
            });
}
/**
 * Returns a {@code TEXT_EVENT_STREAM} response carrying the events that
 * {@code movieService.streamEvents} produces for the {@code id} path variable.
 *
 * @param request the incoming request
 * @return the response publisher
 */
public Mono<ServerResponse> eventStream(final ServerRequest request) {
    val movieId = request.pathVariable("id");
    val events = movieService.streamEvents(movieId);
    val responseBuilder = ServerResponse.ok().contentType(TEXT_EVENT_STREAM);
    return responseBuilder.body(events, MovieEvent.class);
}
/**
 * Calls the {@code /api/v1/hey} and {@code /api/v1/hoy} endpoints through {@code webClient},
 * labels each answer and responds with both joined as {@code "<hey> - <hoy>"}.
 *
 * @param request the incoming request (not inspected)
 * @return the response publisher
 */
public Mono<ServerResponse> collect(ServerRequest request) {
    Mono<String> heyAnswer = webClient.get().uri("/api/v1/hey").retrieve()
            .bodyToMono(String.class)
            .map(reply -> format("HEY response: %s", reply));

    Mono<String> hoyAnswer = webClient.get().uri("/api/v1/hoy").retrieve()
            .bodyToMono(String.class)
            .map(reply -> format("HOY response: %s", reply));

    Mono<String> combined = Mono.zip(heyAnswer, hoyAnswer)
            .map(both -> format("%s - %s", both.getT1(), both.getT2()));

    return jsonBuilder.body(combined, String.class);
}
/**
 * Responds with the number of departments returned by
 * {@code departmentServiceImpl.findAllDepts()}, wrapped in a {@code CountDept}.
 *
 * @param req the incoming request (not inspected)
 * @return the response publisher
 */
public Mono<ServerResponse> countDepts(ServerRequest req) {
    // Build the CountDept inside the pipeline. The original called count.block(), which
    // blocks the calling thread and is rejected by Reactor on non-blocking threads.
    Mono<CountDept> monoCntDept = Flux.fromIterable(departmentServiceImpl.findAllDepts())
            .count()
            .map(total -> {
                CountDept countDept = new CountDept();
                countDept.setCount(total);
                return countDept;
            });
    return ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(monoCntDept, CountDept.class)
            .switchIfEmpty(ServerResponse.notFound().build());
}
/**
 * Pushes an address through {@code apiHandler.buildResponse} with the geo-location and
 * sunrise/sunset services stubbed, and hands the result to {@code verifyServerResponse}.
 */
@Test
void buildResponseTest() {
    // Arrange: stub the request and both collaborating services.
    final ServerRequest mockedRequest = mock(ServerRequest.class);
    when(mockedRequest.pathVariable(ADDRESS_VARIABLE)).thenReturn(GOOGLE_ADDRESS);
    doReturn(GOOGLE_LOCATION).when(geoLocationService).fromAddress(any());
    doReturn(SUNRISE_SUNSET).when(sunriseSunsetService).fromGeographicCoordinates(any());

    // Act + assert: the subscriber performs the verification.
    Mono.just(GOOGLE_ADDRESS)
            .transform(apiHandler::buildResponse)
            .subscribe(this::verifyServerResponse);

    // Clean up the stubs on both services.
    reset(geoLocationService);
    reset(sunriseSunsetService);
}
/**
 * Responds with the configs of the algorithm registered under the {@code id} path
 * variable, or 404 when no algorithm is registered under that id.
 *
 * @param request the incoming request
 * @return the response publisher
 */
public Mono<ServerResponse> configs(ServerRequest request) {
    PregelGraphAlgorithm<?, ?, ?, ?> selected = algorithms.get(request.pathVariable("id"));
    if (selected != null) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(selected.configs()), Map.class);
    }
    return ServerResponse.notFound().build();
}
/**
 * GET all users.
 *
 * @param request the incoming request (not inspected)
 * @return a 200 JSON response streaming every user returned by the repository
 */
public Mono<ServerResponse> getAll(ServerRequest request) {
    return ServerResponse
            .ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(customerRepository.getAllUsers(), User.class);
}
/**
 * PUT a user.
 *
 * <p>Parses the {@code id} path variable as a long, passes it with the request body to the
 * repository and responds with the resulting user as JSON.
 *
 * @param request the incoming request; its body is read as a {@code User}
 * @return the response publisher
 */
public Mono<ServerResponse> putUser(ServerRequest request) {
    long userId = Long.parseLong(request.pathVariable("id"));
    Mono<User> stored = customerRepository.putUser(userId, request.bodyToMono(User.class));
    return stored.flatMap(saved ->
            ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(fromObject(saved)));
}
/**
 * Fallback handler: logs the original request URL when it is available as a request
 * attribute, then answers with status 500 and a UTF-8 plain-text body.
 *
 * @param serverRequest the request that is being answered by the fallback
 * @return the fallback response
 */
@Override
public Mono<ServerResponse> handle(ServerRequest serverRequest) {
    Optional<Object> failedUri = serverRequest.attribute(GATEWAY_ORIGINAL_REQUEST_URL_ATTR);
    if (failedUri.isPresent()) {
        log.error("网关执行请求:{}失败,hystrix服务降级处理", failedUri.get());
    }
    return ServerResponse
            .status(HttpStatus.INTERNAL_SERVER_ERROR.value())
            .header("Content-Type","text/plain; charset=utf-8")
            .body(BodyInserters.fromObject("服务异常"));
}
/**
 * Returns the superclass's error attributes with "status" and "message" replaced by this
 * object's {@code getStatus()} and {@code getMessage()} values.
 *
 * @param request the current request
 * @param includeStackTrace passed through to the superclass implementation
 * @return the adjusted attribute map
 */
@Override
public Map<String, Object> getErrorAttributes(ServerRequest request, boolean includeStackTrace) {
    final Map<String, Object> attributes = super.getErrorAttributes(request, includeStackTrace);
    attributes.put("status", getStatus());
    attributes.put("message", getMessage());
    return attributes;
}
/**
 * Counts the departments returned by {@code departmentServiceImpl.findAllDepts()} and
 * responds with the count wrapped in a {@code CountDept}.
 *
 * @param req the incoming request (not inspected)
 * @return the response publisher
 */
public Mono<ServerResponse> countDepts(ServerRequest req) {
    Mono<Long> count = Flux.fromIterable(departmentServiceImpl.findAllDepts())
            .count();
    // Map the count reactively instead of calling block(): blocking inside a handler stalls
    // the calling thread, and Reactor refuses block() on its non-blocking threads.
    Mono<CountDept> monoCntDept = count.map(total -> {
        CountDept countDept = new CountDept();
        countDept.setCount(total);
        return countDept;
    });
    return ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(monoCntDept, CountDept.class)
            .switchIfEmpty(ServerResponse.notFound().build());
}
/**
 * Reads the request body as a {@code Person} and answers 202 Accepted with a
 * {@code Confirmation} carrying a freshly generated random UUID.
 *
 * @param request the incoming request
 * @return the response publisher
 */
private Mono<ServerResponse> readBody(ServerRequest request) {
    // The person itself is not used; a confirmation is produced once the body has been read.
    Mono<Confirmation> confirmation = request
            .bodyToMono(Person.class)
            .map(person -> new Confirmation(UUID.randomUUID()));
    return ServerResponse.accepted().body(confirmation, Confirmation.class);
}