下面列出了 org.springframework.http.server.reactive.ServerHttpResponse 类的 API 用法示例代码,您也可以点击链接到 GitHub 查看源代码。
/**
 * Copies the status code of the upstream client response onto the server response.
 *
 * <p>When the numeric code resolves to an {@link HttpStatus} constant it is set directly.
 * Otherwise any {@link ServerHttpResponseDecorator} layers are unwrapped and the raw
 * integer is set on the underlying {@link AbstractServerHttpResponse}.
 *
 * @param clientResponse the upstream response whose status code is read
 * @param response the server response to update
 * @throws IllegalStateException if the code is not a known {@link HttpStatus} and the
 *         unwrapped response is not an {@link AbstractServerHttpResponse}
 */
private void setResponseStatus(HttpClientResponse clientResponse,
        ServerHttpResponse response) {
    int rawCode = clientResponse.status().code();
    HttpStatus resolved = HttpStatus.resolve(rawCode);
    if (resolved != null) {
        response.setStatusCode(resolved);
        return;
    }

    // Unresolvable code: peel off decorators to reach the concrete response.
    ServerHttpResponse target = response;
    while (target instanceof ServerHttpResponseDecorator) {
        target = ((ServerHttpResponseDecorator) target).getDelegate();
    }

    if (!(target instanceof AbstractServerHttpResponse)) {
        // TODO: log warning here, not throw error?
        throw new IllegalStateException("Unable to set status code "
                + rawCode + " on response of type "
                + target.getClass().getName());
    }
    ((AbstractServerHttpResponse) target).setStatusCodeValue(rawCode);
}
@DataProvider(value = {
    "true",
    "false"
})
@Test
public void Builder_withTagAndNamingStrategy_works_as_expected(boolean valueIsNull) {
    // given: a fresh builder and, depending on the scenario, a null or non-null strategy
    WingtipsSpringWebfluxWebFilter.Builder builder = new WingtipsSpringWebfluxWebFilter.Builder();
    HttpTagAndSpanNamingStrategy<ServerWebExchange, ServerHttpResponse> strategy = null;
    if (!valueIsNull) {
        strategy = tagAndNamingStrategy;
    }

    // when
    WingtipsSpringWebfluxWebFilter.Builder returned = builder.withTagAndNamingStrategy(strategy);

    // then: the fluent setter hands back the same builder and stores the value it was given
    assertThat(returned).isSameAs(builder);
    assertThat(builder.tagAndNamingStrategy).isEqualTo(strategy);
}
/**
 * Extracts the response body with the given extractor.
 *
 * <p>The extractor receives a context that exposes the configured message readers,
 * no server response, and a hints map carrying the log prefix.
 *
 * @param extractor the extractor to apply to the wrapped response
 * @return whatever the extractor produces
 */
@Override
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
    BodyExtractor.Context extractionContext = new BodyExtractor.Context() {
        @Override
        public List<HttpMessageReader<?>> messageReaders() {
            return strategies.messageReaders();
        }

        @Override
        public Optional<ServerHttpResponse> serverResponse() {
            // No server response is made available through this context.
            return Optional.empty();
        }

        @Override
        public Map<String, Object> hints() {
            return Hints.from(Hints.LOG_PREFIX_HINT, logPrefix);
        }
    };
    return extractor.extract(this.response, extractionContext);
}
/**
 * Builds a {@link WebFilter} that adds CORS response headers to CORS requests.
 *
 * <p>For a CORS request the filter echoes the request's origin, requested headers and
 * (when present) requested method back in the corresponding {@code Access-Control-Allow-*}
 * headers, allows credentials, exposes {@code *} and sets the max age. An {@code OPTIONS}
 * request is answered with 200 without invoking the rest of the chain; everything else is
 * passed down the chain.
 *
 * @return the CORS filter
 */
@Bean
public WebFilter corsFilter() {
    return (exchange, filterChain) -> {
        ServerHttpRequest incoming = exchange.getRequest();
        if (!CorsUtils.isCorsRequest(incoming)) {
            return filterChain.filter(exchange);
        }

        HttpHeaders incomingHeaders = incoming.getHeaders();
        ServerHttpResponse outgoing = exchange.getResponse();
        HttpMethod requestedMethod = incomingHeaders.getAccessControlRequestMethod();
        HttpHeaders outgoingHeaders = outgoing.getHeaders();

        // NOTE(review): the origin is reflected verbatim while credentials are allowed - confirm this is intended.
        outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, incomingHeaders.getOrigin());
        outgoingHeaders.addAll(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS,
                incomingHeaders.getAccessControlRequestHeaders());
        if (requestedMethod != null) {
            outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, requestedMethod.name());
        }
        outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
        outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS, "*");
        outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_MAX_AGE, MAX_AGE);

        if (incoming.getMethod() != HttpMethod.OPTIONS) {
            return filterChain.filter(exchange);
        }
        // Preflight: answer immediately, the chain is not invoked.
        outgoing.setStatusCode(HttpStatus.OK);
        return Mono.empty();
    };
}
/**
 * Joins the cached body into a single buffer and, unless the target class is
 * {@code byte[]}, re-encodes it with the first registered encoder that matches one of
 * the response's {@code Content-Encoding} values.
 *
 * @param httpResponse the response supplying the encoding headers and buffer factory
 * @param message the cached output message whose body is joined
 * @param outClass the class the body was rewritten to
 * @return the joined body, encoded when a matching encoder exists
 */
