下面列出了org.springframework.http.codec.multipart.FormFieldPart#org.springframework.web.reactive.function.BodyExtractors 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Checks that {@code ExchangeFilterFunctions.limitResponseSize(5)} cuts a body made of the
 * buffers "foo", "bar" and "baz" down to "foo" followed by "ba", and then completes.
 */
@Test
public void limitResponseSize() {
    DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
    Flux<DataBuffer> fullBody = Flux.just(
            dataBuffer("foo", factory),
            dataBuffer("bar", factory),
            dataBuffer("baz", factory));

    ClientRequest request = ClientRequest.create(HttpMethod.GET, DEFAULT_URL).build();
    ClientResponse response = ClientResponse.create(HttpStatus.OK).body(fullBody).build();

    // Run the filter against an exchange function that always answers with the stub response.
    Mono<ClientResponse> result = ExchangeFilterFunctions.limitResponseSize(5)
            .filter(request, ignored -> Mono.just(response));
    Flux<DataBuffer> limitedBody = result.flatMapMany(res -> res.body(BodyExtractors.toDataBuffers()));

    StepVerifier.create(limitedBody)
            .consumeNextWith(first -> assertEquals("foo", string(first)))
            .consumeNextWith(second -> assertEquals("ba", string(second)))
            .expectComplete()
            .verify();
}
/**
 * Forwards the incoming request to the instance identified by {@code instanceId} and copies
 * the proxied status, (filtered) headers and body onto the server response.
 *
 * @param instanceId id of the target instance, taken from the request path
 * @param request the incoming request whose path, query, method, headers and body are forwarded
 * @param response the server response that receives the proxied result
 * @return a {@code Mono} that completes once the proxied body has been written
 */
@RequestMapping(path = REQUEST_MAPPING_PATH, method = {RequestMethod.GET, RequestMethod.HEAD, RequestMethod.POST, RequestMethod.PUT, RequestMethod.PATCH, RequestMethod.DELETE, RequestMethod.OPTIONS})
public Mono<Void> endpointProxy(@PathVariable("instanceId") String instanceId,
                                ServerHttpRequest request,
                                ServerHttpResponse response) {
    String pathWithinApplication = request.getPath().pathWithinApplication().value();
    String endpointLocalPath = getEndpointLocalPath(pathWithinApplication);
    String rawQuery = request.getURI().getRawQuery();

    // build(true): the path and query are treated as already encoded.
    URI uri = UriComponentsBuilder.fromPath(endpointLocalPath)
            .query(rawQuery)
            .build(true)
            .toUri();

    return super.forward(instanceId, uri, request.getMethod(), request.getHeaders(),
                    () -> BodyInserters.fromDataBuffers(request.getBody()))
            .flatMap(proxied -> {
                response.setStatusCode(proxied.statusCode());
                response.getHeaders().addAll(filterHeaders(proxied.headers().asHttpHeaders()));
                // window(1) hands every data buffer to writeAndFlushWith as its own publisher.
                return response.writeAndFlushWith(
                        proxied.body(BodyExtractors.toDataBuffers()).window(1));
            });
}
/**
 * Checks that a response carrying the actuator V1 JSON content type is passed through the
 * legacy endpoint converter, so the body seen downstream is {@code CONVERTED}.
 */
@Test
public void should_convert_v1_actuator() {
    ExchangeFilterFunction filter =
            InstanceExchangeFilterFunctions.convertLegacyEndpoint(new TestLegacyEndpointConverter());

    ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
            .attribute(ATTRIBUTE_ENDPOINT, "test")
            .build();
    ClientResponse legacyResponse = ClientResponse.create(HttpStatus.OK)
            .header(CONTENT_TYPE, ActuatorMediaType.V1_JSON)
            .body(Flux.just(ORIGINAL))
            .build();

    Mono<ClientResponse> convertedResponse = filter.filter(request, ignored -> Mono.just(legacyResponse));

    StepVerifier.create(convertedResponse)
            .assertNext(actual -> {
                // The nested verifier drains the body of the filtered response.
                StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                        .expectNext(CONVERTED)
                        .verifyComplete();
            })
            .verifyComplete();
}
/**
 * Checks that a plain {@code application/json} response for the "test" endpoint is passed
 * through the legacy endpoint converter, so the body seen downstream is {@code CONVERTED}.
 */
