下面列出了 org.springframework.http.ReactiveHttpInputMessage 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Pick the first message reader from the context that can read
 * {@code elementType} for the content type of the message and pass it to
 * {@code readerFunction}.
 * <p>A missing {@code Content-Type} header is treated as
 * {@code application/octet-stream}. When no reader produces a result, the
 * {@code errorFunction} receives an {@link UnsupportedMediaTypeException}
 * that lists the readable media types of all configured readers.
 * @param message the input message whose headers supply the content type
 * @param context the extractor context that provides the message readers
 * @param elementType the target element type; {@code VOID_TYPE} short-circuits
 * to {@code emptySupplier}
 * @param readerFunction applied to the selected reader to produce the result
 * @param errorFunction applied when no reader supports the content type
 * @param emptySupplier supplies the result when the element type is void
 * @return the publisher produced by one of the three callbacks
 */
private static <T, S extends Publisher<T>> S readWithMessageReaders(
        ReactiveHttpInputMessage message, BodyExtractor.Context context, ResolvableType elementType,
        Function<HttpMessageReader<T>, S> readerFunction,
        Function<UnsupportedMediaTypeException, S> errorFunction,
        Supplier<S> emptySupplier) {

    if (VOID_TYPE.equals(elementType)) {
        return emptySupplier.get();
    }

    MediaType declaredType = message.getHeaders().getContentType();
    MediaType contentType = (declaredType != null ? declaredType : MediaType.APPLICATION_OCTET_STREAM);

    Optional<S> extracted = context.messageReaders().stream()
            .filter(candidate -> candidate.canRead(elementType, contentType))
            .findFirst()
            .map(BodyExtractors::<T>cast)
            .map(readerFunction);
    if (extracted.isPresent()) {
        return extracted.get();
    }

    // No reader matched: report every media type the configured readers could have handled.
    List<MediaType> supportedTypes = context.messageReaders().stream()
            .flatMap(candidate -> candidate.getReadableMediaTypes().stream())
            .collect(Collectors.toList());
    return errorFunction.apply(
            new UnsupportedMediaTypeException(contentType, supportedTypes, elementType));
}
/**
 * Build the error stream used when no reader supports the message.
 * <p>Without a {@code Content-Type} header the error is deferred until a
 * body buffer actually arrives, so a message with no content does not fail.
 * With a content type, a {@code ClientHttpResponse} body is first passed to
 * {@code consumeAndCancel} before the error is emitted; any other message
 * fails immediately.
 * @param message the input message that could not be read
 * @param ex the exception to signal
 * @return a {@code Flux} that emits {@code ex} as described above
 */
private static <T> Flux<T> unsupportedErrorHandler(
        ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {

    if (message.getHeaders().getContentType() == null) {
        // A missing content type may be fine as long as there is no content either,
        // so only fail once a buffer shows up (releasing it first).
        return message.getBody().map(buffer -> {
            DataBufferUtils.release(buffer);
            throw ex;
        });
    }
    if (message instanceof ClientHttpResponse) {
        return consumeAndCancel(message).thenMany(Flux.error(ex));
    }
    return Flux.error(ex);
}
/**
 * Verifies that a {@code String} extractor yields the request body as a
 * single decoded value followed by completion.
 */
@Test
public void toMono() {
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);

    BodyExtractor<Mono<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(String.class);
    Mono<String> result = extractor.extract(request, this.context);

    StepVerifier.create(result)
            .expectNext("foo")
            .verifyComplete();
}
/**
 * Verifies that an extractor created from a {@code ParameterizedTypeReference}
 * decodes a JSON request body into a {@code Map<String, String>}.
 */
@Test
public void toMonoParameterizedTypeReference() {
    String json = "{\"username\":\"foo\",\"password\":\"bar\"}";
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .contentType(MediaType.APPLICATION_JSON)
            .body(body);

    BodyExtractor<Mono<Map<String, String>>, ReactiveHttpInputMessage> extractor =
            BodyExtractors.toMono(new ParameterizedTypeReference<Map<String, String>>() {});
    Mono<Map<String, String>> result = extractor.extract(request, this.context);

    Map<String, String> expected = new LinkedHashMap<>();
    expected.put("username", "foo");
    expected.put("password", "bar");

    StepVerifier.create(result)
            .expectNext(expected)
            .verifyComplete();
}
/**
 * Verifies that the {@code JSON_VIEW_HINT} registered in the test hints is
 * honoured: with the {@code SafeToDeserialize} view the decoded user has a
 * username but a {@code null} password.
 */
