下面列出了如何使用 org.springframework.http.server.reactive.ServerHttpRequestDecorator 类的 API 示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Wraps the exchange's request so that it reports a copy of the supplied headers and
 * serves its body from the given cached output message.
 *
 * @param exchange the exchange whose request is decorated
 * @param headers the headers copied into every {@code getHeaders()} result
 * @param outputMessage the message whose body replaces the request body
 * @return a decorator over {@code exchange.getRequest()}
 */
ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
        CachedBodyOutputMessage outputMessage) {
    return new ServerHttpRequestDecorator(exchange.getRequest()) {

        @Override
        public Flux<DataBuffer> getBody() {
            return outputMessage.getBody();
        }

        @Override
        public HttpHeaders getHeaders() {
            final long declaredLength = headers.getContentLength();
            final HttpHeaders copy = new HttpHeaders();
            copy.putAll(headers);
            if (declaredLength <= 0) {
                // No usable Content-Length on the supplied headers: advertise chunked transfer.
                // TODO: this causes a 'HTTP/1.1 411 Length Required' on httpbin.org
                copy.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                return copy;
            }
            copy.setContentLength(declaredLength);
            return copy;
        }
    };
}
/**
 * Reads the complete request body, records it on the log DTO, writes the log line and
 * then continues the filter chain with a request whose body can be read again.
 *
 * <p>A body that starts with {@code [} and ends with {@code ]} is parsed as a list, one
 * that starts with <code>{</code> and ends with <code>}</code> is parsed as a map; if the
 * parser returns {@code null}, or the body has neither shape, the raw string is recorded.
 *
 * <p>A request without a body is treated as having an empty body, so the chain is still
 * invoked for it.
 *
 * @param exchange the current server exchange
 * @param chain the gateway filter chain to continue with
 * @param log the log DTO that receives the body
 * @return a {@code Mono} that completes when the rest of the chain has completed
 */
private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, LogDto log) {
    return DataBufferUtils.join(exchange.getRequest().getBody())
            // join() completes without emitting for a body-less request; without a fallback
            // the flatMap below would be skipped and chain.filter() would never be called.
            .defaultIfEmpty(exchange.getResponse().bufferFactory().wrap(new byte[0]))
            .flatMap(dataBuffer -> {
                // Copy the payload out so the original buffer can be released right away.
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);

                // Rebuild the request: every subscription gets a fresh buffer over the copied bytes.
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return Flux.defer(() -> {
                            DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                            DataBufferUtils.retain(buffer);
                            return Mono.just(buffer);
                        });
                    }
                };
                ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();

                return ServerRequest.create(mutatedExchange, HandlerStrategies.withDefaults().messageReaders())
                        .bodyToMono(String.class)
                        .doOnNext(body -> {
                            // Plain prefix/suffix checks instead of a '.'-based regex: they also
                            // recognise multi-line (pretty-printed) JSON and need no pattern compile.
                            if (body.startsWith("[") && body.endsWith("]")) {
                                List<Object> list = Json.toList(body, Object.class);
                                log.setBody(list == null ? body : list);
                            } else if (body.startsWith("{") && body.endsWith("}")) {
                                Map<?, ?> obj = Json.toMap(body);
                                log.setBody(obj == null ? body : obj);
                            } else {
                                log.setBody(body);
                            }
                            logger.info("请求参数:{}", log.toString());
                        })
                        .then(chain.filter(mutatedExchange));
            });
}
/**
 * Reads the request body as a string, stores it on the gateway context, tries to parse
 * it as a JSON object into the context's combined request data, and then continues the
 * filter chain with a request whose body can be read again.
 *
 * <p>A request without a body skips the capture step but is still passed down the chain.
 *
 * @param exchange the current server exchange
 * @param chain the web filter chain to continue with
 * @param gatewayContext the context that receives the body and the parsed data
 * @return a {@code Mono} that completes when the rest of the chain has completed
 */
