org.springframework.http.codec.multipart.FormFieldPart#org.springframework.web.reactive.function.BodyExtractors源码实例Demo

下面列出了org.springframework.http.codec.multipart.FormFieldPart#org.springframework.web.reactive.function.BodyExtractors 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

/**
 * Applies {@code limitResponseSize(5)} to a response whose body is emitted as
 * the three chunks "foo", "bar" and "baz", and expects only the first five
 * bytes to come through: "foo" followed by "ba".
 */
@Test
public void limitResponseSize() {
	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	Flux<DataBuffer> chunks = Flux.just(
			dataBuffer("foo", factory),
			dataBuffer("bar", factory),
			dataBuffer("baz", factory));

	ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, DEFAULT_URL).build();
	ClientResponse fullResponse = ClientResponse.create(HttpStatus.OK).body(chunks).build();

	// Run the filter against a stubbed exchange and expose the filtered body directly.
	Flux<DataBuffer> limitedBody = ExchangeFilterFunctions.limitResponseSize(5)
			.filter(clientRequest, ignored -> Mono.just(fullResponse))
			.flatMapMany(limited -> limited.body(BodyExtractors.toDataBuffers()));

	StepVerifier.create(limitedBody)
			.consumeNextWith(chunk -> assertEquals("foo", string(chunk)))
			.consumeNextWith(chunk -> assertEquals("ba", string(chunk)))
			.verifyComplete();

}
 
源代码2 项目: Moss   文件: InstancesProxyController.java
/**
 * Forwards the incoming request to the given instance and relays the
 * upstream status, filtered headers and body back to the caller.
 *
 * @param instanceId id of the instance the request is forwarded to
 * @param request    incoming reactive request (path, query, method, headers and body are forwarded)
 * @param response   outgoing response that receives the upstream status, headers and body
 * @return completion signal of the relayed response write
 */
@RequestMapping(path = REQUEST_MAPPING_PATH, method = {RequestMethod.GET, RequestMethod.HEAD, RequestMethod.POST, RequestMethod.PUT, RequestMethod.PATCH, RequestMethod.DELETE, RequestMethod.OPTIONS})
public Mono<Void> endpointProxy(@PathVariable("instanceId") String instanceId,
                                ServerHttpRequest request,
                                ServerHttpResponse response) {
    String applicationPath = request.getPath().pathWithinApplication().value();
    String localPath = getEndpointLocalPath(applicationPath);

    // The raw query is already encoded, hence build(true).
    URI target = UriComponentsBuilder.fromPath(localPath)
                                     .query(request.getURI().getRawQuery())
                                     .build(true)
                                     .toUri();

    return super.forward(
            instanceId,
            target,
            request.getMethod(),
            request.getHeaders(),
            () -> BodyInserters.fromDataBuffers(request.getBody())
    ).flatMap(upstream -> {
        response.setStatusCode(upstream.statusCode());
        response.getHeaders().addAll(filterHeaders(upstream.headers().asHttpHeaders()));
        // window(1) wraps every buffer in its own publisher so each one is flushed on its own.
        return response.writeAndFlushWith(upstream.body(BodyExtractors.toDataBuffers()).window(1));
    });
}
 
/**
 * A response declared as actuator V1 JSON is sent through the legacy-endpoint
 * filter; the resulting body is expected to be {@code CONVERTED}.
 */
@Test
public void should_convert_v1_actuator() {
    ExchangeFilterFunction legacyFilter =
        InstanceExchangeFilterFunctions.convertLegacyEndpoint(new TestLegacyEndpointConverter());

    ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
                                               .attribute(ATTRIBUTE_ENDPOINT, "test")
                                               .build();
    ClientResponse upstream = ClientResponse.create(HttpStatus.OK)
                                            .header(CONTENT_TYPE, ActuatorMediaType.V1_JSON)
                                            .body(Flux.just(ORIGINAL))
                                            .build();

    Mono<ClientResponse> filtered = legacyFilter.filter(clientRequest, ignored -> Mono.just(upstream));

    StepVerifier.create(filtered)
                .assertNext(actual -> {
                    // Verify the body of the filtered response in a nested verifier.
                    StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                                .expectNext(CONVERTED)
                                .verifyComplete();
                })
                .verifyComplete();
}
 
/**
 * A response declared as plain {@code application/json} is sent through the
 * legacy-endpoint filter; the resulting body is expected to be {@code CONVERTED}.
 */
