下面列出了 org.springframework.http.client.reactive.ClientHttpResponse 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Produce the {@link Flux} that reports an unsupported media type for the given message.
 *
 * <p>When the message declares a content type, the result fails with {@code ex}; for a
 * {@link ClientHttpResponse} the message is first handed to {@code consumeAndCancel}.
 * When no content type is declared, the error is raised only from within the body
 * mapping, after releasing the buffer that triggered it.
 *
 * @param message the input message whose body could not be read
 * @param ex the exception to signal
 * @param <T> the element type expected by the caller
 * @return a {@code Flux} that signals {@code ex} as described above
 */
private static <T> Flux<T> unsupportedErrorHandler(
        ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {
    if (message.getHeaders().getContentType() != null) {
        if (message instanceof ClientHttpResponse) {
            return consumeAndCancel(message).thenMany(Flux.error(ex));
        }
        return Flux.error(ex);
    }
    // Maybe it's okay there is no content type, if there is no content..
    return message.getBody().map(dataBuffer -> {
        DataBufferUtils.release(dataBuffer);
        throw ex;
    });
}
/**
 * Connect through the delegate connector while wrapping the request and response
 * so that the exchange is stored in {@code this.exchanges} under its request id.
 *
 * @param method the HTTP method, passed to the delegate
 * @param uri the target URI, passed to the delegate
 * @param requestCallback applied to the wrapped request
 * @return a {@code Mono} emitting the wrapped response
 * @throws IllegalStateException (signalled through the returned {@code Mono}) if no
 * request was captured or the request lacks the request id header
 */
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
        Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
    AtomicReference<WiretapClientHttpRequest> requestRef = new AtomicReference<>();
    return this.delegate
            .connect(method, uri, request -> {
                WiretapClientHttpRequest wrapped = new WiretapClientHttpRequest(request);
                requestRef.set(wrapped);
                return requestCallback.apply(wrapped);
            })
            .map(response -> {
                WiretapClientHttpRequest wrappedRequest = requestRef.get();
                // The reference is only populated inside the request callback; fail with a
                // descriptive message rather than a bare NullPointerException if the
                // delegate produced a response without ever invoking that callback.
                Assert.state(wrappedRequest != null,
                        "No request captured: the delegate connector did not invoke the request callback");
                String header = WebTestClient.WEBTESTCLIENT_REQUEST_ID;
                String requestId = wrappedRequest.getHeaders().getFirst(header);
                Assert.state(requestId != null, () -> "No \"" + header + "\" header");
                WiretapClientHttpResponse wrappedResponse = new WiretapClientHttpResponse(response);
                this.exchanges.put(requestId, new Info(wrappedRequest, wrappedResponse));
                return wrappedResponse;
            });
}
/**
 * Build an exchange result from an HTTP request/response pair together with
 * deferred captures of the serialized request and response bodies.
 *
 * <p>The request, response and both body captures must not be {@code null}.
 *
 * @param request the HTTP request
 * @param response the HTTP response
 * @param requestBody capture of serialized request body content
 * @param responseBody capture of serialized response body content
 * @param timeout how long to wait for content to materialize
 * @param uriTemplate the URI template used to set up the request, if any
 */
ExchangeResult(ClientHttpRequest request, ClientHttpResponse response,
        Mono<byte[]> requestBody, Mono<byte[]> responseBody, Duration timeout, @Nullable String uriTemplate) {
    // Validate and store each required argument in turn; checks run in declaration order.
    Assert.notNull(request, "ClientHttpRequest is required");
    this.request = request;

    Assert.notNull(response, "ClientHttpResponse is required");
    this.response = response;

    Assert.notNull(requestBody, "'requestBody' is required");
    this.requestBody = requestBody;

    Assert.notNull(responseBody, "'responseBody' is required");
    this.responseBody = responseBody;

    // The remaining arguments are stored without validation.
    this.timeout = timeout;
    this.uriTemplate = uriTemplate;
}
/**
 * Sends a request tagged with request id "1" through a {@code WiretapConnector}
 * and checks that the exchange claimed under that id exposes the method and URL.
 */
