Source Code Examples for org.apache.kafka.common.MetricName

The examples below show how to use the org.apache.kafka.common.MetricName API in practice; each snippet is taken from an open-source project, whose full source can be viewed on GitHub.
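Before the project examples, here is a minimal, self-contained sketch of the common pattern: build a MetricName, attach a stat to a Sensor under that name, and read the value back from the registry. The metric name, group, and tags below are made up for illustration.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;

public class MetricNameDemo {
    public static void main(String[] args) {
        // A Metrics registry owns sensors and the metrics they report.
        Metrics metrics = new Metrics();
        Map<String, String> tags = Collections.singletonMap("client-id", "demo");

        // A MetricName identifies a metric by name, group, description, and tags.
        MetricName avgName = new MetricName("request-latency-avg", "demo-group",
                "The average request latency in ms", tags);

        // A Sensor records measurements and feeds every stat attached to it.
        Sensor sensor = metrics.sensor("request-latency");
        sensor.add(avgName, new Avg());

        sensor.record(40.0);
        sensor.record(44.0);
        System.out.println(metrics.metrics().get(avgName).metricValue()); // 42.0

        metrics.close();
    }
}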

Example 1  Project: singer  File: KafkaProducerMetricsMonitor.java
@SuppressWarnings({ "deprecation" })
protected void publishKafkaProducerMetricsToOstrich() {
  Map<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> producers = KafkaProducerManager
      .getInstance().getProducers();
  for (Entry<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> kafkaProducerEntry : producers
      .entrySet()) {
    KafkaProducerConfig key = kafkaProducerEntry.getKey();
    String signature = convertSignatureToTag(key);
    Map<MetricName, ? extends Metric> metrics = kafkaProducerEntry.getValue().metrics();
    for (Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
      if (PRODUCER_METRICS_WHITELIST.contains(entry.getKey().name())) {
        OpenTsdbMetricConverter.gauge("kafkaproducer." + entry.getKey().name(),
            entry.getValue().value(), "cluster=" + signature);
      }
    }
  }
}
 
Example 2  Project: mirus  File: TaskJmxReporter.java
private void ensureMetricsCreated(ConnectorTaskId taskId) {
  Map<String, String> tags = getTaskLevelTags(taskId);
  MetricName taskMetric =
      getMetric(
          FAILED_TASK_ATTEMPTS_METRIC_NAME + "-count",
          TASK_CONNECTOR_JMX_GROUP_NAME,
          "count of restart attempts to a failed task",
          taskLevelJmxTags,
          tags);

  if (!metrics.metrics().containsKey(taskMetric)) {
    Sensor sensor = getSensor(taskId.toString());
    sensor.add(taskMetric, new Total());
    logger.info("Added the task {} to the list of JMX metrics", taskId);
    logger.debug("Updated set of JMX metrics is {}", metrics.metrics());
  }
}
 
Example 3  Project: kafka-monitor  File: ProduceService.java
/**
 * Adds a Sensor for each partition.
 *
 * @param partition the partition id
 */
public void addPartitionSensor(int partition) {
    try {
        Sensor recordsProducedSensor = metrics.sensor("records-produced-partition-" + partition);
        recordsProducedSensor.add(new MetricName("records-produced-rate-partition-" + partition, METRIC_GROUP_NAME,
                "The average number of records per second that are produced to this partition", tags), new Rate());
        _recordsProducedPerPartition.put(partition, recordsProducedSensor);

        Sensor errorsSensor = metrics.sensor("produce-error-partition-" + partition);
        errorsSensor.add(new MetricName("produce-error-rate-partition-" + partition, METRIC_GROUP_NAME,
                "The average number of errors per second when producing to this partition", tags), new Rate());
        _produceErrorPerPartition.put(partition, errorsSensor);
    } catch (Exception e) {
        logger.error("addPartitionSensor exception", e);
    }
}
 
Example 4  Project: hermes  File: LoggerMetricsReporter.java
@Override
public void configure(Map<String, ?> configs) {
	scheduler = Executors.newSingleThreadScheduledExecutor(HermesThreadFactory.create("KafkaMetricsLogger", true));
	ClientEnvironment env = PlexusComponentLocator.lookup(ClientEnvironment.class);
	int interval = 60;
	Properties globalConfig = env.getGlobalConfig();
	if (globalConfig.containsKey("metric.reporters.interval.second")) {
		interval = Integer.parseInt(globalConfig.getProperty("metric.reporters.interval.second"));
	}
	long millis = TimeUnit.SECONDS.toMillis(interval);
	scheduler.scheduleAtFixedRate(new Runnable() {

		@Override
		public void run() {
			for (Map.Entry<MetricName, KafkaMetric> e : metrics.entrySet()) {
				m_logger.info("{} : {}", getMetricKey(e.getKey()), e.getValue().value());
			}
		}
	}, millis, millis, TimeUnit.MILLISECONDS);
}
 
Example 5  Project: micrometer  File: KafkaMetrics.java
/**
 * Define common tags and meters before binding metrics
 */
void prepareToBindMetrics(MeterRegistry registry) {
    Map<MetricName, ? extends Metric> metrics = metricsSupplier.get();
    // Collect static metrics and tags
    Metric startTime = null;

    for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
        MetricName name = entry.getKey();
        if (clientId.equals(DEFAULT_VALUE) && name.tags().get(CLIENT_ID_TAG_NAME) != null)
            clientId = name.tags().get(CLIENT_ID_TAG_NAME);
        if (METRIC_GROUP_APP_INFO.equals(name.group()))
            if (VERSION_METRIC_NAME.equals(name.name())) {
                kafkaVersion = (String) entry.getValue().metricValue();
            } else if (START_TIME_METRIC_NAME.equals(name.name())) {
                startTime = entry.getValue();
            }
    }

    if (startTime != null) {
        bindMeter(registry, startTime, meterName(startTime), meterTags(startTime));
    }
}
 
Example 6  Project: micrometer  File: KafkaMetricsTest.java
@Test
void shouldKeepMetersWhenMetricsDoNotChange() {
    //Given
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
        KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(metricName, metric);
    };
    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);

    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).hasSize(1);
}
 
Example 7  Project: micrometer  File: KafkaMetricsTest.java
@Test
void shouldNotAddAppInfoMetrics() {
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
        MetricName metricName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>());
        KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
        metrics.put(metricName, metric);
        MetricName appInfoMetricName =
                new MetricName("a1", KafkaMetrics.METRIC_GROUP_APP_INFO, "c0",
                        new LinkedHashMap<>());
        KafkaMetric appInfoMetric =
                new KafkaMetric(this, appInfoMetricName, new Value(), new MetricConfig(), Time.SYSTEM);
        metrics.put(appInfoMetricName, appInfoMetric);
        return metrics;
    };
    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);

    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).hasSize(1);
}
 
Example 8  Project: micrometer  File: KafkaMetricsTest.java
@Test
void shouldRemoveOlderMeterWithLessTags() {
    Map<String, String> tags = new LinkedHashMap<>();
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        MetricName metricName = new MetricName("a", "b", "c", tags);
        KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(metricName, metric);
    };
    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(1); //only version

    tags.put("key0", "value0");
    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2);
}
 
Example 9  Project: micrometer  File: KafkaMetricsTest.java
@Test
void shouldRemoveMeterWithLessTags() {
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        MetricName firstName = new MetricName("a", "b", "c", Collections.emptyMap());
        KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("key0", "value0");
        MetricName secondName = new MetricName("a", "b", "c", tags);
        KafkaMetric secondMetric = new KafkaMetric(this, secondName, new Value(), new MetricConfig(), Time.SYSTEM);
        Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
        metrics.put(firstName, firstMetric);
        metrics.put(secondName, secondMetric);
        return metrics;
    };
    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2); // version + key0
}
 
Example 10  Project: micrometer  File: KafkaMetricsTest.java
@Test
void shouldBindMetersWithSameTags() {
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        Map<String, String> firstTags = new LinkedHashMap<>();
        firstTags.put("key0", "value0");
        MetricName firstName = new MetricName("a", "b", "c", firstTags);
        KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
        Map<String, String> secondTags = new LinkedHashMap<>();
        secondTags.put("key0", "value1");
        MetricName secondName = new MetricName("a", "b", "c", secondTags);
        KafkaMetric secondMetric = new KafkaMetric(this, secondName, new Value(), new MetricConfig(), Time.SYSTEM);

        Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
        metrics.put(firstName, firstMetric);
        metrics.put(secondName, secondMetric);
        return metrics;
    };

    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(2);
    assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2); // version + key0
}
 
Example 11  Project: micrometer  File: KafkaMetricsTest.java
@Issue("#1968")
@Test
void shouldBindMetersWithDifferentClientIds() {
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        Map<String, String> firstTags = new LinkedHashMap<>();
        firstTags.put("key0", "value0");
        firstTags.put("client-id", "client0");
        MetricName firstName = new MetricName("a", "b", "c", firstTags);
        KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(firstName, firstMetric);
    };

    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();
    registry.counter("kafka.b.a", "client-id", "client1", "key0", "value0");

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(2);
}
 
Example 12  Project: micrometer  File: KafkaMetricsTest.java
@Issue("#1968")
@Test
void shouldRemoveOlderMeterWithLessTagsWhenCommonTagsConfigured() {
    //Given
    Map<String, String> tags = new LinkedHashMap<>();
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        MetricName metricName = new MetricName("a", "b", "c", tags);
        KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(metricName, metric);
    };

    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();
    registry.config().commonTags("common", "value");

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka-version", "unknown"), Tag.of("common", "value")); // version + common tag

    tags.put("key0", "value0");
    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka-version", "unknown"), Tag.of("key0", "value0"), Tag.of("common", "value"));
}
 
/**
 * kafka doesn't have an API for getting the client id from a client (WTH?!)
 * relying on reflection is tricky because we may be dealing with various
 * wrappers/decorators, but it does leak through kafka's metrics tags ...
 * @param metrics kafka client metrics
 * @return best guess for the client id
 */
private static String fishForClientId(Map<MetricName, ? extends Metric> metrics) {
  Set<String> candidates = new HashSet<>();
  metrics.forEach((metricName, metric) -> {
    Map<String, String> tags = metricName.tags();
    if (tags == null) {
      return;
    }
    String clientId = tags.get("client-id");
    if (clientId != null) {
      candidates.add(clientId);
    }
  });
  if (candidates.isEmpty()) {
    return null;
  }
  if (candidates.size() > 1) {
    throw new IllegalArgumentException("ambiguous client id from client: " + candidates);
  }
  return candidates.iterator().next();
}
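A usage sketch for the helper above, assuming a KafkaProducer variable named producer (the variable name is illustrative):

// Best-effort client id discovery through the client's metric tags.
String clientId = fishForClientId(producer.metrics());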
 
Example 14  Project: kafka-monitor  File: ConsumeService.java
@Override
public synchronized void start() {
  if (_running.compareAndSet(false, true)) {
    _consumeThread.start();
    LOG.info("{}/ConsumeService started.", _name);

    Sensor topicPartitionCount = metrics.sensor("topic-partitions");
    DescribeTopicsResult describeTopicsResult = _adminClient.describeTopics(Collections.singleton(_topic));
    Map<String, KafkaFuture<TopicDescription>> topicResultValues = describeTopicsResult.values();
    KafkaFuture<TopicDescription> topicDescriptionKafkaFuture = topicResultValues.get(_topic);
    TopicDescription topicDescription = null;
    try {
      topicDescription = topicDescriptionKafkaFuture.get();
    } catch (InterruptedException | ExecutionException e) {
      LOG.error("Exception occurred while getting the topicDescriptionKafkaFuture for topic: {}", _topic, e);
    }
    @SuppressWarnings("ConstantConditions")
    double partitionCount = topicDescription.partitions().size();
    topicPartitionCount.add(
        new MetricName("topic-partitions-count", METRIC_GROUP_NAME, "The total number of partitions for the topic.", tags), new Total(partitionCount));
  }
}
 
Example 15  Project: kafka-monitor  File: CommitLatencyMetrics.java
/**
 * Metrics for calculating the offset commit latency of a consumer.
 * @param metrics the commit offset metrics
 * @param tags the associated tags, e.g. kmf.services:name=single-cluster-monitor
 */
public CommitLatencyMetrics(Metrics metrics, Map<String, String> tags, int latencyPercentileMaxMs,
    int latencyPercentileGranularityMs) {
  _inProgressCommit = false;
  _commitOffsetLatency = metrics.sensor("commit-offset-latency");
  _commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-avg", METRIC_GROUP_NAME, "The average latency in ms of committing offset", tags), new Avg());
  _commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-max", METRIC_GROUP_NAME, "The maximum latency in ms of committing offset", tags), new Max());

  if (latencyPercentileGranularityMs == 0) {
    throw new IllegalArgumentException("The latency percentile granularity was incorrectly passed a zero value.");
  }

  // 2 extra buckets exist which are respectively designated for values which are less than 0.0 or larger than max.
  int bucketNum = latencyPercentileMaxMs / latencyPercentileGranularityMs + 2;
  int sizeInBytes = bucketNum * 4;
  _commitOffsetLatency.add(new Percentiles(sizeInBytes, latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
      new Percentile(new MetricName("commit-offset-latency-ms-99th", METRIC_GROUP_NAME, "The 99th percentile latency of committing offset", tags), 99.0),
      new Percentile(new MetricName("commit-offset-latency-ms-999th", METRIC_GROUP_NAME, "The 99.9th percentile latency of committing offset", tags), 99.9),
      new Percentile(new MetricName("commit-offset-latency-ms-9999th", METRIC_GROUP_NAME, "The 99.99th percentile latency of committing offset", tags), 99.99)));
  LOG.info("{} was constructed successfully.", this.getClass().getSimpleName());
}
 
Example 16  Project: kafka-monitor  File: CommitAvailabilityMetrics.java
/**
 * Metrics for calculating the offset commit availability of a consumer.
 * @param metrics the commit offset metrics
 * @param tags the associated tags, e.g. kmf.services:name=single-cluster-monitor
 */
public CommitAvailabilityMetrics(final Metrics metrics, final Map<String, String> tags) {
  LOG.info("{} called.", this.getClass().getSimpleName());
  _offsetsCommitted = metrics.sensor("offsets-committed");
  _offsetsCommitted.add(new MetricName("offsets-committed-total", METRIC_GROUP_NAME,
      "The total number of offsets per second that are committed.", tags), new Total());

  _failedCommitOffsets = metrics.sensor("failed-commit-offsets");
  _failedCommitOffsets.add(new MetricName("failed-commit-offsets-avg", METRIC_GROUP_NAME,
      "The average number of offsets per second that have failed.", tags), new Rate());
  _failedCommitOffsets.add(new MetricName("failed-commit-offsets-total", METRIC_GROUP_NAME,
      "The total number of offsets per second that have failed.", tags), new Total());

  metrics.addMetric(new MetricName("offsets-committed-avg", METRIC_GROUP_NAME, "The average offset commits availability.", tags),
    (MetricConfig config, long now) -> {
      Object offsetCommitTotal = metrics.metrics().get(metrics.metricName("offsets-committed-total", METRIC_GROUP_NAME, tags)).metricValue();
      Object offsetCommitFailTotal = metrics.metrics().get(metrics.metricName("failed-commit-offsets-total", METRIC_GROUP_NAME, tags)).metricValue();
      if (offsetCommitTotal != null && offsetCommitFailTotal != null) {
        double offsetsCommittedCount = (double) offsetCommitTotal;
        double offsetsCommittedErrorCount = (double) offsetCommitFailTotal;
        return offsetsCommittedCount / (offsetsCommittedCount + offsetsCommittedErrorCount);
      } else {
        return 0;
      }
    });
}
 
/**
 *
 * @param metrics the Metrics registry; a metric is a named, numerical measurement, and a sensor is a handle for recording measurements as they occur
 * @param tags the tags for the metrics/sensors
 */
public ClusterTopicManipulationMetrics(final Metrics metrics, final Map<String, String> tags) {
  super(metrics, tags);
  _topicCreationSensor = metrics.sensor("topic-creation-metadata-propagation");
  _topicDeletionSensor = metrics.sensor("topic-deletion-metadata-propagation");
  _topicCreationSensor.add(new MetricName("topic-creation-metadata-propagation-ms-avg", METRIC_GROUP_NAME,
      "The average propagation duration in ms of propagating topic creation data and metadata to all brokers in the cluster",
      tags), new Avg());
  _topicCreationSensor.add(new MetricName("topic-creation-metadata-propagation-ms-max", METRIC_GROUP_NAME,
      "The maximum propagation time in ms of propagating topic creation data and metadata to all brokers in the cluster",
      tags), new Max());
  _topicDeletionSensor.add(new MetricName("topic-deletion-metadata-propagation-ms-avg", METRIC_GROUP_NAME,
      "The average propagation duration in milliseconds of propagating the topic deletion data and metadata "
          + "across all the brokers in the cluster.", tags), new Avg());
  _topicDeletionSensor.add(new MetricName("topic-deletion-metadata-propagation-ms-max", METRIC_GROUP_NAME,
      "The maximum propagation time in milliseconds of propagating the topic deletion data and metadata "
          + "across all the brokers in the cluster.", tags), new Max());

  LOGGER.debug("{} constructor was initialized successfully.", "ClusterTopicManipulationMetrics");
}
 
private static String dropwizardMetricName(KafkaMetric kafkaMetric) {
    MetricName name = kafkaMetric.metricName();

    List<String> nameParts = new ArrayList<String>(2);
    nameParts.add(name.group());
    nameParts.addAll(name.tags().values());
    nameParts.add(name.name());

    StringBuilder builder = new StringBuilder();
    for (String namePart : nameParts) {
        builder.append(namePart);
        builder.append(".");
    }
    builder.setLength(builder.length() - 1);  // Remove the trailing dot.
    String processedName = builder.toString().replace(' ', '_').replace("\\.", "_");

    return MetricRegistry.name(METRIC_PREFIX, processedName);
}
 
@Test
public void testMetricChange() throws Exception {
    Metrics metrics = new Metrics();
    DropwizardReporter reporter = new DropwizardReporter();
    reporter.configure(new HashMap<String, Object>());
    metrics.addReporter(reporter);
    Sensor sensor = metrics.sensor("kafka.requests");
    sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());

    Map<String, Gauge> gauges = SharedMetricRegistries.getOrCreate("default").getGauges();
    String expectedName = "org.apache.kafka.common.metrics.grp1.pack.bean1.avg";
    Assert.assertEquals(1, gauges.size());
    Assert.assertEquals(expectedName, gauges.keySet().toArray()[0]);

    sensor.record(2.1);
    sensor.record(2.2);
    sensor.record(2.6);
    Assert.assertEquals(2.3, (Double)gauges.get(expectedName).getValue(), 0.001);
}
 
@Test
public final void sendDoubleGauge() throws Exception {
  final double value = 10.11;
  Metric metric = new Metric() {
    @Override
    public MetricName metricName() {
      return new MetricName("test-metric", "group");
    }

    @Override
    public double value() {
      return value;
    }
  };

  addMetricAndRunReporter("foo", metric, "bar");
  verify(statsD).gauge(Matchers.eq("foo"), Matchers.eq(value), Matchers.eq("bar"));
}
 
Example 21  Project: suro  File: KafkaSink.java
@Override
public long checkPause() {
    if (blockOnBufferFull) {
        return 0; // do not pause here, will be blocked
    } else {
        //producer.metrics().get(new MetricName("buffer-total-bytes", "producer-metrics", "desc", "client-id", "kafkasink"))
        double totalBytes = producer.metrics().get(
            new MetricName(
                "buffer-total-bytes",
                "producer-metrics",
                "desc",
                "client-id",
                props.getProperty("client.id"))).value();
        double availableBytes = producer.metrics().get(
            new MetricName(
                "buffer-available-bytes",
                "producer-metrics",
                "desc",
                "client-id",
                props.getProperty("client.id"))).value();

        double consumedMemory = totalBytes - availableBytes;
        double memoryRate = consumedMemory / totalBytes;
        if (memoryRate >= 0.5) {
            double outgoingRate = producer.metrics().get(
                new MetricName(
                    "outgoing-byte-rate",
                    "producer-metrics",
                    "desc",
                    "client-id",
                    props.getProperty("client.id"))).value();
            double throughputRate = Math.max(outgoingRate, 1.0);
            return (long) (consumedMemory / throughputRate * 1000);
        } else {
            return 0;
        }
    }
}
 
Example 22  Project: kbear  File: ProducerTest.java
protected void checkOtherApis(Producer<String, String> producer) {
    topics.forEach(t -> {
        List<PartitionInfo> partitions = producer.partitionsFor(t);
        Assert.assertNotNull(partitions);
        Assert.assertEquals(1, partitions.size());
    });

    Map<MetricName, ?> metrics = producer.metrics();
    System.out.println("metrics: " + metrics);
    Assert.assertFalse(CollectionExtension.isEmpty(metrics));
}
 
Example 23  Project: suro  File: ServoReporter.java
private void addMetric(KafkaMetric metric) {
    MetricName metricName = metric.metricName();
    MonitorConfig.Builder builder = MonitorConfig.builder(metricName.name())
        .withTag("group", metricName.group());
    for(Map.Entry<String, String> tag : metricName.tags().entrySet()) {
        builder.withTag(tag.getKey(), tag.getValue());
    }
    MonitorConfig monitorConfig = builder.build();
    gauges.put(Servo.getDoubleGauge(monitorConfig), metric);
}
 
private Function<MetricName, List<Tag>> getTagFunction() {
    return metricName -> metricName
            .tags()
            .entrySet()
            .stream()
            .filter(entry -> getIncludedTags().contains(entry.getKey()))
            .map(entry -> Tag.of(entry.getKey(), entry.getValue()))
            .collect(Collectors.toList());
}
 
Example 25  Project: mirus  File: MissingPartitionsJmxReporter.java
MissingPartitionsJmxReporter(Metrics metrics) {
  super(metrics);
  Sensor missingPartsSensor = metrics.sensor(MISSING_DEST_PARTITIONS);
  MetricName missingPartsName = metrics.metricName(MISSING_DEST_PARTITIONS + "-count", "mirus");
  missingPartsSensor.add(missingPartsName, new Value());
  this.missingPartsSensor = missingPartsSensor;
}
 
Example 26  Project: mirus  File: TaskJmxReporterTest.java
private void assertFailedMetricCount(String state, int task, Double expected) {
  ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, task);
  TaskState taskState = new TaskState(task, state, "worker1", "Test");
  taskJmxReporter.updateMetrics(taskId, taskState);

  HashMap<String, String> tags = new HashMap<>();
  tags.put("connector", CONNECTOR_NAME);
  tags.put("task", Integer.toString(task));

  assertEquals(
      expected,
      metrics
          .metric(new MetricName("task-failed-restart-attempts-count", GROUP, "", tags))
          .metricValue());
}
 
Example 27  Project: mirus  File: ConnectorJmxReporterTest.java
@Test
public void testIncrementTotalFailedCount() {
  assertEquals(
      0.0d,
      metrics
          .metric(new MetricName("task-failed-restart-attempts-count", GROUP, "", tags))
          .metricValue());
  connectorJmxReporter.incrementTotalFailedCount(CONNECTOR_NAME);
  assertEquals(
      1.0d,
      metrics
          .metric(new MetricName("task-failed-restart-attempts-count", GROUP, "", tags))
          .metricValue());
}
 
Example 28  Project: mirus  File: ConnectorJmxReporterTest.java
@Test
public void testIncrementConnectorRestartAttempts() {
  assertEquals(
      0.0d,
      metrics
          .metric(new MetricName("connector-failed-restart-attempts-count", GROUP, "", tags))
          .metricValue());
  connectorJmxReporter.incrementConnectorRestartAttempts(CONNECTOR_NAME);
  assertEquals(
      1.0d,
      metrics
          .metric(new MetricName("connector-failed-restart-attempts-count", GROUP, "", tags))
          .metricValue());
}
 
Example 29  Project: kafka-monitor  File: ProduceService.java
public ProduceMetrics(Metrics metrics, final Map<String, String> tags) {
    this.metrics = metrics;
    this.tags = tags;

    _recordsProducedPerPartition = new ConcurrentHashMap<>();
    _produceErrorPerPartition = new ConcurrentHashMap<>();

    recordsProduce = metrics.sensor("records-produced");
    recordsProduce.add(new MetricName("records-produced-total", METRIC_GROUP_NAME, "The total number of records that are produced", tags), new Total());
    errorProduce = metrics.sensor("error-produce");
    errorProduce.add(new MetricName("error-produce-total", METRIC_GROUP_NAME, "", tags), new Total());

    metrics.addMetric(new MetricName("produce-availability-avg", METRIC_GROUP_NAME, "The average produce availability", tags),
            (config, now) -> {
                double availabilitySum = 0.0;
                //Availability is the sum of each partition's availability divided by the partition count.
                //A partition's availability is its produce-success rate divided by the sum of its success and error rates.
                int num = partitionNum.get();

                for (int partition = 0; partition < num; partition++) {
                    double recordsProduced = produceMetrics.metrics.metrics().get(new MetricName("records-produced-rate-partition-" + partition, METRIC_GROUP_NAME, tags)).value();
                    double produceError = produceMetrics.metrics.metrics().get(new MetricName("produce-error-rate-partition-" + partition, METRIC_GROUP_NAME, tags)).value();

                    if (Double.isNaN(produceError) || Double.isInfinite(produceError)) {
                        produceError = 0;
                    }
                    if (recordsProduced + produceError > 0) {
                        availabilitySum += recordsProduced / (recordsProduced + produceError);
                    }
                }
                return availabilitySum / num;
            });
}
 
Example 30  Project: incubator-gobblin  File: Kafka09ConsumerClient.java
@Override
public Map<String, Metric> getMetrics() {
  Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
  Map<String, Metric> codaHaleMetricMap = new HashMap<>();

  kafkaMetrics
      .forEach((key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
  return codaHaleMetricMap;
}
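The helpers canonicalMetricName and kafkaToCodaHaleMetric are not shown in the example above. Below is a sketch of what such a Kafka-to-Dropwizard bridge could look like; only the method names come from the call sites, and the bodies are assumptions:

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import org.apache.kafka.common.metrics.KafkaMetric;

// Assumed shape: expose each KafkaMetric as a Dropwizard Gauge.
static Metric kafkaToCodaHaleMetric(KafkaMetric kafkaMetric) {
    return (Gauge<Object>) kafkaMetric::metricValue;
}

// Assumed shape: derive a flat name from the metric's group and name (tags omitted).
static String canonicalMetricName(KafkaMetric kafkaMetric) {
    org.apache.kafka.common.MetricName name = kafkaMetric.metricName();
    return name.group() + "." + name.name();
}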
 