@Test
public void should_convert_json() {
    ExchangeFilterFunction filter =
            InstanceExchangeFilterFunctions.convertLegacyEndpoint(new TestLegacyEndpointConverter());

    ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
            .attribute(ATTRIBUTE_ENDPOINT, "test")
            .build();
    ClientResponse jsonResponse = ClientResponse.create(HttpStatus.OK)
            .header(CONTENT_TYPE, APPLICATION_JSON_VALUE)
            .body(Flux.just(ORIGINAL))
            .build();

    Mono<ClientResponse> convertedResponse = filter.filter(request, ignored -> Mono.just(jsonResponse));

    StepVerifier.create(convertedResponse)
            .assertNext(actual -> {
                // The nested verifier drains the body of the filtered response.
                StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                        .expectNext(CONVERTED)
                        .verifyComplete();
            })
            .verifyComplete();
}
/**
 * Checks that a response already carrying the actuator V2 JSON content type is left alone:
 * the body seen downstream is still {@code ORIGINAL}.
 */
@Test
public void should_not_convert_v2_actuator() {
    ExchangeFilterFunction filter =
            InstanceExchangeFilterFunctions.convertLegacyEndpoint(new TestLegacyEndpointConverter());

    ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
            .attribute(ATTRIBUTE_ENDPOINT, "test")
            .build();
    ClientResponse v2Response = ClientResponse.create(HttpStatus.OK)
            .header(CONTENT_TYPE, ActuatorMediaType.V2_JSON)
            .body(Flux.just(ORIGINAL))
            .build();

    Mono<ClientResponse> filteredResponse = filter.filter(request, ignored -> Mono.just(v2Response));

    StepVerifier.create(filteredResponse)
            .assertNext(actual -> {
                // The nested verifier drains the body of the filtered response.
                StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                        .expectNext(ORIGINAL)
                        .verifyComplete();
            })
            .verifyComplete();
}
/**
 * Declares the three "/employees-composed" routes (list all, find by id, update) as a single
 * composed router function.
 *
 * @return the router function combining the GET list, GET by id and POST update routes
 */
@Bean
@RouterOperations({ @RouterOperation(path = "/employees-composed/update", beanClass = EmployeeRepository.class, beanMethod = "updateEmployee"),
        @RouterOperation(path = "/employees-composed/{id}", beanClass = EmployeeRepository.class, beanMethod = "findEmployeeById"),
        @RouterOperation(path = "/employees-composed", beanClass = EmployeeRepository.class, beanMethod = "findAllEmployees") })
RouterFunction<ServerResponse> composedRoutes() {
    // GET /employees-composed -> every employee
    RouterFunction<ServerResponse> listAll = route(GET("/employees-composed"),
            request -> ok().body(employeeRepository().findAllEmployees(), Employee.class));

    // GET /employees-composed/{id} -> the employee with that id
    RouterFunction<ServerResponse> findOne = route(GET("/employees-composed/{id}"),
            request -> ok().body(
                    employeeRepository().findEmployeeById(request.pathVariable("id")), Employee.class));

    // POST /employees-composed/update -> update from the request body, then answer 200 with no body
    RouterFunction<ServerResponse> update = route(POST("/employees-composed/update"),
            request -> request.body(BodyExtractors.toMono(Employee.class))
                    .doOnNext(employeeRepository()::updateEmployee)
                    .then(ok().build()));

    return listAll.and(findOne).and(update);
}
/**
 * Verifies the 5-byte response limit: of the three 3-byte buffers "foo", "bar" and "baz"
 * only "foo" and the truncated "ba" are emitted before completion.
 */
@Test
public void limitResponseSize() {
    DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
    DataBuffer foo = dataBuffer("foo", factory);
    DataBuffer bar = dataBuffer("bar", factory);
    DataBuffer baz = dataBuffer("baz", factory);

    ClientRequest request = ClientRequest.create(HttpMethod.GET, DEFAULT_URL).build();
    ClientResponse response = ClientResponse.create(HttpStatus.OK)
            .body(Flux.just(foo, bar, baz))
            .build();

    // The exchange function always answers with the stub response.
    Mono<ClientResponse> limited = ExchangeFilterFunctions.limitResponseSize(5)
            .filter(request, ignored -> Mono.just(response));

    StepVerifier.create(limited.flatMapMany(res -> res.body(BodyExtractors.toDataBuffers())))
            .consumeNextWith(chunk -> assertEquals("foo", string(chunk)))
            .consumeNextWith(chunk -> assertEquals("ba", string(chunk)))
            .expectComplete()
            .verify();
}
/**
 * Runs the rest of the chain and afterwards writes the RPC result stored on the exchange
 * to the server response, depending on the route's RPC type.
 *
 * @param exchange the current exchange; the RPC result attributes are read from it
 * @param chain the remaining chain, executed before the response is written
 * @param routeDefinition supplies the RPC type name used to pick the response strategy
 * @return a {@code Mono} completing when the response has been written (or immediately for
 *         RPC types other than Spring Cloud and Dubbo)
 */
