下面列出了怎么用org.springframework.core.io.buffer.NettyDataBuffer的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Create a Payload from the given metadata and data.
* @param metadata the metadata part for the payload
* @param data the data part for the payload
* @return the created Payload
*/
public static Payload createPayload(DataBuffer metadata, DataBuffer data) {
if (metadata instanceof NettyDataBuffer && data instanceof NettyDataBuffer) {
return ByteBufPayload.create(
((NettyDataBuffer) data).getNativeBuffer(),
((NettyDataBuffer) metadata).getNativeBuffer());
}
else if (metadata instanceof DefaultDataBuffer && data instanceof DefaultDataBuffer) {
return DefaultPayload.create(
((DefaultDataBuffer) data).getNativeBuffer(),
((DefaultDataBuffer) metadata).getNativeBuffer());
}
else {
return DefaultPayload.create(data.asByteBuffer(), metadata.asByteBuffer());
}
}
@Test
public void usingNettyDataBufferFactory_PooledHttpData() {
final DataBufferFactoryWrapper<?> wrapper =
new DataBufferFactoryWrapper<>(new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT));
final PooledHttpData httpData1 =
PooledHttpData.wrap(Unpooled.wrappedBuffer("abc".getBytes()));
final DataBuffer buffer = wrapper.toDataBuffer(httpData1);
assertThat(buffer).isInstanceOf(NettyDataBuffer.class);
assertThat(((NettyDataBuffer) buffer).getNativeBuffer().refCnt()).isOne();
final HttpData httpData2 = wrapper.toHttpData(buffer);
assertThat(httpData2).isInstanceOf(PooledHttpData.class);
assertThat(((PooledHttpData) httpData2).content())
.isEqualTo(((NettyDataBuffer) buffer).getNativeBuffer());
assertThat(((PooledHttpData) httpData2).refCnt()).isOne();
}
@Test
public void usingNettyDataBufferFactory_HttpData() {
final DataBufferFactoryWrapper<?> wrapper =
new DataBufferFactoryWrapper<>(new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT));
final HttpData httpData1 = HttpData.ofUtf8("abc");
final DataBuffer buffer = wrapper.toDataBuffer(httpData1);
assertThat(buffer).isInstanceOf(NettyDataBuffer.class);
assertThat(((NettyDataBuffer) buffer).getNativeBuffer().refCnt()).isOne();
final HttpData httpData2 = wrapper.toHttpData(buffer);
assertThat(httpData2).isInstanceOf(PooledHttpData.class);
assertThat(((PooledHttpData) httpData2).content())
.isEqualTo(((NettyDataBuffer) buffer).getNativeBuffer());
assertThat(((PooledHttpData) httpData2).refCnt()).isOne();
}
/**
* Create a Payload from the given data.
* @param data the data part for the payload
* @return the created Payload
*/
public static Payload createPayload(DataBuffer data) {
if (data instanceof NettyDataBuffer) {
return ByteBufPayload.create(((NettyDataBuffer) data).getNativeBuffer());
}
else if (data instanceof DefaultDataBuffer) {
return DefaultPayload.create(((DefaultDataBuffer) data).getNativeBuffer());
}
else {
return DefaultPayload.create(data.asByteBuffer());
}
}
@Override
public NettyDataBuffer wrap(ByteBuf byteBuf) {
NettyDataBuffer dataBuffer = super.wrap(byteBuf);
if (byteBuf != Unpooled.EMPTY_BUFFER) {
recordHint(dataBuffer);
}
return dataBuffer;
}
private DataBuffer recordHint(DataBuffer buffer) {
AssertionError error = new AssertionError(String.format(
"DataBuffer leak: {%s} {%s} not released.%nStacktrace at buffer creation: ", buffer,
ObjectUtils.getIdentityHexString(((NettyDataBuffer) buffer).getNativeBuffer())));
this.created.add(new DataBufferLeakInfo(buffer, error));
return buffer;
}
@Test // SPR-17054
public void unsupportedMediaTypeShouldConsumeAndCancel() {
NettyDataBufferFactory factory = new NettyDataBufferFactory(new PooledByteBufAllocator(true));
NettyDataBuffer buffer = factory.wrap(ByteBuffer.wrap("spring".getBytes(StandardCharsets.UTF_8)));
TestPublisher<DataBuffer> body = TestPublisher.create();
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.APPLICATION_PDF);
response.setBody(body.flux());
BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);
StepVerifier.create(extractor.extract(response, this.context))
.then(() -> {
body.assertWasSubscribed();
body.emit(buffer);
})
.expectErrorSatisfies(throwable -> {
assertTrue(throwable instanceof UnsupportedMediaTypeException);
try {
buffer.release();
Assert.fail("releasing the buffer should have failed");
}
catch (IllegalReferenceCountException exc) {
}
body.assertCancelled();
}).verify();
}
@Override
public ByteBuf decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
if (dataBuffer instanceof NettyDataBuffer) {
return ((NettyDataBuffer) dataBuffer).getNativeBuffer();
}
return Unpooled.wrappedBuffer(dataBuffer.asByteBuffer());
}
@Override
public ByteBuf decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
if (dataBuffer instanceof NettyDataBuffer) {
return ((NettyDataBuffer) dataBuffer).getNativeBuffer();
}
return Unpooled.wrappedBuffer(dataBuffer.asByteBuffer());
}
@Test // SPR-17054
public void unsupportedMediaTypeShouldConsumeAndCancel() {
NettyDataBufferFactory factory = new NettyDataBufferFactory(new PooledByteBufAllocator(true));
NettyDataBuffer buffer = factory.wrap(ByteBuffer.wrap("spring".getBytes(StandardCharsets.UTF_8)));
TestPublisher<DataBuffer> body = TestPublisher.create();
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.APPLICATION_PDF);
response.setBody(body.flux());
BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);
StepVerifier.create(extractor.extract(response, this.context))
.then(() -> {
body.assertWasSubscribed();
body.emit(buffer);
})
.expectErrorSatisfies(throwable -> {
assertTrue(throwable instanceof UnsupportedMediaTypeException);
try {
buffer.release();
Assert.fail("releasing the buffer should have failed");
} catch (IllegalReferenceCountException exc) {
}
body.assertCancelled();
}).verify();
}
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
return Mono.defer(() -> {
Connection connection = exchange.getAttribute(Constants.CLIENT_RESPONSE_CONN_ATTR);
if (connection == null) {
return Mono.empty();
}
if (log.isTraceEnabled()) {
log.trace("NettyWriteResponseFilter start inbound: "
+ connection.channel().id().asShortText() + ", outbound: "
+ exchange.getLogPrefix());
}
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
final Flux<NettyDataBuffer> body = connection
.inbound()
.receive()
.retain()
.map(factory::wrap);
MediaType contentType = response.getHeaders().getContentType();
return isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body);
})
.then(chain.execute(exchange)
.doOnError(throwable -> cleanup(exchange))).doOnCancel(() -> cleanup(exchange));
}
private static ServerHttpRequest decorate(ServerWebExchange exchange,
DataBuffer dataBuffer, boolean cacheDecoratedRequest) {
if (dataBuffer.readableByteCount() > 0) {
if (log.isTraceEnabled()) {
log.trace("retaining body in exchange attribute");
}
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, dataBuffer);
}
ServerHttpRequest decorator = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return Mono.<DataBuffer>fromSupplier(() -> {
if (exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR,
null) == null) {
// probably == downstream closed or no body
return null;
}
// TODO: deal with Netty
NettyDataBuffer pdb = (NettyDataBuffer) dataBuffer;
return pdb.factory().wrap(pdb.getNativeBuffer().retainedSlice());
}).flux();
}
};
if (cacheDecoratedRequest) {
exchange.getAttributes().put(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR,
decorator);
}
return decorator;
}
/**
* Converts a {@link DataBuffer} into an {@link HttpData}.
*/
HttpData toHttpData(DataBuffer dataBuffer) {
if (dataBuffer instanceof NettyDataBuffer) {
return PooledHttpData.wrap((((NettyDataBuffer) dataBuffer).getNativeBuffer()));
}
final ByteBuffer buf =
dataBuffer instanceof DefaultDataBuffer ? ((DefaultDataBuffer) dataBuffer).getNativeBuffer()
: dataBuffer.asByteBuffer();
return PooledHttpData.wrap(Unpooled.wrappedBuffer(buf));
}
@Bean
public DataBufferFactory dataBufferFactory() {
return new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT) {
// This method will be called when emitting string from Mono/Flux.
@Override
public NettyDataBuffer allocateBuffer(int initialCapacity) {
final NettyDataBuffer buffer = super.allocateBuffer(initialCapacity);
// Keep allocated buffers.
allocatedBuffers.offer(buffer);
return buffer;
}
};
}
private static void ensureAllBuffersAreReleased() {
await().untilAsserted(() -> {
NettyDataBuffer buffer;
while ((buffer = allocatedBuffers.peek()) != null) {
assertThat(buffer.getNativeBuffer().refCnt()).isZero();
allocatedBuffers.poll();
}
assertThat(allocatedBuffers).isEmpty();
});
}
private int refCount(DataBuffer dataBuffer) {
return dataBuffer instanceof NettyDataBuffer ?
((NettyDataBuffer) dataBuffer).getNativeBuffer().refCnt() : 1;
}
@Override
public NettyDataBuffer allocateBuffer() {
return (NettyDataBuffer) recordHint(super.allocateBuffer());
}
@Override
public NettyDataBuffer allocateBuffer(int initialCapacity) {
return (NettyDataBuffer) recordHint(super.allocateBuffer(initialCapacity));
}
public static ByteBuf asByteBuf(DataBuffer buffer) {
return buffer instanceof NettyDataBuffer
? ((NettyDataBuffer) buffer).getNativeBuffer()
: Unpooled.wrappedBuffer(buffer.asByteBuffer());
}
static ByteBuf asByteBuf(DataBuffer buffer) {
return buffer instanceof NettyDataBuffer
? ((NettyDataBuffer) buffer).getNativeBuffer()
: Unpooled.wrappedBuffer(buffer.asByteBuffer());
}
@Test
void requestInvalidDemand() throws Exception {
final ConcurrentLinkedQueue<NettyDataBuffer> allocatedBuffers = new ConcurrentLinkedQueue<>();
final DataBufferFactoryWrapper<NettyDataBufferFactory> factoryWrapper = new DataBufferFactoryWrapper<>(
new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT) {
@Override
public NettyDataBuffer allocateBuffer() {
final NettyDataBuffer buffer = super.allocateBuffer();
allocatedBuffers.offer(buffer);
return buffer;
}
});
final CompletableFuture<HttpResponse> future = new CompletableFuture<>();
final ArmeriaServerHttpResponse response =
new ArmeriaServerHttpResponse(ctx, future, factoryWrapper, null);
response.writeWith(Mono.just(factoryWrapper.delegate().allocateBuffer().write("foo".getBytes())))
.then(Mono.defer(response::setComplete)).subscribe();
await().until(future::isDone);
assertThat(future.isCompletedExceptionally()).isFalse();
final AtomicBoolean completed = new AtomicBoolean();
final AtomicReference<Throwable> error = new AtomicReference<>();
future.get().subscribe(new Subscriber<HttpObject>() {
@Override
public void onSubscribe(Subscription s) {
s.request(0);
}
@Override
public void onNext(HttpObject httpObject) {
// Do nothing.
}
@Override
public void onError(Throwable t) {
error.compareAndSet(null, t);
completed.set(true);
}
@Override
public void onComplete() {
completed.set(true);
}
});
await().untilTrue(completed);
assertThat(error.get()).isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Reactive Streams specification rule 3.9");
await().untilAsserted(() -> {
assertThat(allocatedBuffers).hasSize(1);
assertThat(allocatedBuffers.peek().getNativeBuffer().refCnt()).isZero();
allocatedBuffers.poll();
});
}