下面列出了如何使用 io.netty.util.ReferenceCounted 这个 API 类的示例代码及写法,您也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that an interaction subscribed 100 ms after a lease was issued fails with
 * {@link MissingLeaseException}, and that the only frame sent is the LEASE frame.
 *
 * @param interaction the requester interaction under test, supplied by the {@code interactions}
 *     method source.
 */
@ParameterizedTest
@MethodSource("interactions")
void expiredLeaseRequestsAreRejected(BiFunction<RSocket, Payload, Publisher<?>> interaction) {
    leaseSender.onNext(Lease.create(50, 1));

    ByteBuf data = byteBufAllocator.buffer();
    data.writeCharSequence("test", CharsetUtil.UTF_8);
    Payload request = ByteBufPayload.create(data);

    // Subscribing is postponed by 100 ms, i.e. longer than the 50 passed to Lease.create above
    // (presumably the lease time-to-live in milliseconds -- confirm against Lease.create).
    Flux<?> lateInteraction =
        Flux.from(interaction.apply(rSocketRequester, request))
            .delaySubscription(Duration.ofMillis(100));

    StepVerifier.create(lateInteraction)
        .expectError(MissingLeaseException.class)
        .verify(Duration.ofSeconds(5));

    // Exactly one frame must have gone out: the LEASE frame. Releasing it here lets the leak
    // check below pass.
    Assertions.assertThat(connection.getSent())
        .hasSize(1)
        .first()
        .matches(sent -> FrameHeaderCodec.frameType(sent) == LEASE)
        .matches(ReferenceCounted::release);

    byteBufAllocator.assertHasNoLeaks();
}
/**
 * Executes a prepared statement and returns the server responses.
 * <p>
 * With a positive {@code fetchSize} the statement is executed as a cursor: the execute exchange is
 * consumed first, and the fetch exchange is only started (lazily) once it completes. Otherwise a
 * single exchange is performed and terminated by {@code FETCH_DONE}. Query execution terminates
 * with the last {@link CompleteMessage} or a {@link ErrorMessage}; the {@link ErrorMessage} will
 * emit an exception.
 *
 * @param client       the {@link Client} to exchange messages with.
 * @param context      the connection context, for cursor status.
 * @param sql          the original statement for exception tracing.
 * @param identifier   the identifier of the statement to execute.
 * @param deprecateEof whether EOF has been deprecated.
 * @param binding      the bound parameter data.
 * @param fetchSize    the fetch size; a value less than or equal to {@literal 0} means fetch all rows.
 * @return the messages received in response to this exchange, completed by {@link CompleteMessage}
 * when it is the last.
 */
private static Flux<ServerMessage> execute0(
    Client client, ConnectionContext context, String sql, PreparedIdentifier identifier, boolean deprecateEof, Binding binding, int fetchSize
) {
    if (fetchSize <= 0) {
        // No cursor requested: one exchange returns every row.
        return OperatorUtils.discardOnCancel(client.exchange(binding.toExecuteMessage(identifier.getId(), true), FETCH_DONE))
            .doOnDiscard(ReferenceCounted.class, RELEASE)
            .handle(new Handler(sql));
    }

    int statementId = identifier.getId();
    ExchangeableMessage cursor = binding.toExecuteMessage(statementId, false);

    // If EOF has been deprecated, the execute response ends with an OK message (same as fetch),
    // otherwise it ends with a Metadata EOF message. So do not take the last response message
    // (i.e. the OK message) for execute if EOF has been deprecated.
    return OperatorUtils.discardOnCancel(client.exchange(cursor, deprecateEof ? FETCH_DONE : METADATA_DONE))
        .doOnDiscard(ReferenceCounted.class, RELEASE)
        // Must run to completion before the fetch exchange is concatenated.
        .handle(new TakeOne(sql))
        .concatWith(Flux.defer(() -> fetch(client, context, identifier, new PreparedFetchMessage(statementId, fetchSize), sql)));
}
/**
 * Cancels after two of {@code ROWS} rows have been consumed and asserts that every row,
 * including those never emitted downstream, ends with a reference count of zero.
 */
