下面列出了怎么用org.springframework.core.io.buffer.PooledDataBuffer的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Repeatedly inspects every entry in {@code created} until none of the
 * recorded buffers reports itself as still allocated.
 *
 * <p>When an entry is still allocated, the error carried by that entry is
 * thrown from the iteration. An {@link AssertionError} raised that way is
 * swallowed and the check is retried after a 50 ms pause, unless more than
 * {@code duration} has passed since this method was entered, in which case
 * the error is rethrown to the caller.
 *
 * @param duration how long to keep retrying before giving up
 * @throws InterruptedException if the thread is interrupted while pausing
 */
void checkForLeaks(Duration duration) throws InterruptedException {
    Instant start = Instant.now();
    boolean allReleased = false;
    while (!allReleased) {
        try {
            this.created.forEach(info -> {
                PooledDataBuffer pooled = (PooledDataBuffer) info.getDataBuffer();
                if (pooled.isAllocated()) {
                    throw info.getError();
                }
            });
            // A full pass without an error means nothing is allocated any more.
            allReleased = true;
        }
        catch (AssertionError ex) {
            boolean timedOut = Instant.now().isAfter(start.plus(duration));
            if (timedOut) {
                throw ex;
            }
            // Give pending releases a moment to happen before looking again.
            Thread.sleep(50);
        }
    }
}
/**
 * Writes the given map as a multipart body to the output message.
 *
 * <p>A boundary is generated, the message content type is set to
 * {@code multipart/form-data} with {@code boundary} and {@code charset}
 * parameters, a log line is emitted through {@code LogFormatUtils.traceDebug},
 * and every map entry is encoded in order, followed by the closing boundary line.
 * Pooled buffers discarded by the pipeline are released.
 *
 * @param map the parts to encode, keyed by part name
 * @param outputMessage the message to write to
 * @param hints hints passed to the log-prefix lookup
 * @return the result of {@code outputMessage.writeWith(..)}
 */
private Mono<Void> writeMultipart(
        MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {

    byte[] boundary = generateMultipartBoundary();

    Map<String, String> contentTypeParams = new HashMap<>(2);
    contentTypeParams.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
    contentTypeParams.put("charset", getCharset().name());
    outputMessage.getHeaders().setContentType(
            new MediaType(MediaType.MULTIPART_FORM_DATA, contentTypeParams));

    LogFormatUtils.traceDebug(logger, traceOn -> {
        String prefix = Hints.getLogPrefix(hints) + "Encoding ";
        String details;
        if (isEnableLoggingRequestDetails()) {
            details = LogFormatUtils.formatValue(map, !traceOn);
        }
        else {
            // Only the part names are logged when request details are disabled.
            details = "parts " + map.keySet() + " (content masked)";
        }
        return prefix + details;
    });

    DataBufferFactory bufferFactory = outputMessage.bufferFactory();

    Flux<DataBuffer> encodedParts = Flux.fromIterable(map.entrySet())
            .concatMap(part -> encodePartValues(boundary, part.getKey(), part.getValue(), bufferFactory));

    Flux<DataBuffer> body = encodedParts
            .concatWith(generateLastLine(boundary, bufferFactory))
            .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);

    return outputMessage.writeWith(body);
}
/**
 * Reads the message body as a stream of parts.
 *
 * <p>The boundary is resolved from the message; when there is none, the
 * returned flux fails with a {@code CodecException}. Otherwise the body is
 * skipped up to the first boundary, split on the prefixed boundary, consumed
 * while {@code notLastBoundary} holds, and each section is converted with
 * {@code toPart}. Discarded pooled buffers and the bodies of discarded
 * {@code DefaultPart}s are released.
 *
 * @param elementType not used by this method body
 * @param message the message whose body is read
 * @param hints not used by this method body
 * @return the parts found in the body, or an error flux if no boundary exists
 */