private Mono<DataBuffer> writeBody(ServerHttpResponse httpResponse,
        CachedBodyOutputMessage message, Class<?> outClass) {
    Mono<DataBuffer> joined = DataBufferUtils.join(message.getBody());
    if (byte[].class.isAssignableFrom(outClass)) {
        return joined;
    }

    for (String encoding : httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING)) {
        MessageBodyEncoder encoder = messageBodyEncoders.get(encoding);
        if (encoder == null) {
            continue;
        }
        // Only the first encoding with a registered encoder is applied.
        DataBufferFactory factory = httpResponse.bufferFactory();
        return joined.publishOn(Schedulers.parallel())
                .map(encoder::encode)
                .map(factory::wrap);
    }
    return joined;
}
/**
 * Executes the GraphQL query carried in the POSTed JSON body and writes the result.
 *
 * <p>A missing {@code query} is treated as an empty string and missing {@code variables}
 * as an empty map. When the execution result's extensions contain
 * {@code GraphQL.DEFERRED_RESULTS}, the deferred handler is used; otherwise the normal one.
 *
 * @param body the request body, read for the keys {@code query} and {@code variables}
 * @param serverHttpResponse the response handed to the result handlers
 * @return the completion signal produced by the chosen handler
 * @throws IOException declared by the signature
 */
@RequestMapping(value = "/graphql", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
@CrossOrigin
public Mono<Void> graphql(@RequestBody Map<String, Object> body, ServerHttpResponse serverHttpResponse) throws IOException {
    Object rawQuery = body.get("query");
    String query = (rawQuery == null) ? "" : (String) rawQuery;

    Object rawVariables = body.get("variables");
    Map<String, Object> variables;
    if (rawVariables != null) {
        variables = (Map<String, Object>) rawVariables;
    } else {
        variables = new LinkedHashMap<>();
    }

    ExecutionInput input = ExecutionInput.newExecutionInput()
            .query(query)
            .variables(variables)
            .build();
    ExecutionResult result = graphql.execute(input);

    Map<Object, Object> extensions = result.getExtensions();
    boolean deferred = extensions != null && extensions.containsKey(GraphQL.DEFERRED_RESULTS);
    if (deferred) {
        return handleDeferResponse(serverHttpResponse, result, extensions);
    }
    return handleNormalResponse(serverHttpResponse, result);
}
/**
 * Adds a fixed set of CORS headers to every response and short-circuits
 * {@code OPTIONS} requests with a 200 status.
 *
 * @param serverWebExchange the current exchange
 * @param chain the remaining gateway filter chain
 * @return an empty completion for {@code OPTIONS}, otherwise the chain's result
 */
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, GatewayFilterChain chain) {
    ServerHttpRequest incoming = serverWebExchange.getRequest();
    ServerHttpResponse outgoing = serverWebExchange.getResponse();

    HttpHeaders corsHeaders = outgoing.getHeaders();
    // NOTE(review): a wildcard origin is combined with allow-credentials "true" - confirm browsers accept this as intended.
    corsHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
    corsHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, "POST, GET, PUT, OPTIONS, DELETE, PATCH");
    corsHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
    corsHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, "*");
    corsHeaders.add(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS, ALL);
    corsHeaders.add(HttpHeaders.ACCESS_CONTROL_MAX_AGE, MAX_AGE);

    boolean preflight = incoming.getMethod() == HttpMethod.OPTIONS;
    if (!preflight) {
        return chain.filter(serverWebExchange);
    }
    // Preflight requests are answered here; the chain is not invoked.
    outgoing.setStatusCode(HttpStatus.OK);
    return Mono.empty();
}
/**
 * Renders this view as plain text: a {@code name=<view name>} line followed by one
 * {@code key=value} line per model entry, joined by {@code '\n'} without a trailing newline.
 *
 * @param model the model attributes to render; may be {@code null}, in which case only
 *        the name line is written
 * @param contentType the requested content type (not consulted; the response is always
 *        written as {@code text/plain})
 * @param exchange the current exchange whose response receives the rendered bytes
 * @return the completion signal of the response write
 */
