下面列出了 org.springframework.http.ReactiveHttpOutputMessage 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Inserts a plain {@code String} through {@code BodyInserters.fromObject} and
 * checks that the mock response body decodes to exactly that text.
 */
@Test
public void ofString() {
    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<String, ReactiveHttpOutputMessage> inserter = BodyInserters.fromObject("foo");

    // The insertion itself must terminate with a plain completion signal.
    StepVerifier.create(inserter.insert(response, this.context)).verifyComplete();

    // A single buffer is expected, decoding (as UTF-8) to the inserted text.
    StepVerifier.create(response.getBody())
            .consumeNextWith(buffer ->
                    Assert.assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8)))
            .verifyComplete();
}
/**
 * Inserts a single-element {@code Flux<String>} through
 * {@code BodyInserters.fromPublisher} and checks the text written to the
 * mock response.
 */
@Test
public void ofPublisher() {
    Flux<String> source = Flux.just("foo");
    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<Flux<String>, ReactiveHttpOutputMessage> inserter =
            BodyInserters.fromPublisher(source, String.class);

    // The insertion itself must terminate with a plain completion signal.
    StepVerifier.create(inserter.insert(response, this.context)).verifyComplete();

    // A single buffer is expected, decoding (as UTF-8) to the published element.
    StepVerifier.create(response.getBody())
            .consumeNextWith(buffer ->
                    Assert.assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8)))
            .verifyComplete();
}
/**
 * Inserts the class-path resource {@code response.txt} through
 * {@code BodyInserters.fromResource} and checks that the bytes written to the
 * mock response equal the bytes of the underlying file.
 * @throws IOException if the resource cannot be resolved to a file or read
 */
@Test
public void ofResource() throws IOException {
    Resource resource = new ClassPathResource("response.txt", getClass());
    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<Resource, ReactiveHttpOutputMessage> inserter = BodyInserters.fromResource(resource);

    StepVerifier.create(inserter.insert(response, this.context)).verifyComplete();

    byte[] expected = Files.readAllBytes(resource.getFile().toPath());
    StepVerifier.create(response.getBody())
            .consumeNextWith(buffer -> {
                // Copy the readable bytes out, then release the buffer before asserting.
                byte[] actual = new byte[buffer.readableByteCount()];
                buffer.read(actual);
                DataBufferUtils.release(buffer);
                assertArrayEquals(expected, actual);
            })
            .verifyComplete();
}
/**
 * Inserts a {@code Flux} holding one pre-built data buffer through
 * {@code BodyInserters.fromDataBuffers} and checks that the very same buffer
 * is emitted by the mock response body.
 */
@Test
public void ofDataBuffers() {
    byte[] payload = "foo".getBytes(StandardCharsets.UTF_8);
    DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory().wrap(ByteBuffer.wrap(payload));
    Flux<DataBuffer> source = Flux.just(dataBuffer);

    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> inserter =
            BodyInserters.fromDataBuffers(source);

    StepVerifier.create(inserter.insert(response, this.context)).verifyComplete();

    StepVerifier.create(response.getBody())
            .expectNext(dataBuffer)
            .verifyComplete();
}
/**
 * Writes Protobuf messages to the given output message.
 * <p>Before delegating to the superclass, the descriptor of the element type
 * is looked up and its file name and full name are added as the
 * {@code X_PROTOBUF_SCHEMA_HEADER} and {@code X_PROTOBUF_MESSAGE_HEADER}
 * headers. When the input is a {@link Flux}, the content type is also set:
 * to the encoder's first streaming media type if no media type was given,
 * otherwise to the given type with the delimited parameter added (unless it
 * already carries that parameter value).
 * @param inputStream the messages to write
 * @param elementType the element type, used to obtain the message builder
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints additional hints, passed through to the superclass
 * @return the result of the superclass write, or an error {@code Mono} if
 * any exception is thrown while preparing the write
 */