@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
    byte[] boundary = boundary(message);
    if (boundary == null) {
        String reason = "No multipart boundary found in Content-Type: \"" +
                message.getHeaders().getContentType() + "\"";
        return Flux.error(new CodecException(reason));
    }

    if (logger.isTraceEnabled()) {
        logger.trace("Boundary: " + toString(boundary));
    }

    byte[] needle = concat(BOUNDARY_PREFIX, boundary);
    Flux<DataBuffer> afterFirstBoundary = skipUntilFirstBoundary(message.getBody(), boundary);

    return DataBufferUtils.split(afterFirstBoundary, needle)
            .takeWhile(section -> DefaultMultipartMessageReader.notLastBoundary(section))
            .map(section -> DefaultMultipartMessageReader.toPart(section))
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
            .doOnDiscard(DefaultPart.class, discarded -> DataBufferUtils.release(discarded.body));
}
/**
 * Encodes at most the first element emitted by {@code inputStream}.
 *
 * <p>The single element is handed to the per-value {@code encode} overload
 * and pooled buffers discarded by the pipeline are released.
 *
 * @param inputStream the source of the value to encode
 * @param bufferFactory factory forwarded to the per-value encoder
 * @param elementType type information forwarded to the per-value encoder
 * @param mimeType MIME type forwarded to the per-value encoder, may be {@code null}
 * @param hints hints forwarded to the per-value encoder, may be {@code null}
 * @return the buffers produced for the first element
 */
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
        ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

    // Anything after the first element is ignored.
    Flux<? extends T> firstOnly = Flux.from(inputStream).take(1);

    return firstOnly
            .concatMap(single -> encode(single, bufferFactory, elementType, mimeType, hints))
            .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
}
/**
 * Splits the incoming buffers on the delimiters resolved for {@code mimeType},
 * groups the pieces up to and including each {@code END_FRAME} marker, joins
 * every group with {@code joinUntilEndFrame}, and delegates the resulting
 * stream to the superclass {@code decode}. Pooled buffers discarded along the
 * way are released.
 *
 * @param input the raw buffers to decode
 * @param elementType type information forwarded to the superclass
 * @param mimeType MIME type used to resolve delimiters, may be {@code null}
 * @param hints hints forwarded to the superclass, may be {@code null}
 * @return the decoded strings
 */
@Override
public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementType,
        @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

    List<byte[]> delimiters = getDelimiterBytes(mimeType);

    Flux<DataBuffer> framed = Flux.from(input)
            .flatMapIterable(chunk -> splitOnDelimiter(chunk, delimiters))
            // The end-of-frame marker is recognised by identity, not by content.
            .bufferUntil(piece -> END_FRAME == piece)
            .map(pieces -> StringDecoder.joinUntilEndFrame(pieces))
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

    return super.decode(framed, elementType, mimeType, hints);
}
/**
 * Flattens the nested publishers into a single stream of buffers, converts
 * each buffer with {@code toContentChunk}, and installs the result as the
 * content of a new reactive request built from {@code jettyRequest}. Pooled
 * buffers discarded before conversion are released.
 *
 * @param body the nested publishers of buffers to send
 * @return the result of {@code doCommit(this::completes)}
 */
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
    Flux<ContentChunk> contentChunks = Flux.from(body)
            .flatMap(Function.identity())
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
            .map(buffer -> toContentChunk(buffer));

    ReactiveRequest.Content requestContent =
            ReactiveRequest.Content.fromPublisher(contentChunks, getContentType());

    this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest)
            .content(requestContent)
            .build();

    return doCommit(this::completes);
}
/**
 * Writes the given body after committing the response.
 *
 * <p>A {@code Mono} body is written as a {@code Mono}; any other publisher is
 * routed through a {@code ChannelSendOperator}, and {@code removeContentLength()}
 * is invoked if that path signals an error.
 *
 * @param body the buffers to write
 * @return a {@code Mono} that reflects the outcome of the write
 */
@Override
@SuppressWarnings("unchecked")
public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
    if (!(body instanceof Mono)) {
        return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))
                .doOnError(t -> removeContentLength());
    }

    // Write as Mono if possible as an optimization hint to Reactor Netty
    // ChannelSendOperator not necessary for Mono
    Mono<? extends DataBuffer> single = (Mono<? extends DataBuffer>) body;
    return single.flatMap(buffer -> {
        Mono<Void> committed = doCommit(() -> writeWithInternal(Mono.just(buffer)));
        return committed.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
    });
}
/**
 * Encodes {@code inputStream} with the configured encoder and writes the
 * result to {@code message}.
 *
 * <p>For a {@code Mono} input the content length header is set: to {@code 0}
 * (followed by {@code message.setComplete()}) when nothing is encoded,
 * otherwise to the readable byte count of the single buffer before writing it.
 * For a streaming media type each buffer is wrapped in its own {@code Mono}
 * and passed to {@code writeAndFlushWith}. In every other case the encoded
 * body is passed to {@code writeWith} as is.
 *
 * @param inputStream the values to encode
 * @param elementType type information forwarded to the encoder
 * @param mediaType the requested media type, may be {@code null}
 * @param message the message to write to
 * @param hints hints forwarded to the encoder
 * @return a {@code Mono} that reflects the outcome of the write
 */