@Test
public void should_convert_json() {
    ExchangeFilterFunction legacyFilter =
        InstanceExchangeFilterFunctions.convertLegacyEndpoint(new TestLegacyEndpointConverter());

    ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
                                               .attribute(ATTRIBUTE_ENDPOINT, "test")
                                               .build();
    ClientResponse upstream = ClientResponse.create(HttpStatus.OK)
                                            .header(CONTENT_TYPE, APPLICATION_JSON_VALUE)
                                            .body(Flux.just(ORIGINAL))
                                            .build();

    Mono<ClientResponse> filtered = legacyFilter.filter(clientRequest, ignored -> Mono.just(upstream));

    StepVerifier.create(filtered)
                .assertNext(actual -> {
                    // Verify the body of the filtered response in a nested verifier.
                    StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                                .expectNext(CONVERTED)
                                .verifyComplete();
                })
                .verifyComplete();
}
 
/**
 * A response already declared as actuator V2 JSON is sent through the
 * legacy-endpoint filter; the body is expected to stay {@code ORIGINAL}.
 */
@Test
public void should_not_convert_v2_actuator() {
    ExchangeFilterFunction legacyFilter =
        InstanceExchangeFilterFunctions.convertLegacyEndpoint(new TestLegacyEndpointConverter());

    ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
                                               .attribute(ATTRIBUTE_ENDPOINT, "test")
                                               .build();
    ClientResponse upstream = ClientResponse.create(HttpStatus.OK)
                                            .header(CONTENT_TYPE, ActuatorMediaType.V2_JSON)
                                            .body(Flux.just(ORIGINAL))
                                            .build();

    Mono<ClientResponse> filtered = legacyFilter.filter(clientRequest, ignored -> Mono.just(upstream));

    StepVerifier.create(filtered)
                .assertNext(actual -> {
                    // The body must pass through untouched.
                    StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
                                .expectNext(ORIGINAL)
                                .verifyComplete();
                })
                .verifyComplete();
}
 
/**
 * Builds one router function out of three employee routes: list all
 * ({@code GET /employees-composed}), look up by id
 * ({@code GET /employees-composed/{id}}) and update
 * ({@code POST /employees-composed/update}).
 */
@Bean
@RouterOperations({ @RouterOperation(path = "/employees-composed/update", beanClass = EmployeeRepository.class, beanMethod = "updateEmployee"),
		@RouterOperation(path = "/employees-composed/{id}", beanClass = EmployeeRepository.class, beanMethod = "findEmployeeById"),
		@RouterOperation(path = "/employees-composed", beanClass = EmployeeRepository.class, beanMethod = "findAllEmployees") })
RouterFunction<ServerResponse> composedRoutes() {
	RouterFunction<ServerResponse> listAll = route(GET("/employees-composed"),
			request -> ok().body(employeeRepository().findAllEmployees(), Employee.class));

	RouterFunction<ServerResponse> findById = route(GET("/employees-composed/{id}"),
			request -> ok().body(
					employeeRepository().findEmployeeById(request.pathVariable("id")), Employee.class));

	// The request body is decoded to an Employee, handed to the repository, then answered with an empty 200.
	RouterFunction<ServerResponse> update = route(POST("/employees-composed/update"),
			request -> request.body(BodyExtractors.toMono(Employee.class))
					.doOnNext(employeeRepository()::updateEmployee)
					.then(ok().build()));

	return listAll.and(findById).and(update);
}
 
/**
 * With a limit of five bytes, a body of "foo" + "bar" + "baz" must be cut
 * down to "foo" and "ba" before the stream completes.
 */
@Test
public void limitResponseSize() {
	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DataBuffer first = dataBuffer("foo", factory);
	DataBuffer second = dataBuffer("bar", factory);
	DataBuffer third = dataBuffer("baz", factory);

	ClientRequest outgoing = ClientRequest.create(HttpMethod.GET, DEFAULT_URL).build();
	ClientResponse unbounded = ClientResponse.create(HttpStatus.OK)
			.body(Flux.just(first, second, third))
			.build();

	Mono<ClientResponse> bounded = ExchangeFilterFunctions.limitResponseSize(5)
			.filter(outgoing, unused -> Mono.just(unbounded));

	Flux<DataBuffer> boundedBody =
			bounded.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()));

	StepVerifier.create(boundedBody)
			.consumeNextWith(part -> assertEquals("foo", string(part)))
			.consumeNextWith(part -> assertEquals("ba", string(part)))
			.verifyComplete();

}
 
源代码8 项目: bird-java   文件: ResultPipe.java
/**
 * After the rest of the chain has run, writes the RPC result stored in the
 * exchange attributes to the HTTP response. Spring Cloud results are
 * streamed from the stored {@code ClientResponse}; Dubbo results are wrapped
 * in a JSON success or error envelope. Any other RPC type writes nothing.
 */