@Test
public void toMonoWithHints() {
    // Register the hint before extracting.
    this.hints.put(JSON_VIEW_HINT, SafeToDeserialize.class);

    String json = "{\"username\":\"foo\",\"password\":\"bar\"}";
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .contentType(MediaType.APPLICATION_JSON)
            .body(body);

    BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);
    Mono<User> result = extractor.extract(request, this.context);

    StepVerifier.create(result)
            .consumeNextWith(decoded -> {
                assertEquals("foo", decoded.getUsername());
                assertNull(decoded.getPassword());
            })
            .verifyComplete();
}
/**
 * Verifies that extracting {@code Void} from a client response subscribes
 * to the body, completes once a buffer has been emitted, and leaves the
 * body publisher cancelled.
 */
@Test
public void toMonoVoidAsClientShouldConsumeAndCancel() {
    TestPublisher<DataBuffer> body = TestPublisher.create();
    MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
    response.setBody(body.flux());

    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
    BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
    Mono<Void> result = extractor.extract(response, this.context);

    StepVerifier.create(result)
            .then(() -> {
                body.assertWasSubscribed();
                body.emit(dataBuffer);
            })
            .expectComplete()
            .verify();

    body.assertCancelled();
}
/**
 * Verifies that a {@code String} flux extractor emits the decoded request
 * body and then completes.
 */
@Test
public void toFlux() {
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);

    BodyExtractor<Flux<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(String.class);
    Flux<String> result = extractor.extract(request, this.context);

    StepVerifier.create(result)
            .expectNext("foo")
            .verifyComplete();
}
/**
 * Verifies that the data-buffer extractor passes the request body buffer
 * through unchanged and then completes.
 */
@Test
public void toDataBuffers() {
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);

    BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> extractor = BodyExtractors.toDataBuffers();
    Flux<DataBuffer> result = extractor.extract(request, this.context);

    StepVerifier.create(result)
            .expectNext(dataBuffer)
            .verifyComplete();
}
/**
 * Join the message body into a single buffer, decode it with the charset
 * obtained from the {@code Content-Type} header via
 * {@code getMediaTypeCharset}, and parse the text as form data.
 * @param elementType the declared element type (not consulted here)
 * @param message the message whose body holds the form data
 * @param hints hints forwarded to {@code logFormData}
 * @return a {@code Mono} with the parsed form data
 */
@Override
public Mono<MultiValueMap<String, String>> readMono(ResolvableType elementType,
        ReactiveHttpInputMessage message, Map<String, Object> hints) {

    Charset charset = getMediaTypeCharset(message.getHeaders().getContentType());
    return DataBufferUtils.join(message.getBody())
            .map(joined -> {
                // Copy the content into a String before the buffer is released.
                String body = charset.decode(joined.asByteBuffer()).toString();
                DataBufferUtils.release(joined);
                MultiValueMap<String, String> formData = parseFormData(charset, body);
                logFormData(formData, hints);
                return formData;
            });
}
/**
 * Read all parts through the delegate part reader and collect them into a
 * {@code MultiValueMap} keyed by part name.
 * <p>The delegate is called with {@code Hints.SUPPRESS_LOGGING_HINT} set to
 * {@code true}; the collected map is logged here instead, with content
 * masked unless {@code isEnableLoggingRequestDetails()} returns true.
 * @param elementType the element type passed on to the part reader
 * @param inputMessage the multipart message to read
 * @param hints the caller's hints, also used for the log prefix
 * @return a {@code Mono} with the parts grouped by name
 */
