下面列出了 org.apache.kafka.common.MetricName 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Publishes the whitelisted metrics of every managed Kafka producer as gauges.
 *
 * <p>Each gauge is named {@code kafkaproducer.<metric name>} and tagged with
 * {@code cluster=<signature>}, where the signature is derived from the producer's config.
 */
@SuppressWarnings({ "deprecation" })
protected void publishKafkaProducerMetricsToOstrich() {
    Map<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> producersByConfig =
        KafkaProducerManager.getInstance().getProducers();
    for (Entry<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> producerEntry
        : producersByConfig.entrySet()) {
        // One tag per producer, shared by all of its gauges.
        String clusterTag = "cluster=" + convertSignatureToTag(producerEntry.getKey());
        Map<MetricName, ? extends Metric> producerMetrics = producerEntry.getValue().metrics();
        for (Entry<MetricName, ? extends Metric> metricEntry : producerMetrics.entrySet()) {
            String metricName = metricEntry.getKey().name();
            if (!PRODUCER_METRICS_WHITELIST.contains(metricName)) {
                continue;
            }
            OpenTsdbMetricConverter.gauge(
                "kafkaproducer." + metricName, metricEntry.getValue().value(), clusterTag);
        }
    }
}
/**
 * Registers the failed-task-attempts counter for the given task unless it is already present
 * in the metrics registry.
 *
 * @param taskId the task whose counter metric should exist
 */
private void ensureMetricsCreated(ConnectorTaskId taskId) {
    Map<String, String> taskTags = getTaskLevelTags(taskId);
    MetricName attemptsMetric = getMetric(
        FAILED_TASK_ATTEMPTS_METRIC_NAME + "-count",
        TASK_CONNECTOR_JMX_GROUP_NAME,
        "count of restart attempts to a failed task",
        taskLevelJmxTags,
        taskTags);

    // Nothing to do when the metric was registered by an earlier call.
    if (metrics.metrics().containsKey(attemptsMetric)) {
        return;
    }

    getSensor(taskId.toString()).add(attemptsMetric, new Total());
    logger.info("Added the task {} to the list of JMX metrics", taskId);
    logger.debug("Updated set of JMX metrics is {}", metrics.metrics());
}
/**
 * Adds the per-partition sensors for the given partition.
 *
 * <p>Two sensors are created and remembered by partition number: one tracking the rate of
 * produced records and one tracking the rate of produce errors. Any failure is logged and
 * swallowed so that a single bad partition does not abort the caller.
 *
 * @param partition the partition number the sensors are created for
 */
public void addPartitionSensor(int partition) {
    try {
        Sensor recordsProducedSensor = metrics.sensor("records-produced-partition-" + partition);
        recordsProducedSensor.add(new MetricName("records-produced-rate-partition-" + partition, METRIC_GROUP_NAME,
            "The average number of records per second that are produced to this partition", tags), new Rate());
        _recordsProducedPerPartition.put(partition, recordsProducedSensor);

        Sensor errorsSensor = metrics.sensor("produce-error-partition-" + partition);
        errorsSensor.add(new MetricName("produce-error-rate-partition-" + partition, METRIC_GROUP_NAME,
            "The average number of errors per second when producing to this partition", tags), new Rate());
        _produceErrorPerPartition.put(partition, errorsSensor);
    } catch (Exception e) {
        // The placeholder is filled with the partition; the exception is passed as the trailing
        // argument so the logger records it (with stack trace) instead of leaving "{}" unfilled.
        // NOTE(review): assumes an SLF4J-style logger API - confirm.
        logger.error("addPartitionSensor exception for partition {}", partition, e);
    }
}
/**
 * Starts a single-threaded scheduler that periodically logs every known Kafka metric.
 *
 * <p>The period defaults to 60 seconds and can be overridden through the global config
 * property {@code metric.reporters.interval.second}. The first dump happens one full period
 * after this method is called.
 *
 * @param configs reporter configuration (not read by this method)
 */
