下面列出了 org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw() 的示例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
public InnerScannerAsIterator(RegionScanner regionScanner) {
this.regionScanner = regionScanner;
try {
hasMore = regionScanner.nextRaw(nextOne);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
 * Creates an iterator view over {@code regionScanner}. The first {@code nextRaw} call is made
 * eagerly into {@code nextOne}, and its return value is kept in {@code hasMore}.
 *
 * @param regionScanner the scanner to iterate over
 * @throws RuntimeException wrapping any {@link IOException} raised by the initial read
 */
public InnerScannerAsIterator(RegionScanner regionScanner) {
    this.regionScanner = regionScanner;
    try {
        this.hasMore = this.regionScanner.nextRaw(this.nextOne);
    } catch (IOException cause) {
        throw new RuntimeException(cause);
    }
}
/**
 * Drains {@code innerScanner} row by row and aggregates every row that passes {@code filter}
 * into an {@link ObserverAggregationCache} keyed by {@code projector}'s aggregation key.
 *
 * @param innerScanner scanner read with {@code nextRaw} until it reports no more rows
 * @param type row type used to build the tuple handed to {@code filter}
 * @param projector produces the aggregation key for each row's cells
 * @param aggregators aggregators applied to each accepted row
 * @param filter optional row filter; {@code null} accepts every row
 * @param stats optional statistics sink; when non-null it is told about every non-empty row,
 *        before filtering
 * @return the populated aggregation cache
 * @throws IOException if reading from {@code innerScanner} fails
 */
@SuppressWarnings("rawtypes")
ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators aggregators, CoprocessorFilter filter, Stats stats) throws IOException {
    ObserverAggregationCache cache = new ObserverAggregationCache(aggregators);
    ObserverTuple rowTuple = new ObserverTuple(type);
    List<Cell> rowCells = new ArrayList<Cell>();
    boolean moreRows;
    do {
        rowCells.clear();
        moreRows = innerScanner.nextRaw(rowCells);
        if (rowCells.isEmpty()) {
            continue;
        }
        if (stats != null) {
            stats.countInputRow(rowCells);
        }
        // The filter only needs the row key, taken from the first cell of the row.
        Cell first = rowCells.get(0);
        rowTuple.setUnderlying(first.getRowArray(), first.getRowOffset(), first.getRowLength());
        boolean rejected = filter != null && !filter.evaluate(rowTuple);
        if (rejected) {
            continue;
        }
        MeasureAggregator[] buffers = cache.getBuffer(projector.getAggrKey(rowCells));
        aggregators.aggregate(buffers, rowCells);
        cache.checkMemoryUsage();
    } while (moreRows);
    return cache;
}
/**
 * Pulls the existing rows touched by this mini-batch into the block cache before the
 * mutations are applied.
 * <p>
 * Index maintenance later does a point scan per mutated row, so this method scans exactly the
 * mutated row keys once (via a skip-scan filter) and discards the cells it reads.
 *
 * @param miniBatchOp the batch of mutations about to be applied
 * @throws IOException if opening, reading, or closing the region scanner fails
 */
@Override
public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
    // The entire purpose of this method impl is to get the existing rows for the
    // table rows being indexed into the block cache, as the index maintenance code
    // does a point scan per row
    List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
    List<IndexMaintainer> maintainers = new ArrayList<IndexMaintainer>();
    for (int i = 0; i < miniBatchOp.size(); i++) {
        Mutation m = miniBatchOp.getOperation(i).getFirst();
        keys.add(PDataType.VARBINARY.getKeyRange(m.getRow()));
        maintainers.addAll(getCodec().getIndexMaintainers(m.getAttributesMap()));
    }
    Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
    ScanRanges scanRanges = ScanRanges.create(Collections.singletonList(keys), SchemaUtil.VAR_BINARY_SCHEMA);
    scanRanges.setScanStartStopRow(scan);
    scan.setFilter(scanRanges.getSkipScanFilter());
    HRegion region = this.env.getRegion();
    RegionScanner scanner = region.getScanner(scan);
    // The scanner is open from here on: everything that can fail must be inside the try so the
    // scanner is always closed. The flag ensures closeRegionOperation() is only paired with a
    // successful startRegionOperation().
    boolean regionOperationStarted = false;
    try {
        // Run through the scanner using internal nextRaw method
        MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
        region.startRegionOperation();
        regionOperationStarted = true;
        // The cells are only read to warm the cache, so one scratch list is reused.
        List<KeyValue> results = Lists.newArrayList();
        boolean hasMore;
        do {
            results.clear();
            // Results are potentially returned even when the return value of s.next is false
            // since this is an indication of whether or not there are more values after the
            // ones returned
            hasMore = scanner.nextRaw(results, null);
        } while (hasMore);
    } finally {
        try {
            scanner.close();
        } finally {
            if (regionOperationStarted) {
                region.closeRegionOperation();
            }
        }
    }
}
/**
 * Pulls the existing rows touched by this mini-batch into the block cache before the
 * mutations are applied.
 * <p>
 * Index maintenance later does a point scan per mutated row, so this method scans exactly the
 * mutated row keys once (via a skip-scan filter) and discards the cells it reads. Only one
 * maintainer per index table is used to build the scan. Immutable local indexes are skipped.
 * When no maintainer remains, nothing is scanned.
 *
 * @param miniBatchOp the batch of mutations about to be applied
 * @throws IOException if opening, reading, or closing the region scanner fails
 */