@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends Message> inputStream, ResolvableType elementType,
        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    try {
        Descriptors.Descriptor descriptor =
                getMessageBuilder(elementType.toClass()).getDescriptorForType();
        message.getHeaders().add(X_PROTOBUF_SCHEMA_HEADER, descriptor.getFile().getName());
        message.getHeaders().add(X_PROTOBUF_MESSAGE_HEADER, descriptor.getFullName());

        if (inputStream instanceof Flux) {
            if (mediaType == null) {
                MediaType streamingType =
                        ((HttpMessageEncoder<?>) getEncoder()).getStreamingMediaTypes().get(0);
                message.getHeaders().setContentType(streamingType);
            }
            else {
                String delimited = mediaType.getParameters().get(ProtobufEncoder.DELIMITED_KEY);
                if (!ProtobufEncoder.DELIMITED_VALUE.equals(delimited)) {
                    // Copy the parameters so the delimited flag can be added.
                    Map<String, String> parameters = new HashMap<>(mediaType.getParameters());
                    parameters.put(ProtobufEncoder.DELIMITED_KEY, ProtobufEncoder.DELIMITED_VALUE);
                    message.getHeaders().setContentType(
                            new MediaType(mediaType.getType(), mediaType.getSubtype(), parameters));
                }
            }
        }

        return super.write(inputStream, elementType, mediaType, message, hints);
    }
    catch (Exception ex) {
        // NOTE(review): a write path reporting a DecodingException with a "read" message
        // looks inconsistent - confirm whether callers depend on this type and text.
        return Mono.error(new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex));
    }
}
/**
 * Writes the first map emitted by the input either as multipart content or
 * as a plain form.
 * <p>Multipart is chosen when no form writer is configured or when
 * {@code isMultipart} says so for the map and media type; otherwise the map
 * is handed to the form writer.
 * @param inputStream the publisher providing the map to write
 * @param elementType the element type, passed on to the form writer
 * @param mediaType the requested media type, possibly {@code null}
 * @param outputMessage the output message to write to
 * @param hints additional hints for the write
 * @return a {@code Mono} signalling completion or error of the write
 */
@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, ?>> inputStream,
        ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage outputMessage,
        Map<String, Object> hints) {

    return Mono.from(inputStream).flatMap(parts -> {
        if (this.formWriter != null && !isMultipart(parts, mediaType)) {
            // Plain form data: the form writer expects String values.
            @SuppressWarnings("unchecked")
            MultiValueMap<String, String> form = (MultiValueMap<String, String>) parts;
            return this.formWriter.write(Mono.just(form), elementType, mediaType, outputMessage, hints);
        }
        return writeMultipart(parts, outputMessage, hints);
    });
}
/**
 * Writes the given map as {@code multipart/form-data}.
 * <p>A fresh boundary is generated and set, together with the charset name,
 * as parameters of the content type. Each map entry is encoded in order via
 * {@code encodePartValues}, followed by the closing line for the boundary.
 * @param map the parts to write, keyed by part name
 * @param outputMessage the output message to write to
 * @param hints hints, used here for the log prefix
 * @return a {@code Mono} signalling completion or error of the write
 */
private Mono<Void> writeMultipart(
        MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {

    byte[] boundary = generateMultipartBoundary();

    Map<String, String> params = new HashMap<>(2);
    params.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
    params.put("charset", getCharset().name());
    MediaType contentType = new MediaType(MediaType.MULTIPART_FORM_DATA, params);
    outputMessage.getHeaders().setContentType(contentType);

    LogFormatUtils.traceDebug(logger, traceOn -> {
        String prefix = Hints.getLogPrefix(hints);
        String details;
        if (isEnableLoggingRequestDetails()) {
            details = LogFormatUtils.formatValue(map, !traceOn);
        }
        else {
            // Request details are not to be logged: show part names only.
            details = "parts " + map.keySet() + " (content masked)";
        }
        return prefix + "Encoding " + details;
    });

    DataBufferFactory bufferFactory = outputMessage.bufferFactory();

    Flux<DataBuffer> encodedParts = Flux.fromIterable(map.entrySet())
            .concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue(), bufferFactory));

    Flux<DataBuffer> body = encodedParts
            .concatWith(generateLastLine(boundary, bufferFactory))
            .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);

    return outputMessage.writeWith(body);
}
/**
 * Serializes the first form emitted by the input and writes it as a single
 * buffer.
 * <p>The content type is resolved through {@code getMediaType} and set on
 * the message; its charset, or the default charset when it has none, is used
 * to encode the serialized form. The content length is set to the number of
 * encoded bytes.
 * @param inputStream the publisher providing the form data
 * @param elementType the element type (not used by this method)
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints hints, passed to the form-data logging
 * @return a {@code Mono} signalling completion or error of the write
 */
