类org.springframework.core.io.buffer.DataBufferFactory源码实例Demo

下面列出了怎么用org.springframework.core.io.buffer.DataBufferFactory的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: spring-analysis-note   文件: PayloadUtils.java
/**
 * Use this method to slice, retain and wrap the data portion of the
 * {@code Payload}, and also to release the {@code Payload}. This assumes
 * the Payload metadata has been read by now and ensures downstream code
 * need only be aware of {@code DataBuffer}s.
 * @param payload the payload to process
 * @param bufferFactory the DataBufferFactory to wrap with
 * @return the created {@code DataBuffer} instance
 */
public static DataBuffer retainDataAndReleasePayload(Payload payload, DataBufferFactory bufferFactory) {
	try {
		if (bufferFactory instanceof NettyDataBufferFactory) {
			ByteBuf byteBuf = payload.sliceData().retain();
			return ((NettyDataBufferFactory) bufferFactory).wrap(byteBuf);
		}
		else {
			return bufferFactory.wrap(payload.getData());
		}
	}
	finally {
		if (payload.refCnt() > 0) {
			payload.release();
		}
	}
}
 
private Mono<Void> writeResource(Resource resource, ResolvableType type, @Nullable MediaType mediaType,
		ReactiveHttpOutputMessage message, Map<String, Object> hints) {

	HttpHeaders headers = message.getHeaders();
	MediaType resourceMediaType = getResourceMediaType(mediaType, resource, hints);
	headers.setContentType(resourceMediaType);

	if (headers.getContentLength() < 0) {
		long length = lengthOf(resource);
		if (length != -1) {
			headers.setContentLength(length);
		}
	}

	return zeroCopy(resource, null, message, hints)
			.orElseGet(() -> {
				Mono<Resource> input = Mono.just(resource);
				DataBufferFactory factory = message.bufferFactory();
				Flux<DataBuffer> body = this.encoder.encode(input, factory, type, resourceMediaType, hints);
				return message.writeWith(body);
			});
}
 
/**
 * Create a new WebSocket session.
 */
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
	Assert.notNull(delegate, "Native session is required.");
	Assert.notNull(id, "Session id is required.");
	Assert.notNull(info, "HandshakeInfo is required.");
	Assert.notNull(bufferFactory, "DataBuffer factory is required.");

	this.delegate = delegate;
	this.id = id;
	this.handshakeInfo = info;
	this.bufferFactory = bufferFactory;
	this.attributes.putAll(info.getAttributes());
	this.logPrefix = initLogPrefix(info, id);

	if (logger.isDebugEnabled()) {
		logger.debug(getLogPrefix() + "Session id \"" + getId() + "\" for " + getHandshakeInfo().getUri());
	}
}
 
@Override
public Mono<Resource> transform(ServerWebExchange exchange, Resource inputResource,
		ResourceTransformerChain chain) {

	return chain.transform(exchange, inputResource)
			.flatMap(outputResource -> {
				String name = outputResource.getFilename();
				if (!this.fileExtension.equals(StringUtils.getFilenameExtension(name))) {
					return Mono.just(outputResource);
				}
				DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
				Flux<DataBuffer> flux = DataBufferUtils
						.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
				return DataBufferUtils.join(flux)
						.flatMap(dataBuffer -> {
							CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
							DataBufferUtils.release(dataBuffer);
							String content = charBuffer.toString();
							return transform(content, outputResource, chain, exchange);
						});
			});
}
 
private Mono<DataBuffer> writeBody(ServerHttpResponse httpResponse,
		CachedBodyOutputMessage message, Class<?> outClass) {
	Mono<DataBuffer> response = DataBufferUtils.join(message.getBody());
	if (byte[].class.isAssignableFrom(outClass)) {
		return response;
	}

	List<String> encodingHeaders = httpResponse.getHeaders()
			.getOrEmpty(HttpHeaders.CONTENT_ENCODING);
	for (String encoding : encodingHeaders) {
		MessageBodyEncoder encoder = messageBodyEncoders.get(encoding);
		if (encoder != null) {
			DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();
			response = response.publishOn(Schedulers.parallel())
					.map(encoder::encode).map(dataBufferFactory::wrap);
			break;
		}
	}

	return response;
}
 
