下面列出了org.springframework.http.codec.json.Jackson2JsonEncoder#org.springframework.core.io.buffer.DataBuffer 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, String>> inputStream,
ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
Map<String, Object> hints) {
mediaType = getMediaType(mediaType);
message.getHeaders().setContentType(mediaType);
Charset charset = mediaType.getCharset() != null ? mediaType.getCharset() : getDefaultCharset();
return Mono.from(inputStream).flatMap(form -> {
logFormData(form, hints);
String value = serializeForm(form, charset);
ByteBuffer byteBuffer = charset.encode(value);
DataBuffer buffer = message.bufferFactory().wrap(byteBuffer); // wrapping only, no allocation
message.getHeaders().setContentLength(byteBuffer.remaining());
return message.writeWith(Mono.just(buffer));
});
}
@Override
public Flux<Object> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
Flux.from(input), this.jsonFactory, getObjectMapper(), true);
ObjectReader reader = getObjectReader(elementType, hints);
return tokens.handle((tokenBuffer, sink) -> {
try {
Object value = reader.readValue(tokenBuffer.asParser(getObjectMapper()));
logValue(value, hints);
if (value != null) {
sink.next(value);
}
}
catch (IOException ex) {
sink.error(processException(ex));
}
});
}
@Test
public void toEntityList() {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DefaultDataBuffer dataBuffer =
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
Flux<DataBuffer> body = Flux.just(dataBuffer);
mockTextPlainResponse(body);
List<HttpMessageReader<?>> messageReaders = Collections
.singletonList(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes()));
given(mockExchangeStrategies.messageReaders()).willReturn(messageReaders);
ResponseEntity<List<String>> result = defaultClientResponse.toEntityList(String.class).block();
assertEquals(Collections.singletonList("foo"), result.getBody());
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(HttpStatus.OK.value(), result.getStatusCodeValue());
assertEquals(MediaType.TEXT_PLAIN, result.getHeaders().getContentType());
}
/**
* Read from the request body InputStream and return a DataBuffer.
* Invoked only when {@link ServletInputStream#isReady()} returns "true".
* @return a DataBuffer with data read, or {@link #EOF_BUFFER} if the input
* stream returned -1, or null if 0 bytes were read.
*/
@Nullable
DataBuffer readFromInputStream() throws IOException {
int read = this.request.getInputStream().read(this.buffer);
logBytesRead(read);
if (read > 0) {
DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(read);
dataBuffer.write(this.buffer, 0, read);
return dataBuffer;
}
if (read == -1) {
return EOF_BUFFER;
}
return null;
}
private Mono<Void> writeMultipart(
MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {
byte[] boundary = generateMultipartBoundary();
Map<String, String> params = new HashMap<>(2);
params.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
params.put("charset", getCharset().name());
outputMessage.getHeaders().setContentType(new MediaType(MediaType.MULTIPART_FORM_DATA, params));
LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Encoding " +
(isEnableLoggingRequestDetails() ?
LogFormatUtils.formatValue(map, !traceOn) :
"parts " + map.keySet() + " (content masked)"));
Flux<DataBuffer> body = Flux.fromIterable(map.entrySet())
.concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue()))
.concatWith(Mono.just(generateLastLine(boundary)));
return outputMessage.writeWith(body);
}
protected Mono<Void> handleAuthenticationFailure(ServerWebExchange exchange, String errorMsg) {
CommonResponseDto responseDto = CommonResponseDto.error(errorMsg);
ServerHttpResponse response = exchange.getResponse();
try {
byte[] bits = objectMapper.writeValueAsBytes(responseDto);
DataBuffer buffer = response.bufferFactory().wrap(bits);
response.setStatusCode(HttpStatus.UNAUTHORIZED);
response.getHeaders().add("Content-Type", "application/json;charset=UTF-8");
response.getHeaders().add(HttpHeaders.WWW_AUTHENTICATE, headerValue);
return response.writeWith(Mono.just(buffer));
} catch (JsonProcessingException e) {
log.debug("failed to process json", e);
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
return response.setComplete();
}
}
private Mono<Void> sendDeferResponse(ServerHttpResponse serverHttpResponse, ExecutionResult executionResult, Publisher<DeferredExecutionResult> deferredResults) {
// this implements this apollo defer spec: https://github.com/apollographql/apollo-server/blob/defer-support/docs/source/defer-support.md
// the spec says CRLF + "-----" + CRLF is needed at the end, but it works without it and with it we get client
// side errors with it, so we skp it
serverHttpResponse.setStatusCode(HttpStatus.OK);
HttpHeaders headers = serverHttpResponse.getHeaders();
headers.set("Content-Type", "multipart/mixed; boundary=\"-\"");
headers.set("Connection", "keep-alive");
Flux<Mono<DataBuffer>> deferredDataBuffers = Flux.from(deferredResults).map(deferredExecutionResult -> {
DeferPart deferPart = new DeferPart(deferredExecutionResult.toSpecification());
StringBuilder builder = new StringBuilder();
String body = deferPart.write();
builder.append(CRLF).append("---").append(CRLF);
builder.append(body);
return strToDataBuffer(builder.toString());
});
Flux<Mono<DataBuffer>> firstResult = Flux.just(firstResult(executionResult));
return serverHttpResponse.writeAndFlushWith(Flux.mergeSequential(firstResult, deferredDataBuffers));
}
@Test
public void toFormData() {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
String text = "name+1=value+1&name+2=value+2%2B1&name+2=value+2%2B2&name+3";
DefaultDataBuffer dataBuffer = factory.wrap(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
Flux<DataBuffer> body = Flux.just(dataBuffer);
MockServerHttpRequest request = MockServerHttpRequest.post("/")
.contentType(MediaType.APPLICATION_FORM_URLENCODED)
.body(body);
Mono<MultiValueMap<String, String>> result = BodyExtractors.toFormData().extract(request, this.context);
StepVerifier.create(result)
.consumeNextWith(form -> {
assertEquals("Invalid result", 3, form.size());
assertEquals("Invalid result", "value 1", form.getFirst("name 1"));
List<String> values = form.get("name 2");
assertEquals("Invalid result", 2, values.size());
assertEquals("Invalid result", "value 2+1", values.get(0));
assertEquals("Invalid result", "value 2+2", values.get(1));
assertNull("Invalid result", form.getFirst("name 3"));
})
.expectComplete()
.verify();
}
@Test
public void toFlux() {
BodyExtractor<Flux<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(String.class);
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DefaultDataBuffer dataBuffer =
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
Flux<DataBuffer> body = Flux.just(dataBuffer);
MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);
Flux<String> result = extractor.extract(request, this.context);
StepVerifier.create(result)
.expectNext("foo")
.expectComplete()
.verify();
}
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
if (!uri.isAbsolute()) {
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
}
if (!this.httpClient.isStarted()) {
try {
this.httpClient.start();
}
catch (Exception ex) {
return Mono.error(ex);
}
}
JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest(
this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory);
return requestCallback.apply(clientHttpRequest).then(Mono.from(
clientHttpRequest.getReactiveRequest().response((response, chunks) -> {
Flux<DataBuffer> content = Flux.from(chunks).map(this::toDataBuffer);
return Mono.just(new JettyClientHttpResponse(response, content));
})));
}
@Test
public void limitResponseSize() {
DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
DataBuffer b1 = dataBuffer("foo", bufferFactory);
DataBuffer b2 = dataBuffer("bar", bufferFactory);
DataBuffer b3 = dataBuffer("baz", bufferFactory);
ClientRequest request = ClientRequest.create(HttpMethod.GET, DEFAULT_URL).build();
ClientResponse response = ClientResponse.create(HttpStatus.OK).body(Flux.just(b1, b2, b3)).build();
Mono<ClientResponse> result = ExchangeFilterFunctions.limitResponseSize(5)
.filter(request, req -> Mono.just(response));
StepVerifier.create(result.flatMapMany(res -> res.body(BodyExtractors.toDataBuffers())))
.consumeNextWith(buffer -> assertEquals("foo", string(buffer)))
.consumeNextWith(buffer -> assertEquals("ba", string(buffer)))
.expectComplete()
.verify();
}
@Override
public DataBuffer encodeValue(ByteBuffer byteBuffer, DataBufferFactory bufferFactory,
ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
DataBuffer dataBuffer = bufferFactory.wrap(byteBuffer);
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
String logPrefix = Hints.getLogPrefix(hints);
logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes");
}
return dataBuffer;
}
@Override
public Flux<DataBuffer> encode(Publisher<? extends DataBuffer> inputStream,
DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
@Nullable Map<String, Object> hints) {
Flux<DataBuffer> flux = Flux.from(inputStream);
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
flux = flux.doOnNext(buffer -> logValue(buffer, hints));
}
return flux;
}
@Override
@Test
public void decode() {
String u = "ü";
String e = "é";
String o = "ø";
String s = String.format("%s\n%s\n%s", u, e, o);
Flux<DataBuffer> input = toDataBuffers(s, 1, UTF_8);
testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
}
@Test
public void decodeErrorNonAalto() {
decoder.useAalto = false;
Flux<DataBuffer> source = Flux.concat(
stringBuffer("<pojo>"),
Flux.error(new RuntimeException()));
Flux<XMLEvent> events =
this.decoder.decode(source, null, null, Collections.emptyMap());
StepVerifier.create(events)
.expectError(RuntimeException.class)
.verify();
}
@Override
@Test
public void canEncode() {
assertTrue(this.encoder.canEncode(ResolvableType.forClass(DataBuffer.class),
MimeTypeUtils.TEXT_PLAIN));
assertFalse(this.encoder.canEncode(ResolvableType.forClass(Integer.class),
MimeTypeUtils.TEXT_PLAIN));
assertTrue(this.encoder.canEncode(ResolvableType.forClass(DataBuffer.class),
MimeTypeUtils.APPLICATION_JSON));
// SPR-15464
assertFalse(this.encoder.canEncode(ResolvableType.NONE, null));
}
@SuppressWarnings("unchecked")
private Flux<DataBuffer> encodeContent(
@Nullable Object content, MethodParameter returnType, DataBufferFactory bufferFactory,
@Nullable MimeType mimeType, Map<String, Object> hints) {
ResolvableType returnValueType = ResolvableType.forMethodParameter(returnType);
ReactiveAdapter adapter = getAdapterRegistry().getAdapter(returnValueType.resolve(), content);
Publisher<?> publisher;
ResolvableType elementType;
if (adapter != null) {
publisher = adapter.toPublisher(content);
boolean isUnwrapped = KotlinDetector.isKotlinReflectPresent() &&
KotlinDetector.isKotlinType(returnType.getContainingClass()) &&
KotlinDelegate.isSuspend(returnType.getMethod()) &&
!COROUTINES_FLOW_CLASS_NAME.equals(returnValueType.toClass().getName());
ResolvableType genericType = isUnwrapped ? returnValueType : returnValueType.getGeneric();
elementType = getElementType(adapter, genericType);
}
else {
publisher = Mono.justOrEmpty(content);
elementType = (returnValueType.toClass() == Object.class && content != null ?
ResolvableType.forInstance(content) : returnValueType);
}
if (elementType.resolve() == void.class || elementType.resolve() == Void.class) {
return Flux.from(publisher).cast(DataBuffer.class);
}
Encoder<?> encoder = getEncoder(elementType, mimeType);
return Flux.from((Publisher) publisher).map(value ->
encodeValue(value, elementType, encoder, bufferFactory, mimeType, hints));
}
@Override
protected final Mono<Void> writeAndFlushWithInternal(
Publisher<? extends Publisher<? extends DataBuffer>> body) {
if (this.writeCalled.compareAndSet(false, true)) {
Processor<? super Publisher<? extends DataBuffer>, Void> processor = createBodyFlushProcessor();
return Mono.from(subscriber -> {
body.subscribe(processor);
processor.subscribe(subscriber);
});
}
return Mono.error(new IllegalStateException(
"writeWith() or writeAndFlushWith() has already been called"));
}
private Mono<DataBuffer> stringBuffer(String value) {
return Mono.defer(() -> {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
return Mono.just(buffer);
});
}
private Mono<Void> handle(Payload payload) {
String destination = getDestination(payload);
MessageHeaders headers = createHeaders(destination, null);
DataBuffer dataBuffer = retainDataAndReleasePayload(payload);
int refCount = refCount(dataBuffer);
Message<?> message = MessageBuilder.createMessage(dataBuffer, headers);
return Mono.defer(() -> this.handler.apply(message))
.doFinally(s -> {
if (refCount(dataBuffer) == refCount) {
DataBufferUtils.release(dataBuffer);
}
});
}
@Override
public Flux<Object> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(Flux.from(input), this.jsonFactory, true);
return decodeInternal(tokens, elementType, mimeType, hints);
}
public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
@Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {
if (publisher != null && publisherNested != null) {
throw new IllegalArgumentException("At most one publisher expected");
}
this.publisher = publisher != null ?
Flux.from(publisher)
.doOnSubscribe(s -> this.hasContentConsumer = true)
.doOnNext(this.buffer::write)
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete) : null;
this.publisherNested = publisherNested != null ?
Flux.from(publisherNested)
.doOnSubscribe(s -> this.hasContentConsumer = true)
.map(p -> Flux.from(p).doOnNext(this.buffer::write).doOnError(this::handleOnError))
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete) : null;
if (publisher == null && publisherNested == null) {
this.content.onComplete();
}
}
/**
* Test a {@link Encoder#encode encode} scenario where the input stream is canceled.
* This test method will feed the first element of the {@code input} stream to the decoder,
* followed by a cancel signal.
* The result is expected to contain one "normal" element.
*
* @param input the input to be provided to the encoder
* @param inputType the input type
* @param mimeType the mime type to use for decoding. May be {@code null}.
* @param hints the hints used for decoding. May be {@code null}.
*/
protected void testEncodeCancel(Publisher<?> input, ResolvableType inputType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Flux<DataBuffer> result = encoder().encode(input, this.bufferFactory, inputType, mimeType,
hints);
StepVerifier.create(result)
.consumeNextWith(DataBufferUtils::release)
.thenCancel()
.verify();
}
@SuppressWarnings("SubscriberImplementation")
@Override
public reactor.core.publisher.Flux<DataBuffer> getBody() {
final Optional<Channel> opt = channelResolver.resolveChannel(request);
if (opt.isPresent()) {
final Channel channel = opt.get();
final NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(channel.alloc());
final Optional<HttpContentProcessor<ByteBufHolder>> httpContentProcessor = channelResolver.resolveContentProcessor(request);
if (httpContentProcessor.isPresent()) {
final HttpContentProcessor<ByteBufHolder> processor = httpContentProcessor.get();
return Flux.from(subscriber -> processor.subscribe(new Subscriber<ByteBufHolder>() {
@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s);
}
@Override
public void onNext(ByteBufHolder byteBufHolder) {
subscriber.onNext(nettyDataBufferFactory.wrap(byteBufHolder.content()));
}
@Override
public void onError(Throwable t) {
subscriber.onError(t);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
}));
}
}
return Flux.empty();
}
@Override
@Nullable
protected DataBuffer read() throws IOException {
if (this.inputStream.isReady()) {
DataBuffer dataBuffer = readFromInputStream();
if (dataBuffer == EOF_BUFFER) {
// No need to wait for container callback...
onAllDataRead();
dataBuffer = null;
}
return dataBuffer;
}
return null;
}
@Override
public byte[] encode(DataBuffer original) {
try {
ByteArrayOutputStream bis = new ByteArrayOutputStream();
GZIPOutputStream gos = new GZIPOutputStream(bis);
FileCopyUtils.copy(original.asInputStream(), gos);
return bis.toByteArray();
}
catch (IOException e) {
throw new IllegalStateException("couldn't encode body to gzip", e);
}
}
/**
* Constructor for a WebSocketMessage.
* <p>See static factory methods in {@link WebSocketSession} or alternatively
* use {@link WebSocketSession#bufferFactory()} to create the payload and
* then invoke this constructor.
*/
public WebSocketMessage(Type type, DataBuffer payload) {
Assert.notNull(type, "'type' must not be null");
Assert.notNull(payload, "'payload' must not be null");
this.type = type;
this.payload = payload;
}
public Consumer<DataBuffer> pojoConsumer(Pojo expected) {
return dataBuffer -> {
try {
Pojo actual = this.mapper.reader().forType(Pojo.class)
.readValue(DataBufferTestUtils.dumpBytes(dataBuffer));
assertEquals(expected, actual);
release(dataBuffer);
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
};
}
protected final Consumer<DataBuffer> expect(Msg msg) {
return dataBuffer -> {
try {
assertEquals(msg, Msg.parseDelimitedFrom(dataBuffer.asInputStream()));
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
finally {
DataBufferUtils.release(dataBuffer);
}
};
}
@Test
public void toMonoVoidAsClientWithEmptyBody() {
TestPublisher<DataBuffer> body = TestPublisher.create();
BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.setBody(body.flux());
StepVerifier.create(extractor.extract(response, this.context))
.then(() -> {
body.assertWasSubscribed();
body.complete();
})
.verifyComplete();
}