类org.apache.hadoop.hbase.regionserver.RegionScanner源码实例Demo

下面列出了怎么用org.apache.hadoop.hbase.regionserver.RegionScanner的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: Kylin   文件: AggregateRegionObserver.java
@Override
public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {

    boolean copAbortOnError = ctxt.getEnvironment().getConfiguration().getBoolean(RegionCoprocessorHost.ABORT_ON_ERROR_KEY, RegionCoprocessorHost.DEFAULT_ABORT_ON_ERROR);

    // never throw out exception that could abort region server
    if (copAbortOnError) {
        try {
            return doPostScannerObserver(ctxt, scan, innerScanner);
        } catch (Throwable e) {
            LOG.error("Kylin Coprocessor Error", e);
            return innerScanner;
        }
    } else {
        return doPostScannerObserver(ctxt, scan, innerScanner);
    }
}
 
源代码2 项目: geowave   文件: ServerSideOperationsObserver.java
@Override
public RegionScanner postScannerOpen(
    final ObserverContext<RegionCoprocessorEnvironment> e,
    final Scan scan,
    final RegionScanner s) throws IOException {
  if (opStore == null) {
    return super.postScannerOpen(e, scan, s);
  }
  return super.postScannerOpen(
      e,
      scan,
      wrapScannerWithOps(
          e.getEnvironment().getRegionInfo().getTable(),
          s,
          scan,
          ServerOpScope.SCAN,
          REGION_SCANNER_FACTORY));
}
 
源代码3 项目: phoenix   文件: IndexRebuildRegionScanner.java
private void setReturnCodeForSingleRowRebuild() throws IOException {
    try (RegionScanner scanner = region.getScanner(scan)) {
        List<Cell> row = new ArrayList<>();
        scanner.next(row);
        // Check if the data table row we have just scanned matches with the index row key.
        // If not, there is no need to build the index row from this data table row,
        // and just return zero row count.
        if (row.isEmpty()) {
            singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue();
        } else {
            Put put = new Put(CellUtil.cloneRow(row.get(0)));
            for (Cell cell : row) {
                put.add(cell);
            }
            if (checkIndexRow(indexRowKey, put)) {
                singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
            } else {
                singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
            }
        }
    }
}
 
源代码4 项目: geowave   文件: ServerSideOperationsObserver.java
@Override
public RegionScanner preScannerOpen(
    final ObserverContext<RegionCoprocessorEnvironment> e,
    final Scan scan,
    final RegionScanner s) throws IOException {
  if (opStore != null) {
    final TableName tableName = e.getEnvironment().getRegionInfo().getTable();
    if (!tableName.isSystemTable()) {
      final String namespace = tableName.getNamespaceAsString();
      final String qualifier = tableName.getQualifierAsString();
      final Collection<HBaseServerOp> serverOps =
          opStore.getOperations(namespace, qualifier, ServerOpScope.SCAN);
      for (final HBaseServerOp op : serverOps) {
        op.preScannerOpen(scan);
      }
    }
  }
  return super.preScannerOpen(e, scan, s);
}
 
源代码5 项目: kylin-on-parquet-v2   文件: CubeVisitService.java
public InnerScannerAsIterator(RegionScanner regionScanner) {
    this.regionScanner = regionScanner;

    try {
        hasMore = regionScanner.nextRaw(nextOne);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
 
源代码6 项目: phoenix   文件: HashJoinRegionScanner.java
@SuppressWarnings("unchecked")
public HashJoinRegionScanner(RegionScanner scanner, ScanProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException {
    assert (projector != null);
    this.scanner = scanner;
    this.projector = projector;
    this.joinInfo = joinInfo;
    this.resultQueue = new LinkedList<ProjectedValueTuple>();
    this.hasMore = true;
    if (joinInfo != null) {
        for (JoinType type : joinInfo.getJoinTypes()) {
            if (type != JoinType.Inner && type != JoinType.Left)
                throw new IOException("Got join type '" + type + "'. Expect only INNER or LEFT with hash-joins.");
        }
        int count = joinInfo.getJoinIds().length;
        this.tempTuples = new List[count];
        this.tempDestBitSet = ValueBitSet.newInstance(joinInfo.getJoinedSchema());
        this.hashCaches = new HashCache[count];
        this.tempSrcBitSet = new ValueBitSet[count];
        TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
        for (int i = 0; i < count; i++) {
            ImmutableBytesPtr joinId = joinInfo.getJoinIds()[i];
            HashCache hashCache = (HashCache)cache.getServerCache(joinId);
            if (hashCache == null)
                throw new IOException("Could not find hash cache for joinId: " + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength()));
            hashCaches[i] = hashCache;
            tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]);
        }
        this.projector.setValueBitSet(tempDestBitSet);
    }
}
 