@Override
public Mono<Void> render(@Nullable Map<String, ?> model, @Nullable MediaType contentType,
        ServerWebExchange exchange) {
    StringBuilder builder = new StringBuilder();
    builder.append("name=").append(this.name);
    // The model is declared @Nullable, so guard against null instead of failing with an NPE.
    if (model != null) {
        for (Map.Entry<String, ?> entry : model.entrySet()) {
            // Prefix each entry with the separator so no trailing newline has to be trimmed.
            builder.append('\n').append(entry.getKey()).append('=').append(entry.getValue());
        }
    }
    byte[] bytes = builder.toString().getBytes(StandardCharsets.UTF_8);
    ServerHttpResponse response = exchange.getResponse();
    DataBuffer buffer = response.bufferFactory().wrap(bytes);
    response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
    return response.writeWith(Mono.just(buffer));
}
/**
 * Adds wildcard CORS headers to CORS requests and answers {@code OPTIONS} CORS
 * requests with 200 without invoking the chain.
 *
 * @param exchange the current exchange
 * @param chain the remaining filter chain
 * @return an empty completion for a CORS {@code OPTIONS} request, otherwise the chain's result
 */
@Override
@SuppressWarnings("all")
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    ServerHttpRequest incoming = exchange.getRequest();
    if (!CorsUtils.isCorsRequest(incoming)) {
        return chain.filter(exchange);
    }

    ServerHttpResponse outgoing = exchange.getResponse();
    HttpHeaders corsHeaders = outgoing.getHeaders();
    corsHeaders.add("Access-Control-Allow-Origin", "*");
    corsHeaders.add("Access-Control-Allow-Methods", "*");
    corsHeaders.add("Access-Control-Max-Age", "3600");
    corsHeaders.add("Access-Control-Allow-Headers", "*");

    if (incoming.getMethod() != HttpMethod.OPTIONS) {
        return chain.filter(exchange);
    }
    // Preflight: respond directly.
    outgoing.setStatusCode(HttpStatus.OK);
    return Mono.empty();
}
@UseDataProvider("extractTagAndNamingAdapterScenarioDataProvider")
@Test
public void extractTagAndNamingAdapter_works_as_expected(
    ExtractTagAndNamingAdapterScenario scenario
) {
    // given: properties that carry only the scenario's adapter name
    WingtipsSpringBoot2WebfluxProperties properties = generateProps(
        false, null, null, null, scenario.adapterName, false
    );
    WingtipsSpringBoot2WebfluxConfiguration configuration =
        new WingtipsSpringBoot2WebfluxConfiguration(properties);

    // when
    HttpTagAndSpanNamingAdapter<ServerWebExchange, ServerHttpResponse> adapter =
        configuration.extractTagAndNamingAdapter(properties);

    // then: either no adapter at all, or one of exactly the expected class
    if (scenario.expectedResultClass != null) {
        assertThat(adapter).isNotNull();
        assertThat(adapter.getClass()).isEqualTo(scenario.expectedResultClass);
    } else {
        assertThat(adapter).isNull();
    }
}
/**
 * Builds a {@link WebFilter} that stamps the configured CORS headers onto CORS requests.
 *
 * <p>{@code OPTIONS} CORS requests are answered with 200 without invoking the chain;
 * all other requests continue down the chain.
 *
 * @return the CORS filter
 */
