Source code examples for org.apache.kafka.common.metrics.stats.Avg

The examples below show how the org.apache.kafka.common.metrics.stats.Avg API is used in real projects; you can also follow each example's link to GitHub to read the full source file.
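
Before the project examples, here is a minimal, self-contained sketch of the basic pattern: register an Avg stat on a Sensor, record a few values, and read the computed mean back from the registry. The class, group and metric names below are illustrative and not taken from any of the projects that follow.

import java.util.Collections;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;

public class AvgQuickStart {
  public static void main(String[] args) throws Exception {
    // The Metrics registry owns sensors; a sensor aggregates recorded values into its stats.
    Metrics metrics = new Metrics();
    Sensor latency = metrics.sensor("request-latency");
    MetricName avgName = new MetricName("request-latency-ms-avg", "demo-group",
        "Average request latency in ms", Collections.emptyMap());
    latency.add(avgName, new Avg());

    latency.record(12.0);
    latency.record(18.0);

    // metricValue() returns the mean of the values recorded in the current sample window: (12 + 18) / 2 = 15.0
    System.out.println(metrics.metrics().get(avgName).metricValue());
    metrics.close();
  }
}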

Example 1  Project: kafka-monitor   File: CommitLatencyMetrics.java
/**
 * Metrics for calculating the offset commit latency of a consumer.
 * @param metrics the metrics registry in which the commit-offset sensors are created
 * @param tags the associated tags, e.g. kmf.services:name=single-cluster-monitor
 * @param latencyPercentileMaxMs the maximum latency in ms tracked by the percentile histogram
 * @param latencyPercentileGranularityMs the width in ms of each histogram bucket
 */
public CommitLatencyMetrics(Metrics metrics, Map<String, String> tags, int latencyPercentileMaxMs,
    int latencyPercentileGranularityMs) {
  _inProgressCommit = false;
  _commitOffsetLatency = metrics.sensor("commit-offset-latency");
  _commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-avg", METRIC_GROUP_NAME, "The average latency in ms of committing offset", tags), new Avg());
  _commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-max", METRIC_GROUP_NAME, "The maximum latency in ms of committing offset", tags), new Max());

  if (latencyPercentileGranularityMs == 0) {
    throw new IllegalArgumentException("The latency percentile granularity was incorrectly passed a zero value.");
  }

  // Two extra buckets exist, for values below 0.0 and values above latencyPercentileMaxMs, respectively.
  int bucketNum = latencyPercentileMaxMs / latencyPercentileGranularityMs + 2;
  int sizeInBytes = bucketNum * 4;
  _commitOffsetLatency.add(new Percentiles(sizeInBytes, latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
      new Percentile(new MetricName("commit-offset-latency-ms-99th", METRIC_GROUP_NAME, "The 99th percentile latency of committing offset", tags), 99.0),
      new Percentile(new MetricName("commit-offset-latency-ms-999th", METRIC_GROUP_NAME, "The 99.9th percentile latency of committing offset", tags), 99.9),
      new Percentile(new MetricName("commit-offset-latency-ms-9999th", METRIC_GROUP_NAME, "The 99.99th percentile latency of committing offset", tags), 99.99)));
  LOG.info("{} was constructed successfully.", this.getClass().getSimpleName());
}
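
The constructor above only wires up the sensors; the latency itself is recorded elsewhere in the class, around each offset commit. Note the histogram sizing: each Percentiles bucket takes 4 bytes, so e.g. latencyPercentileMaxMs = 5000 and latencyPercentileGranularityMs = 20 give bucketNum = 5000 / 20 + 2 = 252 and sizeInBytes = 1008. A hedged sketch of the recording pattern follows; the field _commitStartMs and the two method names are hypothetical, not necessarily the ones used in kafka-monitor.

// Hypothetical sketch: record the elapsed time of one offset commit into the sensor.
private long _commitStartMs;

void onCommitStart() {
  _inProgressCommit = true;
  _commitStartMs = System.currentTimeMillis();
}

void onCommitComplete() {
  if (_inProgressCommit) {
    // A single record() call updates the Avg, the Max and the Percentiles registered above.
    _commitOffsetLatency.record(System.currentTimeMillis() - _commitStartMs);
    _inProgressCommit = false;
  }
}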
 
/**
 *
 * @param metrics the metrics registry; a sensor is a handle for recording numerical measurements as they occur
 * @param tags the tags attached to the metrics/sensors
 */
public ClusterTopicManipulationMetrics(final Metrics metrics, final Map<String, String> tags) {
  super(metrics, tags);
  _topicCreationSensor = metrics.sensor("topic-creation-metadata-propagation");
  _topicDeletionSensor = metrics.sensor("topic-deletion-metadata-propagation");
  _topicCreationSensor.add(new MetricName("topic-creation-metadata-propagation-ms-avg", METRIC_GROUP_NAME,
      "The average propagation duration in ms of propagating topic creation data and metadata to all brokers in the cluster",
      tags), new Avg());
  _topicCreationSensor.add(new MetricName("topic-creation-metadata-propagation-ms-max", METRIC_GROUP_NAME,
      "The maximum propagation time in ms of propagating topic creation data and metadata to all brokers in the cluster",
      tags), new Max());
  _topicDeletionSensor.add(new MetricName("topic-deletion-metadata-propagation-ms-avg", METRIC_GROUP_NAME,
      "The average propagation duration in milliseconds of propagating the topic deletion data and metadata "
          + "across all the brokers in the cluster.", tags), new Avg());
  _topicDeletionSensor.add(new MetricName("topic-deletion-metadata-propagation-ms-max", METRIC_GROUP_NAME,
      "The maximum propagation time in milliseconds of propagating the topic deletion data and metadata "
          + "across all the brokers in the cluster.", tags), new Max());

  LOGGER.debug("{} constructor was initialized successfully.", "ClusterTopicManipulationMetrics");
}
 
@Test
public void testMetricChange() throws Exception {
    Metrics metrics = new Metrics();
    DropwizardReporter reporter = new DropwizardReporter();
    reporter.configure(new HashMap<String, Object>());
    metrics.addReporter(reporter);
    Sensor sensor = metrics.sensor("kafka.requests");
    sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());

    Map<String, Gauge> gauges = SharedMetricRegistries.getOrCreate("default").getGauges();
    String expectedName = "org.apache.kafka.common.metrics.grp1.pack.bean1.avg";
    Assert.assertEquals(1, gauges.size());
    Assert.assertEquals(expectedName, gauges.keySet().toArray()[0]);

    sensor.record(2.1);
    sensor.record(2.2);
    sensor.record(2.6);
    Assert.assertEquals(2.3, (Double)gauges.get(expectedName).getValue(), 0.001);
}
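
The final assertion holds because Avg reports the arithmetic mean of the values recorded in the current sample window: (2.1 + 2.2 + 2.6) / 3 = 2.3.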
 
Example 4  Project: kafka-workers   File: WorkersMetrics.java
public void addConsumerThreadMetrics() {
    Stream.of(
            metrics.sensor(INPUT_RECORDS_SIZE_SENSOR),
            metrics.sensor(KAFKA_POLL_RECORDS_COUNT_SENSOR),
            metrics.sensor(KAFKA_POLL_RECORDS_SIZE_SENSOR)
    ).forEach(
            sensor -> {
                checkState(sensor.add(metrics.metricName("min", sensor.name()), new Min()));
                checkState(sensor.add(metrics.metricName("max", sensor.name()), new Max()));
                checkState(sensor.add(metrics.metricName("avg", sensor.name()), new Avg()));
                checkState(sensor.add(metrics.metricName("count-per-sec", sensor.name()), new Rate(new Count())));
            }
    );
}
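
The consumer thread then feeds these sensors after each poll. A hedged sketch of that call pattern follows; the recordPoll method is illustrative and not part of kafka-workers, while the sensor-name constants are the ones registered above.

// Illustrative sketch: after each poll, record the per-poll count and size into the matching sensors.
public void recordPoll(int recordCount, long totalBytes) {
    metrics.sensor(KAFKA_POLL_RECORDS_COUNT_SENSOR).record(recordCount);
    metrics.sensor(KAFKA_POLL_RECORDS_SIZE_SENSOR).record(totalBytes);
}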
 
private Sensor configureMessageConsumptionByQuerySensor(Metrics metrics) {
  Sensor sensor = createSensor(metrics, "message-consumption-by-query");
  sensor.add(metrics.metricName("messages-consumed-max", this.metricGroupName), new Max());
  sensor.add(metrics.metricName("messages-consumed-min", this.metricGroupName), new Min());
  sensor.add(metrics.metricName("messages-consumed-avg", this.metricGroupName), new Avg());
  return sensor;
}
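
Because Max, Min and Avg are all registered on the same sensor, a single sensor.record(messageCount) call per batch updates all three statistics at once.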
 
Example 6  Project: kafka-monitor   File: ConsumeMetrics.java
public ConsumeMetrics(final Metrics metrics, Map<String, String> tags, int latencyPercentileMaxMs,
    int latencyPercentileGranularityMs) {

  _bytesConsumed = metrics.sensor("bytes-consumed");
  _bytesConsumed.add(new MetricName("bytes-consumed-rate", METRIC_GROUP_NAME, "The average number of bytes per second that are consumed", tags), new Rate());

  _consumeError = metrics.sensor("consume-error");
  _consumeError.add(new MetricName("consume-error-rate", METRIC_GROUP_NAME, "The average number of errors per second", tags), new Rate());
  _consumeError.add(new MetricName("consume-error-total", METRIC_GROUP_NAME, "The total number of errors", tags), new Total());

  _recordsConsumed = metrics.sensor("records-consumed");
  _recordsConsumed.add(new MetricName("records-consumed-rate", METRIC_GROUP_NAME, "The average number of records per second that are consumed", tags), new Rate());
  _recordsConsumed.add(new MetricName("records-consumed-total", METRIC_GROUP_NAME, "The total number of records that are consumed", tags), new Total());

  _recordsDuplicated = metrics.sensor("records-duplicated");
  _recordsDuplicated.add(new MetricName("records-duplicated-rate", METRIC_GROUP_NAME, "The average number of records per second that are duplicated", tags), new Rate());
  _recordsDuplicated.add(new MetricName("records-duplicated-total", METRIC_GROUP_NAME, "The total number of records that are duplicated", tags), new Total());

  _recordsLost = metrics.sensor("records-lost");
  _recordsLost.add(new MetricName("records-lost-rate", METRIC_GROUP_NAME, "The average number of records per second that are lost", tags), new Rate());
  _recordsLost.add(new MetricName("records-lost-total", METRIC_GROUP_NAME, "The total number of records that are lost", tags), new Total());

  _recordsDelayed = metrics.sensor("records-delayed");
  _recordsDelayed.add(new MetricName("records-delayed-rate", METRIC_GROUP_NAME, "The average number of records per second that are either lost or arrive after maximum allowed latency under SLA", tags), new Rate());
  _recordsDelayed.add(new MetricName("records-delayed-total", METRIC_GROUP_NAME, "The total number of records that are either lost or arrive after maximum allowed latency under SLA", tags), new Total());

  _recordsDelay = metrics.sensor("records-delay");
  _recordsDelay.add(new MetricName("records-delay-ms-avg", METRIC_GROUP_NAME, "The average latency of records from producer to consumer", tags), new Avg());
  _recordsDelay.add(new MetricName("records-delay-ms-max", METRIC_GROUP_NAME, "The maximum latency of records from producer to consumer", tags), new Max());

  // There are 2 extra buckets used for values smaller than 0.0 or larger than max, respectively.
  int bucketNum = latencyPercentileMaxMs / latencyPercentileGranularityMs + 2;
  int sizeInBytes = 4 * bucketNum;
  _recordsDelay.add(new Percentiles(sizeInBytes, latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
      new Percentile(new MetricName("records-delay-ms-99th", METRIC_GROUP_NAME, "The 99th percentile latency of records from producer to consumer", tags), 99.0),
      new Percentile(new MetricName("records-delay-ms-999th", METRIC_GROUP_NAME, "The 99.9th percentile latency of records from producer to consumer", tags), 99.9),
      new Percentile(new MetricName("records-delay-ms-9999th", METRIC_GROUP_NAME, "The 99.99th percentile latency of records from producer to consumer", tags), 99.99)));

  metrics.addMetric(new MetricName("consume-availability-avg", METRIC_GROUP_NAME, "The average consume availability", tags),
    (config, now) -> {
      double recordsConsumedRate = (double) metrics.metrics().get(metrics.metricName("records-consumed-rate", METRIC_GROUP_NAME, tags)).metricValue();
      double recordsLostRate = (double) metrics.metrics().get(metrics.metricName("records-lost-rate", METRIC_GROUP_NAME, tags)).metricValue();
      double recordsDelayedRate = (double) metrics.metrics().get(metrics.metricName("records-delayed-rate", METRIC_GROUP_NAME, tags)).metricValue();

      if (Double.isNaN(recordsLostRate))
        recordsLostRate = 0;
      if (Double.isNaN(recordsDelayedRate))
        recordsDelayedRate = 0;

      return recordsConsumedRate + recordsLostRate > 0
          ? (recordsConsumedRate - recordsDelayedRate) / (recordsConsumedRate + recordsLostRate) : 0;
    });
}
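
As a worked example of the availability gauge above: with records-consumed-rate = 100 records/s, records-lost-rate = 5/s and records-delayed-rate = 2/s, consume-availability-avg evaluates to (100 - 2) / (100 + 5) ≈ 0.933.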
 
Example 7  Project: kafka-monitor   File: ProduceMetrics.java
public ProduceMetrics(final Metrics metrics, final Map<String, String> tags, int latencyPercentileGranularityMs,
    int latencyPercentileMaxMs, AtomicInteger partitionNumber, boolean treatZeroThroughputAsUnavailable) {
  _metrics = metrics;
  _tags = tags;

  _recordsProducedPerPartition = new ConcurrentHashMap<>();
  _produceErrorPerPartition = new ConcurrentHashMap<>();
  _produceErrorInLastSendPerPartition = new ConcurrentHashMap<>();

  _recordsProduced = metrics.sensor("records-produced");
  _recordsProduced.add(
      new MetricName("records-produced-rate", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
          "The average number of records per second that are produced", tags), new Rate());
  _recordsProduced.add(
      new MetricName("records-produced-total", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
          "The total number of records that are produced", tags), new Total());

  _produceError = metrics.sensor("produce-error");
  _produceError.add(new MetricName("produce-error-rate", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
      "The average number of errors per second", tags), new Rate());
  _produceError.add(new MetricName("produce-error-total", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
      "The total number of errors", tags), new Total());

  _produceDelay = metrics.sensor("produce-delay");
  _produceDelay.add(new MetricName("produce-delay-ms-avg", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
      "The average delay in ms for produce request", tags), new Avg());
  _produceDelay.add(new MetricName("produce-delay-ms-max", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
      "The maximum delay in ms for produce request", tags), new Max());

  // There are 2 extra buckets used for values smaller than 0.0 or larger than max, respectively.
  int bucketNum = latencyPercentileMaxMs / latencyPercentileGranularityMs + 2;
  int sizeInBytes = 4 * bucketNum;
  _produceDelay.add(new Percentiles(sizeInBytes, latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
      new Percentile(new MetricName("produce-delay-ms-99th", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
          "The 99th percentile delay in ms for produce request", tags), 99.0), new Percentile(
      new MetricName("produce-delay-ms-999th", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
          "The 99.9th percentile delay in ms for produce request", tags), 99.9), new Percentile(
      new MetricName("produce-delay-ms-9999th", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
          "The 99.99th percentile delay in ms for produce request", tags), 99.99)));

  metrics.addMetric(
      new MetricName("produce-availability-avg", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
          "The average produce availability", tags), (config, now) -> {
      double availabilitySum = 0.0;
      int partitionNum = partitionNumber.get();
      for (int partition = 0; partition < partitionNum; partition++) {
        double recordsProduced = (double) metrics.metrics()
            .get(metrics.metricName("records-produced-rate-partition-" + partition,
                XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE, tags))
            .metricValue();
        double produceError = (double) metrics.metrics()
            .get(metrics.metricName("produce-error-rate-partition-" + partition,
                XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE, tags))
            .metricValue();
        // If there is no error, error rate sensor may expire and the value may be NaN. Treat NaN as 0 for error rate.
        if (Double.isNaN(produceError) || Double.isInfinite(produceError)) {
          produceError = 0;
        }
        // If there was at least one produce attempt (successful or failed) to a partition, its availability is
        // recordsProduced / (recordsProduced + produceError).
        if (recordsProduced + produceError > 0) {
          availabilitySum += recordsProduced / (recordsProduced + produceError);
        } else if (!treatZeroThroughputAsUnavailable) {
          // If the user configures treatZeroThroughputAsUnavailable to be false, a partition's availability
          // is 1.0 as long as no exception is thrown from the producer.
          // This lets a Kafka admin monitor exactly the availability experienced by Kafka users, whose producers
          // block and retry for a certain amount of time based on their configuration (e.g. retries, retry.backoff.ms).
          // Note that if it takes a long time for messages to be retried and sent, the latency in the ConsumeService
          // will increase, which reduces ConsumeAvailability once it exceeds consume.latency.sla.ms.
          // If the timeout is set to more than 60 seconds (the current samples window duration),
          // the error sample might expire before the next error can be produced.
          // To detect an offline partition despite a high producer timeout config, the error status of the last
          // send is also checked before declaring 1.0 availability for the partition.
          Boolean lastSendError = _produceErrorInLastSendPerPartition.get(partition);
          if (lastSendError == null || !lastSendError) {
            availabilitySum += 1.0;
          }
        }
      }

      // Assign equal weight to per-partition availability when calculating overall availability
      return availabilitySum / partitionNum;
    }
  );
}
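
As a worked example of the final averaging step: with three partitions whose per-partition availabilities evaluate to 1.0, 0.8 and 0.0, produce-availability-avg reports (1.0 + 0.8 + 0.0) / 3 = 0.6.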
 