@Test
public void captureAndClaim() {
    ClientHttpRequest mockRequest = new MockClientHttpRequest(HttpMethod.GET, "/test");
    ClientHttpResponse mockResponse = new MockClientHttpResponse(HttpStatus.OK);
    // Stub connector: applies the callback to the mock request, then emits the mock response.
    ClientHttpConnector stubConnector =
            (method, uri, fn) -> fn.apply(mockRequest).then(Mono.just(mockResponse));
    WiretapConnector wiretap = new WiretapConnector(stubConnector);

    ClientRequest outgoing = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
            .header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1")
            .build();
    ExchangeFunctions.create(wiretap).exchange(outgoing).block(ofMillis(0));

    WiretapConnector.Info claimed = wiretap.claimRequest("1");
    ExchangeResult exchangeResult = claimed.createExchangeResult(Duration.ZERO, null);
    assertEquals(HttpMethod.GET, exchangeResult.getMethod());
    assertEquals("/test", exchangeResult.getUrl().toString());
}
/**
 * Extract the body of the underlying response with the given extractor.
 *
 * <p>The extractor receives a context exposing this response's message readers,
 * an empty server response, and a hints map carrying the log prefix.
 *
 * @param extractor the extractor to apply to the underlying response
 * @param <T> the extracted body type
 * @return whatever the extractor returns
 */
@Override
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
    BodyExtractor.Context extractionContext = new BodyExtractor.Context() {
        @Override
        public List<HttpMessageReader<?>> messageReaders() {
            return strategies.messageReaders();
        }

        @Override
        public Optional<ServerHttpResponse> serverResponse() {
            return Optional.empty();
        }

        @Override
        public Map<String, Object> hints() {
            return Hints.from(Hints.LOG_PREFIX_HINT, logPrefix);
        }
    };
    return extractor.extract(this.response, extractionContext);
}
/**
 * Build the error-signalling {@link Flux} used when no reader supports the message's media type.
 *
 * <p>Without a declared content type, the failure is deferred to the body mapping:
 * each buffer that arrives is released and {@code ex} is thrown. With a declared
 * content type the result fails with {@code ex}, after passing the message to
 * {@code consumeAndCancel} when it is a {@link ClientHttpResponse}.
 *
 * @param message the input message whose body could not be read
 * @param ex the exception to signal
 * @param <T> the element type expected by the caller
 * @return a {@code Flux} that signals {@code ex} as described above
 */
private static <T> Flux<T> unsupportedErrorHandler(
        ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {
    boolean contentTypeMissing = (message.getHeaders().getContentType() == null);
    if (contentTypeMissing) {
        // Maybe it's okay there is no content type, if there is no content..
        return message.getBody().map(chunk -> {
            DataBufferUtils.release(chunk);
            throw ex;
        });
    }
    if (!(message instanceof ClientHttpResponse)) {
        return Flux.error(ex);
    }
    return consumeAndCancel(message).thenMany(Flux.error(ex));
}
/**
 * Delegate the connection while wrapping request and response, recording the
 * pair in {@code this.exchanges} keyed by the request id header value.
 *
 * @param method the HTTP method, passed to the delegate
 * @param uri the target URI, passed to the delegate
 * @param requestCallback applied to the wrapped request
 * @return a {@code Mono} emitting the wrapped response
 * @throws IllegalStateException (signalled through the returned {@code Mono}) if no
 * request was captured or the request lacks the request id header
 */
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
        Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
    AtomicReference<WiretapClientHttpRequest> requestRef = new AtomicReference<>();
    return this.delegate
            .connect(method, uri, request -> {
                WiretapClientHttpRequest wrapped = new WiretapClientHttpRequest(request);
                requestRef.set(wrapped);
                return requestCallback.apply(wrapped);
            })
            .map(response -> {
                WiretapClientHttpRequest wrappedRequest = requestRef.get();
                // requestRef is set only from the request callback; report a clear
                // state error instead of a NullPointerException when it never ran.
                Assert.state(wrappedRequest != null,
                        "No request captured: the delegate connector did not invoke the request callback");
                String header = WebTestClient.WEBTESTCLIENT_REQUEST_ID;
                String requestId = wrappedRequest.getHeaders().getFirst(header);
                Assert.state(requestId != null, () -> "No \"" + header + "\" header");
                WiretapClientHttpResponse wrappedResponse = new WiretapClientHttpResponse(response);
                this.exchanges.put(requestId, new Info(wrappedRequest, wrappedResponse));
                return wrappedResponse;
            });
}
/**
 * Create a result holding an HTTP request and response plus promises for the
 * serialized request and response body content.
 *
 * <p>Rejects a {@code null} request, response, request body or response body.
 *
 * @param request the HTTP request
 * @param response the HTTP response
 * @param requestBody capture of serialized request body content
 * @param responseBody capture of serialized response body content
 * @param timeout how long to wait for content to materialize
 * @param uriTemplate the URI template used to set up the request, if any
 */
