org.springframework.http.server.reactive.ServerHttpRequestDecorator 类的源码示例

下面列出了 org.springframework.http.server.reactive.ServerHttpRequestDecorator 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。

/**
 * Wraps the exchange's current request so that it reports a copy of the supplied
 * headers and serves its body from the cached output message.
 *
 * @param exchange the exchange whose request is decorated
 * @param headers the headers to expose; they are copied into a fresh
 * {@link HttpHeaders} instance on every {@code getHeaders()} call
 * @param outputMessage the message whose body replaces the request body
 * @return a decorator around {@code exchange.getRequest()}
 */
ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
		CachedBodyOutputMessage outputMessage) {
	return new ServerHttpRequestDecorator(exchange.getRequest()) {
		@Override
		public Flux<DataBuffer> getBody() {
			return outputMessage.getBody();
		}

		@Override
		public HttpHeaders getHeaders() {
			HttpHeaders copy = new HttpHeaders();
			copy.putAll(headers);
			long declaredLength = headers.getContentLength();
			if (declaredLength <= 0) {
				// No usable length: fall back to chunked transfer.
				// TODO: this causes a 'HTTP/1.1 411 Length Required' on httpbin.org
				copy.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
				return copy;
			}
			copy.setContentLength(declaredLength);
			return copy;
		}
	};
}
 
源代码2 项目: gateway   文件: LogFilter.java
/**
 * Reads the whole request body, records it on the log DTO, writes the log line and
 * then continues the filter chain with a request that replays the buffered body.
 * <p>
 * A request without a body is treated as an empty body, so the chain is still
 * continued instead of the request being silently dropped.
 *
 * @param exchange ServerWebExchange
 * @param chain    GatewayFilterChain
 * @param log      log DTO that receives the (parsed) body
 * @return Mono that completes when the rest of the chain completes
 */
private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, LogDto log) {
    return DataBufferUtils.join(exchange.getRequest().getBody())
            .map(dataBuffer -> {
                // Copy the joined buffer into a plain array, then release it.
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);
                return bytes;
            })
            // join() completes empty when the request has no body; without this default
            // the flatMap below would never run and chain.filter() would never be called.
            .defaultIfEmpty(new byte[0])
            .flatMap(bytes -> {
                // Rebuild the request so every subscriber gets a fresh buffer over the cached bytes.
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return Flux.defer(() -> {
                            DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                            DataBufferUtils.retain(buffer);

                            return Mono.just(buffer);
                        });
                    }
                };

                ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
                return ServerRequest.create(mutatedExchange, HandlerStrategies.withDefaults().messageReaders())
                        .bodyToMono(String.class)
                        .doOnNext(body -> {
                            // Plain prefix/suffix checks instead of a '.'-based regex, so JSON that
                            // spans several lines is recognised too (and no pattern is compiled per request).
                            if (body.startsWith("[") && body.endsWith("]")) {
                                List<Object> list = Json.toList(body, Object.class);
                                log.setBody(list == null ? body : list);
                            } else if (body.startsWith("{") && body.endsWith("}")) {
                                Map<?, ?> obj = Json.toMap(body);
                                log.setBody(obj == null ? body : obj);
                            } else {
                                log.setBody(body);
                            }

                            logger.info("请求参数:{}", log);
                        })
                        .then(chain.filter(mutatedExchange));
            });
}
 
源代码3 项目: open-cloud   文件: GatewayContextFilter.java
/**
 * Reads the request body as a string, stores it on the gateway context (and, when it
 * parses as JSON, merges its entries into the context's request data), then continues
 * the chain with a request that replays the buffered body.
 * <p>
 * A request without a body is treated as an empty body, so the chain is still
 * continued instead of the request being silently dropped.
 *
 * @param exchange the current exchange
 * @param chain the filter chain to continue
 * @param gatewayContext the context that receives the body and parsed data
 * @return Mono that completes when the rest of the chain completes
 */
