下面列出了 org.apache.hadoop.hbase.filter.InclusiveStopFilter 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Scans the meta table with a filter list that is meant to reject every
 * record and checks that the scanner's first {@code next()} returns null.
 *
 * @throws IOException propagated from the table / scanner calls
 */
@Test
public void testFilterAllRecords() throws IOException {
    // Filter list intended to filter out any records: a first-key-only filter
    // combined with an inclusive stop filter built from a zero-length row key.
    FilterList rejectAll =
        new FilterList(new FirstKeyOnlyFilter(), new InclusiveStopFilter(new byte[0]));

    Scan scan = new Scan();
    scan.setCaching(1);
    scan.setBatch(1);
    scan.setFilter(rejectAll);

    // Resources close in reverse order: the scanner first, then the table.
    try (Table metaTable = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
         ResultScanner scanner = metaTable.getScanner(scan)) {
        assertNull(scanner.next());
    }
}
/**
 * Compares a plain start/stop-row scan with a start-row scan that uses
 * {@link InclusiveStopFilter}, for each of the two row groups.
 *
 * <p>For each group the exclusive stop row is expected to yield
 * {@code numRows / 2 - 1} rows, while the inclusive stop filter is expected
 * to yield {@code numRows / 2}. Every row is expected to carry
 * {@code colsPerRow} keys.
 *
 * @throws Exception propagated from {@code verifyScan}
 */
@Test
public void testInclusiveStopFilter() throws Exception {
    // {first row, last row} of each group under test.
    final String[][] groupBounds = {
        {"testRowOne-0", "testRowOne-3"},
        {"testRowTwo-0", "testRowTwo-3"},
    };
    final long keysPerRow = colsPerRow;
    final long rowsPerGroup = numRows / 2;

    for (String[] bounds : groupBounds) {
        byte[] firstRow = Bytes.toBytes(bounds[0]);
        byte[] lastRow = Bytes.toBytes(bounds[1]);

        // Plain start/stop row: the stop row itself is excluded, so one row fewer.
        Scan exclusiveScan = new Scan().withStartRow(firstRow).withStopRow(lastRow);
        verifyScan(exclusiveScan, rowsPerGroup - 1, keysPerRow);

        // Start row plus inclusive stop filter: the last row is returned as well.
        Scan inclusiveScan = new Scan().withStartRow(firstRow);
        inclusiveScan.setFilter(new InclusiveStopFilter(lastRow));
        verifyScan(inclusiveScan, rowsPerGroup, keysPerRow);
    }
}
/**
 * Scans this region between the requested start and end row and hands the
 * collected rows to {@code done}.
 *
 * <p>When the request is flagged as salted, the salt (the part of the
 * region's start key before the first {@code "_"}) is prepended to each
 * non-null row bound. If the region's start key is empty, no salt is applied.
 * When the request asks for an inclusive end, the end row is enforced with an
 * {@link InclusiveStopFilter}; otherwise it is set as the scan's stop row.
 *
 * <p>On an {@link IOException} the exception is recorded on
 * {@code controller} and {@code done} is invoked with {@code null}.
 *
 * @param controller RPC controller that receives any scan failure
 * @param request    query parameters (row bounds, salting flag, inclusive-end flag)
 * @param done       callback invoked exactly once with the response, or
 *                   {@code null} if the scan failed
 */