ExchangeResult(ClientHttpRequest request, ClientHttpResponse response,
        Mono<byte[]> requestBody, Mono<byte[]> responseBody, Duration timeout, @Nullable String uriTemplate) {
    // Each required argument is checked immediately before it is stored.
    Assert.notNull(request, "ClientHttpRequest is required");
    this.request = request;
    Assert.notNull(response, "ClientHttpResponse is required");
    this.response = response;
    Assert.notNull(requestBody, "'requestBody' is required");
    this.requestBody = requestBody;
    Assert.notNull(responseBody, "'responseBody' is required");
    this.responseBody = responseBody;

    // Stored as given, without a null check.
    this.timeout = timeout;
    this.uriTemplate = uriTemplate;
}
/**
 * Exchanges a GET request carrying request id "1" via a {@code WiretapConnector},
 * then claims the recorded exchange and asserts its method and URL.
 */
@Test
public void captureAndClaim() {
    ClientHttpRequest stubRequest = new MockClientHttpRequest(HttpMethod.GET, "/test");
    ClientHttpResponse stubResponse = new MockClientHttpResponse(HttpStatus.OK);
    // The delegate applies the request callback and then emits the canned response.
    ClientHttpConnector delegate =
            (method, uri, fn) -> fn.apply(stubRequest).then(Mono.just(stubResponse));

    ClientRequest taggedRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
            .header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1")
            .build();

    WiretapConnector wiretap = new WiretapConnector(delegate);
    ExchangeFunction exchangeFunction = ExchangeFunctions.create(wiretap);
    exchangeFunction.exchange(taggedRequest).block(ofMillis(0));

    WiretapConnector.Info info = wiretap.claimRequest("1");
    ExchangeResult claimedResult = info.createExchangeResult(Duration.ZERO, null);
    assertEquals(HttpMethod.GET, claimedResult.getMethod());
    assertEquals("/test", claimedResult.getUrl().toString());
}
/**
 * Send a request with a newly created Vert.x HTTP client and expose the response as a {@code Mono}.
 *
 * <p>A relative URI is rejected with an {@link IllegalArgumentException} signalled
 * through the returned {@code Mono}. Otherwise the request callback is applied to a
 * request adapter, and the returned {@code Mono} then completes from a future that is
 * fulfilled by the response handler or failed by the request's exception handler.
 *
 * @param method the HTTP method, converted to the Vert.x method of the same name
 * @param uri the absolute target URI
 * @param requestCallback applied to the request adapter
 * @return a {@code Mono} emitting the response adapter, or an error
 */
