类org.apache.hadoop.hbase.client.metrics.ScanMetrics源码实例Demo

下面列出了怎么用org.apache.hadoop.hbase.client.metrics.ScanMetrics的API类实例代码及写法,或者点击链接到github查看源代码。

/**
 * Run the scan to completetion and check the metric against the specified value
 * @param scan The scan instance to use to record metrics
 * @param metricKey The metric key name
 * @param expectedValue The expected value of metric
 * @throws Exception on unexpected failure
 */
private void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
  assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
  ResultScanner scanner = TABLE.getScanner(scan);
  // Iterate through all the results
  while (scanner.next() != null) {
    continue;
  }
  scanner.close();
  ScanMetrics metrics = scanner.getScanMetrics();
  assertTrue("Metrics are null", metrics != null);
  assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
  final long actualMetricValue = metrics.getCounter(metricKey).get();
  assertEquals(
    "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue,
    expectedValue, actualMetricValue);
}
 
源代码2 项目: hbase   文件: TableRecordReaderImpl.java
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    Method getCounter, TaskAttemptContext context, long numStale) {
  // we can get access to counters only if hbase uses new mapreduce APIs
  if (getCounter == null) {
    return;
  }

  try {
    for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
      Counter ct = (Counter)getCounter.invoke(context,
          HBASE_COUNTER_GROUP_NAME, entry.getKey());

      ct.increment(entry.getValue());
    }
    ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
    ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCAN_RESULTS_STALE")).increment(numStale);
  } catch (Exception e) {
    LOG.debug("can't update counter." + StringUtils.stringifyException(e));
  }
}
 
源代码3 项目: hbase   文件: ProtobufUtil.java
public static ScanMetrics toScanMetrics(final byte[] bytes) {
  MapReduceProtos.ScanMetrics pScanMetrics = null;
  try {
    pScanMetrics = MapReduceProtos.ScanMetrics.parseFrom(bytes);
  } catch (InvalidProtocolBufferException e) {
    // Ignored there are just no key values to add.
  }
  ScanMetrics scanMetrics = new ScanMetrics();
  if (pScanMetrics != null) {
    for (HBaseProtos.NameInt64Pair pair : pScanMetrics.getMetricsList()) {
      if (pair.hasName() && pair.hasValue()) {
        scanMetrics.setCounter(pair.getName(), pair.getValue());
      }
    }
  }
  return scanMetrics;
}
 
源代码4 项目: hbase   文件: ConnectionUtils.java
static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs,
    boolean isRegionServerRemote) {
  if (scanMetrics == null || rrs == null || rrs.length == 0) {
    return;
  }
  long resultSize = 0;
  for (Result rr : rrs) {
    for (Cell cell : rr.rawCells()) {
      resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
    }
  }
  scanMetrics.countOfBytesInResults.addAndGet(resultSize);
  if (isRegionServerRemote) {
    scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
  }
}
 
源代码5 项目: Kylin   文件: CubeSegmentTupleIterator.java
private void closeScanner() {
    if (logger.isDebugEnabled() && scan != null) {
        logger.debug("Scan " + scan.toString());
        byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
        if (metricsBytes != null) {
            ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes);
            logger.debug("HBase Metrics: " + "count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", new Object[] { scanCount, scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, scanMetrics.countOfRemoteRPCRetries });
        }
    }
    try {
        if (scanner != null) {
            scanner.close();
            scanner = null;
        }
    } catch (Throwable t) {
        throw new StorageException("Error when close scanner for table " + tableName, t);
    }
}
 
源代码6 项目: hbase   文件: ClientSideRegionScanner.java
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
    Path rootDir, TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics)
    throws IOException {
  // region is immutable, set isolation level
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

  htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();

  // open region from the snapshot directory
  region = HRegion.newHRegion(CommonFSUtils.getTableDir(rootDir, htd.getTableName()), null, fs,
    conf, hri, htd, null);
  region.setRestoredRegion(true);
  // we won't initialize the MobFileCache when not running in RS process. so provided an
  // initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
  // initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
  // cache for every region although it may hasn't any mob CF, BTW the cache is very light-weight.
  region.setMobFileCache(new MobFileCache(conf));
  region.initialize();

  // create an internal region scanner
  this.scanner = region.getScanner(scan);
  values = new ArrayList<>();

  if (scanMetrics == null) {
    initScanMetrics(scan);
  } else {
    this.scanMetrics = scanMetrics;
  }
  region.startRegionOperation();
}
 
