下面列出了 org.apache.hadoop.hbase.regionserver.RegionScanner#close() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Reads the labels region from start to end and gathers the cells handed back by each
 * {@code next} call on the scanner.
 *
 * @return one cell list per non-empty {@code next} call, in scan order
 * @throws IOException if the scanner cannot be opened, read or closed
 */
protected List<List<Cell>> getExistingLabelsWithAuths() throws IOException {
    RegionScanner labelScanner = labelsRegion.getScanner(new Scan());
    List<List<Cell>> labelRows = new ArrayList<>();
    try {
        for (;;) {
            List<Cell> row = new ArrayList<>();
            labelScanner.next(row);
            // An empty batch marks the end of the scan.
            if (row.isEmpty()) {
                return labelRows;
            }
            labelRows.add(row);
        }
    } finally {
        labelScanner.close();
    }
}
/**
 * Opens a scanner on {@code region}, reads the first batch, bumps {@code counter}, waits on
 * {@code latch}, and then drains the rest of the scan.
 * <p>
 * {@code scanCompletedCounter} is incremented on every exit path, including failures.
 *
 * @param region the region to scan
 * @throws IOException if opening, reading or closing the scanner fails
 */
private void initiateScan(HRegion region) throws IOException {
    Scan scan = new Scan();
    scan.setCaching(1);
    RegionScanner resScanner = null;
    try {
        resScanner = region.getScanner(scan);
        List<Cell> results = new ArrayList<>();
        boolean next = resScanner.next(results);
        try {
            counter.incrementAndGet();
            latch.await();
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt is dropped and the scan continues regardless;
            // this looks intentional for the test harness -- confirm before changing.
        }
        while (next) {
            next = resScanner.next(results);
        }
    } finally {
        scanCompletedCounter.incrementAndGet();
        // getScanner() may have thrown before the scanner was assigned; closing
        // unconditionally would raise an NPE that hides the original failure.
        if (resScanner != null) {
            resScanner.close();
        }
    }
}
/**
 * Builds the {@link PTable} for {@code key} from a scan over {@code region} and stores it in
 * the metadata cache when no table is cached yet or the new one has a newer timestamp.
 *
 * @param key             row key handed to {@code MetaDataUtil.newTableRowsScan}
 * @param cacheKey        key under which the table is cached
 * @param region          region to scan
 * @param clientTimeStamp upper timestamp bound passed to the scan and to {@code getTable}
 * @return the table produced by {@code getTable}, or {@code null} if it produced none
 * @throws IOException  if the scan fails
 * @throws SQLException if {@code getTable} fails
 */
private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
        long clientTimeStamp) throws IOException, SQLException {
    Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
    // Resolve the cache BEFORE opening the scanner: everything after getScanner() must be
    // covered by the try/finally, otherwise a failure here would leak the open scanner.
    Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
    RegionScanner scanner = region.getScanner(scan);
    try {
        PTable oldTable = metaDataCache.getIfPresent(cacheKey);
        // With nothing cached, use a timestamp below the minimum so any table counts as newer.
        long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
        PTable newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
        if (newTable == null) {
            return null;
        }
        if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Caching table "
                        + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
                            cacheKey.getLength()) + " at seqNum "
                        + newTable.getSequenceNumber() + " with newer timestamp "
                        + newTable.getTimeStamp() + " versus " + tableTimeStamp);
            }
            metaDataCache.put(cacheKey, newTable);
        }
        return newTable;
    } finally {
        scanner.close();
    }
}
/**
 * Returns the labels associated with {@code user}.
 * <p>
 * For system calls, or when no labels region is available, the answer comes from the labels
 * cache. Otherwise the labels region is scanned with a visibility filter built for
 * {@code SYSTEM_LABEL}, and each returned row is mapped to a label through the cache.
 *
 * @param user       user name bytes; when non-empty it is used as the column qualifier
 * @param systemCall whether to answer from the cache only
 * @return the labels found; ordinals the cache cannot resolve are skipped
 * @throws IOException if the region scan fails
 */