@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, String>> inputStream,
        ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
        Map<String, Object> hints) {

    MediaType contentType = getMediaType(mediaType);
    message.getHeaders().setContentType(contentType);

    Charset declared = contentType.getCharset();
    Charset charset = (declared != null ? declared : getDefaultCharset());

    return Mono.from(inputStream).flatMap(form -> {
        logFormData(form, hints);
        ByteBuffer encoded = charset.encode(serializeForm(form, charset));
        // wrapping only, no allocation
        DataBuffer buffer = message.bufferFactory().wrap(encoded);
        message.getHeaders().setContentLength(encoded.remaining());
        return message.writeWith(Mono.just(buffer));
    });
}
/**
 * Writes a single resource to the output message.
 * <p>The content type is set from {@code getResourceMediaType}. If no
 * content length is present yet (header value below zero) and
 * {@code lengthOf} yields something other than {@code -1}, that length is
 * set. The body is written via {@code zeroCopy} when that returns a result,
 * and through the encoder otherwise.
 * @param resource the resource to write
 * @param type the element type, passed to the encoder
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints additional hints for the write
 * @return a {@code Mono} signalling completion or error of the write
 */
private Mono<Void> writeResource(Resource resource, ResolvableType type, @Nullable MediaType mediaType,
        ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    HttpHeaders headers = message.getHeaders();
    MediaType resourceMediaType = getResourceMediaType(mediaType, resource, hints);
    headers.setContentType(resourceMediaType);

    boolean lengthMissing = headers.getContentLength() < 0;
    if (lengthMissing) {
        long resolved = lengthOf(resource);
        if (resolved != -1) {
            headers.setContentLength(resolved);
        }
    }

    // Fall back to regular encoding only when zero-copy yields nothing.
    return zeroCopy(resource, null, message, hints)
            .orElseGet(() -> message.writeWith(
                    this.encoder.encode(Mono.just(resource), message.bufferFactory(), type, resourceMediaType, hints)));
}
/**
 * Attempts a zero-copy transfer of the given resource, or of one region of it.
 * <p>Zero-copy is attempted only when the output message is a
 * {@link ZeroCopyHttpOutputMessage} and the resource reports itself as a file.
 * @param resource the resource to write
 * @param region the region to transfer, or {@code null} to transfer the
 * whole file (position 0, count equal to the file length)
 * @param message the output message to write to
 * @param hints hints, used here only for the log prefix
 * @return the write result if a zero-copy transfer was set up; empty if
 * zero-copy does not apply or the file could not be resolved, in which case
 * the caller is expected to use its regular write path
 */
private static Optional<Mono<Void>> zeroCopy(Resource resource, @Nullable ResourceRegion region,
        ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    if (message instanceof ZeroCopyHttpOutputMessage && resource.isFile()) {
        try {
            File file = resource.getFile();
            long pos = region != null ? region.getPosition() : 0;
            long count = region != null ? region.getCount() : file.length();
            if (logger.isDebugEnabled()) {
                // The second number logged for a region is its byte count, not an end offset.
                String formatted = region != null ? "region " + pos + "-" + (count) + " of " : "";
                logger.debug(Hints.getLogPrefix(hints) + "Zero-copy " + formatted + "[" + resource + "]");
            }
            return Optional.of(((ZeroCopyHttpOutputMessage) message).writeWith(file, pos, count));
        }
        catch (IOException ex) {
            // Not expected once isFile() has returned true. Record the failure rather than
            // dropping it silently, then fall through so the caller can use its regular path.
            if (logger.isDebugEnabled()) {
                logger.debug(Hints.getLogPrefix(hints) +
                        "Could not resolve file for zero-copy of [" + resource + "]", ex);
            }
        }
    }
    return Optional.empty();
}
/**
 * Checks that an inserter built from a {@code String} completes normally and
 * leaves that string, as a single buffer, in the mock response body.
 */
@Test
public void ofString() {
    String text = "foo";
    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<String, ReactiveHttpOutputMessage> inserter = BodyInserters.fromObject(text);

    Mono<Void> insertion = inserter.insert(response, this.context);
    StepVerifier.create(insertion).verifyComplete();

    StepVerifier.create(response.getBody())
            .consumeNextWith(buffer -> {
                // Decode the written bytes as UTF-8 and compare with the inserted text.
                String written = DataBufferTestUtils.dumpString(buffer, UTF_8);
                Assert.assertEquals("foo", written);
            })
            .verifyComplete();
}
/**
 * Checks that an inserter built from a {@code Flux<String>} completes
 * normally and leaves the published text, as a single buffer, in the mock
 * response body.
 */