@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType elementType,
        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    MediaType contentType = updateContentType(message, mediaType);

    Flux<DataBuffer> body = this.encoder.encode(
            inputStream, message.bufferFactory(), elementType, contentType, hints);

    if (inputStream instanceof Mono) {
        HttpHeaders headers = message.getHeaders();
        Mono<DataBuffer> single = Mono.from(body);
        Mono<DataBuffer> emptyFallback = Mono.defer(() -> {
            // Nothing was encoded: advertise an empty body and complete the message.
            headers.setContentLength(0);
            return message.setComplete().then(Mono.empty());
        });
        return single
                .switchIfEmpty(emptyFallback)
                .flatMap(buffer -> {
                    headers.setContentLength(buffer.readableByteCount());
                    Mono<DataBuffer> payload = Mono.just(buffer)
                            .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
                    return message.writeWith(payload);
                });
    }

    if (isStreamingMediaType(contentType)) {
        // One inner publisher per buffer, handed to writeAndFlushWith.
        Flux<Mono<DataBuffer>> flushUnits = body.map(buffer ->
                Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
        return message.writeAndFlushWith(flushUnits);
    }
    else {
        return message.writeWith(body);
    }
}
/**
 * Splits the incoming buffers on the delimiters resolved for {@code mimeType},
 * groups the pieces until {@code isEndFrame} matches, joins every group with
 * {@code joinUntilEndFrame}, and delegates the resulting stream to the
 * superclass {@code decode}. Pooled buffers discarded along the way are released.
 *
 * @param inputStream the raw buffers to decode
 * @param elementType type information forwarded to the superclass
 * @param mimeType MIME type used to resolve delimiters, may be {@code null}
 * @param hints hints forwarded to the superclass, may be {@code null}
 * @return the decoded strings
 */
@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
        @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

    List<byte[]> delimiters = getDelimiterBytes(mimeType);

    Flux<DataBuffer> joinedFrames = Flux.from(inputStream)
            .flatMapIterable(chunk -> splitOnDelimiter(chunk, delimiters))
            .bufferUntil(piece -> StringDecoder.isEndFrame(piece))
            .map(pieces -> StringDecoder.joinUntilEndFrame(pieces))
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

    return super.decode(joinedFrames, elementType, mimeType, hints);
}
/**
 * Merges the inner publishers into one stream of buffers, maps every buffer
 * through {@code toContentChunk}, and uses the resulting chunks as the content
 * of a reactive request created from {@code jettyRequest}. Pooled buffers that
 * are discarded before being mapped are released.
 *
 * @param body the nested publishers of buffers to send
 * @return the result of {@code doCommit(this::completes)}
 */
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
    Flux<ContentChunk> outgoing = Flux.from(body)
            .flatMap(Function.identity())
            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
            .map(dataBuffer -> toContentChunk(dataBuffer));

    ReactiveRequest.Content payload = ReactiveRequest.Content.fromPublisher(outgoing, getContentType());

    // Stored on the instance; this method itself only returns the commit result.
    this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest)
            .content(payload)
            .build();

    return doCommit(this::completes);
}
/**
 * Delegates to the rest of the chain and, once it terminates for any reason,
 * removes the cached request body from the exchange attributes and releases
 * it if it is a pooled buffer that is still allocated.
 *
 * <p>The attribute is type-checked before use: a cached body that is not a
 * {@link PooledDataBuffer} is simply removed, instead of failing with a
 * {@link ClassCastException} inside the {@code doFinally} callback.
 *
 * @param exchange the current exchange, whose attributes may hold the cached body
 * @param chain the remaining filter chain
 * @return the result of the chain with the cleanup callback attached
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    return chain.filter(exchange).doFinally(s -> {
        Object attribute = exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR);
        // instanceof is also false for null, so a missing attribute is a no-op.
        if (attribute instanceof PooledDataBuffer) {
            PooledDataBuffer dataBuffer = (PooledDataBuffer) attribute;
            if (dataBuffer.isAllocated()) {
                if (log.isTraceEnabled()) {
                    log.trace("releasing cached body in exchange attribute");
                }
                dataBuffer.release();
            }
        }
    });
}
/**
 * Increments this wrapper's own reference counter and then retains the
 * wrapped buffer through {@code DataBufferUtils.retain}.
 *
 * @return this instance
 */
@Override
public PooledDataBuffer retain() {
    // Bump the local count first, then retain the underlying buffer.
    refCount.incrementAndGet();
    DataBufferUtils.retain(dataBuffer);
    return this;
}
/**
 * Returns this same instance. This method body performs no reference
 * counting or other bookkeeping of its own.
 *
 * @return this instance
 */
@Override
public PooledDataBuffer retain() {
return this;
}
/**
 * Wraps the content of the given {@link PooledHttpData} using the delegate
 * factory, which is cast to {@link NettyDataBufferFactory}.
 *
 * <p>As noted by the original author, the returned {@link PooledDataBuffer} is
 * meant to be released once its consumer has finished with it, and
 * {@link NettyDataBuffer} is currently the only {@link PooledDataBuffer}
 * implementation exposed through the public API.
 *
 * @param data the pooled HTTP data whose content is wrapped
 * @return the buffer produced by the Netty factory
 */
private PooledDataBuffer withNettyDataBufferFactory(PooledHttpData data) {
    // Fails with ClassCastException if the delegate is not a Netty factory.
    final NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) delegate;
    return nettyFactory.wrap(data.content());
}