@Override
public List<String> getUserAuths(byte[] user, boolean systemCall)
        throws IOException {
    assert (labelsRegion != null || systemCall);
    if (systemCall || labelsRegion == null) {
        return this.labelsCache.getUserAuths(Bytes.toString(user));
    }
    Scan labelScan = new Scan();
    boolean hasUser = user != null && user.length > 0;
    if (hasUser) {
        labelScan.addColumn(LABELS_TABLE_FAMILY, user);
    }
    labelScan.setFilter(VisibilityUtils.createVisibilityLabelFilter(this.labelsRegion,
            new Authorizations(SYSTEM_LABEL)));
    List<String> userLabels = new ArrayList<>();
    RegionScanner labelScanner = this.labelsRegion.getScanner(labelScan);
    try {
        List<Cell> row = new ArrayList<>(1);
        // Fetch, stop on the first empty batch, otherwise process and fetch again.
        for (labelScanner.next(row); !row.isEmpty(); labelScanner.next(row)) {
            // The row key is read as the label ordinal.
            String label = this.labelsCache.getLabel(PrivateCellUtil.getRowAsInt(row.get(0)));
            if (label != null) {
                userLabels.add(label);
            }
            row.clear();
        }
    } finally {
        labelScanner.close();
    }
    return userLabels;
}
/**
 * Returns the distinct labels associated with any of {@code groups}.
 * <p>
 * For system calls, or when no labels region is available, the answer comes from the labels
 * cache. Otherwise the labels region is scanned with one column per group entry and a
 * visibility filter built for {@code SYSTEM_LABEL}.
 *
 * @param groups     group names; may be {@code null} or empty, in which case no column is added
 * @param systemCall whether to answer from the cache only
 * @return the de-duplicated labels found, in no particular order
 * @throws IOException if the region scan fails
 */
@Override
public List<String> getGroupAuths(String[] groups, boolean systemCall)
        throws IOException {
    assert (labelsRegion != null || systemCall);
    if (systemCall || labelsRegion == null) {
        return this.labelsCache.getGroupAuths(groups);
    }
    Scan labelScan = new Scan();
    if (groups != null) {
        // An empty array simply adds no columns.
        for (String groupName : groups) {
            byte[] qualifier = Bytes.toBytes(AuthUtil.toGroupEntry(groupName));
            labelScan.addColumn(LABELS_TABLE_FAMILY, qualifier);
        }
    }
    labelScan.setFilter(VisibilityUtils.createVisibilityLabelFilter(this.labelsRegion,
            new Authorizations(SYSTEM_LABEL)));
    Set<String> groupLabels = new HashSet<>();
    RegionScanner labelScanner = this.labelsRegion.getScanner(labelScan);
    try {
        List<Cell> row = new ArrayList<>(1);
        for (labelScanner.next(row); !row.isEmpty(); labelScanner.next(row)) {
            // The row key is read as the label ordinal.
            String label = this.labelsCache.getLabel(PrivateCellUtil.getRowAsInt(row.get(0)));
            if (label != null) {
                groupLabels.add(label);
            }
            row.clear();
        }
    } finally {
        labelScanner.close();
    }
    return new ArrayList<>(groupLabels);
}
/**
 * Verifies that an open scanner keeps returning rows after the region has been flushed,
 * compacted and the {@code CompactedHFilesDischarger} chore has run between two
 * {@code next} calls.
 */
@Test
public void testStoreFileMissing() throws Exception {
    // Write 3 records and create 3 store files.
    write("row1");
    region.flush(true);
    write("row2");
    region.flush(true);
    write("row3");
    region.flush(true);

    Scan scan = new Scan();
    scan.setCaching(1);
    RegionScanner scanner = region.getScanner(scan);
    // Close the scanner even when an assertion or a region operation fails, so a failing
    // run does not leave an open scanner behind.
    try {
        List<Cell> res = new ArrayList<Cell>();

        // Read first item
        scanner.next(res);
        assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
        res.clear();

        // Create a new file in between scan nexts
        write("row4");
        region.flush(true);

        // Compact the table
        region.compact(true);

        // Create the cleaner object
        CompactedHFilesDischarger cleaner =
                new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
        cleaner.chore();

        // This issues scan next
        scanner.next(res);
        assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));
    } finally {
        scanner.close();
    }
}
/**
 * Scans family {@code "f"} of the first region of the test table with {@code filter},
 * logs the elapsed time, and asserts the number of cells found.
 *
 * @param hTable       unused by this method; kept for signature compatibility
 * @param expectedSize expected total number of cells returned by the scan
 * @param filter       filter applied to the scan
 * @throws IOException if the scan fails
 */