源代码7 项目: hbase   文件: TestAsyncTableScanMetrics.java
private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan)
    throws IOException, InterruptedException {
  BufferingScanResultConsumer consumer = new BufferingScanResultConsumer();
  CONN.getTable(TABLE_NAME).scan(scan, consumer);
  List<Result> results = new ArrayList<>();
  for (Result result; (result = consumer.take()) != null;) {
    results.add(result);
  }
  return Pair.newPair(results, consumer.getScanMetrics());
}
 
源代码8 项目: hbase   文件: TestAsyncTableScanMetrics.java
private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan)
    throws IOException {
  try (ResultScanner scanner =
      CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) {
    List<Result> results = new ArrayList<>();
    for (Result result; (result = scanner.next()) != null;) {
      results.add(result);
    }
    return Pair.newPair(results, scanner.getScanMetrics());
  }
}
 
源代码9 项目: hbase   文件: TestAsyncTableScanMetrics.java
@Test
public void testScanMetrics() throws Exception {
  Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan().setScanMetricsEnabled(true));
  List<Result> results = pair.getFirst();
  assertEquals(3, results.size());
  long bytes = results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
      .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
  ScanMetrics scanMetrics = pair.getSecond();
  assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
  assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
  assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
  // also assert a server side metric to ensure that we have published them into the client side
  // metrics.
  assertEquals(3, scanMetrics.countOfRowsScanned.get());
}
 
源代码10 项目: hbase   文件: TableRecordReaderImpl.java
/**
 * If hbase runs on new version of mapreduce, RecordReader has access to
 * counters thus can update counters based on scanMetrics.
 * If hbase runs on old version of mapreduce, it won't be able to get
 * access to counters and TableRecorderReader can't update counter values.
 */
private void updateCounters() throws IOException {
  ScanMetrics scanMetrics = scanner.getScanMetrics();
  if (scanMetrics == null) {
    return;
  }

  updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
}
 
源代码11 项目: hbase   文件: TableSnapshotInputFormat.java
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  boolean result = delegate.nextKeyValue();
  if (result) {
    ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
    if (scanMetrics != null && context != null) {
      TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
    }
  }
  return result;
}
 
源代码12 项目: hbase   文件: PerformanceEvaluation.java
void updateScanMetrics(final ScanMetrics metrics) {
  if (metrics == null) return;
  Map<String,Long> metricsMap = metrics.getMetricsMap();
  Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
  if (rpcCalls != null) {
    this.rpcCallsHistogram.update(rpcCalls.longValue());
  }
  Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
  if (remoteRpcCalls != null) {
    this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
  }
  Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
  if (millisBetweenNext != null) {
    this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
  }
  Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
  if (regionsScanned != null) {
    this.regionsScannedHistogram.update(regionsScanned.longValue());
  }
  Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
  if (bytesInResults != null && bytesInResults.longValue() > 0) {
    this.bytesInResultsHistogram.update(bytesInResults.longValue());
  }
  Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
  if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
    this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
  }
}
 
源代码13 项目: hbase   文件: ProtobufUtil.java
public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics, boolean reset) {
  MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder();
  Map<String, Long> metrics = scanMetrics.getMetricsMap(reset);
  for (Entry<String, Long> e : metrics.entrySet()) {
    HBaseProtos.NameInt64Pair nameInt64Pair =
        HBaseProtos.NameInt64Pair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build();
    builder.addMetrics(nameInt64Pair);
  }
  return builder.build();
}
 
源代码14 项目: hbase   文件: AsyncClientScanner.java
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
    AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
    int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
  if (scan.getStartRow() == null) {
    scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
  }
  if (scan.getStopRow() == null) {
    scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
  }
  this.scan = scan;
  this.consumer = consumer;
  this.tableName = tableName;
  this.conn = conn;
  this.retryTimer = retryTimer;
  this.pauseNs = pauseNs;
  this.pauseForCQTBENs = pauseForCQTBENs;
  this.maxAttempts = maxAttempts;
  this.scanTimeoutNs = scanTimeoutNs;
  this.rpcTimeoutNs = rpcTimeoutNs;
  this.startLogErrorsCnt = startLogErrorsCnt;
  this.resultCache = createScanResultCache(scan);
  if (scan.isScanMetricsEnabled()) {
    this.scanMetrics = new ScanMetrics();
    consumer.onScanMetricsCreated(scanMetrics);
  } else {
    this.scanMetrics = null;
  }
}
 