@Override
protected Mono<Void> doExecute(ServerWebExchange exchange, PipeChain chain, RouteDefinition routeDefinition) {
    return chain.execute(exchange).then(Mono.defer(() -> {
        ServerHttpResponse response = exchange.getResponse();
        final String resultType = exchange.getAttribute(GatewayConstant.RESPONSE_RESULT_TYPE);
        RpcTypeEnum rpcTypeEnum = RpcTypeEnum.acquireByName(routeDefinition.getRpcType());

        if (RpcTypeEnum.SPRING_CLOUD.getName().equals(rpcTypeEnum.getName())) {
            // HTTP based call: stream the upstream body straight through.
            ClientResponse clientResponse = exchange.getAttribute(GatewayConstant.HTTP_RPC_RESULT);
            if (Objects.isNull(clientResponse)) {
                return jsonResult(exchange, JsonResult.error("服务端无响应."));
            }
            return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
        }

        if (RpcTypeEnum.DUBBO.getName().equals(rpcTypeEnum.getName())) {
            // Dubbo call: wrap either the result or the error message in a JSON envelope.
            if (StringUtils.equalsIgnoreCase(resultType, ResultEnum.SUCCESS.getName())) {
                final Object result = exchange.getAttribute(GatewayConstant.DUBBO_RPC_RESULT);
                return jsonResult(exchange, JsonResult.success(result));
            }
            final String message = exchange.getAttribute(GatewayConstant.DUBBO_ERROR_MESSAGE);
            return jsonResult(exchange, JsonResult.error(message));
        }

        return Mono.empty();
    }));
}
/**
 * After the rest of the chain has run, writes the body of the client response stored under
 * {@code CLIENT_RESPONSE_ATTR} to the server response. {@code cleanup(exchange)} is invoked
 * when the chain errors or when the write is cancelled.
 *
 * @param exchange the current exchange
 * @param chain the remaining filter chain
 * @return a {@code Mono} completing when the response has been written, or right after the
 *         chain when no client response attribute is present
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
    // until the WebHandler is run
    return chain.filter(exchange)
            .doOnError(failure -> cleanup(exchange))
            .then(Mono.defer(() -> {
                ClientResponse upstream = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
                if (upstream != null) {
                    log.trace("WebClientWriteResponseFilter start");
                    ServerHttpResponse response = exchange.getResponse();
                    return response.writeWith(upstream.body(BodyExtractors.toDataBuffers()))
                            // .log("webClient response")
                            .doOnCancel(() -> cleanup(exchange));
                }
                return Mono.empty();
            }));
}
/**
 * Forwards the incoming request to the registered instance with the given id and copies the
 * proxied status, filtered headers and body onto the server response.
 *
 * @param instanceId id of the target instance, taken from the request path
 * @param request the incoming request whose query, method, headers and body are forwarded
 * @param response the server response that receives the proxied result
 * @return a {@code Mono} that completes once the proxied body has been written
 */
@RequestMapping(path = INSTANCE_MAPPED_PATH, method = { RequestMethod.GET, RequestMethod.HEAD, RequestMethod.POST,
        RequestMethod.PUT, RequestMethod.PATCH, RequestMethod.DELETE, RequestMethod.OPTIONS })
public Mono<Void> endpointProxy(@PathVariable("instanceId") String instanceId, ServerHttpRequest request,
        ServerHttpResponse response) {
    String mappingPattern = this.adminContextPath + INSTANCE_MAPPED_PATH;
    String endpointLocalPath = this.getEndpointLocalPath(mappingPattern, request);

    // build(true): the path and query are treated as already encoded.
    URI uri = UriComponentsBuilder.fromPath(endpointLocalPath)
            .query(request.getURI().getRawQuery())
            .build(true)
            .toUri();

    return this.instanceWebProxy
            .forward(this.registry.getInstance(InstanceId.of(instanceId)), uri, request.getMethod(),
                    this.httpHeadersFilter.filterHeaders(request.getHeaders()),
                    BodyInserters.fromDataBuffers(request.getBody()))
            .flatMap((proxied) -> {
                response.setStatusCode(proxied.statusCode());
                response.getHeaders()
                        .addAll(this.httpHeadersFilter.filterHeaders(proxied.headers().asHttpHeaders()));
                // window(1) hands every data buffer to writeAndFlushWith as its own publisher.
                return response.writeAndFlushWith(proxied.body(BodyExtractors.toDataBuffers()).window(1));
            });
}
/**
 * Checks that a response with the actuator V1 media type is converted: the content type
 * becomes the V2 JSON type, no content length is reported and the body is the converted
 * payload.
 */
