下面列出了怎么用com.codahale.metrics.SlidingWindowReservoir的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Tests the histogram functionality of the DropwizardHistogramWrapper.
*/
@Test
public void testDropwizardHistogramWrapper() {
int size = 10;
DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));
for (int i = 0; i < size; i++) {
histogramWrapper.update(i);
assertEquals(i + 1, histogramWrapper.getCount());
assertEquals(i, histogramWrapper.getStatistics().getMax());
assertEquals(0, histogramWrapper.getStatistics().getMin());
}
assertEquals(size, histogramWrapper.getStatistics().size());
assertEquals((size - 1) / 2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
for (int i = size; i < 2 * size; i++) {
histogramWrapper.update(i);
assertEquals(i + 1, histogramWrapper.getCount());
assertEquals(i, histogramWrapper.getStatistics().getMax());
assertEquals(i + 1 - size, histogramWrapper.getStatistics().getMin());
}
assertEquals(size, histogramWrapper.getStatistics().size());
assertEquals(size + (size - 1) / 2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
}
/**
* Tests the histogram functionality of the DropwizardHistogramWrapper.
*/
@Test
public void testDropwizardHistogramWrapper() {
int size = 10;
DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));
for (int i = 0; i < size; i++) {
histogramWrapper.update(i);
assertEquals(i + 1, histogramWrapper.getCount());
assertEquals(i, histogramWrapper.getStatistics().getMax());
assertEquals(0, histogramWrapper.getStatistics().getMin());
}
assertEquals(size, histogramWrapper.getStatistics().size());
assertEquals((size - 1) / 2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
for (int i = size; i < 2 * size; i++) {
histogramWrapper.update(i);
assertEquals(i + 1, histogramWrapper.getCount());
assertEquals(i, histogramWrapper.getStatistics().getMax());
assertEquals(i + 1 - size, histogramWrapper.getStatistics().getMin());
}
assertEquals(size, histogramWrapper.getStatistics().size());
assertEquals(size + (size - 1) / 2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
}
@Test
public void shouldReportHistogramSubsequentSnapshotValues_SumMaxMinValues() throws Exception {
CloudWatchReporter reporter = reporterBuilder.withStatisticSet().build();
final Histogram slidingWindowHistogram = new Histogram(new SlidingWindowReservoir(4));
metricRegistry.register("SlidingWindowHistogram", slidingWindowHistogram);
slidingWindowHistogram.update(1);
slidingWindowHistogram.update(2);
slidingWindowHistogram.update(30);
reporter.report();
final MetricDatum metricData = metricDatumByDimensionFromCapturedRequest(DIMENSION_SNAPSHOT_SUMMARY);
assertThat(metricData.statisticValues().maximum().intValue()).isEqualTo(30);
assertThat(metricData.statisticValues().minimum().intValue()).isEqualTo(1);
assertThat(metricData.statisticValues().sampleCount().intValue()).isEqualTo(3);
assertThat(metricData.statisticValues().sum().intValue()).isEqualTo(33);
assertThat(metricData.unit()).isEqualTo(StandardUnit.NONE);
slidingWindowHistogram.update(4);
slidingWindowHistogram.update(100);
slidingWindowHistogram.update(5);
slidingWindowHistogram.update(6);
reporter.report();
final MetricDatum secondMetricData = metricDatumByDimensionFromCapturedRequest(DIMENSION_SNAPSHOT_SUMMARY);
assertThat(secondMetricData.statisticValues().maximum().intValue()).isEqualTo(100);
assertThat(secondMetricData.statisticValues().minimum().intValue()).isEqualTo(4);
assertThat(secondMetricData.statisticValues().sampleCount().intValue()).isEqualTo(4);
assertThat(secondMetricData.statisticValues().sum().intValue()).isEqualTo(115);
assertThat(secondMetricData.unit()).isEqualTo(StandardUnit.NONE);
}
/**
* Tests the histogram functionality of the DropwizardHistogramWrapper.
*/
@Test
public void testDropwizardHistogramWrapper() {
int size = 10;
DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));
testHistogram(size, histogramWrapper);
}
public Histogram getSlidingWindowHistogram(MetricRegistry registry, String name, final int size) {
return getOrAdd(registry, name, new MetricBuilder<Histogram>() {
@Override
public Histogram newMetric() {
return new Histogram(new SlidingWindowReservoir(size));
}
@Override
public boolean isInstance(Metric metric) {
return Histogram.class.isInstance(metric);
}
});
}
public Timer getSlidingWindowTimer(MetricRegistry registry, String name, final int size) {
return getOrAdd(registry, name, new MetricBuilder<Timer>() {
@Override
public Timer newMetric() {
return new Timer(new SlidingWindowReservoir(size));
}
@Override
public boolean isInstance(Metric metric) {
return Timer.class.isInstance(metric);
}
});
}
public static void main(String[] args) throws Exception {
//创建流运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
env.setParallelism(1);
env.addSource(new SourceFunction<Long>() {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> out) throws Exception {
while (isRunning) {
out.collect(Long.valueOf(Math.round(Math.random() * 100)));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
}).map(new RichMapFunction<Long, Long>() {
Histogram histogram;
int index;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
index = getRuntimeContext().getIndexOfThisSubtask() + 1;
histogram = getRuntimeContext().getMetricGroup()
.addGroup("flink-metrics-test")
.histogram("histogramTest", new DropwizardHistogramWrapper(dropwizardHistogram));
}
@Override
public Long map(Long s) throws Exception {
histogram.update(s);
System.out.println("index = " + " count = " + histogram.getCount() + " max= " + histogram.getStatistics().getMax() + " min = " + histogram.getStatistics().getMin() + " mean = " + histogram.getStatistics().getMean() + " 75% = " + histogram.getStatistics().getQuantile(0.75));
return s;
}
}).print();
env.execute("Flink custom Histogram Metrics");
}
public static void main(String[] args) throws Exception {
//创建流运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
env.setParallelism(1);
env.addSource(new SourceFunction<Long>() {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> out) throws Exception {
while (isRunning) {
out.collect(Long.valueOf(Math.round(Math.random() * 100)));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
}).map(new RichMapFunction<Long, Long>() {
Histogram histogram;
int index;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
index = getRuntimeContext().getIndexOfThisSubtask() + 1;
histogram = getRuntimeContext().getMetricGroup()
.addGroup("flink-metrics-test")
.histogram("histogramTest", new DropwizardHistogramWrapper(dropwizardHistogram));
}
@Override
public Long map(Long s) throws Exception {
histogram.update(s);
System.out.println("index = " + " count = " + histogram.getCount() + " max= " + histogram.getStatistics().getMax() + " min = " + histogram.getStatistics().getMin() + " mean = " + histogram.getStatistics().getMean() + " 75% = " + histogram.getStatistics().getQuantile(0.75));
return s;
}
}).print();
env.execute("Flink custom Histogram Metrics");
}
private static Histogram histogram(int update) {
Histogram okHistogram = new Histogram(new SlidingWindowReservoir(50));
okHistogram.update(update);
return okHistogram;
}
/**
* Update the broker stats. Note that a broker may continue to send brokerStats that contains
* failure info after the kafka process fails.
*
* @param brokerStats the broker stats
*/
public void recordBrokerStats(BrokerStats brokerStats) {
try {
int brokerId = brokerStats.getId();
LinkedList<BrokerStats> brokerStatsList = brokerStatsMap.computeIfAbsent(brokerId, i -> new LinkedList<>());
// multiple PastReplicaStatsProcessor/BrokerStatsProcessor may be processing BrokerStats
// for the same broker simultaneously, thus enforcing single writes here
synchronized (brokerStatsList){
if (brokerStatsList.size() == MAX_NUM_STATS) {
brokerStatsList.removeFirst();
}
brokerStatsList.addLast(brokerStats);
}
if (!brokerStats.getHasFailure()) {
// only record brokerstat when there is no failure on that broker.
KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, this, i));
broker.update(brokerStats);
}
if (brokerStats.getLeaderReplicaStats() != null) {
for (ReplicaStat replicaStat : brokerStats.getLeaderReplicaStats()) {
String topic = replicaStat.getTopic();
TopicPartition topicPartition = new TopicPartition(topic, replicaStat.getPartition());
topicPartitions.computeIfAbsent(topic, t -> new HashSet<>()).add(topicPartition);
// if the replica is involved in reassignment, ignore the stats
if (replicaStat.getInReassignment()){
reassignmentTimestamps.compute(topicPartition,
(t, v) -> v == null || v < replicaStat.getTimestamp() ? replicaStat.getTimestamp() : v);
continue;
}
long lastReassignment = reassignmentTimestamps.getOrDefault(topicPartition, 0L);
if (brokerStats.getTimestamp() - lastReassignment < REASSIGNMENT_COOLDOWN_WINDOW_IN_MS) {
continue;
}
bytesInHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
bytesOutHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
bytesInHistograms.get(topicPartition).update(replicaStat.getBytesIn15MinMeanRate());
bytesOutHistograms.get(topicPartition).update(replicaStat.getBytesOut15MinMeanRate());
}
}
} catch (Exception e) {
LOG.error("Failed to read broker stats : {}", brokerStats, e);
}
}
@SuppressWarnings({"unchecked"})
private static final Reservoir getReservoir(SolrResourceLoader loader, PluginInfo info) {
if (info == null) {
return new ExponentiallyDecayingReservoir();
}
Clock clk = getClock(info, CLOCK);
String clazz = ExponentiallyDecayingReservoir.class.getName();
int size = -1;
double alpha = -1;
long window = -1;
if (info.initArgs != null) {
if (info.initArgs.get(RESERVOIR) != null) {
String val = String.valueOf(info.initArgs.get(RESERVOIR)).trim();
if (!val.isEmpty()) {
clazz = val;
}
}
Number n = (Number)info.initArgs.get(RESERVOIR_SIZE);
if (n != null) {
size = n.intValue();
}
n = (Number)info.initArgs.get(RESERVOIR_EDR_ALPHA);
if (n != null) {
alpha = n.doubleValue();
}
n = (Number)info.initArgs.get(RESERVOIR_WINDOW);
if (n != null) {
window = n.longValue();
}
}
if (size <= 0) {
size = DEFAULT_SIZE;
}
if (alpha <= 0) {
alpha = DEFAULT_ALPHA;
}
// special case for core implementations
if (clazz.equals(EDR_CLAZZ)) {
return new ExponentiallyDecayingReservoir(size, alpha, clk);
} else if (clazz.equals(UNI_CLAZZ)) {
return new UniformReservoir(size);
} else if (clazz.equals(STW_CLAZZ)) {
if (window <= 0) {
window = DEFAULT_WINDOW; // 5 minutes, comparable to EDR
}
return new SlidingTimeWindowReservoir(window, TimeUnit.SECONDS);
} else if (clazz.equals(SW_CLAZZ)) {
return new SlidingWindowReservoir(size);
} else { // custom reservoir
Reservoir reservoir;
try {
reservoir = loader.newInstance(clazz, Reservoir.class);
if (reservoir instanceof PluginInfoInitialized) {
((PluginInfoInitialized)reservoir).init(info);
} else {
SolrPluginUtils.invokeSetters(reservoir, info.initArgs, true);
}
return reservoir;
} catch (Exception e) {
log.warn("Error initializing custom Reservoir implementation (will use default): {}", info, e);
return new ExponentiallyDecayingReservoir(size, alpha, clk);
}
}
}