@Override
protected Mono<Void> doExecute(ServerWebExchange exchange, PipeChain chain, RouteDefinition routeDefinition) {
    return chain.execute(exchange).then(Mono.defer(() -> {
        ServerHttpResponse response = exchange.getResponse();
        final String resultType = exchange.getAttribute(GatewayConstant.RESPONSE_RESULT_TYPE);
        RpcTypeEnum rpcType = RpcTypeEnum.acquireByName(routeDefinition.getRpcType());

        // Spring Cloud: relay the upstream body, or report that the server did not answer.
        if (RpcTypeEnum.SPRING_CLOUD.getName().equals(rpcType.getName())) {
            ClientResponse upstream = exchange.getAttribute(GatewayConstant.HTTP_RPC_RESULT);
            if (!Objects.isNull(upstream)) {
                return response.writeWith(upstream.body(BodyExtractors.toDataBuffers()));
            }
            return jsonResult(exchange, JsonResult.error("服务端无响应."));
        }

        // Anything that is neither Spring Cloud nor Dubbo produces no output here.
        if (!RpcTypeEnum.DUBBO.getName().equals(rpcType.getName())) {
            return Mono.empty();
        }

        // Dubbo: success payload or error message, depending on the recorded result type.
        if (StringUtils.equalsIgnoreCase(resultType, ResultEnum.SUCCESS.getName())) {
            final Object payload = exchange.getAttribute(GatewayConstant.DUBBO_RPC_RESULT);
            return jsonResult(exchange, JsonResult.success(payload));
        }
        final String errorMessage = exchange.getAttribute(GatewayConstant.DUBBO_ERROR_MESSAGE);
        return jsonResult(exchange, JsonResult.error(errorMessage));
    }));
}
 
/**
 * Post-filter that copies the body of the {@code ClientResponse} stored under
 * {@code CLIENT_RESPONSE_ATTR} into the server response. {@code cleanup} is
 * invoked when the chain errors or the write is cancelled.
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
	// Nothing happens in the "pre" stage: CLIENT_RESPONSE_ATTR is only set
	// once the WebHandler has run.
	return chain.filter(exchange)
			.doOnError(error -> cleanup(exchange))
			.then(Mono.defer(() -> {
				ClientResponse upstream = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
				if (upstream != null) {
					log.trace("WebClientWriteResponseFilter start");
					ServerHttpResponse serverResponse = exchange.getResponse();
					return serverResponse
							.writeWith(upstream.body(BodyExtractors.toDataBuffers()))
							.doOnCancel(() -> cleanup(exchange));
				}
				return Mono.empty();
			}));
}
 
/**
 * Forwards the incoming request to the registered instance and relays the
 * upstream status, filtered headers and body back to the caller.
 *
 * @param instanceId id of the instance to forward to
 * @param request incoming reactive request
 * @param response outgoing response that receives the upstream result
 * @return completion signal of the relayed response write
 */
@RequestMapping(path = INSTANCE_MAPPED_PATH, method = { RequestMethod.GET, RequestMethod.HEAD, RequestMethod.POST,
		RequestMethod.PUT, RequestMethod.PATCH, RequestMethod.DELETE, RequestMethod.OPTIONS })
public Mono<Void> endpointProxy(@PathVariable("instanceId") String instanceId, ServerHttpRequest request,
		ServerHttpResponse response) {
	String localPath = this.getEndpointLocalPath(this.adminContextPath + INSTANCE_MAPPED_PATH, request);

	// The raw query is already encoded, hence build(true).
	URI target = UriComponentsBuilder.fromPath(localPath)
			.query(request.getURI().getRawQuery())
			.build(true)
			.toUri();

	return this.instanceWebProxy
			.forward(
					this.registry.getInstance(InstanceId.of(instanceId)),
					target,
					request.getMethod(),
					this.httpHeadersFilter.filterHeaders(request.getHeaders()),
					BodyInserters.fromDataBuffers(request.getBody()))
			.flatMap((upstream) -> {
				response.setStatusCode(upstream.statusCode());
				response.getHeaders()
						.addAll(this.httpHeadersFilter.filterHeaders(upstream.headers().asHttpHeaders()));
				// window(1) wraps every buffer in its own publisher so each one is flushed on its own.
				return response.writeAndFlushWith(upstream.body(BodyExtractors.toDataBuffers()).window(1));
			});
}
 
/**
 * A response with the actuator V1 media type must come back as V2 JSON,
 * without a content length, and with the converted body.
 */
