io.netty.buffer.ByteBufAllocatorMetric#io.netty.buffer.PooledByteBufAllocatorMetric源码实例Demo

下面列出了io.netty.buffer.ByteBufAllocatorMetric#io.netty.buffer.PooledByteBufAllocatorMetric 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

private void verifyAllocations() {
	if (this.bufferFactory instanceof NettyDataBufferFactory) {
		ByteBufAllocator allocator = ((NettyDataBufferFactory) this.bufferFactory).getByteBufAllocator();
		if (allocator instanceof PooledByteBufAllocator) {
			Instant start = Instant.now();
			while (true) {
				PooledByteBufAllocatorMetric metric = ((PooledByteBufAllocator) allocator).metric();
				long total = getAllocations(metric.directArenas()) + getAllocations(metric.heapArenas());
				if (total == 0) {
					return;
				}
				if (Instant.now().isBefore(start.plus(Duration.ofSeconds(5)))) {
					try {
						Thread.sleep(50);
					}
					catch (InterruptedException ex) {
						// ignore
					}
					continue;
				}
				assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total);
			}
		}
	}
}
 
源代码2 项目: ambry   文件: NettyInternalMetrics.java
@Override
public void run() {
  PooledByteBufAllocatorMetric metric = PooledByteBufAllocator.DEFAULT.metric();
  numDirectArenas = metric.numDirectArenas();
  numHeapArenas = metric.numHeapArenas();
  numThreadLocalCaches = metric.numThreadLocalCaches();
  usedHeapMemory = metric.usedHeapMemory();
  usedDirectMemory = metric.usedDirectMemory();
  List<PoolArenaMetric> heapArenaMetrics = metric.heapArenas();
  List<PoolArenaMetric> directArenaMetrics = metric.directArenas();
  numHeapTotalAllocations = heapArenaMetrics.stream().mapToLong(PoolArenaMetric::numAllocations).sum();
  numHeapTotalDeallocations = heapArenaMetrics.stream().mapToLong(PoolArenaMetric::numDeallocations).sum();
  numHeapTotalActiveAllocations = heapArenaMetrics.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
  numDirectTotalAllocations = directArenaMetrics.stream().mapToLong(PoolArenaMetric::numAllocations).sum();
  numDirectTotalDeallocations = directArenaMetrics.stream().mapToLong(PoolArenaMetric::numDeallocations).sum();
  numDirectTotalActiveAllocations =
      directArenaMetrics.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
}
 
源代码3 项目: ambry   文件: NettyByteBufLeakHelper.java
/**
 * Method to call at any method that is tagged {@link org.junit.Before} to collect some allocation stats.
 */
public void beforeTest() {
  if (cachedEnabled) {
    return;
  }
  PooledByteBufAllocatorMetric metric = PooledByteBufAllocator.DEFAULT.metric();
  List<PoolArenaMetric> heaps = metric.heapArenas();
  activeHeapAllocations = heaps.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
  heapAllocations = heaps.stream().mapToLong(PoolArenaMetric::numAllocations).sum();
  heapDeallocations = heaps.stream().mapToLong(PoolArenaMetric::numDeallocations).sum();

  List<PoolArenaMetric> directs = metric.directArenas();
  activeDirectAllocations = directs.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
  directAllocations = directs.stream().mapToLong(PoolArenaMetric::numAllocations).sum();
  directDeallocations = directs.stream().mapToLong(PoolArenaMetric::numDeallocations).sum();
}
 
源代码4 项目: ambry   文件: NettyByteBufLeakHelper.java
/**
 * Method to call at any method that is tagged {@link org.junit.After} to verify there is no leak within this test case.
 */
