下面列出了io.netty.buffer.ByteBufAllocatorMetric#io.netty.buffer.PooledByteBufAllocatorMetric 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Checks that the pooled Netty allocator behind {@code bufferFactory} reports no outstanding
 * allocations.
 *
 * <p>The check only runs when {@code bufferFactory} is a {@link NettyDataBufferFactory} whose
 * allocator is a {@link PooledByteBufAllocator}; in every other case the method returns without
 * doing anything. The total reported by {@code getAllocations} for the direct and heap arenas is
 * polled every 50 ms for up to 5 seconds, and the method returns as soon as that total is zero.
 * If it is still non-zero once the deadline has passed, or the waiting thread is interrupted,
 * the assertion fails with the current total.
 */
private void verifyAllocations() {
    if (!(this.bufferFactory instanceof NettyDataBufferFactory)) {
        return;
    }
    ByteBufAllocator allocator = ((NettyDataBufferFactory) this.bufferFactory).getByteBufAllocator();
    if (!(allocator instanceof PooledByteBufAllocator)) {
        return;
    }
    PooledByteBufAllocator pooledAllocator = (PooledByteBufAllocator) allocator;
    // Compute the deadline once instead of rebuilding it on every iteration.
    Instant deadline = Instant.now().plus(Duration.ofSeconds(5));
    while (true) {
        PooledByteBufAllocatorMetric metric = pooledAllocator.metric();
        long total = getAllocations(metric.directArenas()) + getAllocations(metric.heapArenas());
        if (total == 0) {
            return;
        }
        if (Instant.now().isBefore(deadline)) {
            try {
                Thread.sleep(50);
                continue;
            }
            catch (InterruptedException ex) {
                // Restore the interrupt status for the caller and stop waiting: sleeping again
                // would throw immediately, so fall through and report the current state.
                Thread.currentThread().interrupt();
            }
        }
        // total is non-zero here, so this assertion always fails and ends the loop.
        assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total);
    }
}
/**
 * Samples the default pooled allocator's metrics and stores them in this object's fields:
 * arena and thread-local cache counts, used heap/direct memory, and the allocation,
 * deallocation and active-allocation counters summed over all heap and all direct arenas.
 */
@Override
public void run() {
    PooledByteBufAllocatorMetric metric = PooledByteBufAllocator.DEFAULT.metric();

    // Allocator-wide values.
    numDirectArenas = metric.numDirectArenas();
    numHeapArenas = metric.numHeapArenas();
    numThreadLocalCaches = metric.numThreadLocalCaches();
    usedHeapMemory = metric.usedHeapMemory();
    usedDirectMemory = metric.usedDirectMemory();

    List<PoolArenaMetric> heapArenaMetrics = metric.heapArenas();
    List<PoolArenaMetric> directArenaMetrics = metric.directArenas();

    // Each counter is summed in its own pass so the counters are read in the same order as before.
    long heapAllocated = 0L;
    for (PoolArenaMetric arena : heapArenaMetrics) {
        heapAllocated += arena.numAllocations();
    }
    numHeapTotalAllocations = heapAllocated;

    long heapReleased = 0L;
    for (PoolArenaMetric arena : heapArenaMetrics) {
        heapReleased += arena.numDeallocations();
    }
    numHeapTotalDeallocations = heapReleased;

    long heapActive = 0L;
    for (PoolArenaMetric arena : heapArenaMetrics) {
        heapActive += arena.numActiveAllocations();
    }
    numHeapTotalActiveAllocations = heapActive;

    long directAllocated = 0L;
    for (PoolArenaMetric arena : directArenaMetrics) {
        directAllocated += arena.numAllocations();
    }
    numDirectTotalAllocations = directAllocated;

    long directReleased = 0L;
    for (PoolArenaMetric arena : directArenaMetrics) {
        directReleased += arena.numDeallocations();
    }
    numDirectTotalDeallocations = directReleased;

    long directActive = 0L;
    for (PoolArenaMetric arena : directArenaMetrics) {
        directActive += arena.numActiveAllocations();
    }
    numDirectTotalActiveAllocations = directActive;
}
/**
 * Records the default pooled allocator's current counters as the baseline for a test.
 * Intended to be invoked from a method annotated with {@link org.junit.Before}.
 *
 * <p>Does nothing when {@code cachedEnabled} is set. Otherwise the active-allocation,
 * allocation and deallocation counts are summed over all heap arenas and over all direct
 * arenas and stored in the corresponding fields.
 */