@Override
public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    // The entire purpose of this method impl is to get the existing rows for the
    // table rows being indexed into the block cache, as the index maintenance code
    // does a point scan per row
    List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
    Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
            new HashMap<ImmutableBytesWritable, IndexMaintainer>();
    for (int i = 0; i < miniBatchOp.size(); i++) {
        Mutation m = miniBatchOp.getOperation(i);
        keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
        List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap());
        for (IndexMaintainer indexMaintainer : indexMaintainers) {
            if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
            // A fresh key object per entry. Reusing and mutating one ImmutableBytesWritable would
            // silently rewrite keys already stored in the HashMap and break later lookups.
            ImmutableBytesWritable indexTableName =
                    new ImmutableBytesWritable(indexMaintainer.getIndexTableName());
            // Keep the first maintainer seen for each index table.
            if (!maintainers.containsKey(indexTableName)) {
                maintainers.put(indexTableName, indexMaintainer);
            }
        }
    }
    if (maintainers.isEmpty()) return;
    Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
    ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
    scanRanges.initializeScan(scan);
    scan.setFilter(scanRanges.getSkipScanFilter());
    HRegion region = this.env.getRegion();
    RegionScanner scanner = region.getScanner(scan);
    // The scanner is open from here on, so it must be closed even if startRegionOperation()
    // throws. The flag pairs closeRegionOperation() only with a successful start.
    boolean regionOperationStarted = false;
    try {
        // Run through the scanner using internal nextRaw method
        region.startRegionOperation();
        regionOperationStarted = true;
        // The cells are only read to warm the cache, so one scratch list is reused.
        List<Cell> results = Lists.newArrayList();
        boolean hasMore;
        do {
            results.clear();
            // Results are potentially returned even when the return value of s.next is false
            // since this is an indication of whether or not there are more values after the
            // ones returned
            hasMore = scanner.nextRaw(results);
        } while (hasMore);
    } finally {
        try {
            scanner.close();
        } finally {
            if (regionOperationStarted) {
                region.closeRegionOperation();
            }
        }
    }
}
/**
 * Used for an aggregate query in which the key order does not necessarily match the group by
 * key order. Every distinct group key seen in this region is collected into a
 * {@link GroupByCache}, and each row is aggregated into its group's aggregators as the scan
 * proceeds.
 *
 * @param c coprocessor context supplying the region and its configuration
 * @param scan the client scan; supplies the estimated-distinct-values hint and log annotations
 * @param s the underlying region scanner, drained with {@code nextRaw}
 * @param expressions group-by expressions whose concatenated value forms the group key
 * @param aggregators server-side aggregators applied to every row
 * @param limit reading stops after a row once the cache holds at least this many groups
 * @return a scanner over the cached groups (not sorted here; the client sorts)
 * @throws IOException if reading from {@code s} fails
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
        final RegionScanner s, final List<Expression> expressions,
        final ServerAggregators aggregators, long limit) throws IOException {
    if (logger.isDebugEnabled()) {
        logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan
                + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
    }
    final RegionCoprocessorEnvironment env = c.getEnvironment();
    final Configuration conf = env.getConfiguration();
    final int configuredEstimate =
            conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
    final byte[] estimateHint = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
    // A client-supplied estimate overrides the configured one: allocate 1.5x of it,
    // but never fewer than MIN_DISTINCT_VALUES.
    final int estDistVals = estimateHint == null
            ? configuredEstimate
            : Math.max(MIN_DISTINCT_VALUES, (int) (Bytes.toInt(estimateHint) * 1.5f));
    final boolean spillableEnabled =
            conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
    final GroupByCache groupByCache = GroupByCacheFactory.INSTANCE.newCache(
            env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
            aggregators, estDistVals);
    // Set once ownership of the cache passes to the returned scanner; otherwise it is closed.
    boolean handedOff = false;
    try {
        final MultiKeyValueTuple tuple = new MultiKeyValueTuple();
        if (logger.isDebugEnabled()) {
            logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan)));
        }
        final HRegion region = env.getRegion();
        region.startRegionOperation();
        try {
            boolean hasMore;
            do {
                final List<Cell> rowCells = new ArrayList<Cell>();
                // nextRaw may hand back cells even when it returns false; the return value only
                // says whether more rows follow the ones just returned.
                hasMore = s.nextRaw(rowCells);
                if (!rowCells.isEmpty()) {
                    tuple.setKeyValues(rowCells);
                    final ImmutableBytesWritable groupKey =
                            TupleUtil.getConcatenatedValue(tuple, expressions);
                    aggregators.aggregate(groupByCache.cache(groupKey), tuple);
                }
            } while (hasMore && groupByCache.size() < limit);
        } finally {
            region.closeRegionOperation();
        }
        // Do not sort here, but sort back on the client instead. If the scan ever extends
        // beyond a region (possible when parallelization split points come from old
        // metadata), sorting here would produce incorrect query results.
        final RegionScanner groupScanner = groupByCache.getScanner(s);
        handedOff = true;
        return groupScanner;
    } finally {
        if (!handedOff) {
            Closeables.closeQuietly(groupByCache);
        }
    }
}
/**
 * Scans {@code region} with {@code scan} and appends every returned row, keyed by its row key,
 * to a {@link SecureWriter}, invoking the coprocessor scanner-next hooks around each read.
 *
 * @param region the region to export
 * @param conf configuration for the writer
 * @param userProvider user provider for the writer
 * @param scan the scan to run; its batch size is passed to the coprocessor hooks
 * @param userToken token for the writer
 * @param opts sequence-file writer options
 * @return the number of rows and cells written
 * @throws IOException if scanning or writing fails, or if {@code nextRaw} returns cells that
 *         belong to more than one row
 */