@Test
void should_convert_v1_actuator() {
	ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
			.attribute(ATTRIBUTE_ENDPOINT, "test")
			.build();
	@SuppressWarnings("deprecation")
	ClientResponse upstream = ClientResponse.create(HttpStatus.OK)
			.header(CONTENT_TYPE, ACTUATOR_V1_MEDIATYPE.toString())
			.header(CONTENT_LENGTH, Integer.toString(this.original.readableByteCount()))
			.body(Flux.just(this.original))
			.build();

	Mono<ClientResponse> filtered = this.filter.filter(INSTANCE, clientRequest, (ignored) -> Mono.just(upstream));

	StepVerifier.create(filtered)
			.assertNext((actual) -> {
				assertThat(actual.headers().contentType()).hasValue(MediaType.valueOf(ActuatorMediaType.V2_JSON));
				assertThat(actual.headers().contentLength()).isEmpty();
				StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
						.expectNext(this.converted)
						.verifyComplete();
			})
			.verifyComplete();
}
 
/**
 * A response with plain {@code application/json} must come back as V2 JSON,
 * without a content length, and with the converted body.
 */
@Test
void should_convert_json() {
	ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
			.attribute(ATTRIBUTE_ENDPOINT, "test")
			.build();
	ClientResponse upstream = ClientResponse.create(HttpStatus.OK)
			.header(CONTENT_TYPE, APPLICATION_JSON_VALUE)
			.header(CONTENT_LENGTH, Integer.toString(this.original.readableByteCount()))
			.body(Flux.just(this.original))
			.build();

	Mono<ClientResponse> filtered = this.filter.filter(INSTANCE, clientRequest, (ignored) -> Mono.just(upstream));

	StepVerifier.create(filtered)
			.assertNext((actual) -> {
				assertThat(actual.headers().contentType()).hasValue(MediaType.valueOf(ActuatorMediaType.V2_JSON));
				assertThat(actual.headers().contentLength()).isEmpty();
				StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
						.expectNext(this.converted)
						.verifyComplete();
			})
			.verifyComplete();
}
 
/**
 * A response that already carries the actuator V2 media type must keep its
 * content type, content length and original body.
 */
@Test
void should_not_convert_v2_actuator() {
	LegacyEndpointConverter testConverter = new LegacyEndpointConverter("test",
			(from) -> Flux.just(this.converted)) {
	};
	InstanceExchangeFilterFunction legacyFilter = InstanceExchangeFilterFunctions
			.convertLegacyEndpoints(singletonList(testConverter));

	ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
			.attribute(ATTRIBUTE_ENDPOINT, "test")
			.build();
	ClientResponse upstream = ClientResponse.create(HttpStatus.OK)
			.header(CONTENT_TYPE, ActuatorMediaType.V2_JSON)
			.header(CONTENT_LENGTH, Integer.toString(this.original.readableByteCount()))
			.body(Flux.just(this.original))
			.build();

	Mono<ClientResponse> filtered = legacyFilter.filter(INSTANCE, clientRequest, (ignored) -> Mono.just(upstream));

	StepVerifier.create(filtered)
			.assertNext((actual) -> {
				assertThat(actual.headers().contentType()).hasValue(MediaType.valueOf(ActuatorMediaType.V2_JSON));
				assertThat(actual.headers().contentLength()).hasValue(this.original.readableByteCount());
				StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
						.expectNext(this.original)
						.verifyComplete();
			})
			.verifyComplete();
}
 
/**
 * A request that carries no endpoint attribute must leave the JSON response
 * untouched: same content type, content length and body.
 */
@Test
void should_not_convert_unknown_endpoint() {
	LegacyEndpointConverter testConverter = new LegacyEndpointConverter("test",
			(from) -> Flux.just(this.converted)) {
	};
	InstanceExchangeFilterFunction legacyFilter = InstanceExchangeFilterFunctions
			.convertLegacyEndpoints(singletonList(testConverter));

	// Note: no ATTRIBUTE_ENDPOINT is set on this request.
	ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test")).build();
	ClientResponse upstream = ClientResponse.create(HttpStatus.OK)
			.header(CONTENT_TYPE, APPLICATION_JSON_VALUE)
			.header(CONTENT_LENGTH, Integer.toString(this.original.readableByteCount()))
			.body(Flux.just(this.original))
			.build();

	Mono<ClientResponse> filtered = legacyFilter.filter(INSTANCE, clientRequest, (ignored) -> Mono.just(upstream));

	StepVerifier.create(filtered)
			.assertNext((actual) -> {
				assertThat(actual.headers().contentType()).hasValue(MediaType.APPLICATION_JSON);
				assertThat(actual.headers().contentLength()).hasValue(this.original.readableByteCount());
				StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
						.expectNext(this.original)
						.verifyComplete();
			})
			.verifyComplete();
}
 