@Test
public void ofPublisher() {
    Flux<String> publisher = Flux.just("foo");
    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<Flux<String>, ReactiveHttpOutputMessage> inserter =
            BodyInserters.fromPublisher(publisher, String.class);

    Mono<Void> insertion = inserter.insert(response, this.context);
    StepVerifier.create(insertion).verifyComplete();

    StepVerifier.create(response.getBody())
            .consumeNextWith(buffer -> {
                // Decode the written bytes as UTF-8 and compare with the published element.
                String written = DataBufferTestUtils.dumpString(buffer, UTF_8);
                Assert.assertEquals("foo", written);
            })
            .verifyComplete();
}
/**
 * Checks that an inserter built from the class-path resource
 * {@code response.txt} writes exactly the bytes of the underlying file.
 * @throws IOException if the resource cannot be resolved to a file or read
 */
@Test
public void ofResource() throws IOException {
    Resource source = new ClassPathResource("response.txt", getClass());
    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<Resource, ReactiveHttpOutputMessage> inserter = BodyInserters.fromResource(source);

    Mono<Void> insertion = inserter.insert(response, this.context);
    StepVerifier.create(insertion).verifyComplete();

    byte[] fileBytes = Files.readAllBytes(source.getFile().toPath());

    StepVerifier.create(response.getBody())
            .consumeNextWith(buffer -> {
                // Drain the buffer into an array and release it before comparing.
                byte[] writtenBytes = new byte[buffer.readableByteCount()];
                buffer.read(writtenBytes);
                DataBufferUtils.release(buffer);
                assertArrayEquals(fileBytes, writtenBytes);
            })
            .verifyComplete();
}
/**
 * Checks that an inserter built from a {@code Flux<DataBuffer>} passes the
 * given buffer through to the mock response body unchanged.
 */
@Test
public void ofDataBuffers() {
    DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
    ByteBuffer content = ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8));
    DefaultDataBuffer dataBuffer = bufferFactory.wrap(content);
    Flux<DataBuffer> publisher = Flux.just(dataBuffer);

    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> inserter =
            BodyInserters.fromDataBuffers(publisher);

    Mono<Void> insertion = inserter.insert(response, this.context);
    StepVerifier.create(insertion).verifyComplete();

    // The response is expected to emit the same buffer that was inserted.
    StepVerifier.create(response.getBody())
            .expectNext(dataBuffer)
            .verifyComplete();
}
/**
 * Encodes the input with the configured encoder and writes the result.
 * <p>For a {@link Mono} input the content length is set explicitly: to the
 * readable byte count of the single encoded buffer, or to zero (followed by
 * {@code setComplete()}) when nothing was encoded. For any other publisher
 * the body is written with a flush after each buffer when the content type
 * is a streaming media type, and in one plain write otherwise.
 * @param inputStream the objects to write
 * @param elementType the element type, passed to the encoder
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints additional hints, passed to the encoder
 * @return a {@code Mono} signalling completion or error of the write
 */
@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType elementType,
        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    MediaType contentType = updateContentType(message, mediaType);
    Flux<DataBuffer> body = this.encoder.encode(
            inputStream, message.bufferFactory(), elementType, contentType, hints);

    if (inputStream instanceof Mono) {
        HttpHeaders headers = message.getHeaders();

        // Used when the encoder produced nothing: empty response with length 0.
        Mono<DataBuffer> emptyBody = Mono.defer(() -> {
            headers.setContentLength(0);
            return message.setComplete().then(Mono.<DataBuffer>empty());
        });

        return Mono.from(body)
                .switchIfEmpty(emptyBody)
                .flatMap(buffer -> {
                    headers.setContentLength(buffer.readableByteCount());
                    return message.writeWith(Mono.just(buffer));
                });
    }

    if (isStreamingMediaType(contentType)) {
        return message.writeAndFlushWith(body.map(Flux::just));
    }
    return message.writeWith(body);
}
/**
 * Writes Protobuf messages, first exposing the message descriptor through
 * response headers.
 * <p>The descriptor's file name goes into {@code X_PROTOBUF_SCHEMA_HEADER}
 * and its full name into {@code X_PROTOBUF_MESSAGE_HEADER}. For a
 * {@link Flux} input the content type is set to the encoder's first
 * streaming media type when no media type was given; when one was given but
 * lacks the delimited parameter value, the content type is set to a copy of
 * it with that parameter added. The write itself is done by the superclass.
 * @param inputStream the messages to write
 * @param elementType the element type, used to obtain the message builder
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints additional hints, passed through to the superclass
 * @return the result of the superclass write, or an error {@code Mono} if
 * any exception is thrown while preparing the write
 */