@Bean
public WebFilter corsFilter() {
    return (exchange, filterChain) -> {
        ServerHttpRequest incoming = exchange.getRequest();
        if (!CorsUtils.isCorsRequest(incoming)) {
            return filterChain.filter(exchange);
        }

        ServerHttpResponse outgoing = exchange.getResponse();
        HttpHeaders corsHeaders = outgoing.getHeaders();
        corsHeaders.add("Access-Control-Allow-Origin", ALLOWED_ORIGIN);
        corsHeaders.add("Access-Control-Allow-Methods", ALLOWED_METHODS);
        corsHeaders.add("Access-Control-Allow-Headers", ALLOWED_HEADERS);
        corsHeaders.add("Access-Control-Expose-Headers", ALLOWED_EXPOSE);
        corsHeaders.add("Access-Control-Allow-Credentials", "true");

        if (incoming.getMethod() != HttpMethod.OPTIONS) {
            return filterChain.filter(exchange);
        }
        // Preflight: respond directly.
        outgoing.setStatusCode(HttpStatus.OK);
        return Mono.empty();
    };
}
/**
 * Forwards the request through a {@code WebClient} when a request mapping is configured
 * for it, and maps the client response onto the server response. Requests without a
 * mapping continue down the filter chain untouched.
 *
 * @param exchange the current exchange
 * @param chain the remaining filter chain
 * @return the chain's result when no mapping resolves, otherwise the mapped upstream response
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    ServerHttpRequest incoming = exchange.getRequest();
    ServerHttpResponse outgoing = exchange.getResponse();

    RequestMappingConfiguration mapping = requestMappingResolver.resolveRequestMapping(incoming);
    if (mapping == null) {
        return chain.filter(exchange);
    }

    return webClientProvider.getWebClient(mapping)
            .method(httpRequestMapper.extractMethod(incoming))
            .uri(httpRequestMapper.extractUri(incoming))
            .headers(forwarded -> forwarded.putAll(httpRequestMapper.extractHeaders(incoming)))
            .body(httpRequestMapper.extractBody(incoming))
            .exchange()
            .flatMap(upstream -> httpResponseMapper.map(upstream, outgoing));
}
@Test
public void actualRequestExposedHeaders() throws Exception {
    // Configure two exposed headers and one allowed origin, then process an actual request.
    ServerWebExchange exchange = actualRequest();
    this.conf.addExposedHeader("header1");
    this.conf.addExposedHeader("header2");
    this.conf.addAllowedOrigin("http://domain2.com");
    this.processor.process(this.conf, exchange);

    ServerHttpResponse response = exchange.getResponse();
    HttpHeaders responseHeaders = response.getHeaders();

    // Allowed origin is reported back.
    assertTrue(responseHeaders.containsKey(ACCESS_CONTROL_ALLOW_ORIGIN));
    assertEquals("http://domain2.com", responseHeaders.getFirst(ACCESS_CONTROL_ALLOW_ORIGIN));

    // Both configured headers show up in the expose-headers value.
    assertTrue(responseHeaders.containsKey(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS));
    String exposed = responseHeaders.getFirst(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS);
    assertTrue(exposed.contains("header1"));
    assertTrue(exposed.contains("header2"));

    assertThat(responseHeaders.get(VARY), contains(ORIGIN,
            ACCESS_CONTROL_REQUEST_METHOD, ACCESS_CONTROL_REQUEST_HEADERS));
    assertNull(response.getStatusCode());
}
/**
 * Streams the queried device instances to the response as a spreadsheet download.
 *
 * <p>The {@code Content-Disposition} header is set to an attachment whose file name is
 * the URL-encoded device-instance label followed by the requested format extension.
 *
 * @param response the response that receives the header and the generated file
 * @param parameter the query parameters selecting the rows to export
 * @param format the file format taken from the request path
 * @return the completion signal of the response write
 * @throws IOException if the encoding name is rejected by {@link URLEncoder}
 */
@GetMapping("/export.{format}")
@QueryAction
public Mono<Void> export(ServerHttpResponse response,
                         QueryParamEntity parameter,
                         @PathVariable String format) throws IOException {
    // URLEncoder expects the canonical charset name; displayName() may be localized by a Charset implementation.
    String encodedFileName = URLEncoder.encode("设备实例." + format, StandardCharsets.UTF_8.name());
    response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION,
        "attachment; filename=".concat(encodedFileName));
    return ReactorExcel.<DeviceExcelInfo>writer(format)
        .headers(DeviceExcelInfo.getExportHeaderMapping(Collections.emptyList()))
        .converter(DeviceExcelInfo::toMap)
        .writeBuffer(
            service.query(parameter)
                .map(entity -> FastBeanCopier.copy(entity, new DeviceExcelInfo()))
            , 512 * 1024) // 512 KiB buffer
        .doOnError(err -> log.error(err.getMessage(), err))
        .map(bufferFactory::wrap)
        .as(response::writeWith);
}
/**
 * Wraps the response in a decorator that copies each written buffer into a fresh one
 * (releasing the original), runs the rest of the chain against the decorated exchange,
 * and sends an access log entry once the chain completes.
 *
 * @param exchange the current exchange
 * @param chain the remaining filter chain
 * @return completion of the chain followed by the access-log call
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    ServerHttpResponse original = exchange.getResponse();
    DataBufferFactory factory = original.bufferFactory();

    ServerHttpResponseDecorator copying = new ServerHttpResponseDecorator(original) {
        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            if (!(body instanceof Flux)) {
                // if body is not a flux. never got there.
                return super.writeWith(body);
            }
            Flux<? extends DataBuffer> chunks = (Flux<? extends DataBuffer>) body;
            return super.writeWith(chunks.map(chunk -> {
                // probably should reuse buffers
                byte[] copy = new byte[chunk.readableByteCount()];
                chunk.read(copy);
                // Release the source buffer's memory.
                DataBufferUtils.release(chunk);
                return factory.wrap(copy);
            }));
        }
    };

    ServerWebExchange decorated = exchange.mutate().response(copying).build();
    return chain.filter(decorated)
            .then(Mono.fromRunnable(() -> accessLogService.sendLog(exchange, null)));
}
/**
 * Writes the "rate limit exceeded" JSON payload, together with CORS and no-cache
 * headers, to the response of the given exchange.
 *
 * @param exchange the exchange whose response is written
 * @return the completion signal of the response write
 */