private Mono<Void> readBody(ServerWebExchange exchange, WebFilterChain chain, GatewayContext gatewayContext){
    return DataBufferUtils.join(exchange.getRequest().getBody())
            // join() completes without emitting for a body-less request; without a fallback
            // the flatMap below would be skipped and chain.filter() would never be called.
            .defaultIfEmpty(exchange.getResponse().bufferFactory().wrap(new byte[0]))
            .flatMap(dataBuffer -> {
                /*
                 * read the body Flux<DataBuffer>, and release the buffer
                 * //TODO when SpringCloudGateway Version Release To G.SR2,this can be update with the new version's feature
                 * see PR https://github.com/spring-cloud/spring-cloud-gateway/pull/1095
                 */
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);

                // An empty payload is replayed as an empty Flux so nothing is captured or parsed for it.
                Flux<DataBuffer> cachedFlux = bytes.length == 0
                        ? Flux.empty()
                        : Flux.defer(() -> {
                            DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                            DataBufferUtils.retain(buffer);
                            return Mono.just(buffer);
                        });

                /*
                 * repackage ServerHttpRequest
                 */
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return cachedFlux;
                    }
                };
                ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();

                return ServerRequest.create(mutatedExchange, MESSAGE_READERS)
                        .bodyToMono(String.class)
                        .doOnNext(objectValue -> {
                            gatewayContext.setRequestBody(objectValue);
                            try {
                                gatewayContext.getAllRequestData().setAll(JSONObject.parseObject(objectValue, Map.class));
                                // Only report success when parsing and storing actually succeeded.
                                log.debug("[GatewayContext]Read JsonBody Success");
                            } catch (Exception e) {
                                // Best effort: the raw body is already stored; pass the exception
                                // as the throwable argument so its stack trace is logged.
                                log.error("[GatewayContext]Read JsonBody error", e);
                            }
                        })
                        .then(chain.filter(mutatedExchange));
            });
}
/**
 * Builds a view of the exchange's request whose body is taken from the cached output
 * message; everything else is delegated to the original request.
 *
 * @param exchange the exchange whose request is decorated
 * @param outputMessage the message whose body replaces the request body
 * @return a decorator over {@code exchange.getRequest()}
 */
private ServerHttpRequestDecorator decorate(final ServerWebExchange exchange, final CachedBodyOutputMessage outputMessage) {
    final ServerHttpRequestDecorator cachedBodyRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
        @Override
        public Flux<DataBuffer> getBody() {
            final Flux<DataBuffer> cachedBody = outputMessage.getBody();
            return cachedBody;
        }
    };
    return cachedBodyRequest;
}
/**
 * Creates a request decorator whose body is served from the given data buffer.
 *
 * <p>When the buffer has readable bytes it is stored in the exchange attributes under
 * {@code CACHED_REQUEST_BODY_ATTR}. Each subscription to the decorated body checks that
 * attribute: if it is absent the body completes empty, otherwise a retained slice of the
 * buffer's native Netty buffer is emitted.
 *
 * @param exchange the exchange whose request is decorated
 * @param dataBuffer the buffer holding the request body; cast to {@code NettyDataBuffer}
 *        when the body is read
 * @param cacheDecoratedRequest whether to also store the decorator in the exchange
 *        attributes under {@code CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR}
 * @return the decorated request
 */