@Override
public Mono<ClientHttpResponse> connect(org.springframework.http.HttpMethod method, URI uri,
        Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
    // Fixed: the second placeholder was missing its closing quote in the log message.
    logger.debug("Connecting to '{}' with '{}'", uri, method);
    if (!uri.isAbsolute()) {
        return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
    }
    CompletableFuture<ClientHttpResponse> responseFuture = new CompletableFuture<>();
    HttpClient client = vertx.createHttpClient(clientOptions);
    // NOTE(review): the client is closed only when the response body flux terminates;
    // a failed request leaves it open. Confirm whether close() may safely be called
    // from the exception handler as well before adding it there.
    HttpClientRequest request = client.requestAbs(HttpMethod.valueOf(method.name()), uri.toString())
            .exceptionHandler(responseFuture::completeExceptionally)
            .handler(response -> {
                Flux<DataBuffer> responseBody = responseToFlux(response)
                        .doFinally(ignore -> client.close());
                responseFuture.complete(new VertxClientHttpResponse(response, responseBody));
            });
    return requestCallback.apply(new VertxClientHttpRequest(request, bufferConverter))
            .then(Mono.fromCompletionStage(responseFuture));
}
/**
 * Create a request for the given method and URI, apply the request callback to it,
 * and emit a response once the response headers are available.
 *
 * <p>A {@link NullPointerException} or {@link IllegalArgumentException} thrown while
 * assembling the pipeline is returned as an error {@code Mono} instead of propagating.
 *
 * @param method the HTTP method; must not be {@code null}
 * @param uri the target URI; must not be {@code null}
 * @param requestCallback applied to the created request; must not be {@code null}
 * @return a {@code Mono} emitting the response, or an error
 */
@Override
public Mono<ClientHttpResponse> connect(
        HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
    try {
        requireNonNull(method, "method");
        requireNonNull(uri, "uri");
        requireNonNull(requestCallback, "requestCallback");

        final ArmeriaClientHttpRequest armeriaRequest = createRequest(method, uri);
        final Mono<Void> requestWritten = requestCallback.apply(armeriaRequest);
        return requestWritten
                .then(Mono.fromFuture(armeriaRequest.future()))
                .map(ArmeriaHttpClientResponseSubscriber::new)
                .flatMap(subscriber -> Mono.fromFuture(subscriber.headersFuture())
                        .map(responseHeaders -> createResponse(responseHeaders, subscriber)));
    } catch (NullPointerException | IllegalArgumentException e) {
        return Mono.error(e);
    }
}
/**
 * Log the response status via {@code LogFormatUtils.traceDebug}, appending the
 * formatted headers when the trace flag supplied to the message function is set.
 *
 * @param response the response whose status (and optionally headers) is logged
 * @param logPrefix text placed at the start of the log message
 */
private void logResponse(ClientHttpResponse response, String logPrefix) {
    LogFormatUtils.traceDebug(logger, traceOn -> {
        int rawCode = response.getRawStatusCode();
        HttpStatus resolved = HttpStatus.resolve(rawCode);
        // Fall back to the numeric code when it does not resolve to a known status.
        String statusText = (resolved == null ? String.valueOf(rawCode) : resolved.toString());
        String headersText = (traceOn ? ", headers=" + formatHeaders(response.getHeaders()) : "");
        return logPrefix + "Response " + statusText + headersText;
    });
}
/**
 * Create a client response around the given HTTP response.
 *
 * @param response the underlying HTTP response, stored as-is
 * @param strategies the exchange strategies, stored as-is
 * @param logPrefix the log prefix, stored as-is
 * @param requestDescription a description of the originating request, stored as-is
 */
public DefaultClientResponse(ClientHttpResponse response, ExchangeStrategies strategies,
String logPrefix, String requestDescription) {
this.response = response;
this.strategies = strategies;
// NOTE(review): DefaultHeaders is declared outside this view; if it reads this.response
// when constructed, this assignment must stay after the response assignment — confirm.
this.headers = new DefaultHeaders();
this.logPrefix = logPrefix;
this.requestDescription = requestDescription;
}
/**
 * Extract the body of the underlying response and, when the extracted value is a
 * {@link Mono} or {@link Flux}, attach a checkpoint describing the originating request.
 *
 * @param extractor the extractor to apply to the underlying response
 * @param <T> the extracted body type
 * @return the extracted value, with a checkpoint added for reactive types
 */