@Override
public Mono<MultiValueMap<String, Part>> readMono(ResolvableType elementType,
        ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {

    Map<String, Object> allHints = Hints.merge(hints, Hints.SUPPRESS_LOGGING_HINT, true);
    return this.partReader.read(elementType, inputMessage, allHints)
            .collectMultimap(Part::name)
            .doOnNext(parts -> LogFormatUtils.traceDebug(logger, traceOn -> {
                String prefix = Hints.getLogPrefix(hints);
                String details = (isEnableLoggingRequestDetails() ?
                        LogFormatUtils.formatValue(parts, !traceOn) :
                        "parts " + parts.keySet() + " (content masked)");
                return prefix + "Parsed " + details;
            }))
            .map(this::toMultiValueMap);
}
/**
 * Split the multipart body of the message into a stream of parts.
 * <p>Fails with a {@code CodecException} when {@code boundary(message)}
 * yields no boundary. Otherwise the body is skipped up to the first
 * boundary, split on {@code BOUNDARY_PREFIX} followed by the boundary,
 * cut off at the last boundary, and each section is converted with
 * {@code toPart}. Discard hooks release pooled buffers and the bodies of
 * already-created parts.
 * @param elementType the declared element type (not consulted here)
 * @param message the multipart message to read
 * @param hints the declared hints (not consulted here)
 * @return the parts of the message
 */
@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
    byte[] boundary = boundary(message);
    if (boundary == null) {
        String description = "No multipart boundary found in Content-Type: \"" +
                message.getHeaders().getContentType() + "\"";
        return Flux.error(new CodecException(description));
    }

    if (logger.isTraceEnabled()) {
        logger.trace("Boundary: " + toString(boundary));
    }

    byte[] delimiter = concat(BOUNDARY_PREFIX, boundary);
    Flux<DataBuffer> afterPreamble = skipUntilFirstBoundary(message.getBody(), boundary);

    return DataBufferUtils.split(afterPreamble, delimiter)
            .takeWhile(DefaultMultipartMessageReader::notLastBoundary)
            .map(DefaultMultipartMessageReader::toPart)
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
            .doOnDiscard(DefaultPart.class, discarded -> DataBufferUtils.release(discarded.body));
}
/**
 * Pick the first message reader from the context that can read
 * {@code elementType} for the content type of the message and pass it to
 * {@code readerFunction}.
 * <p>A missing {@code Content-Type} header is treated as
 * {@code application/octet-stream}. When no reader produces a result, the
 * {@code errorFunction} receives an {@link UnsupportedMediaTypeException}
 * that lists the readable media types of all configured readers.
 * @param message the input message whose headers supply the content type
 * @param context the extractor context that provides the message readers
 * @param elementType the target element type; {@code VOID_TYPE} short-circuits
 * to {@code emptySupplier}
 * @param readerFunction applied to the selected reader to produce the result
 * @param errorFunction applied when no reader supports the content type
 * @param emptySupplier supplies the result when the element type is void
 * @return the publisher produced by one of the three callbacks
 */
private static <T, S extends Publisher<T>> S readWithMessageReaders(
        ReactiveHttpInputMessage message, BodyExtractor.Context context, ResolvableType elementType,
        Function<HttpMessageReader<T>, S> readerFunction,
        Function<UnsupportedMediaTypeException, S> errorFunction,
        Supplier<S> emptySupplier) {

    if (VOID_TYPE.equals(elementType)) {
        return emptySupplier.get();
    }

    MediaType declaredType = message.getHeaders().getContentType();
    MediaType contentType = (declaredType != null ? declaredType : MediaType.APPLICATION_OCTET_STREAM);

    Optional<S> extracted = context.messageReaders().stream()
            .filter(candidate -> candidate.canRead(elementType, contentType))
            .findFirst()
            .map(BodyExtractors::<T>cast)
            .map(readerFunction);
    if (extracted.isPresent()) {
        return extracted.get();
    }

    // No reader matched: report every media type the configured readers could have handled.
    List<MediaType> supportedTypes = context.messageReaders().stream()
            .flatMap(candidate -> candidate.getReadableMediaTypes().stream())
            .collect(Collectors.toList());
    return errorFunction.apply(
            new UnsupportedMediaTypeException(contentType, supportedTypes, elementType));
}
/**
 * Build the error stream used when no reader supports the message.
 * <p>Without a {@code Content-Type} header the error is deferred until a
 * body buffer actually arrives, so a message with no content does not fail.
 * With a content type, a {@code ClientHttpResponse} body is first passed to
 * {@code consumeAndCancel} before the error is emitted; any other message
 * fails immediately.
 * @param message the input message that could not be read
 * @param ex the exception to signal
 * @return a {@code Flux} that emits {@code ex} as described above
 */
private static <T> Flux<T> unsupportedErrorHandler(
        ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {

    if (message.getHeaders().getContentType() == null) {
        // A missing content type may be fine as long as there is no content either,
        // so only fail once a buffer shows up (releasing it first).
        return message.getBody().map(buffer -> {
            DataBufferUtils.release(buffer);
            throw ex;
        });
    }
    if (message instanceof ClientHttpResponse) {
        return consumeAndCancel(message).thenMany(Flux.error(ex));
    }
    return Flux.error(ex);
}
/**
 * Verifies that a {@code String} extractor yields the request body as a
 * single decoded value followed by completion.
 */
@Test
public void toMono() {
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);

    BodyExtractor<Mono<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(String.class);
    Mono<String> result = extractor.extract(request, this.context);

    StepVerifier.create(result)
            .expectNext("foo")
            .verifyComplete();
}
/**
 * Verifies that an extractor created from a {@code ParameterizedTypeReference}
 * decodes a JSON request body into a {@code Map<String, String>}.
 */