@Test
void should_convert_v1_actuator() {
    String originalLength = Integer.toString(this.original.readableByteCount());
    ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
            .attribute(ATTRIBUTE_ENDPOINT, "test")
            .build();
    @SuppressWarnings("deprecation")
    ClientResponse legacyResponse = ClientResponse.create(HttpStatus.OK)
            .header(CONTENT_TYPE, ACTUATOR_V1_MEDIATYPE.toString())
            .header(CONTENT_LENGTH, originalLength)
            .body(Flux.just(this.original))
            .build();

    Mono<ClientResponse> response = this.filter.filter(INSTANCE, request, (ignored) -> Mono.just(legacyResponse));

    StepVerifier.create(response)
            .assertNext((actual) -> {
                assertThat(actual.headers().contentType()).hasValue(MediaType.valueOf(ActuatorMediaType.V2_JSON));
                assertThat(actual.headers().contentLength()).isEmpty();
                StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                        .expectNext(this.converted)
                        .verifyComplete();
            })
            .verifyComplete();
}
/**
 * Checks that a plain {@code application/json} response for the "test" endpoint is
 * converted: the content type becomes the V2 JSON type, no content length is reported and
 * the body is the converted payload.
 */
@Test
void should_convert_json() {
    String originalLength = Integer.toString(this.original.readableByteCount());
    ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
            .attribute(ATTRIBUTE_ENDPOINT, "test")
            .build();
    ClientResponse legacyResponse = ClientResponse.create(HttpStatus.OK)
            .header(CONTENT_TYPE, APPLICATION_JSON_VALUE)
            .header(CONTENT_LENGTH, originalLength)
            .body(Flux.just(this.original))
            .build();

    Mono<ClientResponse> response = this.filter.filter(INSTANCE, request, (ignored) -> Mono.just(legacyResponse));

    StepVerifier.create(response)
            .assertNext((actual) -> {
                assertThat(actual.headers().contentType()).hasValue(MediaType.valueOf(ActuatorMediaType.V2_JSON));
                assertThat(actual.headers().contentLength()).isEmpty();
                StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                        .expectNext(this.converted)
                        .verifyComplete();
            })
            .verifyComplete();
}
/**
 * Checks that a response already carrying the V2 JSON content type passes through
 * unchanged: content type, content length and body all match the original response.
 */
@Test
void should_not_convert_v2_actuator() {
    InstanceExchangeFilterFunction filter = InstanceExchangeFilterFunctions.convertLegacyEndpoints(
            singletonList(new LegacyEndpointConverter("test", (from) -> Flux.just(this.converted)) {
            }));

    ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
            .attribute(ATTRIBUTE_ENDPOINT, "test")
            .build();
    ClientResponse v2Response = ClientResponse.create(HttpStatus.OK)
            .header(CONTENT_TYPE, ActuatorMediaType.V2_JSON)
            .header(CONTENT_LENGTH, Integer.toString(this.original.readableByteCount()))
            .body(Flux.just(this.original))
            .build();

    Mono<ClientResponse> filteredResponse = filter.filter(INSTANCE, request, (ignored) -> Mono.just(v2Response));

    StepVerifier.create(filteredResponse)
            .assertNext((actual) -> {
                assertThat(actual.headers().contentType()).hasValue(MediaType.valueOf(ActuatorMediaType.V2_JSON));
                assertThat(actual.headers().contentLength()).hasValue(this.original.readableByteCount());
                StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                        .expectNext(this.original)
                        .verifyComplete();
            })
            .verifyComplete();
}
/**
 * Checks that a request without the endpoint attribute is not converted: content type,
 * content length and body all match the original JSON response.
 */
@Test
void should_not_convert_unknown_endpoint() {
    InstanceExchangeFilterFunction filter = InstanceExchangeFilterFunctions.convertLegacyEndpoints(
            singletonList(new LegacyEndpointConverter("test", (from) -> Flux.just(this.converted)) {
            }));

    // Deliberately no ATTRIBUTE_ENDPOINT attribute on this request.
    ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/test")).build();
    ClientResponse jsonResponse = ClientResponse.create(HttpStatus.OK)
            .header(CONTENT_TYPE, APPLICATION_JSON_VALUE)
            .header(CONTENT_LENGTH, Integer.toString(this.original.readableByteCount()))
            .body(Flux.just(this.original))
            .build();

    Mono<ClientResponse> filteredResponse = filter.filter(INSTANCE, request, (ignored) -> Mono.just(jsonResponse));

    StepVerifier.create(filteredResponse)
            .assertNext((actual) -> {
                assertThat(actual.headers().contentType()).hasValue(MediaType.APPLICATION_JSON);
                assertThat(actual.headers().contentLength()).hasValue(this.original.readableByteCount());
                StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                        .expectNext(this.original)
                        .verifyComplete();
            })
            .verifyComplete();
}
/**
 * Checks that a request for the "unknown" endpoint is not converted when the only
 * registered converter is for "test": content type, content length and body all match the
 * original JSON response.
 */