private Mono<Void> setRateCheckResponse(ServerWebExchange exchange) {
    // The rate limit has been exceeded.
    ServerHttpResponse response = exchange.getResponse();
    // Set the response headers.
    HttpHeaders httpHeaders = response.getHeaders();
    httpHeaders.add("Content-Type", "application/json; charset=UTF-8");
    httpHeaders.add("Access-Control-Allow-Origin", CrossDomainConfiguration.ALLOWED_ORIGIN);
    httpHeaders.add("Access-Control-Allow-Methods", CrossDomainConfiguration.ALLOWED_METHODS);
    httpHeaders.add("Access-Control-Max-Age", CrossDomainConfiguration.MAX_AGE);
    httpHeaders.add("Access-Control-Allow-Headers", CrossDomainConfiguration.ALLOWED_HEADERS);
    httpHeaders.add("Access-Control-Expose-Headers", CrossDomainConfiguration.ALLOWED_Expose);
    httpHeaders.add("Access-Control-Allow-Credentials", "true");
    httpHeaders.add("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0");
    // Build the response body.
    // NOTE(review): the 429 code is carried only in the JSON body; the HTTP status is not set here - confirm this is intended.
    JSONResult jsonResult = JSONResult.build(HttpStatus.TOO_MANY_REQUESTS.value(), "当前访问人数过多,请稍后重试", "当前访问人数过多,请稍后重试");
    // Encode explicitly as UTF-8 so the bytes match the charset announced in Content-Type
    // regardless of the platform default charset.
    byte[] payload = JSONObject.toJSONString(jsonResult)
            .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    DataBuffer bodyDataBuffer = response.bufferFactory().wrap(payload);
    log.error("限流了================");
    return response.writeWith(Mono.just(bodyDataBuffer));
}
@Test
public void preflightRequestCredentials() throws Exception {
    // Preflight asking for GET with Header1.
    ServerWebExchange exchange = MockServerWebExchange.from(preFlightRequest()
            .header(ACCESS_CONTROL_REQUEST_METHOD, "GET")
            .header(ACCESS_CONTROL_REQUEST_HEADERS, "Header1"));

    // Three allowed origins, one allowed header, credentials enabled.
    this.conf.addAllowedOrigin("http://domain1.com");
    this.conf.addAllowedOrigin("http://domain2.com");
    this.conf.addAllowedOrigin("http://domain3.com");
    this.conf.addAllowedHeader("Header1");
    this.conf.setAllowCredentials(true);
    this.processor.process(this.conf, exchange);

    ServerHttpResponse response = exchange.getResponse();
    HttpHeaders responseHeaders = response.getHeaders();

    assertTrue(responseHeaders.containsKey(ACCESS_CONTROL_ALLOW_ORIGIN));
    assertEquals("http://domain2.com", responseHeaders.getFirst(ACCESS_CONTROL_ALLOW_ORIGIN));
    assertTrue(responseHeaders.containsKey(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS));
    assertEquals("true", responseHeaders.getFirst(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS));
    assertThat(responseHeaders.get(VARY), contains(ORIGIN,
            ACCESS_CONTROL_REQUEST_METHOD, ACCESS_CONTROL_REQUEST_HEADERS));
    assertNull(response.getStatusCode());
}
@Test
public void actualRequestCredentials() throws Exception {
    ServerWebExchange exchange = actualRequest();

    // Three allowed origins with credentials enabled.
    this.conf.addAllowedOrigin("https://domain1.com");
    this.conf.addAllowedOrigin("https://domain2.com");
    this.conf.addAllowedOrigin("http://domain3.com");
    this.conf.setAllowCredentials(true);
    this.processor.process(this.conf, exchange);

    ServerHttpResponse response = exchange.getResponse();
    HttpHeaders responseHeaders = response.getHeaders();

    assertTrue(responseHeaders.containsKey(ACCESS_CONTROL_ALLOW_ORIGIN));
    assertEquals("https://domain2.com", responseHeaders.getFirst(ACCESS_CONTROL_ALLOW_ORIGIN));
    assertTrue(responseHeaders.containsKey(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS));
    assertEquals("true", responseHeaders.getFirst(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS));
    assertThat(responseHeaders.get(VARY), contains(ORIGIN,
            ACCESS_CONTROL_REQUEST_METHOD, ACCESS_CONTROL_REQUEST_HEADERS));
    assertNull(response.getStatusCode());
}
@Test
public void actualRequestCredentialsWithOriginWildcard() throws Exception {
    ServerWebExchange exchange = actualRequest();

    // Wildcard origin combined with credentials: the concrete origin is expected back, not "*".
    this.conf.addAllowedOrigin("*");
    this.conf.setAllowCredentials(true);
    this.processor.process(this.conf, exchange);

    ServerHttpResponse response = exchange.getResponse();
    HttpHeaders responseHeaders = response.getHeaders();

    assertTrue(responseHeaders.containsKey(ACCESS_CONTROL_ALLOW_ORIGIN));
    assertEquals("https://domain2.com", responseHeaders.getFirst(ACCESS_CONTROL_ALLOW_ORIGIN));
    assertTrue(responseHeaders.containsKey(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS));
    assertEquals("true", responseHeaders.getFirst(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS));
    assertThat(responseHeaders.get(VARY), contains(ORIGIN,
            ACCESS_CONTROL_REQUEST_METHOD, ACCESS_CONTROL_REQUEST_HEADERS));
    assertNull(response.getStatusCode());
}
/**
 * Serializes the result to JSON bytes and wraps them in a buffer created by the
 * response's buffer factory.
 *
 * @param response the response supplying the buffer factory
 * @param result the payload to serialize
 * @return the wrapped JSON bytes, or {@code null} if serialization fails with an
 *         {@link IOException} (the stack trace is printed in that case)
 */