@Test
void allRelease() {
    List<MockRow> rows = IntStream.range(0, ROWS)
        .mapToObj(id -> new MockRow(id))
        .collect(Collectors.toList());

    Flux<Integer> ids = OperatorUtils.discardOnCancel(Flux.fromIterable(rows))
        .doOnDiscard(ReferenceCounted.class, ReferenceCounted::release)
        .<Integer>handle((row, sink) -> {
            // Rows that do reach this handler are released here, whether or not emitting succeeds.
            try {
                sink.next(row.id);
            } finally {
                row.release();
            }
        });

    // Start with zero demand, then request exactly two elements before cancelling.
    StepVerifier.create(ids, 0)
        .thenRequest(2)
        .expectNext(0, 1)
        .thenCancel()
        .verify();

    assertThat(rows).hasSize(ROWS).extracting(MockRow::refCnt).containsOnly(0);
}
/**
 * Writes a reference-counted object that is not a ByteBuf through an {@link SslHandler} and
 * asserts that the write fails with {@link UnsupportedMessageTypeException} and that the
 * object's reference count has dropped to zero.
 */
@Test
public void testNonByteBufWriteIsReleased() throws Exception {
    SSLEngine serverEngine = SSLContext.getDefault().createSSLEngine();
    serverEngine.setUseClientMode(false);
    EmbeddedChannel embedded = new EmbeddedChannel(new SslHandler(serverEngine));

    AbstractReferenceCounted message = new AbstractReferenceCounted() {
        @Override
        protected void deallocate() {
            // Nothing to free.
        }

        @Override
        public ReferenceCounted touch(Object hint) {
            return this;
        }
    };

    ExecutionException failure = null;
    try {
        embedded.write(message).get();
    } catch (ExecutionException e) {
        failure = e;
    }
    if (failure == null) {
        // The write was expected to fail.
        fail();
    }
    assertThat(failure.getCause(), is(instanceOf(UnsupportedMessageTypeException.class)));

    assertEquals(0, message.refCnt());
    assertTrue(embedded.finishAndReleaseAll());
}
/**
 * Makes sure that when several requests arrive in the same inbound batch, each one is still
 * emitted as its own request message followed by its own last-content message.
 */
@Test
public void shouldHandleTwoMessagesInOneBatch() {
    channel.writeInbound(Unpooled.buffer().writeBytes(GET_REQUEST).writeBytes(GET_REQUEST));

    // Two requests were written back to back, so the same pair of messages is expected twice.
    for (int i = 0; i < 2; i++) {
        BinaryMemcacheRequest request = channel.readInbound();
        assertThat(request, instanceOf(BinaryMemcacheRequest.class));
        assertThat(request, notNullValue());
        request.release();

        Object lastContent = channel.readInbound();
        assertThat(lastContent, instanceOf(LastMemcacheContent.class));
        ((ReferenceCounted) lastContent).release();
    }
}
/**
 * Feeds an ERROR frame carrying an {@link ApplicationErrorException} to a pending
 * request-response and verifies that the subscriber receives it via {@code onError}.
 */
@Test
@Timeout(2_000)
public void testHandleApplicationException() {
    rule.connection.clearSendReceiveBuffers();

    Subscriber<Payload> subscriber = TestSubscriber.create();
    Publisher<Payload> pendingResponse = rule.socket.requestResponse(EmptyPayload.INSTANCE);
    pendingResponse.subscribe(subscriber);

    int streamId = rule.getStreamIdForRequestType(REQUEST_RESPONSE);
    rule.connection.addToReceivedBuffer(
        ErrorFrameCodec.encode(rule.alloc(), streamId, new ApplicationErrorException("error")));

    verify(subscriber).onError(any(ApplicationErrorException.class));

    // Only the request-response frame should have been sent; release it so the leak check passes.
    Assertions.assertThat(rule.connection.getSent())
        .hasSize(1)
        .allMatch(ReferenceCounted::release);
    rule.assertHasNoLeaks();
}
/**
 * Handles one object read from the channel: either emits it to the current subscription or
 * queues it as pending.
 *
 * @param data the object that was read; reference-counted objects are diverted to
 *     {@code channelReadReferenceCounted} and never emitted.
 */