private void runScanner(Table hTable, int expectedSize, Filter filter) throws IOException {
    String cf = "f";
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(cf));
    scan.setFilter(filter);
    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(TableName.valueOf(table));
    HRegion first = regions.get(0);

    int found = 0;
    long scanTime;
    // Open exactly one scanner. The previous code opened a second, unreferenced scanner
    // first and never closed it.
    RegionScanner scanner = first.getScanner(scan);
    try {
        List<Cell> results = new ArrayList<>();
        long timeBeforeScan = System.currentTimeMillis();
        while (scanner.next(results)) {
            found += results.size();
            results.clear();
        }
        // The final next() may still have filled the list while returning false.
        found += results.size();
        scanTime = System.currentTimeMillis() - timeBeforeScan;
    } finally {
        scanner.close();
    }
    LOG.info("\nscan time = " + scanTime + "ms");
    LOG.info("found " + found + " results\n");
    assertEquals(expectedSize, found);
}
/**
 * Runs an {@code AggregationScanner} with no measure columns over the mock input and checks
 * that every returned row key is one of the expected keys.
 */
@Test
public void testNoMeasure() throws IOException {
    CoprocessorRowType rowType = newRowType();
    CoprocessorProjector projector = new CoprocessorProjector(mask);
    ObserverAggregators aggregators = new ObserverAggregators(new HCol[] {});
    // deserialize(null) yields a default, always-true filter
    CoprocessorFilter filter = CoprocessorFilter.deserialize(null);

    HashSet<String> expectedResult = new HashSet<String>();
    expectedResult.add("\\x02\\x02\\x00\\x00");
    expectedResult.add("\\x01\\x01\\x00\\x00");

    MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
    RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
    // Close the scanner even if an assertion fails midway through the loop.
    try {
        ArrayList<Cell> result = Lists.newArrayList();
        boolean hasMore = true;
        while (hasMore) {
            result.clear();
            hasMore = aggrScanner.next(result);
            if (result.isEmpty()) {
                continue;
            }
            Cell cell = result.get(0);
            String rowKey = toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), mask);
            assertTrue(expectedResult.contains(rowKey));
        }
    } finally {
        aggrScanner.close();
    }
}
/**
 * Returns a scanner that emits one cell per entry of this cache's {@code EntryIterator}.
 * The cell's row is the group key and its value is the serialized aggregators.
 *
 * @param s the underlying scanner; it is wrapped and closed together with the returned one
 * @return a scanner over the cached groups
 */
@Override
public RegionScanner getScanner(final RegionScanner s) {
    final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator();

    // scanner using the spillable implementation
    return new BaseRegionScanner(s) {
        @Override
        public void close() throws IOException {
            try {
                s.close();
            } finally {
                // Always close gbCache and swallow possible Exceptions
                Closeables.closeQuietly(SpillableGroupByCache.this);
            }
        }

        /**
         * Appends the next group to {@code results}.
         *
         * @return {@code true} if more groups remain after the one just added
         */
        @Override
        public boolean next(List<Cell> results) throws IOException {
            if (!cacheIter.hasNext()) {
                return false;
            }
            Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next();
            ImmutableBytesWritable key = ce.getKey();
            Aggregator[] aggs = ce.getValue();
            byte[] value = aggregators.toBytes(aggs);
            if (LOGGER.isDebugEnabled()) {
                // Arrays.toString prints the elements; aggs.toString() on an array would only
                // print its type and identity hash.
                LOGGER.debug("Adding new distinct group: "
                        + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()) +
                        " with aggregators " + java.util.Arrays.toString(aggs) + " value = " +
                        Bytes.toStringBinary(value));
            }
            results.add(PhoenixKeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), SINGLE_COLUMN_FAMILY,
                    SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
            return cacheIter.hasNext();
        }
    };
}
/**
 * Chooses the scanner used for an index rebuild.
 * <ul>
 *   <li>If the table still has the {@code Indexer} coprocessor and the scan asks for
 *       verify-only, an {@code IndexerRegionScanner} over {@code innerScanner} is returned.</li>
 *   <li>If the scan is already raw, {@code innerScanner} is wrapped directly.</li>
 *   <li>Otherwise {@code innerScanner} is closed and replaced by a raw, all-versions scanner
 *       over the same families.</li>
 * </ul>
 *
 * @param innerScanner scanner originally opened for {@code scan}
 * @param region       region being rebuilt
 * @param scan         the client scan
 * @param env          coprocessor environment
 * @return the scanner that performs the rebuild
 * @throws IOException if closing or opening a scanner fails
 */
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
        final RegionCoprocessorEnvironment env) throws IOException {
    final boolean usesLegacyIndexer =
            region.getTableDescriptor().hasCoprocessor(Indexer.class.getCanonicalName());
    final byte[] verifyTypeBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
    final IndexTool.IndexVerifyType verifyType;
    if (verifyTypeBytes == null) {
        verifyType = IndexTool.IndexVerifyType.NONE;
    } else {
        verifyType = IndexTool.IndexVerifyType.fromValue(verifyTypeBytes);
    }

    if (usesLegacyIndexer && verifyType == IndexTool.IndexVerifyType.ONLY) {
        return new IndexerRegionScanner(innerScanner, region, scan, env);
    }
    if (scan.isRaw()) {
        // Already a raw scan: the scanner we were given can be used as is.
        return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
    }

    Scan rawScan = new Scan(scan);
    rawScan.setRaw(true);
    rawScan.setMaxVersions();
    rawScan.getFamilyMap().clear();
    // Rebuilds are issued as count(*) on regular tables, which puts a FirstKeyOnlyFilter on
    // the scan. That filter stops at the first column of each row, but a rebuild needs every
    // column and every version, so it is dropped; any other filter is wrapped so that all
    // versions get through.
    if (scan.getFilter() != null) {
        rawScan.setFilter(scan.getFilter() instanceof FirstKeyOnlyFilter
                ? null
                : new AllVersionsIndexRebuildFilter(scan.getFilter()));
    }
    rawScan.setCacheBlocks(false);
    // Re-add whole families (the qualifier lists were discarded by the clear() above).
    for (byte[] familyName : scan.getFamilyMap().keySet()) {
        rawScan.addFamily(familyName);
    }
    innerScanner.close();
    return new IndexRebuildRegionScanner(region.getScanner(rawScan), region, scan, env, this);
}
/**
 * Pre-reads the existing rows for every mutation in the batch.
 * <p>
 * The entire purpose of this method is to get the existing rows for the table rows being
 * indexed into the block cache, as the index maintenance code does a point scan per row.
 * The scan results themselves are discarded.
 *
 * @param miniBatchOp the batch of mutations about to be applied
 * @throws IOException if the scan fails
 */