private Mono<Void> readBody(ServerWebExchange exchange, WebFilterChain chain, GatewayContext gatewayContext){
    return DataBufferUtils.join(exchange.getRequest().getBody())
            .map(dataBuffer -> {
                /*
                 * read the body Flux<DataBuffer>, and release the buffer
                 * //TODO when SpringCloudGateway Version Release To G.SR2,this can be update with the new version's feature
                 * see PR https://github.com/spring-cloud/spring-cloud-gateway/pull/1095
                 */
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);
                return bytes;
            })
            /*
             * join() completes empty when the request has no body; without this default
             * the flatMap below would never run and chain.filter() would never be called
             */
            .defaultIfEmpty(new byte[0])
            .flatMap(bytes -> {
                Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
                    DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                    DataBufferUtils.retain(buffer);
                    return Mono.just(buffer);
                });
                /*
                 * repackage ServerHttpRequest
                 */
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return cachedFlux;
                    }
                };
                ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
                return ServerRequest.create(mutatedExchange, MESSAGE_READERS)
                        .bodyToMono(String.class)
                        .doOnNext(objectValue -> {
                            gatewayContext.setRequestBody(objectValue);
                            if (objectValue.isEmpty()) {
                                // nothing to parse for an empty body
                                return;
                            }
                            try {
                                gatewayContext.getAllRequestData().setAll(JSONObject.parseObject(objectValue, Map.class));
                                // only report success when parsing actually succeeded
                                log.debug("[GatewayContext]Read JsonBody Success");
                            }catch (Exception e){
                                // pass the exception as the throwable argument; the former "{}" placeholder was never filled
                                log.error("[GatewayContext]Read JsonBody error", e);
                            }
                        }).then(chain.filter(mutatedExchange));
            });
}
 
源代码4 项目: soul   文件: FileSizeFilter.java
/**
 * Creates a decorator around the exchange's current request whose body is taken
 * from the given cached output message; everything else is delegated unchanged.
 *
 * @param exchange the exchange whose request is wrapped
 * @param outputMessage the message supplying the replacement body
 * @return the decorated request
 */
private ServerHttpRequestDecorator decorate(final ServerWebExchange exchange, final CachedBodyOutputMessage outputMessage) {
    final ServerHttpRequestDecorator cachedBodyRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
        @Override
        public Flux<DataBuffer> getBody() {
            final Flux<DataBuffer> cachedBody = outputMessage.getBody();
            return cachedBody;
        }
    };
    return cachedBodyRequest;
}
 
/**
 * Wraps the exchange's request so that its body is replayed from {@code dataBuffer}.
 * <p>
 * When the buffer has readable bytes it is stored in the exchange attributes under
 * {@code CACHED_REQUEST_BODY_ATTR}. The decorated body yields a retained slice of the
 * buffer's native Netty buffer on each subscription, or nothing when that attribute
 * is absent at subscription time.
 *
 * @param exchange the exchange whose request is decorated
 * @param dataBuffer the already-read body; cast to {@code NettyDataBuffer} when replayed
 * @param cacheDecoratedRequest whether to also store the decorator in the exchange
 * attributes under {@code CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR}
 * @return the decorated request
 */
private static ServerHttpRequest decorate(ServerWebExchange exchange,
		DataBuffer dataBuffer, boolean cacheDecoratedRequest) {
	boolean hasContent = dataBuffer.readableByteCount() > 0;
	if (hasContent) {
		if (log.isTraceEnabled()) {
			log.trace("retaining body in exchange attribute");
		}
		exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, dataBuffer);
	}

	ServerHttpRequest cachingRequest = new ServerHttpRequestDecorator(
			exchange.getRequest()) {
		@Override
		public Flux<DataBuffer> getBody() {
			Mono<DataBuffer> replay = Mono.fromSupplier(() -> {
				Object cached = exchange.getAttributeOrDefault(
						CACHED_REQUEST_BODY_ATTR, null);
				if (cached != null) {
					// TODO: deal with Netty
					NettyDataBuffer pdb = (NettyDataBuffer) dataBuffer;
					return pdb.factory()
							.wrap(pdb.getNativeBuffer().retainedSlice());
				}
				// probably == downstream closed or no body
				return null;
			});
			return replay.flux();
		}
	};
	if (cacheDecoratedRequest) {
		exchange.getAttributes().put(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR,
				cachingRequest);
	}
	return cachingRequest;
}
 
