下面列出了如何使用 org.springframework.core.io.buffer.DataBufferFactory 的 API 示例代码及写法，您也可以点击链接到 GitHub 查看源代码。
/**
 * Expose the data portion of the given {@code Payload} as a {@code DataBuffer}
 * and release the {@code Payload} afterwards. This assumes the Payload
 * metadata has been read by now, so that downstream code need only be aware
 * of {@code DataBuffer}s.
 * <p>With a {@link NettyDataBufferFactory} the data is sliced and retained
 * before being wrapped; with any other factory the result of
 * {@code payload.getData()} is wrapped. The payload is released in a
 * {@code finally} block whenever its reference count is still positive.
 * @param payload the payload to process
 * @param bufferFactory the DataBufferFactory to wrap with
 * @return the created {@code DataBuffer} instance
 */
public static DataBuffer retainDataAndReleasePayload(Payload payload, DataBufferFactory bufferFactory) {
    try {
        // Non-Netty factories wrap the payload data directly.
        if (!(bufferFactory instanceof NettyDataBufferFactory)) {
            return bufferFactory.wrap(payload.getData());
        }
        // Retain the slice so it stays valid once the payload itself is released below.
        ByteBuf retainedSlice = payload.sliceData().retain();
        NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) bufferFactory;
        return nettyFactory.wrap(retainedSlice);
    }
    finally {
        boolean stillReferenced = payload.refCnt() > 0;
        if (stillReferenced) {
            payload.release();
        }
    }
}
/**
 * Write the given resource to the output message.
 * <p>Sets the Content-Type header to the media type resolved for the
 * resource and, if no Content-Length is set yet (negative value), sets it
 * from {@code lengthOf(resource)} unless that returns {@code -1}. The
 * result of {@code zeroCopy} is used when present; otherwise the resource
 * is encoded through the configured encoder and written as the body.
 * @param resource the resource to write
 * @param type the element type passed to the encoder
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the output message to write to
 * @param hints hints passed on to media type resolution, zero-copy and encoding
 * @return a {@code Mono} for the write operation
 */
private Mono<Void> writeResource(Resource resource, ResolvableType type, @Nullable MediaType mediaType,
        ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    HttpHeaders responseHeaders = message.getHeaders();
    MediaType contentType = getResourceMediaType(mediaType, resource, hints);
    responseHeaders.setContentType(contentType);

    boolean lengthMissing = responseHeaders.getContentLength() < 0;
    if (lengthMissing) {
        long resourceLength = lengthOf(resource);
        if (resourceLength != -1) {
            responseHeaders.setContentLength(resourceLength);
        }
    }

    // Fall back to encoding through the DataBufferFactory when zero-copy yields nothing.
    return zeroCopy(resource, null, message, hints).orElseGet(() ->
            message.writeWith(this.encoder.encode(
                    Mono.just(resource), message.bufferFactory(), type, contentType, hints)));
}
/**
 * Create a new WebSocket session.
 * <p>All arguments are required. The attributes from the handshake info are
 * copied into this session's attributes, the log prefix is initialized, and
 * a debug message with the session id and handshake URI is logged when
 * debug logging is enabled.
 * @param delegate the native session
 * @param id the session id
 * @param info the handshake info
 * @param bufferFactory the factory for data buffers
 */
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
    Assert.notNull(delegate, "Native session is required.");
    Assert.notNull(id, "Session id is required.");
    Assert.notNull(info, "HandshakeInfo is required.");
    Assert.notNull(bufferFactory, "DataBuffer factory is required.");

    this.bufferFactory = bufferFactory;
    this.handshakeInfo = info;
    this.id = id;
    this.delegate = delegate;

    this.attributes.putAll(info.getAttributes());
    this.logPrefix = initLogPrefix(info, id);

    if (logger.isDebugEnabled()) {
        String message = getLogPrefix() + "Session id \"" + getId() + "\" for " + getHandshakeInfo().getUri();
        logger.debug(message);
    }
}
/**
 * Delegate to the rest of the chain and, if the resulting resource has the
 * configured file extension, read its full content as text and pass it to
 * the content-based {@code transform} overload.
 * <p>Resources with a different extension are returned as-is. The joined
 * data buffer is released once it has been decoded.
 * @param exchange the current exchange
 * @param inputResource the resource to transform
 * @param chain the chain of remaining transformers
 * @return the transformed resource, or the chain's output when not applicable
 */