@Override
public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
    List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
    List<IndexMaintainer> maintainers = new ArrayList<IndexMaintainer>();
    for (int i = 0; i < miniBatchOp.size(); i++) {
        Mutation m = miniBatchOp.getOperation(i).getFirst();
        keys.add(PDataType.VARBINARY.getKeyRange(m.getRow()));
        maintainers.addAll(getCodec().getIndexMaintainers(m.getAttributesMap()));
    }
    Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
    ScanRanges scanRanges = ScanRanges.create(Collections.singletonList(keys), SchemaUtil.VAR_BINARY_SCHEMA);
    scanRanges.setScanStartStopRow(scan);
    scan.setFilter(scanRanges.getSkipScanFilter());
    HRegion region = this.env.getRegion();
    RegionScanner scanner = region.getScanner(scan);
    // Tracks whether startRegionOperation() succeeded, so the finally block only closes an
    // operation that was actually started while still closing the scanner on every path.
    boolean regionOperationStarted = false;
    try {
        // Run through the scanner using internal nextRaw method
        MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
        region.startRegionOperation();
        regionOperationStarted = true;
        boolean hasMore;
        do {
            List<KeyValue> results = Lists.newArrayList();
            // Results are potentially returned even when the return value of s.next is false
            // since this is an indication of whether or not there are more values after the
            // ones returned
            hasMore = scanner.nextRaw(results, null);
        } while (hasMore);
    } finally {
        try {
            scanner.close();
        } finally {
            if (regionOperationStarted) {
                region.closeRegionOperation();
            }
        }
    }
}
/**
 * Returns a scanner that emits one {@code KeyValue} per entry of this cache's
 * {@code EntryIterator}. The row is the group key and the value is the serialized aggregators.
 *
 * @param s the underlying scanner; region info is delegated to it and it is closed together
 *          with the returned scanner
 * @return a scanner over the cached groups
 */