void channelRead(T data) {
    assertInEventloop();

    if (data instanceof ReferenceCounted) {
        channelReadReferenceCounted((ReferenceCounted) data);
        return;
    }
    if (fatalError != null) {
        // A fatal error has already been recorded; drop the data.
        return;
    }

    // Fast path: there is a subscription and no buffering is required.
    if (subscription != null && !shouldBuffer()) {
        emit(subscription, data);
        return;
    }

    addPending(data);
    if (subscription != null) {
        processPending(subscription);
    }
}
/**
 * Cancels after two of {@code ROWS} rows have been consumed and asserts that every row,
 * including those never emitted downstream, ends with a reference count of zero.
 */
@Test
void allRelease() {
    List<MockRow> rows = IntStream.range(0, ROWS)
        .mapToObj(id -> new MockRow(id))
        .collect(Collectors.toList());

    Flux<Integer> ids = OperatorUtils.discardOnCancel(Flux.fromIterable(rows))
        .doOnDiscard(ReferenceCounted.class, ReferenceCounted::release)
        .<Integer>handle((row, sink) -> {
            // Rows that do reach this handler are released here, whether or not emitting succeeds.
            try {
                sink.next(row.id);
            } finally {
                row.release();
            }
        });

    // Start with zero demand, then request exactly two elements before cancelling.
    StepVerifier.create(ids, 0)
        .thenRequest(2)
        .expectNext(0, 1)
        .thenCancel()
        .verify();

    assertThat(rows).hasSize(ROWS).extracting(MockRow::refCnt).containsOnly(0);
}
/**
 * Passes the request on unchanged and, when no IP has been recorded in {@code context} yet,
 * records it from the {@code X-Real-IP} header or, failing that, {@code X-Forwarded-For}.
 *
 * @param ctx the handler context (unused here).
 * @param msg the incoming request; retained before being added to {@code out} when it is
 *     reference counted.
 * @param out the list receiving the forwarded request.
 */
@Override
protected void decode(ChannelHandlerContext ctx, HttpRequest msg, List<Object> out) {
    if (msg instanceof ReferenceCounted) {
        ((ReferenceCounted) msg).retain();
    }

    if (context.ip == null) {
        HttpHeaders headers = msg.headers();

        // X-Real-IP wins over X-Forwarded-For when both headers are present.
        String realIP;
        if (headers.contains("X-Real-IP")) {
            realIP = headers.get("X-Real-IP");
        } else if (headers.contains("X-Forwarded-For")) {
            realIP = headers.get("X-Forwarded-For");
        } else {
            realIP = null;
        }

        if (realIP == null) {
            LogHelper.error("IpForwarding error. Headers not found");
        } else {
            if (LogHelper.isDevEnabled()) {
                LogHelper.dev("Real IP address %s", realIP);
            }
            context.ip = realIP;
        }
    }

    out.add(msg);
}
/**
 * Verifies that receiving a CANCEL frame for an outstanding request-channel stream disposes the
 * outbound request processor, and that the only frame sent is the REQUEST_CHANNEL frame.
 */