@Override
public Mono<Resource> transform(ServerWebExchange exchange, Resource inputResource,
        ResourceTransformerChain chain) {

    return chain.transform(exchange, inputResource).flatMap(resolved -> {
        String extension = StringUtils.getFilenameExtension(resolved.getFilename());
        if (!this.fileExtension.equals(extension)) {
            return Mono.just(resolved);
        }

        Flux<DataBuffer> chunks = DataBufferUtils.read(
                resolved, exchange.getResponse().bufferFactory(), StreamUtils.BUFFER_SIZE);

        return DataBufferUtils.join(chunks).flatMap(joined -> {
            // Decode before releasing: the ByteBuffer view is only valid while the buffer is retained.
            CharBuffer decoded = DEFAULT_CHARSET.decode(joined.asByteBuffer());
            DataBufferUtils.release(joined);
            return transform(decoded.toString(), resolved, chain, exchange);
        });
    });
}
/**
 * Join the cached body into a single buffer and, where applicable, re-encode
 * it according to the response's Content-Encoding header.
 * <p>When {@code outClass} is a byte array type the joined body is returned
 * unchanged. Otherwise the first Content-Encoding value with a registered
 * {@code MessageBodyEncoder} is applied on the parallel scheduler and the
 * encoded result is wrapped with the response's buffer factory. If no
 * encoder matches, the joined body is returned unchanged.
 * @param httpResponse the response providing headers and the buffer factory
 * @param message the output message holding the cached body
 * @param outClass the class of the rewritten body
 * @return the body to write
 */
private Mono<DataBuffer> writeBody(ServerHttpResponse httpResponse,
        CachedBodyOutputMessage message, Class<?> outClass) {

    Mono<DataBuffer> joinedBody = DataBufferUtils.join(message.getBody());
    if (byte[].class.isAssignableFrom(outClass)) {
        return joinedBody;
    }

    for (String contentEncoding : httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING)) {
        MessageBodyEncoder bodyEncoder = messageBodyEncoders.get(contentEncoding);
        if (bodyEncoder == null) {
            continue;
        }
        // Only the first encoding with a registered encoder is applied.
        DataBufferFactory factory = httpResponse.bufferFactory();
        return joinedBody.publishOn(Schedulers.parallel())
                .map(bodyEncoder::encode)
                .map(factory::wrap);
    }
    return joinedBody;
}
/**
 * Encode the data of a server-sent event.
 * <p>{@code null} data yields an empty stream. A {@code String} is written
 * as text with every {@code "\n"} replaced by {@code "\ndata:"} and a
 * trailing newline appended. Any other value is passed to the configured
 * encoder, followed by a newline; if no encoder is configured, the result
 * is an error signal with a {@code CodecException}.
 * @param data the event data, possibly {@code null}
 * @param valueType the type of the data, passed to the encoder
 * @param mediaType the media type to encode with
 * @param factory the factory for creating data buffers
 * @param hints hints passed to the encoder
 * @return the encoded data buffers
 */
