下面列出了怎么用org.apache.hadoop.hbase.filter.CompareFilter的API类实例代码及写法,或者点击链接到github查看源代码。
private FilterList getColumnValueFilters(Row row) {
FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
Set<String> filterColumnNames = Sets.newHashSet(row.schema().fieldNames());
for (Map.Entry<String, ColumnDef> column : columns.entrySet()) {
if (!column.getValue().cf.equals("rowkey")) {
if (filterColumnNames.contains(column.getKey())) {
byte[] value = getColumnValueAsBytes(column.getValue().name, column.getValue().type, row);
if (value != null) {
SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(
Bytes.toBytes(column.getValue().cf),
Bytes.toBytes(column.getValue().name),
CompareFilter.CompareOp.EQUAL,
value
);
filterList.addFilter(columnValueFilter);
}
}
}
}
return filterList;
}
public static void filterLimitValueRange(String projectId, String instanceId, String tableId) {
// A filter that matches cells whose values are between the given values
ValueFilter valueGreaterFilter =
new ValueFilter(
CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("PQ2A.190405")));
ValueFilter valueLesserFilter =
new ValueFilter(
CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("PQ2A.190406")));
FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filter.addFilter(valueGreaterFilter);
filter.addFilter(valueLesserFilter);
Scan scan = new Scan().setFilter(filter);
readWithFilter(projectId, instanceId, tableId, scan);
}
/**
* Attempts to push down at basic Filter predicate into HBase.
*
* @param isNative True if the values are stored in HBase using native byte[] vs being serialized as Strings.
* @param constraints The constraints that we can attempt to push into HBase as part of the scan.
* @return A filter if we found a predicate we can push down, null otherwise/
* @note Currently this method only supports constraints that can be represented by HBase's SingleColumnValueFilter
* and CompareOp of EQUAL. In the future we can add > and < for certain field types.
*/
private Filter pushdownPredicate(boolean isNative, Constraints constraints)
{
for (Map.Entry<String, ValueSet> next : constraints.getSummary().entrySet()) {
if (next.getValue().isSingleValue() && !next.getValue().isNullAllowed()) {
String[] colParts = HbaseSchemaUtils.extractColumnParts(next.getKey());
return new SingleColumnValueFilter(colParts[0].getBytes(),
colParts[1].getBytes(),
CompareFilter.CompareOp.EQUAL,
HbaseSchemaUtils.toBytes(isNative, next.getValue().getSingleValue()));
}
}
return null;
}
private FilterList generateTimeFilterList(VisitFilter visitFilter) {
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
if (visitFilter.lastModStart >= 0) { // NOTE: Negative value does not work in its binary form
SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS,
CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(visitFilter.lastModStart));
filterList.addFilter(timeStartFilter);
}
if (visitFilter.lastModEndExclusive != Long.MAX_VALUE) {
SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS,
CompareFilter.CompareOp.LESS, Bytes.toBytes(visitFilter.lastModEndExclusive));
filterList.addFilter(timeEndFilter);
}
return filterList.getFilters().isEmpty() ? null : filterList;
}
/**
* Handles IS NULL and IS NOT NULL operators
*
* @param hBaseColumn the HBase column
* @param operator the IS NULL/IS NOT NULL operator
* @return the filter for the given operator
*/
private Filter processNullOperator(HBaseColumnDescriptor hBaseColumn, Operator operator) {
CompareFilter.CompareOp compareOperation = (operator == Operator.IS_NULL) ?
CompareFilter.CompareOp.EQUAL :
CompareFilter.CompareOp.NOT_EQUAL;
return new SingleColumnValueFilter(
hBaseColumn.columnFamilyBytes(),
hBaseColumn.qualifierBytes(),
compareOperation,
new NullComparator());
}
@Test
public void parseIsNullExpression() throws Exception {
Filter filter = helper("a1o8", tupleDescription);
assertTrue(filter instanceof SingleColumnValueFilter);
SingleColumnValueFilter result = (SingleColumnValueFilter) filter;
assertNotNull(result);
assertSame(families[1], result.getFamily());
assertSame(qualifiers[1], result.getQualifier());
assertEquals(CompareFilter.CompareOp.EQUAL, result.getOperator());
assertTrue(result.getComparator() instanceof NullComparator);
}
@Test
public void parseIsNotNullExpression() throws Exception {
Filter filter = helper("a1o9", tupleDescription);
assertTrue(filter instanceof SingleColumnValueFilter);
SingleColumnValueFilter result = (SingleColumnValueFilter) filter;
assertNotNull(result);
assertSame(families[1], result.getFamily());
assertSame(qualifiers[1], result.getQualifier());
assertEquals(CompareFilter.CompareOp.NOT_EQUAL, result.getOperator());
assertTrue(result.getComparator() instanceof NullComparator);
}
@Test
public void testSimpleColumnOperator() throws Exception {
// id > 5
Filter filter = helper("a0c20s1d5o2", tupleDescription);
assertNotNull(filter);
assertTrue(filter instanceof SingleColumnValueFilter);
SingleColumnValueFilter scvFilter = (SingleColumnValueFilter) filter;
assertSame(families[0], scvFilter.getFamily());
assertSame(qualifiers[0], scvFilter.getQualifier());
assertEquals(CompareFilter.CompareOp.GREATER, scvFilter.getOperator());
assertTrue(scvFilter.getComparator() instanceof HBaseIntegerComparator);
assertEquals(0, scvFilter.getComparator().compareTo("5".getBytes()));
}
@Test
public void testOrOperator() throws Exception {
// a1 > '2008-02-01' or a2 > 1200
Filter filter = helper("a1c25s10d2008-02-01o2a2c20s4d1200o2l1", tupleDescription);
assertNotNull(filter);
assertTrue(filter instanceof FilterList);
FilterList filterList = (FilterList) filter;
assertEquals(FilterList.Operator.MUST_PASS_ONE, filterList.getOperator());
assertNotNull(filterList.getFilters());
assertEquals(2, filterList.getFilters().size());
Filter left = filterList.getFilters().get(0);
Filter right = filterList.getFilters().get(1);
assertTrue(left instanceof SingleColumnValueFilter);
assertTrue(right instanceof SingleColumnValueFilter);
SingleColumnValueFilter scvFilterLeft = (SingleColumnValueFilter) left;
SingleColumnValueFilter scvFilterRight = (SingleColumnValueFilter) right;
assertEquals(families[1], scvFilterLeft.getFamily());
assertEquals(qualifiers[1], scvFilterLeft.getQualifier());
assertEquals(CompareFilter.CompareOp.GREATER, scvFilterLeft.getOperator());
assertEquals(0, scvFilterLeft.getComparator().compareTo("2008-02-01".getBytes()));
assertEquals(families[2], scvFilterRight.getFamily());
assertEquals(qualifiers[2], scvFilterRight.getQualifier());
assertEquals(CompareFilter.CompareOp.GREATER, scvFilterRight.getOperator());
assertEquals(0, scvFilterRight.getComparator().compareTo("1200".getBytes()));
}
@Test
public void testFilterWithIncompatiblePredicate() throws Exception {
// ((_1_ LIKE row1 AND _2_ < 999) AND _1_ = seq)
Filter filter = helper("a1c25s4drow1o7a2c23s3d999o1l0a1c25s3dseqo5l0", tupleDescription);
assertNotNull(filter);
assertTrue(filter instanceof FilterList);
FilterList filterList = (FilterList) filter;
assertEquals(FilterList.Operator.MUST_PASS_ALL, filterList.getOperator());
// LIKE is not supported so it gets dropped
assertNotNull(filterList.getFilters());
assertEquals(2, filterList.getFilters().size());
Filter left = filterList.getFilters().get(0);
Filter right = filterList.getFilters().get(1);
assertTrue(left instanceof SingleColumnValueFilter);
assertTrue(right instanceof SingleColumnValueFilter);
SingleColumnValueFilter scvFilterLeft = (SingleColumnValueFilter) left;
SingleColumnValueFilter scvFilterRight = (SingleColumnValueFilter) right;
assertEquals(families[2], scvFilterLeft.getFamily());
assertEquals(qualifiers[2], scvFilterLeft.getQualifier());
assertEquals(CompareFilter.CompareOp.LESS, scvFilterLeft.getOperator());
assertEquals(0, scvFilterLeft.getComparator().compareTo("999".getBytes()));
assertEquals(families[1], scvFilterRight.getFamily());
assertEquals(qualifiers[1], scvFilterRight.getQualifier());
assertEquals(CompareFilter.CompareOp.EQUAL, scvFilterRight.getOperator());
assertEquals(0, scvFilterRight.getComparator().compareTo("seq".getBytes()));
}
@Test(timeOut = 10_000)
public void testSIPreventsPredicateManyPrecedersForReadPredicates(ITestContext context) throws Exception {
// TX History for PMP for Read Predicate:
// begin; set transaction isolation level repeatable read; -- T1
// begin; set transaction isolation level repeatable read; -- T2
// select * from test where value = 30; -- T1. Returns nothing
// insert into test (id, value) values(3, 30); -- T2
// commit; -- T2
// select * from test where value % 3 = 0; -- T1. Still returns nothing
// commit; -- T1
// 0) Start transactions
TransactionManager tm = newTransactionManager(context);
TTable txTable = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Transaction tx2 = tm.begin();
// 1) select * from test where value = 30; -- T1. Returns nothing
Scan scan = new Scan();
Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(30));
scan.setFilter(f);
ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
assertNull(tx1Scanner.next());
// 2) insert into test (id, value) values(3, 30); -- T2
Put newRow = new Put(rowId3);
newRow.addColumn(famName, colName, dataValue3);
txTable.put(tx2, newRow);
// 3) Commit TX 2
tm.commit(tx2);
// 4) select * from test where value % 3 = 0; -- T1. Still returns nothing
tx1Scanner = txTable.getScanner(tx1, scan);
assertNull(tx1Scanner.next());
// 5) Commit TX 1
tm.commit(tx1);
}
@Test(timeOut = 60_000)
public void testServerSideSnapshotFiltering() throws Throwable {
byte[] rowName1 = Bytes.toBytes("row1");
byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
byte[] colName1 = Bytes.toBytes("col1");
byte[] dataValue1 = Bytes.toBytes("testWrite-1");
byte[] dataValue2 = Bytes.toBytes("testWrite-2");
String TEST_TABLE = "testServerSideSnapshotFiltering";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
put2.addColumn(famName1, colName1, dataValue2);
tt.put(tx2, put2);
Transaction tx3 = tm.begin();
Get get = new Get(rowName1);
// If snapshot filtering is not done in the server then the first value is
// "testWrite-2" and the whole row will be filtered out.
SingleColumnValueFilter filter = new SingleColumnValueFilter(
famName1,
colName1,
CompareFilter.CompareOp.EQUAL,
new SubstringComparator("testWrite-1"));
get.setFilter(filter);
Result results = tt.get(tx3, get);
assertTrue(results.size() == 1);
}
@Override
public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Delete delete) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Put put) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
throws IOException {
if (allowNonTransactional) {
return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
}
throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
}
@Override
public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Delete delete) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Put put) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
throws IOException {
if (allowNonTransactional) {
return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
}
throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
}
@Override
public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2,
CompareFilter.CompareOp compareOp, byte[] bytes3, Delete delete) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2,
CompareFilter.CompareOp compareOp, byte[] bytes3, Put put) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
throws IOException {
if (allowNonTransactional) {
return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
}
throw new UnsupportedOperationException(
"checkAndMutate operation is not supported transactionally");
}
@Override
public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Delete delete) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Put put) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
throws IOException {
if (allowNonTransactional) {
return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
}
throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
}
@Override
public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Delete delete) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Put put) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
throws IOException {
if (allowNonTransactional) {
return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
}
throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
}
@Override
public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Delete delete) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp,
byte[] bytes3, Put put) throws IOException {
if (allowNonTransactional) {
return hTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
throws IOException {
if (allowNonTransactional) {
return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations);
}
throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally");
}