下面列出了怎么用org.apache.hadoop.hbase.client.metrics.ScanMetrics的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Run the scan to completetion and check the metric against the specified value
* @param scan The scan instance to use to record metrics
* @param metricKey The metric key name
* @param expectedValue The expected value of metric
* @throws Exception on unexpected failure
*/
private void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
ResultScanner scanner = TABLE.getScanner(scan);
// Iterate through all the results
while (scanner.next() != null) {
continue;
}
scanner.close();
ScanMetrics metrics = scanner.getScanMetrics();
assertTrue("Metrics are null", metrics != null);
assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
final long actualMetricValue = metrics.getCounter(metricKey).get();
assertEquals(
"Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue,
expectedValue, actualMetricValue);
}
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
Method getCounter, TaskAttemptContext context, long numStale) {
// we can get access to counters only if hbase uses new mapreduce APIs
if (getCounter == null) {
return;
}
try {
for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
Counter ct = (Counter)getCounter.invoke(context,
HBASE_COUNTER_GROUP_NAME, entry.getKey());
ct.increment(entry.getValue());
}
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCAN_RESULTS_STALE")).increment(numStale);
} catch (Exception e) {
LOG.debug("can't update counter." + StringUtils.stringifyException(e));
}
}
public static ScanMetrics toScanMetrics(final byte[] bytes) {
MapReduceProtos.ScanMetrics pScanMetrics = null;
try {
pScanMetrics = MapReduceProtos.ScanMetrics.parseFrom(bytes);
} catch (InvalidProtocolBufferException e) {
// Ignored there are just no key values to add.
}
ScanMetrics scanMetrics = new ScanMetrics();
if (pScanMetrics != null) {
for (HBaseProtos.NameInt64Pair pair : pScanMetrics.getMetricsList()) {
if (pair.hasName() && pair.hasValue()) {
scanMetrics.setCounter(pair.getName(), pair.getValue());
}
}
}
return scanMetrics;
}
static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs,
boolean isRegionServerRemote) {
if (scanMetrics == null || rrs == null || rrs.length == 0) {
return;
}
long resultSize = 0;
for (Result rr : rrs) {
for (Cell cell : rr.rawCells()) {
resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
}
}
scanMetrics.countOfBytesInResults.addAndGet(resultSize);
if (isRegionServerRemote) {
scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
}
}
private void closeScanner() {
if (logger.isDebugEnabled() && scan != null) {
logger.debug("Scan " + scan.toString());
byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
if (metricsBytes != null) {
ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes);
logger.debug("HBase Metrics: " + "count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", new Object[] { scanCount, scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, scanMetrics.countOfRemoteRPCRetries });
}
}
try {
if (scanner != null) {
scanner.close();
scanner = null;
}
} catch (Throwable t) {
throw new StorageException("Error when close scanner for table " + tableName, t);
}
}
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
Path rootDir, TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics)
throws IOException {
// region is immutable, set isolation level
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
// open region from the snapshot directory
region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs,
conf, hri, htd, null);
region.setRestoredRegion(true);
// we won't initialize the MobFileCache when not running in RS process. so provided an
// initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
// initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
// cache for every region although it may hasn't any mob CF, BTW the cache is very light-weight.
region.setMobFileCache(new MobFileCache(conf));
region.initialize();
// create an internal region scanner
this.scanner = region.getScanner(scan);
values = new ArrayList<>();
if (scanMetrics == null) {
initScanMetrics(scan);
} else {
this.scanMetrics = scanMetrics;
}
region.startRegionOperation();
}
private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan)
throws IOException, InterruptedException {
BufferingScanResultConsumer consumer = new BufferingScanResultConsumer();
CONN.getTable(TABLE_NAME).scan(scan, consumer);
List<Result> results = new ArrayList<>();
for (Result result; (result = consumer.take()) != null;) {
results.add(result);
}
return Pair.newPair(results, consumer.getScanMetrics());
}
private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan)
throws IOException {
try (ResultScanner scanner =
CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) {
List<Result> results = new ArrayList<>();
for (Result result; (result = scanner.next()) != null;) {
results.add(result);
}
return Pair.newPair(results, scanner.getScanMetrics());
}
}
@Test
public void testScanMetrics() throws Exception {
Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan().setScanMetricsEnabled(true));
List<Result> results = pair.getFirst();
assertEquals(3, results.size());
long bytes = results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
.mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
ScanMetrics scanMetrics = pair.getSecond();
assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
// also assert a server side metric to ensure that we have published them into the client side
// metrics.
assertEquals(3, scanMetrics.countOfRowsScanned.get());
}
/**
* If hbase runs on new version of mapreduce, RecordReader has access to
* counters thus can update counters based on scanMetrics.
* If hbase runs on old version of mapreduce, it won't be able to get
* access to counters and TableRecorderReader can't update counter values.
*/
private void updateCounters() throws IOException {
ScanMetrics scanMetrics = scanner.getScanMetrics();
if (scanMetrics == null) {
return;
}
updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
boolean result = delegate.nextKeyValue();
if (result) {
ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
if (scanMetrics != null && context != null) {
TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
}
}
return result;
}
void updateScanMetrics(final ScanMetrics metrics) {
if (metrics == null) return;
Map<String,Long> metricsMap = metrics.getMetricsMap();
Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
if (rpcCalls != null) {
this.rpcCallsHistogram.update(rpcCalls.longValue());
}
Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
if (remoteRpcCalls != null) {
this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
}
Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
if (millisBetweenNext != null) {
this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
}
Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
if (regionsScanned != null) {
this.regionsScannedHistogram.update(regionsScanned.longValue());
}
Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
if (bytesInResults != null && bytesInResults.longValue() > 0) {
this.bytesInResultsHistogram.update(bytesInResults.longValue());
}
Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
}
}
public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics, boolean reset) {
MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder();
Map<String, Long> metrics = scanMetrics.getMetricsMap(reset);
for (Entry<String, Long> e : metrics.entrySet()) {
HBaseProtos.NameInt64Pair nameInt64Pair =
HBaseProtos.NameInt64Pair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build();
builder.addMetrics(nameInt64Pair);
}
return builder.build();
}
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
}
if (scan.getStopRow() == null) {
scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
}
this.scan = scan;
this.consumer = consumer;
this.tableName = tableName;
this.conn = conn;
this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
this.resultCache = createScanResultCache(scan);
if (scan.isScanMetricsEnabled()) {
this.scanMetrics = new ScanMetrics();
consumer.onScanMetricsCreated(scanMetrics);
} else {
this.scanMetrics = null;
}
}
public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
this.scanMetrics = scanMetrics;
this.scannerId = scannerId;
this.resultCache = resultCache;
this.consumer = consumer;
this.stub = stub;
this.loc = loc;
this.regionServerRemote = isRegionServerRemote;
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
if (scan.isReversed()) {
completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
} else {
completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
}
this.future = new CompletableFuture<>();
this.priority = priority;
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
}
static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
if (scanMetrics == null) {
return;
}
scanMetrics.countOfRPCcalls.incrementAndGet();
if (isRegionServerRemote) {
scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
}
}
static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
if (scanMetrics == null) {
return;
}
scanMetrics.countOfRPCRetries.incrementAndGet();
if (isRegionServerRemote) {
scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
}
}
/**
* Use the scan metrics returned by the server to add to the identically named counters in the
* client side metrics. If a counter does not exist with the same name as the server side metric,
* the attempt to increase the counter will fail.
*/
static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) {
if (scanMetrics == null || response == null || !response.hasScanMetrics()) {
return;
}
ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter);
}
/**
* Check and initialize if application wants to collect scan metrics
*/
protected void initScanMetrics(Scan scan) {
// check if application wants to collect scan metrics
if (scan.isScanMetricsEnabled()) {
scanMetrics = new ScanMetrics();
}
}
private ResultScanner generateResultScanner(KeyValue... kvs) {
TreeSet<KeyValue> set = new TreeSet<>(KeyValue.COMPARATOR);
set.addAll(Arrays.asList(kvs));
KeyValue[] sortedKvs = new KeyValue[set.size()];
set.toArray(sortedKvs);
final Result result = Result.create(kvs);
return new ResultScanner() {
@Override
public Result next() throws IOException {
return result;
}
@Override
public Result[] next(int nbRows) throws IOException {
return new Result[] {result};
}
@Override
public void close() {
}
public boolean renewLease() {
return false;
}
public ScanMetrics getScanMetrics() {
return null;
}
@Override
public Iterator<Result> iterator() {
return Arrays.asList(result).iterator();
}
};
}
public ScanMetrics getScanMetrics() {
return null;
}
@Override
public ScanMetrics getScanMetrics() {
throw new RuntimeException("getScanMetrics() not supported");
}
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
@Override
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
throws Exception {
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
}
@Test
public void testNoScanMetrics() throws Exception {
Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
assertEquals(3, pair.getFirst().size());
assertNull(pair.getSecond());
}
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}
public ScanMetrics getScanMetrics() {
return scanMetrics;
}