@SuppressWarnings("unchecked")
private <T> Flux<DataBuffer> encodeData(@Nullable T data, ResolvableType valueType,
        MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {

    if (data == null) {
        return Flux.empty();
    }

    if (data instanceof String) {
        String multiLine = StringUtils.replace((String) data, "\n", "\ndata:");
        return Flux.from(encodeText(multiLine + "\n", mediaType, factory));
    }

    if (this.encoder == null) {
        return Flux.error(new CodecException("No SSE encoder configured and the data is not String."));
    }

    Encoder<T> typedEncoder = (Encoder<T>) this.encoder;
    return typedEncoder
            .encode(Mono.just(data), factory, valueType, mediaType, hints)
            .concatWith(encodeText("\n", mediaType, factory));
}
/**
 * Write the given map of parts as a multipart/form-data body.
 * <p>A boundary is generated and set, together with the charset, as
 * parameters of the Content-Type header. Each entry is encoded in order,
 * followed by the last boundary line. Discarded {@code PooledDataBuffer}s
 * are released.
 * @param map the parts to write, keyed by part name
 * @param outputMessage the message to write to
 * @param hints hints used for the log prefix
 * @return a {@code Mono} for the write operation
 */
private Mono<Void> writeMultipart(
        MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {

    byte[] boundary = generateMultipartBoundary();

    Map<String, String> contentTypeParams = new HashMap<>(2);
    contentTypeParams.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
    contentTypeParams.put("charset", getCharset().name());
    outputMessage.getHeaders().setContentType(new MediaType(MediaType.MULTIPART_FORM_DATA, contentTypeParams));

    LogFormatUtils.traceDebug(logger, traceOn -> {
        String prefix = Hints.getLogPrefix(hints);
        String details;
        if (isEnableLoggingRequestDetails()) {
            details = LogFormatUtils.formatValue(map, !traceOn);
        }
        else {
            // Only part names are logged when request details logging is disabled.
            details = "parts " + map.keySet() + " (content masked)";
        }
        return prefix + "Encoding " + details;
    });

    DataBufferFactory factory = outputMessage.bufferFactory();

    Flux<DataBuffer> parts = Flux.fromIterable(map.entrySet())
            .concatMap(part -> encodePartValues(boundary, part.getKey(), part.getValue(), factory))
            .concatWith(generateLastLine(boundary, factory))
            .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);

    return outputMessage.writeWith(parts);
}
/**
 * Write the given result to the response as a JSON body.
 * <p>Sets the CORS allow-credentials and allow-origin headers, the status
 * code and a UTF-8 JSON content type, then writes the serialized result.
 * The buffer is released if the write signals an error.
 * @param exchange the current exchange whose response is written to
 * @param httpStatus the HTTP status code to set; {@code 0} is treated as
 * 500 (Internal Server Error)
 * @param result the result object to serialize as JSON
 * @return a {@code Mono} for the write operation
 * @throws IllegalArgumentException if {@code httpStatus} is not a status
 * code known to {@code HttpStatus.valueOf}
 */
public static Mono<Void> responseWrite(ServerWebExchange exchange, int httpStatus, Result result) {
    if (httpStatus == 0) {
        httpStatus = HttpStatus.INTERNAL_SERVER_ERROR.value();
    }
    ServerHttpResponse response = exchange.getResponse();
    // NOTE(review): browsers reject a wildcard Access-Control-Allow-Origin on credentialed
    // requests; confirm whether a concrete origin should be echoed instead.
    response.getHeaders().setAccessControlAllowCredentials(true);
    response.getHeaders().setAccessControlAllowOrigin("*");
    response.setStatusCode(HttpStatus.valueOf(httpStatus));
    response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);
    // The Content-Type above declares UTF-8, so the body must be encoded as UTF-8 too;
    // the platform default charset would corrupt non-ASCII characters on non-UTF-8 systems.
    byte[] json = JSONObject.toJSONString(result).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    DataBufferFactory dataBufferFactory = response.bufferFactory();
    DataBuffer buffer = dataBufferFactory.wrap(json);
    return response.writeWith(Mono.just(buffer)).doOnError((error) -> {
        DataBufferUtils.release(buffer);
    });
}
/**
 * Encode the given {@code Forwarding} value into a {@code DataBuffer}.
 * <p>The buffer factory is cast to {@link NettyDataBufferFactory}; its
 * {@code ByteBuf} allocator is used for encoding and the resulting
 * {@code ByteBuf} is wrapped by the same factory.
 * @param value the value to encode
 * @param bufferFactory the buffer factory, cast to {@code NettyDataBufferFactory}
 * @param valueType not used by this implementation
 * @param mimeType not used by this implementation
 * @param hints not used by this implementation
 * @return the buffer wrapping the encoded value
 */
@Override
public DataBuffer encodeValue(Forwarding value, DataBufferFactory bufferFactory,
        ResolvableType valueType, MimeType mimeType, Map<String, Object> hints) {

    NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) bufferFactory;
    ByteBuf payload = Forwarding.encode(nettyFactory.getByteBufAllocator(), value);
    return nettyFactory.wrap(payload);
}
/**
 * Create the strategies from the given components.
 * <p>The encoder and decoder lists are stored as unmodifiable views of the
 * lists passed in.
 * @param encoders the encoders to expose
 * @param decoders the decoders to expose
 * @param adapterRegistry the reactive adapter registry
 * @param bufferFactory the factory for data buffers
 */