源代码7 项目: spliceengine   文件: RegionTxnStore.java
public Source<TxnMessage.Txn> getAllTxns(long minTs,long maxTs) throws IOException{
    if(LOG.isTraceEnabled())
        SpliceLogUtils.trace(LOG,"getAllTxns minTs=%d, maxTs=%s",minTs,maxTs);
    Scan scan=setupScanOnRange(minTs,maxTs);

    RegionScanner scanner=region.getScanner(scan);

    return new ScanIterator(scanner);
}
 
源代码8 项目: phoenix-tephra   文件: TransactionProcessor.java
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
  throws IOException {
  Transaction tx = getFromOperation(scan);
  if (tx != null) {
    projectFamilyDeletes(scan);
    scan.setMaxVersions();
    scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
                      TxUtils.getMaxVisibleTimestamp(tx));
    Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
    scan.setFilter(newFilter);
  }
  return s;
}
 
源代码9 项目: phoenix-tephra   文件: TransactionProcessorTest.java
@SuppressWarnings("StatementWithEmptyBody")
private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
  try (RegionScanner regionScanner = region.getScanner(scan)) {
    List<Cell> results = Lists.newArrayList();
    while (regionScanner.next(results)) { }
    assertEquals(expected, results);
  }
}
 
源代码10 项目: phoenix-tephra   文件: TransactionProcessor.java
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
  throws IOException {
  Transaction tx = getFromOperation(scan);
  if (tx != null) {
    projectFamilyDeletes(scan);
    scan.setMaxVersions();
    scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
                      TxUtils.getMaxVisibleTimestamp(tx));
    Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
    scan.setFilter(newFilter);
  }
  return s;
}
 
源代码11 项目: phoenix   文件: GlobalIndexChecker.java
public GlobalIndexScanner(RegionCoprocessorEnvironment env,
                          Scan scan,
                          RegionScanner scanner,
                          GlobalIndexCheckerSource metricsSource) throws IOException {
    this.env = env;
    this.scan = scan;
    this.scanner = scanner;
    this.metricsSource = metricsSource;

    region = env.getRegion();
    emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
    emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
    ageThreshold = env.getConfiguration().getLong(
            QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
            QueryServicesOptions.DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS);
    minTimestamp = scan.getTimeRange().getMin();
    maxTimestamp = scan.getTimeRange().getMax();
    byte[] indexTableName = region.getRegionInfo().getTable().getName();
    byte[] md = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
    List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(md, true);
    indexMaintainer = getIndexMaintainer(maintainers, indexTableName);
    if (indexMaintainer == null) {
        throw new DoNotRetryIOException(
                "repairIndexRows: IndexMaintainer is not included in scan attributes for " +
                        region.getRegionInfo().getTable().getNameAsString());
    }
}
 
源代码12 项目: phoenix-tephra   文件: TransactionProcessorTest.java
@SuppressWarnings("StatementWithEmptyBody")
private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
  try (RegionScanner regionScanner = region.getScanner(scan)) {
    List<Cell> results = Lists.newArrayList();
    while (regionScanner.next(results)) { }
    assertEquals(expected, results);
  }
}
 
源代码13 项目: phoenix-tephra   文件: TransactionProcessorTest.java
@SuppressWarnings("StatementWithEmptyBody")
private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
  try (RegionScanner regionScanner = region.getScanner(scan)) {
    List<Cell> results = Lists.newArrayList();
    while (regionScanner.next(results)) { }
    assertEquals(expected, results);
  }
}
 
