下面列出了com.codahale.metrics.SlidingWindowReservoir#com.codahale.metrics.ExponentiallyDecayingReservoir 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public KafkaUDPConsumer(
Stage.Context context,
UDPConfigBean udpConfigBean,
KafkaTargetConfig kafkaTargetConfig,
BlockingQueue<Exception> errorQueue
) {
this.udpConfigBean = udpConfigBean;
this.kafkaTargetConfig = kafkaTargetConfig;
this.errorQueue = errorQueue;
acceptedPackagesMeter = context.createMeter("acceptedPackages");
discardedPackagesMeter = context.createMeter("discardedPackages");
errorPackagesMeter = context.createMeter("errorPackages");
udpTimer = context.createTimer("udp");
kafkaTimer = context.createTimer("kafka");
kafkaMessagesMeter = context.createMeter("kafkaMessages");
// context does not have a createHistogram(), TODO open JIRA for that
concurrencyHistogram = new Histogram(new ExponentiallyDecayingReservoir());
context
.getMetrics()
.register("custom." + context.getPipelineInfo().get(0).getInstanceName() + ".concurrentPackages.histogram",
concurrencyHistogram);
}
@Override
public List<Stage.ConfigIssue> init(Stage.Context context) {
List<Stage.ConfigIssue> issues = new ArrayList<>();
kafkaTimer = context.createTimer("kafka");
kafkaMessagesMeter = context.createMeter("kafkaMessages");
//TODO: change to use API-66 when API-66 is done.
concurrencyHistogram = new Histogram(new ExponentiallyDecayingReservoir());
context
.getMetrics()
.register("custom." + context.getPipelineInfo().get(0).getInstanceName() + ".concurrentRequests.histogram",
concurrencyHistogram);
try {
kafkaProducerPool = createKafkaProducerPool();
} catch (Exception ex) {
}
return issues;
}
@Test
public void testDefaults() throws Exception {
NodeConfig cfg = loadNodeConfig();
SolrMetricManager mgr = new SolrMetricManager(cfg.getSolrResourceLoader(), cfg.getMetricsConfig());
assertTrue(mgr.getCounterSupplier() instanceof MetricSuppliers.DefaultCounterSupplier);
assertTrue(mgr.getMeterSupplier() instanceof MetricSuppliers.DefaultMeterSupplier);
assertTrue(mgr.getTimerSupplier() instanceof MetricSuppliers.DefaultTimerSupplier);
assertTrue(mgr.getHistogramSupplier() instanceof MetricSuppliers.DefaultHistogramSupplier);
Clock clk = ((MetricSuppliers.DefaultTimerSupplier)mgr.getTimerSupplier()).clk;
assertTrue(clk instanceof Clock.UserTimeClock);
Reservoir rsv = ((MetricSuppliers.DefaultTimerSupplier)mgr.getTimerSupplier()).getReservoir();
assertTrue(rsv instanceof ExponentiallyDecayingReservoir);
}
public SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
this.percentiles = percentiles;
this.gauges = this.percentiles.stream()
.filter(x -> x > 0 && x <= 100)
.collect(Collectors.toMap(Function.identity(),
x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D))));
}
SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
this.registry = registry;
this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
this.percentiles = percentiles;
this.gauges = percentiles.stream()
.filter(x -> x > 0 && x <= 100)
.collect(
Collectors.toMap(Function.identity(), x -> this.registry.newGauge(group, name + "_" + String.valueOf(0), 0D)));
}
public static Histogram createHistogram5Min(MetricRegistry metrics, String name, final String pipelineName, final String pipelineRev) {
return create(
metrics,
new Histogram(new ExponentiallyDecayingReservoir()),
metricName(name, HISTOGRAM_M5_SUFFIX),
pipelineName,
pipelineRev
);
}
@Override
public void run(
SourcePipe originPipe,
List<PipeRunner> pipes,
BadRecordsHandler badRecordsHandler,
List<StageOutput> stageOutputsToOverride,
StatsAggregationHandler statsAggregationHandler
) throws StageException, PipelineRuntimeException {
this.originPipe = originPipe;
this.pipes = pipes;
this.badRecordsHandler = badRecordsHandler;
this.statsAggregationHandler = statsAggregationHandler;
this.runnerPool = new RunnerPool<>(pipes, new RuntimeStats(), new Histogram(new ExponentiallyDecayingReservoir()));
// Counter of batches that were already processed
batchesProcessed = new AtomicInteger(0);
stagesToSkip = new HashMap<>();
for (StageOutput stageOutput : stageOutputsToOverride) {
stagesToSkip.put(stageOutput.getInstanceName(), stageOutput);
}
if (originPipe.getStage().getStage() instanceof PushSource) {
runPushSource();
} else {
runPollSource();
}
}
@Before
public void createRunnerPool() {
this.runnerPool = new RunnerPool<>(
ImmutableList.of("a", "b"),
new RuntimeStats(),
new Histogram(new ExponentiallyDecayingReservoir())
);
}
protected Histogram histogram(String... names) {
try {
return registry.histogram(nameOf(names));
} catch (Exception e) {
return new Histogram(new ExponentiallyDecayingReservoir());
}
}
InnerHistogram(MetricContext context, String name, ContextAwareHistogram contextAwareHistogram) {
super(new ExponentiallyDecayingReservoir());
this.name = name;
Optional<MetricContext> parentContext = context.getParent();
if (parentContext.isPresent()) {
this.parentHistogram = Optional.fromNullable(parentContext.get().contextAwareHistogram(name));
} else {
this.parentHistogram = Optional.absent();
}
this.contextAwareHistogram = new WeakReference<>(contextAwareHistogram);
}
@Test
public void testBuilder() {
final MetricRegistry metricRegistry = Mockito.mock(MetricRegistry.class);
DataSource dataSource = Mockito.mock(DataSource.class);
PoolAdapterFactory<DataSource> poolAdapterFactory = Mockito.mock(PoolAdapterFactory.class);
ConnectionProxyFactory connectionProxyFactory = Mockito.mock(ConnectionProxyFactory.class);
Metrics metrics = Mockito.mock(Metrics.class);
PoolAdapter poolAdapter = Mockito.mock(PoolAdapter.class);
when(poolAdapterFactory.newInstance(any(ConfigurationProperties.class))).thenReturn(poolAdapter);
Configuration<DataSource> configuration = new Configuration.Builder<DataSource>(
"unique", dataSource, poolAdapterFactory)
.setConnectionProxyFactory(connectionProxyFactory)
.setJmxAutoStart(true)
.setJmxEnabled(true)
.setMetricLogReporterMillis(120)
.setMetricsFactory(new MetricsFactory() {
@Override
public Metrics newInstance(ConfigurationProperties configurationProperties) {
return new DropwizardMetrics(configurationProperties, metricRegistry, new ReservoirFactory() {
@Override
public Reservoir newInstance(Class<? extends Metric> metricClass, String metricName) {
return new ExponentiallyDecayingReservoir();
}
});
}
})
.build();
assertSame("unique", configuration.getUniqueName());
assertSame(connectionProxyFactory, configuration.getConnectionProxyFactory());
assertTrue(configuration.isJmxAutoStart());
assertTrue(configuration.isJmxEnabled());
assertEquals(120, configuration.getMetricLogReporterMillis());
assertSame(poolAdapter, configuration.getPoolAdapter());
assertSame(dataSource, configuration.getTargetDataSource());
}
@Test
public void testBuilder() {
final MetricRegistry metricRegistry = Mockito.mock(MetricRegistry.class);
DataSource dataSource = Mockito.mock(DataSource.class);
PoolAdapterFactory<DataSource> poolAdapterFactory = Mockito.mock(PoolAdapterFactory.class);
ConnectionProxyFactory connectionProxyFactory = Mockito.mock(ConnectionProxyFactory.class);
Metrics metrics = Mockito.mock(Metrics.class);
PoolAdapter poolAdapter = Mockito.mock(PoolAdapter.class);
when(poolAdapterFactory.newInstance(any(ConfigurationProperties.class))).thenReturn(poolAdapter);
Configuration<DataSource> configuration = new Configuration.Builder<DataSource>(
"unique", dataSource, poolAdapterFactory)
.setConnectionProxyFactory(connectionProxyFactory)
.setJmxAutoStart(true)
.setJmxEnabled(true)
.setMetricLogReporterMillis(120)
.setMetricsFactory(new MetricsFactory() {
@Override
public Metrics newInstance(ConfigurationProperties configurationProperties) {
return new CodahaleMetrics(configurationProperties, metricRegistry, new ReservoirFactory() {
@Override
public Reservoir newInstance(Class<? extends Metric> metricClass, String metricName) {
return new ExponentiallyDecayingReservoir();
}
});
}
})
.build();
assertSame("unique", configuration.getUniqueName());
assertSame(connectionProxyFactory, configuration.getConnectionProxyFactory());
assertTrue(configuration.isJmxAutoStart());
assertTrue(configuration.isJmxEnabled());
assertEquals(120, configuration.getMetricLogReporterMillis());
assertSame(poolAdapter, configuration.getPoolAdapter());
assertSame(dataSource, configuration.getTargetDataSource());
}
public static void main(String[] args) throws Exception {
Random random = new Random(System.nanoTime());
Config config = new Config.Builder().directory(TestFileHelper.TEMP_PATH).compactionStrategy
(CompactionStrategies.SIZE_TIERED_COMPACTION_STRATEGY).tableCacheSize(512000000).indexCacheSize
(64000000).maxWriteRate(Integer.MAX_VALUE).build();
MetricRegistry metrics = new MetricRegistry();
ConsoleReporter reporter = PerformanceHelper.consoleReporter(metrics);
Timer readTimer = metrics.register("reads", new Timer(new ExponentiallyDecayingReservoir()));
DB db = HeftyDB.open(config);
db.compact().get();
//Read
for (int i = 0; i < RECORD_COUNT * 10; i++) {
String key = random.nextInt(RECORD_COUNT) + "";
Timer.Context watch = readTimer.time();
db.get(ByteBuffers.fromString(key));
watch.stop();
}
reporter.report();
db.logMetrics();
db.close();
System.exit(0);
}
/**
* Test caching behavior.
*/
@Test
public void testCache() {
AtomicInteger snapshotCalls = new AtomicInteger(0);
MockClock clock = new MockClock();
Reservoir reservoir = new ExponentiallyDecayingReservoir();
CachedHistogram histogram = new CachedHistogram(clock, reservoir, TimeUnit.SECONDS.toMillis(1), 0.50) {
@Override
public Snapshot getSnapshot() {
// count number of calls to test caching
snapshotCalls.getAndIncrement();
return super.getSnapshot();
}
};
long value = 2;
double epsilon = 0.01;
histogram.update(value);
// getSnapshot should be called the first time
assertEquals(value, histogram.getCachedValue(), epsilon);
assertEquals(1, snapshotCalls.get());
// the cached value should be used and getSnapshot should not be called.
assertEquals(value, histogram.getCachedValue(), epsilon);
assertEquals(1, snapshotCalls.get());
// after progressing time, the cached value should expire and getSnapshot should be called
clock.tick(1);
assertEquals(value, histogram.getCachedValue(), epsilon);
assertEquals(2, snapshotCalls.get());
}
public Metrics() {
this.meter = new Meter();
this.counter = new LongAdder();
this.reservoir = new ExponentiallyDecayingReservoir();
this.histogram = new Histogram(reservoir);
}
public ExponentialHistogram(int size, double alpha) {
this.dropwizardHistogram = new com.codahale.metrics.Histogram(
new ExponentiallyDecayingReservoir(size, alpha));
}
public ExponentialHistogram() {
this.dropwizardHistogram = new com.codahale.metrics.Histogram(
new ExponentiallyDecayingReservoir());
}
public static Reservoir exponentiallyDecayingReservoir() {
return new ExponentiallyDecayingReservoir();
}
public static Reservoir exponentiallyDecayingReservoir(int size, double alpha) {
return new ExponentiallyDecayingReservoir(size, alpha);
}
protected void init() {
this.meter = new Meter();
this.reservoir = new ExponentiallyDecayingReservoir();
this.histogram = new Histogram(reservoir);
}
/**
* Creates a new {@link SemanticMetricRegistry}.
*/
public SemanticMetricRegistry(final ConcurrentMap<MetricId, Metric> metrics) {
this(metrics, () -> new ExponentiallyDecayingReservoir());
}
/**
* Creates a new {@link SemanticMetricRegistry}.
*/
public SemanticMetricRegistry() {
// This is only for backward compatibility purpose. After removing the "buildMap" method
// we should call this(new ConcurrentHashMap<MetricId, Metric>()) instead.
this(new ConcurrentHashMap<MetricId, Metric>(), () -> new ExponentiallyDecayingReservoir());
}
public ReservoirWithTtl() {
this(new ExponentiallyDecayingReservoir(), DEFAULT_TTL_SECONDS, DEFAULT_MINIMUM_RATE);
}
public ReservoirWithTtl(final int ttlSeconds) {
this(new ExponentiallyDecayingReservoir(), ttlSeconds, DEFAULT_MINIMUM_RATE);
}
@SuppressWarnings({"unchecked"})
private static final Reservoir getReservoir(SolrResourceLoader loader, PluginInfo info) {
if (info == null) {
return new ExponentiallyDecayingReservoir();
}
Clock clk = getClock(info, CLOCK);
String clazz = ExponentiallyDecayingReservoir.class.getName();
int size = -1;
double alpha = -1;
long window = -1;
if (info.initArgs != null) {
if (info.initArgs.get(RESERVOIR) != null) {
String val = String.valueOf(info.initArgs.get(RESERVOIR)).trim();
if (!val.isEmpty()) {
clazz = val;
}
}
Number n = (Number)info.initArgs.get(RESERVOIR_SIZE);
if (n != null) {
size = n.intValue();
}
n = (Number)info.initArgs.get(RESERVOIR_EDR_ALPHA);
if (n != null) {
alpha = n.doubleValue();
}
n = (Number)info.initArgs.get(RESERVOIR_WINDOW);
if (n != null) {
window = n.longValue();
}
}
if (size <= 0) {
size = DEFAULT_SIZE;
}
if (alpha <= 0) {
alpha = DEFAULT_ALPHA;
}
// special case for core implementations
if (clazz.equals(EDR_CLAZZ)) {
return new ExponentiallyDecayingReservoir(size, alpha, clk);
} else if (clazz.equals(UNI_CLAZZ)) {
return new UniformReservoir(size);
} else if (clazz.equals(STW_CLAZZ)) {
if (window <= 0) {
window = DEFAULT_WINDOW; // 5 minutes, comparable to EDR
}
return new SlidingTimeWindowReservoir(window, TimeUnit.SECONDS);
} else if (clazz.equals(SW_CLAZZ)) {
return new SlidingWindowReservoir(size);
} else { // custom reservoir
Reservoir reservoir;
try {
reservoir = loader.newInstance(clazz, Reservoir.class);
if (reservoir instanceof PluginInfoInitialized) {
((PluginInfoInitialized)reservoir).init(info);
} else {
SolrPluginUtils.invokeSetters(reservoir, info.initArgs, true);
}
return reservoir;
} catch (Exception e) {
log.warn("Error initializing custom Reservoir implementation (will use default): {}", info, e);
return new ExponentiallyDecayingReservoir(size, alpha, clk);
}
}
}
@Override
public Histogram newMetric() {
return new Histogram(new ExponentiallyDecayingReservoir());
}
public ResourceControlledScheduledExecutor(final float maxCpuConsumption, final long minimumDelay) {
Utils.checkArgument(maxCpuConsumption > 0, "Max CPU Consumption cannot be less than zero");
scheduledExecutorService = new SafeScheduledExecutorService(2, "ResourceControlledScheduledExecutor");
scheduledExecutorService.scheduleAndForget(new Runnable() {
private final ExponentiallyDecayingReservoir decayingReservoir =
new ExponentiallyDecayingReservoir();
@Override
public void run() {
long start = System.currentTimeMillis();
boolean anyThrewError = false;
for (Runnable task : tasks) {
try {
task.run();
} catch (Throwable throwable) {
anyThrewError = true;
// unfortunately ScheduledExecutorService will eat throwables
// and then stop scheduling runnables which threw them
LOG.error("Task " + task + " had error: " + throwable, throwable);
}
}
long delay = minimumDelay;
if (!tasks.isEmpty()) {
decayingReservoir.update(System.currentTimeMillis() - start);
delay = calculateDelay(decayingReservoir.getSnapshot().getMedian(), maxCpuConsumption);
}
if (anyThrewError) {
// if a task fails with an exception it may have failed very quickly in which
// cause we will spin quite quickly spewing exceptions to the logs. If anything
// errors then we should proceed with caution
delay = Math.max(delay, TimeUnit.MINUTES.toMillis(1));
} else if (delay < minimumDelay) {
delay = minimumDelay;
}
try {
scheduledExecutorService.scheduleAndForget(this, delay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
if (!scheduledExecutorService.isShutdown()) {
throw e;
}
}
}
}, 10, TimeUnit.MILLISECONDS);
}
@Test
public void testReportMetrics() {
Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 1000;
}
};
Counter recordsProcessedCounter = new Counter();
recordsProcessedCounter.inc(10l);
Histogram recordSizeDistributionHistogram = new Histogram(new ExponentiallyDecayingReservoir());
recordSizeDistributionHistogram.update(1);
recordSizeDistributionHistogram.update(2);
recordSizeDistributionHistogram.update(3);
Meter recordProcessRateMeter = new Meter();
recordProcessRateMeter.mark(1l);
recordProcessRateMeter.mark(2l);
recordProcessRateMeter.mark(3l);
Timer totalDurationTimer = new Timer();
totalDurationTimer.update(1, TimeUnit.SECONDS);
totalDurationTimer.update(2, TimeUnit.SECONDS);
totalDurationTimer.update(3, TimeUnit.SECONDS);
SortedMap<String, Counter> counters = ImmutableSortedMap.<String, Counter>naturalOrder()
.put(RECORDS_PROCESSED, recordsProcessedCounter).build();
SortedMap<String, Gauge> gauges = ImmutableSortedMap.<String, Gauge>naturalOrder()
.put(QUEUE_SIZE, queueSizeGauge).build();
SortedMap<String, Histogram> histograms = ImmutableSortedMap.<String, Histogram>naturalOrder()
.put(RECORD_SIZE_DISTRIBUTION, recordSizeDistributionHistogram).build();
SortedMap<String, Meter> meters = ImmutableSortedMap.<String, Meter>naturalOrder()
.put(RECORD_PROCESS_RATE, recordProcessRateMeter).build();
SortedMap<String, Timer> timers = ImmutableSortedMap.<String, Timer>naturalOrder()
.put(TOTAL_DURATION, totalDurationTimer).build();
this.hadoopCounterReporter.report(gauges, counters, histograms, meters, timers);
Mockito.verify(this.recordsProcessedCount).increment(10l);
Mockito.verify(this.recordProcessRateCount).increment(6l);
Mockito.verify(this.recordSizeDistributionCount).increment(3l);
Mockito.verify(this.totalDurationCount).increment(3l);
Mockito.verify(this.queueSize).setValue(1000);
recordsProcessedCounter.inc(5l);
recordSizeDistributionHistogram.update(4);
recordProcessRateMeter.mark(4l);
totalDurationTimer.update(4, TimeUnit.SECONDS);
this.hadoopCounterReporter.report(gauges, counters, histograms, meters, timers);
Mockito.verify(this.recordsProcessedCount).increment(5l);
Mockito.verify(this.recordProcessRateCount).increment(4l);
Mockito.verify(this.recordSizeDistributionCount).increment(1l);
Mockito.verify(this.totalDurationCount).increment(1l);
}
@Test
public void testReportMetrics() {
Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
@Override
public Integer getValue() {
return 1000;
}
};
com.codahale.metrics.Counter recordsProcessedCounter = new com.codahale.metrics.Counter();
recordsProcessedCounter.inc(10l);
Histogram recordSizeDistributionHistogram = new Histogram(new ExponentiallyDecayingReservoir());
recordSizeDistributionHistogram.update(1);
recordSizeDistributionHistogram.update(2);
recordSizeDistributionHistogram.update(3);
Meter recordProcessRateMeter = new Meter();
recordProcessRateMeter.mark(1l);
recordProcessRateMeter.mark(2l);
recordProcessRateMeter.mark(3l);
Timer totalDurationTimer = new Timer();
totalDurationTimer.update(1, TimeUnit.SECONDS);
totalDurationTimer.update(2, TimeUnit.SECONDS);
totalDurationTimer.update(3, TimeUnit.SECONDS);
SortedMap<String, com.codahale.metrics.Counter> counters =
ImmutableSortedMap.<String, com.codahale.metrics.Counter>naturalOrder()
.put(RECORDS_PROCESSED, recordsProcessedCounter).build();
SortedMap<String, Gauge> gauges = ImmutableSortedMap.<String, Gauge>naturalOrder()
.put(QUEUE_SIZE, queueSizeGauge).build();
SortedMap<String, Histogram> histograms = ImmutableSortedMap.<String, Histogram>naturalOrder()
.put(RECORD_SIZE_DISTRIBUTION, recordSizeDistributionHistogram).build();
SortedMap<String, Meter> meters = ImmutableSortedMap.<String, Meter>naturalOrder()
.put(RECORD_PROCESS_RATE, recordProcessRateMeter).build();
SortedMap<String, Timer> timers = ImmutableSortedMap.<String, Timer>naturalOrder()
.put(TOTAL_DURATION, totalDurationTimer).build();
this.hadoopCounterReporter.report(gauges, counters, histograms, meters, timers);
Mockito.verify(this.recordsProcessedCount).increment(10l);
Mockito.verify(this.recordProcessRateCount).increment(6l);
Mockito.verify(this.recordSizeDistributionCount).increment(3l);
Mockito.verify(this.totalDurationCount).increment(3l);
Mockito.verify(this.queueSize).setValue(1000);
recordsProcessedCounter.inc(5l);
recordSizeDistributionHistogram.update(4);
recordProcessRateMeter.mark(4l);
totalDurationTimer.update(4, TimeUnit.SECONDS);
this.hadoopCounterReporter.report(gauges, counters, histograms, meters, timers);
Mockito.verify(this.recordsProcessedCount).increment(5l);
Mockito.verify(this.recordProcessRateCount).increment(4l);
Mockito.verify(this.recordSizeDistributionCount).increment(1l);
Mockito.verify(this.totalDurationCount).increment(1l);
}
ContextAwareHistogram(MetricContext context, String name) {
super(new ExponentiallyDecayingReservoir());
this.innerHistogram = new InnerHistogram(context, name, this);
this.context = context;
}