private DefaultRSocketStrategies(List<Encoder<?>> encoders, List<Decoder<?>> decoders,
        ReactiveAdapterRegistry adapterRegistry, DataBufferFactory bufferFactory) {

    this.bufferFactory = bufferFactory;
    this.adapterRegistry = adapterRegistry;
    this.decoders = Collections.unmodifiableList(decoders);
    this.encoders = Collections.unmodifiableList(encoders);
}
/**
 * Create an instance with the given message handler and requester.
 * @param handler the function that handles messages; required
 * @param requester the requester; required
 * @param defaultDataMimeType the default data MIME type, possibly {@code null}
 * @param bufferFactory the factory for data buffers
 */
MessagingRSocket(Function<Message<?>, Mono<Void>> handler, RSocketRequester requester,
        @Nullable MimeType defaultDataMimeType, DataBufferFactory bufferFactory) {

    Assert.notNull(handler, "'handler' is required");
    Assert.notNull(requester, "'requester' is required");

    this.bufferFactory = bufferFactory;
    this.dataMimeType = defaultDataMimeType;
    this.requester = requester;
    this.handler = handler;
}
/**
 * Encode each element of the input stream into a {@code DataBuffer}.
 * <p>Every element is passed to the two-argument {@code encode} method; an
 * exception thrown while encoding an element is signalled as an error on
 * the resulting stream.
 * @param inputStream the elements to encode
 * @param bufferFactory the factory for creating data buffers
 * @param elementType not used by this implementation
 * @param mimeType not used by this implementation
 * @param hints not used by this implementation
 * @return the encoded data buffers
 */
@NotNull
@Override
public Flux<DataBuffer> encode(@NotNull Publisher<?> inputStream, @NotNull DataBufferFactory bufferFactory,
        ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {

    return Flux.from(inputStream).handle((element, downstream) -> {
        try {
            downstream.next(encode(element, bufferFactory));
        }
        catch (Exception failure) {
            downstream.error(failure);
        }
    });
}
/**
 * Encode the input and write it to the message, flushing as dictated by
 * the encoded stream.
 * <p>The given media type is used only when it is non-null and declares a
 * charset; otherwise {@code DEFAULT_MEDIA_TYPE} is used. The effective media
 * type is set as the Content-Type header.
 * @param input the elements to write
 * @param elementType the type of the elements
 * @param mediaType the requested media type, possibly {@code null}
 * @param message the message to write to
 * @param hints hints passed to encoding
 * @return a {@code Mono} for the write operation
 */
@Override
public Mono<Void> write(Publisher<?> input, ResolvableType elementType, @Nullable MediaType mediaType,
        ReactiveHttpOutputMessage message, Map<String, Object> hints) {

    if (mediaType == null || mediaType.getCharset() == null) {
        mediaType = DEFAULT_MEDIA_TYPE;
    }
    DataBufferFactory factory = message.bufferFactory();
    message.getHeaders().setContentType(mediaType);
    return message.writeAndFlushWith(encode(input, elementType, mediaType, factory, hints));
}
/**
 * Encode at most the first element of the input stream.
 * <p>Only one element is taken from the input; it is passed to the
 * single-value {@code encode} method and the resulting buffers are emitted
 * in order.
 * @param inputStream the input; elements after the first are not consumed
 * @param bufferFactory the factory for creating data buffers
 * @param elementType the type of the element
 * @param mimeType the MIME type, possibly {@code null}
 * @param hints encoding hints, possibly {@code null}
 * @return the encoded data buffers
 */
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
        ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

    return Flux.from(inputStream)
            .take(1)
            .concatMap(single -> encode(single, bufferFactory, elementType, mimeType, hints));
}
/**
 * Encode the given value by deferring to {@code encodeValue}.
 * <p>The call to {@code encodeValue} is wrapped in a callable, so it is
 * made on subscription rather than when this method is invoked.
 * @param value the value to encode
 * @param bufferFactory the factory for creating data buffers
 * @param valueType the type of the value
 * @param mimeType the MIME type, possibly {@code null}
 * @param hints encoding hints, possibly {@code null}
 * @return a stream with the single encoded buffer
 */