@SuppressWarnings("unchecked")
@Override
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
    BodyExtractor.Context extractionContext = new BodyExtractor.Context() {
        @Override
        public List<HttpMessageReader<?>> messageReaders() {
            return strategies.messageReaders();
        }

        @Override
        public Optional<ServerHttpResponse> serverResponse() {
            return Optional.empty();
        }

        @Override
        public Map<String, Object> hints() {
            return Hints.from(Hints.LOG_PREFIX_HINT, logPrefix);
        }
    };
    T extracted = extractor.extract(this.response, extractionContext);
    String checkpointDescription = "Body from " + this.requestDescription + " [DefaultClientResponse]";

    if (extracted instanceof Mono) {
        return (T) ((Mono<?>) extracted).checkpoint(checkpointDescription);
    }
    if (extracted instanceof Flux) {
        return (T) ((Flux<?>) extracted).checkpoint(checkpointDescription);
    }
    // Non-reactive results are returned untouched.
    return extracted;
}
/**
 * Build a {@code ClientResponse} from the status code, headers, cookies and body
 * held by this builder, using empty strings for log prefix and request description.
 *
 * @return the newly built response
 */
@Override
public ClientResponse build() {
    // When building ClientResponse manually, the ClientRequest.logPrefix() has to be passed,
    // e.g. via ClientResponse.Builder, but this (builder) is not used currently.
    return new DefaultClientResponse(
            new BuiltClientHttpResponse(this.statusCode, this.headers, this.cookies, this.body),
            this.strategies, "", "");
}
/**
 * Route a client request to {@code this.handler} using mock request and response
 * objects, and return a {@code Mono} that receives the adapted client response.
 *
 * @param httpMethod the HTTP method for the mock client request
 * @param uri the URI for the mock client request
 * @param requestCallback applied to the mock client request to write it
 * @return a processor that is given the client response, or an error from either
 * the request callback or the handler
 */
@Override
public Mono<ClientHttpResponse> connect(HttpMethod httpMethod, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
// Sink for the outcome: receives the client response via onNext or a failure via onError.
MonoProcessor<ClientHttpResponse> result = MonoProcessor.create();
MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);
MockServerHttpResponse mockServerResponse = new MockServerHttpResponse();
// Request write handler: adapts the client request (with its body) to a server request,
// prepares the server response, and invokes the handler; handler errors go to the result.
mockClientRequest.setWriteHandler(requestBody -> {
log("Invoking HttpHandler for ", httpMethod, uri);
ServerHttpRequest mockServerRequest = adaptRequest(mockClientRequest, requestBody);
ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest);
this.handler.handle(mockServerRequest, responseToUse).subscribe(aVoid -> {}, result::onError);
return Mono.empty();
});
// Response write handler: when run, adapts the server response (with its body) to a
// client response and publishes it to the result.
mockServerResponse.setWriteHandler(responseBody ->
Mono.fromRunnable(() -> {
log("Creating client response for ", httpMethod, uri);
result.onNext(adaptResponse(mockServerResponse, responseBody));
}));
log("Writing client request for ", httpMethod, uri);
// Both handlers are registered before the callback is subscribed; callback errors go to the result.
requestCallback.apply(mockClientRequest).subscribe(aVoid -> {}, result::onError);
return result;
}
/**
 * Convert a mock server response into a mock client response carrying the same
 * headers and cookies and the given body.
 *
 * @param response the server response to copy status, headers and cookies from
 * @param body the body to set on the client response
 * @return the populated client response
 */
private ClientHttpResponse adaptResponse(MockServerHttpResponse response, Flux<DataBuffer> body) {
    HttpStatus status = response.getStatusCode();
    if (status == null) {
        // No status set on the server response: default to 200 OK.
        status = HttpStatus.OK;
    }
    MockClientHttpResponse adapted = new MockClientHttpResponse(status);
    adapted.getHeaders().putAll(response.getHeaders());
    adapted.getCookies().putAll(response.getCookies());
    adapted.setBody(body);
    return adapted;
}
/**
 * Emit a log message for the response status through {@code LogFormatUtils.traceDebug};
 * the formatted headers are appended only when the supplied trace flag is set.
 *
 * @param response the response being logged
 * @param logPrefix text placed at the start of the log message
 */