@Override
public RegionScanner getScanner(final RegionScanner s) {
    final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator();

    // scanner using the spillable implementation
    return new BaseRegionScanner() {
        @Override
        public HRegionInfo getRegionInfo() {
            return s.getRegionInfo();
        }

        @Override
        public void close() throws IOException {
            try {
                s.close();
            } finally {
                // Always close gbCache and swallow possible Exceptions
                Closeables.closeQuietly(SpillableGroupByCache.this);
            }
        }

        /**
         * Appends the next group to {@code results}.
         *
         * @return {@code true} if more groups remain after the one just added
         */
        @Override
        public boolean next(List<KeyValue> results) throws IOException {
            if (!cacheIter.hasNext()) {
                return false;
            }
            Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next();
            ImmutableBytesWritable key = ce.getKey();
            Aggregator[] aggs = ce.getValue();
            byte[] value = aggregators.toBytes(aggs);
            if (logger.isDebugEnabled()) {
                // Arrays.toString prints the elements; aggs.toString() on an array would only
                // print its type and identity hash.
                logger.debug("Adding new distinct group: "
                        + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()) + " with aggregators "
                        + java.util.Arrays.toString(aggs) + " value = " + Bytes.toStringBinary(value));
            }
            results.add(KeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), SINGLE_COLUMN_FAMILY,
                    SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
            return cacheIter.hasNext();
        }
    };
}
/**
 * Builds the {@link PTable} for {@code key} from a scan over {@code region} and stores it in
 * the metadata cache when no table is cached yet or the new one has a newer timestamp.
 *
 * @param key             row key handed to {@code newTableRowsScan}
 * @param cacheKey        key under which the table is cached
 * @param region          region to scan
 * @param clientTimeStamp upper timestamp bound passed to the scan and to {@code getTable}
 * @return the table produced by {@code getTable}, or {@code null} if it produced none
 * @throws IOException  if the scan fails
 * @throws SQLException if {@code getTable} fails
 */
private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException, SQLException {
    Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
    // Resolve the cache BEFORE opening the scanner: everything after getScanner() must be
    // covered by the try/finally, otherwise a failure here would leak the open scanner.
    Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
    RegionScanner scanner = region.getScanner(scan);
    try {
        PTable oldTable = metaDataCache.get(cacheKey);
        // With nothing cached, use a timestamp below the minimum so any table counts as newer.
        long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
        PTable newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
        if (newTable == null) {
            return null;
        }
        if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Caching table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()) + " at seqNum " + newTable.getSequenceNumber() + " with newer timestamp " + newTable.getTimeStamp() + " versus " + tableTimeStamp);
            }
            // put() returns whatever was mapped at the moment of insertion, which may differ
            // from the value read above; it is only used for the debug output below.
            oldTable = metaDataCache.put(cacheKey, newTable);
            if (logger.isDebugEnabled()) {
                if (oldTable == null) {
                    logger.debug("No previously cached table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()));
                } else {
                    logger.debug("Previously cached table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()) + " was at seqNum " + oldTable.getSequenceNumber() + " with timestamp " + oldTable.getTimeStamp());
                }
            }
        }
        return newTable;
    } finally {
        scanner.close();
    }
}
/**
 * Marks this scanner closed and releases every region scanner and the client partition.
 * <p>
 * Every region scanner gets a close attempt even if an earlier one fails. The first
 * {@link IOException} is rethrown once cleanup is done, with later ones attached as
 * suppressed exceptions.
 *
 * @throws IOException if closing any region scanner fails
 */