@Override
protected Flux<DataBuffer> encode(Object value, DataBufferFactory bufferFactory,
        ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

    // we're relying on doOnDiscard in base class
    Mono<DataBuffer> encoded =
            Mono.fromCallable(() -> encodeValue(value, bufferFactory, valueType, mimeType, hints));
    return encoded.flux();
}
/**
 * Wrap each {@code ByteBuffer} of the input stream as a {@code DataBuffer}.
 * <p>When debug logging is enabled and not suppressed through the hints,
 * the number of readable bytes of each wrapped buffer is logged.
 * @param inputStream the byte buffers to wrap
 * @param bufferFactory the factory used for wrapping
 * @param elementType not used by this implementation
 * @param mimeType not used by this implementation
 * @param hints hints consulted for log suppression and the log prefix
 * @return the wrapped data buffers
 */
@Override
public Flux<DataBuffer> encode(Publisher<? extends ByteBuffer> inputStream,
        DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
        @Nullable Map<String, Object> hints) {

    return Flux.from(inputStream).map(source -> {
        DataBuffer wrapped = bufferFactory.wrap(source);
        boolean shouldLog = logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints);
        if (shouldLog) {
            logger.debug(Hints.getLogPrefix(hints) + "Writing " + wrapped.readableByteCount() + " bytes");
        }
        return wrapped;
    });
}
/**
 * Upgrade the current request to a WebSocket session through the Tomcat
 * {@code WsServerContainer}.
 * <p>An endpoint adapter is created around the handler, producing a
 * {@code TomcatWebSocketSession} per native session. The endpoint config
 * uses the request URI and either the given sub-protocol or no
 * sub-protocols. A {@code ServletException} or {@code IOException} from the
 * upgrade is returned as an error signal.
 * @param exchange the current exchange
 * @param handler the handler for the WebSocket session
 * @param subProtocol the selected sub-protocol, possibly {@code null}
 * @param handshakeInfoFactory supplier of the handshake info
 * @return an empty {@code Mono} on success, or an error {@code Mono}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest request = exchange.getRequest();
    ServerHttpResponse response = exchange.getResponse();

    HttpServletRequest nativeRequest = getHttpServletRequest(request);
    HttpServletResponse nativeResponse = getHttpServletResponse(response);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory factory = response.bufferFactory();

    Endpoint endpoint = new StandardWebSocketHandlerAdapter(
            handler, nativeSession -> new TomcatWebSocketSession(nativeSession, info, factory));

    DefaultServerEndpointConfig endpointConfig =
            new DefaultServerEndpointConfig(nativeRequest.getRequestURI(), endpoint);
    if (subProtocol != null) {
        endpointConfig.setSubprotocols(Collections.singletonList(subProtocol));
    }
    else {
        endpointConfig.setSubprotocols(Collections.emptyList());
    }

    try {
        getContainer(nativeRequest)
                .doUpgrade(nativeRequest, nativeResponse, endpointConfig, Collections.emptyMap());
    }
    catch (ServletException | IOException ex) {
        return Mono.error(ex);
    }

    return Mono.empty();
}
/**
 * Create a buffer holding a boundary line: two dashes, the boundary bytes,
 * and a CRLF.
 * <p>The buffer is allocated when the returned {@code Mono} is subscribed to.
 * @param boundary the boundary bytes
 * @param bufferFactory the factory to allocate the buffer with
 * @return a {@code Mono} emitting the boundary line buffer
 */
private Mono<DataBuffer> generateBoundaryLine(byte[] boundary, DataBufferFactory bufferFactory) {
    return Mono.fromCallable(() -> {
        // 4 extra bytes: the leading "--" and the trailing CRLF.
        int capacity = boundary.length + 4;
        DataBuffer line = bufferFactory.allocateBuffer(capacity);
        line.write((byte) '-');
        line.write((byte) '-');
        line.write(boundary);
        line.write((byte) '\r');
        line.write((byte) '\n');
        return line;
    });
}
/**
 * Encode the given character sequence into a newly allocated {@code DataBuffer}.
 * <p>The charset is derived from the MIME type and the buffer capacity is
 * calculated up front. If writing fails for any reason the buffer is
 * released; a {@code CoderMalfunctionError} is rethrown as an
 * {@code EncodingException}.
 * @param charSequence the characters to encode
 * @param bufferFactory the factory to allocate the buffer with
 * @param valueType not used by this implementation
 * @param mimeType the MIME type used to determine the charset, possibly {@code null}
 * @param hints hints consulted for log suppression and the log prefix
 * @return the buffer containing the encoded characters
 */