private void logResponse(ClientHttpResponse response, String logPrefix) {
    LogFormatUtils.traceDebug(logger, traceOn -> {
        int statusCode = response.getRawStatusCode();
        HttpStatus knownStatus = HttpStatus.resolve(statusCode);
        // Use the resolved status when available, otherwise the raw numeric code.
        String statusPart = (knownStatus != null ? knownStatus.toString() : String.valueOf(statusCode));
        String headersPart = "";
        if (traceOn) {
            headersPart = ", headers=" + formatHeaders(response.getHeaders());
        }
        return logPrefix + "Response " + statusPart + headersPart;
    });
}
/**
 * Build a {@code ClientResponse} from the status code, headers, cookies and body
 * held by this builder, using an empty log prefix.
 *
 * @return the newly built response
 */
@Override
public ClientResponse build() {
    // When building ClientResponse manually, the ClientRequest.logPrefix() has to be passed,
    // e.g. via ClientResponse.Builder, but this (builder) is not used currently.
    return new DefaultClientResponse(
            new BuiltClientHttpResponse(this.statusCode, this.headers, this.cookies, this.body),
            this.strategies, "");
}
/**
 * Dispatch a client request to {@code this.handler} through mock request and
 * response objects, returning a {@code Mono} that is given the adapted client response.
 *
 * @param httpMethod the HTTP method for the mock client request
 * @param uri the URI for the mock client request
 * @param requestCallback applied to the mock client request to write it
 * @return a processor that is given the client response, or an error from either
 * the request callback or the handler
 */
@Override
public Mono<ClientHttpResponse> connect(HttpMethod httpMethod, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
// Holds the eventual outcome: a client response (onNext) or a failure (onError).
MonoProcessor<ClientHttpResponse> result = MonoProcessor.create();
MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);
MockServerHttpResponse mockServerResponse = new MockServerHttpResponse();
// On request write: build the server-side request/response pair and call the handler,
// forwarding any handler error to the result.
mockClientRequest.setWriteHandler(requestBody -> {
log("Invoking HttpHandler for ", httpMethod, uri);
ServerHttpRequest mockServerRequest = adaptRequest(mockClientRequest, requestBody);
ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest);
this.handler.handle(mockServerRequest, responseToUse).subscribe(aVoid -> {}, result::onError);
return Mono.empty();
});
// On response write: publish the adapted client response to the result.
mockServerResponse.setWriteHandler(responseBody ->
Mono.fromRunnable(() -> {
log("Creating client response for ", httpMethod, uri);
result.onNext(adaptResponse(mockServerResponse, responseBody));
}));
log("Writing client request for ", httpMethod, uri);
// Subscribed last, after both write handlers are in place; callback errors go to the result.
requestCallback.apply(mockClientRequest).subscribe(aVoid -> {}, result::onError);
return result;
}
/**
 * Create a mock client response mirroring the given mock server response:
 * same status (200 OK when none is set), headers and cookies, with the given body.
 *
 * @param response the server response to mirror
 * @param body the body to set on the client response
 * @return the populated client response
 */
private ClientHttpResponse adaptResponse(MockServerHttpResponse response, Flux<DataBuffer> body) {
    HttpStatus serverStatus = response.getStatusCode();
    HttpStatus effectiveStatus = (serverStatus != null ? serverStatus : HttpStatus.OK);

    MockClientHttpResponse mirrored = new MockClientHttpResponse(effectiveStatus);
    mirrored.getHeaders().putAll(response.getHeaders());
    mirrored.getCookies().putAll(response.getCookies());
    mirrored.setBody(body);
    return mirrored;
}
/**
 * Connecting with a relative URI must fail with the "URI is not absolute" message.
 */
@Test
public void shouldNotConnectRelativeUri() {
    URI relativeUri = URI.create("/test");
    VertxClientHttpConnector connector = new VertxClientHttpConnector(mockVertx);

    StepVerifier.create(connector.connect(GET, relativeUri, r -> Mono.empty()))
            .verifyErrorMessage("URI is not absolute: /test");
}
/**
 * Record the API, any exception and the service type on the span event, and
 * additionally the HTTP status code when the first argument is a {@link ClientHttpResponse}.
 *
 * @param recorder the span event recorder to write to
 * @param target the intercepted object (unused)
 * @param args the intercepted method's arguments; may be {@code null} or empty
 * @param result the intercepted method's result (unused)
 * @param throwable the exception raised by the intercepted method, if any
 */