@Test
void should_not_convert_without_converter() {
    InstanceExchangeFilterFunction filter = InstanceExchangeFilterFunctions.convertLegacyEndpoints(
            singletonList(new LegacyEndpointConverter("test", (from) -> Flux.just(this.converted)) {
            }));

    ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/unknown"))
            .attribute(ATTRIBUTE_ENDPOINT, "unknown")
            .build();
    ClientResponse jsonResponse = ClientResponse.create(HttpStatus.OK)
            .header(CONTENT_TYPE, APPLICATION_JSON_VALUE)
            .header(CONTENT_LENGTH, Integer.toString(this.original.readableByteCount()))
            .body(Flux.just(this.original))
            .build();

    Mono<ClientResponse> filteredResponse = filter.filter(INSTANCE, request, (ignored) -> Mono.just(jsonResponse));

    StepVerifier.create(filteredResponse)
            .assertNext((actual) -> {
                assertThat(actual.headers().contentType()).hasValue(MediaType.APPLICATION_JSON);
                assertThat(actual.headers().contentLength()).hasValue(this.original.readableByteCount());
                StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                        .expectNext(this.original)
                        .verifyComplete();
            })
            .verifyComplete();
}
/**
 * Builds the exception that describes the given response, including its aggregated body.
 * An empty body is represented by a zero-length byte array.
 *
 * @param response the response to turn into an exception
 * @param request the request that produced the response; passed on to the exception
 * @return a {@code Mono} emitting a {@code WebClientResponseException}, or an
 *         {@code UnknownHttpStatusCodeException} when the raw status code cannot be resolved
 */
private static Mono<WebClientResponseException> createResponseException(
        ClientResponse response, HttpRequest request) {

    return DataBufferUtils.join(response.body(BodyExtractors.toDataBuffers()))
            .map(joined -> {
                // Copy the aggregated body out of the buffer, then release the buffer.
                byte[] content = new byte[joined.readableByteCount()];
                joined.read(content);
                DataBufferUtils.release(joined);
                return content;
            })
            .defaultIfEmpty(new byte[0])
            .map(bodyBytes -> {
                // Fall back to ISO-8859-1 when the content type is absent or has no charset.
                Charset charset = response.headers().contentType()
                        .map(MimeType::getCharset)
                        .orElse(StandardCharsets.ISO_8859_1);

                if (HttpStatus.resolve(response.rawStatusCode()) == null) {
                    return new UnknownHttpStatusCodeException(
                            response.rawStatusCode(),
                            response.headers().asHttpHeaders(),
                            bodyBytes,
                            charset,
                            request);
                }
                return WebClientResponseException.create(
                        response.statusCode().value(),
                        response.statusCode().getReasonPhrase(),
                        response.headers().asHttpHeaders(),
                        bodyBytes,
                        charset,
                        request);
            });
}
/**
 * Create a filter that reads at most the given number of bytes from the
 * response body and cancels once more data shows up.
 * <p>The limiting itself is done by {@link DataBufferUtils#takeUntilByteCount}.
 * @param maxByteCount the limit as number of bytes
 * @return the filter to limit the response size with
 * @since 5.1
 */
public static ExchangeFilterFunction limitResponseSize(long maxByteCount) {
    return (request, next) -> next.exchange(request).map(original -> {
        Flux<DataBuffer> limited = DataBufferUtils.takeUntilByteCount(
                original.body(BodyExtractors.toDataBuffers()), maxByteCount);
        // Rebuild the response around the limited body.
        return ClientResponse.from(original).body(limited).build();
    });
}
/**
 * Checks that the wrapper's {@code body(extractor)} returns the very instance the mocked
 * response returns for the same extractor.
 */
@Test
public void bodyExtractor() {
    BodyExtractor<Mono<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(String.class);
    Mono<String> expected = Mono.just("foo");
    given(mockResponse.body(extractor)).willReturn(expected);

    Mono<String> actual = wrapper.body(extractor);

    assertSame(expected, actual);
}
/**
 * Checks that the wrapper's {@code body(extractor)} returns the very instance the mocked
 * request returns for the same extractor.
 */