@Test
public void toMonoParameterizedTypeReference() {
    String json = "{\"username\":\"foo\",\"password\":\"bar\"}";
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .contentType(MediaType.APPLICATION_JSON)
            .body(body);

    BodyExtractor<Mono<Map<String, String>>, ReactiveHttpInputMessage> extractor =
            BodyExtractors.toMono(new ParameterizedTypeReference<Map<String, String>>() {});
    Mono<Map<String, String>> result = extractor.extract(request, this.context);

    Map<String, String> expected = new LinkedHashMap<>();
    expected.put("username", "foo");
    expected.put("password", "bar");

    StepVerifier.create(result)
            .expectNext(expected)
            .verifyComplete();
}
/**
 * Verifies that the {@code JSON_VIEW_HINT} registered in the test hints is
 * honoured: with the {@code SafeToDeserialize} view the decoded user has a
 * username but a {@code null} password.
 */
@Test
public void toMonoWithHints() {
    // Register the hint before extracting.
    this.hints.put(JSON_VIEW_HINT, SafeToDeserialize.class);

    String json = "{\"username\":\"foo\",\"password\":\"bar\"}";
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .contentType(MediaType.APPLICATION_JSON)
            .body(body);

    BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);
    Mono<User> result = extractor.extract(request, this.context);

    StepVerifier.create(result)
            .consumeNextWith(decoded -> {
                assertEquals("foo", decoded.getUsername());
                assertNull(decoded.getPassword());
            })
            .verifyComplete();
}
/**
 * Verifies that extracting {@code Void} from a client response subscribes
 * to the body, completes once a buffer has been emitted, and leaves the
 * body publisher cancelled.
 */
@Test
public void toMonoVoidAsClientShouldConsumeAndCancel() {
    TestPublisher<DataBuffer> body = TestPublisher.create();
    MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
    response.setBody(body.flux());

    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
    BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
    Mono<Void> result = extractor.extract(response, this.context);

    StepVerifier.create(result)
            .then(() -> {
                body.assertWasSubscribed();
                body.emit(dataBuffer);
            })
            .expectComplete()
            .verify();

    body.assertCancelled();
}
/**
 * Verifies that a {@code String} flux extractor emits the decoded request
 * body and then completes.
 */
@Test
public void toFlux() {
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);

    BodyExtractor<Flux<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(String.class);
    Flux<String> result = extractor.extract(request, this.context);

    StepVerifier.create(result)
            .expectNext("foo")
            .verifyComplete();
}
/**
 * Verifies that the data-buffer extractor passes the request body buffer
 * through unchanged and then completes.
 */
@Test
public void toDataBuffers() {
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);

    BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> extractor = BodyExtractors.toDataBuffers();
    Flux<DataBuffer> result = extractor.extract(request, this.context);

    StepVerifier.create(result)
            .expectNext(dataBuffer)
            .verifyComplete();
}
/**
 * Join the message body into a single buffer, decode it with the charset
 * obtained from the {@code Content-Type} header via
 * {@code getMediaTypeCharset}, and parse the text as form data.
 * @param elementType the declared element type (not consulted here)
 * @param message the message whose body holds the form data
 * @param hints hints forwarded to {@code logFormData}
 * @return a {@code Mono} with the parsed form data
 */
@Override
public Mono<MultiValueMap<String, String>> readMono(ResolvableType elementType,
        ReactiveHttpInputMessage message, Map<String, Object> hints) {

    Charset charset = getMediaTypeCharset(message.getHeaders().getContentType());
    return DataBufferUtils.join(message.getBody())
            .map(joined -> {
                // Copy the content into a String before the buffer is released.
                String body = charset.decode(joined.asByteBuffer()).toString();
                DataBufferUtils.release(joined);
                MultiValueMap<String, String> formData = parseFormData(charset, body);
                logFormData(formData, hints);
                return formData;
            });
}
/**
 * Read all parts through the delegate part reader and collect them into a
 * {@code MultiValueMap} keyed by part name.
 * <p>The delegate is called with {@code Hints.SUPPRESS_LOGGING_HINT} set to
 * {@code true}; the collected map is logged here instead, with content
 * masked unless {@code isEnableLoggingRequestDetails()} returns true.
 * @param elementType the element type passed on to the part reader
 * @param inputMessage the multipart message to read
 * @param hints the caller's hints, also used for the log prefix
 * @return a {@code Mono} with the parts grouped by name
 */