/**
 * A request for the endpoint "unknown", for which only a "test" converter is
 * registered, must leave the JSON response untouched.
 */
@Test
void should_not_convert_without_converter() {
	LegacyEndpointConverter testConverter = new LegacyEndpointConverter("test",
			(from) -> Flux.just(this.converted)) {
	};
	InstanceExchangeFilterFunction legacyFilter = InstanceExchangeFilterFunctions
			.convertLegacyEndpoints(singletonList(testConverter));

	ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/unknown"))
			.attribute(ATTRIBUTE_ENDPOINT, "unknown")
			.build();
	ClientResponse upstream = ClientResponse.create(HttpStatus.OK)
			.header(CONTENT_TYPE, APPLICATION_JSON_VALUE)
			.header(CONTENT_LENGTH, Integer.toString(this.original.readableByteCount()))
			.body(Flux.just(this.original))
			.build();

	Mono<ClientResponse> filtered = legacyFilter.filter(INSTANCE, clientRequest, (ignored) -> Mono.just(upstream));

	StepVerifier.create(filtered)
			.assertNext((actual) -> {
				assertThat(actual.headers().contentType()).hasValue(MediaType.APPLICATION_JSON);
				assertThat(actual.headers().contentLength()).hasValue(this.original.readableByteCount());
				StepVerifier.create(actual.body(BodyExtractors.toDataBuffers()))
						.expectNext(this.original)
						.verifyComplete();
			})
			.verifyComplete();
}
 
源代码16 项目: spring-analysis-note   文件: DefaultWebClient.java
/**
 * Reads the whole response body into memory and turns the response into a
 * {@link WebClientResponseException}, or into an
 * {@link UnknownHttpStatusCodeException} when the raw status code cannot be
 * resolved to an {@link HttpStatus}.
 * @param response the response to convert
 * @param request the request that produced the response
 * @return a {@code Mono} emitting the exception
 */
private static Mono<WebClientResponseException> createResponseException(
		ClientResponse response, HttpRequest request) {

	// Aggregate the body; an empty body is represented by an empty array.
	Mono<byte[]> content = DataBufferUtils.join(response.body(BodyExtractors.toDataBuffers()))
			.map(joined -> {
				byte[] copy = new byte[joined.readableByteCount()];
				joined.read(copy);
				DataBufferUtils.release(joined);
				return copy;
			})
			.defaultIfEmpty(new byte[0]);

	return content.map(bodyBytes -> {
		// Fall back to ISO-8859-1 when the content type is absent or has no charset.
		Charset charset = response.headers().contentType()
				.map(MimeType::getCharset)
				.orElse(StandardCharsets.ISO_8859_1);
		if (HttpStatus.resolve(response.rawStatusCode()) == null) {
			return new UnknownHttpStatusCodeException(
					response.rawStatusCode(),
					response.headers().asHttpHeaders(),
					bodyBytes,
					charset,
					request);
		}
		return WebClientResponseException.create(
				response.statusCode().value(),
				response.statusCode().getReasonPhrase(),
				response.headers().asHttpHeaders(),
				bodyBytes,
				charset,
				request);
	});
}
 
/**
 * Return a filter that reads at most the given number of bytes from the
 * response body and cancels once more data shows up.
 * <p>The truncation itself is done by {@link DataBufferUtils#takeUntilByteCount}.
 * @param maxByteCount the limit as number of bytes
 * @return the filter to limit the response size with
 * @since 5.1
 */
public static ExchangeFilterFunction limitResponseSize(long maxByteCount) {
	return (request, next) -> next.exchange(request).map(original -> {
		Flux<DataBuffer> truncated = DataBufferUtils.takeUntilByteCount(
				original.body(BodyExtractors.toDataBuffers()), maxByteCount);
		// Rebuild the response around the truncated body, keeping everything else.
		return ClientResponse.from(original).body(truncated).build();
	});
}
 
/**
 * The wrapper must hand back exactly the value the wrapped response
 * returns for the same extractor.
 */
@Test
public void bodyExtractor() {
	BodyExtractor<Mono<String>, ReactiveHttpInputMessage> stringExtractor = BodyExtractors.toMono(String.class);
	Mono<String> stubbed = Mono.just("foo");
	given(mockResponse.body(stringExtractor)).willReturn(stubbed);

	assertSame(stubbed, wrapper.body(stringExtractor));
}
 
/**
 * The wrapper must hand back exactly the value the wrapped request
 * returns for the same extractor.
 */