@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends Message> inputStream, ResolvableType elementType,
        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    try {
        Message.Builder builder = getMessageBuilder(elementType.toClass());
        Descriptors.Descriptor descriptor = builder.getDescriptorForType();
        message.getHeaders().add(X_PROTOBUF_SCHEMA_HEADER, descriptor.getFile().getName());
        message.getHeaders().add(X_PROTOBUF_MESSAGE_HEADER, descriptor.getFullName());

        boolean streaming = (inputStream instanceof Flux);
        if (streaming && mediaType == null) {
            message.getHeaders().setContentType(
                    ((HttpMessageEncoder<?>) getEncoder()).getStreamingMediaTypes().get(0));
        }
        else if (streaming &&
                !ProtobufEncoder.DELIMITED_VALUE.equals(mediaType.getParameters().get(ProtobufEncoder.DELIMITED_KEY))) {
            // Keep the requested type and subtype; only add the delimited flag.
            Map<String, String> withDelimited = new HashMap<>(mediaType.getParameters());
            withDelimited.put(ProtobufEncoder.DELIMITED_KEY, ProtobufEncoder.DELIMITED_VALUE);
            message.getHeaders().setContentType(
                    new MediaType(mediaType.getType(), mediaType.getSubtype(), withDelimited));
        }

        return super.write(inputStream, elementType, mediaType, message, hints);
    }
    catch (Exception ex) {
        // NOTE(review): a write path reporting a DecodingException with a "read" message
        // looks inconsistent - confirm whether callers depend on this type and text.
        return Mono.error(new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex));
    }
}
/**
 * Writes the first map emitted by the input, choosing between multipart and
 * plain form output.
 * <p>The form writer is used only when one is configured and
 * {@code isMultipart} returns {@code false} for the map and media type; in
 * every other case the map is written as multipart content.
 * @param inputStream the publisher providing the map to write
 * @param elementType the element type, passed on to the form writer
 * @param mediaType the requested media type, possibly {@code null}
 * @param outputMessage the output message to write to
 * @param hints additional hints for the write
 * @return a {@code Mono} signalling completion or error of the write
 */
@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, ?>> inputStream,
        ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage outputMessage,
        Map<String, Object> hints) {

    return Mono.from(inputStream).flatMap(parts -> {
        boolean asForm = (this.formWriter != null && !isMultipart(parts, mediaType));
        if (!asForm) {
            return writeMultipart(parts, outputMessage, hints);
        }
        // Plain form data: the form writer expects String values.
        @SuppressWarnings("unchecked")
        Mono<MultiValueMap<String, String>> formInput = Mono.just((MultiValueMap<String, String>) parts);
        return this.formWriter.write(formInput, elementType, mediaType, outputMessage, hints);
    });
}
/**
 * Writes the given map as {@code multipart/form-data}.
 * <p>The content type carries a freshly generated boundary and the charset
 * name as parameters. The body consists of the encoded values of each map
 * entry, in iteration order, followed by the closing line for the boundary.
 * @param map the parts to write, keyed by part name
 * @param outputMessage the output message to write to
 * @param hints hints, used here for the log prefix
 * @return a {@code Mono} signalling completion or error of the write
 */
private Mono<Void> writeMultipart(
        MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {

    byte[] boundary = generateMultipartBoundary();

    Map<String, String> params = new HashMap<>(2);
    params.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
    params.put("charset", getCharset().name());
    MediaType contentType = new MediaType(MediaType.MULTIPART_FORM_DATA, params);
    outputMessage.getHeaders().setContentType(contentType);

    LogFormatUtils.traceDebug(logger, traceOn -> {
        String prefix = Hints.getLogPrefix(hints);
        // Without request-detail logging, only the part names are shown.
        String details = isEnableLoggingRequestDetails() ?
                LogFormatUtils.formatValue(map, !traceOn) :
                "parts " + map.keySet() + " (content masked)";
        return prefix + "Encoding " + details;
    });

    Flux<DataBuffer> encodedParts = Flux.fromIterable(map.entrySet())
            .concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue()));
    Flux<DataBuffer> body = encodedParts.concatWith(Mono.just(generateLastLine(boundary)));

    return outputMessage.writeWith(body);
}
/**
 * Serializes the first form emitted by the input and writes it as a single
 * buffer.
 * <p>The content type is resolved through {@code getMediaType} and set on
 * the message; its charset is required and is used to encode the serialized
 * form. The content length is set to the number of encoded bytes.
 * @param inputStream the publisher providing the form data
 * @param elementType the element type (not used by this method)
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints hints, passed to the form-data logging
 * @return a {@code Mono} signalling completion or error of the write
 */