private static ServerHttpRequest decorate(ServerWebExchange exchange,
        DataBuffer dataBuffer, boolean cacheDecoratedRequest) {
    final boolean hasContent = dataBuffer.readableByteCount() > 0;
    if (hasContent) {
        if (log.isTraceEnabled()) {
            log.trace("retaining body in exchange attribute");
        }
        exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, dataBuffer);
    }

    final ServerHttpRequest cachedBodyRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
        @Override
        public Flux<DataBuffer> getBody() {
            return Mono.<DataBuffer>fromSupplier(() -> {
                final Object cached = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
                if (cached != null) {
                    // TODO: deal with Netty
                    final NettyDataBuffer nettyBuffer = (NettyDataBuffer) dataBuffer;
                    return nettyBuffer.factory().wrap(nettyBuffer.getNativeBuffer().retainedSlice());
                }
                // probably == downstream closed or no body
                return null;
            }).flux();
        }
    };

    if (cacheDecoratedRequest) {
        exchange.getAttributes().put(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, cachedBodyRequest);
    }
    return cachedBodyRequest;
}
/**
 * Decompresses gzip-encoded request bodies before handing the exchange to the rest of
 * the chain.
 *
 * <p>Requests whose {@code Content-Encoding} header does not mention gzip, and gzip
 * requests that carry no body at all, are passed through unchanged. Otherwise the body is
 * collected and gunzipped, and the chain continues with a decorated request that serves
 * the decompressed bytes. Extra diagnostics are logged when both the {@code x-mid} and
 * {@code x-debug} headers are present.
 *
 * @param serverWebExchange the current exchange
 * @param webFilterChain the chain to continue with
 * @return a {@code Mono} that completes when the chain completes, or fails with the
 *         {@code IOException} raised while decompressing
 */
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
    HttpHeaders headers = serverWebExchange.getRequest().getHeaders();
    boolean debugCount = headers.containsKey("x-mid") && headers.containsKey("x-debug");
    if (debugCount) {
        log.info("On received x-mid {}, contentLength {}, path {}", headers.get("x-mid"), headers.getContentLength(), serverWebExchange.getRequest().getQueryParams());
    }

    boolean gzipped = Optional.ofNullable(headers.get("Content-Encoding"))
            .orElse(Collections.emptyList()).stream()
            // Locale.ROOT: the default-locale toLowerCase() maps "GZIP" to "gz\u0131p" under a Turkish locale.
            .anyMatch(s -> s.toLowerCase(java.util.Locale.ROOT).contains("gzip"));
    if (!gzipped) {
        return webFilterChain.filter(serverWebExchange);
    }

    // Decompress the body and reset the content length.
    if (debugCount) {
        log.info("header {}", headers);
    }
    return serverWebExchange.getRequest().getBody()
            .collectList()
            .flatMap(dataBuffers -> {
                if (dataBuffers.isEmpty()) {
                    // Nothing to decompress (e.g. a body-less request that still carries the header).
                    return webFilterChain.filter(serverWebExchange);
                }
                if (debugCount) {
                    log.info("On body input x-mid {} , dataBuffers {}", headers.get("x-mid"), dataBuffers.size());
                }
                DataBuffer dataBuffer = dataBuffers.get(0).factory().join(dataBuffers);
                // The raw stream is its own resource: if the GZIPInputStream constructor fails on
                // a malformed header, the release-on-close stream is still closed.
                try (java.io.InputStream rawStream = dataBuffer.asInputStream(true);
                        GZIPInputStream gzipInputStream = new GZIPInputStream(rawStream)) {
                    byte[] byteBody = ByteStreams.toByteArray(gzipInputStream);
                    if (debugCount) {
                        log.info("On body input x-mid {}, byteBody {}", headers.get("x-mid"), byteBody.length);
                    }
                    return webFilterChain.filter(serverWebExchange.mutate()
                            .request(gunzippedRequest(serverWebExchange, dataBuffer, byteBody))
                            .build());
                } catch (IOException e) {
                    log.error(e.getMessage(), e);
                    return Mono.error(e);
                }
            });
}

/**
 * Builds the request view handed downstream after decompression: the body is the
 * decompressed bytes and the headers describe that body instead of the compressed one.
 */
private ServerHttpRequestDecorator gunzippedRequest(ServerWebExchange exchange, DataBuffer source, byte[] byteBody) {
    return new ServerHttpRequestDecorator(exchange.getRequest()) {
        @Override
        public Flux<DataBuffer> getBody() {
            return Flux.just(source.factory().wrap(byteBody));
        }

        @Override
        public HttpHeaders getHeaders() {
            long contentLength = super.getHeaders().getContentLength();
            HttpHeaders httpHeaders = new HttpHeaders();
            httpHeaders.putAll(super.getHeaders());
            // The payload is no longer compressed, so it must not be advertised as gzip.
            httpHeaders.remove(HttpHeaders.CONTENT_ENCODING);
            // The body changed, so Content-Length must be recomputed; it is the byte length,
            // not the string length.
            httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
            if (contentLength > 0) {
                httpHeaders.setContentLength(byteBody.length);
            } else {
                // TODO: this causes a 'HTTP/1.1 411 Length Required' on httpbin.org
                httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
            }
            return httpHeaders;
        }
    };
}
/**
 * Reads the request's form data into the gateway context and continues the filter chain
 * with a request whose body is the form data re-serialised as a URL-encoded string.
 *
 * <p>When the context holds no form data after reading, the chain continues with the
 * original exchange. Otherwise the body is rebuilt using the charset of the request's
 * content type (UTF-8 when none is declared) and {@code Content-Length} is set to the
 * size of the rebuilt body.
 *
 * @param exchange the current server exchange
 * @param chain the web filter chain to continue with
 * @param gatewayContext the context that receives the form data
 * @return a {@code Mono} that completes when the rest of the chain has completed
 */