@Override
public DataBuffer encodeValue(CharSequence charSequence, DataBufferFactory bufferFactory,
        ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

    if (!Hints.isLoggingSuppressed(hints)) {
        LogFormatUtils.traceDebug(logger, traceOn -> {
            String shown = LogFormatUtils.formatValue(charSequence, !traceOn);
            String prefix = Hints.getLogPrefix(hints);
            return prefix + "Writing " + shown;
        });
    }

    // Flipped to true only after a successful write; anything else triggers a release.
    boolean written = false;
    Charset charset = getCharset(mimeType);
    int capacity = calculateCapacity(charSequence, charset);
    DataBuffer target = bufferFactory.allocateBuffer(capacity);
    try {
        target.write(charSequence, charset);
        written = true;
    }
    catch (CoderMalfunctionError ex) {
        throw new EncodingException("String encoding error: " + ex.getMessage(), ex);
    }
    finally {
        if (!written) {
            DataBufferUtils.release(target);
        }
    }
    return target;
}
/**
 * Create a generator for the given input message.
 * @param inputMessage the input message to read from
 * @param bufferFactory the factory for data buffers
 * @param streamStorageFactory the factory for part body stream storage
 */
SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage, DataBufferFactory bufferFactory,
        PartBodyStreamStorageFactory streamStorageFactory) {

    this.streamStorageFactory = streamStorageFactory;
    this.bufferFactory = bufferFactory;
    this.inputMessage = inputMessage;
}
/**
 * Create a session around the given native Jetty session.
 * <p>All arguments are forwarded to the superclass constructor; the second
 * argument passed there is the identity hex string of the native session
 * (presumably used as the session id; confirm against the superclass).
 * @param session the native Jetty session
 * @param info the handshake info
 * @param factory the factory for data buffers
 * @param completionMono an optional processor passed to the superclass,
 * possibly {@code null}
 */
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable MonoProcessor<Void> completionMono) {
super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
// TODO: suspend causes failures if invoked at this stage
// suspendReceiving();
}
/**
 * Create a part with the given headers.
 * <p>The part name is obtained from the headers via
 * {@code MultipartUtils.getFieldName}.
 * @param headers the headers of the part; required
 * @param bufferFactory the factory for data buffers; required
 */
AbstractSynchronossPart(HttpHeaders headers, DataBufferFactory bufferFactory) {
    Assert.notNull(headers, "HttpHeaders is required");
    Assert.notNull(bufferFactory, "DataBufferFactory is required");

    this.bufferFactory = bufferFactory;
    this.headers = headers;
    this.name = MultipartUtils.getFieldName(headers);
}
/**
 * Create a response with the given buffer factory and headers.
 * <p>The cookies are initialized to an empty {@code LinkedMultiValueMap}.
 * @param dataBufferFactory the factory for data buffers; must not be {@code null}
 * @param headers the response headers; must not be {@code null}
 */
public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory, HttpHeaders headers) {
    Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
    Assert.notNull(headers, "HttpHeaders must not be null");

    this.cookies = new LinkedMultiValueMap<>();
    this.headers = headers;
    this.dataBufferFactory = dataBufferFactory;
}
/**
 * Create a response for the given Undertow exchange.
 * <p>The headers passed to the superclass are created from the exchange.
 * @param exchange the Undertow exchange; must not be {@code null}
 * @param bufferFactory the factory for data buffers
 * @param request the associated request
 */