private DataBuffer getBodyBuffer(ServerHttpResponse response, JsonData result) {
    DataBuffer wrapped = null;
    try {
        wrapped = response.bufferFactory().wrap(JsonUtil.toJSONBytes(result));
    } catch (IOException e) {
        e.printStackTrace();
    }
    return wrapped;
}
/**
 * Answers a failed authentication with a 401 status and a Basic
 * {@code WWW-Authenticate} challenge.
 *
 * @param exchange the current exchange
 * @param e the authentication failure (not inspected)
 * @return an empty {@link Mono}
 */
@Override
public <T> Mono<T> commence(ServerWebExchange exchange, AuthenticationException e) {
    ServerHttpResponse challenged = exchange.getResponse();
    challenged.setStatusCode(HttpStatus.UNAUTHORIZED);
    challenged.getHeaders().set("WWW-Authenticate", "Basic realm=\"Reactive\"");
    return Mono.empty();
}
/**
 * Returns the error tag value for a server-side response. Only status codes of 500 or
 * above count as errors here: from the server's point of view a 4xx is the correct answer
 * to a bad request, so it is not tagged as an error (the client may see it differently).
 *
 * @param response The response object.
 * @return The string form of {@link #getResponseHttpStatus(ServerHttpResponse)} when that
 * value is 500 or greater, otherwise null.
 */
@Override
public @Nullable String getErrorResponseTagValue(@Nullable ServerHttpResponse response) {
    Integer status = getResponseHttpStatus(response);
    boolean serverError = status != null && status >= 500;
    // Anything that is not a 5xx (including an unknown status) is not an error.
    return serverError ? status.toString() : null;
}
/**
 * Enforces the configured size limit on multipart form-data requests.
 *
 * <p>Non-multipart requests go straight down the chain. For multipart requests the body
 * is read into a buffer; if its capacity exceeds {@code BYTES_PER_MB * maxSize} a 400
 * response carrying the payload-too-large error is written. Otherwise the body is cached
 * and the chain continues with a request decorated to replay the cached body.
 *
 * @param exchange the current exchange
 * @param chain the remaining filter chain
 * @return the error response write, or the chain's result
 */