@Test
public void bodyExtractor() {
	BodyExtractor<Mono<String>, ReactiveHttpInputMessage> stringExtractor = BodyExtractors.toMono(String.class);
	Mono<String> stubbed = Mono.just("foo");
	given(mockRequest.body(stringExtractor)).willReturn(stubbed);

	assertSame(stubbed, wrapper.body(stringExtractor));
}
 
源代码20 项目: Moss   文件: InstancesProxyController.java
/**
 * Proxy entry point for requests to all endpoints; per the original author's
 * note it serves {@code /instances/{instanceId}/actuator/**}.
 * <p>The request is forwarded to the instance, the call blocks until the
 * upstream response is available, status and filtered headers are written and
 * flushed on the servlet response, and the body is then streamed buffer by
 * buffer.
 *
 * @author xujin
 * @param instanceId id of the instance the request is forwarded to
 * @param servletRequest incoming servlet request (path, query, method, headers and body are forwarded)
 * @param servletResponse servlet response that receives the upstream status, headers and body
 * @return completion signal of the body relay
 * @throws IOException declared by the signature; NOTE(review): presumably raised when obtaining or flushing the response body — confirm
 */
@ResponseBody
@RequestMapping(path = REQUEST_MAPPING_PATH, method = {RequestMethod.GET, RequestMethod.HEAD, RequestMethod.POST, RequestMethod.PUT, RequestMethod.PATCH, RequestMethod.DELETE, RequestMethod.OPTIONS})
public Mono<Void> endpointProxy(@PathVariable("instanceId") String instanceId,
                                HttpServletRequest servletRequest,
                                HttpServletResponse servletResponse) throws IOException {
    ServerHttpRequest request = new ServletServerHttpRequest(servletRequest);

    // The path inside the handler mapping is taken from the request attribute set by the framework.
    String pathWithinApplication = UriComponentsBuilder.fromPath(servletRequest.getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE)
                                                                               .toString()).toUriString();
    String endpointLocalPath = getEndpointLocalPath(pathWithinApplication);

    // The raw query is already encoded, hence build(true).
    URI uri = UriComponentsBuilder.fromPath(endpointLocalPath)
                                  .query(request.getURI().getRawQuery())
                                  .build(true)
                                  .toUri();

    //We need to explicitly block until the headers are received and write them before the async dispatch.
    //otherwise the FrameworkServlet will add wrong Allow header for OPTIONS request
    ClientResponse clientResponse = super.forward(instanceId,
        uri,
        request.getMethod(),
        request.getHeaders(),
        () -> BodyInserters.fromDataBuffers(DataBufferUtils.readInputStream(request::getBody,
            this.bufferFactory,
            4096
        ))
    ).block();

    // Status and headers go out first; flush() sends them before the body is relayed.
    ServerHttpResponse response = new ServletServerHttpResponse(servletResponse);
    response.setStatusCode(clientResponse.statusCode());
    response.getHeaders().addAll(filterHeaders(clientResponse.headers().asHttpHeaders()));
    OutputStream responseBody = response.getBody();
    response.flush();

    // window(1) groups the body into single-buffer windows; each is passed to writeAndFlush in order.
    return clientResponse.body(BodyExtractors.toDataBuffers())
                         .window(1)
                         .concatMap(body -> writeAndFlush(body, responseBody))
                         .then();
}
 
/**
 * Route for {@code POST /employees/update} accepting XML: the request body
 * is decoded to an {@code Employee}, passed to the repository's
 * {@code updateEmployee}, and answered with an empty 200 response.
 */
@Bean
@RouterOperation(beanClass = EmployeeRepository.class, beanMethod = "updateEmployee")
RouterFunction<ServerResponse> updateEmployeeRoute() {
	RouterFunction<ServerResponse> updateRoute = route(
			POST("/employees/update").and(accept(MediaType.APPLICATION_XML)),
			request -> request.body(BodyExtractors.toMono(Employee.class))
					.doOnNext(employeeRepository()::updateEmployee)
					.then(ok().build()));
	return updateRoute;
}
 
源代码22 项目: java-technology-stack   文件: DefaultWebClient.java
/**
 * Buffers the complete response body and maps the response to an exception:
 * a {@link WebClientResponseException} for status codes known to
 * {@link HttpStatus}, otherwise an {@link UnknownHttpStatusCodeException}.
 * @param response the response to convert
 * @param request the request that produced the response
 * @return a {@code Mono} emitting the exception
 */