@Test
public void testChannelRequestServerSideCancellation() {
MonoProcessor<Payload> cancelled = MonoProcessor.create();
UnicastProcessor<Payload> request = UnicastProcessor.create();
request.onNext(EmptyPayload.INSTANCE);
rule.socket.requestChannel(request).subscribe(cancelled);
int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
// Inject a CANCEL frame followed by a COMPLETE payload frame for the same stream.
rule.connection.addToReceivedBuffer(CancelFrameCodec.encode(rule.alloc(), streamId));
rule.connection.addToReceivedBuffer(PayloadFrameCodec.encodeComplete(rule.alloc(), streamId));
// Wait for whichever signals first: the `cancelled` processor, or the error that is
// subscribed after a one second delay and acts as a timeout.
Flux.first(
cancelled,
Flux.error(new IllegalStateException("Channel request not cancelled"))
.delaySubscription(Duration.ofSeconds(1)))
.blockFirst();
Assertions.assertThat(request.isDisposed()).isTrue();
// The single sent frame must be REQUEST_CHANNEL; release it so the leak check below passes.
Assertions.assertThat(rule.connection.getSent())
.hasSize(1)
.first()
.matches(bb -> frameType(bb) == REQUEST_CHANNEL)
.matches(ReferenceCounted::release);
rule.assertHasNoLeaks();
}
/**
 * Encodes empty tracing metadata with the given flag, decodes it again and checks that the
 * decoded metadata is empty and reports the decided/sampled/debug state matching the flag.
 *
 * @param expectedFlag the flag to encode, supplied by the {@code flags} method source.
 */
@ParameterizedTest
@MethodSource("flags")
public void shouldEncodeEmptyTrace(TracingMetadataCodec.Flags expectedFlag) {
    LeaksTrackingByteBufAllocator allocator =
        LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
    ByteBuf encoded = TracingMetadataCodec.encodeEmpty(allocator, expectedFlag);

    TracingMetadata decoded = TracingMetadataCodec.decode(encoded);

    Assertions.assertThat(decoded)
        .matches(TracingMetadata::isEmpty)
        .matches(
            metadata -> {
                switch (expectedFlag) {
                    case UNDECIDED:
                        return !metadata.isDecided();
                    case NOT_SAMPLE:
                        return metadata.isDecided() && !metadata.isSampled();
                    case SAMPLE:
                        return metadata.isDecided() && metadata.isSampled();
                    case DEBUG:
                        return metadata.isDecided() && metadata.isDebug();
                    default:
                        // Any flag not listed above fails the assertion.
                        return false;
                }
            });

    // Releasing the buffer must succeed, and afterwards nothing may remain allocated.
    Assertions.assertThat(encoded).matches(ReferenceCounted::release);
    allocator.assertHasNoLeaks();
}
/**
 * Releases every tracked reference, destroys the client and server, and then waits for a full
 * GC to increase the chance of detecting ByteBuf leaks.
 */
@After
public void tearDown() throws GeneralSecurityException {
    references.forEach(ReferenceCounted::release);
    references.clear();

    client.destroy();
    server.destroy();

    // Increase our chances to detect ByteBuf leaks.
    GcFinalization.awaitFullGc();
}
/**
 * Encodes an outgoing {@link IPacket} into the wire representation required by its transport:
 * a text WebSocket frame, a plain HTTP response (XHR polling), or a JSONP-wrapped HTTP response.
 * Any other message is passed through untouched (retained first when it is reference counted).
 *
 * @param ctx the handler context, used for logging and buffer allocation.
 * @param msg the outgoing message.
 * @param out the list receiving the encoded message.
 * @throws UnsupportedTransportTypeException if the packet's transport type is not handled; the
 *     buffer produced by {@code encodePacket} is released before the exception is thrown.
 */
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
    if (!(msg instanceof IPacket)) {
        if (msg instanceof ReferenceCounted) {
            ((ReferenceCounted) msg).retain();
        }
        out.add(msg);
        return;
    }

    IPacket packet = (IPacket) msg;
    if (log.isDebugEnabled()) {
        log.debug("Sending packet: {} to channel: {}", msg, ctx.channel());
    }
    ByteBuf encodedPacket = encodePacket(packet);
    if (log.isDebugEnabled()) {
        log.debug("Encoded packet: {}", encodedPacket);
    }

    TransportType transportType = packet.getTransportType();
    if (transportType == TransportType.WEBSOCKET || transportType == TransportType.FLASHSOCKET) {
        // The buffer is handed over to the frame.
        out.add(new TextWebSocketFrame(encodedPacket));
    } else if (transportType == TransportType.XHR_POLLING) {
        // The buffer is handed over to the response.
        out.add(PipelineUtils.createHttpResponse(packet.getOrigin(), encodedPacket, false));
    } else if (transportType == TransportType.JSONP_POLLING) {
        // The buffer's content is copied into a string, so the buffer itself is released here,
        // even if the conversion throws.
        String encodedStringPacket;
        try {
            encodedStringPacket = encodedPacket.toString(CharsetUtil.UTF_8);
        } finally {
            encodedPacket.release();
        }
        String jsonpIndexParam = (packet.getJsonpIndexParam() != null) ? packet.getJsonpIndexParam() : "0";
        String encodedJsonpPacket = String.format(JSONP_TEMPLATE, jsonpIndexParam, encodedStringPacket);
        HttpResponse httpResponse = PipelineUtils.createHttpResponse(packet.getOrigin(), PipelineUtils.copiedBuffer(ctx.alloc(), encodedJsonpPacket), true);
        httpResponse.headers().add("X-XSS-Protection", "0");
        out.add(httpResponse);
    } else {
        // Nothing downstream takes ownership of the buffer on this path, so release it before
        // failing; otherwise it would leak.
        encodedPacket.release();
        throw new UnsupportedTransportTypeException(transportType);
    }
}
/**
 * Entry point used by callers of the ProxyConnection to send a message out over the socket.
 * Reference-counted messages are retained before being handed to {@code doWrite}.
 *
 * @param msg the message to send.
 */