源代码6 项目: influx-proxy   文件: GzipWebFilter.java
/**
 * Decompresses gzip-encoded request bodies before passing the request down the chain.
 * <p>
 * Requests whose {@code Content-Encoding} header does not mention gzip are forwarded
 * untouched. For gzip requests the whole body is buffered and inflated, and the
 * downstream request exposes the inflated bytes together with an adjusted
 * {@code Content-Length} (or chunked transfer encoding when the original request
 * declared no positive length).
 *
 * @param serverWebExchange the current exchange
 * @param webFilterChain the chain to continue
 * @return Mono that completes when the rest of the chain completes; fails with the
 * {@link IOException} raised while inflating an invalid body
 */
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {

    HttpHeaders headers = serverWebExchange.getRequest().getHeaders();
    boolean debugCount = headers.containsKey("x-mid") && headers.containsKey("x-debug");
    if (debugCount) {
        log.info("On received x-mid {}, contentLength {}, path {}", headers.get("x-mid"), headers.getContentLength(), serverWebExchange.getRequest().getQueryParams());
    }
    // Locale.ROOT keeps the case folding locale-independent (e.g. "GZIP" under a Turkish default locale).
    boolean gzipEncoded = Optional.ofNullable(headers.get("Content-Encoding"))
            .orElse(Collections.emptyList()).stream()
            .anyMatch(s -> s.toLowerCase(java.util.Locale.ROOT).contains("gzip"));
    if (!gzipEncoded) {
        return webFilterChain.filter(serverWebExchange);
    }

    // Decompress the body and reset the content length.
    if (debugCount) {
        log.info("header {}", headers);
    }
    return serverWebExchange.getRequest().getBody()
            .collectList()
            .map(dataBuffers -> {
                Assert.notEmpty(dataBuffers, "DataBuffers");
                if (debugCount) {
                    log.info("On body input x-mid {} , dataBuffers {}", headers.get("x-mid"), dataBuffers.size());
                }
                return dataBuffers.get(0).factory().join(dataBuffers);
            })
            .flatMap(dataBuffer -> {
                // asInputStream(true) releases the joined buffer when the stream is closed.
                try (GZIPInputStream gzipInputStream = new GZIPInputStream(dataBuffer.asInputStream(true))) {
                    byte[] byteBody = ByteStreams.toByteArray(gzipInputStream);
                    if (debugCount) {
                        log.info("On body input x-mid {}, byteBody {}", headers.get("x-mid"), byteBody.length);
                    }
                    return webFilterChain.filter(serverWebExchange.mutate()
                            .request(new ServerHttpRequestDecorator(serverWebExchange.getRequest()) {
                                @Override
                                public Flux<DataBuffer> getBody() {
                                    // Wrap lazily so each subscriber gets its own buffer (own read position).
                                    return Flux.defer(() -> Flux.just(dataBuffer.factory().wrap(byteBody)));
                                }

                                @Override
                                public HttpHeaders getHeaders() {
                                    long contentLength = super.getHeaders().getContentLength();
                                    HttpHeaders httpHeaders = new HttpHeaders();
                                    httpHeaders.putAll(super.getHeaders());
                                    // The body changed, so CONTENT_LENGTH must be reset; it is the byte length,
                                    // not the string length.
                                    // NOTE(review): Content-Encoding is still forwarded although the body is
                                    // now uncompressed - confirm whether downstream handlers rely on it.
                                    httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
                                    if (contentLength > 0) {
                                        httpHeaders.setContentLength(byteBody.length);
                                    } else {
                                        // TODO: this causes a 'HTTP/1.1 411 Length Required' on httpbin.org
                                        httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                                    }
                                    return httpHeaders;
                                }
                            })
                            .build());
                } catch (IOException e) {
                    log.error(e.getMessage(), e);
                    return Mono.error(e);
                }
            });
}
 