private static Mono<WebClientResponseException> createResponseException(
		ClientResponse response, HttpRequest request) {

	return DataBufferUtils.join(response.body(BodyExtractors.toDataBuffers()))
			.map(buffer -> {
				// Copy the bytes out, then release the joined buffer.
				byte[] payload = new byte[buffer.readableByteCount()];
				buffer.read(payload);
				DataBufferUtils.release(buffer);
				return payload;
			})
			.defaultIfEmpty(new byte[0])
			.map(payload -> {
				// ISO-8859-1 is the fallback when no charset is declared.
				Charset bodyCharset = response.headers().contentType()
						.map(MimeType::getCharset)
						.orElse(StandardCharsets.ISO_8859_1);
				boolean knownStatus = (HttpStatus.resolve(response.rawStatusCode()) != null);
				if (!knownStatus) {
					return new UnknownHttpStatusCodeException(
							response.rawStatusCode(),
							response.headers().asHttpHeaders(),
							payload,
							bodyCharset,
							request);
				}
				return WebClientResponseException.create(
						response.statusCode().value(),
						response.statusCode().getReasonPhrase(),
						response.headers().asHttpHeaders(),
						payload,
						bodyCharset,
						request);
			});
}
 
/**
 * Create a filter that consumes the response body only up to the given
 * byte count and cancels if further data arrives.
 * <p>Delegates to {@link DataBufferUtils#takeUntilByteCount} for the cut-off.
 * @param maxByteCount the limit as number of bytes
 * @return the filter to limit the response size with
 * @since 5.1
 */
public static ExchangeFilterFunction limitResponseSize(long maxByteCount) {
	return (request, next) ->
			next.exchange(request).map(upstream -> {
				Flux<DataBuffer> fullBody = upstream.body(BodyExtractors.toDataBuffers());
				Flux<DataBuffer> cappedBody = DataBufferUtils.takeUntilByteCount(fullBody, maxByteCount);
				// Same response, with the capped body swapped in.
				return ClientResponse.from(upstream)
						.body(cappedBody)
						.build();
			});
}
 
/**
 * The wrapper must return the very same instance that the wrapped
 * response yields for the given extractor.
 */
@Test
public void bodyExtractor() {
	BodyExtractor<Mono<String>, ReactiveHttpInputMessage> stringExtractor = BodyExtractors.toMono(String.class);
	Mono<String> stubbed = Mono.just("foo");
	when(mockResponse.body(stringExtractor)).thenReturn(stubbed);

	assertSame(stubbed, wrapper.body(stringExtractor));
}
 
/**
 * The wrapper must return the very same instance that the wrapped
 * request yields for the given extractor.
 */
@Test
public void bodyExtractor() {
	BodyExtractor<Mono<String>, ReactiveHttpInputMessage> stringExtractor = BodyExtractors.toMono(String.class);
	Mono<String> stubbed = Mono.just("foo");
	when(mockRequest.body(stringExtractor)).thenReturn(stubbed);

	assertSame(stubbed, wrapper.body(stringExtractor));
}
 
源代码26 项目: sofa-lookout   文件: AbstractWebfluxImporter.java
/**
 * Handles an import request: rejects it while the gateway is in refuse mode,
 * otherwise reads the body as bytes, records its size, stores it on the
 * {@code RawMetric} and delegates to {@code doHandle}. Errors raised in the
 * reactive chain are turned into an error response.
 *
 * @param request the incoming request
 * @return the response produced by {@code doHandle}, or an error response
 */
public Mono<ServerResponse> handle(ServerRequest request) {
    if (refuseRequestService.isRefuseRequest()) {
        return error("lookout gateway is in refuse requests mode");
    }
    counter.inc();

    RawMetric rm = initRawMetric(request);

    // The client IP is currently obtained as shown below; the key point is getting hold of
    // the ServerHttpRequest object, which is handed to the body extractor.
    // Open question from the original author: can this extractor be reused?
    BodyExtractor<Mono<byte[]>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(byte[].class);
    return request.body((inputMessage, context) -> {
        RawMetricHead head = rm.getHead();
        // The remote address may be unavailable (null), and an unresolved socket address
        // has no InetAddress. Skip the client IP in those cases instead of throwing an NPE
        // here, which would escape before the reactive error handling below can see it.
        java.net.InetSocketAddress remoteAddress = inputMessage.getRemoteAddress();
        if (remoteAddress != null && remoteAddress.getAddress() != null) {
            head.setClientIp(remoteAddress.getAddress().getHostAddress());
        }
        return extractor.extract(inputMessage, context);
    }).flatMap(body -> {
        size.record(body.length);
        rm.setRawBody(body);
        return doHandle(request, rm);
    }).onErrorResume(error -> {
        // FilterException carries a structured result; everything else falls back to the message.
        if (error instanceof FilterException) {
            FilterException fe = (FilterException) error;
            return error(fe.getResult().getMsg());
        } else {
            return error(error.getMessage());
        }
    });
}
 