public void afterTest() {
  if (cachedEnabled || disabled) {
    return;
  }

  PooledByteBufAllocatorMetric metric = PooledByteBufAllocator.DEFAULT.metric();
  List<PoolArenaMetric> heaps = metric.heapArenas();
  long currentActiveHeapAllocations = heaps.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
  long currentHeapAllocations = heaps.stream().mapToLong(PoolArenaMetric::numAllocations).sum();
  long currentHeapDeallocations = heaps.stream().mapToLong(PoolArenaMetric::numDeallocations).sum();

  List<PoolArenaMetric> directs = metric.directArenas();
  long currentActiveDirectAllocations = directs.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
  long currentDirectAllocations = directs.stream().mapToLong(PoolArenaMetric::numAllocations).sum();
  long currentDirectDeallocations = directs.stream().mapToLong(PoolArenaMetric::numDeallocations).sum();

  String message = String.format("DirectMemoryLeak: [allocation|deallocation] before test[%d|%d], after test[%d|%d]",
      directAllocations, directDeallocations, currentDirectAllocations, currentDirectDeallocations);
  Assert.assertEquals(message, activeDirectAllocations, currentActiveDirectAllocations);
  message = String.format("HeapMemoryLeak: [allocation|deallocation] before test[%d|%d], after test[%d|%d]",
      heapAllocations, heapDeallocations, currentHeapAllocations, currentHeapDeallocations);
  Assert.assertEquals(message, activeHeapAllocations, currentActiveHeapAllocations);
}
 
private void verifyAllocations() {
	if (this.bufferFactory instanceof NettyDataBufferFactory) {
		ByteBufAllocator allocator = ((NettyDataBufferFactory) this.bufferFactory).getByteBufAllocator();
		if (allocator instanceof PooledByteBufAllocator) {
			PooledByteBufAllocatorMetric metric = ((PooledByteBufAllocator) allocator).metric();
			long total = getAllocations(metric.directArenas()) + getAllocations(metric.heapArenas());
			assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total);
		}
	}
}
 
源代码6 项目: reactor-netty   文件: ByteBufAllocatorMetrics.java
void registerMetrics(String allocType, ByteBufAllocatorMetric metrics) {
	cache.computeIfAbsent(metrics.hashCode() + "", key -> {
		String[] tags = new String[] {ID, key, TYPE, allocType};

		Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + USED_HEAP_MEMORY, metrics, ByteBufAllocatorMetric::usedHeapMemory)
		     .description("The number of the bytes of the heap memory.")
		     .tags(tags)
		     .register(REGISTRY);

		Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + USED_DIRECT_MEMORY, metrics, ByteBufAllocatorMetric::usedDirectMemory)
		     .description("The number of the bytes of the direct memory.")
		     .tags(tags)
		     .register(REGISTRY);

		if (metrics instanceof PooledByteBufAllocatorMetric) {
			PooledByteBufAllocatorMetric pooledMetrics = (PooledByteBufAllocatorMetric) metrics;

			Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + HEAP_ARENAS, pooledMetrics, PooledByteBufAllocatorMetric::numHeapArenas)
			     .description("The number of heap arenas.")
			     .tags(tags)
			     .register(REGISTRY);

			Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + DIRECT_ARENAS, pooledMetrics, PooledByteBufAllocatorMetric::numDirectArenas)
			     .description("The number of direct arenas.")
			     .tags(tags)
			     .register(REGISTRY);

			Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + THREAD_LOCAL_CACHES, pooledMetrics, PooledByteBufAllocatorMetric::numThreadLocalCaches)
			     .description("The number of thread local caches.")
			     .tags(tags)
			     .register(REGISTRY);

			Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + TINY_CACHE_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::tinyCacheSize)
			     .description("The size of the tiny cache.")
			     .tags(tags)
			     .register(REGISTRY);

			Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + SMALL_CACHE_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::smallCacheSize)
			     .description("The size of the small cache.")
			     .tags(tags)
			     .register(REGISTRY);

			Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + NORMAL_CACHE_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::normalCacheSize)
			     .description("The size of the normal cache.")
			     .tags(tags)
			     .register(REGISTRY);

			Gauge.builder(BYTE_BUF_ALLOCATOR_PREFIX + CHUNK_SIZE, pooledMetrics, PooledByteBufAllocatorMetric::chunkSize)
			     .description("The chunk size for an arena.")
			     .tags(tags)
			     .register(REGISTRY);
		}

		return metrics;
	});
}