下面列出了怎么用org.apache.hadoop.hbase.regionserver.RegionScanner的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
boolean copAbortOnError = ctxt.getEnvironment().getConfiguration().getBoolean(RegionCoprocessorHost.ABORT_ON_ERROR_KEY, RegionCoprocessorHost.DEFAULT_ABORT_ON_ERROR);
// never throw out exception that could abort region server
if (copAbortOnError) {
try {
return doPostScannerObserver(ctxt, scan, innerScanner);
} catch (Throwable e) {
LOG.error("Kylin Coprocessor Error", e);
return innerScanner;
}
} else {
return doPostScannerObserver(ctxt, scan, innerScanner);
}
}
@Override
public RegionScanner postScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan,
final RegionScanner s) throws IOException {
if (opStore == null) {
return super.postScannerOpen(e, scan, s);
}
return super.postScannerOpen(
e,
scan,
wrapScannerWithOps(
e.getEnvironment().getRegionInfo().getTable(),
s,
scan,
ServerOpScope.SCAN,
REGION_SCANNER_FACTORY));
}
private void setReturnCodeForSingleRowRebuild() throws IOException {
try (RegionScanner scanner = region.getScanner(scan)) {
List<Cell> row = new ArrayList<>();
scanner.next(row);
// Check if the data table row we have just scanned matches with the index row key.
// If not, there is no need to build the index row from this data table row,
// and just return zero row count.
if (row.isEmpty()) {
singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue();
} else {
Put put = new Put(CellUtil.cloneRow(row.get(0)));
for (Cell cell : row) {
put.add(cell);
}
if (checkIndexRow(indexRowKey, put)) {
singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
} else {
singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
}
}
}
}
@Override
public RegionScanner preScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan,
final RegionScanner s) throws IOException {
if (opStore != null) {
final TableName tableName = e.getEnvironment().getRegionInfo().getTable();
if (!tableName.isSystemTable()) {
final String namespace = tableName.getNamespaceAsString();
final String qualifier = tableName.getQualifierAsString();
final Collection<HBaseServerOp> serverOps =
opStore.getOperations(namespace, qualifier, ServerOpScope.SCAN);
for (final HBaseServerOp op : serverOps) {
op.preScannerOpen(scan);
}
}
}
return super.preScannerOpen(e, scan, s);
}
public InnerScannerAsIterator(RegionScanner regionScanner) {
this.regionScanner = regionScanner;
try {
hasMore = regionScanner.nextRaw(nextOne);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@SuppressWarnings("unchecked")
public HashJoinRegionScanner(RegionScanner scanner, ScanProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException {
assert (projector != null);
this.scanner = scanner;
this.projector = projector;
this.joinInfo = joinInfo;
this.resultQueue = new LinkedList<ProjectedValueTuple>();
this.hasMore = true;
if (joinInfo != null) {
for (JoinType type : joinInfo.getJoinTypes()) {
if (type != JoinType.Inner && type != JoinType.Left)
throw new IOException("Got join type '" + type + "'. Expect only INNER or LEFT with hash-joins.");
}
int count = joinInfo.getJoinIds().length;
this.tempTuples = new List[count];
this.tempDestBitSet = ValueBitSet.newInstance(joinInfo.getJoinedSchema());
this.hashCaches = new HashCache[count];
this.tempSrcBitSet = new ValueBitSet[count];
TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
for (int i = 0; i < count; i++) {
ImmutableBytesPtr joinId = joinInfo.getJoinIds()[i];
HashCache hashCache = (HashCache)cache.getServerCache(joinId);
if (hashCache == null)
throw new IOException("Could not find hash cache for joinId: " + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength()));
hashCaches[i] = hashCache;
tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]);
}
this.projector.setValueBitSet(tempDestBitSet);
}
}
public Source<TxnMessage.Txn> getAllTxns(long minTs,long maxTs) throws IOException{
if(LOG.isTraceEnabled())
SpliceLogUtils.trace(LOG,"getAllTxns minTs=%d, maxTs=%s",minTs,maxTs);
Scan scan=setupScanOnRange(minTs,maxTs);
RegionScanner scanner=region.getScanner(scan);
return new ScanIterator(scanner);
}
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
throws IOException {
Transaction tx = getFromOperation(scan);
if (tx != null) {
projectFamilyDeletes(scan);
scan.setMaxVersions();
scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
TxUtils.getMaxVisibleTimestamp(tx));
Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
scan.setFilter(newFilter);
}
return s;
}
@SuppressWarnings("StatementWithEmptyBody")
private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
try (RegionScanner regionScanner = region.getScanner(scan)) {
List<Cell> results = Lists.newArrayList();
while (regionScanner.next(results)) { }
assertEquals(expected, results);
}
}
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
throws IOException {
Transaction tx = getFromOperation(scan);
if (tx != null) {
projectFamilyDeletes(scan);
scan.setMaxVersions();
scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
TxUtils.getMaxVisibleTimestamp(tx));
Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
scan.setFilter(newFilter);
}
return s;
}
public GlobalIndexScanner(RegionCoprocessorEnvironment env,
Scan scan,
RegionScanner scanner,
GlobalIndexCheckerSource metricsSource) throws IOException {
this.env = env;
this.scan = scan;
this.scanner = scanner;
this.metricsSource = metricsSource;
region = env.getRegion();
emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
ageThreshold = env.getConfiguration().getLong(
QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
QueryServicesOptions.DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS);
minTimestamp = scan.getTimeRange().getMin();
maxTimestamp = scan.getTimeRange().getMax();
byte[] indexTableName = region.getRegionInfo().getTable().getName();
byte[] md = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(md, true);
indexMaintainer = getIndexMaintainer(maintainers, indexTableName);
if (indexMaintainer == null) {
throw new DoNotRetryIOException(
"repairIndexRows: IndexMaintainer is not included in scan attributes for " +
region.getRegionInfo().getTable().getNameAsString());
}
}
@SuppressWarnings("StatementWithEmptyBody")
private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
try (RegionScanner regionScanner = region.getScanner(scan)) {
List<Cell> results = Lists.newArrayList();
while (regionScanner.next(results)) { }
assertEquals(expected, results);
}
}
@SuppressWarnings("StatementWithEmptyBody")
private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
try (RegionScanner regionScanner = region.getScanner(scan)) {
List<Cell> results = Lists.newArrayList();
while (regionScanner.next(results)) { }
assertEquals(expected, results);
}
}
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
throws IOException {
Transaction tx = getFromOperation(scan);
if (tx != null) {
projectFamilyDeletes(scan);
scan.setMaxVersions();
scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
TxUtils.getMaxVisibleTimestamp(tx));
Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
scan.setFilter(newFilter);
}
return s;
}
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
throws IOException {
Transaction tx = getFromOperation(scan);
if (tx != null) {
projectFamilyDeletes(scan);
scan.setMaxVersions();
scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
TxUtils.getMaxVisibleTimestamp(tx));
Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
scan.setFilter(newFilter);
}
return s;
}
/**
* Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and
* re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
* for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
* the same from a custom filter.
* @param offset starting position in the rowkey.
* @param scan
* @param tupleProjector
* @param dataRegion
* @param indexMaintainer
* @param viewConstants
*/
RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
final RegionScanner s, final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
final Region dataRegion, final IndexMaintainer indexMaintainer,
final byte[][] viewConstants, final TupleProjector projector,
final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) {
RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment());
return regionScannerFactory.getWrappedScanner(c.getEnvironment(), s, null, null, offset, scan, dataColumns, tupleProjector,
dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
}
@SuppressWarnings("StatementWithEmptyBody")
private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
try (RegionScanner regionScanner = region.getScanner(scan)) {
List<Cell> results = Lists.newArrayList();
while (regionScanner.next(results)) { }
assertEquals(expected, results);
}
}
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
throws IOException {
Transaction tx = getFromOperation(scan);
if (tx != null) {
projectFamilyDeletes(scan);
scan.setMaxVersions();
scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
TxUtils.getMaxVisibleTimestamp(tx));
Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
scan.setFilter(newFilter);
}
return s;
}
private RegionScanner doPostScannerObserver(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
byte[] coprocessorEnableBytes = scan.getAttribute(COPROCESSOR_ENABLE);
if (coprocessorEnableBytes == null || coprocessorEnableBytes.length == 0 || coprocessorEnableBytes[0] == 0) {
return innerScanner;
}
byte[] typeBytes = scan.getAttribute(TYPE);
CoprocessorRowType type = CoprocessorRowType.deserialize(typeBytes);
byte[] projectorBytes = scan.getAttribute(PROJECTOR);
CoprocessorProjector projector = CoprocessorProjector.deserialize(projectorBytes);
byte[] aggregatorBytes = scan.getAttribute(AGGREGATORS);
ObserverAggregators aggregators = ObserverAggregators.deserialize(aggregatorBytes);
byte[] filterBytes = scan.getAttribute(FILTER);
CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
// start/end region operation & sync on scanner is suggested by the
// javadoc of RegionScanner.nextRaw()
// FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
HRegion region = ctxt.getEnvironment().getRegion();
region.startRegionOperation();
try {
synchronized (innerScanner) {
return new AggregationScanner(type, filter, projector, aggregators, innerScanner);
}
} finally {
region.closeRegionOperation();
}
}
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
throws IOException {
s = super.preScannerOpen(e, scan, s);
if (ScanUtil.isAnalyzeTable(scan)) {
// We are setting the start row and stop row such that it covers the entire region. As part
// of Phonenix-1263 we are storing the guideposts against the physical table rather than
// individual tenant specific tables.
scan.setStartRow(HConstants.EMPTY_START_ROW);
scan.setStopRow(HConstants.EMPTY_END_ROW);
scan.setFilter(null);
}
return s;
}
@Override
public RegionScanner getScanner(final RegionScanner s) {
final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator();
// scanner using the spillable implementation
return new BaseRegionScanner(s) {
@Override
public void close() throws IOException {
try {
s.close();
} finally {
// Always close gbCache and swallow possible Exceptions
Closeables.closeQuietly(SpillableGroupByCache.this);
}
}
@Override
public boolean next(List<Cell> results) throws IOException {
if (!cacheIter.hasNext()) {
return false;
}
Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next();
ImmutableBytesWritable key = ce.getKey();
Aggregator[] aggs = ce.getValue();
byte[] value = aggregators.toBytes(aggs);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Adding new distinct group: "
+ Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()) +
" with aggregators " + aggs.toString() + " value = " +
Bytes.toStringBinary(value));
}
results.add(PhoenixKeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
return cacheIter.hasNext();
}
};
}
@Override
public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
// The entire purpose of this method impl is to get the existing rows for the
// table rows being indexed into the block cache, as the index maintenance code
// does a point scan per row
List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
List<IndexMaintainer> maintainers = new ArrayList<IndexMaintainer>();
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i).getFirst();
keys.add(PDataType.VARBINARY.getKeyRange(m.getRow()));
maintainers.addAll(getCodec().getIndexMaintainers(m.getAttributesMap()));
}
Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
ScanRanges scanRanges = ScanRanges.create(Collections.singletonList(keys), SchemaUtil.VAR_BINARY_SCHEMA);
scanRanges.setScanStartStopRow(scan);
scan.setFilter(scanRanges.getSkipScanFilter());
HRegion region = this.env.getRegion();
RegionScanner scanner = region.getScanner(scan);
// Run through the scanner using internal nextRaw method
MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
region.startRegionOperation();
try {
boolean hasMore;
do {
List<KeyValue> results = Lists.newArrayList();
// Results are potentially returned even when the return value of s.next is false
// since this is an indication of whether or not there are more values after the
// ones returned
hasMore = scanner.nextRaw(results, null);
} while (hasMore);
} finally {
try {
scanner.close();
} finally {
region.closeRegionOperation();
}
}
}
RegionScanner checkScannerOpen(final Scan scan) throws IOException {
RegionScanner scanner;
if (region.getCoprocessorHost() == null) {
scanner = region.getScanner(scan);
} else {
region.getCoprocessorHost().preScannerOpen(scan);
scanner = region.getScanner(scan);
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
}
if (scanner == null) {
throw new IOException("Failed to open region scanner");
}
return scanner;
}
@Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s) throws IOException {
User user = VisibilityUtils.getActiveUser();
if (user != null && user.getShortName() != null) {
scannerOwners.put(s, user.getShortName());
}
return s;
}
/**
* Replaces the RegionScanner s with a RegionScanner that groups by the key formed by the list
* of expressions from the scan and returns the aggregated rows of each group. For example,
* given the following original rows in the RegionScanner: KEY COL1 row1 a row2 b row3 a row4 a
* the following rows will be returned for COUNT(*): KEY COUNT a 3 b 1 The client is required to
* do a sort and a final aggregation, since multiple rows with the same key may be returned from
* different regions.
*/
@Override
protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Scan scan, RegionScanner s) throws IOException {
boolean keyOrdered = false;
byte[] expressionBytes = scan.getAttribute(UNORDERED_GROUP_BY_EXPRESSIONS);
if (expressionBytes == null) {
expressionBytes = scan.getAttribute(KEY_ORDERED_GROUP_BY_EXPRESSIONS);
if (expressionBytes == null) {
return s;
}
keyOrdered = true;
}
List<Expression> expressions = deserializeGroupByExpressions(expressionBytes);
ServerAggregators aggregators =
ServerAggregators.deserialize(scan
.getAttribute(GroupedAggregateRegionObserver.AGGREGATORS), c
.getEnvironment().getConfiguration());
final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
RegionScanner innerScanner = s;
if (p != null || j != null) {
innerScanner =
new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan),
c.getEnvironment());
}
if (keyOrdered) { // Optimize by taking advantage that the rows are
// already in the required group by key order
return scanOrdered(c, scan, innerScanner, expressions, aggregators);
} else { // Otherwse, collect them all up in an in memory map
return scanUnordered(c, scan, innerScanner, expressions, aggregators);
}
}
public void registerRegionScanner(RegionScanner regionScanner) {
if (LOG.isTraceEnabled())
SpliceLogUtils.trace(LOG, "registerRegionScanner %s", regionScanner);
if (currentScanner == null)
currentScanner = regionScanner;
regionScanners.add(regionScanner);
}
@Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s) throws IOException {
User user = getActiveUser(c);
if (user != null && user.getShortName() != null) {
// store reference to scanner owner for later checks
scannerOwners.put(s, user.getShortName());
}
return s;
}
@Override
public void load(ProcedureLoader loader) throws IOException {
List<ProcedureProtos.Procedure> procs = new ArrayList<>();
long maxProcId = 0;
try (RegionScanner scanner =
region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
List<Cell> cells = new ArrayList<>();
boolean moreRows;
do {
moreRows = scanner.next(cells);
if (cells.isEmpty()) {
continue;
}
Cell cell = cells.get(0);
cells.clear();
maxProcId = Math.max(maxProcId,
Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
if (cell.getValueLength() > 0) {
ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
.parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
procs.add(proto);
}
} while (moreRows);
}
loader.setMaxProcId(maxProcId);
ProcedureTree tree = ProcedureTree.build(procs);
loader.load(tree.getValidProcs());
loader.handleCorrupted(tree.getCorruptedProcs());
}
@Override
public void cleanup() {
// actually delete the procedures if it is not the one with the max procedure id.
List<Cell> cells = new ArrayList<Cell>();
try (RegionScanner scanner =
region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
// skip the row with max procedure id
boolean moreRows = scanner.next(cells);
if (cells.isEmpty()) {
return;
}
cells.clear();
while (moreRows) {
moreRows = scanner.next(cells);
if (cells.isEmpty()) {
continue;
}
Cell cell = cells.get(0);
cells.clear();
if (cell.getValueLength() == 0) {
region.update(r -> r
.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
}
}
} catch (IOException e) {
LOG.warn("Failed to clean up delete procedures", e);
}
}
/**
* Get a transactional scanner.
*/
public RegionScanner getScanner(final long transactionId, final Scan scan) throws IOException {
checkClosing();
TransactionState state = getTransactionState(transactionId);
state.addScan(scan);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
scanners.add(state.getScanner(scan));
return super.getScanner(wrapWithDeleteFilter(scan, state), scanners);
}