@Override
@SuppressWarnings("unchecked")
public Mono<Void> filter(@NonNull final ServerWebExchange exchange, @NonNull final WebFilterChain chain) {
    MediaType requestType = exchange.getRequest().getHeaders().getContentType();
    if (!MediaType.MULTIPART_FORM_DATA.isCompatibleWith(requestType)) {
        return chain.filter(exchange);
    }

    ServerRequest serverRequest = ServerRequest.create(exchange,
            messageReaders);
    return serverRequest.bodyToMono(DataBuffer.class)
            .flatMap(payload -> {
                if (payload.capacity() > BYTES_PER_MB * maxSize) {
                    // Too large: reject with 400 and the payload-too-large error body.
                    ServerHttpResponse rejected = exchange.getResponse();
                    rejected.setStatusCode(HttpStatus.BAD_REQUEST);
                    Object error = SoulResultWarp.error(SoulResultEnum.PAYLOAD_TOO_LARGE.getCode(), SoulResultEnum.PAYLOAD_TOO_LARGE.getMsg(), null);
                    return WebFluxResultUtils.result(exchange, error);
                }
                // Within the limit: cache the already-consumed body so downstream can read it again.
                BodyInserter bodyInserter = BodyInserters.fromPublisher(Mono.just(payload), DataBuffer.class);
                HttpHeaders replayHeaders = new HttpHeaders();
                replayHeaders.putAll(exchange.getRequest().getHeaders());
                replayHeaders.remove(HttpHeaders.CONTENT_LENGTH);
                CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(
                        exchange, replayHeaders);
                return bodyInserter.insert(outputMessage, new BodyInserterContext())
                        .then(Mono.defer(() -> {
                            ServerHttpRequest decorator = decorate(exchange, outputMessage);
                            return chain.filter(exchange.mutate().request(decorator).build());
                        }));
            });
}
/**
 * Process the web request and validate the API version header for requests matching the
 * V2 API path pattern. A missing header yields HTTP 400 and a mismatching one HTTP 412;
 * in both cases a JSON error message is written to the response instead of the chain's result.
 *
 * @param exchange {@inheritDoc}
 * @param chain {@inheritDoc}
 * @return {@inheritDoc}
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    PathPattern v2Pattern = new PathPatternParser().parse(V2_API_PATH_PATTERN);
    Mono<Void> continuation = chain.filter(exchange);

    boolean mustValidate = v2Pattern.matches(exchange.getRequest().getPath())
            && version != null && !anyVersionAllowed();
    if (!mustValidate) {
        return continuation;
    }

    String requestedVersion = exchange.getRequest().getHeaders().getFirst(version.getBrokerApiVersionHeader());
    ServerHttpResponse response = exchange.getResponse();
    String message = null;
    if (requestedVersion == null) {
        // No version header at all.
        response.setStatusCode(HttpStatus.BAD_REQUEST);
        message = ServiceBrokerApiVersionErrorMessage.from(version.getApiVersion(), "null").toString();
    }
    else if (!version.getApiVersion().equals(requestedVersion)) {
        // Header present but different from the supported version.
        response.setStatusCode(HttpStatus.PRECONDITION_FAILED);
        message = ServiceBrokerApiVersionErrorMessage.from(version.getApiVersion(), requestedVersion)
                .toString();
    }
    if (message == null) {
        return continuation;
    }

    String json;
    try {
        json = new ObjectMapper().writeValueAsString(new ErrorMessage(message));
    }
    catch (JsonProcessingException e) {
        // Fall back to an empty JSON object when the error message cannot be serialized.
        json = "{}";
    }
    Flux<DataBuffer> errorBody =
            Flux.just(json)
                    .map(text -> toDataBuffer(text, response.bufferFactory()));
    return response.writeWith(errorBody);
}
/**
 * Creates a {@link WebFilter} that answers CORS requests by mirroring what the caller asked for.
 *
 * <p>The request's origin, requested headers and (if present) requested method are copied
 * into the matching {@code Access-Control-Allow-*} response headers; credentials are allowed,
 * {@code *} is exposed and the max age is set. {@code OPTIONS} requests get a 200 without
 * the chain being invoked; all other requests are delegated to the chain.
 *
 * @return the CORS filter
 */
@Bean
public WebFilter corsFilter() {
    return (exchange, filterChain) -> {
        ServerHttpRequest incoming = exchange.getRequest();
        if (!CorsUtils.isCorsRequest(incoming)) {
            return filterChain.filter(exchange);
        }

        HttpHeaders incomingHeaders = incoming.getHeaders();
        ServerHttpResponse outgoing = exchange.getResponse();
        HttpMethod requestedMethod = incomingHeaders.getAccessControlRequestMethod();
        HttpHeaders outgoingHeaders = outgoing.getHeaders();

        // NOTE(review): the origin is reflected verbatim while credentials are allowed - confirm this is intended.
        outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, incomingHeaders.getOrigin());
        outgoingHeaders.addAll(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS,
                incomingHeaders.getAccessControlRequestHeaders());
        if (requestedMethod != null) {
            outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, requestedMethod.name());
        }
        outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
        outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS, "*");
        outgoingHeaders.add(HttpHeaders.ACCESS_CONTROL_MAX_AGE, MAX_AGE);

        if (incoming.getMethod() != HttpMethod.OPTIONS) {
            return filterChain.filter(exchange);
        }
        // Preflight: answer immediately, the chain is not invoked.
        outgoing.setStatusCode(HttpStatus.OK);
        return Mono.empty();
    };
}
/**
 * Returns the URI path template for the given exchange, delegating to
 * {@code WingtipsSpringWebfluxWebFilter.determineUriPathTemplate}.
 *
 * @param exchange the exchange to inspect; may be null
 * @param response the response (not consulted)
 * @return the path template, or null when the exchange is null
 */