@Override
public void close() throws IOException {
    if (LOG.isDebugEnabled()) {
        // Only build the key string when it is actually going to be logged.
        String lastRow = lastCell != null ? CellUtil.getCellKeyAsString(lastCell) : null;
        LOG.debug(String.format("close split scanner with table [%s], scan [%s] with rowCount=%d, reinitCount=%d, scannerExceptionCount=%d, lastRows=%s",htable.getName().toString(),initialScan,totalScannerCount,reInitCount,scanExceptionCount,lastRow));
    }
    closed = true;
    IOException firstFailure = null;
    try {
        for (RegionScanner rs : regionScanners) {
            try {
                rs.close();
            } catch (IOException e) {
                // Keep going so one bad scanner does not leak all the others.
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        regionScanners.clear();
    } finally {
        try {
            clientPartition.close();
        } finally {
            currentScanner = null;
        }
    }
    if (firstFailure != null) {
        throw firstFailure;
    }
}
/**
* Used for an aggregate query in which the key order matches the group by key order. In this
* case, we can do the aggregation as we scan, by detecting when the group by key changes.
* <p>
* The returned scanner emits one KeyValue per group: its row is the concatenated group-by
* value and its value is the serialized aggregators.
*
* @param c observer context; used to obtain the region for start/closeRegionOperation
* @param scan the scan being served; only used in the debug message here
* @param s the underlying scanner whose rows are aggregated
* @param expressions group-by expressions evaluated against each row to form the group key
* @param aggregators server-side aggregators applied to each row of a group
* @return a scanner that returns aggregated rows instead of the raw rows of {@code s}
*/
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
Scan scan, final RegionScanner s, final List<Expression> expressions,
final ServerAggregators aggregators) {
if (logger.isDebugEnabled()) {
logger.debug("Grouped aggregation over ordered rows with scan " + scan + ", group by "
+ expressions + ", aggregators " + aggregators);
}
return new BaseRegionScanner() {
// Key of the group currently being accumulated; null before the first row and after
// the scan has been exhausted. This state carries over between next() calls.
private ImmutableBytesWritable currentKey = null;
@Override
public HRegionInfo getRegionInfo() {
return s.getRegionInfo();
}
@Override
public void close() throws IOException {
s.close();
}
/**
* Reads rows from the underlying scanner until the group key changes or the scanner
* is exhausted, then appends one aggregated KeyValue for the finished group to
* {@code results}.
*
* @return true if another call may produce a further group
*/
@Override
public boolean next(List<KeyValue> results) throws IOException {
boolean hasMore;
// Set when the row just read belongs to a different group than currentKey.
boolean aggBoundary = false;
MultiKeyValueTuple result = new MultiKeyValueTuple();
ImmutableBytesWritable key = null;
Aggregator[] rowAggregators = aggregators.getAggregators();
HRegion region = c.getEnvironment().getRegion();
MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
region.startRegionOperation();
try {
do {
List<KeyValue> kvs = new ArrayList<KeyValue>();
// Results are potentially returned even when the return value of s.next is
// false, since that value only indicates whether there are more values after
// the ones returned.
hasMore = s.nextRaw(kvs, null);
if (!kvs.isEmpty()) {
result.setKeyValues(kvs);
key = TupleUtil.getConcatenatedValue(result, expressions);
aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
// Rows of the current group are folded in immediately; a row that starts a new
// group is left in 'result' and handled after the finished group is emitted.
if (!aggBoundary) {
aggregators.aggregate(rowAggregators, result);
if (logger.isDebugEnabled()) {
logger.debug("Row passed filters: " + kvs
+ ", aggregated values: "
+ Arrays.asList(rowAggregators));
}
currentKey = key;
}
}
} while (hasMore && !aggBoundary);
} finally {
region.closeRegionOperation();
}
// currentKey stays null only if no row has been seen, in which case nothing is emitted.
if (currentKey != null) {
byte[] value = aggregators.toBytes(rowAggregators);
KeyValue keyValue =
KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
AGG_TIMESTAMP, value, 0, value.length);
results.add(keyValue);
if (logger.isDebugEnabled()) {
logger.debug("Adding new aggregate row: "
+ keyValue
+ ",for current key "
+ Bytes.toStringBinary(currentKey.get(), currentKey.getOffset(),
currentKey.getLength()) + ", aggregated values: "
+ Arrays.asList(rowAggregators));
}
// If we're at an aggregation boundary, reset the aggregators and aggregate with the
// current result (which is not a part of the returned result), so the first row of
// the next group is not lost.
if (aggBoundary) {
aggregators.reset(rowAggregators);
aggregators.aggregate(rowAggregators, result);
currentKey = key;
}
}
// Continue if there are more rows, or if a new group has been started but not yet emitted.
if (hasMore || aggBoundary) {
return true;
}
// Scan exhausted: clear the group state before reporting the end.
currentKey = null;
return false;
}
};
}
/**
 * Returns a scanner that emits one cell per entry of this cache's {@code EntryIterator}.
 * The row is the group key and the value is the serialized aggregators.
 *
 * @param s the underlying scanner; region info and max result size are delegated to it and
 *          it is closed together with the returned scanner
 * @return a scanner over the cached groups
 */
@Override
public RegionScanner getScanner(final RegionScanner s) {
    final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator();

    // scanner using the spillable implementation
    return new BaseRegionScanner() {
        @Override
        public HRegionInfo getRegionInfo() {
            return s.getRegionInfo();
        }

        @Override
        public void close() throws IOException {
            try {
                s.close();
            } finally {
                // Always close gbCache and swallow possible Exceptions
                Closeables.closeQuietly(SpillableGroupByCache.this);
            }
        }

        /**
         * Appends the next group to {@code results}.
         *
         * @return {@code true} if more groups remain after the one just added
         */
        @Override
        public boolean next(List<Cell> results) throws IOException {
            if (!cacheIter.hasNext()) {
                return false;
            }
            Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next();
            ImmutableBytesWritable key = ce.getKey();
            Aggregator[] aggs = ce.getValue();
            byte[] value = aggregators.toBytes(aggs);
            if (logger.isDebugEnabled()) {
                // Arrays.toString prints the elements; aggs.toString() on an array would only
                // print its type and identity hash.
                logger.debug("Adding new distinct group: "
                        + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()) + " with aggregators "
                        + java.util.Arrays.toString(aggs) + " value = " + Bytes.toStringBinary(value));
            }
            results.add(KeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), SINGLE_COLUMN_FAMILY,
                    SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
            return cacheIter.hasNext();
        }

        @Override
        public long getMaxResultSize() {
            return s.getMaxResultSize();
        }
    };
}
/**
 * Materializes every entry of {@code aggregateMap} as a {@code KeyValue} (row = group key,
 * value = serialized aggregators) and returns a scanner that hands them out one per call.
 * The memory chunk is resized to the estimated map size first.
 *
 * @param s the underlying scanner; region info and max result size are delegated to it and
 *          it is closed together with the returned scanner
 * @return an in-memory scanner over the aggregated groups
 */