@Override
public void queryByStartRowAndEndRow(RpcController controller, DataProtos.DataQueryRequest request, RpcCallback<DataQueryResponse> done) {
    DataProtos.DataQueryResponse response = null;
    InternalScanner scanner = null;
    try {
        String startRow = request.getStartRow();
        String endRow = request.getEndRow();

        if (request.getIsSalting()) { // salted keys: prefix the bounds with this region's salt
            String regionStartKey = Bytes.toString(this.env.getRegion().getRegionInfo().getStartKey());
            if (StrUtils.isNotEmpty(regionStartKey)) {
                // Keys are salted as salt + "_", so the salt is the part before the first "_".
                String salt = regionStartKey.split("_")[0];
                // NOTE(review): the previous code also derived the end-row salt from the
                // region START key, but left it null when the region end key was empty,
                // which produced a literal "null_..." stop row. Both bounds now use the
                // start-key salt; confirm this matches the table's salting scheme.
                if (null != startRow) {
                    startRow = salt + "_" + startRow;
                }
                if (null != endRow) {
                    endRow = salt + "_" + endRow;
                }
            }
        }

        Scan scan = new Scan();
        if (null != startRow) {
            scan.setStartRow(Bytes.toBytes(startRow));
        }
        if (null != endRow) {
            if (request.getIncluedEnd()) {
                Filter filter = new InclusiveStopFilter(Bytes.toBytes(endRow));
                scan.setFilter(filter);
            } else {
                scan.setStopRow(Bytes.toBytes(endRow));
            }
        }

        scanner = this.env.getRegion().getScanner(scan);
        List<Cell> results = new ArrayList<>();
        boolean hasMore;
        DataProtos.DataQueryResponse.Builder responseBuilder = DataProtos.DataQueryResponse.newBuilder();
        do {
            hasMore = scanner.next(results);
            // Only emit a row when the scanner actually produced cells; an empty
            // batch must not add a blank row to the response.
            if (!results.isEmpty()) {
                DataProtos.DataQueryResponse.Row.Builder rowBuilder = DataProtos.DataQueryResponse.Row.newBuilder();
                rowBuilder.setRowKey(ByteString.copyFrom(results.get(0).getRow()));
                for (Cell kv : results) {
                    queryBuilder(rowBuilder, ByteString.copyFrom(kv.getFamily()), ByteString.copyFrom(kv.getQualifier()), ByteString.copyFrom(kv.getRow()), ByteString.copyFrom(kv.getValue()));
                }
                responseBuilder.addRowList(rowBuilder);
            }
            results.clear();
        } while (hasMore);
        response = responseBuilder.build();
    } catch (IOException e) {
        // Report the failure to the caller through the controller; response stays null.
        ResponseConverter.setControllerException(controller, e);
    } finally {
        if (scanner != null) {
            try {
                scanner.close();
            } catch (IOException e) {
                _log.error(ExceptionUtils.errorInfo(e));
            }
        }
    }
    done.run(response);
}
/**
 * Exercises a composite row key ({@code rk}, {@code rk2} joined by
 * {@code "_"}) on an HBase-mapped table.
 *
 * <p>Steps: create the mapped table with four split keys and check that five
 * regions result; insert 100 rows keyed {@code "NNN_NNN"}; scan HBase
 * directly for the {@code "021"} prefix using an {@link InclusiveStopFilter};
 * then run the equivalent SQL query and compare the rendered result. The
 * mapped table is dropped and the HBase handles are closed at the end.
 *
 * @throws Exception propagated from the SQL / HBase helpers
 */
@Test
public void testCompositeRowIndexPredication() throws Exception {
    // Explicit charset so the byte form of keys (notably Character.MAX_VALUE in the
    // stop row) does not depend on the platform default encoding.
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 text, col2 text, col3 text) " +
        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
        "'hbase.split.rowkeys'='010,040,060,080', " +
        "'hbase.rowkey.delimiter'='_')").close();
    assertTableExists("hbase_mapped_table");
    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
    // NOTE(review): the result of this call is not asserted.
    hAdmin.tableExists("hbase_table");
    HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
    try {
        // Four split keys are expected to produce five regions.
        org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
        assertEquals(5, keys.getFirst().length);

        DecimalFormat df = new DecimalFormat("000");
        for (int i = 0; i < 100; i++) {
            Put put = new Put((df.format(i) + "_" + df.format(i)).getBytes(utf8));
            put.add("col1".getBytes(utf8), "a".getBytes(utf8), ("a-" + i).getBytes(utf8));
            put.add("col1".getBytes(utf8), "b".getBytes(utf8), ("b-" + i).getBytes(utf8));
            put.add("col2".getBytes(utf8), "k1".getBytes(utf8), ("k1-" + i).getBytes(utf8));
            put.add("col2".getBytes(utf8), "k2".getBytes(utf8), ("k2-" + i).getBytes(utf8));
            put.add("col3".getBytes(utf8), "b".getBytes(utf8), ("b-" + i).getBytes(utf8));
            htable.put(put);
        }

        // Direct HBase scan over every key starting with "021_".
        Scan scan = new Scan();
        scan.setStartRow("021".getBytes(utf8));
        scan.setStopRow(("021_" + new String(new char[]{Character.MAX_VALUE})).getBytes(utf8));
        Filter filter = new InclusiveStopFilter(scan.getStopRow());
        scan.setFilter(filter);

        // try-with-resources: the scanner is released even when an assertion fails.
        try (ResultScanner scanner = htable.getScanner(scan)) {
            Result result = scanner.next();
            assertNotNull(result);
            assertEquals("021_021", new String(result.getRow(), utf8));
        }

        assertIndexPredication(true);

        try (ResultSet res = executeString("select * from hbase_mapped_table where rk = '021'")) {
            String expected = "rk,rk2,col1,col2,col3\n" +
                "-------------------------------\n" +
                "021,021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n";
            assertEquals(expected, resultSetToString(res));
        }
    } finally {
        // Nested so that a failing DROP does not prevent the HBase handles from closing.
        try {
            executeString("DROP TABLE hbase_mapped_table PURGE").close();
        } finally {
            try {
                htable.close();
            } finally {
                hAdmin.close();
            }
        }
    }
}