The following examples show how to use the org.apache.hadoop.hbase.filter.FuzzyRowFilter API class.
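All of them rely on the same convention: FuzzyRowFilter takes a list of Pair objects, each holding a row-key template and a mask of the same length, where a mask byte of 0 means the corresponding key byte must match and 1 means it may be anything. A minimal sketch, assuming a hypothetical 10-byte key layout of a 4-byte user id followed by a fixed "_login" suffix:

import java.util.Arrays;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class FuzzyRowFilterSketch {
    public static Scan buildScan() {
        // Template key: the first 4 bytes ("????") are placeholders for the
        // user id; the "_login" suffix is the part we actually match on.
        byte[] template = Bytes.toBytes("????_login");
        // Mask: 1 = key byte may be anything, 0 = key byte must match.
        byte[] mask = { 1, 1, 1, 1, 0, 0, 0, 0, 0, 0 };
        Scan scan = new Scan();
        scan.setFilter(new FuzzyRowFilter(Arrays.asList(new Pair<>(template, mask))));
        return scan;
    }
}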
public static void applyFuzzyFilter(Scan scan, List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys) {
    if (fuzzyKeys != null && fuzzyKeys.size() > 0) {
        FuzzyRowFilter rowFilter = new FuzzyRowFilter(convertToHBasePair(fuzzyKeys));

        Filter filter = scan.getFilter();
        if (filter != null) {
            // an InclusiveStopFilter may already exist; see buildScan
            FilterList filterList = new FilterList();
            filterList.addFilter(filter);
            filterList.addFilter(rowFilter);
            scan.setFilter(filterList);
        } else {
            scan.setFilter(rowFilter);
        }
    }
}
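Note that FilterList defaults to FilterList.Operator.MUST_PASS_ALL, so a row is returned only if it passes both the pre-existing filter (the InclusiveStopFilter mentioned in the comment) and the FuzzyRowFilter.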
private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) {
    List<Pair<byte[], byte[]>> fuzzyKeys = keyRange.getFuzzyKeys();
    if (fuzzyKeys != null && fuzzyKeys.size() > 0) {
        FuzzyRowFilter rowFilter = new FuzzyRowFilter(fuzzyKeys);

        Filter filter = scan.getFilter();
        if (filter != null) {
            // an InclusiveStopFilter may already exist; see buildScan
            FilterList filterList = new FilterList();
            filterList.addFilter(filter);
            filterList.addFilter(rowFilter);
            scan.setFilter(filterList);
        } else {
            scan.setFilter(rowFilter);
        }
    }
}
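This variant appears to come from an earlier revision of the same code path: the fuzzy keys arrive as HBase Pair objects already, so no conversion step is needed, but the filter-combining logic is identical to the snippet above.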
Scan GenerateScanFuzzy(long starttime, long endtime, String cluster,
        String path) throws IOException {
    Scan scan = createScanWithAllColumns();

    String rowKeySuffix = HdfsConstants.SEP + cluster + HdfsConstants.SEP
            + StringUtil.cleanseToken(path);
    String rowKey = HdfsConstants.INVERTED_TIMESTAMP_FUZZY_INFO + rowKeySuffix;
    int fuzzyLength =
            HdfsConstants.NUM_CHARS_INVERTED_TIMESTAMP + rowKeySuffix.length();

    byte[] fuzzyInfo = new byte[fuzzyLength];
    // 1 = the inverted-timestamp prefix may be anything
    for (int i = 0; i < HdfsConstants.NUM_CHARS_INVERTED_TIMESTAMP; i++) {
        fuzzyInfo[i] = 1;
    }
    // 0 = the cluster/path suffix must match exactly
    for (int i = HdfsConstants.NUM_CHARS_INVERTED_TIMESTAMP; i < fuzzyLength; i++) {
        fuzzyInfo[i] = 0;
    }

    @SuppressWarnings("unchecked")
    FuzzyRowFilter rowFilter = new FuzzyRowFilter(Arrays.asList(
            new Pair<byte[], byte[]>(Bytes.toBytesBinary(rowKey), fuzzyInfo)));
    scan.setFilter(rowFilter);

    String minStartKey = Long.toString(getEncodedRunId(starttime));
    String maxEndKey = Long.toString(getEncodedRunId(endtime));
    LOG.info(starttime + " " + getEncodedRunId(starttime) + " min " + minStartKey
            + " " + endtime + " " + maxEndKey + " " + getEncodedRunId(endtime));
    scan.setStartRow(Bytes.toBytes(minStartKey + rowKeySuffix));
    scan.setStopRow(Bytes.toBytes(maxEndKey + rowKeySuffix));

    LOG.info(" scan: " + scan.toJSON());
    return scan;
}
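Here the mask marks the inverted-timestamp prefix as non-fixed and the cluster/path suffix as fixed, while setStartRow/setStopRow bound the scan to the encoded time range so the fuzzy filter only has to skip rows within it. As a side note, the two fill loops could be collapsed with java.util.Arrays.fill; a sketch of the equivalent mask construction:

byte[] fuzzyInfo = new byte[fuzzyLength];
// bytes [0, NUM_CHARS_INVERTED_TIMESTAMP) are non-fixed ...
Arrays.fill(fuzzyInfo, 0, HdfsConstants.NUM_CHARS_INVERTED_TIMESTAMP, (byte) 1);
// ... and the remaining suffix bytes stay 0 (fixed) from array initialization.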
private Filter createFilter(QueryCondition.RowKey rowKey, byte[] startRowKey) {
    String businessId = rowKey.getBusinessId();
    if (StringUtils.isNotEmpty(businessId)) {
        List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> fuzzyKeysData = new LinkedList<>();
        org.apache.hadoop.hbase.util.Pair<byte[], byte[]> pair = new org.apache.hadoop.hbase.util.Pair<>();
        // the timestamp bytes may be anything; '?' is just a placeholder
        for (int i = 4; i < 12; i++) {
            startRowKey[i] = Bytes.toBytes("?")[0];
        }
        // the messageId bytes may be anything
        for (int i = 28; i < 44; i++) {
            startRowKey[i] = Bytes.toBytes("?")[0];
        }
        pair.setFirst(startRowKey);

        byte fixed = 0x0;   // byte must match
        byte unFixed = 0x1; // byte need not match
        ByteBuffer allocate = ByteBuffer.allocate(44);
        for (int i = 0; i < 4; i++) {   // positions 0-3: must match
            allocate.put(fixed);
        }
        for (int i = 0; i < 8; i++) {   // positions 4-11 (timestamp): wildcard
            allocate.put(unFixed);
        }
        for (int i = 0; i < 16; i++) {  // positions 12-27: must match
            allocate.put(fixed);
        }
        for (int i = 0; i < 16; i++) {  // positions 28-43 (messageId): wildcard
            allocate.put(unFixed);
        }
        pair.setSecond(allocate.array());
        fuzzyKeysData.add(pair);
        Filter filter = new FuzzyRowFilter(fuzzyKeysData);
        return filter;
    }
    return null;
}
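A filter built this way is only useful once attached to a Scan and executed; note also that createFilter mutates the caller's startRowKey array in place. A minimal sketch of the surrounding client code, assuming a hypothetical table named "messages" and a default HBase configuration:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;

public class FuzzyScanRunner {
    // Executes a scan with the given fuzzy filter and prints matching row keys.
    public static void run(Filter fuzzyFilter) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
                Table table = conn.getTable(TableName.valueOf("messages"))) {
            Scan scan = new Scan();
            scan.setFilter(fuzzyFilter);
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    System.out.println(Bytes.toStringBinary(result.getRow()));
                }
            }
        }
    }
}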