下面列出了怎么用org.springframework.core.io.buffer.NettyDataBufferFactory的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Use this method to slice, retain and wrap the data portion of the
* {@code Payload}, and also to release the {@code Payload}. This assumes
* the Payload metadata has been read by now and ensures downstream code
* need only be aware of {@code DataBuffer}s.
* @param payload the payload to process
* @param bufferFactory the DataBufferFactory to wrap with
* @return the created {@code DataBuffer} instance
*/
public static DataBuffer retainDataAndReleasePayload(Payload payload, DataBufferFactory bufferFactory) {
try {
if (bufferFactory instanceof NettyDataBufferFactory) {
ByteBuf byteBuf = payload.sliceData().retain();
return ((NettyDataBufferFactory) bufferFactory).wrap(byteBuf);
}
else {
return bufferFactory.wrap(payload.getData());
}
}
finally {
if (payload.refCnt() > 0) {
payload.release();
}
}
}
protected WebSocketFrame toFrame(WebSocketMessage message) {
ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
return new TextWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
return new BinaryWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
return new PingWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
return new PongWebSocketFrame(byteBuf);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
String protocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols());
return getHttpClient()
.headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
.websocket(protocols, getMaxFramePayloadLength())
.uri(url.toString())
.handle((inbound, outbound) -> {
HttpHeaders responseHeaders = toHttpHeaders(inbound);
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
WebSocketSession session = new ReactorNettyWebSocketSession(
inbound, outbound, info, factory, getMaxFramePayloadLength());
if (logger.isDebugEnabled()) {
logger.debug("Started session '" + session.getId() + "' for " + url);
}
return handler.handle(session).checkpoint(url + " [ReactorNettyWebSocketClient]");
})
.doOnRequest(n -> {
if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
})
.next();
}
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
ServerHttpResponse response = exchange.getResponse();
HttpServerResponse reactorResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
(in, out) -> {
ReactorNettyWebSocketSession session =
new ReactorNettyWebSocketSession(
in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
URI uri = exchange.getRequest().getURI();
return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]");
});
}
protected WebSocketFrame toFrame(WebSocketMessage message) {
ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
return new TextWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
return new BinaryWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
return new PingWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
return new PongWebSocketFrame(byteBuf);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
return getHttpClient()
.headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
.websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
.uri(url.toString())
.handle((inbound, outbound) -> {
HttpHeaders responseHeaders = toHttpHeaders(inbound);
String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
if (logger.isDebugEnabled()) {
logger.debug("Started session '" + session.getId() + "' for " + url);
}
return handler.handle(session);
})
.doOnRequest(n -> {
if (logger.isDebugEnabled()) {
logger.debug("Connecting to " + url);
}
})
.next();
}
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
ServerHttpResponse response = exchange.getResponse();
HttpServerResponse reactorResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
(in, out) -> {
ReactorNettyWebSocketSession session =
new ReactorNettyWebSocketSession(
in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
return handler.handle(session);
});
}
@Test
public void usingNettyDataBufferFactory_PooledHttpData() {
final DataBufferFactoryWrapper<?> wrapper =
new DataBufferFactoryWrapper<>(new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT));
final PooledHttpData httpData1 =
PooledHttpData.wrap(Unpooled.wrappedBuffer("abc".getBytes()));
final DataBuffer buffer = wrapper.toDataBuffer(httpData1);
assertThat(buffer).isInstanceOf(NettyDataBuffer.class);
assertThat(((NettyDataBuffer) buffer).getNativeBuffer().refCnt()).isOne();
final HttpData httpData2 = wrapper.toHttpData(buffer);
assertThat(httpData2).isInstanceOf(PooledHttpData.class);
assertThat(((PooledHttpData) httpData2).content())
.isEqualTo(((NettyDataBuffer) buffer).getNativeBuffer());
assertThat(((PooledHttpData) httpData2).refCnt()).isOne();
}
@Test
public void usingNettyDataBufferFactory_HttpData() {
final DataBufferFactoryWrapper<?> wrapper =
new DataBufferFactoryWrapper<>(new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT));
final HttpData httpData1 = HttpData.ofUtf8("abc");
final DataBuffer buffer = wrapper.toDataBuffer(httpData1);
assertThat(buffer).isInstanceOf(NettyDataBuffer.class);
assertThat(((NettyDataBuffer) buffer).getNativeBuffer().refCnt()).isOne();
final HttpData httpData2 = wrapper.toHttpData(buffer);
assertThat(httpData2).isInstanceOf(PooledHttpData.class);
assertThat(((PooledHttpData) httpData2).content())
.isEqualTo(((NettyDataBuffer) buffer).getNativeBuffer());
assertThat(((PooledHttpData) httpData2).refCnt()).isOne();
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
.build();
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
.build();
}
/**
* Constructor with an additional maxFramePayloadLength argument.
* @since 5.1
*/
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, NettyDataBufferFactory bufferFactory,
int maxFramePayloadLength) {
super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
this.maxFramePayloadLength = maxFramePayloadLength;
}
@Test // SPR-17054
public void unsupportedMediaTypeShouldConsumeAndCancel() {
NettyDataBufferFactory factory = new NettyDataBufferFactory(new PooledByteBufAllocator(true));
NettyDataBuffer buffer = factory.wrap(ByteBuffer.wrap("spring".getBytes(StandardCharsets.UTF_8)));
TestPublisher<DataBuffer> body = TestPublisher.create();
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.APPLICATION_PDF);
response.setBody(body.flux());
BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);
StepVerifier.create(extractor.extract(response, this.context))
.then(() -> {
body.assertWasSubscribed();
body.emit(buffer);
})
.expectErrorSatisfies(throwable -> {
assertTrue(throwable instanceof UnsupportedMediaTypeException);
try {
buffer.release();
Assert.fail("releasing the buffer should have failed");
}
catch (IllegalReferenceCountException exc) {
}
body.assertCancelled();
}).verify();
}
private ReactorClientHttpConnector initConnector() {
if (bufferFactory instanceof NettyDataBufferFactory) {
ByteBufAllocator allocator = ((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
return new ReactorClientHttpConnector(this.factory, httpClient ->
httpClient.tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.ALLOCATOR, allocator)));
}
else {
return new ReactorClientHttpConnector();
}
}
public ReactorClientHttpRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) {
this.httpMethod = method;
this.uri = uri;
this.request = request;
this.outbound = outbound;
this.bufferFactory = new NettyDataBufferFactory(outbound.alloc());
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> {
// Send as Mono if possible as an optimization hint to Reactor Netty
if (body instanceof Mono) {
Mono<ByteBuf> byteBufMono = Mono.from(body).map(NettyDataBufferFactory::toByteBuf);
return this.outbound.send(byteBufMono).then();
}
else {
Flux<ByteBuf> byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf);
return this.outbound.send(byteBufFlux).then();
}
});
}
public ReactorServerHttpRequest(HttpServerRequest request, NettyDataBufferFactory bufferFactory)
throws URISyntaxException {
super(initUri(request), "", initHeaders(request));
Assert.notNull(bufferFactory, "DataBufferFactory must not be null");
this.request = request;
this.bufferFactory = bufferFactory;
}
public DataBuffer encodeValue(ByteBuf byteBuf, DataBufferFactory bufferFactory, ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
DataBuffer dataBuffer = ((NettyDataBufferFactory) bufferFactory).wrap(byteBuf);
if (this.logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
String logPrefix = Hints.getLogPrefix(hints);
this.logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes");
}
return dataBuffer;
}
public DataBuffer encodeValue(ByteBuf byteBuf, DataBufferFactory bufferFactory, ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
DataBuffer dataBuffer = ((NettyDataBufferFactory) bufferFactory).wrap(byteBuf);
if (this.logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
String logPrefix = Hints.getLogPrefix(hints);
this.logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes");
}
return dataBuffer;
}
/**
* Constructor with an additional maxFramePayloadLength argument.
* @since 5.1
*/
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, NettyDataBufferFactory bufferFactory,
int maxFramePayloadLength) {
super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
this.maxFramePayloadLength = maxFramePayloadLength;
}
@Test // SPR-17054
public void unsupportedMediaTypeShouldConsumeAndCancel() {
NettyDataBufferFactory factory = new NettyDataBufferFactory(new PooledByteBufAllocator(true));
NettyDataBuffer buffer = factory.wrap(ByteBuffer.wrap("spring".getBytes(StandardCharsets.UTF_8)));
TestPublisher<DataBuffer> body = TestPublisher.create();
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.APPLICATION_PDF);
response.setBody(body.flux());
BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);
StepVerifier.create(extractor.extract(response, this.context))
.then(() -> {
body.assertWasSubscribed();
body.emit(buffer);
})
.expectErrorSatisfies(throwable -> {
assertTrue(throwable instanceof UnsupportedMediaTypeException);
try {
buffer.release();
Assert.fail("releasing the buffer should have failed");
} catch (IllegalReferenceCountException exc) {
}
body.assertCancelled();
}).verify();
}
private ReactorClientHttpConnector initConnector() {
if (bufferFactory instanceof NettyDataBufferFactory) {
ByteBufAllocator allocator = ((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
return new ReactorClientHttpConnector(this.factory, httpClient ->
httpClient.tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.ALLOCATOR, allocator)));
}
else {
return new ReactorClientHttpConnector();
}
}
public ReactorClientHttpRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound outbound) {
this.httpMethod = method;
this.uri = uri;
this.request = request;
this.outbound = outbound;
this.bufferFactory = new NettyDataBufferFactory(outbound.alloc());
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> {
Flux<ByteBuf> byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf);
return this.outbound.send(byteBufFlux).then();
});
}
public ReactorServerHttpRequest(HttpServerRequest request, NettyDataBufferFactory bufferFactory)
throws URISyntaxException {
super(initUri(request), "", initHeaders(request));
Assert.notNull(bufferFactory, "DataBufferFactory must not be null");
this.request = request;
this.bufferFactory = bufferFactory;
}
@Override
public DataBuffer encodeValue(Forwarding value, DataBufferFactory bufferFactory,
ResolvableType valueType, MimeType mimeType, Map<String, Object> hints) {
NettyDataBufferFactory factory = (NettyDataBufferFactory) bufferFactory;
ByteBuf encoded = Forwarding.encode(factory.getByteBufAllocator(), value);
return factory.wrap(encoded);
}
@Override
public DataBuffer encodeValue(RouteSetup value, DataBufferFactory bufferFactory,
ResolvableType valueType, MimeType mimeType, Map<String, Object> hints) {
NettyDataBufferFactory factory = (NettyDataBufferFactory) bufferFactory;
ByteBuf encoded = RouteSetup.encode(factory.getByteBufAllocator(), value);
return factory.wrap(encoded);
}
public MetadataEncoder(MimeType metadataMimeType, RSocketStrategies strategies) {
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
Assert.notNull(strategies, "RSocketStrategies is required");
this.metadataMimeType = metadataMimeType;
this.strategies = strategies;
this.isComposite = this.metadataMimeType.toString()
.equals(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
this.allocator = bufferFactory() instanceof NettyDataBufferFactory
? ((NettyDataBufferFactory) bufferFactory()).getByteBufAllocator()
: ByteBufAllocator.DEFAULT;
}
private DataBuffer asDataBuffer(ByteBuf byteBuf) {
if (bufferFactory() instanceof NettyDataBufferFactory) {
return ((NettyDataBufferFactory) bufferFactory()).wrap(byteBuf);
}
else {
DataBuffer buffer = bufferFactory().wrap(byteBuf.nioBuffer());
byteBuf.release();
return buffer;
}
}
private ConnectionSetupPayload getConnectionSetupPayload() {
DataBufferFactory dataBufferFactory = messageHandler.getRSocketStrategies()
.dataBufferFactory();
NettyDataBufferFactory ndbf = (NettyDataBufferFactory) dataBufferFactory;
ByteBufAllocator byteBufAllocator = ndbf.getByteBufAllocator();
Payload setupPayload = DefaultPayload.create(Unpooled.EMPTY_BUFFER,
Unpooled.EMPTY_BUFFER);
ByteBuf setup = SetupFrameFlyweight.encode(byteBufAllocator, false, 1, 1,
MESSAGE_RSOCKET_COMPOSITE_METADATA.getString(),
// TODO: configurable?
APPLICATION_CBOR.getString(), setupPayload);
return ConnectionSetupPayload.create(setup);
}