public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
    Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
    AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
    boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
    long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
    int startLogErrorsCnt) {
  this.retryTimer = retryTimer;
  this.scan = scan;
  this.scanMetrics = scanMetrics;
  this.scannerId = scannerId;
  this.resultCache = resultCache;
  this.consumer = consumer;
  this.stub = stub;
  this.loc = loc;
  this.regionServerRemote = isRegionServerRemote;
  this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
  this.pauseNs = pauseNs;
  this.pauseForCQTBENs = pauseForCQTBENs;
  this.maxAttempts = maxAttempts;
  this.scanTimeoutNs = scanTimeoutNs;
  this.rpcTimeoutNs = rpcTimeoutNs;
  this.startLogErrorsCnt = startLogErrorsCnt;
  if (scan.isReversed()) {
    completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
  } else {
    completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
  }
  this.future = new CompletableFuture<>();
  this.priority = priority;
  this.controller = conn.rpcControllerFactory.newController();
  this.controller.setPriority(priority);
  this.exceptions = new ArrayList<>();
}
 
源代码16 项目: hbase   文件: ConnectionUtils.java
static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
  if (scanMetrics == null) {
    return;
  }
  scanMetrics.countOfRPCcalls.incrementAndGet();
  if (isRegionServerRemote) {
    scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
  }
}
 
源代码17 项目: hbase   文件: ConnectionUtils.java
static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
  if (scanMetrics == null) {
    return;
  }
  scanMetrics.countOfRPCRetries.incrementAndGet();
  if (isRegionServerRemote) {
    scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
  }
}
 
源代码18 项目: hbase   文件: ConnectionUtils.java
/**
 * Use the scan metrics returned by the server to add to the identically named counters in the
 * client side metrics. If a counter does not exist with the same name as the server side metric,
 * the attempt to increase the counter will fail.
 */
static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) {
  if (scanMetrics == null || response == null || !response.hasScanMetrics()) {
    return;
  }
  ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter);
}
 
源代码19 项目: hbase   文件: AbstractClientScanner.java
/**
 * Check and initialize if application wants to collect scan metrics
 */
protected void initScanMetrics(Scan scan) {
  // check if application wants to collect scan metrics
  if (scan.isScanMetricsEnabled()) {
    scanMetrics = new ScanMetrics();
  }
}
 
源代码20 项目: spliceengine   文件: MemstoreKeyValueScannerTest.java
private ResultScanner generateResultScanner(KeyValue... kvs) {
    TreeSet<KeyValue> set = new TreeSet<>(KeyValue.COMPARATOR);

    set.addAll(Arrays.asList(kvs));

    KeyValue[] sortedKvs = new KeyValue[set.size()];
    set.toArray(sortedKvs);

    final Result result = Result.create(kvs);

    return new ResultScanner() {
        @Override
        public Result next() throws IOException {
            return result;
        }

        @Override
        public Result[] next(int nbRows) throws IOException {
            return new Result[] {result};
        }

        @Override
        public void close() {

        }

        public boolean renewLease() {
            return false;
        }

        public ScanMetrics getScanMetrics() {
            return null;
        }

        @Override
        public Iterator<Result> iterator() {
            return Arrays.asList(result).iterator();
        }
    };
}
 
源代码21 项目: phoenix-omid   文件: SnapshotFilterImpl.java
public ScanMetrics getScanMetrics() {
    return null;
}
 
源代码22 项目: hbase   文件: ThriftTable.java
@Override
public ScanMetrics getScanMetrics() {
  throw new RuntimeException("getScanMetrics() not supported");
}
 
源代码23 项目: hbase   文件: BufferingScanResultConsumer.java
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
  this.scanMetrics = scanMetrics;
}
 
源代码24 项目: hbase   文件: BufferingScanResultConsumer.java
public ScanMetrics getScanMetrics() {
  return scanMetrics;
}
 
源代码25 项目: hbase   文件: ScanPerNextResultScanner.java
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
  this.scanMetrics = scanMetrics;
}
 
源代码26 项目: hbase   文件: ScanPerNextResultScanner.java
@Override
public ScanMetrics getScanMetrics() {
  return scanMetrics;
}
 
源代码27 项目: hbase   文件: TestAsyncTableScanMetrics.java
private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
    throws Exception {
  SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
  CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
  return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
}
 
源代码28 项目: hbase   文件: TestAsyncTableScanMetrics.java
@Test
public void testNoScanMetrics() throws Exception {
  Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
  assertEquals(3, pair.getFirst().size());
  assertNull(pair.getSecond());
}
 
源代码29 项目: hbase   文件: SimpleScanResultConsumer.java
@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
  this.scanMetrics = scanMetrics;
}
 
源代码30 项目: hbase   文件: SimpleScanResultConsumer.java
public ScanMetrics getScanMetrics() {
  return scanMetrics;
}
 
 类方法
 同包方法