@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, String>> inputStream,
        ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
        Map<String, Object> hints) {

    MediaType contentType = getMediaType(mediaType);
    message.getHeaders().setContentType(contentType);

    Charset charset = contentType.getCharset();
    Assert.notNull(charset, "No charset"); // should never occur

    return Mono.from(inputStream).flatMap(form -> {
        logFormData(form, hints);
        ByteBuffer payload = charset.encode(serializeForm(form, charset));
        DataBuffer buffer = message.bufferFactory().wrap(payload);
        message.getHeaders().setContentLength(payload.remaining());
        return message.writeWith(Mono.just(buffer));
    });
}
/**
 * Writes a single resource to the output message.
 * <p>Sets the content type from {@code getResourceMediaType}, fills in the
 * content length when the header value is below zero and {@code lengthOf}
 * returns something other than {@code -1}, then writes the body: through
 * {@code zeroCopy} when that yields a result, otherwise through the encoder.
 * @param resource the resource to write
 * @param type the element type, passed to the encoder
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints additional hints for the write
 * @return a {@code Mono} signalling completion or error of the write
 */
private Mono<Void> writeResource(Resource resource, ResolvableType type, @Nullable MediaType mediaType,
        ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    HttpHeaders headers = message.getHeaders();
    MediaType resourceMediaType = getResourceMediaType(mediaType, resource, hints);
    headers.setContentType(resourceMediaType);

    if (headers.getContentLength() < 0) {
        long resourceLength = lengthOf(resource);
        if (resourceLength != -1) {
            headers.setContentLength(resourceLength);
        }
    }

    Optional<Mono<Void>> zeroCopied = zeroCopy(resource, null, message, hints);
    if (zeroCopied.isPresent()) {
        return zeroCopied.get();
    }

    // No zero-copy result: encode the resource into data buffers instead.
    Mono<Resource> input = Mono.just(resource);
    DataBufferFactory factory = message.bufferFactory();
    return message.writeWith(this.encoder.encode(input, factory, type, resourceMediaType, hints));
}
/**
 * Attempts a zero-copy transfer of the given resource, or of one region of it.
 * <p>Zero-copy is attempted only when the output message is a
 * {@link ZeroCopyHttpOutputMessage} and the resource reports itself as a file.
 * @param resource the resource to write
 * @param region the region to transfer, or {@code null} to transfer the
 * whole file (position 0, count equal to the file length)
 * @param message the output message to write to
 * @param hints hints, used here only for the log prefix
 * @return the write result if a zero-copy transfer was set up; empty if
 * zero-copy does not apply or the file could not be resolved, in which case
 * the caller is expected to use its regular write path
 */
private static Optional<Mono<Void>> zeroCopy(Resource resource, @Nullable ResourceRegion region,
        ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    if (message instanceof ZeroCopyHttpOutputMessage && resource.isFile()) {
        try {
            File file = resource.getFile();
            long pos = region != null ? region.getPosition() : 0;
            long count = region != null ? region.getCount() : file.length();
            if (logger.isDebugEnabled()) {
                // The second number logged for a region is its byte count, not an end offset.
                String formatted = region != null ? "region " + pos + "-" + (count) + " of " : "";
                logger.debug(Hints.getLogPrefix(hints) + "Zero-copy " + formatted + "[" + resource + "]");
            }
            return Optional.of(((ZeroCopyHttpOutputMessage) message).writeWith(file, pos, count));
        }
        catch (IOException ex) {
            // Not expected once isFile() has returned true. Record the failure rather than
            // dropping it silently, then fall through so the caller can use its regular path.
            if (logger.isDebugEnabled()) {
                logger.debug(Hints.getLogPrefix(hints) +
                        "Could not resolve file for zero-copy of [" + resource + "]", ex);
            }
        }
    }
    return Optional.empty();
}
/**
 * Inserter to write the given {@code Resource}.
 * <p>If the resource can be resolved to a {@linkplain Resource#getFile() file}, it will
 * be copied using <a href="https://en.wikipedia.org/wiki/Zero-copy">zero-copy</a>.
 * <p>The writer is looked up from the inserter context for {@code RESOURCE_TYPE}
 * with no media type each time the inserter is invoked.
 * @param resource the resource to write to the output message; must not be {@code null}
 * @param <T> the type of the {@code Resource}
 * @return the inserter to write the {@code Resource}
 */
