The examples below show how to use the com.codahale.metrics.Timer API class; you can also follow the links to view the source code on GitHub.
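Before the examples, here is a minimal self-contained sketch of the core Timer pattern they all share (the registry and timer names here are illustrative, not taken from any example below):

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class TimerSketch {
    public static void main(String[] args) throws Exception {
        MetricRegistry registry = new MetricRegistry();
        Timer requests = registry.timer("requests");

        // Variant 1: explicit context, stopped in a finally block
        Timer.Context context = requests.time();
        try {
            Thread.sleep(100); // stand-in for real work
        } finally {
            context.stop(); // records the elapsed duration
        }

        // Variant 2: try-with-resources (Timer.Context implements Closeable)
        try (Timer.Context ignored = requests.time()) {
            Thread.sleep(100);
        }

        System.out.println("count=" + requests.getCount()); // prints count=2
    }
}

Most of the examples below follow one of these two shapes, differing only in how the Timer instance is obtained and what work is being timed.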
@Override
public void addSetToDictionary( EntityRef entityRef, String dictionaryName, Set<?> elementValues )
        throws Exception {

    if ( ( elementValues == null ) || elementValues.isEmpty() ) {
        return;
    }

    EntityRef entity = get( entityRef );

    UUID timestampUuid = UUIDUtils.newTimeUUID();
    Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );

    for ( Object elementValue : elementValues ) {
        batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, null, false, timestampUuid );
    }

    //Adding graphite metrics
    Timer.Context timeAddingSetDictionary = entAddDictionarySetTimer.time();
    CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
    timeAddingSetDictionary.stop();
}
private Consumer<PlatformMessage> consumer(MethodListener listener) {
    return (message) -> {
        long startTime = System.nanoTime();
        String type = message.getMessageType();
        try {
            listener.onEvent(message, message.getValue());
            Timer success = MESSAGE_SUCCESS.get(type);
            if (success != null) {
                success.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
            }
        } catch (Exception e) {
            log.warn("Error processing event", e);
            Timer fail = MESSAGE_FAIL.get(type);
            if (fail != null) {
                fail.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
            }
        }
    };
}
UpdateItemResult updateItem(final UpdateItemRequest request) throws BackendException {
    setUserAgent(request);
    UpdateItemResult result;
    final int bytes;
    if (request.getUpdateExpression() != null) {
        bytes = calculateExpressionBasedUpdateSize(request);
    } else {
        bytes = calculateItemUpdateSizeInBytes(request.getAttributeUpdates());
    }
    getBytesHistogram(UPDATE_ITEM, request.getTableName()).update(bytes);
    final int wcu = computeWcu(bytes);
    timedWriteThrottle(UPDATE_ITEM, request.getTableName(), wcu);

    final Timer.Context apiTimerContext = getTimerContext(UPDATE_ITEM, request.getTableName());
    try {
        result = client.updateItem(request);
    } catch (Exception e) {
        throw processDynamoDbApiException(e, UPDATE_ITEM, request.getTableName());
    } finally {
        apiTimerContext.stop();
    }
    meterConsumedCapacity(UPDATE_ITEM, result.getConsumedCapacity());

    return result;
}
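The example above meters the request size and derives write-capacity units before timing the actual DynamoDB call. The helper computeWcu is not shown; a plausible sketch, assuming DynamoDB's rule of one WCU per 1 KB written (rounded up, minimum one unit) - this is an assumption, not the project's actual code:

// Plausible sketch of the unshown computeWcu helper, assuming DynamoDB's
// one-WCU-per-1KB write sizing; illustrative only.
static int computeWcu(int bytes) {
    return Math.max(1, (bytes + 1023) / 1024); // round up to whole units
}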
@Test
@InSequence(3)
public void removeTimerFromRegistry() {
    // Get a contextual instance of the bean
    TimedMethodBean bean = instance.get();

    assertThat("Timer is not registered correctly", registry.getTimers(), hasKey(TIMER_NAME));
    Timer timer = registry.getTimers().get(TIMER_NAME);

    // Remove the timer from metrics registry
    registry.remove(TIMER_NAME);

    try {
        // Call the timed method and assert an exception is thrown
        bean.timedMethod();
    } catch (RuntimeException cause) {
        assertThat(cause, is(instanceOf(IllegalStateException.class)));
        assertThat(cause.getMessage(), is(equalTo("No timer with name [" + TIMER_NAME + "] found in registry [" + registry + "]")));
        // Make sure that the timer hasn't been called
        assertThat("Timer count is incorrect", timer.getCount(), is(equalTo(TIMER_COUNT.get())));
        return;
    }

    fail("No exception has been re-thrown!");
}
@Override
public void dispatch(final byte[] ignored, final byte[] data) throws Exception {
    RequestBody body = RequestBody.create(PROTOBUF, data);
    Request request = new Request.Builder()
            .url(url)
            .post(body)
            .build();

    try (Timer.Context timer = dispatchTimer.time(); Response response = client.newCall(request).execute()) {
        if (!response.isSuccessful()) {
            dispatchFailure.mark();
            LOGGER.error("Fail to post the record to the http collector with status code {}", response.code());
        }
    } catch (Exception e) {
        dispatchFailure.mark();
        LOGGER.error("Fail to post the record to the http collector", e);
    }
}
private void onPartitionExist(IMetaStoreClient client, Table table, HivePartition partition,
        Partition nativePartition, Partition existedPartition) throws TException {
    HivePartition existingPartition;

    if (existedPartition == null) {
        try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
            existingPartition = HiveMetaStoreUtils.getHivePartition(
                    client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues()));
        }
    } else {
        existingPartition = HiveMetaStoreUtils.getHivePartition(existedPartition);
    }

    if (needToUpdatePartition(existingPartition, partition)) {
        log.info(String.format("Partition update required. ExistingPartition %s, newPartition %s",
                stringifyPartition(existingPartition), stringifyPartition(partition)));
        Partition newPartition = getPartitionWithCreateTime(nativePartition, existingPartition);
        log.info(String.format("Altering partition %s", newPartition));

        try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) {
            client.alter_partition(table.getDbName(), table.getTableName(), newPartition);
        }
        log.info(String.format("Updated partition %s in table %s with location %s", stringifyPartition(newPartition),
                table.getTableName(), nativePartition.getSd().getLocation()));
    } else {
        log.debug(String.format("Partition %s in table %s with location %s already exists and no need to update",
                stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()));
    }
}
@Override
public Status ackMessage(String queueName, UUID queueMessageId) {

    if (logger.isTraceEnabled()) {
        logger.trace("Acking message for queue {} with id: {}", queueName, queueMessageId);
    }

    Timer.Context timer = metricsService.getMetricRegistry().timer(MetricsService.ACK_TIME_TOTAL).time();
    try {
        QueueAckRequest message = new QueueAckRequest(queueName, queueMessageId);
        return sendMessageToLocalRouters(message);
    } finally {
        timer.close();
    }
}
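Note that this example ends the timing with close() rather than stop(). In Dropwizard Metrics, Timer.Context implements Closeable and close() delegates to stop(), which is what makes the try-with-resources form used by several other examples on this page possible. A hedged rewrite of the same method body in that style:

// Same pattern with try-with-resources; Context.close() delegates to stop().
try (Timer.Context ignored = metricsService.getMetricRegistry()
        .timer(MetricsService.ACK_TIME_TOTAL).time()) {
    QueueAckRequest message = new QueueAckRequest(queueName, queueMessageId);
    return sendMessageToLocalRouters(message);
}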
@Override
public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosure closure) {
    final Timer.Context timeCtx = getTimeContext("PUT_IF_ABSENT");
    final Lock readLock = this.readWriteLock.readLock();
    readLock.lock();
    try {
        final byte[] prevVal = this.db.get(key);
        if (prevVal == null) {
            this.db.put(this.writeOptions, key, value);
        }
        setSuccess(closure, prevVal);
    } catch (final Exception e) {
        LOG.error("Fail to [PUT_IF_ABSENT], [{}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(value),
                StackTraceUtil.stackTrace(e));
        setCriticalError(closure, "Fail to [PUT_IF_ABSENT]", e);
    } finally {
        readLock.unlock();
        timeCtx.stop();
    }
}
@Override
public void put( Locator locator, String key, String value ) throws IOException {
    Timer.Context ctx = Instrumentation.getWriteTimerContext( CassandraModel.CF_METRICS_METADATA_NAME );
    Session session = DatastaxIO.getSession();
    try {
        BoundStatement bound = putValue.bind( locator.toString(), key, serDes.serialize( value ) );
        ResultSet result = session.execute( bound );
        LOG.trace( "result.size=" + result.all().size() );
    }
    finally {
        ctx.stop();
    }
}
/**
 * The {@link Snapshot} values of a {@link Timer} are reported as a {@link StatisticSet} after conversion. The
 * conversion is done using the duration factor, which is deduced from the configured duration unit.
 * <p>
 * Please note, the reported values are submitted only if they show some data (greater than zero), in order to:
 * <p>
 * 1. save some money
 * 2. prevent com.amazonaws.services.cloudwatch.model.InvalidParameterValueException if an empty {@link Snapshot}
 * is submitted
 * <p>
 * If {@link Builder#withZeroValuesSubmission()} is {@code true}, then all values will be submitted.
 *
 * @see Timer#getSnapshot
 * @see #getDurationUnit
 * @see #convertDuration(double)
 */
private void processTimer(final String metricName, final Timer timer, final List<MetricDatum> metricData) {
    final Snapshot snapshot = timer.getSnapshot();

    if (builder.withZeroValuesSubmission || snapshot.size() > 0) {
        for (final Percentile percentile : builder.percentiles) {
            final double convertedDuration = convertDuration(snapshot.getValue(percentile.getQuantile()));
            stageMetricDatum(true, metricName, convertedDuration, durationUnit, percentile.getDesc(), metricData);
        }
    }

    // prevent empty snapshot from causing InvalidParameterValueException
    if (snapshot.size() > 0) {
        final String formattedDuration = String.format(" [in-%s]", getDurationUnit());
        stageMetricDatum(builder.withArithmeticMean, metricName, convertDuration(snapshot.getMean()), durationUnit, DIMENSION_SNAPSHOT_MEAN + formattedDuration, metricData);
        stageMetricDatum(builder.withStdDev, metricName, convertDuration(snapshot.getStdDev()), durationUnit, DIMENSION_SNAPSHOT_STD_DEV + formattedDuration, metricData);
        stageMetricDatumWithConvertedSnapshot(builder.withStatisticSet, metricName, snapshot, durationUnit, metricData);
    }
}
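The javadoc above refers to a duration factor deduced from the duration unit. In Dropwizard's ScheduledReporter, which reporters like this one typically extend, convertDuration rescales the nanosecond snapshot values by that factor; a minimal sketch of the arithmetic:

import java.util.concurrent.TimeUnit;

public class DurationFactorSketch {
    public static void main(String[] args) {
        // Snapshot values are recorded in nanoseconds; the factor rescales
        // them into the reporter's configured duration unit.
        TimeUnit durationUnit = TimeUnit.MILLISECONDS;
        double durationFactor = 1.0 / durationUnit.toNanos(1);

        double valueNanos = 250_000_000.0;               // a 250 ms sample
        System.out.println(valueNanos * durationFactor); // prints 250.0
    }
}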
@Override
public Void call() throws Exception {
    int runs = 1;
    while (runs <= 100) {
        metricRegistry.counter("TheCounter").inc();
        metricRegistry.meter("TheMeter").mark(runs);
        metricRegistry.histogram("TheHistogram").update(runs);
        Timer.Context context = theTimer.time();
        Thread.sleep(250);
        context.stop();
        ++runs;
        Thread.sleep(1000);
    }
    return null;
}
public void heartbeat() {
    try (Timer.Context timer = heartbeatTimer.time()) {
        Map<PlatformPartition, Set<String>> connectedHubs = new LinkedHashMap<>(2 * partitionsPerHeartbeat);
        int offset = nextHeartbeatPartition.getAndAdd(partitionsPerHeartbeat);
        for (int i = 0; i < partitionsPerHeartbeat; i++) {
            connectedHubs.put(
                    partitioner.getPartitionById(Math.floorMod(offset + i, partitioner.getPartitionCount())),
                    new HashSet<>()
            );
        }
        for (Session session : getSessions()) {
            HubSession hs = (HubSession) session;
            accumulate(hs.getPartition(), hs.getHubId(), connectedHubs);
        }
        flush(connectedHubs);
    }
}
@Test
public void noAnnotation() {
    Timer timer = testMetricRegistry.timer("timed");
    Meter meter = testMetricRegistry.meter("metered");
    when(mockMetricRegistry.timer(anyString())).thenReturn(timer);
    when(mockMetricRegistry.meter(anyString())).thenReturn(meter);
    long oldtimervalue = timer.getCount();
    long oldmetervalue = meter.getCount();

    Invoker invoker = invokerBuilder.create(instrumentedService, new FooInvoker());
    this.setTargetMethod(exchange, "foo"); // simulate CXF behavior
    Object result = invoker.invoke(exchange, null);
    assertEquals("fooReturn", result);

    assertThat(timer.getCount(), is(oldtimervalue));
    assertThat(meter.getCount(), is(oldmetervalue));
}
@Override
public void compareAndPut(final byte[] key, final byte[] expect, final byte[] update, final KVStoreClosure closure) {
    final Timer.Context timeCtx = getTimeContext("COMPARE_PUT");
    try {
        final byte[] actual = this.defaultDB.get(key);
        if (Arrays.equals(expect, actual)) {
            this.defaultDB.put(key, update);
            setSuccess(closure, Boolean.TRUE);
        } else {
            setSuccess(closure, Boolean.FALSE);
        }
    } catch (final Exception e) {
        LOG.error("Fail to [COMPARE_PUT], [{}, {}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(expect),
                BytesUtil.toHex(update), StackTraceUtil.stackTrace(e));
        setCriticalError(closure, "Fail to [COMPARE_PUT]", e);
    } finally {
        timeCtx.stop();
    }
}
@DataProvider(value = {
        "null",
        "0",
        "1",
        "2"
}, splitBy = "\\|")
@Test
public void getNamedTimer_with_varargs_dimensions_creates_dimensioned_timer_using_sfx_mechanisms(
        Integer numDimensions
) {
    // given
    String timerName = UUID.randomUUID().toString();
    Pair<String, String>[] varargDims = generateVarargDimensions(numDimensions);
    List<Pair<String, String>> dimsAsList = (varargDims == null) ? null : Arrays.asList(varargDims);

    // when
    Timer result = sfxImpl.getNamedTimer(timerName, varargDims);

    // then
    verifyMetricCreation(timerBuilderMock, timerTaggerMock, timerName, dimsAsList, timerMock, result);
}
private Timer requestTimer(HttpMethod m) {
    if (m == null) {
        return otherRequests;
    } else {
        if (m.equals(HttpMethod.GET))
            return getRequests;
        else if (m.equals(HttpMethod.POST))
            return postRequests;
        else if (m.equals(HttpMethod.PUT))
            return putRequests;
        else if (m.equals(HttpMethod.DELETE))
            return deleteRequests;
        else
            return otherRequests;
    }
}
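A sketch of an alternative to the if/else chain above, dispatching through a map instead. The enum, field names, and registry names here are illustrative stand-ins, not the original project's types (Netty's HttpMethod, for instance, is a class rather than an enum, in which case a HashMap works the same way):

import java.util.HashMap;
import java.util.Map;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class RequestTimers {
    enum HttpMethod { GET, POST, PUT, DELETE } // illustrative stand-in

    private final Map<HttpMethod, Timer> timers = new HashMap<>();
    private final Timer otherRequests;

    RequestTimers(MetricRegistry registry) {
        otherRequests = registry.timer("other-requests");
        timers.put(HttpMethod.GET, registry.timer("get-requests"));
        timers.put(HttpMethod.POST, registry.timer("post-requests"));
        timers.put(HttpMethod.PUT, registry.timer("put-requests"));
        timers.put(HttpMethod.DELETE, registry.timer("delete-requests"));
    }

    Timer requestTimer(HttpMethod m) {
        // A null or unknown method falls back to the catch-all timer.
        return timers.getOrDefault(m, otherRequests);
    }
}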
private void updateFrequentItemOrder() {
    Timer.Context context = updateFrequentItemOrder.time();

    sortedNodes.clear();
    frequentItemOrder.clear();

    // we have to materialize a canonical order so that items with equal counts
    // are consistently ordered when they are sorted during transaction insertion
    List<Map.Entry<Integer, Double>> sortedItemCounts = Lists.newArrayList(frequentItemCounts.entrySet());
    sortedItemCounts.sort((i1, i2) -> frequentItemCounts.get(i1.getKey())
            .compareTo(frequentItemCounts.get(i2.getKey())));
    for (int i = 0; i < sortedItemCounts.size(); ++i) {
        frequentItemOrder.put(sortedItemCounts.get(i).getKey(), i);
    }

    context.stop();
}
static <T> T runWithMetrics(String prefix, String storeName, String name, UncheckedCallable<T> impl) {
    if (null == prefix) {
        return impl.call();
    }

    Preconditions.checkNotNull(name);
    Preconditions.checkNotNull(impl);

    final MetricManager mgr = MetricManager.INSTANCE;
    mgr.getCounter(prefix, storeName, name, M_CALLS).inc();
    final Timer.Context tc = mgr.getTimer(prefix, storeName, name, M_TIME).time();

    try {
        return impl.call();
    } catch (RuntimeException e) {
        mgr.getCounter(prefix, storeName, name, M_EXCEPTIONS).inc();
        throw e;
    } finally {
        tc.stop();
    }
}
@Test
public void testDecorateDisabledGauge() {
    reporter = OpenTsdbReporter.forRegistry(registry)
            .withClock(clock)
            .prefixedWith("prefix")
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .filter(MetricFilter.ALL)
            .withTags(Collections.singletonMap("foo", "bar"))
            .withBatchSize(100)
            .withCounterGaugeDecorations(false)
            .build(opentsdb);

    when(gauge.getValue()).thenReturn(1L);
    reporter.report(this.map("gauge", gauge), this.<Counter>map(), this.<Histogram>map(), this.<Meter>map(), this.<Timer>map());

    verify(opentsdb).send(captor.capture());
    final Set<OpenTsdbMetric> metrics = captor.getValue();
    assertEquals(1, metrics.size());

    OpenTsdbMetric metric = metrics.iterator().next();
    assertEquals("prefix.gauge", metric.getMetric());
    assertEquals(1L, metric.getValue());
    assertEquals((Long) timestamp, metric.getTimestamp());
}
/**
 * Looks up a list of ids asynchronously. Given the id list, returns a future for the map of values. One can also
 * register a callback on the server side upon lookup; see {@link com.yahoo.ads.pb.customization.LookupCallback}.
 *
 * @param ids ids to look up, as a list of byte[].
 * @param callback whether a callback is needed on the server side
 * @return <code>Future<Map<byte[], byte[]>></code> a future for the map holding the value of each corresponding id
 * @exception MasterNotFoundException when the call fails because no master is found
 * @exception ConnectionBrokenException when the call fails because the connection is broken in the middle
 * @exception Exception other errors indicating failure
 */
public Future<Map<byte[], byte[]>> multiLookUpAsync(List<byte[]> ids, boolean callback) throws MasterNotFoundException, ConnectionBrokenException, Exception {
    final Timer.Context context = multiLookupAsyncTimer.time();
    RetryWaiter retryWaiter = new RetryWaiter(multiLookupAsyncFailureRequests);

    try {
        while (true) {
            try {
                return clientImpl.multiLookupAsync(ids, callback);
            } catch (Exception e) {
                retryWaiter.waitBeforeRetry(e);
            }
        }
    } finally {
        context.stop();
    }
}
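A hedged caller-side sketch of the method above. The declaring class is assumed here to be the Pistachio client suggested by the javadoc's package; the ids and timeout are illustrative:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MultiLookupCaller {
    // PistachiosClient is an assumed name for whatever class declares multiLookUpAsync.
    static Map<byte[], byte[]> lookUpTwo(PistachiosClient client) throws Exception {
        List<byte[]> ids = Arrays.asList("k1".getBytes(), "k2".getBytes());
        Future<Map<byte[], byte[]>> future = client.multiLookUpAsync(ids, false);
        return future.get(5, TimeUnit.SECONDS); // block with a timeout
    }
}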
@Override
protected void registerPath(HiveSpec spec) throws IOException {
    try (Timer.Context context = this.metricContext.timer(PATH_REGISTER_TIMER).time();
            AutoReturnableObject<IMetaStoreClient> client = this.clientPool.getClient()) {
        Table table = HiveMetaStoreUtils.getTable(spec.getTable());

        createDbIfNotExists(client.get(), table.getDbName());
        createOrAlterTable(client.get(), table, spec);

        Optional<HivePartition> partition = spec.getPartition();
        if (partition.isPresent()) {
            addOrAlterPartition(client.get(), table, partition.get());
        }
        HiveMetaStoreEventHelper.submitSuccessfulPathRegistration(eventSubmitter, spec);
    } catch (TException e) {
        HiveMetaStoreEventHelper.submitFailedPathRegistration(eventSubmitter, spec, e);
        throw new IOException(e);
    }
}
@Override
public void run() {
    Timer.Context ctx = batchWriteTimer.time();
    try {
        metricsRW.insertRollups(writeContexts);
    } catch (Exception e) {
        LOG.warn("not able to insert rollups", e);
        executionContext.markUnsuccessful(e);
    } finally {
        executionContext.decrementWriteCounter(writeContexts.size());
        rollupsPerBatch.update(writeContexts.size());
        rollupsWriteRate.mark(writeContexts.size());
        RollupService.lastRollupTime.set(System.currentTimeMillis());
        ctx.stop();
    }
}
@Test
public void testCreateAndInitTimer() {
    MetricRegistryJson metricRegistryJson = new MetricRegistryJson();
    Map<String, TimerJson> timer = new HashMap<>();
    TimerJson timerJson = new TimerJson();
    timerJson.setCount(100);
    timerJson.setMean(10);
    timer.put("t" + MetricsConfigurator.TIMER_SUFFIX, timerJson);
    metricRegistryJson.setTimers(timer);

    Timer t = MetricsHelper.createAndInitTimer(metricRegistryJson, metricRegistry, "y", "x", "1");
    Assert.assertEquals(0, t.getCount());

    t = MetricsHelper.createAndInitTimer(metricRegistryJson, metricRegistry, "t", "x", "1");
    Assert.assertEquals(1, t.getCount());
    // convert milliseconds to nanoseconds
    Assert.assertEquals(100 * 1000 * 1000, t.getSnapshot().getValues()[0]);
}
@Override
public <T> T getProperty(String key, Class<T> type) {
    Timer.Context context = getPropertyTimer.time();
    try {
        return delegate.getProperty(key, type);
    } finally {
        context.stop();
    }
}
public static JsonObject convertMetric(Metric metric, TimeUnit rateUnit, TimeUnit durationUnit) {
    if (metric instanceof Timer) {
        return toJson((Timer) metric, rateUnit, durationUnit);
    } else if (metric instanceof Gauge) {
        return toJson((Gauge) metric);
    } else if (metric instanceof Counter) {
        return toJson((Counter) metric);
    } else if (metric instanceof Histogram) {
        return toJson((Histogram) metric);
    } else if (metric instanceof Meter) {
        return toJson((Meter) metric, rateUnit);
    } else {
        throw new IllegalArgumentException("Unknown metric " + metric);
    }
}
private void addAlias(String indexName) {
    Timer.Context timer = updateAliasTimer.time();
    try {
        Boolean isAck;

        final AdminClient adminClient = esProvider.getClient().admin();

        String[] indexNames = getIndexes(AliasType.Write);

        int count = 0;
        IndicesAliasesRequestBuilder aliasesRequestBuilder = adminClient.indices().prepareAliases();
        for (String currentIndex : indexNames) {
            aliasesRequestBuilder.removeAlias(currentIndex, alias.getWriteAlias());
            count++;
        }
        if (count > 0) {
            isAck = aliasesRequestBuilder.execute().actionGet().isAcknowledged();
            logger.info("Removed Index Name from Alias=[{}] ACK=[{}]", alias, isAck);
        }

        aliasesRequestBuilder = adminClient.indices().prepareAliases();

        //Added For Graphite Metrics
        // add write alias
        aliasesRequestBuilder.addAlias(indexName, alias.getWriteAlias());

        //Added For Graphite Metrics
        // add read alias
        aliasesRequestBuilder.addAlias(indexName, alias.getReadAlias());

        isAck = aliasesRequestBuilder.execute().actionGet().isAcknowledged();
        logger.info("Created new read and write aliases ACK=[{}]", isAck);
        aliasCache.invalidate(alias);
    } catch (Exception e) {
        logger.warn("Failed to create alias ", e);
    } finally {
        timer.stop();
    }
}
public Response poll(Subject subject, SubjectDatabus databus, String subscription, Duration claimTtl, int limit,
        HttpServletRequest request, boolean ignoreLongPoll, PeekOrPollResponseHelper helper) {
    Timer.Context timerContext = _pollTimer.time();
    boolean synchronousResponse = true;
    Response response;

    try {
        // Calculate when the request should internally time out (do this before our first poll() request because we
        // want that to count toward our total run-time)
        long longPollStopTime = System.currentTimeMillis() + MAX_LONG_POLL_TIME.toMillis();

        // Always issue a synchronous request at the start . . this will allow us to bypass our thread pool logic
        // altogether in cases where we might return the value immediately (see more below). There is a danger here
        // that the thread will stall if "databus" is an instance of DatabusClient and we are stuck waiting for a
        // response - however, since we use the server-side client we know that it will always execute synchronously
        // itself (no long-polling) and return in a reasonable period of time.
        PollResult result = databus.poll(subject, subscription, claimTtl, limit);

        if (ignoreLongPoll || result.getEventIterator().hasNext() || _keepAliveExecutorService == null || _pollingExecutorService == null) {
            // If ignoreLongPoll == true or we have no executor services to schedule long-polling on then always
            // return a response, even if it's empty. Alternatively, if we have data to return - return it!
            response = Response.ok()
                    .header(POLL_DATABUS_EMPTY_HEADER, String.valueOf(!result.hasMoreEvents()))
                    .entity(helper.asEntity(result.getEventIterator()))
                    .build();
        } else {
            // If the response is empty then go into async-mode and start up the runnables for our long-polling.
            response = scheduleLongPollingRunnables(request, longPollStopTime, subject, databus, claimTtl, limit, subscription,
                    result.hasMoreEvents(), helper, timerContext);
            synchronousResponse = false;
        }
    } finally {
        // Stop our timer if our request is finished here . . otherwise we are in async mode and it is the
        // responsibility of DatabusPollRunnable to close it out.
        if (synchronousResponse) {
            timerContext.stop();
        }
    }

    return response;
}
@Test
public void timedMethodsNotCalledYet() {
    assertThat("Shared metric registry is not created", SharedMetricRegistries.names(), hasItem(REGISTRY_NAME));
    MetricRegistry registry = SharedMetricRegistries.getOrCreate(REGISTRY_NAME);
    assertThat("Timers are not registered correctly", registry.getTimers().keySet(), is(equalTo(absoluteMetricNames())));

    // Make sure that all the timers haven't been called yet
    assertThat("Timer counts are incorrect", registry.getTimers().values(), everyItem(Matchers.<Timer>hasProperty("count", equalTo(0L))));
}
public <T extends ModbusResponse> CompletableFuture<T> sendRequest(ModbusRequest request, int unitId) {
    CompletableFuture<T> future = new CompletableFuture<>();

    channelFsm.getChannel().whenComplete((ch, ex) -> {
        if (ch != null) {
            short txId = (short) transactionId.incrementAndGet();

            Timeout timeout = config.getWheelTimer().newTimeout(t -> {
                if (t.isCancelled()) return;

                PendingRequest<? extends ModbusResponse> timedOut = pendingRequests.remove(txId);
                if (timedOut != null) {
                    timedOut.promise.completeExceptionally(new ModbusTimeoutException(config.getTimeout()));
                    timeoutCounter.inc();
                }
            }, config.getTimeout().getSeconds(), TimeUnit.SECONDS);

            Timer.Context context = responseTimer.time();

            pendingRequests.put(txId, new PendingRequest<>(future, timeout, context));

            ch.writeAndFlush(new ModbusTcpPayload(txId, (short) unitId, request)).addListener(f -> {
                if (!f.isSuccess()) {
                    PendingRequest<?> p = pendingRequests.remove(txId);
                    if (p != null) {
                        p.promise.completeExceptionally(f.cause());
                        p.timeout.cancel();
                    }
                }
            });

            requestCounter.inc();
        } else {
            future.completeExceptionally(ex);
        }
    });

    return future;
}
private boolean renewLead() {
    try (Timer.Context cx
            = context.metricRegistry.timer(MetricRegistry.name(SegmentRunner.class, "renewLead")).time()) {
        boolean result = context.storage instanceof IDistributedStorage
                ? ((IDistributedStorage) context.storage).renewLead(leaderElectionId)
                : true;
        if (!result) {
            context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "renewLead", "failed")).inc();
        }
        return result;
    }
}