@SuppressWarnings("unchecked")
private <T> Flux<DataBuffer> encodeData(@Nullable T data, ResolvableType valueType,
		MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {

	if (data == null) {
		return Flux.empty();
	}

	if (data instanceof String) {
		String text = (String) data;
		return Flux.from(encodeText(StringUtils.replace(text, "\n", "\ndata:") + "\n", mediaType, factory));
	}

	if (this.encoder == null) {
		return Flux.error(new CodecException("No SSE encoder configured and the data is not String."));
	}

	return ((Encoder<T>) this.encoder)
			.encode(Mono.just(data), factory, valueType, mediaType, hints)
			.concatWith(encodeText("\n", mediaType, factory));
}
 
private Mono<Void> writeMultipart(
		MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {

	byte[] boundary = generateMultipartBoundary();

	Map<String, String> params = new HashMap<>(2);
	params.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
	params.put("charset", getCharset().name());

	outputMessage.getHeaders().setContentType(new MediaType(MediaType.MULTIPART_FORM_DATA, params));

	LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Encoding " +
			(isEnableLoggingRequestDetails() ?
					LogFormatUtils.formatValue(map, !traceOn) :
					"parts " + map.keySet() + " (content masked)"));

	DataBufferFactory bufferFactory = outputMessage.bufferFactory();

	Flux<DataBuffer> body = Flux.fromIterable(map.entrySet())
			.concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue(), bufferFactory))
			.concatWith(generateLastLine(boundary, bufferFactory))
			.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);

	return outputMessage.writeWith(body);
}
 
public static Mono<Void> responseWrite(ServerWebExchange exchange, int httpStatus, Result result) {
    if (httpStatus == 0) {
        httpStatus = HttpStatus.INTERNAL_SERVER_ERROR.value();
    }
    ServerHttpResponse response = exchange.getResponse();
    response.getHeaders().setAccessControlAllowCredentials(true);
    response.getHeaders().setAccessControlAllowOrigin("*");
    response.setStatusCode(HttpStatus.valueOf(httpStatus));
    response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);
    DataBufferFactory dataBufferFactory = response.bufferFactory();
    DataBuffer buffer = dataBufferFactory.wrap(JSONObject.toJSONString(result).getBytes(Charset.defaultCharset()));
    return response.writeWith(Mono.just(buffer)).doOnError((error) -> {
        DataBufferUtils.release(buffer);
    });
}
 
源代码9 项目: spring-cloud-rsocket   文件: Forwarding.java
@Override
public DataBuffer encodeValue(Forwarding value, DataBufferFactory bufferFactory,
		ResolvableType valueType, MimeType mimeType, Map<String, Object> hints) {
	NettyDataBufferFactory factory = (NettyDataBufferFactory) bufferFactory;
	ByteBuf encoded = Forwarding.encode(factory.getByteBufAllocator(), value);
	return factory.wrap(encoded);
}
 
private DefaultRSocketStrategies(List<Encoder<?>> encoders, List<Decoder<?>> decoders,
		ReactiveAdapterRegistry adapterRegistry, DataBufferFactory bufferFactory) {

	this.encoders = Collections.unmodifiableList(encoders);
	this.decoders = Collections.unmodifiableList(decoders);
	this.adapterRegistry = adapterRegistry;
	this.bufferFactory = bufferFactory;
}
 
源代码11 项目: spring-analysis-note   文件: MessagingRSocket.java
MessagingRSocket(Function<Message<?>, Mono<Void>> handler, RSocketRequester requester,
		@Nullable MimeType defaultDataMimeType, DataBufferFactory bufferFactory) {

	Assert.notNull(handler, "'handler' is required");
	Assert.notNull(requester, "'requester' is required");
	this.handler = handler;
	this.requester = requester;
	this.dataMimeType = defaultDataMimeType;
	this.bufferFactory = bufferFactory;
}
 