@Test
public void bodyExtractor() {
    BodyExtractor<Mono<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(String.class);
    Mono<String> expected = Mono.just("foo");
    given(mockRequest.body(extractor)).willReturn(expected);

    Mono<String> actual = wrapper.body(extractor);

    assertSame(expected, actual);
}
/**
 * Proxy entry point for all endpoint requests: /instances/{instanceId}/actuator/**
 * <p>Forwards the servlet request to the instance, blocks until the proxied response
 * headers are available, writes status and filtered headers, and then streams the body.
 * @author xujin
 * @param instanceId id of the target instance, taken from the request path
 * @param servletRequest the incoming servlet request
 * @param servletResponse the servlet response that receives the proxied result
 * @return a {@code Mono} that completes once the proxied body has been written
 * @throws IOException declared by the servlet request/response accessors used here
 */
@ResponseBody
@RequestMapping(path = REQUEST_MAPPING_PATH, method = {RequestMethod.GET, RequestMethod.HEAD, RequestMethod.POST, RequestMethod.PUT, RequestMethod.PATCH, RequestMethod.DELETE, RequestMethod.OPTIONS})
public Mono<Void> endpointProxy(@PathVariable("instanceId") String instanceId,
                                HttpServletRequest servletRequest,
                                HttpServletResponse servletResponse) throws IOException {
    ServerHttpRequest request = new ServletServerHttpRequest(servletRequest);

    String handlerPath = servletRequest.getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE)
            .toString();
    String pathWithinApplication = UriComponentsBuilder.fromPath(handlerPath).toUriString();
    String endpointLocalPath = getEndpointLocalPath(pathWithinApplication);

    // build(true): the path and query are treated as already encoded.
    URI uri = UriComponentsBuilder.fromPath(endpointLocalPath)
            .query(request.getURI().getRawQuery())
            .build(true)
            .toUri();

    // We need to explicitly block until the headers are received and write them before the async dispatch,
    // otherwise the FrameworkServlet will add a wrong Allow header for OPTIONS requests.
    ClientResponse clientResponse = super.forward(instanceId, uri, request.getMethod(), request.getHeaders(),
                    () -> BodyInserters.fromDataBuffers(
                            DataBufferUtils.readInputStream(request::getBody, this.bufferFactory, 4096)))
            .block();

    ServerHttpResponse response = new ServletServerHttpResponse(servletResponse);
    response.setStatusCode(clientResponse.statusCode());
    response.getHeaders().addAll(filterHeaders(clientResponse.headers().asHttpHeaders()));
    OutputStream responseBody = response.getBody();
    response.flush();

    // Each single-buffer window is written (and flushed) in order before the next one starts.
    return clientResponse.body(BodyExtractors.toDataBuffers())
            .window(1)
            .concatMap(chunk -> writeAndFlush(chunk, responseBody))
            .then();
}
/**
 * Declares the route {@code POST /employees/update} (accepting XML) which reads an
 * {@code Employee} from the request body, passes it to the repository's
 * {@code updateEmployee} and answers 200 with no body.
 *
 * @return the router function for the update route
 */
@Bean
@RouterOperation(beanClass = EmployeeRepository.class, beanMethod = "updateEmployee")
RouterFunction<ServerResponse> updateEmployeeRoute() {
    RouterFunction<ServerResponse> updateRoute = route(
            POST("/employees/update").and(accept(MediaType.APPLICATION_XML)),
            request -> request.body(BodyExtractors.toMono(Employee.class))
                    .doOnNext(employeeRepository()::updateEmployee)
                    .then(ok().build()));
    return updateRoute;
}
/**
 * Turns the given response into an exception carrying its status, headers and the fully
 * aggregated body (a zero-length array when the body is empty).
 *
 * @param response the response to describe
 * @param request the originating request; passed on to the exception
 * @return a {@code Mono} emitting a {@code WebClientResponseException}, or an
 *         {@code UnknownHttpStatusCodeException} when the raw status code cannot be resolved
 */