public static <T extends Resource> BodyInserter<T, ReactiveHttpOutputMessage> fromResource(T resource) {
    Assert.notNull(resource, "Resource must not be null");
    return (outputMessage, context) -> {
        HttpMessageWriter<Resource> writer = findWriter(context, RESOURCE_TYPE, null);
        return write(Mono.just(resource), RESOURCE_TYPE, null, outputMessage, context, writer);
    };
}
/**
 * Writes the body with the first message writer from the context that can
 * write the given body type for the content type currently set on the
 * output message.
 * @param outputMessage the output message to write to
 * @param context the inserter context providing the message writers
 * @param body the publisher to write
 * @param bodyType the type of the elements in {@code body}
 * @return the result of the selected writer, or an error {@code Mono}
 * carrying {@code unsupportedError(..)} if no writer can write the body
 */
private static <P extends Publisher<?>, M extends ReactiveHttpOutputMessage> Mono<Void> writeWithMessageWriters(
        M outputMessage, BodyInserter.Context context, P body, ResolvableType bodyType) {

    MediaType mediaType = outputMessage.getHeaders().getContentType();

    for (HttpMessageWriter<?> candidate : context.messageWriters()) {
        if (candidate.canWrite(bodyType, mediaType)) {
            // First match wins; later writers are not consulted.
            return write(body, bodyType, mediaType, outputMessage, context, cast(candidate));
        }
    }
    return Mono.error(unsupportedError(bodyType, context, mediaType));
}
/**
 * Invokes the given writer, using its server-side variant when the context
 * provides a server request.
 * <p>With a server request present, the output message is cast to
 * {@code ServerHttpResponse} and the given type is passed as both type
 * arguments of the writer; otherwise the plain write method is used. The
 * context hints are passed along in both cases.
 * @param input the publisher to write
 * @param type the element type
 * @param mediaType the media type to write with, possibly {@code null}
 * @param message the output message to write to
 * @param context the inserter context
 * @param writer the writer to delegate to
 * @return the result returned by the writer
 */
private static <T> Mono<Void> write(Publisher<? extends T> input, ResolvableType type,
        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
        BodyInserter.Context context, HttpMessageWriter<T> writer) {

    return context.serverRequest()
            .map(request -> writer.write(
                    input, type, type, mediaType, request, (ServerHttpResponse) message, context.hints()))
            .orElseGet(() -> writer.write(input, type, mediaType, message, context.hints()));
}
/**
 * Inserts a {@code User} object through {@code BodyInserters.fromObject} and
 * checks the JSON text found in the mock response body.
 */
@Test
public void ofObject() {
    User user = new User("foo", "bar");
    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<User, ReactiveHttpOutputMessage> inserter = BodyInserters.fromObject(user);

    StepVerifier.create(inserter.insert(response, this.context)).verifyComplete();

    // Both properties are expected in the serialized output.
    StepVerifier.create(response.getBodyAsString())
            .expectNext("{\"username\":\"foo\",\"password\":\"bar\"}")
            .verifyComplete();
}
/**
 * Inserts a {@code User} object with the {@code JSON_VIEW_HINT} hint set to
 * {@code SafeToSerialize} and checks that only the {@code username} property
 * appears in the JSON text of the mock response body.
 */
