下面列出了org.apache.hadoop.hbase.regionserver.RegionScanner#next ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
protected List<List<Cell>> getExistingLabelsWithAuths() throws IOException {
Scan scan = new Scan();
RegionScanner scanner = labelsRegion.getScanner(scan);
List<List<Cell>> existingLabels = new ArrayList<>();
try {
while (true) {
List<Cell> cells = new ArrayList<>();
scanner.next(cells);
if (cells.isEmpty()) {
break;
}
existingLabels.add(cells);
}
} finally {
scanner.close();
}
return existingLabels;
}
private void initiateScan(HRegion region) throws IOException {
Scan scan = new Scan();
scan.setCaching(1);
RegionScanner resScanner = null;
try {
resScanner = region.getScanner(scan);
List<Cell> results = new ArrayList<>();
boolean next = resScanner.next(results);
try {
counter.incrementAndGet();
latch.await();
} catch (InterruptedException e) {
}
while (next) {
next = resScanner.next(results);
}
} finally {
scanCompletedCounter.incrementAndGet();
resScanner.close();
}
}
private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException {
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
return null;
}
Scan scan = newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
scan.setFilter(new FirstKeyOnlyFilter());
scan.setRaw(true);
RegionScanner scanner = region.getScanner(scan);
List<KeyValue> results = Lists.<KeyValue>newArrayList();
scanner.next(results);
// HBase ignores the time range on a raw scan (HBASE-7362)
if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
KeyValue kv = results.get(0);
if (kv.isDelete()) {
Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
PTable table = newDeletedTableMarker(kv.getTimestamp());
metaDataCache.put(cacheKey, table);
return table;
}
}
return null;
}
@Override
public List<String> getUserAuths(byte[] user, boolean systemCall)
throws IOException {
assert (labelsRegion != null || systemCall);
if (systemCall || labelsRegion == null) {
return this.labelsCache.getUserAuths(Bytes.toString(user));
}
Scan s = new Scan();
if (user != null && user.length > 0) {
s.addColumn(LABELS_TABLE_FAMILY, user);
}
Filter filter = VisibilityUtils.createVisibilityLabelFilter(this.labelsRegion,
new Authorizations(SYSTEM_LABEL));
s.setFilter(filter);
ArrayList<String> auths = new ArrayList<>();
RegionScanner scanner = this.labelsRegion.getScanner(s);
try {
List<Cell> results = new ArrayList<>(1);
while (true) {
scanner.next(results);
if (results.isEmpty()) break;
Cell cell = results.get(0);
int ordinal = PrivateCellUtil.getRowAsInt(cell);
String label = this.labelsCache.getLabel(ordinal);
if (label != null) {
auths.add(label);
}
results.clear();
}
} finally {
scanner.close();
}
return auths;
}
@Override
public List<String> getGroupAuths(String[] groups, boolean systemCall)
throws IOException {
assert (labelsRegion != null || systemCall);
if (systemCall || labelsRegion == null) {
return this.labelsCache.getGroupAuths(groups);
}
Scan s = new Scan();
if (groups != null && groups.length > 0) {
for (String group : groups) {
s.addColumn(LABELS_TABLE_FAMILY, Bytes.toBytes(AuthUtil.toGroupEntry(group)));
}
}
Filter filter = VisibilityUtils.createVisibilityLabelFilter(this.labelsRegion,
new Authorizations(SYSTEM_LABEL));
s.setFilter(filter);
Set<String> auths = new HashSet<>();
RegionScanner scanner = this.labelsRegion.getScanner(s);
try {
List<Cell> results = new ArrayList<>(1);
while (true) {
scanner.next(results);
if (results.isEmpty()) break;
Cell cell = results.get(0);
int ordinal = PrivateCellUtil.getRowAsInt(cell);
String label = this.labelsCache.getLabel(ordinal);
if (label != null) {
auths.add(label);
}
results.clear();
}
} finally {
scanner.close();
}
return new ArrayList<>(auths);
}
private int getScannedCount(RegionScanner scanner) throws IOException {
int scannedCount = 0;
List<Cell> results = new ArrayList<>();
while (true) {
boolean existMore = scanner.next(results);
if (!results.isEmpty()) {
scannedCount++;
}
if (!existMore) {
break;
}
results.clear();
}
return scannedCount;
}
@Test
public void testCoprocessorInterface() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
byte [][] families = { fam1, fam2, fam3 };
Configuration hc = initConfig();
HRegion region = initHRegion(tableName, name.getMethodName(), hc,
new Class<?>[]{CoprocessorImpl.class}, families);
for (int i = 0; i < 3; i++) {
HTestConst.addContent(region, fam3);
region.flush(true);
}
region.compact(false);
// HBASE-4197
Scan s = new Scan();
RegionScanner scanner = region.getCoprocessorHost().postScannerOpen(s, region.getScanner(s));
assertTrue(scanner instanceof CustomScanner);
// this would throw an exception before HBASE-4197
scanner.next(new ArrayList<>());
HBaseTestingUtility.closeRegionAndWAL(region);
Coprocessor c = region.getCoprocessorHost().findCoprocessor(CoprocessorImpl.class);
assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
assertTrue(((CoprocessorImpl)c).wasOpened());
assertTrue(((CoprocessorImpl)c).wasClosed());
assertTrue(((CoprocessorImpl)c).wasFlushed());
assertTrue(((CoprocessorImpl)c).wasCompacted());
}
@Test
public void testStoreFileMissing() throws Exception {
// Write 3 records and create 3 store files.
write("row1");
region.flush(true);
write("row2");
region.flush(true);
write("row3");
region.flush(true);
Scan scan = new Scan();
scan.setCaching(1);
RegionScanner scanner = region.getScanner(scan);
List<Cell> res = new ArrayList<Cell>();
// Read first item
scanner.next(res);
assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
res.clear();
// Create a new file in between scan nexts
write("row4");
region.flush(true);
// Compact the table
region.compact(true);
// Create the cleaner object
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
cleaner.chore();
// This issues scan next
scanner.next(res);
assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));
scanner.close();
}
private int getScannedCount(RegionScanner scanner) throws IOException {
int scannedCount = 0;
List<Cell> results = new ArrayList<>();
while (true) {
boolean existMore = scanner.next(results);
if (!results.isEmpty())
scannedCount++;
if (!existMore)
break;
results.clear();
}
return scannedCount;
}
private void runScanner(Table hTable, int expectedSize, Filter filter) throws IOException {
String cf = "f";
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(cf));
scan.setFilter(filter);
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(TableName.valueOf(table));
HRegion first = regions.get(0);
first.getScanner(scan);
RegionScanner scanner = first.getScanner(scan);
List<Cell> results = new ArrayList<>();
// Result result;
long timeBeforeScan = System.currentTimeMillis();
int found = 0;
while (scanner.next(results)) {
found += results.size();
results.clear();
}
found += results.size();
long scanTime = System.currentTimeMillis() - timeBeforeScan;
scanner.close();
LOG.info("\nscan time = " + scanTime + "ms");
LOG.info("found " + found + " results\n");
assertEquals(expectedSize, found);
}
@Test
public void testReverseScanWithoutPadding() throws Exception {
byte[] row1 = Bytes.toBytes("a");
byte[] row2 = Bytes.toBytes("ab");
byte[] row3 = Bytes.toBytes("b");
Put put1 = new Put(row1);
put1.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
Put put2 = new Put(row2);
put2.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
Put put3 = new Put(row3);
put3.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
region.put(put1);
region.put(put2);
region.put(put3);
region.flush(true);
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setReversed(true);
scan.setFilter(new FirstKeyOnlyFilter());
scan.addFamily(cfName);
RegionScanner scanner = region.getScanner(scan);
List<Cell> res = new ArrayList<>();
int count = 1;
while (scanner.next(res)) {
count++;
}
assertEquals("b", Bytes.toString(res.get(0).getRowArray(), res.get(0).getRowOffset(),
res.get(0).getRowLength()));
assertEquals("ab", Bytes.toString(res.get(1).getRowArray(), res.get(1).getRowOffset(),
res.get(1).getRowLength()));
assertEquals("a", Bytes.toString(res.get(2).getRowArray(), res.get(2).getRowOffset(),
res.get(2).getRowLength()));
assertEquals(3, count);
}
@Test
public void testReverseScanWithPadding() throws Exception {
byte[] terminator = new byte[] { -1 };
byte[] row1 = Bytes.add(invert(Bytes.toBytes("a")), terminator);
byte[] row2 = Bytes.add(invert(Bytes.toBytes("ab")), terminator);
byte[] row3 = Bytes.add(invert(Bytes.toBytes("b")), terminator);
Put put1 = new Put(row1);
put1.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
Put put2 = new Put(row2);
put2.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
Put put3 = new Put(row3);
put3.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
region.put(put1);
region.put(put2);
region.put(put3);
region.flush(true);
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setReversed(true);
scan.setFilter(new FirstKeyOnlyFilter());
scan.addFamily(cfName);
RegionScanner scanner = region.getScanner(scan);
List<Cell> res = new ArrayList<>();
int count = 1;
while (scanner.next(res)) {
count++;
}
assertEquals(3, count);
}
@Test
public void testNoMeasure() throws IOException {
CoprocessorRowType rowType = newRowType();
CoprocessorProjector projector = new CoprocessorProjector(mask);
ObserverAggregators aggregators = new ObserverAggregators(new HCol[] {});
CoprocessorFilter filter = CoprocessorFilter.deserialize(null); // a default,
// always-true,
// filter
HashSet<String> expectedResult = new HashSet<String>();
expectedResult.add("\\x02\\x02\\x00\\x00");
expectedResult.add("\\x01\\x01\\x00\\x00");
MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
ArrayList<Cell> result = Lists.newArrayList();
boolean hasMore = true;
while (hasMore) {
result.clear();
hasMore = aggrScanner.next(result);
if (result.isEmpty())
continue;
Cell cell = result.get(0);
String rowKey = toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), mask);
assertTrue(expectedResult.contains(rowKey));
}
aggrScanner.close();
}
/**
* @param tableName parent table's name
* @return true if there exist a table that use this table as their base table.
* TODO: should we pass a timestamp here?
*/
private boolean hasViews(HRegion region, byte[] tenantId, PTable table) throws IOException {
byte[] schemaName = table.getSchemaName().getBytes();
byte[] tableName = table.getTableName().getBytes();
Scan scan = new Scan();
// If the table is multi-tenant, we need to check across all tenant_ids,
// so we can't constrain the row key. Otherwise, any views would have
// the same tenantId.
if (!table.isMultiTenant()) {
byte[] startRow = ByteUtil.concat(tenantId, QueryConstants.SEPARATOR_BYTE_ARRAY);
byte[] stopRow = ByteUtil.nextKey(startRow);
scan.setStartRow(startRow);
scan.setStopRow(stopRow);
}
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, BASE_SCHEMA_NAME_BYTES, EQUAL, schemaName);
filter1.setFilterIfMissing(schemaName.length > 0);
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, BASE_TABLE_NAME_BYTES, EQUAL, tableName);
filter2.setFilterIfMissing(true);
BinaryComparator comparator = new BinaryComparator(ByteUtil.concat(tenantId, QueryConstants.SEPARATOR_BYTE_ARRAY, schemaName, QueryConstants.SEPARATOR_BYTE_ARRAY, tableName));
RowFilter filter3 = new RowFilter(CompareOp.NOT_EQUAL,comparator);
Filter filter = new FilterList(filter1,filter2,filter3);
scan.setFilter(filter);
RegionScanner scanner = region.getScanner(scan);
try {
List<KeyValue> results = newArrayList();
scanner.next(results);
return results.size() > 0;
}
finally {
scanner.close();
}
}
/**
* Test from client side for scan with maxResultPerCF set
*/
@Test
public void testScanLimitAndOffset() throws Exception {
//byte [] TABLE = HTestConst.DEFAULT_TABLE_BYTES;
byte [][] ROWS = HTestConst.makeNAscii(HTestConst.DEFAULT_ROW_BYTES, 2);
byte [][] FAMILIES = HTestConst.makeNAscii(HTestConst.DEFAULT_CF_BYTES, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(HTestConst.DEFAULT_QUALIFIER_BYTES, 10);
TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
new TableDescriptorBuilder.ModifyableTableDescriptor(
TableName.valueOf(HTestConst.DEFAULT_TABLE_BYTES));
RegionInfo info = RegionInfoBuilder.newBuilder(HTestConst.DEFAULT_TABLE).build();
for (byte[] family : FAMILIES) {
ColumnFamilyDescriptor familyDescriptor =
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family);
tableDescriptor.setColumnFamily(familyDescriptor);
}
HRegion region = HBaseTestingUtility.createRegionAndWAL(info, TEST_UTIL.getDataTestDir(),
TEST_UTIL.getConfiguration(), tableDescriptor);
try {
Put put;
Scan scan;
Result result;
boolean toLog = true;
List<Cell> kvListExp = new ArrayList<>();
int storeOffset = 1;
int storeLimit = 3;
for (int r = 0; r < ROWS.length; r++) {
put = new Put(ROWS[r]);
for (int c = 0; c < FAMILIES.length; c++) {
for (int q = 0; q < QUALIFIERS.length; q++) {
KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1,
HTestConst.DEFAULT_VALUE_BYTES);
put.add(kv);
if (storeOffset <= q && q < storeOffset + storeLimit) {
kvListExp.add(kv);
}
}
}
region.put(put);
}
scan = new Scan();
scan.setRowOffsetPerColumnFamily(storeOffset);
scan.setMaxResultsPerColumnFamily(storeLimit);
RegionScanner scanner = region.getScanner(scan);
List<Cell> kvListScan = new ArrayList<>();
List<Cell> results = new ArrayList<>();
while (scanner.next(results) || !results.isEmpty()) {
kvListScan.addAll(results);
results.clear();
}
result = Result.create(kvListScan);
TestScannersFromClientSide.verifyResult(result, kvListExp, toLog,
"Testing scan with storeOffset and storeLimit");
} finally {
HBaseTestingUtility.closeRegionAndWAL(region);
}
}
@Test
public void test() throws IOException {
CoprocessorRowType rowType = newRowType();
CoprocessorProjector projector = new CoprocessorProjector(mask);
ObserverAggregators aggregators = new ObserverAggregators(new HCol[] { c1, c2 });
CoprocessorFilter filter = CoprocessorFilter.deserialize(null); // a default,
// always-true,
// filter
HashSet<String> expectedResult = new HashSet<String>();
expectedResult.add("\\x02\\x02\\x00\\x00, f:q1, [26.0, 7]");
expectedResult.add("\\x02\\x02\\x00\\x00, f:q2, [48.0]");
expectedResult.add("\\x01\\x01\\x00\\x00, f:q1, [22.0, 3]");
expectedResult.add("\\x01\\x01\\x00\\x00, f:q2, [44.0]");
MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner);
ArrayList<Cell> result = Lists.newArrayList();
boolean hasMore = true;
while (hasMore) {
result.clear();
hasMore = aggrScanner.next(result);
if (result.isEmpty())
continue;
Cell cell = result.get(0);
HCol hcol = null;
if (ObserverAggregators.match(c1, cell)) {
hcol = c1;
} else if (ObserverAggregators.match(c2, cell)) {
hcol = c2;
} else
fail();
hcol.measureCodec.decode(ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()), hcol.measureValues);
String rowKey = toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), mask);
String col = Bytes.toString(hcol.family) + ":" + Bytes.toString(hcol.qualifier);
String values = Arrays.toString(hcol.measureValues);
System.out.println(rowKey);
System.out.println(col);
System.out.println(values);
assertTrue(expectedResult.contains(rowKey + ", " + col + ", " + values));
}
aggrScanner.close();
}
/**
* Test presplitting an index table
*/
@Test
public void testSplitIndex() throws Exception {
if (localIndex) return; // can't split local indexes
if (!mutable) return;
if (transactional) return;
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
final TableName dataTN = TableName.valueOf(dataTableFullName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
TableName indexTN = TableName.valueOf(indexTableFullName);
try (Connection conn =
DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
String dataDDL =
"CREATE TABLE " + dataTableFullName + "(\n"
+ "ID VARCHAR NOT NULL PRIMARY KEY,\n"
+ "\"info\".CAR_NUM VARCHAR(18) NULL,\n"
+ "\"test\".CAR_NUM VARCHAR(18) NULL,\n"
+ "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n"
+ "\"info\".ORG_NAME VARCHAR(255) NULL\n" + ") COLUMN_ENCODED_BYTES = 0";
conn.createStatement().execute(dataDDL);
String[] carNumPrefixes = new String[] {"a", "b", "c", "d"};
// split the data table, as the tool splits the index table to have the same # of regions
// doesn't really matter what the split points are, we just want a target # of regions
int numSplits = carNumPrefixes.length;
int targetNumRegions = numSplits + 1;
byte[][] splitPoints = new byte[numSplits][];
for (String prefix : carNumPrefixes) {
splitPoints[--numSplits] = Bytes.toBytes(prefix);
}
HTableDescriptor dataTD = admin.getTableDescriptor(dataTN);
admin.disableTable(dataTN);
admin.deleteTable(dataTN);
admin.createTable(dataTD, splitPoints);
assertEquals(targetNumRegions, admin.getTableRegions(dataTN).size());
// insert data where index column values start with a, b, c, d
int idCounter = 1;
try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO " + dataTableFullName
+ "(ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME) VALUES(?,?,?,'2016-01-01 00:00:00',11,'orgname1')")){
for (String carNum : carNumPrefixes) {
for (int i = 0; i < 100; i++) {
ps.setString(1, idCounter++ + "");
ps.setString(2, carNum + "_" + i);
ps.setString(3, "test-" + carNum + "_ " + i);
ps.addBatch();
}
}
ps.executeBatch();
conn.commit();
}
String indexDDL =
String.format(
"CREATE INDEX %s on %s (\"info\".CAR_NUM,\"test\".CAR_NUM,\"info\".CAP_DATE) ASYNC",
indexTableName, dataTableFullName);
conn.createStatement().execute(indexDDL);
// run with 50% sampling rate, split if data table more than 3 regions
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,"-sp", "50", "-spa", "3");
assertEquals(targetNumRegions, admin.getTableRegions(indexTN).size());
List<Cell> values = new ArrayList<>();
// every index region should have been written to, if the index table was properly split uniformly
for (HRegion region : getUtility().getHBaseCluster().getRegions(indexTN)) {
values.clear();
RegionScanner scanner = region.getScanner(new Scan());
scanner.next(values);
if (values.isEmpty()) {
fail("Region did not have any results: " + region.getRegionInfo());
}
}
}
}