private static Mono<WebClientResponseException> createResponseException(
        ClientResponse response, HttpRequest request) {

    return DataBufferUtils.join(response.body(BodyExtractors.toDataBuffers()))
            .map(aggregated -> {
                // Drain the joined buffer into a byte array and give the buffer back.
                byte[] payload = new byte[aggregated.readableByteCount()];
                aggregated.read(payload);
                DataBufferUtils.release(aggregated);
                return payload;
            })
            .defaultIfEmpty(new byte[0])
            .map(bodyBytes -> {
                // ISO-8859-1 is used when no charset can be taken from the content type.
                Charset charset = response.headers().contentType()
                        .map(MimeType::getCharset)
                        .orElse(StandardCharsets.ISO_8859_1);

                boolean knownStatus = HttpStatus.resolve(response.rawStatusCode()) != null;
                if (!knownStatus) {
                    return new UnknownHttpStatusCodeException(
                            response.rawStatusCode(),
                            response.headers().asHttpHeaders(),
                            bodyBytes,
                            charset,
                            request);
                }
                return WebClientResponseException.create(
                        response.statusCode().value(),
                        response.statusCode().getReasonPhrase(),
                        response.headers().asHttpHeaders(),
                        bodyBytes,
                        charset,
                        request);
            });
}
/**
 * Return a filter that consumes no more than the given number of bytes of
 * the response body, cancelling if further data arrives.
 * <p>Implemented on top of {@link DataBufferUtils#takeUntilByteCount}.
 * @param maxByteCount the limit as number of bytes
 * @return the filter to limit the response size with
 * @since 5.1
 */
public static ExchangeFilterFunction limitResponseSize(long maxByteCount) {
    return (request, next) -> next.exchange(request).map(upstream -> {
        Flux<DataBuffer> fullBody = upstream.body(BodyExtractors.toDataBuffers());
        Flux<DataBuffer> cappedBody = DataBufferUtils.takeUntilByteCount(fullBody, maxByteCount);
        // Copy the response, swapping in the capped body.
        return ClientResponse.from(upstream).body(cappedBody).build();
    });
}
/**
 * Checks that the wrapper's {@code body(extractor)} returns the very instance the mocked
 * response returns for the same extractor.
 */
@Test
public void bodyExtractor() {
    BodyExtractor<Mono<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(String.class);
    Mono<String> expected = Mono.just("foo");
    when(mockResponse.body(extractor)).thenReturn(expected);

    Mono<String> actual = wrapper.body(extractor);

    assertSame(expected, actual);
}
/**
 * Checks that the wrapper's {@code body(extractor)} returns the very instance the mocked
 * request returns for the same extractor.
 */
@Test
public void bodyExtractor() {
    BodyExtractor<Mono<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(String.class);
    Mono<String> expected = Mono.just("foo");
    when(mockRequest.body(extractor)).thenReturn(expected);

    Mono<String> actual = wrapper.body(extractor);

    assertSame(expected, actual);
}
/**
 * Handles an incoming metric request: rejects it while the gateway is in refuse mode,
 * otherwise counts it, reads the raw body, records its size and delegates to
 * {@code doHandle}. Errors raised in the reactive chain are turned into error responses.
 *
 * <p>The client IP is copied into the metric head only when the request exposes a resolved
 * remote address. Previously a missing or unresolved remote address caused a
 * {@code NullPointerException} that was thrown while the body extractor ran.
 *
 * @param request the incoming request
 * @return the response produced by {@code doHandle}, or an error response
 */
public Mono<ServerResponse> handle(ServerRequest request) {
    if (refuseRequestService.isRefuseRequest()) {
        return error("lookout gateway is in refuse requests mode");
    }
    counter.inc();
    RawMetric rm = initRawMetric(request);
    // Currently the client IP can be obtained as shown below; the key point is getting hold
    // of the ServerHttpRequest object.
    // Open question from the original author: can the extractor be reused?
    BodyExtractor<Mono<byte[]>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(byte[].class);
    return request.body((inputMessage, context) -> {
        RawMetricHead head = rm.getHead();
        // The remote address may be unavailable, and an unresolved socket address has no
        // InetAddress; skip the client IP in both cases instead of failing the request.
        java.net.InetSocketAddress remoteAddress = inputMessage.getRemoteAddress();
        if (remoteAddress != null && remoteAddress.getAddress() != null) {
            head.setClientIp(remoteAddress.getAddress().getHostAddress());
        }
        return extractor.extract(inputMessage, context);
    }).flatMap(body -> {
        size.record(body.length);
        rm.setRawBody(body);
        return doHandle(request, rm);
    }).onErrorResume(error -> {
        if (error instanceof FilterException) {
            FilterException fe = (FilterException) error;
            return error(fe.getResult().getMsg());
        } else {
            return error(error.getMessage());
        }
    });
}
/**
 * Builds the exception that describes the given response, including its aggregated body
 * (a zero-length array when the body is empty).
 *
 * <p>Note that this variant ends with {@code block()}, so it waits for the whole body
 * before returning.
 *
 * @param response the response to turn into an exception
 * @return a {@code WebClientResponseException}, or an {@code UnknownHttpStatusCodeException}
 *         when the raw status code cannot be resolved
 */