@Override
public @Nullable String getRequestUriPathTemplate(
    @Nullable ServerWebExchange exchange,
    @Nullable ServerHttpResponse response
) {
    return (exchange == null)
        ? null
        : WingtipsSpringWebfluxWebFilter.determineUriPathTemplate(exchange);
}
/**
 * Writes the input with the given writer, choosing the server-side overload when the
 * context carries a server request and the plain overload otherwise.
 *
 * @param input the publisher of objects to write
 * @param type the element type (used as both actual and declared type on the server path)
 * @param mediaType the media type to write as, if any
 * @param message the output message; cast to {@link ServerHttpResponse} on the server path
 * @param context supplies the optional server request and the hints
 * @param writer the writer performing the serialization
 * @return the completion signal of the write
 */
private static <T> Mono<Void> write(Publisher<? extends T> input, ResolvableType type,
        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
        BodyInserter.Context context, HttpMessageWriter<T> writer) {

    return context.serverRequest()
            .map(serverRequest -> writer.write(
                    input, type, type, mediaType, serverRequest,
                    (ServerHttpResponse) message, context.hints()))
            .orElseGet(() -> writer.write(input, type, mediaType, message, context.hints()));
}
/**
 * Completes the response with a 401 status and an authentication header built from
 * the given error text.
 *
 * @param exchange the exchange whose response is completed
 * @param err the error text passed to {@code formatErrorMsg}
 * @return the completion signal of the response
 */
private Mono<Void> onError(ServerWebExchange exchange, String err) {
    ServerHttpResponse rejected = exchange.getResponse();
    rejected.setStatusCode(HttpStatus.UNAUTHORIZED);
    String challenge = this.formatErrorMsg(err);
    rejected.getHeaders().add(WWW_AUTH_HEADER, challenge);
    return rejected.setComplete();
}
/**
 * Checks the request against the blacklist entries for its IP plus the global entries.
 *
 * <p>When the check forbids the request, a 406 JSON response is produced. In every other
 * case - no original gateway URL, request not forbidden, or any exception during the
 * check - {@code null} is returned.
 *
 * @param exchange the current exchange
 * @return the rejection response when the request is blacklisted, otherwise {@code null}
 *         (presumably callers treat {@code null} as "continue" - verify against the caller)
 */
@Override
public Mono<Void> filterBlackList(ServerWebExchange exchange) {
    Stopwatch stopwatch = Stopwatch.createStarted();
    ServerHttpRequest request = exchange.getRequest();
    ServerHttpResponse response = exchange.getResponse();
    try {
        URI originUri = getGatewayOriginalRequestUrl(exchange);
        if (originUri != null) {
            String requestIp = FebsUtil.getServerHttpRequestIpAddress(request);
            String requestMethod = request.getMethodValue();
            AtomicBoolean forbid = new AtomicBoolean(false);
            // IP-specific entries first, then the global entries are merged in.
            Set<Object> blackList = routeEnhanceCacheService.getBlackList(requestIp);
            blackList.addAll(routeEnhanceCacheService.getBlackList());
            doBlackListCheck(forbid, blackList, originUri, requestMethod);
            log.info("Blacklist verification completed - {}", stopwatch.stop());
            if (forbid.get()) {
                return FebsUtil.makeWebFluxResponse(response, MediaType.APPLICATION_JSON_VALUE,
                        HttpStatus.NOT_ACCEPTABLE, new FebsResponse().message("黑名单限制,禁止访问"));
            }
        } else {
            // NOTE(review): this branch is taken when the original URI is null, although the message mentions the IP.
            log.info("Request IP not obtained, no blacklist check - {}", stopwatch.stop());
        }
    } catch (Exception e) {
        // The stopwatch may already have been stopped by a log statement above (e.g. when
        // building the rejection response fails). Stopping it a second time would raise an
        // IllegalStateException out of this handler, so only stop it while it is running.
        if (stopwatch.isRunning()) {
            stopwatch.stop();
        }
        log.warn("Blacklist verification failed : {} - {}", e.getMessage(), stopwatch);
    }
    return null;
}
/**
 * Tags the span with the response's {@code Retry-After} header value, if the header is present.
 *
 * @param exchange the exchange whose response headers are read
 * @param span the span to tag
 */
@Override
public void onResponse(final ServerWebExchange exchange, final Span span) {
    final String retryAfter = exchange.getResponse().getHeaders().getFirst("Retry-After");
    if (retryAfter != null) {
        span.setTag(RETRY_AFTER, retryAfter);
    }
}