The following examples show how to use the org.apache.kafka.common.metrics.stats.Avg API class; you can also click through to GitHub to view the full source code.
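Before the project-specific examples, here is a minimal, self-contained sketch of the pattern they all share: create a Sensor on a Metrics registry, attach an Avg (and optionally other stats such as Max) under a MetricName, then record measurements against the sensor. The class name AvgQuickStart, the group "example-group", and the metric names below are illustrative placeholders, not taken from any of the projects shown here.
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public final class AvgQuickStart {
    public static void main(String[] args) {
        // Standalone registry; in a real client the Metrics instance is usually owned by the producer/consumer.
        Metrics metrics = new Metrics();
        // A sensor is a handle for recording measurements; each stat added to it derives a value from those recordings.
        Sensor latency = metrics.sensor("request-latency");
        latency.add(metrics.metricName("request-latency-ms-avg", "example-group", "Average request latency in ms"), new Avg());
        latency.add(metrics.metricName("request-latency-ms-max", "example-group", "Maximum request latency in ms"), new Max());
        // Record a few sample latencies.
        latency.record(12.0);
        latency.record(20.0);
        latency.record(16.0);
        // Read the computed average back from the registry: (12 + 20 + 16) / 3 = 16.0.
        Object avg = metrics.metrics()
            .get(metrics.metricName("request-latency-ms-avg", "example-group", "Average request latency in ms"))
            .metricValue();
        System.out.println("avg = " + avg);
        metrics.close();
    }
}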
/**
* Metrics for calculating the offset commit latency of a consumer.
* @param metrics the commit offset metrics
* @param tags the associated tags, e.g. kmf.services:name=single-cluster-monitor
* @param latencyPercentileMaxMs the maximum latency in ms expected to be tracked by the percentile metrics
* @param latencyPercentileGranularityMs the bucket granularity in ms of the percentile metrics
*/
public CommitLatencyMetrics(Metrics metrics, Map<String, String> tags, int latencyPercentileMaxMs,
int latencyPercentileGranularityMs) {
_inProgressCommit = false;
_commitOffsetLatency = metrics.sensor("commit-offset-latency");
_commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-avg", METRIC_GROUP_NAME, "The average latency in ms of committing offset", tags), new Avg());
_commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-max", METRIC_GROUP_NAME, "The maximum latency in ms of committing offset", tags), new Max());
if (latencyPercentileGranularityMs == 0) {
throw new IllegalArgumentException("The latency percentile granularity was incorrectly passed a zero value.");
}
// Two extra buckets are reserved for values smaller than 0.0 and larger than the max, respectively.
int bucketNum = latencyPercentileMaxMs / latencyPercentileGranularityMs + 2;
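// The Percentiles stat stores one 4-byte float counter per bucket, hence sizeInBytes = bucketNum * 4.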
int sizeInBytes = bucketNum * 4;
_commitOffsetLatency.add(new Percentiles(sizeInBytes, latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
new Percentile(new MetricName("commit-offset-latency-ms-99th", METRIC_GROUP_NAME, "The 99th percentile latency of committing offset", tags), 99.0),
new Percentile(new MetricName("commit-offset-latency-ms-999th", METRIC_GROUP_NAME, "The 99.9th percentile latency of committing offset", tags), 99.9),
new Percentile(new MetricName("commit-offset-latency-ms-9999th", METRIC_GROUP_NAME, "The 99.99th percentile latency of committing offset", tags), 99.99)));
LOG.info("{} was constructed successfully.", this.getClass().getSimpleName());
}
/**
*
* @param metrics the metrics registry; a metric is a named, numerical measurement, and a sensor is a handle used to record numerical measurements as they occur.
* @param tags the tags associated with the metrics/sensors
*/
public ClusterTopicManipulationMetrics(final Metrics metrics, final Map<String, String> tags) {
super(metrics, tags);
_topicCreationSensor = metrics.sensor("topic-creation-metadata-propagation");
_topicDeletionSensor = metrics.sensor("topic-deletion-metadata-propagation");
_topicCreationSensor.add(new MetricName("topic-creation-metadata-propagation-ms-avg", METRIC_GROUP_NAME,
"The average propagation duration in ms of propagating topic creation data and metadata to all brokers in the cluster",
tags), new Avg());
_topicCreationSensor.add(new MetricName("topic-creation-metadata-propagation-ms-max", METRIC_GROUP_NAME,
"The maximum propagation time in ms of propagating topic creation data and metadata to all brokers in the cluster",
tags), new Max());
_topicDeletionSensor.add(new MetricName("topic-deletion-metadata-propagation-ms-avg", METRIC_GROUP_NAME,
"The average propagation duration in milliseconds of propagating the topic deletion data and metadata "
+ "across all the brokers in the cluster.", tags), new Avg());
_topicDeletionSensor.add(new MetricName("topic-deletion-metadata-propagation-ms-max", METRIC_GROUP_NAME,
"The maximum propagation time in milliseconds of propagating the topic deletion data and metadata "
+ "across all the brokers in the cluster.", tags), new Max());
LOGGER.debug("{} constructor was initialized successfully.", "ClusterTopicManipulationMetrics");
}
@Test
public void testMetricChange() throws Exception {
Metrics metrics = new Metrics();
DropwizardReporter reporter = new DropwizardReporter();
reporter.configure(new HashMap<String, Object>());
metrics.addReporter(reporter);
Sensor sensor = metrics.sensor("kafka.requests");
sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());
Map<String, Gauge> gauges = SharedMetricRegistries.getOrCreate("default").getGauges();
String expectedName = "org.apache.kafka.common.metrics.grp1.pack.bean1.avg";
Assert.assertEquals(1, gauges.size());
Assert.assertEquals(expectedName, gauges.keySet().toArray()[0]);
sensor.record(2.1);
sensor.record(2.2);
sensor.record(2.6);
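// The Avg stat averages the recorded samples: (2.1 + 2.2 + 2.6) / 3 = 2.3.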
Assert.assertEquals(2.3, (Double)gauges.get(expectedName).getValue(), 0.001);
}
public void addConsumerThreadMetrics() {
Stream.of(
metrics.sensor(INPUT_RECORDS_SIZE_SENSOR),
metrics.sensor(KAFKA_POLL_RECORDS_COUNT_SENSOR),
metrics.sensor(KAFKA_POLL_RECORDS_SIZE_SENSOR)
).forEach(
sensor -> {
checkState(sensor.add(metrics.metricName("min", sensor.name()), new Min()));
checkState(sensor.add(metrics.metricName("max", sensor.name()), new Max()));
checkState(sensor.add(metrics.metricName("avg", sensor.name()), new Avg()));
checkState(sensor.add(metrics.metricName("count-per-sec", sensor.name()), new Rate(new Count())));
}
);
}
private Sensor configureMessageConsumptionByQuerySensor(Metrics metrics) {
Sensor sensor = createSensor(metrics, "message-consumption-by-query");
sensor.add(metrics.metricName("messages-consumed-max", this.metricGroupName), new Max());
sensor.add(metrics.metricName("messages-consumed-min", this.metricGroupName), new Min());
sensor.add(metrics.metricName("messages-consumed-avg", this.metricGroupName), new Avg());
return sensor;
}
public ConsumeMetrics(final Metrics metrics, Map<String, String> tags, int latencyPercentileMaxMs,
int latencyPercentileGranularityMs) {
_bytesConsumed = metrics.sensor("bytes-consumed");
_bytesConsumed.add(new MetricName("bytes-consumed-rate", METRIC_GROUP_NAME, "The average number of bytes per second that are consumed", tags), new Rate());
_consumeError = metrics.sensor("consume-error");
_consumeError.add(new MetricName("consume-error-rate", METRIC_GROUP_NAME, "The average number of errors per second", tags), new Rate());
_consumeError.add(new MetricName("consume-error-total", METRIC_GROUP_NAME, "The total number of errors", tags), new Total());
_recordsConsumed = metrics.sensor("records-consumed");
_recordsConsumed.add(new MetricName("records-consumed-rate", METRIC_GROUP_NAME, "The average number of records per second that are consumed", tags), new Rate());
_recordsConsumed.add(new MetricName("records-consumed-total", METRIC_GROUP_NAME, "The total number of records that are consumed", tags), new Total());
_recordsDuplicated = metrics.sensor("records-duplicated");
_recordsDuplicated.add(new MetricName("records-duplicated-rate", METRIC_GROUP_NAME, "The average number of records per second that are duplicated", tags), new Rate());
_recordsDuplicated.add(new MetricName("records-duplicated-total", METRIC_GROUP_NAME, "The total number of records that are duplicated", tags), new Total());
_recordsLost = metrics.sensor("records-lost");
_recordsLost.add(new MetricName("records-lost-rate", METRIC_GROUP_NAME, "The average number of records per second that are lost", tags), new Rate());
_recordsLost.add(new MetricName("records-lost-total", METRIC_GROUP_NAME, "The total number of records that are lost", tags), new Total());
_recordsDelayed = metrics.sensor("records-delayed");
_recordsDelayed.add(new MetricName("records-delayed-rate", METRIC_GROUP_NAME, "The average number of records per second that are either lost or arrive after maximum allowed latency under SLA", tags), new Rate());
_recordsDelayed.add(new MetricName("records-delayed-total", METRIC_GROUP_NAME, "The total number of records that are either lost or arrive after maximum allowed latency under SLA", tags), new Total());
_recordsDelay = metrics.sensor("records-delay");
_recordsDelay.add(new MetricName("records-delay-ms-avg", METRIC_GROUP_NAME, "The average latency of records from producer to consumer", tags), new Avg());
_recordsDelay.add(new MetricName("records-delay-ms-max", METRIC_GROUP_NAME, "The maximum latency of records from producer to consumer", tags), new Max());
// Two extra buckets are reserved for values smaller than 0.0 and larger than the max, respectively.
int bucketNum = latencyPercentileMaxMs / latencyPercentileGranularityMs + 2;
int sizeInBytes = 4 * bucketNum;
_recordsDelay.add(new Percentiles(sizeInBytes, latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
new Percentile(new MetricName("records-delay-ms-99th", METRIC_GROUP_NAME, "The 99th percentile latency of records from producer to consumer", tags), 99.0),
new Percentile(new MetricName("records-delay-ms-999th", METRIC_GROUP_NAME, "The 99.9th percentile latency of records from producer to consumer", tags), 99.9),
new Percentile(new MetricName("records-delay-ms-9999th", METRIC_GROUP_NAME, "The 99.99th percentile latency of records from producer to consumer", tags), 99.99)));
metrics.addMetric(new MetricName("consume-availability-avg", METRIC_GROUP_NAME, "The average consume availability", tags),
(config, now) -> {
double recordsConsumedRate = (double) metrics.metrics().get(metrics.metricName("records-consumed-rate", METRIC_GROUP_NAME, tags)).metricValue();
double recordsLostRate = (double) metrics.metrics().get(metrics.metricName("records-lost-rate", METRIC_GROUP_NAME, tags)).metricValue();
double recordsDelayedRate = (double) metrics.metrics().get(metrics.metricName("records-delayed-rate", METRIC_GROUP_NAME, tags)).metricValue();
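// If no records are lost or delayed, the corresponding rate sensors may have expired and report NaN; treat NaN as 0.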
if (Double.isNaN(recordsLostRate))
recordsLostRate = 0;
if (Double.isNaN(recordsDelayedRate))
recordsDelayedRate = 0;
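// Availability: records consumed minus delayed, as a fraction of records consumed plus lost; 0 if no records were expected.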
return recordsConsumedRate + recordsLostRate > 0
? (recordsConsumedRate - recordsDelayedRate) / (recordsConsumedRate + recordsLostRate) : 0;
});
}
public ProduceMetrics(final Metrics metrics, final Map<String, String> tags, int latencyPercentileGranularityMs,
int latencyPercentileMaxMs, AtomicInteger partitionNumber, boolean treatZeroThroughputAsUnavailable) {
_metrics = metrics;
_tags = tags;
_recordsProducedPerPartition = new ConcurrentHashMap<>();
_produceErrorPerPartition = new ConcurrentHashMap<>();
_produceErrorInLastSendPerPartition = new ConcurrentHashMap<>();
_recordsProduced = metrics.sensor("records-produced");
_recordsProduced.add(
new MetricName("records-produced-rate", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The average number of records per second that are produced", tags), new Rate());
_recordsProduced.add(
new MetricName("records-produced-total", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The total number of records that are produced", tags), new Total());
_produceError = metrics.sensor("produce-error");
_produceError.add(new MetricName("produce-error-rate", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The average number of errors per second", tags), new Rate());
_produceError.add(new MetricName("produce-error-total", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The total number of errors", tags), new Total());
_produceDelay = metrics.sensor("produce-delay");
_produceDelay.add(new MetricName("produce-delay-ms-avg", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The average delay in ms for produce request", tags), new Avg());
_produceDelay.add(new MetricName("produce-delay-ms-max", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The maximum delay in ms for produce request", tags), new Max());
// Two extra buckets are reserved for values smaller than 0.0 and larger than the max, respectively.
int bucketNum = latencyPercentileMaxMs / latencyPercentileGranularityMs + 2;
int sizeInBytes = 4 * bucketNum;
_produceDelay.add(new Percentiles(sizeInBytes, latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
new Percentile(new MetricName("produce-delay-ms-99th", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The 99th percentile delay in ms for produce request", tags), 99.0), new Percentile(
new MetricName("produce-delay-ms-999th", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The 99.9th percentile delay in ms for produce request", tags), 99.9), new Percentile(
new MetricName("produce-delay-ms-9999th", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The 99.99th percentile delay in ms for produce request", tags), 99.99)));
metrics.addMetric(
new MetricName("produce-availability-avg", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The average produce availability", tags), (config, now) -> {
double availabilitySum = 0.0;
int partitionNum = partitionNumber.get();
for (int partition = 0; partition < partitionNum; partition++) {
double recordsProduced = (double) metrics.metrics()
.get(metrics.metricName("records-produced-rate-partition-" + partition,
XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE, tags))
.metricValue();
double produceError = (double) metrics.metrics()
.get(metrics.metricName("produce-error-rate-partition-" + partition,
XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE, tags))
.metricValue();
// If there is no error, the error rate sensor may expire and the value may be NaN. Treat NaN as 0 for the error rate.
if (Double.isNaN(produceError) || Double.isInfinite(produceError)) {
produceError = 0;
}
// If any produce to this partition has succeeded or failed, its availability is the fraction of successful produces.
if (recordsProduced + produceError > 0) {
availabilitySum += recordsProduced / (recordsProduced + produceError);
} else if (!treatZeroThroughputAsUnavailable) {
// If the user configures treatZeroThroughputAsUnavailable to false, a partition's availability
// is 1.0 as long as no exception is thrown from the producer.
// This allows a Kafka admin to monitor exactly the availability experienced by Kafka users, whose producers
// will block and retry for a certain amount of time based on their configuration (e.g. retries, retry.backoff.ms).
// Note that if it takes a long time for messages to be retried and sent, the latency in the ConsumeService
// will increase, which reduces ConsumeAvailability if the latency exceeds consume.latency.sla.ms.
// If the timeout is set to more than 60 seconds (the current samples window duration),
// the error sample might expire before the next error can be produced.
// In order to detect an offline partition with a high producer timeout config, the error status of the last
// send is also checked before declaring 1.0 availability for the partition.
Boolean lastSendError = _produceErrorInLastSendPerPartition.get(partition);
if (lastSendError == null || !lastSendError) {
availabilitySum += 1.0;
}
}
}
// Assign equal weight to per-partition availability when calculating overall availability
return availabilitySum / partitionNum;
}
);
}