@Override
public RegionScanner getScanner(final RegionScanner s) {
    // Compute final allocation
    chunk.resize(sizeOfUnorderedGroupByMap(aggregateMap.size(), aggregators.getEstimatedByteSize()));

    final List<KeyValue> aggResults = new ArrayList<KeyValue>(aggregateMap.size());
    for (Map.Entry<ImmutableBytesPtr, Aggregator[]> group : aggregateMap.entrySet()) {
        ImmutableBytesPtr groupKey = group.getKey();
        Aggregator[] groupAggregators = group.getValue();
        // The serialized aggregators become the value of the emitted row.
        byte[] serialized = aggregators.toBytes(groupAggregators);
        if (logger.isDebugEnabled()) {
            logger.debug(LogUtil.addCustomAnnotations("Adding new distinct group: "
                    + Bytes.toStringBinary(groupKey.get(), groupKey.getOffset(), groupKey.getLength())
                    + " with aggregators " + Arrays.asList(groupAggregators).toString()
                    + " value = " + Bytes.toStringBinary(serialized), customAnnotations));
        }
        aggResults.add(KeyValueUtil.newKeyValue(groupKey.get(), groupKey.getOffset(),
                groupKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP,
                serialized, 0, serialized.length));
    }

    // scanner using the non spillable, memory-only implementation
    return new BaseRegionScanner() {
        // Position of the next result to hand out.
        private int index = 0;

        @Override
        public HRegionInfo getRegionInfo() {
            return s.getRegionInfo();
        }

        @Override
        public void close() throws IOException {
            try {
                s.close();
            } finally {
                InMemoryGroupByCache.this.close();
            }
        }

        @Override
        public boolean next(List<Cell> results) throws IOException {
            final int total = aggResults.size();
            if (index < total) {
                results.add(aggResults.get(index++));
                return index < total;
            }
            return false;
        }

        @Override
        public long getMaxResultSize() {
            return s.getMaxResultSize();
        }
    };
}
/**
 * Runs an {@code AggregationScanner} with two measure columns over the mock input and
 * checks that every emitted (row key, column, decoded measures) triple is an expected one.
 */
@Test
public void test() throws IOException {
    CoprocessorRowType rowType = newRowType();
    CoprocessorProjector projector = new CoprocessorProjector(mask);
    ObserverAggregators aggregators = new ObserverAggregators(new HCol[] { c1, c2 });
    // deserialize(null) yields a default, always-true filter
    CoprocessorFilter filter = CoprocessorFilter.deserialize(null);

    HashSet<String> expectedResult = new HashSet<String>();
    expectedResult.add("\\x02\\x02\\x00\\x00, f:q1, [26.0, 7]");
    expectedResult.add("\\x02\\x02\\x00\\x00, f:q2, [48.0]");
    expectedResult.add("\\x01\\x01\\x00\\x00, f:q1, [22.0, 3]");
    expectedResult.add("\\x01\\x01\\x00\\x00, f:q2, [44.0]");

    MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
    RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
    // Close the scanner even if an assertion fails midway through the loop.
    try {
        ArrayList<Cell> result = Lists.newArrayList();
        boolean hasMore = true;
        while (hasMore) {
            result.clear();
            hasMore = aggrScanner.next(result);
            if (result.isEmpty()) {
                continue;
            }

            Cell cell = result.get(0);
            HCol hcol = null;
            if (ObserverAggregators.match(c1, cell)) {
                hcol = c1;
            } else if (ObserverAggregators.match(c2, cell)) {
                hcol = c2;
            } else {
                // Neither column matched: fail() aborts the test here.
                fail();
            }

            hcol.measureCodec.decode(ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()), hcol.measureValues);

            String rowKey = toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), mask);
            String col = Bytes.toString(hcol.family) + ":" + Bytes.toString(hcol.qualifier);
            String values = Arrays.toString(hcol.measureValues);

            System.out.println(rowKey);
            System.out.println(col);
            System.out.println(values);

            assertTrue(expectedResult.contains(rowKey + ", " + col + ", " + values));
        }
    } finally {
        aggrScanner.close();
    }
}
/**
 * Materializes every entry of {@code aggregateMap} as a cell (row = group key, value =
 * serialized aggregators) and returns a scanner that hands them out one per call.
 * The memory chunk is resized to the estimated map size first.
 *
 * @param s the underlying scanner; it is wrapped and closed together with the returned one
 * @return an in-memory scanner over the aggregated groups
 */