源代码12 项目: alibaba-rsocket-broker   文件: HessianEncoder.java
@NotNull
@Override
public Flux<DataBuffer> encode(@NotNull Publisher<?> inputStream, @NotNull DataBufferFactory bufferFactory, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
    return Flux.from(inputStream)
            .handle((obj, sink) -> {
                try {
                    sink.next(encode(obj, bufferFactory));
                } catch (Exception e) {
                    sink.error(e);
                }
            });
}
 
@Override
public Mono<Void> write(Publisher<?> input, ResolvableType elementType, @Nullable MediaType mediaType,
		ReactiveHttpOutputMessage message, Map<String, Object> hints) {

	mediaType = (mediaType != null && mediaType.getCharset() != null ? mediaType : DEFAULT_MEDIA_TYPE);
	DataBufferFactory bufferFactory = message.bufferFactory();

	message.getHeaders().setContentType(mediaType);
	return message.writeAndFlushWith(encode(input, elementType, mediaType, bufferFactory, hints));
}
 
@Override
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
		ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	return Flux.from(inputStream).
			take(1).
			concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));
}
 
源代码15 项目: spring-analysis-note   文件: Jaxb2XmlEncoder.java
@Override
protected Flux<DataBuffer> encode(Object value, DataBufferFactory bufferFactory,
		ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	// we're relying on doOnDiscard in base class
	return Mono.fromCallable(() -> encodeValue(value, bufferFactory, valueType, mimeType, hints)).flux();
}
 
源代码16 项目: java-technology-stack   文件: ByteBufferEncoder.java
@Override
public Flux<DataBuffer> encode(Publisher<? extends ByteBuffer> inputStream,
		DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
		@Nullable Map<String, Object> hints) {

	return Flux.from(inputStream).map(byteBuffer -> {
		DataBuffer dataBuffer = bufferFactory.wrap(byteBuffer);
		if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
			String logPrefix = Hints.getLogPrefix(hints);
			logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes");
		}
		return dataBuffer;
	});
}
 
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory){

	ServerHttpRequest request = exchange.getRequest();
	ServerHttpResponse response = exchange.getResponse();

	HttpServletRequest servletRequest = getHttpServletRequest(request);
	HttpServletResponse servletResponse = getHttpServletResponse(response);

	HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
	DataBufferFactory bufferFactory = response.bufferFactory();

	Endpoint endpoint = new StandardWebSocketHandlerAdapter(
			handler, session -> new TomcatWebSocketSession(session, handshakeInfo, bufferFactory));

	String requestURI = servletRequest.getRequestURI();
	DefaultServerEndpointConfig config = new DefaultServerEndpointConfig(requestURI, endpoint);
	config.setSubprotocols(subProtocol != null ?
			Collections.singletonList(subProtocol) : Collections.emptyList());

	try {
		WsServerContainer container = getContainer(servletRequest);
		container.doUpgrade(servletRequest, servletResponse, config, Collections.emptyMap());
	}
	catch (ServletException | IOException ex) {
		return Mono.error(ex);
	}

	return Mono.empty();
}
 
private Mono<DataBuffer> generateBoundaryLine(byte[] boundary, DataBufferFactory bufferFactory) {
	return Mono.fromCallable(() -> {
		DataBuffer buffer = bufferFactory.allocateBuffer(boundary.length + 4);
		buffer.write((byte)'-');
		buffer.write((byte)'-');
		buffer.write(boundary);
		buffer.write((byte)'\r');
		buffer.write((byte)'\n');
		return buffer;
	});
}
 
源代码19 项目: spring-analysis-note   文件: CharSequenceEncoder.java
@Override
public DataBuffer encodeValue(CharSequence charSequence, DataBufferFactory bufferFactory,
		ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	if (!Hints.isLoggingSuppressed(hints)) {
		LogFormatUtils.traceDebug(logger, traceOn -> {
			String formatted = LogFormatUtils.formatValue(charSequence, !traceOn);
			return Hints.getLogPrefix(hints) + "Writing " + formatted;
		});
	}
	boolean release = true;
	Charset charset = getCharset(mimeType);
	int capacity = calculateCapacity(charSequence, charset);
	DataBuffer dataBuffer = bufferFactory.allocateBuffer(capacity);
	try {
		dataBuffer.write(charSequence, charset);
		release = false;
	}
	catch (CoderMalfunctionError ex) {
		throw new EncodingException("String encoding error: " + ex.getMessage(), ex);
	}
	finally {
		if (release) {
			DataBufferUtils.release(dataBuffer);
		}
	}
	return dataBuffer;
}
 
SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage, DataBufferFactory bufferFactory,
		PartBodyStreamStorageFactory streamStorageFactory) {

	this.inputMessage = inputMessage;
	this.bufferFactory = bufferFactory;
	this.streamStorageFactory = streamStorageFactory;
}
 
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
		@Nullable MonoProcessor<Void> completionMono) {

	super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
	// TODO: suspend causes failures if invoked at this stage
	// suspendReceiving();
}
 
AbstractSynchronossPart(HttpHeaders headers, DataBufferFactory bufferFactory) {
	Assert.notNull(headers, "HttpHeaders is required");
	Assert.notNull(bufferFactory, "DataBufferFactory is required");
	this.name = MultipartUtils.getFieldName(headers);
	this.headers = headers;
	this.bufferFactory = bufferFactory;
}
 
public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory, HttpHeaders headers) {
	Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
	Assert.notNull(headers, "HttpHeaders must not be null");
	this.dataBufferFactory = dataBufferFactory;
	this.headers = headers;
	this.cookies = new LinkedMultiValueMap<>();
}
 
UndertowServerHttpResponse(
		HttpServerExchange exchange, DataBufferFactory bufferFactory, UndertowServerHttpRequest request) {

	super(bufferFactory, createHeaders(exchange));
	Assert.notNull(exchange, "HttpServerExchange must not be null");
	this.exchange = exchange;
	this.request = request;
}
 
源代码25 项目: spring-cloud-rsocket   文件: RouteSetup.java
@Override
public DataBuffer encodeValue(RouteSetup value, DataBufferFactory bufferFactory,
		ResolvableType valueType, MimeType mimeType, Map<String, Object> hints) {
	NettyDataBufferFactory factory = (NettyDataBufferFactory) bufferFactory;
	ByteBuf encoded = RouteSetup.encode(factory.getByteBufAllocator(), value);
	return factory.wrap(encoded);
}
 
/**
 * Returns a new {@link DataBufferFactoryWrapper} for {@link ArmeriaWebServer} and
 * {@link ArmeriaClientHttpConnector}.
 */
@Bean
@ConditionalOnMissingBean(DataBufferFactoryWrapper.class)
public DataBufferFactoryWrapper<?> armeriaBufferFactory(
        Optional<DataBufferFactory> dataBufferFactory) {
    if (dataBufferFactory.isPresent()) {
        return new DataBufferFactoryWrapper<>(dataBufferFactory.get());
    }
    return DataBufferFactoryWrapper.DEFAULT;
}
 
源代码27 项目: staccato   文件: DataBufferWriter.java
public <T> Mono<Void> write(ServerHttpResponse httpResponse, T object) {
    DataBufferFactory bufferFactory = httpResponse.bufferFactory();
    return httpResponse
            .writeWith(Mono.fromSupplier(() -> {
                try {
                    return bufferFactory.wrap(objectMapper.writeValueAsBytes(object));
                } catch (Exception ex) {
                    log.warn("Error writing response", ex);
                    return bufferFactory.wrap(new byte[0]);
                }
            }));
}
 
源代码28 项目: spring-analysis-note   文件: ProtobufEncoder.java
@Override
public Flux<DataBuffer> encode(Publisher<? extends Message> inputStream, DataBufferFactory bufferFactory,
		ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	return Flux.from(inputStream).map(message ->
			encodeValue(message, bufferFactory, !(inputStream instanceof Mono)));
}
 
public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
	super(dataBufferFactory);
	this.writeHandler = body -> {
		this.body = body.cache();
		return this.body.then();
	};
}
 
源代码30 项目: spring-analysis-note   文件: Jackson2CborEncoder.java
@Override
public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
	throw new UnsupportedOperationException("Does not support stream encoding yet");
}
 
 同包方法