void write(Object msg) {
    final boolean referenceCounted = msg instanceof ReferenceCounted;
    if (referenceCounted) {
        LOG.debug("Retaining reference counted message");
        ((ReferenceCounted) msg).retain();
    }

    doWrite(msg);
}
/**
 * <p>
 * Performs the work that must follow a successful {@link ConnectionFlow}: switches to
 * {@code AWAITING_INITIAL}, notifies the chained proxy (if any) and the client connection, and
 * then either forwards or drops the initial request.
 * </p>
 *
 * @param shouldForwardInitialRequest
 *            whether or not we should forward the initial HttpRequest to
 *            the server after the connection has been established.
 */
void connectionSucceeded(boolean shouldForwardInitialRequest) {
    become(AWAITING_INITIAL);

    if (chainedProxy != null) {
        // Notify the ChainedProxy that we successfully connected. A failure here is only logged.
        try {
            chainedProxy.connectionSucceeded();
        } catch (Exception e) {
            LOG.error("Unable to record connectionSucceeded", e);
        }
    }

    clientConnection.serverConnectionSucceeded(this, shouldForwardInitialRequest);

    if (!shouldForwardInitialRequest) {
        LOG.debug("Dropping initial request: {}", initialRequest);
    } else {
        LOG.debug("Writing initial request: {}", initialRequest);
        write(initialRequest);
    }

    // We are now done with the initialRequest: it has either been forwarded to the upstream
    // server (HTTP requests) or completely dropped (HTTPS CONNECTs). If it is reference counted
    // (typically because the HttpObjectAggregator is in the pipeline to generate
    // FullHttpRequests), it has to be released manually to avoid a memory leak.
    if (initialRequest instanceof ReferenceCounted) {
        ((ReferenceCounted) initialRequest).release();
    }
}
/**
 * Propagates the given hint to every buffer held by this object.
 *
 * @param hint the hint forwarded to each buffer's {@code touch}.
 * @return this instance.
 */
@Override
public ReferenceCounted touch(Object hint) {
    if (!this.buffers.isEmpty()) {
        for (ByteBuf held : this.buffers) {
            held.touch(hint);
        }
    }
    return this;
}
/**
 * Writes and flushes an object to the given channel. Reference-counted objects are retained
 * before the write. Used by both write0 and
 * {@link com.linkedin.mitm.proxy.connectionflow.steps.ConnectionFlowStep}.
 *
 * @param channel the channel to write to; must not be {@code null}.
 * @param object the object to write.
 * @return the future returned by the channel's {@code writeAndFlush}.
 * @throws IllegalStateException if {@code channel} is {@code null}.
 */
