类org.springframework.core.io.buffer.PooledDataBuffer源码实例Demo

下面列出了 org.springframework.core.io.buffer.PooledDataBuffer 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。

/**
 * Repeatedly scans the recorded buffers until none of them reports being
 * allocated, or until the given time budget has elapsed.
 * <p>An {@link AssertionError} raised during a scan is swallowed and the scan
 * is retried after a 50 ms pause; once more than {@code duration} has passed
 * since the first scan started, the error is rethrown instead.
 * @param duration how long to keep retrying before giving up
 * @throws InterruptedException if the thread is interrupted while pausing
 */
void checkForLeaks(Duration duration) throws InterruptedException {
	Instant start = Instant.now();
	for (;;) {
		try {
			this.created.forEach(leakInfo -> {
				PooledDataBuffer pooled = (PooledDataBuffer) leakInfo.getDataBuffer();
				if (pooled.isAllocated()) {
					throw leakInfo.getError();
				}
			});
			// A full pass without any allocated buffer: nothing leaked.
			return;
		}
		catch (AssertionError leak) {
			boolean budgetExhausted = Instant.now().isAfter(start.plus(duration));
			if (budgetExhausted) {
				throw leak;
			}
		}
		// Give pending releases a chance to happen before the next pass.
		Thread.sleep(50);
	}
}
 
/**
 * Writes the given parts as a {@code multipart/form-data} body.
 * <p>Generates a boundary, sets the content type (with {@code boundary} and
 * {@code charset} parameters) on the output message, logs what is being
 * encoded, and then writes every encoded part followed by the closing line.
 * @param map the part names and their values
 * @param outputMessage the message to set the content type on and write to
 * @param hints hints used here to obtain the log prefix
 * @return the result of {@code outputMessage.writeWith}
 */
private Mono<Void> writeMultipart(
		MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {

	byte[] boundary = generateMultipartBoundary();

	Map<String, String> contentTypeParams = new HashMap<>(2);
	contentTypeParams.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
	contentTypeParams.put("charset", getCharset().name());

	outputMessage.getHeaders().setContentType(
			new MediaType(MediaType.MULTIPART_FORM_DATA, contentTypeParams));

	LogFormatUtils.traceDebug(logger, traceOn -> {
		String prefix = Hints.getLogPrefix(hints) + "Encoding ";
		if (isEnableLoggingRequestDetails()) {
			return prefix + LogFormatUtils.formatValue(map, !traceOn);
		}
		// Request details logging is off: only reveal the part names.
		return prefix + ("parts " + map.keySet() + " (content masked)");
	});

	DataBufferFactory factory = outputMessage.bufferFactory();

	return outputMessage.writeWith(
			Flux.fromIterable(map.entrySet())
					.concatMap(part -> encodePartValues(boundary, part.getKey(), part.getValue(), factory))
					.concatWith(generateLastLine(boundary, factory))
					// Release pooled buffers that are dropped before being written.
					.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
}
 
/**
 * Reads the body of the given message as a stream of parts.
 * <p>Fails with a {@code CodecException} when no boundary can be obtained
 * from the message. Otherwise the body is skipped up to the first boundary,
 * split on the prefixed boundary, and every chunk before the last boundary
 * is converted to a part.
 * @param elementType not used by this method
 * @param message the message whose body is read
 * @param hints not used by this method
 * @return the parts found in the message body
 */
@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
	byte[] boundaryBytes = boundary(message);
	if (boundaryBytes == null) {
		String reason = "No multipart boundary found in Content-Type: \"" +
				message.getHeaders().getContentType() + "\"";
		return Flux.error(new CodecException(reason));
	}
	if (logger.isTraceEnabled()) {
		logger.trace("Boundary: " + toString(boundaryBytes));
	}

	byte[] needle = concat(BOUNDARY_PREFIX, boundaryBytes);
	Flux<DataBuffer> content = skipUntilFirstBoundary(message.getBody(), boundaryBytes);

	return DataBufferUtils.split(content, needle)
			.takeWhile(DefaultMultipartMessageReader::notLastBoundary)
			.map(DefaultMultipartMessageReader::toPart)
			// Release anything dropped downstream: raw pooled buffers and part bodies alike.
			.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
			.doOnDiscard(DefaultPart.class, discarded -> DataBufferUtils.release(discarded.body));
}
 
/**
 * Encodes at most the first element of the given input stream by delegating
 * to the single-value {@code encode} variant.
 * @param inputStream the source of values; only the first one is taken
 * @param bufferFactory passed through to the single-value {@code encode}
 * @param elementType passed through to the single-value {@code encode}
 * @param mimeType passed through to the single-value {@code encode}
 * @param hints passed through to the single-value {@code encode}
 * @return the encoded buffers
 */
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
		ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	Flux<T> source = Flux.from(inputStream);
	return source
			.take(1)
			.concatMap(element -> encode(element, bufferFactory, elementType, mimeType, hints))
			// Release pooled buffers that get dropped instead of delivered.
			.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
}
 