private Mono<Void> readFormData(ServerWebExchange exchange,WebFilterChain chain,GatewayContext gatewayContext){
    HttpHeaders headers = exchange.getRequest().getHeaders();
    return exchange.getFormData()
            .doOnNext(multiValueMap -> {
                gatewayContext.setFormData(multiValueMap);
                gatewayContext.getAllRequestData().addAll(multiValueMap);
                log.debug("[GatewayContext]Read FormData Success");
            })
            .then(Mono.defer(() -> {
                MultiValueMap<String, String> formData = gatewayContext.getFormData();
                /*
                 * formData is empty just return
                 */
                if (null == formData || formData.isEmpty()) {
                    return chain.filter(exchange);
                }

                // The content type may be missing; fall back to UTF-8 instead of dereferencing null.
                Charset declaredCharset = headers.getContentType() == null
                        ? null
                        : headers.getContentType().getCharset();
                Charset charset = declaredCharset == null ? StandardCharsets.UTF_8 : declaredCharset;

                /*
                 * repackage form data and get data bytes
                 */
                String formDataBodyString = encodeFormData(formData, charset);
                byte[] bodyBytes = formDataBodyString.getBytes(charset);

                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(headers);
                httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
                /*
                 * in case of content-length not matched
                 */
                httpHeaders.setContentLength(bodyBytes.length);

                /*
                 * use BodyInserter to InsertFormData Body
                 */
                BodyInserter<String, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromObject(formDataBodyString);
                CachedBodyOutputMessage cachedBodyOutputMessage = new CachedBodyOutputMessage(exchange, httpHeaders);
                log.debug("[GatewayContext]Rewrite Form Data :{}",formDataBodyString);
                return bodyInserter.insert(cachedBodyOutputMessage, new BodyInserterContext())
                        .then(Mono.defer(() -> {
                            ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
                                    exchange.getRequest()) {
                                @Override
                                public HttpHeaders getHeaders() {
                                    return httpHeaders;
                                }
                                @Override
                                public Flux<DataBuffer> getBody() {
                                    return cachedBodyOutputMessage.getBody();
                                }
                            };
                            return chain.filter(exchange.mutate().request(decorator).build());
                        }));
            }));
}

/**
 * Serialises form data as {@code key=value} pairs joined by {@code &}, URL-encoding both
 * keys and values. A {@code null} value is written as the bare key, and a key with an
 * empty value list is omitted.
 */
private String encodeFormData(Map<String, List<String>> formData, Charset charset) {
    StringBuilder builder = new StringBuilder();
    String charsetName = charset.name();
    try {
        for (Map.Entry<String, List<String>> entry : formData.entrySet()) {
            // Keys can contain reserved characters too, so they are encoded like the values.
            String encodedKey = URLEncoder.encode(entry.getKey(), charsetName);
            for (String value : entry.getValue()) {
                if (builder.length() > 0) {
                    builder.append('&');
                }
                builder.append(encodedKey);
                if (value != null) {
                    builder.append('=').append(URLEncoder.encode(value, charsetName));
                }
            }
        }
    } catch (UnsupportedEncodingException e) {
        // Not expected: the name comes from an existing Charset instance. Surface it rather
        // than silently forwarding a truncated body.
        throw new IllegalStateException("Unsupported form charset: " + charsetName, e);
    }
    return builder.toString();
}