@Override
public void configure(Map<String, ?> configs) {
    scheduler = Executors.newSingleThreadScheduledExecutor(HermesThreadFactory.create("KafkaMetricsLogger", true));

    Properties globalConfig = PlexusComponentLocator.lookup(ClientEnvironment.class).getGlobalConfig();
    final String intervalKey = "metric.reporters.interval.second";
    int intervalSeconds = globalConfig.containsKey(intervalKey)
        ? Integer.parseInt(globalConfig.getProperty(intervalKey))
        : 60;
    long periodMillis = TimeUnit.SECONDS.toMillis(intervalSeconds);

    Runnable logAllMetrics = new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<MetricName, KafkaMetric> metricEntry : metrics.entrySet()) {
                m_logger.info("{} : {}", getMetricKey(metricEntry.getKey()), metricEntry.getValue().value());
            }
        }
    };
    // Initial delay equals the period.
    scheduler.scheduleAtFixedRate(logAllMetrics, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
}
/**
 * Collects the static information needed before regular metrics are bound: the client id
 * (taken from the first metric carrying a client-id tag while the id still has its default
 * value), the Kafka version, and the start-time metric. The start-time metric, when found,
 * is bound to the registry right away.
 *
 * @param registry the registry the start-time meter is bound to
 */
void prepareToBindMetrics(MeterRegistry registry) {
    Metric startTimeMetric = null;
    for (Map.Entry<MetricName, ? extends Metric> metricEntry : metricsSupplier.get().entrySet()) {
        MetricName metricName = metricEntry.getKey();

        // Only look at the tags while the client id is still unresolved.
        if (clientId.equals(DEFAULT_VALUE)) {
            String taggedClientId = metricName.tags().get(CLIENT_ID_TAG_NAME);
            if (taggedClientId != null) {
                clientId = taggedClientId;
            }
        }

        // Version and start time are only published in the app-info group.
        if (!METRIC_GROUP_APP_INFO.equals(metricName.group())) {
            continue;
        }
        if (VERSION_METRIC_NAME.equals(metricName.name())) {
            kafkaVersion = (String) metricEntry.getValue().metricValue();
        } else if (START_TIME_METRIC_NAME.equals(metricName.name())) {
            startTimeMetric = metricEntry.getValue();
        }
    }

    if (startTimeMetric != null) {
        bindMeter(registry, startTimeMetric, meterName(startTimeMetric), meterTags(startTimeMetric));
    }
}
@Test
void shouldKeepMetersWhenMetricsDoNotChange() {
    // Given: a supplier that reports the same single metric on every call
    Supplier<Map<MetricName, ? extends Metric>> metricsSource = () -> {
        MetricName name = new MetricName("a", "b", "c", new LinkedHashMap<>());
        KafkaMetric kafkaMetric = new KafkaMetric(this, name, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(name, kafkaMetric);
    };
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    kafkaMetrics = new KafkaMetrics(metricsSource);

    // When: metrics are bound for the first time
    kafkaMetrics.bindTo(meterRegistry);
    // Then: exactly one meter exists
    assertThat(meterRegistry.getMeters()).hasSize(1);

    // When: the unchanged metrics are checked again
    kafkaMetrics.checkAndBindMetrics(meterRegistry);
    // Then: the meter count is unchanged
    assertThat(meterRegistry.getMeters()).hasSize(1);
}
@Test
void shouldNotAddAppInfoMetrics() {
    // Given: one regular metric plus one metric in the app-info group
    Supplier<Map<MetricName, ? extends Metric>> metricsSource = () -> {
        Map<MetricName, KafkaMetric> reported = new LinkedHashMap<>();

        MetricName regularName = new MetricName("a0", "b0", "c0", new LinkedHashMap<>());
        reported.put(regularName,
            new KafkaMetric(this, regularName, new Value(), new MetricConfig(), Time.SYSTEM));

        MetricName appInfoName =
            new MetricName("a1", KafkaMetrics.METRIC_GROUP_APP_INFO, "c0", new LinkedHashMap<>());
        reported.put(appInfoName,
            new KafkaMetric(this, appInfoName, new Value(), new MetricConfig(), Time.SYSTEM));

        return reported;
    };
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    kafkaMetrics = new KafkaMetrics(metricsSource);

    // When / Then: only one meter is registered, on first bind and on re-check
    kafkaMetrics.bindTo(meterRegistry);
    assertThat(meterRegistry.getMeters()).hasSize(1);

    kafkaMetrics.checkAndBindMetrics(meterRegistry);
    assertThat(meterRegistry.getMeters()).hasSize(1);
}
@Test
void shouldRemoveOlderMeterWithLessTags() {
    // Given: a metric whose tag map is shared with the test and can grow between checks
    Map<String, String> mutableTags = new LinkedHashMap<>();
    Supplier<Map<MetricName, ? extends Metric>> metricsSource = () -> {
        MetricName name = new MetricName("a", "b", "c", mutableTags);
        KafkaMetric kafkaMetric = new KafkaMetric(this, name, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(name, kafkaMetric);
    };
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    kafkaMetrics = new KafkaMetrics(metricsSource);

    // When: bound with no metric tags, the meter carries a single tag (the version)
    kafkaMetrics.bindTo(meterRegistry);
    assertThat(meterRegistry.getMeters()).hasSize(1);
    assertThat(meterRegistry.getMeters().get(0).getId().getTags()).hasSize(1);

    // When: a tag is added and metrics are re-checked
    mutableTags.put("key0", "value0");
    kafkaMetrics.checkAndBindMetrics(meterRegistry);

    // Then: still one meter, now carrying two tags
    assertThat(meterRegistry.getMeters()).hasSize(1);
    assertThat(meterRegistry.getMeters().get(0).getId().getTags()).hasSize(2);
}
@Test
void shouldRemoveMeterWithLessTags() {
    // Given: two metrics with the same name and group, one untagged and one tagged with key0
    Supplier<Map<MetricName, ? extends Metric>> metricsSource = () -> {
        Map<MetricName, KafkaMetric> reported = new LinkedHashMap<>();

        MetricName untaggedName = new MetricName("a", "b", "c", Collections.emptyMap());
        reported.put(untaggedName,
            new KafkaMetric(this, untaggedName, new Value(), new MetricConfig(), Time.SYSTEM));

        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("key0", "value0");
        MetricName taggedName = new MetricName("a", "b", "c", tags);
        reported.put(taggedName,
            new KafkaMetric(this, taggedName, new Value(), new MetricConfig(), Time.SYSTEM));

        return reported;
    };
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    kafkaMetrics = new KafkaMetrics(metricsSource);

    // When
    kafkaMetrics.bindTo(meterRegistry);

    // Then: a single meter remains and it has two tags (version + key0)
    assertThat(meterRegistry.getMeters()).hasSize(1);
    assertThat(meterRegistry.getMeters().get(0).getId().getTags()).hasSize(2);
}
@Test
void shouldBindMetersWithSameTags() {
    // Given: two metrics that share name, group and tag key but differ in the tag value
    Supplier<Map<MetricName, ? extends Metric>> metricsSource = () -> {
        Map<MetricName, KafkaMetric> reported = new LinkedHashMap<>();

        Map<String, String> tagsWithValue0 = new LinkedHashMap<>();
        tagsWithValue0.put("key0", "value0");
        MetricName nameWithValue0 = new MetricName("a", "b", "c", tagsWithValue0);
        reported.put(nameWithValue0,
            new KafkaMetric(this, nameWithValue0, new Value(), new MetricConfig(), Time.SYSTEM));

        Map<String, String> tagsWithValue1 = new LinkedHashMap<>();
        tagsWithValue1.put("key0", "value1");
        MetricName nameWithValue1 = new MetricName("a", "b", "c", tagsWithValue1);
        reported.put(nameWithValue1,
            new KafkaMetric(this, nameWithValue1, new Value(), new MetricConfig(), Time.SYSTEM));

        return reported;
    };
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    kafkaMetrics = new KafkaMetrics(metricsSource);

    // When
    kafkaMetrics.bindTo(meterRegistry);

    // Then: both meters are kept; the first one has two tags (version + key0)
    assertThat(meterRegistry.getMeters()).hasSize(2);
    assertThat(meterRegistry.getMeters().get(0).getId().getTags()).hasSize(2);
}
@Issue("#1968")
@Test
void shouldBindMetersWithDifferentClientIds() {
    // Given: a Kafka metric tagged with client0 ...
    Supplier<Map<MetricName, ? extends Metric>> metricsSource = () -> {
        Map<String, String> client0Tags = new LinkedHashMap<>();
        client0Tags.put("key0", "value0");
        client0Tags.put("client-id", "client0");
        MetricName name = new MetricName("a", "b", "c", client0Tags);
        KafkaMetric kafkaMetric = new KafkaMetric(this, name, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(name, kafkaMetric);
    };
    // ... and a registry that already holds a meter of the same name for client1
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    meterRegistry.counter("kafka.b.a", "client-id", "client1", "key0", "value0");
    kafkaMetrics = new KafkaMetrics(metricsSource);

    // When
    kafkaMetrics.bindTo(meterRegistry);

    // Then: both meters are present in the registry
    assertThat(meterRegistry.getMeters()).hasSize(2);
}
@Issue("#1968")
@Test
void shouldRemoveOlderMeterWithLessTagsWhenCommonTagsConfigured() {
    // Given: a metric with a growable tag map and a registry configured with a common tag
    Map<String, String> mutableTags = new LinkedHashMap<>();
    Supplier<Map<MetricName, ? extends Metric>> metricsSource = () -> {
        MetricName name = new MetricName("a", "b", "c", mutableTags);
        KafkaMetric kafkaMetric = new KafkaMetric(this, name, new Value(), new MetricConfig(), Time.SYSTEM);
        return Collections.singletonMap(name, kafkaMetric);
    };
    kafkaMetrics = new KafkaMetrics(metricsSource);
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    meterRegistry.config().commonTags("common", "value");

    // When: bound without metric tags
    kafkaMetrics.bindTo(meterRegistry);

    // Then: the meter carries the version tag and the common tag
    assertThat(meterRegistry.getMeters()).hasSize(1);
    assertThat(meterRegistry.getMeters().get(0).getId().getTags())
        .containsExactlyInAnyOrder(
            Tag.of("kafka-version", "unknown"),
            Tag.of("common", "value"));

    // When: a metric tag appears and metrics are re-checked
    mutableTags.put("key0", "value0");
    kafkaMetrics.checkAndBindMetrics(meterRegistry);

    // Then: still one meter, now with version, key0 and the common tag
    assertThat(meterRegistry.getMeters()).hasSize(1);
    assertThat(meterRegistry.getMeters().get(0).getId().getTags())
        .containsExactlyInAnyOrder(
            Tag.of("kafka-version", "unknown"),
            Tag.of("key0", "value0"),
            Tag.of("common", "value"));
}
/**
 * Guesses the client id of a Kafka client from the {@code client-id} tag of its metrics.
 *
 * <p>Kafka offers no direct accessor for the client id, and reflection is fragile when the
 * client is wrapped or decorated, so the value is recovered from the metric tags instead.
 *
 * @param metrics kafka client metrics
 * @return the single client id found in the tags, or {@code null} if no metric carries one
 * @throws IllegalArgumentException if the metrics carry more than one distinct client id
 */
private static String fishForClientId(Map<MetricName, ? extends Metric> metrics) {
    Set<String> distinctIds = new HashSet<>();
    for (MetricName metricName : metrics.keySet()) {
        Map<String, String> tags = metricName.tags();
        String taggedId = (tags == null) ? null : tags.get("client-id");
        if (taggedId != null) {
            distinctIds.add(taggedId);
        }
    }

    if (distinctIds.size() > 1) {
        throw new IllegalArgumentException("ambiguous client id from client: " + distinctIds);
    }
    return distinctIds.isEmpty() ? null : distinctIds.iterator().next();
}
/**
 * Starts the consume thread (once) and registers the {@code topic-partitions-count} metric
 * with the current number of partitions of the monitored topic.
 *
 * <p>If the topic description cannot be fetched, the failure is logged and the partition
 * count metric is skipped; the consume thread keeps running. Calling this method on an
 * already running service is a no-op.
 */
@Override
public synchronized void start() {
    if (!_running.compareAndSet(false, true)) {
        return;
    }
    _consumeThread.start();
    LOG.info("{}/ConsumeService started.", _name);

    DescribeTopicsResult describeTopicsResult = _adminClient.describeTopics(Collections.singleton(_topic));
    Map<String, KafkaFuture<TopicDescription>> topicResultValues = describeTopicsResult.values();
    KafkaFuture<TopicDescription> topicDescriptionKafkaFuture = topicResultValues.get(_topic);

    TopicDescription topicDescription;
    try {
        topicDescription = topicDescriptionKafkaFuture.get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can react to it.
        Thread.currentThread().interrupt();
        LOG.error("Exception occurred while getting the topicDescriptionKafkaFuture for topic: {}", _topic, e);
        return;
    } catch (ExecutionException e) {
        LOG.error("Exception occurred while getting the topicDescriptionKafkaFuture for topic: {}", _topic, e);
        return;
    }

    // Only reached with a successfully fetched description, so no null dereference is possible.
    double partitionCount = topicDescription.partitions().size();
    Sensor topicPartitionCount = metrics.sensor("topic-partitions");
    topicPartitionCount.add(
        new MetricName("topic-partitions-count", METRIC_GROUP_NAME, "The total number of partitions for the topic.", tags),
        new Total(partitionCount));
}
/**
 * Metrics for calculating the offset commit latency of a consumer.
 *
 * <p>Registers average, maximum and 99th / 99.9th / 99.99th percentile latency metrics on the
 * {@code commit-offset-latency} sensor.
 *
 * @param metrics the commit offset metrics
 * @param tags the tags associated, i.e) kmf.services:name=single-cluster-monitor
 * @param latencyPercentileMaxMs the upper bound, in ms, passed to the percentile histogram
 * @param latencyPercentileGranularityMs the width, in ms, of one percentile bucket; must not be zero
 * @throws IllegalArgumentException if {@code latencyPercentileGranularityMs} is zero
 */
public CommitLatencyMetrics(Metrics metrics, Map<String, String> tags, int latencyPercentileMaxMs,
    int latencyPercentileGranularityMs) {
    // Validate before touching the shared registry so that a rejected configuration does not
    // leave a partially populated sensor behind.
    if (latencyPercentileGranularityMs == 0) {
        throw new IllegalArgumentException("The latency percentile granularity was incorrectly passed a zero value.");
    }

    _inProgressCommit = false;
    _commitOffsetLatency = metrics.sensor("commit-offset-latency");
    _commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-avg", METRIC_GROUP_NAME, "The average latency in ms of committing offset", tags), new Avg());
    _commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-max", METRIC_GROUP_NAME, "The maximum latency in ms of committing offset", tags), new Max());

    // 2 extra buckets exist which are respectively designated for values which are less than 0.0 or larger than max.
    int bucketNum = latencyPercentileMaxMs / latencyPercentileGranularityMs + 2;
    // NOTE(review): presumably 4 bytes are needed per bucket - confirm against Percentiles.
    int sizeInBytes = bucketNum * 4;
    _commitOffsetLatency.add(new Percentiles(sizeInBytes, latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
        new Percentile(new MetricName("commit-offset-latency-ms-99th", METRIC_GROUP_NAME, "The 99th percentile latency of committing offset", tags), 99.0),
        new Percentile(new MetricName("commit-offset-latency-ms-999th", METRIC_GROUP_NAME, "The 99.9th percentile latency of committing offset", tags), 99.9),
        new Percentile(new MetricName("commit-offset-latency-ms-9999th", METRIC_GROUP_NAME, "The 99.99th percentile latency of committing offset", tags), 99.99)));
    LOG.info("{} was constructed successfully.", this.getClass().getSimpleName());
}
/**
 * Metrics for calculating the offset commit availability of a consumer.
 *
 * <p>Registers total counters for committed and failed offset commits, a rate for failed
 * commits, and a derived {@code offsets-committed-avg} metric computed as
 * committed / (committed + failed) from the two totals.
 *
 * @param metrics the commit offset metrics
 * @param tags the tags associated, i.e) kmf.services:name=single-cluster-monitor
 */
public CommitAvailabilityMetrics(final Metrics metrics, final Map<String, String> tags) {
    LOG.info("{} called.", this.getClass().getSimpleName());

    MetricName committedTotalName = new MetricName("offsets-committed-total", METRIC_GROUP_NAME,
        "The total number of offsets per second that are committed.", tags);
    _offsetsCommitted = metrics.sensor("offsets-committed");
    _offsetsCommitted.add(committedTotalName, new Total());

    MetricName failedRateName = new MetricName("failed-commit-offsets-avg", METRIC_GROUP_NAME,
        "The average number of offsets per second that have failed.", tags);
    MetricName failedTotalName = new MetricName("failed-commit-offsets-total", METRIC_GROUP_NAME,
        "The total number of offsets per second that have failed.", tags);
    _failedCommitOffsets = metrics.sensor("failed-commit-offsets");
    _failedCommitOffsets.add(failedRateName, new Rate());
    _failedCommitOffsets.add(failedTotalName, new Total());

    MetricName availabilityName =
        new MetricName("offsets-committed-avg", METRIC_GROUP_NAME, "The average offset commits availability.", tags);
    metrics.addMetric(availabilityName, (MetricConfig config, long now) -> {
        // Both totals are looked up from the registry on every measurement.
        Object committedValue =
            metrics.metrics().get(metrics.metricName("offsets-committed-total", METRIC_GROUP_NAME, tags)).metricValue();
        Object failedValue =
            metrics.metrics().get(metrics.metricName("failed-commit-offsets-total", METRIC_GROUP_NAME, tags)).metricValue();
        if (committedValue == null || failedValue == null) {
            return 0;
        }
        double committedCount = (double) committedValue;
        double failedCount = (double) failedValue;
        return committedCount / (committedCount + failedCount);
    });
}
/**
*
* @param metrics a named, numerical measurement. sensor is a handle to record numerical measurements as they occur.
* @param tags metrics/sensor's tags
*/
public ClusterTopicManipulationMetrics(final Metrics metrics, final Map<String, String> tags) {
super(metrics, tags);
_topicCreationSensor = metrics.sensor("topic-creation-metadata-propagation");
_topicDeletionSensor = metrics.sensor("topic-deletion-metadata-propagation");
_topicCreationSensor.add(new MetricName("topic-creation-metadata-propagation-ms-avg", METRIC_GROUP_NAME,
"The average propagation duration in ms of propagating topic creation data and metadata to all brokers in the cluster",
tags), new Avg());
_topicCreationSensor.add(new MetricName("topic-creation-metadata-propagation-ms-max", METRIC_GROUP_NAME,
"The maximum propagation time in ms of propagating topic creation data and metadata to all brokers in the cluster",
tags), new Max());
_topicDeletionSensor.add(new MetricName("topic-deletion-metadata-propagation-ms-avg", METRIC_GROUP_NAME,
"The average propagation duration in milliseconds of propagating the topic deletion data and metadata "
+ "across all the brokers in the cluster.", tags), new Avg());
_topicDeletionSensor.add(new MetricName("topic-deletion-metadata-propagation-ms-max", METRIC_GROUP_NAME,
"The maximum propagation time in milliseconds of propagating the topic deletion data and metadata "
+ "across all the brokers in the cluster.", tags), new Max());
LOGGER.debug("{} constructor was initialized successfully.", "ClusterTopicManipulationMetrics");
}
/**
 * Builds the Dropwizard registry name for a Kafka metric.
 *
 * <p>The name is the dot-joined sequence of the metric's group, all tag values (in the tag
 * map's iteration order) and the metric's own name, with spaces turned into underscores,
 * prefixed with {@code METRIC_PREFIX}.
 *
 * @param kafkaMetric the Kafka metric to name
 * @return the name under which the metric is registered
 */
private static String dropwizardMetricName(KafkaMetric kafkaMetric) {
    MetricName metricName = kafkaMetric.metricName();

    StringBuilder dotted = new StringBuilder();
    dotted.append(metricName.group());
    for (String tagValue : metricName.tags().values()) {
        dotted.append('.').append(tagValue);
    }
    dotted.append('.').append(metricName.name());

    // NOTE(review): String.replace(CharSequence, CharSequence) is a literal replacement, so the
    // second call only rewrites a backslash followed by a dot; plain dots are left untouched.
    String sanitized = dotted.toString().replace(' ', '_').replace("\\.", "_");
    return MetricRegistry.name(METRIC_PREFIX, sanitized);
}
@Test
public void testMetricChange() throws Exception {
    // Wire a freshly configured reporter into a new Kafka metrics registry.
    DropwizardReporter dropwizardReporter = new DropwizardReporter();
    dropwizardReporter.configure(new HashMap<String, Object>());
    Metrics kafkaMetrics = new Metrics();
    kafkaMetrics.addReporter(dropwizardReporter);

    Sensor requestSensor = kafkaMetrics.sensor("kafka.requests");
    requestSensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());

    // The metric must show up as exactly one gauge under the expected name.
    String expectedName = "org.apache.kafka.common.metrics.grp1.pack.bean1.avg";
    Map<String, Gauge> gauges = SharedMetricRegistries.getOrCreate("default").getGauges();
    Assert.assertEquals(1, gauges.size());
    Assert.assertEquals(expectedName, gauges.keySet().iterator().next());

    // Recording 2.1, 2.2 and 2.6 must yield an average of 2.3.
    for (double sample : new double[] {2.1, 2.2, 2.6}) {
        requestSensor.record(sample);
    }
    double reportedAverage = (Double) gauges.get(expectedName).getValue();
    Assert.assertEquals(2.3, reportedAverage, 0.001);
}
@Test
public final void sendDoubleGauge() throws Exception {
    final double gaugeValue = 10.11;
    // Minimal metric stub that always reports the fixed value.
    Metric fixedMetric = new Metric() {
        @Override
        public double value() {
            return gaugeValue;
        }

        @Override
        public MetricName metricName() {
            return new MetricName("test-metric", "group");
        }
    };

    addMetricAndRunReporter("foo", fixedMetric, "bar");

    // The reporter must forward name, value and tag unchanged.
    verify(statsD).gauge(Matchers.eq("foo"), Matchers.eq(gaugeValue), Matchers.eq("bar"));
}
/**
 * Computes how long to pause, based on how full the producer's buffer is.
 *
 * <p>Returns 0 when the producer is configured to block on a full buffer, or when less than
 * half of the buffer is in use. Otherwise returns the used bytes divided by the outgoing byte
 * rate (at least 1.0), multiplied by 1000.
 *
 * @return the pause duration, or 0 when no pause is needed
 */
@Override
public long checkPause() {
    if (blockOnBufferFull) {
        return 0; // do not pause here, will be blocked
    }

    double totalBytes = readProducerMetric("buffer-total-bytes");
    double availableBytes = readProducerMetric("buffer-available-bytes");
    double consumedMemory = totalBytes - availableBytes;
    double memoryRate = consumedMemory / totalBytes;

    // Written as a negated ">=" so that a NaN rate also takes the "no pause" path.
    if (!(memoryRate >= 0.5)) {
        return 0;
    }

    double throughputRate = Math.max(readProducerMetric("outgoing-byte-rate"), 1.0);
    return (long) (consumedMemory / throughputRate * 1000);
}

/**
 * Reads the current value of a metric in the {@code producer-metrics} group, tagged with this
 * sink's configured {@code client.id}.
 */
private double readProducerMetric(String metricName) {
    return producer.metrics().get(
        new MetricName(
            metricName,
            "producer-metrics",
            "desc",
            "client-id",
            props.getProperty("client.id"))).value();
}
/**
 * Exercises producer APIs other than send: every topic must report exactly one partition and
 * the producer must expose a non-empty metrics map.
 *
 * @param producer the producer under test
 */
protected void checkOtherApis(Producer<String, String> producer) {
    topics.forEach(topic -> {
        List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
        Assert.assertNotNull(partitionInfos);
        Assert.assertEquals(1, partitionInfos.size());
    });

    Map<MetricName, ?> producerMetrics = producer.metrics();
    System.out.println("metrics: " + producerMetrics);
    Assert.assertFalse(CollectionExtension.isEmpty(producerMetrics));
}
/**
 * Registers a Servo double gauge for the given Kafka metric.
 *
 * <p>The monitor is named after the metric and tagged with its group plus all of the metric's
 * own tags; the gauge is stored together with the metric it mirrors.
 *
 * @param metric the Kafka metric to expose
 */
private void addMetric(KafkaMetric metric) {
    MetricName name = metric.metricName();
    MonitorConfig.Builder configBuilder = MonitorConfig.builder(name.name())
        .withTag("group", name.group());
    for (Map.Entry<String, String> kafkaTag : name.tags().entrySet()) {
        configBuilder.withTag(kafkaTag.getKey(), kafkaTag.getValue());
    }
    gauges.put(Servo.getDoubleGauge(configBuilder.build()), metric);
}
/**
 * Returns a function that converts a metric name's tags into a list of {@code Tag}s, keeping
 * only the tags whose key is contained in {@code getIncludedTags()}.
 */
private Function<MetricName, List<Tag>> getTagFunction() {
    return name -> name.tags().entrySet().stream()
        .filter(kafkaTag -> getIncludedTags().contains(kafkaTag.getKey()))
        .map(kafkaTag -> Tag.of(kafkaTag.getKey(), kafkaTag.getValue()))
        .collect(Collectors.toList());
}
/**
 * Creates the reporter and registers the missing-destination-partitions count metric
 * (group {@code mirus}) on its own sensor.
 *
 * @param metrics the registry the sensor and metric are created in
 */
MissingPartitionsJmxReporter(Metrics metrics) {
    super(metrics);
    Sensor sensor = metrics.sensor(MISSING_DEST_PARTITIONS);
    sensor.add(metrics.metricName(MISSING_DEST_PARTITIONS + "-count", "mirus"), new Value());
    this.missingPartsSensor = sensor;
}
/**
 * Reports the given task state to the task JMX reporter and asserts the resulting value of
 * that task's failed-restart-attempts counter.
 *
 * @param state the task state to report
 * @param task the task number
 * @param expected the expected counter value after the update
 */
private void assertFailedMetricCount(String state, int task, Double expected) {
    ConnectorTaskId connectorTaskId = new ConnectorTaskId(CONNECTOR_NAME, task);
    taskJmxReporter.updateMetrics(connectorTaskId, new TaskState(task, state, "worker1", "Test"));

    // The counter is identified by its connector and task tags.
    HashMap<String, String> taskTags = new HashMap<>();
    taskTags.put("connector", CONNECTOR_NAME);
    taskTags.put("task", Integer.toString(task));
    MetricName counterName = new MetricName("task-failed-restart-attempts-count", GROUP, "", taskTags);

    assertEquals(expected, metrics.metric(counterName).metricValue());
}
@Test
public void testIncrementTotalFailedCount() {
    MetricName counterName = new MetricName("task-failed-restart-attempts-count", GROUP, "", tags);

    // The counter starts at zero ...
    assertEquals(0.0d, metrics.metric(counterName).metricValue());

    connectorJmxReporter.incrementTotalFailedCount(CONNECTOR_NAME);

    // ... and is bumped by exactly one per call.
    assertEquals(1.0d, metrics.metric(counterName).metricValue());
}
@Test
public void testIncrementConnectorRestartAttempts() {
    MetricName counterName = new MetricName("connector-failed-restart-attempts-count", GROUP, "", tags);

    // The counter starts at zero ...
    assertEquals(0.0d, metrics.metric(counterName).metricValue());

    connectorJmxReporter.incrementConnectorRestartAttempts(CONNECTOR_NAME);

    // ... and is bumped by exactly one per call.
    assertEquals(1.0d, metrics.metric(counterName).metricValue());
}
/**
 * Registers the produce-side metrics: total records produced, total produce errors, and a
 * derived {@code produce-availability-avg} metric.
 *
 * @param metrics the registry sensors and metrics are created in
 * @param tags the tags attached to every metric registered here
 */
public ProduceMetrics(Metrics metrics, final Map<String, String> tags) {
    this.metrics = metrics;
    this.tags = tags;
    _recordsProducedPerPartition = new ConcurrentHashMap<>();
    _produceErrorPerPartition = new ConcurrentHashMap<>();

    recordsProduce = metrics.sensor("records-produced");
    MetricName recordsTotalName =
        new MetricName("records-produced-total", METRIC_GROUP_NAME, "The total number of records that are produced", tags);
    recordsProduce.add(recordsTotalName, new Total());

    errorProduce = metrics.sensor("error-produce");
    MetricName errorTotalName = new MetricName("error-produce-total", METRIC_GROUP_NAME, "", tags);
    errorProduce.add(errorTotalName, new Total());

    MetricName availabilityName =
        new MetricName("produce-availability-avg", METRIC_GROUP_NAME, "The average produce availability", tags);
    metrics.addMetric(availabilityName, (config, now) -> {
        // Overall availability is the sum of the per-partition availabilities divided by the
        // number of partitions. A partition's availability is its produced rate divided by
        // the sum of its produced rate and its error rate.
        int partitionCount = partitionNum.get();
        double availabilitySum = 0.0;
        for (int partition = 0; partition < partitionCount; partition++) {
            double producedRate = produceMetrics.metrics.metrics()
                .get(new MetricName("records-produced-rate-partition-" + partition, METRIC_GROUP_NAME, tags))
                .value();
            double rawErrorRate = produceMetrics.metrics.metrics()
                .get(new MetricName("produce-error-rate-partition-" + partition, METRIC_GROUP_NAME, tags))
                .value();
            // A NaN or infinite error rate is treated as "no errors".
            double errorRate = (Double.isNaN(rawErrorRate) || Double.isInfinite(rawErrorRate)) ? 0 : rawErrorRate;
            if (producedRate + errorRate > 0) {
                availabilitySum += producedRate / (producedRate + errorRate);
            }
        }
        return availabilitySum / partitionCount;
    });
}
/**
 * Exposes the consumer's Kafka metrics as Coda Hale metrics, keyed by their canonical name.
 *
 * @return a new map with one converted entry per Kafka metric
 */
@Override
public Map<String, Metric> getMetrics() {
    Map<MetricName, KafkaMetric> kafkaMetrics = (Map<MetricName, KafkaMetric>) this.consumer.metrics();
    Map<String, Metric> converted = new HashMap<>();
    for (KafkaMetric kafkaMetric : kafkaMetrics.values()) {
        converted.put(canonicalMetricName(kafkaMetric), kafkaToCodaHaleMetric(kafkaMetric));
    }
    return converted;
}