源代码14 项目: phoenix-tephra   文件: TransactionProcessor.java
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
  throws IOException {
  Transaction tx = getFromOperation(scan);
  if (tx != null) {
    projectFamilyDeletes(scan);
    scan.setMaxVersions();
    scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
                      TxUtils.getMaxVisibleTimestamp(tx));
    Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
    scan.setFilter(newFilter);
  }
  return s;
}
 
源代码15 项目: phoenix-tephra   文件: TransactionProcessor.java
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
  throws IOException {
  Transaction tx = getFromOperation(scan);
  if (tx != null) {
    projectFamilyDeletes(scan);
    scan.setMaxVersions();
    scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
                      TxUtils.getMaxVisibleTimestamp(tx));
    Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
    scan.setFilter(newFilter);
  }
  return s;
}
 
源代码16 项目: phoenix   文件: BaseScannerRegionObserver.java
/**
 * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
 * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
 * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
 * the same from a custom filter.
 * @param offset starting position in the rowkey.
 * @param scan
 * @param tupleProjector
 * @param dataRegion
 * @param indexMaintainer
 * @param viewConstants
 */
RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
        final RegionScanner s, final int offset, final Scan scan,
        final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
        final Region dataRegion, final IndexMaintainer indexMaintainer,
        final byte[][] viewConstants, final TupleProjector projector,
        final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) {

    RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment());

    return regionScannerFactory.getWrappedScanner(c.getEnvironment(), s, null, null, offset, scan, dataColumns, tupleProjector,
            dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
}
 
源代码17 项目: phoenix-tephra   文件: TransactionProcessorTest.java
@SuppressWarnings("StatementWithEmptyBody")
private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
  try (RegionScanner regionScanner = region.getScanner(scan)) {
    List<Cell> results = Lists.newArrayList();
    while (regionScanner.next(results)) { }
    assertEquals(expected, results);
  }
}
 
源代码18 项目: phoenix-tephra   文件: TransactionProcessor.java
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
  throws IOException {
  Transaction tx = getFromOperation(scan);
  if (tx != null) {
    projectFamilyDeletes(scan);
    scan.setMaxVersions();
    scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
                      TxUtils.getMaxVisibleTimestamp(tx));
    Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
    scan.setFilter(newFilter);
  }
  return s;
}
 
源代码19 项目: Kylin   文件: AggregateRegionObserver.java
private RegionScanner doPostScannerObserver(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
    byte[] coprocessorEnableBytes = scan.getAttribute(COPROCESSOR_ENABLE);
    if (coprocessorEnableBytes == null || coprocessorEnableBytes.length == 0 || coprocessorEnableBytes[0] == 0) {
        return innerScanner;
    }

    byte[] typeBytes = scan.getAttribute(TYPE);
    CoprocessorRowType type = CoprocessorRowType.deserialize(typeBytes);

    byte[] projectorBytes = scan.getAttribute(PROJECTOR);
    CoprocessorProjector projector = CoprocessorProjector.deserialize(projectorBytes);

    byte[] aggregatorBytes = scan.getAttribute(AGGREGATORS);
    ObserverAggregators aggregators = ObserverAggregators.deserialize(aggregatorBytes);

    byte[] filterBytes = scan.getAttribute(FILTER);
    CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);

    // start/end region operation & sync on scanner is suggested by the
    // javadoc of RegionScanner.nextRaw()
    // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
    HRegion region = ctxt.getEnvironment().getRegion();
    region.startRegionOperation();
    try {
        synchronized (innerScanner) {
            return new AggregationScanner(type, filter, projector, aggregators, innerScanner);
        }
    } finally {
        region.closeRegionOperation();
    }

}
 
源代码20 项目: phoenix   文件: UngroupedAggregateRegionObserver.java
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
        throws IOException {
    s = super.preScannerOpen(e, scan, s);
    if (ScanUtil.isAnalyzeTable(scan)) {
        // We are setting the start row and stop row such that it covers the entire region. As part
        // of Phonenix-1263 we are storing the guideposts against the physical table rather than 
        // individual tenant specific tables.
        scan.setStartRow(HConstants.EMPTY_START_ROW);
        scan.setStopRow(HConstants.EMPTY_END_ROW);
        scan.setFilter(null);
    }
    return s;
}
 
