下面列出了com.codahale.metrics.MetricRegistry#timer ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Constructor
*
* @param client Riak client
* @param timeout Riak server-side timeout
* @param requestTimeout Riak client-side timeout
*/
public CursorStore(
final RiakClient client, final Duration timeout, final Duration requestTimeout) {
final MetricRegistry registry = SharedMetricRegistries.getOrCreate("default");
this.fetchTimer = registry.timer(MetricRegistry.name(CursorStore.class, "fetch"));
this.storeTimer = registry.timer(MetricRegistry.name(CursorStore.class, "store"));
this.deleteTimer = registry.timer(MetricRegistry.name(CursorStore.class, "delete"));
this.client = Objects.requireNonNull(client, "client == null");
this.timeout =
Optional.ofNullable(timeout)
.map(t -> Math.toIntExact(t.toMilliseconds()))
.orElse(DEFAULT_TIMEOUT_MS);
this.requestTimeout = Objects.requireNonNull(requestTimeout, "requestTimeout == null");
}
/**
* Creates a new detector manager from the given parameters.
*
* @param detectorSource DetectorSource
* @param dataInitializer DataInitializer collaborator
* @param config Config
* @param cachedDetectors Map containing cached detectors
* @param metricRegistry MetricRegistry collaborator
*/
public DetectorManager(DetectorSource detectorSource,
DataInitializer dataInitializer,
Config config,
Map<UUID, DetectorContainer> cachedDetectors,
MetricRegistry metricRegistry) {
// TODO: Seems odd to include this constructor, whose purpose seems to be to support unit testing.
// At least I don't think it should be public.
// This is conceptually just the base constructor with the cache exposed.[WLW]
this.cachedDetectors = cachedDetectors;
this.dataInitializer = dataInitializer;
this.detectorSource = detectorSource;
this.detectorRefreshTimePeriod = config.getInt(CK_DETECTOR_REFRESH_PERIOD);
this.detectorsLastUsedTimeToBeUpdatedQueue = new ConcurrentLinkedQueue<>();
this.metricRegistry = metricRegistry;
detectorForTimer = metricRegistry.timer("detector.detectorFor");
noDetectorFoundMeter = metricRegistry.meter("detector.nullDetector");
detectTimer = (name) -> metricRegistry.timer("detector." + name + ".detect");
this.initScheduler();
}
@Inject
public CassandraSearcher(CassandraSession session, @Named("newtsMetricRegistry") MetricRegistry registry, ContextConfigurations contextConfigurations) {
m_session = checkNotNull(session, "session argument");
m_searchTimer = registry.timer(name("search", "search"));
m_contextConfigurations = checkNotNull(contextConfigurations, "contextConfigurations argument");
Select select = QueryBuilder.select(Schema.C_TERMS_RESOURCE).from(Schema.T_TERMS);
select.where(eq(Schema.C_TERMS_CONTEXT, bindMarker(Schema.C_TERMS_CONTEXT)))
.and( eq(Schema.C_TERMS_FIELD, bindMarker(Schema.C_TERMS_FIELD)))
.and( eq(Schema.C_TERMS_VALUE, bindMarker(Schema.C_TERMS_VALUE)));
m_searchStatement = m_session.prepare(select.toString());
select = QueryBuilder.select(Schema.C_ATTRS_ATTR, Schema.C_ATTRS_VALUE).from(Schema.T_ATTRS);
select.where(eq(Schema.C_ATTRS_CONTEXT, bindMarker(Schema.C_ATTRS_CONTEXT)))
.and( eq(Schema.C_ATTRS_RESOURCE, bindMarker(Schema.C_ATTRS_RESOURCE)));
m_selectAttributesStatement = m_session.prepare(select.toString());
select = QueryBuilder.select(Schema.C_METRICS_NAME).from(Schema.T_METRICS);
select.where(eq(Schema.C_METRICS_CONTEXT, bindMarker(Schema.C_METRICS_CONTEXT)))
.and( eq(Schema.C_METRICS_RESOURCE, bindMarker(Schema.C_METRICS_RESOURCE)));
m_selectMetricNamesStatement = m_session.prepare(select.toString());
}
public VcrMetrics(MetricRegistry registry) {
this.registry = registry;
blobEncryptionCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobEncryptionCount"));
blobDecryptionCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobDecryptionCount"));
blobEncryptionErrorCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobEncryptionErrorCount"));
blobDecryptionErrorCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobDecryptionErrorCount"));
blobEncryptionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobEncryptionTime"));
blobDecryptionTime = registry.timer(MetricRegistry.name(CloudBlobStore.class, "BlobDecryptionTime"));
blobUploadSkippedCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobUploadSkippedCount"));
updateTtlNotSetError = registry.counter(MetricRegistry.name(CloudBlobStore.class, "UpdateTtlNotSetError"));
blobCacheLookupCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobCacheLookupCount"));
blobCacheHitCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "BlobCacheHitCount"));
retryCount = registry.counter(MetricRegistry.name(CloudBlobStore.class, "RetryCount"));
retryWaitTimeMsec = registry.counter(MetricRegistry.name(CloudBlobStore.class, "RetryWaitTimeMsec"));
blobCompactionRate = registry.meter(MetricRegistry.name(CloudStorageCompactor.class, "BlobCompactionRate"));
compactionFailureCount = registry.counter(MetricRegistry.name(CloudStorageCompactor.class, "CompactionFailureCount"));
compactionShutdownTimeoutCount =
registry.counter(MetricRegistry.name(CloudStorageCompactor.class, "CompactionShutdownTimeoutCount"));
addPartitionErrorCount =
registry.counter(MetricRegistry.name(VcrReplicationManager.class, "AddPartitionErrorCount"));
removePartitionErrorCount =
registry.counter(MetricRegistry.name(VcrReplicationManager.class, "RemovePartitionErrorCount"));
tokenReloadWarnCount = registry.counter(MetricRegistry.name(VcrReplicationManager.class, "TokenReloadWarnCount"));
}
public static void main(String[] args) throws Exception {
MetricRegistry metrics = new MetricRegistry();
ConsoleReporter reporter = PerformanceHelper.consoleReporter(metrics);
Timer timer = metrics.timer("writes");
TestFileHelper.createTestDirectory();
KeyValueGenerator keyValueGenerator = new KeyValueGenerator();
Value value = new Value(keyValueGenerator.testValue(100));
DBState state = ConfigGenerator.perfState();
TableWriter tableWriter = new TableWriter(state.config(), state.paths(), state.tables(), state.snapshots(),
state.caches(), new Metrics(state.config()));
for (int i = 0; i < RECORD_COUNT; i++) {
value.data().rewind();
Timer.Context watch = timer.time();
tableWriter.write(ByteBuffers.fromString(i + ""), value.data(), false);
watch.stop();
}
reporter.report();
tableWriter.close();
TestFileHelper.cleanUpTestFiles();
}
/**
* Constructor
*
* @param client Riak client
* @param idGenerator ID Generator
* @param cursors Cursor data store
* @param ruleStore Rule data store
* @param timeout Riak server-side timeout
* @param requestTimeout Riak client-side timeout
*/
public NotificationStore(
final RiakClient client,
final IdGenerator idGenerator,
final CursorStore cursors,
final RuleStore ruleStore,
final Duration timeout,
final Duration requestTimeout) {
final MetricRegistry registry = SharedMetricRegistries.getOrCreate("default");
this.fetchTimer = registry.timer(MetricRegistry.name(NotificationStore.class, "fetch"));
this.updateTimer = registry.timer(MetricRegistry.name(NotificationStore.class, "store"));
this.deleteTimer = registry.timer(MetricRegistry.name(NotificationStore.class, "delete"));
this.client = Objects.requireNonNull(client, "client == null");
this.idGenerator = Objects.requireNonNull(idGenerator, "idGenerator == null");
this.cursors = Objects.requireNonNull(cursors, "cursors == null");
this.ruleStore = Objects.requireNonNull(ruleStore, "ruleStore == null");
this.timeout =
Optional.ofNullable(timeout)
.map(t -> Math.toIntExact(t.toMilliseconds()))
.orElse(DEFAULT_TIMEOUT_MS);
this.requestTimeout = Objects.requireNonNull(requestTimeout, "requestTimeout == null");
}
/**
* Creates a new instance of the filter.
*
* @param context the bundle context
* @param configuration the application configuration
* @param registry the metric registry
*/
public HttpMetricFilter(BundleContext context, ApplicationConfiguration configuration,
MetricRegistry registry) {
this.context = context;
Map<Integer, String> meterNamesByStatusCode = createMeterNamesByStatusCode();
this.interceptionPattern = Pattern.compile(configuration.getWithDefault("monitor.http.interception", ".*"));
this.interceptionPriority = configuration.getIntegerWithDefault("monitor.http.priority", 10000);
this.metersByStatusCode = new ConcurrentHashMap<>(meterNamesByStatusCode
.size());
for (Map.Entry<Integer, String> entry : meterNamesByStatusCode.entrySet()) {
metersByStatusCode.put(entry.getKey(),
registry.meter("http.responseCodes." + entry.getValue()));
}
this.otherMeter = registry.meter("http.responseCodes.others");
this.activeRequests = registry.counter("http.activeRequests");
this.requestTimer = registry.timer("http.requests");
}
public LockManager(GossipManager gossipManager, final LockManagerSettings lockManagerSettings,
MetricRegistry metrics) {
this.gossipManager = gossipManager;
this.lockSettings = lockManagerSettings;
this.numberOfNodes = new AtomicInteger(lockSettings.getNumberOfNodes());
this.lockKeys = new CopyOnWriteArraySet<>();
metrics.register(LOCK_KEY_SET_SIZE, (Gauge<Integer>) lockKeys::size);
lockTimeMetric = metrics.timer(LOCK_TIME);
// Register listener for lock keys
gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
if (key.contains("lock/")) {
lockKeys.add(key);
}
});
voteService = Executors.newScheduledThreadPool(2);
voteService.scheduleAtFixedRate(this::updateVotes, 0, lockSettings.getVoteUpdateInterval(),
TimeUnit.MILLISECONDS);
}
@Test
public void testDropwizardTime() {
MetricRegistry reg = new MetricRegistry();
DropWizardMetricsRegistry dwreg = new DropWizardMetricsRegistry(Clock.SYSTEM, reg);
Timer timer = dwreg.timer(dwreg.createId("a.b.c"));
timer.record(123, TimeUnit.SECONDS);
com.codahale.metrics.Timer timer2 = reg.timer("a.b.c");
System.out.println(timer2.getCount());
Assert.assertEquals(1, timer2.getCount());
}
/**
* Create a metric fetcher manager.
*
* @param config The load monitor configurations.
* @param partitionMetricSampleAggregator The {@link KafkaPartitionMetricSampleAggregator} to aggregate partition metrics.
* @param brokerMetricSampleAggregator The {@link KafkaBrokerMetricSampleAggregator} to aggregate the broker metrics.
* @param metadataClient The metadata of the cluster.
* @param metricDef the metric definitions.
* @param time The time object.
* @param dropwizardMetricRegistry The Metric Registry object.
* @param brokerCapacityConfigResolver The resolver for retrieving broker capacities.
* @param sampler Metric fetcher or {@code null} to create one using {@link MonitorConfig#METRIC_SAMPLER_CLASS_CONFIG}.
*/
public MetricFetcherManager(KafkaCruiseControlConfig config,
KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator,
KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator,
MetadataClient metadataClient,
MetricDef metricDef,
Time time,
MetricRegistry dropwizardMetricRegistry,
BrokerCapacityConfigResolver brokerCapacityConfigResolver,
MetricSampler sampler) {
_time = time;
_partitionMetricSampleAggregator = partitionMetricSampleAggregator;
_brokerMetricSampleAggregator = brokerMetricSampleAggregator;
_metadataClient = metadataClient;
_metricDef = metricDef;
_samplingExecutor = Executors.newFixedThreadPool(SUPPORTED_NUM_METRIC_FETCHER,
new KafkaCruiseControlThreadFactory("MetricFetcher", true, LOG));
_partitionAssignor = config.getConfiguredInstance(MonitorConfig.METRIC_SAMPLER_PARTITION_ASSIGNOR_CLASS_CONFIG,
MetricSamplerPartitionAssignor.class);
_partitionAssignor.configure(config.mergedConfigValues());
_useLinearRegressionModel = config.getBoolean(MonitorConfig.USE_LINEAR_REGRESSION_MODEL_CONFIG);
_samplingFetcherTimer = dropwizardMetricRegistry.timer(MetricRegistry.name("MetricFetcherManager",
"partition-samples-fetcher-timer"));
_samplingFetcherFailureRate = dropwizardMetricRegistry.meter(MetricRegistry.name("MetricFetcherManager",
"partition-samples-fetcher-failure-rate"));
_trainingSamplesFetcherTimer = dropwizardMetricRegistry.timer(MetricRegistry.name("MetricFetcherManager",
"training-samples-fetcher-timer"));
_trainingSamplesFetcherFailureRate = dropwizardMetricRegistry.meter(MetricRegistry.name("MetricFetcherManager",
"training-samples-fetcher-failure-rate"));
_metricSampler = sampler == null
? config.getConfiguredInstance(MonitorConfig.METRIC_SAMPLER_CLASS_CONFIG, MetricSampler.class,
Collections.singletonMap(BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG,
brokerCapacityConfigResolver))
: sampler;
}
@Before
public void init() {
MetricRegistry metricRegistry = new MetricRegistry();
this.logReporter = ConsoleReporter
.forRegistry(metricRegistry)
.build();
logReporter.start(1, TimeUnit.MINUTES);
timer = metricRegistry.timer("connection");
}
@Inject
public AstyanaxStorageProvider(@BlobReadConsistency ConsistencyLevel readConsistency, MetricRegistry metricRegistry) {
_readConsistency = Objects.requireNonNull(readConsistency, "readConsistency");
_tokenFactory = new ByteOrderedPartitioner().getTokenFactory();
_blobReadMeter = metricRegistry.meter(getMetricName("blob-read"));
_blobWriteMeter = metricRegistry.meter(getMetricName("blob-write"));
_blobDeleteMeter = metricRegistry.meter(getMetricName("blob-delete"));
_blobCopyMeter = metricRegistry.meter(getMetricName("blob-copy"));
_blobMetadataReadMeter = metricRegistry.meter(getMetricName("blob-metadata-read"));
_blobMetadataWriteMeter = metricRegistry.meter(getMetricName("blob-metadata-write"));
_blobMetadataDeleteMeter = metricRegistry.meter(getMetricName("blob-metadata-delete"));
_blobMetadataCopyMeter = metricRegistry.meter(getMetricName("blob-metadata-copy"));
_scanBatchTimer = metricRegistry.timer(getMetricName("scan-batch"));
_scanReadMeter = metricRegistry.meter(getMetricName("scan-reads"));
}
public MetricsStatsCounter(MetricRegistry registry) {
hitCount = registry.meter("camelcache.hits");
missCount = registry.meter("camelcache.misses");
totalLoadTime = registry.timer("camelcache.loads");
loadSuccessCount = registry.meter("camelcache.loads-success");
loadFailureCount = registry.meter("camelcache.loads-failure");
evictionCount = registry.meter("camelcache.evictions");
evictionWeight = registry.meter("camelcache.evictions-weight");
}
public SiddhiLatencyMetric(String name, final MetricRegistry metricRegistry) {
this.metricName = name;
execLatencyTimer = new ThreadLocal<Timer>() {
protected Timer initialValue() {
return metricRegistry.timer(metricName);
}
};
context = new ThreadLocal<Timer.Context>() {
protected Timer.Context initialValue() {
return null;
}
};
}
@Inject
public CqlBlockedDataReaderDAO(@CqlReaderDAODelegate DataReaderDAO delegate, PlacementCache placementCache,
CqlDriverConfiguration driverConfig, ChangeEncoder changeEncoder,
MetricRegistry metricRegistry, DAOUtils daoUtils, @PrefixLength int deltaPrefixLength) {
_astyanaxReaderDAO = checkNotNull(delegate, "delegate");
_placementCache = placementCache;
_driverConfig = driverConfig;
_changeEncoder = changeEncoder;
_randomReadMeter = metricRegistry.meter(getMetricName("random-reads"));
_readBatchTimer = metricRegistry.timer(getMetricName("readBatch"));
_deltaPrefixLength = deltaPrefixLength;
_daoUtils = daoUtils;
}
public StoreToolsMetrics(MetricRegistry registry) {
dumpIndexTimeMs = registry.timer(MetricRegistry.name(DumpIndexTool.class, "DumpIndexTimeMs"));
dumpReplicaIndexesTimeMs = registry.timer(MetricRegistry.name(DumpIndexTool.class, "DumpReplicaIndexesTimeMs"));
dumpLogTimeMs = registry.timer(MetricRegistry.name(DumpLogTool.class, "DumpLogTimeMs"));
findAllEntriesPerIndexTimeMs =
registry.timer(MetricRegistry.name(DumpIndexTool.class, "FindAllEntriesPerIndexTimeMs"));
readSingleBlobRecordFromLogTimeMs =
registry.timer(MetricRegistry.name(DumpDataTool.class, "ReadSingleBlobRecordFromLogTimeMs"));
readFromLogAndVerifyTimeMs = registry.timer(MetricRegistry.name(DumpDataTool.class, "ReadFromLogAndVerifyTimeMs"));
compareIndexFileToLogTimeMs =
registry.timer(MetricRegistry.name(DumpDataTool.class, "CompareIndexFileToLogTimeMs"));
compareReplicaIndexFilesToLogTimeMs =
registry.timer(MetricRegistry.name(DumpDataTool.class, "CompareReplicaIndexFilesToLogTimeMs"));
logDeserializationError = registry.counter(MetricRegistry.name(DumpLogTool.class, "LogDeserializationErrorCount"));
endOfFileOnDumpLogError = registry.counter(MetricRegistry.name(DumpLogTool.class, "EndOfFileOnDumpLogErrorCount"));
unknownErrorOnDumpIndex =
registry.counter(MetricRegistry.name(DumpIndexTool.class, "UnknownErrorOnDumpIndexCount"));
unknownErrorOnDumpLog = registry.counter(MetricRegistry.name(DumpLogTool.class, "UnknownErrorOnDumpLogCount"));
indexToLogDeleteFlagMisMatchError =
registry.counter(MetricRegistry.name(DumpDataTool.class, "IndexToLogDeleteFlagMisMatchErrorCount"));
indexToLogExpiryMisMatchError =
registry.counter(MetricRegistry.name(DumpDataTool.class, "IndexToLogExpiryMisMatchErrorCount"));
indexToLogBlobIdMisMatchError =
registry.counter(MetricRegistry.name(DumpDataTool.class, "IndexToLogBlobIdMisMatchErrorCount"));
indexToLogBlobRecordComparisonFailure =
registry.counter(MetricRegistry.name(DumpDataTool.class, "IndexToLogBlobRecordComparisonFailureCount"));
logRangeNotFoundInIndexError =
registry.counter(MetricRegistry.name(DumpDataTool.class, "LogRangeNotFoundInIndexErrorCount"));
indexLogEndOffsetMisMatchError =
registry.counter(MetricRegistry.name(DumpDataTool.class, "IndexLogEndOffsetMisMatchErrorCount"));
}
@RequestMapping("/thread")
public String test(@RequestParam("threadCount") int threadCount,
@RequestParam("second") int second,
@RequestParam("data") String data) throws Exception {
if (isRunning.compareAndSet(false, true)) {
try {
MetricRegistry registry = new MetricRegistry();
Timer timer = registry.timer("test.timer");
ExecutorService pool = new ThreadPoolExecutor(threadCount + 10,
threadCount + 10,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
//预热
for (int i = 0; i < 10; i++) {
rpcCall(data);
}
//开启线程一起跑
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int t = 0; t < threadCount; t++) {
pool.submit(new Runnable() {
@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException ignored) {
}
while (isRunning.get()) {
Timer.Context tc = timer.time();
rpcCall(data);
tc.stop();
}
}
});
//保证10个线程一起启动
countDownLatch.countDown();
}
TimeUnit.SECONDS.sleep(second);
pool.shutdownNow();
return getMetricReport(registry);
} finally {
isRunning.set(false);
}
}
return "is running now.";
}
/**
* Package private constructor for unit tests.
*/
LoadMonitor(KafkaCruiseControlConfig config,
MetadataClient metadataClient,
AdminClient adminClient,
Time time,
Executor executor,
MetricRegistry dropwizardMetricRegistry,
MetricDef metricDef) {
_config = config;
_metadataClient = metadataClient;
_adminClient = adminClient;
_time = time;
_brokerCapacityConfigResolver = config.getConfiguredInstance(MonitorConfig.BROKER_CAPACITY_CONFIG_RESOLVER_CLASS_CONFIG,
BrokerCapacityConfigResolver.class);
long monitorStateUpdateIntervalMs = config.getLong(MonitorConfig.MONITOR_STATE_UPDATE_INTERVAL_MS_CONFIG);
_monitorStateUpdateTimeoutMs = 10 * monitorStateUpdateIntervalMs;
_topicConfigProvider = config.getConfiguredInstance(MonitorConfig.TOPIC_CONFIG_PROVIDER_CLASS_CONFIG,
TopicConfigProvider.class);
_partitionMetricSampleAggregator = new KafkaPartitionMetricSampleAggregator(config, metadataClient.metadata());
_brokerMetricSampleAggregator = new KafkaBrokerMetricSampleAggregator(config);
_acquiredClusterModelSemaphore = ThreadLocal.withInitial(() -> false);
// We use the number of proposal precomputing threads config to ensure there is enough concurrency if users
// wants that.
int numPrecomputingThread = config.getInt(AnalyzerConfig.NUM_PROPOSAL_PRECOMPUTE_THREADS_CONFIG);
_clusterModelSemaphore = new Semaphore(Math.max(1, numPrecomputingThread), true);
_defaultModelCompletenessRequirements =
MonitorUtils.combineLoadRequirementOptions(AnalyzerUtils.getGoalsByPriority(config));
_loadMonitorTaskRunner =
new LoadMonitorTaskRunner(config, _partitionMetricSampleAggregator, _brokerMetricSampleAggregator, _metadataClient,
metricDef, time, dropwizardMetricRegistry, _brokerCapacityConfigResolver, executor);
_clusterModelCreationTimer = dropwizardMetricRegistry.timer(MetricRegistry.name("LoadMonitor",
"cluster-model-creation-timer"));
_loadMonitorExecutor = Executors.newScheduledThreadPool(2,
new KafkaCruiseControlThreadFactory("LoadMonitorExecutor", true, LOG));
_loadMonitorExecutor.scheduleAtFixedRate(new SensorUpdater(), 0, monitorStateUpdateIntervalMs, TimeUnit.MILLISECONDS);
_loadMonitorExecutor.scheduleAtFixedRate(new PartitionMetricSampleAggregatorCleaner(), 0,
PartitionMetricSampleAggregatorCleaner.CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
dropwizardMetricRegistry.register(MetricRegistry.name("LoadMonitor", "valid-windows"),
(Gauge<Integer>) this::numValidSnapshotWindows);
dropwizardMetricRegistry.register(MetricRegistry.name("LoadMonitor", "monitored-partitions-percentage"),
(Gauge<Double>) this::monitoredPartitionsPercentage);
dropwizardMetricRegistry.register(MetricRegistry.name("LoadMonitor", "total-monitored-windows"),
(Gauge<Integer>) this::totalMonitoredSnapshotWindows);
dropwizardMetricRegistry.register(MetricRegistry.name("LoadMonitor", "num-partitions-with-extrapolations"),
(Gauge<Integer>) this::numPartitionsWithExtrapolations);
}
@Inject
public SingularityS3UploaderMetrics(
MetricRegistry registry,
@Named(METRICS_OBJECT_MAPPER) ObjectMapper mapper,
SingularityS3Configuration baseConfiguration
) {
super(registry, baseConfiguration, mapper);
this.registry = registry;
this.uploaderCounter = registry.counter(name("uploaders", "total"));
this.immediateUploaderCounter = registry.counter(name("uploaders", "immediate"));
this.uploadCounter = registry.counter(name("uploads", "success"));
this.errorCounter = registry.counter(name("uploads", "errors"));
this.uploadTimer = registry.timer(name("uploads", "timer"));
this.expiring = Optional.empty();
this.timeOfLastSuccessUpload = -1;
registry.register(
name("uploads", "millissincelast"),
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (timeOfLastSuccessUpload == -1) {
return -1;
}
return Integer.valueOf(
(int) (System.currentTimeMillis() - timeOfLastSuccessUpload)
);
}
}
);
registry.register(
name("uploads", "lastdurationmillis"),
new Gauge<Integer>() {
@Override
public Integer getValue() {
return lastUploadDuration;
}
}
);
registry.register(
name("uploaders", "expiring"),
new Gauge<Integer>() {
@Override
public Integer getValue() {
if (!expiring.isPresent()) {
return 0;
}
return expiring.get().size();
}
}
);
this.filesystemEventsMeter = registry.meter(name("filesystem", "events"));
startJmxReporter();
}
/**
* successfully publish metrics. no stat filtering
*/
@Test
public void testPublishMetrics() throws InterruptedException {
MetricRegistry metricRegistry = new MetricRegistry();
metricRegistry.counter("UnitTestCounter1").inc();
metricRegistry.counter("UnitTestCounter2").inc();
metricRegistry.counter("UnitTestCounter2").inc();
metricRegistry.counter("UnitTestCounter3").inc();
metricRegistry.meter("UnitTestMeter");
metricRegistry.histogram("UnitTestHistogram");
metricRegistry.timer("UnitTestTimer");
metricRegistry.register("UnitTestGauge", new Gauge<Object>() {
@Override
public Object getValue() {
return 1;
}
});
//this gauge should not be reported to AWS because its value is not numeric
metricRegistry.register("InvalidUnitTestGauge", new Gauge<Object>() {
@Override
public Object getValue() {
return "foo";
}
});
final AmazonCloudWatch amazonCloudWatch = Mockito.mock(AmazonCloudWatch.class);
reporter =
new MetricsCloudWatchReporter(
APP_NAME,
APP_VERSION,
APP_ENVIRONMENT,
"utc1=UnitTestCounter1,utc2=UnitTestCounter2,utg=UnitTestGauge,utm=UnitTestMeter,uth=UnitTestHistogram,utt=UnitTestTimer",
2,
TimeUnit.SECONDS,
metricRegistry,
createCloudWatchFactory(amazonCloudWatch));
reporter.start();
//give the reporter a chance to publish
Thread.sleep(3000);
PutMetricDataRequestMatcher matcher = new PutMetricDataRequestMatcher(
new MetricDatumValidator("utg", APP_ENVIRONMENT, 1d),
new MetricDatumValidator("utc1", APP_ENVIRONMENT, 1d),
new MetricDatumValidator("utc2", APP_ENVIRONMENT, 2d),
new MetricDatumValidator("uth.count", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("uth.min", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("uth.max", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("uth.mean", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("uth.stddev", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("uth.75p", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("uth.95p", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("uth.98p", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("uth.99p", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("uth.999p", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("utm.1m", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("utm.5m", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("utm.15m", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("utm.mean", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("utt.count", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("utt.1m", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("utt.5m", APP_ENVIRONMENT, 0d)
);
PutMetricDataRequestMatcher matcher2 =
new PutMetricDataRequestMatcher(new MetricDatumValidator("utt.15m", APP_ENVIRONMENT, 0d),
new MetricDatumValidator("utt.mean", APP_ENVIRONMENT, 0d)
);
//first request to AWS with 20 events
Mockito.verify(amazonCloudWatch, Mockito.times(1)).putMetricData(Mockito.argThat(matcher));
//seconds request to AWS with 2 events
Mockito.verify(amazonCloudWatch, Mockito.times(1)).putMetricData(Mockito.argThat(matcher2));
}