private ChannelFuture writeToChannel(final Channel channel, final Object object) {
    if (channel == null) {
        throw new IllegalStateException("Failed to write to channel because channel is null");
    }

    final boolean referenceCounted = object instanceof ReferenceCounted;
    if (referenceCounted) {
        LOG.debug("Retaining reference counted message");
        ((ReferenceCounted) object).retain();
    }

    if (LOG.isDebugEnabled()) {
        // Formatting is guarded so the message is only built when debug logging is on.
        final String description = String.format("Writing in channel [%s]: %s", channel.toString(), object);
        LOG.debug(description);
    }

    return channel.writeAndFlush(object);
}
/**
 * Executes a text query with bound parameters.
 *
 * @param client  the client to exchange messages with.
 * @param query   the text query.
 * @param binding the binding used to build the text message.
 * @return the exchange responses after being passed through the {@link ProcessableHandler}.
 */
private static Flux<ServerMessage> execute0(Client client, TextQuery query, Binding binding) {
    // The same handler instance is given to the message and later used to process the responses.
    ProcessableHandler processor = new ProcessableHandler();
    TextQueryMessage textMessage = binding.toTextMessage(query, processor);

    return OperatorUtils
        .discardOnCancel(client.exchange(textMessage, FETCH_DONE))
        .doOnDiscard(ReferenceCounted.class, RELEASE)
        .handle(processor);
}
/**
 * Releases the given object on a best-effort basis if it is reference counted and still has a
 * positive reference count. Anything thrown by the release itself is deliberately ignored.
 *
 * @param t the object to release; objects that are not reference counted are left alone.
 */
void release(T t) {
    if (!(t instanceof ReferenceCounted)) {
        return;
    }

    ReferenceCounted counted = (ReferenceCounted) t;
    if (counted.refCnt() <= 0) {
        return;
    }

    try {
        counted.release();
    } catch (Throwable ignored) {
        // no ops
    }
}
/**
 * Forwards the message to the underlying channel, unless it is reference counted: in that case
 * the message is released, the promise is completed successfully, and nothing is written.
 *
 * @param msg the message to write.
 * @param promise the promise to complete.
 * @return the given promise.
 */
@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    try {
        if (!(msg instanceof ReferenceCounted)) {
            channel().writeAndFlush(msg, promise);
        } else {
            ((ReferenceCounted) msg).release();
            promise.setSuccess();
        }
    } catch (Exception failure) {
        // Fail the promise first, then hand the exception to the shared handler.
        promise.setFailure(failure);
        handleException(failure);
    }
    return promise;
}
/**
 * Empties the given section and releases whatever it held: either a single reference-counted
 * object or every element of a list.
 *
 * @param section the index of the section to clear.
 */
private void clear(int section) {
    final Object previous = sectionAt(section);
    // Detach the content from the section before releasing it.
    setSection(section, null);

    if (previous instanceof ReferenceCounted) {
        ((ReferenceCounted) previous).release();
        return;
    }

    if (previous instanceof List) {
        @SuppressWarnings("unchecked")
        List<DnsRecord> records = (List<DnsRecord>) previous;
        for (Object entry : records) {
            ReferenceCountUtil.release(entry);
        }
    }
}
/**
 * Writes a frame of a type the channel does not know and asserts that the write fails with
 * {@link UnsupportedMessageTypeException} and that the frame's reference count drops to zero.
 */
@Test
public void unknownFrameTypeShouldThrowAndBeReleased() throws Exception {
    class UnknownHttp2Frame extends AbstractReferenceCounted implements Http2Frame {
        @Override
        public ReferenceCounted touch(Object hint) {
            return this;
        }

        @Override
        protected void deallocate() {
            // Nothing to free.
        }

        @Override
        public String name() {
            return "UNKNOWN";
        }
    }

    UnknownHttp2Frame unknownFrame = new UnknownHttp2Frame();
    assertEquals(1, unknownFrame.refCnt());

    ChannelFuture writeFuture = channel.write(unknownFrame);
    writeFuture.await();

    assertTrue(writeFuture.isDone());
    assertFalse(writeFuture.isSuccess());
    assertThat(writeFuture.cause(), instanceOf(UnsupportedMessageTypeException.class));
    assertEquals(0, unknownFrame.refCnt());
}
/**
 * Repeats 10000 times: races an inbound CANCEL frame for stream 1 against the responder
 * completing its request-response Mono with a payload, then releases every sent frame and
 * asserts that nothing leaked.
 */