源代码21 项目: phoenix   文件: SpillableGroupByCache.java
@Override
public RegionScanner getScanner(final RegionScanner s) {
    final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator();

    // scanner using the spillable implementation
    return new BaseRegionScanner(s) {
        @Override
        public void close() throws IOException {
            try {
                s.close();
            } finally {
                // Always close gbCache and swallow possible Exceptions
                Closeables.closeQuietly(SpillableGroupByCache.this);
            }
        }

        @Override
        public boolean next(List<Cell> results) throws IOException {
            if (!cacheIter.hasNext()) {
                return false;
            }
            Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next();
            ImmutableBytesWritable key = ce.getKey();
            Aggregator[] aggs = ce.getValue();
            byte[] value = aggregators.toBytes(aggs);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Adding new distinct group: "
                        + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()) +
                        " with aggregators " + aggs.toString() + " value = " +
                        Bytes.toStringBinary(value));
            }
            results.add(PhoenixKeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), SINGLE_COLUMN_FAMILY,
                    SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
            return cacheIter.hasNext();
        }
    };
}
 
源代码22 项目: phoenix   文件: PhoenixIndexBuilder.java
@Override
public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
    // The entire purpose of this method impl is to get the existing rows for the
    // table rows being indexed into the block cache, as the index maintenance code
    // does a point scan per row
    List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
    List<IndexMaintainer> maintainers = new ArrayList<IndexMaintainer>();
    for (int i = 0; i < miniBatchOp.size(); i++) {
        Mutation m = miniBatchOp.getOperation(i).getFirst();
        keys.add(PDataType.VARBINARY.getKeyRange(m.getRow()));
        maintainers.addAll(getCodec().getIndexMaintainers(m.getAttributesMap()));
    }
    Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
    ScanRanges scanRanges = ScanRanges.create(Collections.singletonList(keys), SchemaUtil.VAR_BINARY_SCHEMA);
    scanRanges.setScanStartStopRow(scan);
    scan.setFilter(scanRanges.getSkipScanFilter());
    HRegion region = this.env.getRegion();
    RegionScanner scanner = region.getScanner(scan);
    // Run through the scanner using internal nextRaw method
    MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
    region.startRegionOperation();
    try {
        boolean hasMore;
        do {
            List<KeyValue> results = Lists.newArrayList();
            // Results are potentially returned even when the return value of s.next is false
            // since this is an indication of whether or not there are more values after the
            // ones returned
            hasMore = scanner.nextRaw(results, null);
        } while (hasMore);
    } finally {
        try {
            scanner.close();
        } finally {
            region.closeRegionOperation();
        }
    }
}
 
源代码23 项目: hbase   文件: Export.java
RegionScanner checkScannerOpen(final Scan scan) throws IOException {
  RegionScanner scanner;
  if (region.getCoprocessorHost() == null) {
    scanner = region.getScanner(scan);
  } else {
    region.getCoprocessorHost().preScannerOpen(scan);
    scanner = region.getScanner(scan);
    scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
  }
  if (scanner == null) {
    throw new IOException("Failed to open region scanner");
  }
  return scanner;
}
 
源代码24 项目: hbase   文件: VisibilityController.java
@Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
    final Scan scan, final RegionScanner s) throws IOException {
  User user = VisibilityUtils.getActiveUser();
  if (user != null && user.getShortName() != null) {
    scannerOwners.put(s, user.getShortName());
  }
  return s;
}
 
源代码25 项目: phoenix   文件: GroupedAggregateRegionObserver.java
/**
 * Replaces the RegionScanner s with a RegionScanner that groups by the key formed by the list
 * of expressions from the scan and returns the aggregated rows of each group. For example,
 * given the following original rows in the RegionScanner: KEY COL1 row1 a row2 b row3 a row4 a
 * the following rows will be returned for COUNT(*): KEY COUNT a 3 b 1 The client is required to
 * do a sort and a final aggregation, since multiple rows with the same key may be returned from
 * different regions.
 */
