org.apache.kafka.common.metrics.KafkaMetric usage examples

The following examples show how to use the org.apache.kafka.common.metrics.KafkaMetric API in real projects; each snippet can also be followed back to its source on GitHub.
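Before the project examples, here is a minimal self-contained sketch of a MetricsReporter that tracks KafkaMetric instances through the standard callbacks and reads values via metricName() and metricValue(). It is not taken from any of the projects below; the class name SimpleLoggingMetricsReporter and the logAll() helper are illustrative only.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Hypothetical reporter: keeps the latest KafkaMetric per MetricName and logs values on demand.
public class SimpleLoggingMetricsReporter implements MetricsReporter {

    private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public void init(List<KafkaMetric> initialMetrics) {
        // Called once with the metrics that already exist when the reporter is registered.
        for (KafkaMetric metric : initialMetrics) {
            metrics.put(metric.metricName(), metric);
        }
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        // Called whenever a metric is added or its configuration changes.
        metrics.put(metric.metricName(), metric);
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        metrics.remove(metric.metricName());
    }

    @Override
    public void close() {
        metrics.clear();
    }

    public void logAll() {
        for (Map.Entry<MetricName, KafkaMetric> e : metrics.entrySet()) {
            // metricValue() returns Object (a Double for measurable metrics); value() is the older, deprecated accessor.
            System.out.printf("%s.%s %s = %s%n",
                    e.getKey().group(), e.getKey().name(), e.getKey().tags(), e.getValue().metricValue());
        }
    }
}

A reporter like this is typically enabled by listing its fully qualified class name in the metric.reporters configuration of a Kafka client or broker.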

Example 1  Project: hermes   File: LoggerMetricsReporter.java
@Override
public void configure(Map<String, ?> configs) {
	scheduler = Executors.newSingleThreadScheduledExecutor(HermesThreadFactory.create("KafkaMetricsLogger", true));
	ClientEnvironment env = PlexusComponentLocator.lookup(ClientEnvironment.class);
	int interval = 60;
	Properties globalConfig = env.getGlobalConfig();
	if (globalConfig.containsKey("metric.reporters.interval.second")) {
		interval = Integer.parseInt(globalConfig.getProperty("metric.reporters.interval.second"));
	}
	long millis = TimeUnit.SECONDS.toMillis(interval);
	scheduler.scheduleAtFixedRate(new Runnable() {

		@Override
		public void run() {
			for (Map.Entry<MetricName, KafkaMetric> e : metrics.entrySet()) {
				m_logger.info("{} : {}", getMetricKey(e.getKey()), e.getValue().value());
			}
		}
	}, millis, millis, TimeUnit.MILLISECONDS);
}
 
Example 2  Project: cruise-control   File: MetricsUtils.java
/**
 * Create a Cruise Control Metric.
 *
 * @param kafkaMetric The Kafka metric to convert.
 * @param now The current time in milliseconds.
 * @param brokerId Broker Id.
 * @return KafkaMetric converted as a CruiseControlMetric.
 */
public static CruiseControlMetric toCruiseControlMetric(KafkaMetric kafkaMetric, long now, int brokerId) {
  org.apache.kafka.common.MetricName metricName = kafkaMetric.metricName();
  if (!(kafkaMetric.metricValue() instanceof Double)) {
    throw new IllegalArgumentException(String.format("Cannot convert non-double (%s) KafkaMetric %s to a Cruise Control"
                                                     + " metric for broker %d", kafkaMetric.metricValue().getClass(),
                                                     kafkaMetric.metricName(), brokerId));
  }

  CruiseControlMetric ccm = toCruiseControlMetric(now, brokerId, metricName.name(), metricName.tags(), (double) kafkaMetric.metricValue());
  if (ccm == null) {
    throw new IllegalArgumentException(String.format("Cannot convert KafkaMetric %s to a Cruise Control metric for "
                                                         + "broker %d at time %d", kafkaMetric.metricName(), brokerId, now));
  }
  return ccm;
}
 
Example 3  Project: micrometer   File: KafkaMetricsTest.java
@Test
void shouldKeepMetersWhenMetricsDoNotChange() {
    //Given
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
        KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(metricName, metric);
    };
    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);

    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).hasSize(1);
}
 
Example 4  Project: micrometer   File: KafkaMetricsTest.java
@Test
void shouldNotAddAppInfoMetrics() {
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
        MetricName metricName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>());
        KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
        metrics.put(metricName, metric);
        MetricName appInfoMetricName =
                new MetricName("a1", KafkaMetrics.METRIC_GROUP_APP_INFO, "c0",
                        new LinkedHashMap<>());
        KafkaMetric appInfoMetric =
                new KafkaMetric(this, appInfoMetricName, new Value(), new MetricConfig(), Time.SYSTEM);
        metrics.put(appInfoMetricName, appInfoMetric);
        return metrics;
    };
    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);

    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).hasSize(1);
}
 
Example 5  Project: micrometer   File: KafkaMetricsTest.java
@Test
void shouldRemoveOlderMeterWithLessTags() {
    Map<String, String> tags = new LinkedHashMap<>();
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        MetricName metricName = new MetricName("a", "b", "c", tags);
        KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(metricName, metric);
    };
    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(1); //only version

    tags.put("key0", "value0");
    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2);
}
 
Example 6  Project: micrometer   File: KafkaMetricsTest.java
@Test
void shouldRemoveMeterWithLessTags() {
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        MetricName firstName = new MetricName("a", "b", "c", Collections.emptyMap());
        KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("key0", "value0");
        MetricName secondName = new MetricName("a", "b", "c", tags);
        KafkaMetric secondMetric = new KafkaMetric(this, secondName, new Value(), new MetricConfig(), Time.SYSTEM);
        Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
        metrics.put(firstName, firstMetric);
        metrics.put(secondName, secondMetric);
        return metrics;
    };
    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2); // version + key0
}
 
Example 7  Project: micrometer   File: KafkaMetricsTest.java
@Test
void shouldBindMetersWithSameTags() {
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        Map<String, String> firstTags = new LinkedHashMap<>();
        firstTags.put("key0", "value0");
        MetricName firstName = new MetricName("a", "b", "c", firstTags);
        KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
        Map<String, String> secondTags = new LinkedHashMap<>();
        secondTags.put("key0", "value1");
        MetricName secondName = new MetricName("a", "b", "c", secondTags);
        KafkaMetric secondMetric = new KafkaMetric(this, secondName, new Value(), new MetricConfig(), Time.SYSTEM);

        Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
        metrics.put(firstName, firstMetric);
        metrics.put(secondName, secondMetric);
        return metrics;
    };

    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(2);
    assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2); // version + key0
}
 
Example 8  Project: micrometer   File: KafkaMetricsTest.java
@Issue("#1968")
@Test
void shouldBindMetersWithDifferentClientIds() {
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        Map<String, String> firstTags = new LinkedHashMap<>();
        firstTags.put("key0", "value0");
        firstTags.put("client-id", "client0");
        MetricName firstName = new MetricName("a", "b", "c", firstTags);
        KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(firstName, firstMetric);
    };

    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();
    registry.counter("kafka.b.a", "client-id", "client1", "key0", "value0");

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(2);
}
 
Example 9  Project: micrometer   File: KafkaMetricsTest.java
@Issue("#1968")
@Test
void shouldRemoveOlderMeterWithLessTagsWhenCommonTagsConfigured() {
    //Given
    Map<String, String> tags = new LinkedHashMap<>();
    Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
        MetricName metricName = new MetricName("a", "b", "c", tags);
        KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(metricName, metric);
    };

    kafkaMetrics = new KafkaMetrics(supplier);
    MeterRegistry registry = new SimpleMeterRegistry();
    registry.config().commonTags("common", "value");

    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka-version", "unknown"), Tag.of("common", "value")); // version + common tag

    tags.put("key0", "value0");
    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).hasSize(1);
    assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka-version", "unknown"), Tag.of("key0", "value0"), Tag.of("common", "value"));
}
 
@Override
public void metricChange(final KafkaMetric kafkaMetric) {
    LOGGER.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
    String name = dropwizardMetricName(kafkaMetric);

    Gauge<Double> gauge = new Gauge<Double>() {
        @Override
        public Double getValue() {
            return kafkaMetric.value();
        }
    };
    LOGGER.debug("Registering {}", name);
    try {
        registry.register(name, gauge);
        metricNames.add(name);
    } catch (IllegalArgumentException e) {
        LOGGER.debug("metricChange called for `{}' which was already registered, ignoring.", name);
    }
}
 
private static String dropwizardMetricName(KafkaMetric kafkaMetric) {
    MetricName name = kafkaMetric.metricName();

    List<String> nameParts = new ArrayList<String>(2);
    nameParts.add(name.group());
    nameParts.addAll(name.tags().values());
    nameParts.add(name.name());

    StringBuilder builder = new StringBuilder();
    for (String namePart : nameParts) {
        builder.append(namePart);
        builder.append(".");
    }
    builder.setLength(builder.length() - 1);  // Remove the trailing dot.
    String processedName = builder.toString().replace(' ', '_').replace("\\.", "_");

    return MetricRegistry.name(METRIC_PREFIX, processedName);
}
 
@Override
public void metricChange(final KafkaMetric metric) {
  String name = getMetricName(metric);

  StringBuilder strBuilder = new StringBuilder();

  for (String key : metric.metricName().tags().keySet()) {
    strBuilder.append(key).append(":").append(metric.metricName().tags().get(key)).append(",");
  }

  if (strBuilder.length() > 0) {
    strBuilder.deleteCharAt(strBuilder.length() - 1);
  }

  registry.register(name, metric, strBuilder.toString());
  log.debug("metrics name: {}", name);
}
 
@Test
public void testExceptionMetrics() {
  Response response = ClientBuilder.newClient(app.resourceConfig.getConfiguration())
      .target("http://localhost:" + config.getInt(RestConfig.PORT_CONFIG))
      .path("/private/exception")
      .request(MediaType.APPLICATION_JSON_TYPE)
      .get();
  assertEquals(404, response.getStatus());

  for (KafkaMetric metric: TestMetricsReporter.getMetricTimeseries()) {
    if (metric.metricName().name().equals("request-error-rate")) {
      if (metric.metricName().tags().getOrDefault(HTTP_STATUS_CODE_TAG, "").equals("4xx")) {
        assertTrue("Actual: " + metric.value(),
            metric.value() > 0);
      } else if (!metric.metricName().tags().isEmpty()) {
        assertTrue("Actual: " + metric.value() + metric.metricName(),
            metric.value() == 0.0 || Double.isNaN(metric.value()));
      }
    }
  }
}
 
@Override
public void init(List<KafkaMetric> metrics) {
    this.metrics = metrics;
    for (MeterRegistry meterRegistry : METER_REGISTRIES) {
        for (KafkaMetric metric : metrics) {
            registerMetric(meterRegistry, metric);
        }
    }
}
 
private void registerMetric(MeterRegistry meterRegistry, KafkaMetric metric) {
    KafkaMetricMeterTypeBuilder.newBuilder()
            .prefix(getMetricPrefix())
            .metric(metric)
            .tagFunction(getTagFunction())
            .registry(meterRegistry)
            .build();
}
 
@Override
public void init(List<KafkaMetric> metrics) {
  for (KafkaMetric kafkaMetric : metrics) {
    addMetricIfInterested(kafkaMetric);
  }
  LOG.info("Added {} Kafka metrics for Cruise Control metrics during initialization.", _interestedMetrics.size());
  _metricsReporterRunner = new KafkaThread("CruiseControlMetricsReporterRunner", this, true);
  _yammerMetricProcessor = new YammerMetricProcessor();
  _metricsReporterRunner.start();
}
 
private void reportKafkaMetrics(long now) {
  LOG.debug("Reporting KafkaMetrics. {}", _interestedMetrics.values());
  for (KafkaMetric metric : _interestedMetrics.values()) {
    sendCruiseControlMetric(MetricsUtils.toCruiseControlMetric(metric, now, _brokerId));
  }
  LOG.debug("Finished reporting KafkaMetrics.");
}
 
private void addMetricIfInterested(KafkaMetric metric) {
  LOG.trace("Checking Kafka metric {}", metric.metricName());
  if (MetricsUtils.isInterested(metric.metricName())) {
    LOG.debug("Added new metric {} to Cruise Control metrics reporter.", metric.metricName());
    _interestedMetrics.put(metric.metricName(), metric);
  }
}
 
Example 19  Project: emodb   File: DropwizardMetricsReporter.java
@Override
public void init(List<KafkaMetric> metrics) {
    _registry = SharedMetricRegistries.getOrCreate(REGISTRY_NAME);
    for (KafkaMetric kafkaMetric : metrics) {
        metricChange(kafkaMetric);
    }
}
 
Example 20  Project: emodb   File: DropwizardMetricsReporter.java
@Override
public void metricChange(KafkaMetric metric) {
    _log.debug("Processing a metric change for {}", metric.metricName());
    String name = metricName(metric);

    Gauge<Double> gauge = metric::value;

    _log.debug("Registering {}", name);
    try {
        _registry.register(name, gauge);
        _metricNames.add(name);
    } catch (IllegalArgumentException e) {
        _log.debug("metricChange called for `{}' which was already registered, ignoring.", name);
    }
}
 
Example 21  Project: emodb   File: DropwizardMetricsReporter.java
@Override
public void metricRemoval(KafkaMetric metric) {
    String name = metricName(metric);
    _log.debug("Removing {}", name);
    _registry.remove(name);
    _metricNames.remove(name);
}
 
@Override
public void init(List<KafkaMetric> list) {
    if (config == null) {
        throw new IllegalStateException("Must call configure() before calling init() on a reporter.");
    }
    String registryName = config.getString(DropwizardReporterConfig.REGISTRY_PROPERTY_NAME);
    this.registry = SharedMetricRegistries.getOrCreate(registryName);
    for (KafkaMetric kafkaMetric : list) {
        this.metricChange(kafkaMetric);
    }
}
 
@Override
public void metricRemoval(KafkaMetric kafkaMetric) {
    String name = dropwizardMetricName(kafkaMetric);
    LOGGER.debug("Removing {}", name);
    registry.remove(name);
    metricNames.remove(name);
}
 
@Override
public void init(List<KafkaMetric> list) {
  super.init(list);
  InetSocketAddress address = new InetSocketAddress(
      config.getString(DropwizardReporterConfig.GRAPHITE_HOST_PROPERTY_NAME),
      config.getInt(DropwizardReporterConfig.GRAPHITE_PORT_PROPERTY_NAME));
  graphite = new Graphite(address);
  reporter = GraphiteReporter.forRegistry(registry)
      .prefixedWith(config.getString(DropwizardReporterConfig.GRAPHITE_PREFIX_PROPERTY_NAME))
      .build(graphite);
  LOGGER.info("Starting the reporter");
  reporter.start(11, TimeUnit.SECONDS);
}
 
Example 25  Project: kafka-metrics   File: KafkaMetricsProcessor.java
public KafkaMetricsProcessor(
        MetricsRegistry metricsRegistry,
        Map<org.apache.kafka.common.MetricName, KafkaMetric> kafkaMetrics,
        MeasurementPublisher publisher,
        Map<String, String> fixedTags,
        Integer pollingIntervalSeconds
) {
    super(metricsRegistry, "streaming-reporter");
    this.kafkaMetrics = kafkaMetrics;
    this.clock = Clock.defaultClock();
    this.fixedTags = fixedTags;
    this.publisher = publisher;
    this.formatter = new MeasurementFormatter();
    this.pollingIntervalSeconds = pollingIntervalSeconds;
}
 
Example 26  Project: incubator-gobblin   File: Kafka09ConsumerClient.java
@Override
public Map<String, Metric> getMetrics() {
  Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
  Map<String, Metric> codaHaleMetricMap = new HashMap<>();

  kafkaMetrics
      .forEach((key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
  return codaHaleMetricMap;
}
 
Example 27  Project: incubator-gobblin   File: Kafka09ConsumerClient.java
/**
 * Convert a {@link KafkaMetric} instance to a {@link Metric}.
 * @param kafkaMetric the Kafka metric to convert
 * @return a Coda Hale {@link Metric} gauge exposing the Kafka metric's current value
 */
private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
  if (log.isDebugEnabled()) {
    log.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
  }
  Gauge<Double> gauge = () -> kafkaMetric.value();
  return gauge;
}
 
@Override
public void init(List<KafkaMetric> metrics) {
  registry = new StatsDMetricsRegistry();
  kafkaMetrics = new HashMap<String, KafkaMetric>();

  if (enabled) {
    startReporter(POLLING_PERIOD_IN_SECONDS);
  } else {
    log.warn("KafkaStatsDReporter is disabled");
  }

  for (KafkaMetric metric : metrics) {
    metricChange(metric);
  }
}
 
@Test
public void init_should_start_reporter_when_enabled() {
  configs.put(StatsdMetricsReporter.STATSD_REPORTER_ENABLED, "true");
  StatsdMetricsReporter reporter = new StatsdMetricsReporter();
  assertFalse("reporter should not be running", reporter.isRunning());
  reporter.configure(configs);
  reporter.init(new ArrayList<KafkaMetric>());
  assertTrue("reporter should be running once #init has been invoked", reporter.isRunning());
}
 
@Test
public void init_should_not_start_reporter_when_disabled() {
  configs.put(StatsdMetricsReporter.STATSD_REPORTER_ENABLED, "false");
  StatsdMetricsReporter reporter = new StatsdMetricsReporter();
  assertFalse("reporter should not be running", reporter.isRunning());
  reporter.configure(configs);
  reporter.init(new ArrayList<KafkaMetric>());
  assertFalse("reporter should NOT be running once #init has been invoked", reporter.isRunning());
}
 