private static ExportProtos.ExportResponse processData(final Region region,
        final Configuration conf, final UserProvider userProvider, final Scan scan,
        final Token userToken, final List<SequenceFile.Writer.Option> opts) throws IOException {
    final ScanCoprocessor coprocessor = new ScanCoprocessor(region);
    RegionScanner scanner = null;
    try (RegionOp regionOperation = new RegionOp(region);
         SecureWriter writer = new SecureWriter(conf, userProvider, userToken, opts)) {
        scanner = coprocessor.checkScannerOpen(scan);
        final ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
        final List<Result> batch = new ArrayList<>();
        final List<Cell> rowCells = new ArrayList<>();
        long exportedRows = 0;
        long exportedCells = 0;
        boolean hasMore;
        do {
            if (coprocessor.preScannerNext(scanner, batch, scan.getBatch())) {
                // A coprocessor bypassed the read: flush whatever is in the batch, then stop.
                hasMore = false;
            } else {
                hasMore = scanner.nextRaw(rowCells);
                if (rowCells.isEmpty()) {
                    // Nothing was read this round; this also skips the flush below.
                    continue;
                }
                final Cell first = rowCells.get(0);
                final byte[] firstArray = first.getRowArray();
                final int firstOffset = first.getRowOffset();
                final int firstLength = first.getRowLength();
                // Every cell returned by a single nextRaw call must belong to the same row.
                for (int i = 1; i < rowCells.size(); i++) {
                    final Cell other = rowCells.get(i);
                    final boolean sameRow = Bytes.compareTo(firstArray, firstOffset, firstLength,
                            other.getRowArray(), other.getRowOffset(), other.getRowLength()) == 0;
                    if (!sameRow) {
                        throw new IOException("Why the RegionScanner#nextRaw returns the data of different"
                                + " rows?? first row="
                                + Bytes.toHex(firstArray, firstOffset, firstLength)
                                + ", current row="
                                + Bytes.toHex(other.getRowArray(), other.getRowOffset(), other.getRowLength()));
                    }
                }
                batch.add(Result.create(rowCells));
                rowCells.clear();
                coprocessor.postScannerNext(scanner, batch, scan.getBatch(), hasMore);
            }
            for (Result row : batch) {
                rowKey.set(row.getRow());
                writer.append(rowKey, row);
                exportedRows++;
                exportedCells += row.size();
            }
            batch.clear();
        } while (hasMore);
        return ExportProtos.ExportResponse.newBuilder()
                .setRowCount(exportedRows)
                .setCellCount(exportedCells)
                .build();
    } finally {
        coprocessor.checkScannerClose(scanner);
    }
}
/**
 * Used for an aggregate query in which the key order does not necessarily match the group by
 * key order. In this case, we must collect all distinct groups within a region into a map,
 * aggregating as we go.
 *
 * @param c coprocessor context supplying the region and its configuration
 * @param scan the client scan; supplies the estimated-distinct-values hint, qualifier-encoding
 *        settings and log annotations
 * @param scanner the underlying region scanner, drained with {@code nextRaw}
 * @param expressions group-by expressions whose concatenated value forms the group key
 * @param aggregators server-side aggregators applied to every row
 * @param limit maximum number of distinct groups to collect; reading stops after a row once
 *        the group-by cache holds at least this many groups
 * @return a scanner over the cached groups (not sorted here; the client sorts)
 * @throws IOException if reading from {@code scanner} fails
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
        final RegionScanner scanner, final List<Expression> expressions,
        final ServerAggregators aggregators, long limit) throws IOException {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(LogUtil.addCustomAnnotations(
                "Grouped aggregation over unordered rows with scan " + scan
                        + ", group by " + expressions + ", aggregators " + aggregators,
                ScanUtil.getCustomAnnotations(scan)));
    }
    RegionCoprocessorEnvironment env = c.getEnvironment();
    Configuration conf = env.getConfiguration();
    int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
    byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
    if (estDistValsBytes != null) {
        // Allocate 1.5x estimation
        estDistVals = Math.max(MIN_DISTINCT_VALUES,
                (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
    }
    Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
    // Reuse the pair computed above rather than re-deriving it from the scan (as scanOrdered does).
    boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
    final boolean spillableEnabled =
            conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
    final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
    GroupByCache groupByCache =
            GroupByCacheFactory.INSTANCE.newCache(
                    env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
                    aggregators, estDistVals);
    // Set once ownership of the cache passes to the returned scanner; otherwise it is closed.
    boolean success = false;
    try {
        boolean hasMore;
        Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(LogUtil.addCustomAnnotations(
                    "Spillable groupby enabled: " + spillableEnabled,
                    ScanUtil.getCustomAnnotations(scan)));
        }
        Region region = c.getEnvironment().getRegion();
        // Only close the region operation if startRegionOperation() succeeded.
        boolean acquiredLock = false;
        try {
            region.startRegionOperation();
            acquiredLock = true;
            // Hold the scanner's monitor while draining it.
            synchronized (scanner) {
                do {
                    List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
                    // Results are potentially returned even when the return
                    // value of s.next is false
                    // since this is an indication of whether or not there are
                    // more values after the
                    // ones returned
                    hasMore = scanner.nextRaw(results);
                    if (!results.isEmpty()) {
                        result.setKeyValues(results);
                        ImmutableBytesPtr key =
                                TupleUtil.getConcatenatedValue(result, expressions);
                        Aggregator[] rowAggregators = groupByCache.cache(key);
                        // Aggregate values here
                        aggregators.aggregate(rowAggregators, result);
                    }
                } while (hasMore && groupByCache.size() < limit);
            }
        } finally {
            if (acquiredLock) region.closeRegionOperation();
        }
        RegionScanner regionScanner = groupByCache.getScanner(scanner);
        // Do not sort here, but sort back on the client instead
        // The reason is that if the scan ever extends beyond a region
        // (which can happen if we're basing our parallelization split
        // points on old metadata), we'll get incorrect query results.
        success = true;
        return regionScanner;
    } finally {
        if (!success) {
            Closeables.closeQuietly(groupByCache);
        }
    }
}
/**
* Used for an aggregate query in which the key order match the group by key order. In this
* case, we can do the aggregation as we scan, by detecting when the group by key changes.
* <p>
* Reading happens lazily in the returned scanner's {@code next}: each call reads raw rows until
* the group-by key changes (an aggregation boundary), the underlying scanner is exhausted, or
* the limit is reached. If any row was aggregated, it then adds one cell holding the serialized
* aggregate values for the current group key.
* @param c coprocessor context; its region is held in a region operation while reading
* @param scan the client scan, used for qualifier-encoding settings and log annotations
* @param scanner the underlying region scanner, read with {@code nextRaw}
* @param expressions group-by expressions whose concatenated value forms the group key
* @param aggregators server-side aggregators applied to every row of a group
* @param limit maximum number of group rows to emit. When no aggregate functions are computed
* ({@code aggregators.getAggregators()} is empty), the limit check counts one extra group, so
* the scan can stop without waiting for the last group to complete (e.g. DISTINCT with LIMIT).
* @return a scanner yielding one aggregated cell per group
* @throws IOException declared for callers; this method body itself only builds the scanner
*/
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
final ServerAggregators aggregators, final long limit) throws IOException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(LogUtil.addCustomAnnotations(
"Grouped aggregation over ordered rows with scan " + scan + ", group by "
+ expressions + ", aggregators " + aggregators,
ScanUtil.getCustomAnnotations(scan)));
}
// Qualifier-encoding settings select which Cell list / Tuple implementation is used below.
final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
return new BaseRegionScanner(scanner) {
// Number of groups emitted at an aggregation boundary so far; compared against limit.
private long rowCount = 0;
// Group key currently being aggregated; null before the first row and after the scan ends.
private ImmutableBytesPtr currentKey = null;
@Override
public boolean next(List<Cell> results) throws IOException {
boolean hasMore;
boolean atLimit;
// Set when the row just read belongs to a different group than currentKey.
boolean aggBoundary = false;
Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
ImmutableBytesPtr key = null;
Aggregator[] rowAggregators = aggregators.getAggregators();
// If we're calculating no aggregate functions, we can exit at the
// start of a new row. Otherwise, we have to wait until an aggregation
// boundary so the current group's aggregate values are complete.
int countOffset = rowAggregators.length == 0 ? 1 : 0;
Region region = c.getEnvironment().getRegion();
// Only close the region operation if startRegionOperation() succeeded.
boolean acquiredLock = false;
try {
region.startRegionOperation();
acquiredLock = true;
// Hold the scanner's monitor while reading from it.
synchronized (scanner) {
do {
List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
// Results are potentially returned even when the return
// value of s.next is false
// since this is an indication of whether or not there
// are more values after the
// ones returned
hasMore = scanner.nextRaw(kvs);
if (!kvs.isEmpty()) {
result.setKeyValues(kvs);
key = TupleUtil.getConcatenatedValue(result, expressions);
aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
if (!aggBoundary) {
// Same group (or the very first row): fold this row into the running aggregates.
aggregators.aggregate(rowAggregators, result);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(LogUtil.addCustomAnnotations(
"Row passed filters: " + kvs
+ ", aggregated values: "
+ Arrays.asList(rowAggregators),
ScanUtil.getCustomAnnotations(scan)));
}
currentKey = key;
}
// On a boundary the new row stays in `result` and is aggregated only after the
// previous group has been emitted (see below).
}
atLimit = rowCount + countOffset >= limit;
// Do rowCount + 1 b/c we don't have to wait for a complete
// row in the case of a DISTINCT with a LIMIT
} while (hasMore && !aggBoundary && !atLimit);
}
} finally {
if (acquiredLock) region.closeRegionOperation();
}
// Emit the group aggregated so far as a single cell keyed by the group key.
if (currentKey != null) {
byte[] value = aggregators.toBytes(rowAggregators);
Cell keyValue =
PhoenixKeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
AGG_TIMESTAMP, value, 0, value.length);
results.add(keyValue);
// If we're at an aggregation boundary, reset the
// aggregators and
// aggregate with the current result (which is not a part of
// the returned result).
if (aggBoundary) {
aggregators.reset(rowAggregators);
aggregators.aggregate(rowAggregators, result);
currentKey = key;
rowCount++;
atLimit |= rowCount >= limit;
}
}
// Continue if there are more
if (!atLimit && (hasMore || aggBoundary)) {
return true;
}
currentKey = null;
return false;
}
};
}
/**
 * Used for an aggregate query in which the key order does not necessarily match the group by
 * key order. In this case, we must collect all distinct groups within a region into a map,
 * aggregating as we go.
 *
 * @param c coprocessor context supplying the region and its configuration
 * @param scan the client scan; may carry an estimated-distinct-values hint
 * @param s the underlying region scanner, drained with {@code nextRaw}
 * @param expressions group-by expressions whose concatenated value forms the group key
 * @param aggregators server-side aggregators applied to every row
 * @return a scanner over the cached groups (not sorted here; the client sorts)
 * @throws IOException if reading from {@code s} fails
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
        final RegionScanner s, final List<Expression> expressions,
        final ServerAggregators aggregators) throws IOException {
    if (logger.isDebugEnabled()) {
        logger.debug("Grouped aggregation over unordered rows with scan " + scan
                + ", group by " + expressions + ", aggregators " + aggregators);
    }
    int estDistVals = DEFAULT_ESTIMATED_DISTINCT_VALUES;
    byte[] estDistValsBytes = scan.getAttribute(ESTIMATED_DISTINCT_VALUES);
    if (estDistValsBytes != null) {
        // Allocate 1.5x the client's estimate, but never fewer than MIN_DISTINCT_VALUES.
        // (Math.min would cap every estimate at the minimum and make the hint useless.)
        estDistVals = Math.max(MIN_DISTINCT_VALUES,
                (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
    }
    RegionCoprocessorEnvironment env = c.getEnvironment();
    Configuration conf = env.getConfiguration();
    final boolean spillableEnabled =
            conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
    GroupByCache groupByCache =
            GroupByCacheFactory.INSTANCE.newCache(
                    env, ScanUtil.getTenantId(scan),
                    aggregators, estDistVals);
    // Set once ownership of the cache passes to the returned scanner; otherwise it is closed.
    boolean success = false;
    try {
        boolean hasMore;
        MultiKeyValueTuple result = new MultiKeyValueTuple();
        if (logger.isDebugEnabled()) {
            logger.debug("Spillable groupby enabled: " + spillableEnabled);
        }
        HRegion region = c.getEnvironment().getRegion();
        MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
        region.startRegionOperation();
        try {
            do {
                List<KeyValue> results = new ArrayList<KeyValue>();
                // Results are potentially returned even when the return
                // value of s.next is false
                // since this is an indication of whether or not there are
                // more values after the
                // ones returned
                hasMore = s.nextRaw(results, null);
                if (!results.isEmpty()) {
                    result.setKeyValues(results);
                    ImmutableBytesWritable key =
                            TupleUtil.getConcatenatedValue(result, expressions);
                    Aggregator[] rowAggregators = groupByCache.cache(key);
                    // Aggregate values here
                    aggregators.aggregate(rowAggregators, result);
                }
            } while (hasMore);
        } finally {
            region.closeRegionOperation();
        }
        RegionScanner regionScanner = groupByCache.getScanner(s);
        // Do not sort here, but sort back on the client instead
        // The reason is that if the scan ever extends beyond a region
        // (which can happen if we're basing our parallelization split
        // points on old metadata), we'll get incorrect query results.
        success = true;
        return regionScanner;
    } finally {
        if (!success) {
            Closeables.closeQuietly(groupByCache);
        }
    }
}
/**
* Used for an aggregate query in which the key order match the group by key order. In this
* case, we can do the aggregation as we scan, by detecting when the group by key changes.
* <p>
* Reading happens lazily in the returned scanner's {@code next}: each call reads raw rows until
* the group-by key changes (an aggregation boundary) or the underlying scanner is exhausted.
* If any row was aggregated, it then adds one cell holding the serialized aggregate values for
* the current group key.
* @param c coprocessor context; its region is held in a region operation while reading
* @param scan the client scan (used here only for logging)
* @param s the underlying region scanner, read with {@code nextRaw}; closed by the returned
* scanner's {@code close}
* @param expressions group-by expressions whose concatenated value forms the group key
* @param aggregators server-side aggregators applied to every row of a group
* @return a scanner yielding one aggregated cell per group
*/
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
Scan scan, final RegionScanner s, final List<Expression> expressions,
final ServerAggregators aggregators) {
if (logger.isDebugEnabled()) {
logger.debug("Grouped aggregation over ordered rows with scan " + scan + ", group by "
+ expressions + ", aggregators " + aggregators);
}
return new BaseRegionScanner() {
// Group key currently being aggregated; null before the first row and after the scan ends.
private ImmutableBytesWritable currentKey = null;
@Override
public HRegionInfo getRegionInfo() {
return s.getRegionInfo();
}
@Override
public void close() throws IOException {
s.close();
}
@Override
public boolean next(List<KeyValue> results) throws IOException {
boolean hasMore;
// Set when the row just read belongs to a different group than currentKey.
boolean aggBoundary = false;
MultiKeyValueTuple result = new MultiKeyValueTuple();
ImmutableBytesWritable key = null;
Aggregator[] rowAggregators = aggregators.getAggregators();
HRegion region = c.getEnvironment().getRegion();
MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
region.startRegionOperation();
try {
do {
List<KeyValue> kvs = new ArrayList<KeyValue>();
// Results are potentially returned even when the return
// value of s.next is false
// since this is an indication of whether or not there
// are more values after the
// ones returned
hasMore = s.nextRaw(kvs, null);
if (!kvs.isEmpty()) {
result.setKeyValues(kvs);
key = TupleUtil.getConcatenatedValue(result, expressions);
aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
if (!aggBoundary) {
// Same group (or the very first row): fold this row into the running aggregates.
aggregators.aggregate(rowAggregators, result);
if (logger.isDebugEnabled()) {
logger.debug("Row passed filters: " + kvs
+ ", aggregated values: "
+ Arrays.asList(rowAggregators));
}
currentKey = key;
}
// On a boundary the new row stays in `result` and is aggregated only after the
// previous group has been emitted (see below).
}
} while (hasMore && !aggBoundary);
} finally {
region.closeRegionOperation();
}
// Emit the group aggregated so far as a single KeyValue keyed by the group key.
if (currentKey != null) {
byte[] value = aggregators.toBytes(rowAggregators);
KeyValue keyValue =
KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
AGG_TIMESTAMP, value, 0, value.length);
results.add(keyValue);
if (logger.isDebugEnabled()) {
logger.debug("Adding new aggregate row: "
+ keyValue
+ ",for current key "
+ Bytes.toStringBinary(currentKey.get(), currentKey.getOffset(),
currentKey.getLength()) + ", aggregated values: "
+ Arrays.asList(rowAggregators));
}
// If we're at an aggregation boundary, reset the
// aggregators and
// aggregate with the current result (which is not a part of
// the returned result).
if (aggBoundary) {
aggregators.reset(rowAggregators);
aggregators.aggregate(rowAggregators, result);
currentKey = key;
}
}
// Continue if there are more
if (hasMore || aggBoundary) {
return true;
}
currentKey = null;
return false;
}
};
}