下面列出了怎么用org.apache.kafka.common.metrics.KafkaMetric的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void configure(Map<String, ?> configs) {
scheduler = Executors.newSingleThreadScheduledExecutor(HermesThreadFactory.create("KafkaMetricsLogger", true));
ClientEnvironment env = PlexusComponentLocator.lookup(ClientEnvironment.class);
int interval = 60;
Properties globalConfig = env.getGlobalConfig();
if (globalConfig.containsKey("metric.reporters.interval.second")) {
interval = Integer.parseInt(globalConfig.getProperty("metric.reporters.interval.second"));
}
long millis = TimeUnit.SECONDS.toMillis(interval);
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<MetricName, KafkaMetric> e : metrics.entrySet()) {
m_logger.info("{} : {}", getMetricKey(e.getKey()), e.getValue().value());
}
}
}, millis, millis, TimeUnit.MILLISECONDS);
}
/**
* Create a Cruise Control Metric.
*
* @param kafkaMetric Kafka metric name.
* @param now The current time in milliseconds.
* @param brokerId Broker Id.
* @return KafkaMetric converted as a CruiseControlMetric.
*/
public static CruiseControlMetric toCruiseControlMetric(KafkaMetric kafkaMetric, long now, int brokerId) {
org.apache.kafka.common.MetricName metricName = kafkaMetric.metricName();
if (!(kafkaMetric.metricValue() instanceof Double)) {
throw new IllegalArgumentException(String.format("Cannot convert non-double (%s) KafkaMetric %s to a Cruise Control"
+ " metric for broker %d", kafkaMetric.metricValue().getClass(),
kafkaMetric.metricName(), brokerId));
}
CruiseControlMetric ccm = toCruiseControlMetric(now, brokerId, metricName.name(), metricName.tags(), (double) kafkaMetric.metricValue());
if (ccm == null) {
throw new IllegalArgumentException(String.format("Cannot convert KafkaMetric %s to a Cruise Control metric for "
+ "broker %d at time %d", kafkaMetric.metricName(), brokerId, now));
}
return ccm;
}
@Test
void shouldKeepMetersWhenMetricsDoNotChange() {
//Given
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
return Collections.singletonMap(metricName, metric);
};
kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
kafkaMetrics.checkAndBindMetrics(registry);
assertThat(registry.getMeters()).hasSize(1);
}
@Test
void shouldNotAddAppInfoMetrics() {
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
MetricName metricName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>());
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
metrics.put(metricName, metric);
MetricName appInfoMetricName =
new MetricName("a1", KafkaMetrics.METRIC_GROUP_APP_INFO, "c0",
new LinkedHashMap<>());
KafkaMetric appInfoMetric =
new KafkaMetric(this, appInfoMetricName, new Value(), new MetricConfig(), Time.SYSTEM);
metrics.put(appInfoMetricName, appInfoMetric);
return metrics;
};
kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
kafkaMetrics.checkAndBindMetrics(registry);
assertThat(registry.getMeters()).hasSize(1);
}
@Test
void shouldRemoveOlderMeterWithLessTags() {
Map<String, String> tags = new LinkedHashMap<>();
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", tags);
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
return Collections.singletonMap(metricName, metric);
};
kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(1); //only version
tags.put("key0", "value0");
kafkaMetrics.checkAndBindMetrics(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2);
}
@Test
void shouldRemoveMeterWithLessTags() {
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName firstName = new MetricName("a", "b", "c", Collections.emptyMap());
KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
Map<String, String> tags = new LinkedHashMap<>();
tags.put("key0", "value0");
MetricName secondName = new MetricName("a", "b", "c", tags);
KafkaMetric secondMetric = new KafkaMetric(this, secondName, new Value(), new MetricConfig(), Time.SYSTEM);
Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
metrics.put(firstName, firstMetric);
metrics.put(secondName, secondMetric);
return metrics;
};
kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2); // version + key0
}
@Test
void shouldBindMetersWithSameTags() {
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
Map<String, String> firstTags = new LinkedHashMap<>();
firstTags.put("key0", "value0");
MetricName firstName = new MetricName("a", "b", "c", firstTags);
KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
Map<String, String> secondTags = new LinkedHashMap<>();
secondTags.put("key0", "value1");
MetricName secondName = new MetricName("a", "b", "c", secondTags);
KafkaMetric secondMetric = new KafkaMetric(this, secondName, new Value(), new MetricConfig(), Time.SYSTEM);
Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
metrics.put(firstName, firstMetric);
metrics.put(secondName, secondMetric);
return metrics;
};
kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(2);
assertThat(registry.getMeters().get(0).getId().getTags()).hasSize(2); // version + key0
}
@Issue("#1968")
@Test
void shouldBindMetersWithDifferentClientIds() {
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
Map<String, String> firstTags = new LinkedHashMap<>();
firstTags.put("key0", "value0");
firstTags.put("client-id", "client0");
MetricName firstName = new MetricName("a", "b", "c", firstTags);
KafkaMetric firstMetric = new KafkaMetric(this, firstName, new Value(), new MetricConfig(), Time.SYSTEM);
return Collections.singletonMap(firstName, firstMetric);
};
kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
registry.counter("kafka.b.a", "client-id", "client1", "key0", "value0");
kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(2);
}
@Issue("#1968")
@Test
void shouldRemoveOlderMeterWithLessTagsWhenCommonTagsConfigured() {
//Given
Map<String, String> tags = new LinkedHashMap<>();
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", tags);
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
return Collections.singletonMap(metricName, metric);
};
kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
registry.config().commonTags("common", "value");
kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka-version", "unknown"), Tag.of("common", "value")); // only version
tags.put("key0", "value0");
kafkaMetrics.checkAndBindMetrics(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(Tag.of("kafka-version", "unknown"), Tag.of("key0", "value0"), Tag.of("common", "value"));
}
@Override
public void metricChange(final KafkaMetric kafkaMetric) {
LOGGER.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
String name = dropwizardMetricName(kafkaMetric);
Gauge<Double> gauge = new Gauge<Double>() {
@Override
public Double getValue() {
return kafkaMetric.value();
}
};
LOGGER.debug("Registering {}", name);
try {
registry.register(name, gauge);
metricNames.add(name);
} catch (IllegalArgumentException e) {
LOGGER.debug("metricChange called for `{}' which was already registered, ignoring.", name);
}
}
private static String dropwizardMetricName(KafkaMetric kafkaMetric) {
MetricName name = kafkaMetric.metricName();
List<String> nameParts = new ArrayList<String>(2);
nameParts.add(name.group());
nameParts.addAll(name.tags().values());
nameParts.add(name.name());
StringBuilder builder = new StringBuilder();
for (String namePart : nameParts) {
builder.append(namePart);
builder.append(".");
}
builder.setLength(builder.length() - 1); // Remove the trailing dot.
String processedName = builder.toString().replace(' ', '_').replace("\\.", "_");
return MetricRegistry.name(METRIC_PREFIX, processedName);
}
@Override
public void metricChange(final KafkaMetric metric) {
String name = getMetricName(metric);
StringBuilder strBuilder = new StringBuilder();
for (String key : metric.metricName().tags().keySet()) {
strBuilder.append(key).append(":").append(metric.metricName().tags().get(key)).append(",");
}
if (strBuilder.length() > 0) {
strBuilder.deleteCharAt(strBuilder.length() - 1);
}
registry.register(name, metric, strBuilder.toString());
log.debug("metrics name: {}", name);
}
@Test
public void testExceptionMetrics() {
Response response = ClientBuilder.newClient(app.resourceConfig.getConfiguration())
.target("http://localhost:" + config.getInt(RestConfig.PORT_CONFIG))
.path("/private/exception")
.request(MediaType.APPLICATION_JSON_TYPE)
.get();
assertEquals(404, response.getStatus());
for (KafkaMetric metric: TestMetricsReporter.getMetricTimeseries()) {
if (metric.metricName().name().equals("request-error-rate")) {
if (metric.metricName().tags().getOrDefault(HTTP_STATUS_CODE_TAG, "").equals("4xx")) {
assertTrue("Actual: " + metric.value(),
metric.value() > 0);
} else if (!metric.metricName().tags().isEmpty()) {
assertTrue("Actual: " + metric.value() + metric.metricName(),
metric.value() == 0.0 || Double.isNaN(metric.value()));
}
}
}
}
@Override
public void init(List<KafkaMetric> metrics) {
this.metrics = metrics;
for (MeterRegistry meterRegistry : METER_REGISTRIES) {
for (KafkaMetric metric : metrics) {
registerMetric(meterRegistry, metric);
}
}
}
private void registerMetric(MeterRegistry meterRegistry, KafkaMetric metric) {
KafkaMetricMeterTypeBuilder.newBuilder()
.prefix(getMetricPrefix())
.metric(metric)
.tagFunction(getTagFunction())
.registry(meterRegistry)
.build();
}
@Override
public void init(List<KafkaMetric> metrics) {
for (KafkaMetric kafkaMetric : metrics) {
addMetricIfInterested(kafkaMetric);
}
LOG.info("Added {} Kafka metrics for Cruise Control metrics during initialization.", _interestedMetrics.size());
_metricsReporterRunner = new KafkaThread("CruiseControlMetricsReporterRunner", this, true);
_yammerMetricProcessor = new YammerMetricProcessor();
_metricsReporterRunner.start();
}
private void reportKafkaMetrics(long now) {
LOG.debug("Reporting KafkaMetrics. {}", _interestedMetrics.values());
for (KafkaMetric metric : _interestedMetrics.values()) {
sendCruiseControlMetric(MetricsUtils.toCruiseControlMetric(metric, now, _brokerId));
}
LOG.debug("Finished reporting KafkaMetrics.");
}
private void addMetricIfInterested(KafkaMetric metric) {
LOG.trace("Checking Kafka metric {}", metric.metricName());
if (MetricsUtils.isInterested(metric.metricName())) {
LOG.debug("Added new metric {} to Cruise Control metrics reporter.", metric.metricName());
_interestedMetrics.put(metric.metricName(), metric);
}
}
@Override
public void init(List<KafkaMetric> metrics) {
_registry = SharedMetricRegistries.getOrCreate(REGISTRY_NAME);
for (KafkaMetric kafkaMetric : metrics) {
metricChange(kafkaMetric);
}
}
@Override
public void metricChange(KafkaMetric metric) {
_log.debug("Processing a metric change for {}", metric.metricName());
String name = metricName(metric);
Gauge<Double> gauge = metric::value;
_log.debug("Registering {}", name);
try {
_registry.register(name, gauge);
_metricNames.add(name);
} catch (IllegalArgumentException e) {
_log.debug("metricChange called for `{}' which was already registered, ignoring.", name);
}
}
@Override
public void metricRemoval(KafkaMetric metric) {
String name = metricName(metric);
_log.debug("Removing {}", name);
_registry.remove(name);
_metricNames.remove(name);
}
@Override
public void init(List<KafkaMetric> list) {
if (config == null) {
throw new IllegalStateException("Must call configure() before calling init() on a reporter.");
}
String registryName = config.getString(DropwizardReporterConfig.REGISTRY_PROPERTY_NAME);
this.registry = SharedMetricRegistries.getOrCreate(registryName);
for (KafkaMetric kafkaMetric : list) {
this.metricChange(kafkaMetric);
}
}
@Override
public void metricRemoval(KafkaMetric kafkaMetric) {
String name = dropwizardMetricName(kafkaMetric);
LOGGER.debug("Removing {}", name);
registry.remove(name);
metricNames.remove(name);
}
@Override
public void init(List<KafkaMetric> list) {
super.init(list);
InetSocketAddress address = new InetSocketAddress(
config.getString(DropwizardReporterConfig.GRAPHITE_HOST_PROPERTY_NAME),
config.getInt(DropwizardReporterConfig.GRAPHITE_PORT_PROPERTY_NAME));
graphite = new Graphite(address);
reporter = GraphiteReporter.forRegistry(registry)
.prefixedWith(config.getString(DropwizardReporterConfig.GRAPHITE_PREFIX_PROPERTY_NAME))
.build(graphite);
LOGGER.info("Starting the reporter");
reporter.start(11, TimeUnit.SECONDS);
}
public KafkaMetricsProcessor(
MetricsRegistry metricsRegistry,
Map<org.apache.kafka.common.MetricName, KafkaMetric> kafkaMetrics,
MeasurementPublisher publisher,
Map<String, String> fixedTags,
Integer pollingIntervalSeconds
) {
super(metricsRegistry, "streaming-reporter");
this.kafkaMetrics = kafkaMetrics;
this.clock = Clock.defaultClock();
this.fixedTags = fixedTags;
this.publisher = publisher;
this.formatter = new MeasurementFormatter();
this.pollingIntervalSeconds = pollingIntervalSeconds;
}
@Override
public Map<String, Metric> getMetrics() {
Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
Map<String, Metric> codaHaleMetricMap = new HashMap<>();
kafkaMetrics
.forEach((key, value) -> codaHaleMetricMap.put(canonicalMetricName(value), kafkaToCodaHaleMetric(value)));
return codaHaleMetricMap;
}
/**
* Convert a {@link KafkaMetric} instance to a {@link Metric}.
* @param kafkaMetric
* @return
*/
private Metric kafkaToCodaHaleMetric(final KafkaMetric kafkaMetric) {
if (log.isDebugEnabled()) {
log.debug("Processing a metric change for {}", kafkaMetric.metricName().toString());
}
Gauge<Double> gauge = () -> kafkaMetric.value();
return gauge;
}
@Override
public void init(List<KafkaMetric> metrics) {
registry = new StatsDMetricsRegistry();
kafkaMetrics = new HashMap<String, KafkaMetric>();
if (enabled) {
startReporter(POLLING_PERIOD_IN_SECONDS);
} else {
log.warn("KafkaStatsDReporter is disabled");
}
for (KafkaMetric metric : metrics) {
metricChange(metric);
}
}
@Test
public void init_should_start_reporter_when_enabled() {
configs.put(StatsdMetricsReporter.STATSD_REPORTER_ENABLED, "true");
StatsdMetricsReporter reporter = new StatsdMetricsReporter();
assertFalse("reporter should not be running", reporter.isRunning());
reporter.configure(configs);
reporter.init(new ArrayList<KafkaMetric>());
assertTrue("reporter should be running once #init has been invoked", reporter.isRunning());
}
@Test
public void init_should_not_start_reporter_when_disabled() {
configs.put(StatsdMetricsReporter.STATSD_REPORTER_ENABLED, "false");
StatsdMetricsReporter reporter = new StatsdMetricsReporter();
assertFalse("reporter should not be running", reporter.isRunning());
reporter.configure(configs);
reporter.init(new ArrayList<KafkaMetric>());
assertFalse("reporter should NOT be running once #init has been invoked", reporter.isRunning());
}