@Override
protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Scan scan, RegionScanner s) throws IOException {
    boolean keyOrdered = false;
    byte[] expressionBytes = scan.getAttribute(UNORDERED_GROUP_BY_EXPRESSIONS);

    if (expressionBytes == null) {
        expressionBytes = scan.getAttribute(KEY_ORDERED_GROUP_BY_EXPRESSIONS);
        if (expressionBytes == null) {
            return s;
        }
        keyOrdered = true;
    }
    List<Expression> expressions = deserializeGroupByExpressions(expressionBytes);

    ServerAggregators aggregators =
            ServerAggregators.deserialize(scan
                    .getAttribute(GroupedAggregateRegionObserver.AGGREGATORS), c
                    .getEnvironment().getConfiguration());

    final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
    final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
    RegionScanner innerScanner = s;
    if (p != null || j != null) {
        innerScanner =
                new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan),
                        c.getEnvironment());
    }

    if (keyOrdered) { // Optimize by taking advantage that the rows are
                      // already in the required group by key order
        return scanOrdered(c, scan, innerScanner, expressions, aggregators);
    } else { // Otherwse, collect them all up in an in memory map
        return scanUnordered(c, scan, innerScanner, expressions, aggregators);
    }
}
 
源代码26 项目: spliceengine   文件: SplitRegionScanner.java
public void registerRegionScanner(RegionScanner regionScanner) {
    if (LOG.isTraceEnabled())
        SpliceLogUtils.trace(LOG, "registerRegionScanner %s", regionScanner);
    if (currentScanner == null)
        currentScanner = regionScanner;
    regionScanners.add(regionScanner);
}
 
源代码27 项目: hbase   文件: AccessController.java
@Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
    final Scan scan, final RegionScanner s) throws IOException {
  User user = getActiveUser(c);
  if (user != null && user.getShortName() != null) {
    // store reference to scanner owner for later checks
    scannerOwners.put(s, user.getShortName());
  }
  return s;
}
 
源代码28 项目: hbase   文件: RegionProcedureStore.java
@Override
public void load(ProcedureLoader loader) throws IOException {
  List<ProcedureProtos.Procedure> procs = new ArrayList<>();
  long maxProcId = 0;

  try (RegionScanner scanner =
    region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
    List<Cell> cells = new ArrayList<>();
    boolean moreRows;
    do {
      moreRows = scanner.next(cells);
      if (cells.isEmpty()) {
        continue;
      }
      Cell cell = cells.get(0);
      cells.clear();
      maxProcId = Math.max(maxProcId,
        Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
      if (cell.getValueLength() > 0) {
        ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
          .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
        procs.add(proto);
      }
    } while (moreRows);
  }
  loader.setMaxProcId(maxProcId);
  ProcedureTree tree = ProcedureTree.build(procs);
  loader.load(tree.getValidProcs());
  loader.handleCorrupted(tree.getCorruptedProcs());
}
 
源代码29 项目: hbase   文件: RegionProcedureStore.java
@Override
public void cleanup() {
  // actually delete the procedures if it is not the one with the max procedure id.
  List<Cell> cells = new ArrayList<Cell>();
  try (RegionScanner scanner =
    region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
    // skip the row with max procedure id
    boolean moreRows = scanner.next(cells);
    if (cells.isEmpty()) {
      return;
    }
    cells.clear();
    while (moreRows) {
      moreRows = scanner.next(cells);
      if (cells.isEmpty()) {
        continue;
      }
      Cell cell = cells.get(0);
      cells.clear();
      if (cell.getValueLength() == 0) {
        region.update(r -> r
          .delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
      }
    }
  } catch (IOException e) {
    LOG.warn("Failed to clean up delete procedures", e);
  }
}
 
/**
 * Get a transactional scanner.
 */
public RegionScanner getScanner(final long transactionId, final Scan scan) throws IOException {
    checkClosing();

    TransactionState state = getTransactionState(transactionId);
    state.addScan(scan);
    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
    scanners.add(state.getScanner(scan));
    return super.getScanner(wrapWithDeleteFilter(scan, state), scanners);
}
 
 类所在包
 同包方法