源代码5 项目: spring-analysis-note   文件: StringDecoder.java
/**
 * Splits the incoming buffers on the delimiters for the given MIME type,
 * groups the resulting pieces up to each {@code END_FRAME} marker, joins
 * every group, and hands the result to the superclass {@code decode}.
 * @param input the buffers to decode
 * @param elementType passed through to the superclass
 * @param mimeType used to look up the delimiters; also passed through
 * @param hints passed through to the superclass
 * @return the decoded strings as produced by the superclass
 */
@Override
public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementType,
		@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	List<byte[]> delimiters = getDelimiterBytes(mimeType);

	Flux<DataBuffer> pieces = Flux.from(input)
			.flatMapIterable(chunk -> splitOnDelimiter(chunk, delimiters));

	// A group is closed by the END_FRAME marker instance (identity comparison).
	Flux<List<DataBuffer>> frames = pieces.bufferUntil(piece -> END_FRAME == piece);

	Flux<DataBuffer> joined = frames
			.map(StringDecoder::joinUntilEndFrame)
			.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

	return super.decode(joined, elementType, mimeType, hints);
}
 
/**
 * Flattens the given publisher of publishers into a single stream of content
 * chunks, installs it as the content of a new reactive request built from
 * the Jetty request, and commits.
 * @param body the nested publishers of buffers to send
 * @return the result of {@code doCommit}
 */
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
	Flux<ContentChunk> contentChunks = Flux.from(body)
			.flatMap(Function.identity())
			// Pooled buffers dropped before conversion must still be released.
			.doOnDiscard(PooledDataBuffer.class, discarded -> DataBufferUtils.release(discarded))
			.map(buffer -> toContentChunk(buffer));
	ReactiveRequest.Content requestContent =
			ReactiveRequest.Content.fromPublisher(contentChunks, getContentType());
	this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest)
			.content(requestContent)
			.build();
	return doCommit(() -> completes());
}
 
/**
 * Writes the given body, committing the response first.
 * <p>A {@code Mono} body is written as a {@code Mono}; any other publisher
 * goes through a {@code ChannelSendOperator}, and on error the content
 * length is removed.
 * @param body the buffers to write
 * @return completion of the write
 */
@Override
@SuppressWarnings("unchecked")
public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
	if (!(body instanceof Mono)) {
		return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))
				.doOnError(failure -> removeContentLength());
	}
	// Write as Mono if possible as an optimization hint to Reactor Netty
	// ChannelSendOperator not necessary for Mono
	Mono<? extends DataBuffer> single = (Mono<? extends DataBuffer>) body;
	return single.flatMap(dataBuffer -> {
		return doCommit(() -> writeWithInternal(Mono.just(dataBuffer)))
				.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
	});
}
 
/**
 * Encodes the given input and writes it to the output message.
 * <p>For a {@code Mono} input the content length is set from the single
 * encoded buffer (or to 0 when nothing is produced, in which case the
 * message is completed without a body). For a streaming media type each
 * buffer is written and flushed individually. Otherwise the encoded body is
 * written as a whole.
 * @param inputStream the values to encode
 * @param elementType passed through to the encoder
 * @param mediaType used to update the content type of the message
 * @param message the message to write to
 * @param hints passed through to the encoder
 * @return completion of the write
 */