@Override
protected void doInAfterTrace(SpanEventRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
    recorder.recordApi(methodDescriptor);
    recorder.recordException(throwable);
    recorder.recordServiceType(SpringWebFluxConstants.SPRING_WEBFLUX);
    // Guard the index access so a null or empty argument array cannot make the
    // interceptor itself throw inside the instrumented call.
    if (args != null && args.length > 0 && args[0] instanceof ClientHttpResponse) {
        ClientHttpResponse response = (ClientHttpResponse) args[0];
        recorder.recordAttribute(AnnotationKey.HTTP_STATUS_CODE, response.getRawStatusCode());
    }
}
/**
 * Forward body extraction to the wrapped delegate.
 *
 * @param extractor the extractor handed to the delegate
 * @param <T> the extracted body type
 * @return the delegate's extraction result
 */
@Override
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
    final T extracted = this.delegate.body(extractor);
    return extracted;
}
/**
 * Provide a supplier of an empty {@link Flux} for skipping the message body.
 *
 * <p>For a {@link ClientHttpResponse} the supplied flux first passes the message to
 * {@code consumeAndCancel}; for any other message it is simply {@code Flux.empty()}.
 *
 * @param message the input message whose body is to be skipped
 * @param <T> the element type expected by the caller
 * @return the supplier described above
 */
private static <T> Supplier<Flux<T>> skipBodyAsFlux(ReactiveHttpInputMessage message) {
    if (message instanceof ClientHttpResponse) {
        return () -> consumeAndCancel(message).thenMany(Mono.empty());
    }
    return Flux::empty;
}
/**
 * Provide a supplier of an empty {@link Mono} for skipping the message body.
 *
 * <p>For a {@link ClientHttpResponse} the supplied mono first passes the message to
 * {@code consumeAndCancel}; for any other message it is simply {@code Mono.empty()}.
 *
 * @param message the input message whose body is to be skipped
 * @param <T> the value type expected by the caller
 * @return the supplier described above
 */
@SuppressWarnings("unchecked")
private static <T> Supplier<Mono<T>> skipBodyAsMono(ReactiveHttpInputMessage message) {
    if (message instanceof ClientHttpResponse) {
        return () -> consumeAndCancel(message).then(Mono.empty());
    }
    return Mono::empty;
}
/**
 * Creates fresh mocks and the response under test before each test method.
 */
@Before
public void createMocks() {
    // The two mocks are independent of each other; the response under test needs both.
    this.mockExchangeStrategies = mock(ExchangeStrategies.class);
    this.mockResponse = mock(ClientHttpResponse.class);
    this.defaultClientResponse =
            new DefaultClientResponse(this.mockResponse, this.mockExchangeStrategies, "", "");
}
/**
 * Wrap the given response and create a recorder over its body.
 *
 * @param delegate the response to wrap; passed to the superclass constructor
 */
public WiretapClientHttpResponse(ClientHttpResponse delegate) {
super(delegate);
// The recorder is built from the body returned by the superclass and a null second argument.
// NOTE(review): WiretapRecorder's parameter meanings are not visible here — confirm what the null stands for.
this.recorder = new WiretapRecorder(super.getBody(), null);
}
/**
 * Delegate body extraction to the wrapped response.
 *
 * @param extractor the extractor handed to the delegate
 * @param <T> the extracted body type
 * @return the delegate's extraction result
 */
@Override
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
    final T delegateResult = this.delegate.body(extractor);
    return delegateResult;
}
/**
 * Create a client response around the given HTTP response.
 *
 * @param response the underlying HTTP response, stored as-is
 * @param strategies the exchange strategies, stored as-is
 * @param logPrefix the log prefix, stored as-is
 */
public DefaultClientResponse(ClientHttpResponse response, ExchangeStrategies strategies, String logPrefix) {
this.response = response;
this.strategies = strategies;
// NOTE(review): DefaultHeaders is declared outside this view; if it reads this.response
// when constructed, this assignment must stay after the response assignment — confirm.
this.headers = new DefaultHeaders();
this.logPrefix = logPrefix;
}