/**
 * Blocking variant: buffers the complete response body and returns the
 * matching exception, a {@link WebClientResponseException} for status codes
 * known to {@link HttpStatus} and an {@link UnknownHttpStatusCodeException}
 * otherwise. The calling thread is blocked until the body has been read.
 * @param response the response to convert
 * @return the exception describing the response
 */
static WebClientResponseException createResponseException(ClientResponse response) {
	return DataBufferUtils.join(response.body(BodyExtractors.toDataBuffers()))
			.map(buffer -> {
				// Copy the bytes out, then release the joined buffer.
				byte[] payload = new byte[buffer.readableByteCount()];
				buffer.read(payload);
				DataBufferUtils.release(buffer);
				return payload;
			})
			.defaultIfEmpty(new byte[0])
			.map(payload -> {
				// ISO-8859-1 is the fallback when no charset is declared.
				Charset bodyCharset = response.headers().contentType()
						.map(MimeType::getCharset)
						.orElse(StandardCharsets.ISO_8859_1);
				if (HttpStatus.resolve(response.rawStatusCode()) == null) {
					return new UnknownHttpStatusCodeException(
							response.rawStatusCode(),
							response.headers().asHttpHeaders(),
							payload,
							bodyCharset);
				}
				return WebClientResponseException.create(
						response.statusCode().value(),
						response.statusCode().getReasonPhrase(),
						response.headers().asHttpHeaders(),
						payload,
						bodyCharset);
			})
			.block();
}
 
/**
 * Servlet-based proxy: forwards the incoming request to the registered
 * instance, blocks until the upstream response is available, writes and
 * flushes status and filtered headers, then streams the body buffer by
 * buffer to the servlet response.
 * @param instanceId id of the instance to forward to
 * @param servletRequest incoming servlet request
 * @param servletResponse servlet response that receives the upstream result
 * @return completion signal of the body relay
 * @throws IOException declared by the signature; NOTE(review): presumably raised when obtaining or flushing the response body — confirm
 */
@ResponseBody
@RequestMapping(path = INSTANCE_MAPPED_PATH, method = { RequestMethod.GET, RequestMethod.HEAD, RequestMethod.POST,
		RequestMethod.PUT, RequestMethod.PATCH, RequestMethod.DELETE, RequestMethod.OPTIONS })
public Mono<Void> endpointProxy(@PathVariable("instanceId") String instanceId, HttpServletRequest servletRequest,
		HttpServletResponse servletResponse) throws IOException {
	ServerHttpRequest request = new ServletServerHttpRequest(servletRequest);
	String endpointLocalPath = this.getEndpointLocalPath(this.adminContextPath + INSTANCE_MAPPED_PATH,
			servletRequest);
	// The raw query is already encoded, hence build(true).
	URI uri = UriComponentsBuilder.fromPath(endpointLocalPath).query(request.getURI().getRawQuery()).build(true)
			.toUri();

	// We need to explicitly block until the headers are received and write them
	// before the async dispatch.
	// otherwise the FrameworkServlet will add wrong Allow header for OPTIONS request
	Flux<DataBuffer> requestBody = DataBufferUtils.readInputStream(request::getBody, this.bufferFactory, 4096);
	ClientResponse clientResponse = this.instanceWebProxy
			.forward(this.registry.getInstance(InstanceId.of(instanceId)), uri, request.getMethod(),
					this.httpHeadersFilter.filterHeaders(request.getHeaders()),
					BodyInserters.fromDataBuffers(requestBody))
			.block();

	// Status and headers go out first; flush() sends them before the body is relayed.
	ServerHttpResponse response = new ServletServerHttpResponse(servletResponse);
	response.setStatusCode(clientResponse.statusCode());
	response.getHeaders().addAll(this.httpHeadersFilter.filterHeaders(clientResponse.headers().asHttpHeaders()));
	OutputStream responseBody = response.getBody();
	response.flush();

	// window(1) groups the body into single-buffer windows; each is passed to writeAndFlush in order.
	return clientResponse.body(BodyExtractors.toDataBuffers()).window(1)
			.concatMap((body) -> writeAndFlush(body, responseBody)).then();
}
 
/**
 * Extracts the body as a {@code Mono} of the given element class by
 * delegating to {@code body} with {@code BodyExtractors.toMono}.
 */
@Override
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
	return this.body(BodyExtractors.toMono(elementClass));
}
 
/**
 * Extracts the body as a {@code Mono} of the given parameterized type by
 * delegating to {@code body} with {@code BodyExtractors.toMono}.
 */
@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
	return this.body(BodyExtractors.toMono(typeReference));
}