@Override
public RegionScanner getScanner(final RegionScanner s) {
    // Compute final allocation
    chunk.resize(sizeOfUnorderedGroupByMap(aggregateMap.size(), aggregators.getEstimatedByteSize()));

    final List<Cell> aggResults = new ArrayList<Cell>(aggregateMap.size());
    for (Map.Entry<ImmutableBytesPtr, Aggregator[]> group : aggregateMap.entrySet()) {
        ImmutableBytesPtr groupKey = group.getKey();
        Aggregator[] groupAggregators = group.getValue();
        // The serialized aggregators become the value of the emitted row.
        byte[] serialized = aggregators.toBytes(groupAggregators);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(LogUtil.addCustomAnnotations("Adding new distinct group: "
                    + Bytes.toStringBinary(groupKey.get(), groupKey.getOffset(), groupKey.getLength())
                    + " with aggregators " + Arrays.asList(groupAggregators).toString()
                    + " value = " + Bytes.toStringBinary(serialized), customAnnotations));
        }
        aggResults.add(PhoenixKeyValueUtil.newKeyValue(groupKey.get(), groupKey.getOffset(),
                groupKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP,
                serialized, 0, serialized.length));
    }

    // scanner using the non spillable, memory-only implementation
    return new BaseRegionScanner(s) {
        // Position of the next result to hand out.
        private int index = 0;

        @Override
        public void close() throws IOException {
            try {
                s.close();
            } finally {
                InMemoryGroupByCache.this.close();
            }
        }

        @Override
        public boolean next(List<Cell> results) throws IOException {
            final int total = aggResults.size();
            if (index < total) {
                results.add(aggResults.get(index++));
                return index < total;
            }
            return false;
        }
    };
}
/**
 * Materializes every entry of {@code aggregateMap} as a {@code KeyValue} (row = group key,
 * value = serialized aggregators) and returns a scanner that hands them out one per call.
 * The memory chunk is resized to the estimated map size first.
 *
 * @param s the underlying scanner; region info is delegated to it and it is closed together
 *          with the returned scanner
 * @return an in-memory scanner over the aggregated groups
 */
@Override
public RegionScanner getScanner(final RegionScanner s) {
    // Compute final allocation
    chunk.resize(sizeOfUnorderedGroupByMap(aggregateMap.size(), aggregators.getEstimatedByteSize()));

    final List<KeyValue> aggResults = new ArrayList<KeyValue>(aggregateMap.size());
    for (Map.Entry<ImmutableBytesPtr, Aggregator[]> group : aggregateMap.entrySet()) {
        ImmutableBytesPtr groupKey = group.getKey();
        Aggregator[] groupAggregators = group.getValue();
        // The serialized aggregators become the value of the emitted row.
        byte[] serialized = aggregators.toBytes(groupAggregators);
        if (logger.isDebugEnabled()) {
            logger.debug("Adding new distinct group: "
                    + Bytes.toStringBinary(groupKey.get(), groupKey.getOffset(), groupKey.getLength())
                    + " with aggregators " + Arrays.asList(groupAggregators).toString()
                    + " value = " + Bytes.toStringBinary(serialized));
        }
        aggResults.add(KeyValueUtil.newKeyValue(groupKey.get(), groupKey.getOffset(),
                groupKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP,
                serialized, 0, serialized.length));
    }

    // scanner using the non spillable, memory-only implementation
    return new BaseRegionScanner() {
        // Position of the next result to hand out.
        private int index = 0;

        @Override
        public HRegionInfo getRegionInfo() {
            return s.getRegionInfo();
        }

        @Override
        public void close() throws IOException {
            try {
                s.close();
            } finally {
                InMemoryGroupByCache.this.close();
            }
        }

        @Override
        public boolean next(List<KeyValue> results) throws IOException {
            final int total = aggResults.size();
            if (index < total) {
                results.add(aggResults.get(index++));
                return index < total;
            }
            return false;
        }
    };
}