public void beforeTest() {
    if (!cachedEnabled) {
        PooledByteBufAllocatorMetric metric = PooledByteBufAllocator.DEFAULT.metric();

        // Heap arenas: one pass per counter, in the order active / allocations / deallocations.
        List<PoolArenaMetric> heaps = metric.heapArenas();
        long heapActive = 0L;
        for (PoolArenaMetric arena : heaps) {
            heapActive += arena.numActiveAllocations();
        }
        activeHeapAllocations = heapActive;

        long heapAllocated = 0L;
        for (PoolArenaMetric arena : heaps) {
            heapAllocated += arena.numAllocations();
        }
        heapAllocations = heapAllocated;

        long heapReleased = 0L;
        for (PoolArenaMetric arena : heaps) {
            heapReleased += arena.numDeallocations();
        }
        heapDeallocations = heapReleased;

        // Direct arenas: same three counters.
        List<PoolArenaMetric> directs = metric.directArenas();
        long directActive = 0L;
        for (PoolArenaMetric arena : directs) {
            directActive += arena.numActiveAllocations();
        }
        activeDirectAllocations = directActive;

        long directAllocated = 0L;
        for (PoolArenaMetric arena : directs) {
            directAllocated += arena.numAllocations();
        }
        directAllocations = directAllocated;

        long directReleased = 0L;
        for (PoolArenaMetric arena : directs) {
            directReleased += arena.numDeallocations();
        }
        directDeallocations = directReleased;
    }
}
/**
 * Compares the default pooled allocator's current active-allocation counts with the baseline
 * stored in this object's fields and fails if either differs. Intended to be invoked from a
 * method annotated with {@link org.junit.After}.
 *
 * <p>Does nothing when {@code cachedEnabled} or {@code disabled} is set. Direct memory is
 * asserted first, then heap memory; each failure message carries the allocation and
 * deallocation totals from before and after the test.
 */
public void afterTest() {
    if (!cachedEnabled && !disabled) {
        PooledByteBufAllocatorMetric metric = PooledByteBufAllocator.DEFAULT.metric();

        // Current heap arena totals, read in the order active / allocations / deallocations.
        List<PoolArenaMetric> heaps = metric.heapArenas();
        long currentActiveHeapAllocations = 0L;
        for (PoolArenaMetric arena : heaps) {
            currentActiveHeapAllocations += arena.numActiveAllocations();
        }
        long currentHeapAllocations = 0L;
        for (PoolArenaMetric arena : heaps) {
            currentHeapAllocations += arena.numAllocations();
        }
        long currentHeapDeallocations = 0L;
        for (PoolArenaMetric arena : heaps) {
            currentHeapDeallocations += arena.numDeallocations();
        }

        // Current direct arena totals, same order.
        List<PoolArenaMetric> directs = metric.directArenas();
        long currentActiveDirectAllocations = 0L;
        for (PoolArenaMetric arena : directs) {
            currentActiveDirectAllocations += arena.numActiveAllocations();
        }
        long currentDirectAllocations = 0L;
        for (PoolArenaMetric arena : directs) {
            currentDirectAllocations += arena.numAllocations();
        }
        long currentDirectDeallocations = 0L;
        for (PoolArenaMetric arena : directs) {
            currentDirectDeallocations += arena.numDeallocations();
        }

        String directMessage = String.format(
                "DirectMemoryLeak: [allocation|deallocation] before test[%d|%d], after test[%d|%d]",
                directAllocations, directDeallocations, currentDirectAllocations, currentDirectDeallocations);
        Assert.assertEquals(directMessage, activeDirectAllocations, currentActiveDirectAllocations);

        String heapMessage = String.format(
                "HeapMemoryLeak: [allocation|deallocation] before test[%d|%d], after test[%d|%d]",
                heapAllocations, heapDeallocations, currentHeapAllocations, currentHeapDeallocations);
        Assert.assertEquals(heapMessage, activeHeapAllocations, currentActiveHeapAllocations);
    }
}
/**
 * One-shot check that the pooled Netty allocator behind {@code bufferFactory} reports no
 * outstanding allocations.
 *
 * <p>Returns without checking unless {@code bufferFactory} is a {@link NettyDataBufferFactory}
 * whose allocator is a {@link PooledByteBufAllocator}. Otherwise the totals that
 * {@code getAllocations} reports for the direct and heap arenas are added up and asserted
 * to be zero.
 */