@Override
public Mono<MultiValueMap<String, Part>> readMono(ResolvableType elementType,
        ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {

    Map<String, Object> allHints = Hints.merge(hints, Hints.SUPPRESS_LOGGING_HINT, true);
    return this.partReader.read(elementType, inputMessage, allHints)
            .collectMultimap(Part::name)
            .doOnNext(parts -> LogFormatUtils.traceDebug(logger, traceOn -> {
                String prefix = Hints.getLogPrefix(hints);
                String details = (isEnableLoggingRequestDetails() ?
                        LogFormatUtils.formatValue(parts, !traceOn) :
                        "parts " + parts.keySet() + " (content masked)");
                return prefix + "Parsed " + details;
            }))
            .map(this::toMultiValueMap);
}
/**
 * Create an extractor that reads the body into a {@code Mono} of the given
 * element type by delegating to {@code readWithMessageReaders}.
 * <p>The selected reader is invoked through {@code readToMono}, an
 * unsupported media type is routed through {@code unsupportedErrorHandler},
 * and {@code skipBodyAsMono} provides the supplier for the empty case.
 * @param elementType the type of element to decode
 * @return the extractor
 */
private static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) {
    return (message, extractorContext) -> {
        return readWithMessageReaders(message, extractorContext, elementType,
                (HttpMessageReader<T> reader) -> readToMono(message, extractorContext, elementType, reader),
                failure -> Mono.from(unsupportedErrorHandler(message, failure)),
                skipBodyAsMono(message));
    };
}
/**
 * Create an extractor that reads the body into a {@code Flux} of the given
 * element type by delegating to {@code readWithMessageReaders}.
 * <p>The selected reader is invoked through {@code readToFlux}, an
 * unsupported media type is routed through {@code unsupportedErrorHandler},
 * and {@code skipBodyAsFlux} provides the supplier for the empty case.
 * @param elementType the type of element to decode
 * @return the extractor
 */
@SuppressWarnings("unchecked")
private static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) {
    return (message, extractorContext) -> {
        return readWithMessageReaders(message, extractorContext, elementType,
                (HttpMessageReader<T> reader) -> readToFlux(message, extractorContext, elementType, reader),
                failure -> unsupportedErrorHandler(message, failure),
                skipBodyAsFlux(message));
    };
}
/**
 * Extractor to read form data into {@code MultiValueMap<String, String>}.
 * <p>The reader is looked up for {@code FORM_DATA_TYPE} and the
 * {@code application/x-www-form-urlencoded} media type.
 * <p>As of 5.1 this method can also be used on the client side to read form
 * data from a server response (e.g. OAuth).
 * @return {@code BodyExtractor} for form data
 */
public static BodyExtractor<Mono<MultiValueMap<String, String>>, ReactiveHttpInputMessage> toFormData() {
    return (message, context) -> {
        HttpMessageReader<MultiValueMap<String, String>> formReader =
                findReader(FORM_DATA_TYPE, MediaType.APPLICATION_FORM_URLENCODED, context);
        return readToMono(message, context, FORM_DATA_TYPE, formReader);
    };
}
/**
 * Read the message into a {@code Mono} with the given reader.
 * <p>When the context exposes a server response, the message is cast to
 * {@code ServerHttpRequest} and the reader variant that takes request and
 * response is used; otherwise the plain message variant is called.
 * @param message the message to read
 * @param context the extractor context supplying hints and the optional server response
 * @param type the element type, passed as both actual and element type on the server side
 * @param reader the reader to delegate to
 * @return the decoded value
 */