UndertowServerHttpResponse(
        HttpServerExchange exchange, DataBufferFactory bufferFactory, UndertowServerHttpRequest request) {

    super(bufferFactory, createHeaders(exchange));
    // The null check can only follow the super call, which must be the first statement.
    Assert.notNull(exchange, "HttpServerExchange must not be null");
    this.request = request;
    this.exchange = exchange;
}
/**
 * Encode the given {@code RouteSetup} value into a {@code DataBuffer}.
 * <p>The buffer factory is cast to {@link NettyDataBufferFactory}; its
 * {@code ByteBuf} allocator is used for encoding and the resulting
 * {@code ByteBuf} is wrapped by the same factory.
 * @param value the value to encode
 * @param bufferFactory the buffer factory, cast to {@code NettyDataBufferFactory}
 * @param valueType not used by this implementation
 * @param mimeType not used by this implementation
 * @param hints not used by this implementation
 * @return the buffer wrapping the encoded value
 */
@Override
public DataBuffer encodeValue(RouteSetup value, DataBufferFactory bufferFactory,
        ResolvableType valueType, MimeType mimeType, Map<String, Object> hints) {

    NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) bufferFactory;
    ByteBuf routeSetupBytes = RouteSetup.encode(nettyFactory.getByteBufAllocator(), value);
    return nettyFactory.wrap(routeSetupBytes);
}
/**
 * Returns a new {@link DataBufferFactoryWrapper} for {@link ArmeriaWebServer} and
 * {@link ArmeriaClientHttpConnector}.
 * <p>Wraps the given {@link DataBufferFactory} when one is present and falls
 * back to {@code DataBufferFactoryWrapper.DEFAULT} otherwise.
 * @param dataBufferFactory the optional factory to wrap
 * @return the wrapper to register as a bean
 */
@Bean
@ConditionalOnMissingBean(DataBufferFactoryWrapper.class)
public DataBufferFactoryWrapper<?> armeriaBufferFactory(
        Optional<DataBufferFactory> dataBufferFactory) {

    if (!dataBufferFactory.isPresent()) {
        return DataBufferFactoryWrapper.DEFAULT;
    }
    DataBufferFactory delegate = dataBufferFactory.get();
    return new DataBufferFactoryWrapper<>(delegate);
}
/**
 * Serialize the given object with the {@code objectMapper} and write it as
 * the response body.
 * <p>Serialization happens when the body is subscribed to. If it fails with
 * any exception, a warning is logged and an empty buffer is written instead.
 * @param httpResponse the response to write to
 * @param object the object to serialize
 * @param <T> the type of the object
 * @return a {@code Mono} for the write operation
 */
public <T> Mono<Void> write(ServerHttpResponse httpResponse, T object) {
    DataBufferFactory factory = httpResponse.bufferFactory();

    Mono<DataBuffer> body = Mono.fromSupplier(() -> {
        try {
            byte[] serialized = objectMapper.writeValueAsBytes(object);
            return factory.wrap(serialized);
        }
        catch (Exception ex) {
            // Best effort: fall back to an empty body rather than failing the write.
            log.warn("Error writing response", ex);
            return factory.wrap(new byte[0]);
        }
    });

    return httpResponse.writeWith(body);
}
/**
 * Encode each message of the input stream into a {@code DataBuffer}.
 * <p>The boolean flag passed to {@code encodeValue} is {@code true} unless
 * the input publisher is a {@code Mono}.
 * @param inputStream the messages to encode
 * @param bufferFactory the factory for creating data buffers
 * @param elementType not used by this implementation
 * @param mimeType not used by this implementation
 * @param hints not used by this implementation
 * @return the encoded data buffers
 */
@Override
public Flux<DataBuffer> encode(Publisher<? extends Message> inputStream, DataBufferFactory bufferFactory,
        ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

    // The publisher type does not change per element, so it is checked once up front.
    boolean notSingleValue = !(inputStream instanceof Mono);
    return Flux.from(inputStream)
            .map(element -> encodeValue(element, bufferFactory, notSingleValue));
}
/**
 * Create a mock response with the given buffer factory.
 * <p>Installs a write handler that stores a cached version of the written
 * body in this response and returns its completion signal.
 * @param dataBufferFactory the factory passed to the superclass
 */
public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
    super(dataBufferFactory);
    this.writeHandler = written -> {
        // Cache so the stored body can be subscribed to again.
        this.body = written.cache();
        return this.body.then();
    };
}
/**
 * Stream encoding is not supported by this encoder.
 * @throws UnsupportedOperationException always
 */
@Override
public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory,
        ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {

    String reason = "Does not support stream encoding yet";
    throw new UnsupportedOperationException(reason);
}