@Test
public void ofObjectWithHints() {
    // Register the view hint before the inserter is invoked.
    this.hints.put(JSON_VIEW_HINT, SafeToSerialize.class);

    User user = new User("foo", "bar");
    MockServerHttpResponse response = new MockServerHttpResponse();
    BodyInserter<User, ReactiveHttpOutputMessage> inserter = BodyInserters.fromObject(user);

    StepVerifier.create(inserter.insert(response, this.context)).verifyComplete();

    StepVerifier.create(response.getBodyAsString())
            .expectNext("{\"username\":\"foo\"}")
            .verifyComplete();
}
/**
 * Encodes the input with the configured encoder and writes the result.
 * <p>For a {@link Mono} input the content length is set explicitly: to the
 * readable byte count of the single encoded buffer, or to zero (followed by
 * {@code setComplete()}) when nothing was encoded. For any other publisher
 * the body is written with a flush after each buffer when the content type
 * is a streaming media type, and in one plain write otherwise. Buffers that
 * are wrapped into a {@code Mono} here get a discard hook that releases
 * {@code PooledDataBuffer} instances.
 * @param inputStream the objects to write
 * @param elementType the element type, passed to the encoder
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints additional hints, passed to the encoder
 * @return a {@code Mono} signalling completion or error of the write
 */
@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType elementType,
        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    MediaType contentType = updateContentType(message, mediaType);
    Flux<DataBuffer> body = this.encoder.encode(
            inputStream, message.bufferFactory(), elementType, contentType, hints);

    if (inputStream instanceof Mono) {
        HttpHeaders headers = message.getHeaders();
        return Mono.from(body)
                .switchIfEmpty(Mono.defer(() -> {
                    // Nothing was encoded: empty response with length 0.
                    headers.setContentLength(0);
                    return message.setComplete().then(Mono.empty());
                }))
                .flatMap(buffer -> {
                    headers.setContentLength(buffer.readableByteCount());
                    Mono<DataBuffer> single = Mono.just(buffer)
                            .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
                    return message.writeWith(single);
                });
    }

    if (!isStreamingMediaType(contentType)) {
        return message.writeWith(body);
    }

    // Streaming: one inner publisher per buffer so each buffer is flushed.
    Flux<Mono<DataBuffer>> perBuffer = body.map(buffer ->
            Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
    return message.writeAndFlushWith(perBuffer);
}
/**
 * Determines the content type to write with and makes sure it is set on the
 * message.
 * <p>A content type already present in the headers is returned untouched.
 * Otherwise either the default media type or the given media type is picked
 * (as decided by {@code useFallback}); if that is not {@code null}, it is
 * passed through {@code addDefaultCharset} and stored in the headers.
 * @param message the output message whose headers are read and updated
 * @param mediaType the requested media type, possibly {@code null}
 * @return the content type in effect, or {@code null} if none could be determined
 */
@Nullable
private MediaType updateContentType(ReactiveHttpOutputMessage message, @Nullable MediaType mediaType) {
    MediaType existing = message.getHeaders().getContentType();
    if (existing != null) {
        return existing;
    }

    MediaType fallback = this.defaultMediaType;
    MediaType selected = (useFallback(mediaType, fallback) ? fallback : mediaType);
    if (selected == null) {
        return null;
    }

    selected = addDefaultCharset(selected, fallback);
    message.getHeaders().setContentType(selected);
    return selected;
}
/**
 * Encodes the input and writes it with {@code writeAndFlushWith}.
 * <p>The given media type is used only if it is non-null and declares a
 * charset; otherwise {@code DEFAULT_MEDIA_TYPE} is used. The chosen type is
 * set as the content type of the message.
 * @param input the publisher to write
 * @param elementType the element type, passed to {@code encode}
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints additional hints, passed to {@code encode}
 * @return a {@code Mono} signalling completion or error of the write
 */
@Override
public Mono<Void> write(Publisher<?> input, ResolvableType elementType, @Nullable MediaType mediaType,
        ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    MediaType contentType;
    if (mediaType == null || mediaType.getCharset() == null) {
        contentType = DEFAULT_MEDIA_TYPE;
    }
    else {
        contentType = mediaType;
    }

    DataBufferFactory factory = message.bufferFactory();
    message.getHeaders().setContentType(contentType);
    return message.writeAndFlushWith(encode(input, elementType, contentType, factory, hints));
}
/**
 * Writes the first resource emitted by the input by delegating to
 * {@code writeResource}.
 * @param inputStream the publisher providing the resource
 * @param elementType the element type, passed to {@code writeResource}
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints additional hints for the write
 * @return a {@code Mono} signalling completion or error of the write
 */
@Override
public Mono<Void> write(Publisher<? extends Resource> inputStream, ResolvableType elementType,
        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    Mono<? extends Resource> firstResource = Mono.from(inputStream);
    return firstResource.flatMap(resource -> {
        return writeResource(resource, elementType, mediaType, message, hints);
    });
}