@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType elementType,
		@Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {

	MediaType contentType = updateContentType(message, mediaType);

	Flux<DataBuffer> encoded = this.encoder.encode(
			inputStream, message.bufferFactory(), elementType, contentType, hints);

	if (inputStream instanceof Mono) {
		HttpHeaders headers = message.getHeaders();
		return Mono.from(encoded)
				.switchIfEmpty(Mono.defer(() -> {
					// Nothing was encoded: advertise an empty body and complete.
					headers.setContentLength(0);
					return message.setComplete().then(Mono.empty());
				}))
				.flatMap(dataBuffer -> {
					headers.setContentLength(dataBuffer.readableByteCount());
					Mono<DataBuffer> single = Mono.just(dataBuffer)
							.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
					return message.writeWith(single);
				});
	}

	if (!isStreamingMediaType(contentType)) {
		return message.writeWith(encoded);
	}

	// Streaming: wrap each buffer in its own publisher so it is flushed on its own.
	Flux<Mono<DataBuffer>> flushedSingly = encoded.map(dataBuffer ->
			Mono.just(dataBuffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
	return message.writeAndFlushWith(flushedSingly);
}
 
源代码9 项目: java-technology-stack   文件: StringDecoder.java
/**
 * Splits the incoming buffers on the delimiters for the given MIME type,
 * groups the pieces up to each end frame, joins every group, and passes the
 * result on to the superclass {@code decode}.
 * @param inputStream the buffers to decode
 * @param elementType passed through to the superclass
 * @param mimeType used to look up the delimiters; also passed through
 * @param hints passed through to the superclass
 * @return the decoded strings as produced by the superclass
 */
@Override
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
		@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	List<byte[]> delimiters = getDelimiterBytes(mimeType);

	return super.decode(
			Flux.from(inputStream)
					.flatMapIterable(chunk -> splitOnDelimiter(chunk, delimiters))
					.bufferUntil(piece -> StringDecoder.isEndFrame(piece))
					.map(group -> StringDecoder.joinUntilEndFrame(group))
					// Release pooled buffers that are dropped along the way.
					.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release),
			elementType, mimeType, hints);
}
 
/**
 * Merges the inner publishers of {@code body} into one stream of content
 * chunks, uses it as the content of a freshly built reactive request for
 * the Jetty request, and commits.
 * @param body the nested publishers of buffers to send
 * @return the result of {@code doCommit}
 */
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
	Flux<ContentChunk> payload = Flux.from(body)
			.flatMap(Function.identity())
			// Make sure pooled buffers that never reach Jetty are released.
			.doOnDiscard(PooledDataBuffer.class, pooled -> DataBufferUtils.release(pooled))
			.map(dataBuffer -> this.toContentChunk(dataBuffer));

	ReactiveRequest.Content jettyContent =
			ReactiveRequest.Content.fromPublisher(payload, getContentType());

	this.reactiveRequest =
			ReactiveRequest.newBuilder(this.jettyRequest).content(jettyContent).build();

	return doCommit(() -> this.completes());
}
 
/**
 * Delegates to the rest of the chain and, once it terminates, removes the
 * cached request body from the exchange attributes and releases it if it is
 * a pooled buffer that is still allocated.
 * <p>The attribute is removed regardless of its type, but it is only
 * released when it actually is a {@link PooledDataBuffer}; any other value
 * stored under the same key is dropped without raising a
 * {@code ClassCastException} from the cleanup callback.
 * @param exchange the current exchange, whose attributes may hold the cached body
 * @param chain the remaining filter chain
 * @return the result of the chain with the cleanup callback attached
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
	return chain.filter(exchange).doFinally(s -> {
		Object attribute = exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR);
		// instanceof is false for null, so this also covers a missing attribute.
		if (attribute instanceof PooledDataBuffer) {
			PooledDataBuffer dataBuffer = (PooledDataBuffer) attribute;
			if (dataBuffer.isAllocated()) {
				if (log.isTraceEnabled()) {
					log.trace("releasing cached body in exchange attribute");
				}
				dataBuffer.release();
			}
		}
	});
}
 
/**
 * Increments this buffer's own reference count and retains the wrapped
 * data buffer via {@code DataBufferUtils.retain}.
 * @return this buffer
 */
@Override
public PooledDataBuffer retain() {
	// Track the retain locally before delegating to the wrapped buffer.
	this.refCount.incrementAndGet();
	DataBufferUtils.retain(this.dataBuffer);
	return this;
}
 
/**
 * Returns this buffer without doing anything else; this implementation
 * does not update any reference count.
 * NOTE(review): presumably a stub/test implementation where retention is
 * not tracked - confirm against the enclosing class.
 * @return this buffer
 */
@Override
public PooledDataBuffer retain() {
	return this;
}
 
源代码14 项目: armeria   文件: DataBufferFactoryWrapper.java
/**
 * Wraps the content of the given pooled HTTP data using the delegate, which
 * is cast to a {@link NettyDataBufferFactory}.
 * <p>The returned {@link PooledDataBuffer} is meant to be released once the
 * consumer has consumed it. At present {@link NettyDataBuffer} is the only
 * {@link PooledDataBuffer} implementation exposed through the public API.
 */
private PooledDataBuffer withNettyDataBufferFactory(PooledHttpData data) {
    final NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) delegate;
    return nettyFactory.wrap(data.content());
}
 
 同包方法