private void verifyAllocations() {
    if (!(this.bufferFactory instanceof NettyDataBufferFactory)) {
        return;
    }
    NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) this.bufferFactory;
    ByteBufAllocator allocator = nettyFactory.getByteBufAllocator();
    if (!(allocator instanceof PooledByteBufAllocator)) {
        return;
    }
    PooledByteBufAllocator pooledAllocator = (PooledByteBufAllocator) allocator;
    PooledByteBufAllocatorMetric metric = pooledAllocator.metric();
    long total = getAllocations(metric.directArenas()) + getAllocations(metric.heapArenas());
    String failureMessage = "ByteBuf Leak: " + total + " unreleased allocations";
    assertEquals(failureMessage, 0, total);
}
/**
 * Registers gauges for the given allocator metrics with {@code REGISTRY}.
 *
 * <p>Registration goes through {@code cache.computeIfAbsent}, keyed by the string form of
 * {@code metrics.hashCode()}, and that key is also used as the {@code ID} tag value.
 * Used heap and direct memory gauges are always registered; when {@code metrics} is a
 * {@link PooledByteBufAllocatorMetric}, gauges for arena counts, thread-local caches,
 * cache sizes and chunk size are registered as well.
 *
 * @param allocType value of the {@code TYPE} tag attached to every gauge
 * @param metrics the allocator metrics the gauges read from
 */
void registerMetrics(String allocType, ByteBufAllocatorMetric metrics) {
    cache.computeIfAbsent(String.valueOf(metrics.hashCode()), cacheKey -> {
        String[] gaugeTags = {ID, cacheKey, TYPE, allocType};

        // Gauges available for every allocator metric.
        Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + USED_HEAP_MEMORY, metrics, m -> m.usedHeapMemory())
             .description("The number of the bytes of the heap memory.")
             .tags(gaugeTags)
             .register(REGISTRY);

        Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + USED_DIRECT_MEMORY, metrics, m -> m.usedDirectMemory())
             .description("The number of the bytes of the direct memory.")
             .tags(gaugeTags)
             .register(REGISTRY);

        if (!(metrics instanceof PooledByteBufAllocatorMetric)) {
            return metrics;
        }

        // Additional gauges only a pooled allocator exposes.
        PooledByteBufAllocatorMetric pooled = (PooledByteBufAllocatorMetric) metrics;

        Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + HEAP_ARENAS, pooled, p -> p.numHeapArenas())
             .description("The number of heap arenas.")
             .tags(gaugeTags)
             .register(REGISTRY);

        Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + DIRECT_ARENAS, pooled, p -> p.numDirectArenas())
             .description("The number of direct arenas.")
             .tags(gaugeTags)
             .register(REGISTRY);

        Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + THREAD_LOCAL_CACHES, pooled, p -> p.numThreadLocalCaches())
             .description("The number of thread local caches.")
             .tags(gaugeTags)
             .register(REGISTRY);

        Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + TINY_CACHE_SIZE, pooled, p -> p.tinyCacheSize())
             .description("The size of the tiny cache.")
             .tags(gaugeTags)
             .register(REGISTRY);

        Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + SMALL_CACHE_SIZE, pooled, p -> p.smallCacheSize())
             .description("The size of the small cache.")
             .tags(gaugeTags)
             .register(REGISTRY);

        Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + NORMAL_CACHE_SIZE, pooled, p -> p.normalCacheSize())
             .description("The size of the normal cache.")
             .tags(gaugeTags)
             .register(REGISTRY);

        Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + CHUNK_SIZE, pooled, p -> p.chunkSize())
             .description("The chunk size for an arena.")
             .tags(gaugeTags)
             .register(REGISTRY);

        return metrics;
    });
}