@Test
public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestResponseTest1() {
Scheduler parallel = Schedulers.parallel();
// NOTE(review): this installs a no-op dropped-error hook and does not reset it within this
// method -- confirm it is reset elsewhere (e.g. in a teardown).
Hooks.onErrorDropped((e) -> {});
ByteBufAllocator allocator = rule.alloc();
for (int i = 0; i < 10000; i++) {
// Single-element array so the anonymous RSocket below can hand the subscriber back out.
Operators.MonoSubscriber<Payload, Payload>[] sources = new Operators.MonoSubscriber[1];
rule.setAcceptingSocket(
new RSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
payload.release();
// A Mono that only captures its subscriber, so the test decides when it completes.
return new Mono<Payload>() {
@Override
public void subscribe(CoreSubscriber<? super Payload> actual) {
sources[0] = new Operators.MonoSubscriber<>(actual);
actual.onSubscribe(sources[0]);
}
};
}
},
Integer.MAX_VALUE);
rule.sendRequest(1, REQUEST_RESPONSE);
ByteBuf cancelFrame = CancelFrameCodec.encode(allocator, 1);
// Race the inbound cancel against completion of the response.
RaceTestUtils.race(
() -> rule.connection.addToReceivedBuffer(cancelFrame),
() -> {
sources[0].complete(ByteBufPayload.create("d1", "m1"));
},
parallel);
// Release whatever was sent in this iteration so the leak check below can pass.
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
rule.assertHasNoLeaks();
}
}
/**
 * Atomically adds {@code increment} to the reference count, retrying until the
 * compare-and-set succeeds.
 *
 * @param increment the amount to add to the reference count.
 * @return this instance.
 * @throws IllegalReferenceCountException if the resulting count would not be greater than
 *     {@code increment}, i.e. the count was already 0 or the addition overflowed.
 */
private ReferenceCounted retain0(int increment) {
    int current;
    int updated;
    do {
        current = this.refCnt;
        updated = current + increment;

        // Ensure we do not resurrect the object (refCnt was 0) and that no overflow occurred.
        if (updated <= increment) {
            throw new IllegalReferenceCountException(current, increment);
        }
    } while (!refCntUpdater.compareAndSet(this, current, updated));

    return this;
}
/**
 * With the OpenSSL provider available, checks that closing a channel whose pipeline holds an
 * {@link SslHandler} drops the engine's reference count to zero while the context's count
 * stays at one until it is released explicitly.
 */
@Test
public void testReleaseSslEngine() throws Exception {
    assumeTrue(OpenSsl.isAvailable());

    SelfSignedCertificate selfSigned = new SelfSignedCertificate();
    try {
        SslContext serverContext = SslContextBuilder
            .forServer(selfSigned.certificate(), selfSigned.privateKey())
            .sslProvider(SslProvider.OPENSSL)
            .build();
        try {
            SSLEngine engine = serverContext.newEngine(ByteBufAllocator.DEFAULT);
            EmbeddedChannel embedded = new EmbeddedChannel(new SslHandler(engine));

            // Before the channel is closed, both the context and the engine have a count of one.
            assertEquals(1, ((ReferenceCounted) serverContext).refCnt());
            assertEquals(1, ((ReferenceCounted) engine).refCnt());

            assertTrue(embedded.finishAndReleaseAll());
            embedded.close().syncUninterruptibly();

            // After the close, only the engine's count has dropped.
            assertEquals(1, ((ReferenceCounted) serverContext).refCnt());
            assertEquals(0, ((ReferenceCounted) engine).refCnt());
        } finally {
            ReferenceCountUtil.release(serverContext);
        }
    } finally {
        selfSigned.delete();
    }
}
/**
 * Returns the reference count of the wrapped message.
 *
 * @return the message's own count when it is reference counted, otherwise {@code 1}.
 */