static WebClientResponseException createResponseException(ClientResponse response) {
    return DataBufferUtils.join(response.body(BodyExtractors.toDataBuffers()))
            .map(joined -> {
                // Copy the aggregated body out of the buffer, then release the buffer.
                byte[] content = new byte[joined.readableByteCount()];
                joined.read(content);
                DataBufferUtils.release(joined);
                return content;
            })
            .defaultIfEmpty(new byte[0])
            .map(bodyBytes -> {
                // Fall back to ISO-8859-1 when the content type is absent or has no charset.
                Charset charset = response.headers().contentType()
                        .map(MimeType::getCharset)
                        .orElse(StandardCharsets.ISO_8859_1);

                if (HttpStatus.resolve(response.rawStatusCode()) == null) {
                    return new UnknownHttpStatusCodeException(
                            response.rawStatusCode(),
                            response.headers().asHttpHeaders(),
                            bodyBytes,
                            charset);
                }
                return WebClientResponseException.create(
                        response.statusCode().value(),
                        response.statusCode().getReasonPhrase(),
                        response.headers().asHttpHeaders(),
                        bodyBytes,
                        charset);
            })
            .block();
}
/**
 * Servlet based proxy entry point: forwards the request to the registered instance with the
 * given id, blocks until the proxied response headers are available, writes status and
 * filtered headers, and then streams the body to the servlet response.
 *
 * @param instanceId id of the target instance, taken from the request path
 * @param servletRequest the incoming servlet request
 * @param servletResponse the servlet response that receives the proxied result
 * @return a {@code Mono} that completes once the proxied body has been written
 * @throws IOException declared by the servlet request/response accessors used here
 */
@ResponseBody
@RequestMapping(path = INSTANCE_MAPPED_PATH, method = { RequestMethod.GET, RequestMethod.HEAD, RequestMethod.POST,
        RequestMethod.PUT, RequestMethod.PATCH, RequestMethod.DELETE, RequestMethod.OPTIONS })
public Mono<Void> endpointProxy(@PathVariable("instanceId") String instanceId, HttpServletRequest servletRequest,
        HttpServletResponse servletResponse) throws IOException {
    ServerHttpRequest request = new ServletServerHttpRequest(servletRequest);

    String mappingPattern = this.adminContextPath + INSTANCE_MAPPED_PATH;
    String endpointLocalPath = this.getEndpointLocalPath(mappingPattern, servletRequest);

    // build(true): the path and query are treated as already encoded.
    URI uri = UriComponentsBuilder.fromPath(endpointLocalPath)
            .query(request.getURI().getRawQuery())
            .build(true)
            .toUri();

    // We need to explicitly block until the headers are received and write them
    // before the async dispatch,
    // otherwise the FrameworkServlet will add a wrong Allow header for OPTIONS requests.
    Flux<DataBuffer> requestBody = DataBufferUtils.readInputStream(request::getBody, this.bufferFactory, 4096);
    ClientResponse clientResponse = this.instanceWebProxy
            .forward(this.registry.getInstance(InstanceId.of(instanceId)), uri, request.getMethod(),
                    this.httpHeadersFilter.filterHeaders(request.getHeaders()),
                    BodyInserters.fromDataBuffers(requestBody))
            .block();

    ServerHttpResponse response = new ServletServerHttpResponse(servletResponse);
    response.setStatusCode(clientResponse.statusCode());
    response.getHeaders().addAll(this.httpHeadersFilter.filterHeaders(clientResponse.headers().asHttpHeaders()));
    OutputStream responseBody = response.getBody();
    response.flush();

    // Each single-buffer window is written (and flushed) in order before the next one starts.
    Flux<DataBuffer> proxiedBody = clientResponse.body(BodyExtractors.toDataBuffers());
    return proxiedBody.window(1)
            .concatMap((chunk) -> writeAndFlush(chunk, responseBody))
            .then();
}
/**
 * Extracts the body as a {@code Mono} of the given element class by delegating to
 * {@code body(...)} with {@code BodyExtractors.toMono(elementClass)}.
 *
 * @param elementClass the class of the element expected in the body
 * @return the {@code Mono} returned by {@code body(...)} for that extractor
 */
@Override
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
    Mono<T> extracted = body(BodyExtractors.toMono(elementClass));
    return extracted;
}
/**
 * Extracts the body as a {@code Mono} of the type described by the given reference by
 * delegating to {@code body(...)} with {@code BodyExtractors.toMono(typeReference)}.
 *
 * @param typeReference describes the (possibly generic) element type expected in the body
 * @return the {@code Mono} returned by {@code body(...)} for that extractor
 */
@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
    Mono<T> extracted = body(BodyExtractors.toMono(typeReference));
    return extracted;
}