源代码7 项目: open-cloud   文件: GatewayContextFilter.java
/**
 * Reads the request's form data into the gateway context and continues the chain with
 * a request whose body is rebuilt from that form data.
 * <p>
 * The rebuilt body URL-encodes both field names and values with the charset declared
 * in the request's Content-Type (UTF-8 when none is declared) and the
 * {@code Content-Length} header is reset to the rebuilt body's byte length. When there
 * is no form data the original exchange is forwarded unchanged.
 *
 * @param exchange the current exchange
 * @param chain the filter chain to continue
 * @param gatewayContext the context that receives the form data
 * @return Mono that completes when the rest of the chain completes
 */
private Mono<Void> readFormData(ServerWebExchange exchange,WebFilterChain chain,GatewayContext gatewayContext){
    HttpHeaders headers = exchange.getRequest().getHeaders();
    return exchange.getFormData()
            .doOnNext(multiValueMap -> {
                gatewayContext.setFormData(multiValueMap);
                gatewayContext.getAllRequestData().addAll(multiValueMap);
                log.debug("[GatewayContext]Read FormData Success");
            })
            .then(Mono.defer(() -> {
                MultiValueMap<String, String> formData = gatewayContext.getFormData();
                /*
                 * formData is empty just return
                 */
                if(null == formData || formData.isEmpty()){
                    return chain.filter(exchange);
                }
                /*
                 * fall back to UTF-8 when the request declares no Content-Type or no charset
                 */
                Charset declaredCharset = headers.getContentType() == null ? null : headers.getContentType().getCharset();
                Charset charset = declaredCharset == null ? StandardCharsets.UTF_8 : declaredCharset;
                /*
                 * repackage form data
                 */
                String formDataBodyString = encodeFormDataBody(formData, charset);
                /*
                 * get data bytes
                 */
                byte[] bodyBytes = formDataBodyString.getBytes(charset);
                int contentLength = bodyBytes.length;
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(exchange.getRequest().getHeaders());
                httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
                /*
                 * in case of content-length not matched
                 */
                httpHeaders.setContentLength(contentLength);
                /*
                 * use BodyInserter to InsertFormData Body
                 */
                BodyInserter<String, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromObject(formDataBodyString);
                CachedBodyOutputMessage cachedBodyOutputMessage = new CachedBodyOutputMessage(exchange, httpHeaders);
                log.debug("[GatewayContext]Rewrite Form Data :{}",formDataBodyString);
                return bodyInserter.insert(cachedBodyOutputMessage,  new BodyInserterContext())
                        .then(Mono.defer(() -> {
                            ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
                                    exchange.getRequest()) {
                                @Override
                                public HttpHeaders getHeaders() {
                                    return httpHeaders;
                                }
                                @Override
                                public Flux<DataBuffer> getBody() {
                                    return cachedBodyOutputMessage.getBody();
                                }
                            };
                            return chain.filter(exchange.mutate().request(decorator).build());
                        }));
            }));
}

/**
 * Serializes form fields into an {@code application/x-www-form-urlencoded} body:
 * {@code name=value} pairs joined by {@code &}, with names and values URL-encoded.
 * A {@code null} value is written as the bare name without {@code =}.
 */
private String encodeFormDataBody(Map<String, List<String>> formData, Charset charset) {
    String charsetName = charset.name();
    StringBuilder body = new StringBuilder();
    try {
        for (Map.Entry<String, List<String>> entry : formData.entrySet()) {
            String encodedName = URLEncoder.encode(entry.getKey(), charsetName);
            for (String value : entry.getValue()) {
                if (body.length() > 0) {
                    body.append('&');
                }
                body.append(encodedName);
                if (value != null) {
                    body.append('=').append(URLEncoder.encode(value, charsetName));
                }
            }
        }
    } catch (UnsupportedEncodingException e) {
        // The name comes from a Charset instance, so this is not expected; surface it instead of
        // silently forwarding a truncated body.
        throw new IllegalStateException("Unsupported form data charset: " + charsetName, e);
    }
    return body.toString();
}
 
 类所在包
 同包方法