@Override
public int refCnt() {
    return (message instanceof ReferenceCounted)
        ? ((ReferenceCounted) message).refCnt()
        : 1;
}
/**
 * Regression test: after a REQUEST_CHANNEL frame followed by an ERROR frame on the same
 * stream, the socket must not be disposed, the responder's publisher must have been cancelled,
 * and the only frame sent must be a REQUEST_N frame.
 */
@Test
// see https://github.com/rsocket/rsocket-java/issues/858
public void testWorkaround858() {
ByteBuf buffer = rule.alloc().buffer();
buffer.writeCharSequence("test", CharsetUtil.UTF_8);
TestPublisher<Payload> testPublisher = TestPublisher.create();
rule.setAcceptingSocket(
new RSocket() {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
// Consume and release every inbound payload; the response side is driven by testPublisher.
Flux.from(payloads).doOnNext(ReferenceCounted::release).subscribe();
return testPublisher.flux();
}
});
// Open the channel on stream 1, then immediately fail it with an ERROR frame.
rule.connection.addToReceivedBuffer(
RequestChannelFrameCodec.encodeReleasingPayload(
rule.alloc(), 1, false, 1, ByteBufPayload.create(buffer)));
rule.connection.addToReceivedBuffer(
ErrorFrameCodec.encode(rule.alloc(), 1, new RuntimeException("test")));
// The single sent frame must be REQUEST_N; release it so the leak check below passes.
Assertions.assertThat(rule.connection.getSent())
.hasSize(1)
.first()
.matches(bb -> FrameHeaderCodec.frameType(bb) == REQUEST_N)
.matches(ReferenceCounted::release);
Assertions.assertThat(rule.socket.isDisposed()).isFalse();
testPublisher.assertWasCancelled();
rule.assertHasNoLeaks();
}
/**
 * Handles a reference-counted object that reached this point: releases it, drops any pending
 * data, records and reports a fatal error if none has been recorded yet, and closes the channel.
 *
 * @param data the reference-counted object that was read.
 */
private void channelReadReferenceCounted(ReferenceCounted data) {
try {
data.release();
} finally {
// The cleanup below runs even if release() throws.
// We do not expect ref-counted objects here as ST does not support them and do not take care to clean them
// in error conditions. Hence we fail-fast when we see such objects.
pending = null;
// Only the first fatal error is recorded and reported.
if (fatalError == null) {
fatalError = new IllegalArgumentException("Reference counted leaked netty's pipeline. Object: " +
data.getClass().getSimpleName());
exceptionCaught0(fatalError);
}
channel.close();
}
}
/**
 * Finishes decompression: marks this decompressor as disposed, releases any decoded frames that
 * were never read, and completes the underlying frame decoder.
 *
 * @throws CompressionException if the stream was already marked broken.
 * @throws DisposedDecompressorException if this decompressor was already disposed.
 * @throws PayloadSmallerThanExpectedException if decoded frames were still unread; thrown only
 *     after those frames have been released and the frame decoder has been completed.
 */
@Override
public void complete() throws CompressionException {
    if (broken) {
        throw new CompressionException("Compressed stream is broken");
    }
    if (disposed) {
        throw new DisposedDecompressorException();
    }
    disposed = true;

    // Remember whether anything was left over before the frames are released and cleared.
    final boolean hadUnreadFrames = !decodedSnappyFrames.isEmpty();
    for (ReferenceCounted frame : decodedSnappyFrames) {
        frame.release();
    }
    decodedSnappyFrames.clear();

    snappyFrameDecoder.complete();

    if (hadUnreadFrames) {
        throw new PayloadSmallerThanExpectedException("Unread uncompressed frames on complete");
    }
}
/**
 * Releases every tracked reference and then waits for a full GC to increase the chance of
 * detecting ByteBuf leaks.
 */
@After
public void teardown() {
    references.forEach(ReferenceCounted::release);
    references.clear();

    // Increase our chances to detect ByteBuf leaks.
    GcFinalization.awaitFullGc();
}