private static <T> Mono<T> readToMono(ReactiveHttpInputMessage message, BodyExtractor.Context context,
        ResolvableType type, HttpMessageReader<T> reader) {

    return context.serverResponse()
            .map(serverResponse -> {
                ServerHttpRequest serverRequest = (ServerHttpRequest) message;
                return reader.readMono(type, type, serverRequest, serverResponse, context.hints());
            })
            .orElseGet(() -> reader.readMono(type, message, context.hints()));
}
/**
 * Read the message into a {@code Flux} with the given reader.
 * <p>When the context exposes a server response, the message is cast to
 * {@code ServerHttpRequest} and the reader variant that takes request and
 * response is used; otherwise the plain message variant is called.
 * @param message the message to read
 * @param context the extractor context supplying hints and the optional server response
 * @param type the element type, passed as both actual and element type on the server side
 * @param reader the reader to delegate to
 * @return the decoded values
 */
private static <T> Flux<T> readToFlux(ReactiveHttpInputMessage message, BodyExtractor.Context context,
        ResolvableType type, HttpMessageReader<T> reader) {

    return context.serverResponse()
            .map(serverResponse -> {
                ServerHttpRequest serverRequest = (ServerHttpRequest) message;
                return reader.read(type, type, serverRequest, serverResponse, context.hints());
            })
            .orElseGet(() -> reader.read(type, message, context.hints()));
}
/**
 * Consume the message body only until its first data buffer arrives.
 * <p>Every buffer that reaches the {@code map} step is released and a
 * {@code ReadCancellationException} is thrown; that exception type is then
 * replaced with an empty {@code Mono} by {@code onErrorResume}. The returned
 * {@code Mono} therefore completes without a value both for an empty body
 * and once the first buffer has been seen. Errors of any other type are not
 * handled here.
 * @param message the input message whose body should be consumed
 * @return a {@code Mono<Void>} that signals completion
 */
private static Mono<Void> consumeAndCancel(ReactiveHttpInputMessage message) {
return message.getBody()
.map(buffer -> {
DataBufferUtils.release(buffer);
// Throwing here aborts the stream after the first buffer
// (presumably cancelling the upstream body -- confirm against Reactor semantics).
throw new ReadCancellationException();
})
// Treat the marker exception as a normal, empty completion.
.onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
.then();
}
/**
 * Read the message as a single value.
 * <p>Only a {@code String} target is supported: the whole body is decoded
 * through {@code stringDecoder}. Any other element type results in an
 * {@code UnsupportedOperationException} error signal.
 * @param elementType the target type; must resolve to {@code String}
 * @param message the message to read
 * @param hints the declared hints (not consulted here)
 * @return the aggregated body as a {@code String}, or an error
 */
@Override
public Mono<Object> readMono(
        ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {

    if (elementType.resolve() != String.class) {
        return Mono.error(new UnsupportedOperationException(
                "ServerSentEventHttpMessageReader only supports reading stream of events as a Flux"));
    }

    // This reader comes ahead of the String + "*/*" reader, so aggregate
    // the output here (lest we time out).
    return stringDecoder.decodeToMono(message.getBody(), elementType, null, null).cast(Object.class);
}
/**
 * Verifies that extracting {@code Void} from a client response whose body
 * completes without emitting anything also completes normally.
 */
@Test
public void toMonoVoidAsClientWithEmptyBody() {
    TestPublisher<DataBuffer> body = TestPublisher.create();
    MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
    response.setBody(body.flux());

    BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
    Mono<Void> result = extractor.extract(response, this.context);

    StepVerifier.create(result)
            .then(() -> {
                body.assertWasSubscribed();
                body.complete();
            })
            .expectComplete()
            .verify();
}
/**
 * Verifies that the {@code JSON_VIEW_HINT} registered in the test hints is
 * honoured when decoding a JSON array into a {@code Flux}: both users are
 * emitted with their username and a {@code null} password.
 */
@Test
public void toFluxWithHints() {
    // Register the hint before extracting.
    this.hints.put(JSON_VIEW_HINT, SafeToDeserialize.class);

    String text = "[{\"username\":\"foo\",\"password\":\"bar\"},{\"username\":\"bar\",\"password\":\"baz\"}]";
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory()
            .wrap(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
    Flux<DataBuffer> body = Flux.just(dataBuffer);
    MockServerHttpRequest request = MockServerHttpRequest.post("/")
            .contentType(MediaType.APPLICATION_JSON)
            .body(body);

    BodyExtractor<Flux<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(User.class);
    Flux<User> result = extractor.extract(request, this.context);

    StepVerifier.create(result)
            .consumeNextWith(first -> {
                assertEquals("foo", first.getUsername());
                assertNull(first.getPassword());
            })
            .consumeNextWith(second -> {
                assertEquals("bar", second.getUsername());
                assertNull(second.getPassword());
            })
            .verifyComplete();
}