下面列出了 org.apache.hadoop.hbase.filter.SingleColumnValueFilter API 的用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds the server-side filter for the page-view analysis.
 *
 * <p>Only rows whose event name equals the page-view alias are kept. Only the columns listed
 * below are requested, via {@code getColumnFilter}.
 *
 * @return a FilterList (default operator) combining the event-name value filter and the
 *         column filter
 */
@Override
protected Filter fetchHbaseFilter() {
    FilterList filterList = new FilterList();
    // Only page-view events are wanted. By default a SingleColumnValueFilter lets through rows
    // that do not contain the tested column at all. setFilterIfMissing(true) drops those rows too.
    SingleColumnValueFilter eventFilter = new SingleColumnValueFilter(
            Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME),
            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
            Bytes.toBytes(EventLogConstants.EventEnum.PAGEVIEW.alias));
    eventFilter.setFilterIfMissing(true);
    filterList.addFilter(eventFilter);
    // Columns the mapper reads. The event-name column must stay in this list so the value
    // filter above can actually see it.
    String[] columns = new String[] {
            EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME,      // event name
            EventLogConstants.LOG_COLUMN_NAME_CURRENT_URL,     // current URL
            EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,     // server time
            EventLogConstants.LOG_COLUMN_NAME_PLATFORM,        // platform name
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME,    // browser name
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION  // browser version
    };
    filterList.addFilter(this.getColumnFilter(columns));
    return filterList;
}
/**
 * Builds the server-side filter for the active-member analysis.
 *
 * <p>The member-related columns are requested via {@code getColumnFilter}. Only page-view
 * events are kept.
 *
 * @return a FilterList (default operator) combining the column filter and the event-name
 *         value filter
 */
@Override
protected Filter fetchHbaseFilter() {
    FilterList filterList = new FilterList();
    // Columns the mapper reads.
    String[] columns = new String[] {
            EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID,       // member id
            EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,     // server time
            EventLogConstants.LOG_COLUMN_NAME_PLATFORM,        // platform name
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME,    // browser name
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, // browser version
            // The event-name column must also be returned; otherwise the
            // SingleColumnValueFilter below cannot see it.
            EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME
    };
    filterList.addFilter(this.getColumnFilter(columns));
    // Only page-view events are wanted. setFilterIfMissing(true) also drops rows that have no
    // event-name column; by default such rows would pass the filter.
    SingleColumnValueFilter eventFilter = new SingleColumnValueFilter(
            Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME),
            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
            Bytes.toBytes(EventEnum.PAGEVIEW.alias));
    eventFilter.setFilterIfMissing(true);
    filterList.addFilter(eventFilter);
    return filterList;
}
/**
 * Builds the server-side filter for the location analysis.
 *
 * <p>The location/session columns are requested via {@code getColumnFilter}. Only page-view
 * events are kept.
 *
 * @return a FilterList (default operator) combining the column filter and the event-name
 *         value filter
 */
@Override
protected Filter fetchHbaseFilter() {
    FilterList list = new FilterList();
    String[] columns = new String[] {
            EventLogConstants.LOG_COLUMN_NAME_PLATFORM,    // platform
            EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, // server timestamp
            EventLogConstants.LOG_COLUMN_NAME_UUID,        // user id
            EventLogConstants.LOG_COLUMN_NAME_SESSION_ID,  // session id
            EventLogConstants.LOG_COLUMN_NAME_COUNTRY,     // country
            EventLogConstants.LOG_COLUMN_NAME_PROVINCE,    // province
            EventLogConstants.LOG_COLUMN_NAME_CITY,        // city
            EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME   // event name (needed by the value filter)
    };
    list.addFilter(this.getColumnFilter(columns));
    // Keep only page-view events. setFilterIfMissing(true) also drops rows that lack the
    // event-name column, which would otherwise pass.
    SingleColumnValueFilter eventFilter = new SingleColumnValueFilter(LocationMapper.family,
            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
            Bytes.toBytes(EventEnum.PAGEVIEW.alias));
    eventFilter.setFilterIfMissing(true);
    list.addFilter(eventFilter);
    return list;
}
/**
 * Builds the server-side filter for the page-view mapper.
 *
 * <p>Only page-view events are kept, and only the listed columns are requested via
 * {@code getColumnFilter}.
 *
 * @return a FilterList (default operator) combining the event-name value filter and the
 *         column filter
 */
@Override
protected Filter fetchHbaseFilter() {
    FilterList filterList = new FilterList();
    // Only page-view events. setFilterIfMissing(true) also drops rows without an event-name
    // column; by default they would pass the filter.
    SingleColumnValueFilter eventFilter = new SingleColumnValueFilter(PageViewMapper.family,
            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
            Bytes.toBytes(EventLogConstants.EventEnum.PAGEVIEW.alias));
    eventFilter.setFilterIfMissing(true);
    filterList.addFilter(eventFilter);
    // Columns the mapper reads; the event-name column must be included for the filter above.
    String[] columns = new String[] {
            EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME,      // event name
            EventLogConstants.LOG_COLUMN_NAME_CURRENT_URL,     // current URL
            EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,     // server time
            EventLogConstants.LOG_COLUMN_NAME_PLATFORM,        // platform name
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME,    // browser name
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION  // browser version
    };
    filterList.addFilter(this.getColumnFilter(columns));
    return filterList;
}
/**
 * Builds the server-side filter for the active-member mapper.
 *
 * <p>The member-related columns are requested via {@code getColumnFilter}. Only page-view
 * events are kept.
 *
 * @return a FilterList (default operator) combining the column filter and the event-name
 *         value filter
 */
@Override
protected Filter fetchHbaseFilter() {
    FilterList filterList = new FilterList();
    // Columns the mapper reads.
    String[] columns = new String[] {
            EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID,       // member id
            EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,     // server time
            EventLogConstants.LOG_COLUMN_NAME_PLATFORM,        // platform name
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME,    // browser name
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, // browser version
            // The event-name column must be a returned column for the
            // SingleColumnValueFilter below to see it.
            EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME
    };
    filterList.addFilter(this.getColumnFilter(columns));
    // Only page-view events. setFilterIfMissing(true) also drops rows without an event-name column.
    SingleColumnValueFilter eventFilter = new SingleColumnValueFilter(ActiveMemberMapper.family,
            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
            Bytes.toBytes(EventEnum.PAGEVIEW.alias));
    eventFilter.setFilterIfMissing(true);
    filterList.addFilter(eventFilter);
    return filterList;
}
/**
 * Prints all outgoing-call records (cf:type = "1") of one user.
 *
 * <p>A row is printed only if its key starts with the user's phone number AND its
 * {@code cf:type} value equals "1". For each row, dnum, type, length and date are printed.
 * A column missing from a row prints as {@code null} instead of throwing.
 *
 * @throws IOException if the scan fails
 */
@Test
public void getType() throws IOException {
    byte[] cf = Bytes.toBytes("cf");
    Scan scan = new Scan();
    // All filters in the list must pass.
    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    // cf:type == "1". Without setFilterIfMissing(true), rows lacking cf:type would pass too.
    SingleColumnValueFilter typeFilter = new SingleColumnValueFilter(cf, Bytes.toBytes("type"),
            CompareOperator.EQUAL, Bytes.toBytes("1"));
    typeFilter.setFilterIfMissing(true);
    filters.addFilter(typeFilter);
    // Row-key prefix filter: the user's phone number.
    filters.addFilter(new PrefixFilter(Bytes.toBytes("15883348450")));
    scan.setFilter(filters);
    // try-with-resources releases the scanner even if printing fails.
    try (ResultScanner rss = table.getScanner(scan)) {
        for (Result rs : rss) {
            // Result.getValue returns null for a missing column and Bytes.toString(null) returns
            // null, so a missing column prints "null" rather than causing a NullPointerException.
            System.out.print(Bytes.toString(rs.getValue(cf, Bytes.toBytes("dnum"))));
            System.out.print("--" + Bytes.toString(rs.getValue(cf, Bytes.toBytes("type"))));
            System.out.print("--" + Bytes.toString(rs.getValue(cf, Bytes.toBytes("length"))));
            System.out.println("--" + Bytes.toString(rs.getValue(cf, Bytes.toBytes("date"))));
        }
    }
}
/**
 * Scans rows [0001, 0004) of {@code tbl_girls} and logs each row whose {@code base_info:age}
 * is greater than 29.
 *
 * <p>{@link Scan#setFilter} replaces any previously set filter. The original code called it
 * three times, so only the last filter took effect. The three filters are now combined in a
 * MUST_PASS_ALL {@link FilterList} so that all of them apply.
 *
 * @throws IOException if the table cannot be opened or the scan fails
 */
@Test
public void testScan() throws IOException {
    // The connection belongs to the admin, so it is not closed here.
    Connection connection = admin.getConnection();
    byte[] family = Bytes.toBytes("base_info");
    byte[] ageColumn = Bytes.toBytes("age");
    Scan scan = new Scan(Bytes.toBytes("0001"), Bytes.toBytes("0004"));

    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    // Row-key filters.
    filters.addFilter(new PrefixFilter(Bytes.toBytes("000")));
    filters.addFilter(new RowFilter(CompareOp.EQUAL, new SubstringComparator("000")));
    // age > 29, compared on the big-endian int encoding produced by Bytes.toBytes(int).
    // NOTE: byte-wise comparison does not order negative ints correctly.
    SingleColumnValueFilter ageFilter = new SingleColumnValueFilter(family, ageColumn,
            CompareOp.GREATER, Bytes.toBytes(29));
    // Drop rows without an age column; otherwise Bytes.toInt(null) below would throw.
    ageFilter.setFilterIfMissing(true);
    filters.addFilter(ageFilter);
    scan.setFilter(filters);

    try (Table table = connection.getTable(TableName.valueOf("tbl_girls"));
         ResultScanner resultScanner = table.getScanner(scan)) {
        for (Result result : resultScanner) {
            LOGGER.info(result.toString());
            int value = Bytes.toInt(result.getValue(family, ageColumn));
            LOGGER.info(String.valueOf(value));
        }
    }
}
/**
 * Lists the regions of {@code table} whose {@code info:state} in the meta table is "OPEN".
 *
 * <p>The meta row keys of a table's regions start with {@code "<table name>,"}. This method
 * therefore scans only that key prefix. The previous substring match could also pick up rows of
 * other tables whose names merely end with this name (e.g. {@code "xt1,"} contains
 * {@code "t1,"}).
 *
 * @param connection open connection used to read the meta table
 * @param table      table whose open regions are wanted
 * @return the parsed {@link RegionInfo} of every open region, in meta row order
 * @throws Exception if the meta scan or region-info parsing fails
 */
private List<RegionInfo> getOpenRegions(Connection connection, TableName table) throws Exception {
    List<RegionInfo> regions = new ArrayList<>();
    byte[] rowPrefix = Bytes.toBytes(table.getNameAsString() + ",");
    SingleColumnValueFilter colFilter = new SingleColumnValueFilter(CATALOG_FAMILY,
            STATE_QUALIFIER, CompareOperator.EQUAL, Bytes.toBytes("OPEN"));
    // Rows without a state column are not known to be open; drop them.
    colFilter.setFilterIfMissing(true);
    Scan scan = new Scan();
    // Bounds the scan to rows starting with the prefix (sets start/stop rows).
    scan.setRowPrefixFilter(rowPrefix);
    scan.setFilter(colFilter);
    // Close both the meta Table handle and the scanner.
    try (Table metaTbl = connection.getTable(META_TABLE_NAME);
         ResultScanner rs = metaTbl.getScanner(scan)) {
        for (Result r : rs) {
            byte[] regionInfoBytes = r.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
            // Skip rows that carry no regioninfo cell.
            if (regionInfoBytes == null) {
                continue;
            }
            regions.add(RegionInfo.parseFrom(regionInfoBytes));
        }
    }
    return regions;
}
/**
 * Builds an AND-combined list of equality filters, one per mapped column present in {@code row}.
 *
 * <p>Columns whose family is the {@code "rowkey"} pseudo-family are skipped. So are columns
 * whose name is not among the row's schema field names, and columns whose value converts to
 * {@code null}.
 *
 * @param row the row whose field values become the expected column values
 * @return a MUST_PASS_ALL FilterList (possibly empty)
 */
private FilterList getColumnValueFilters(Row row) {
    Set<String> presentFields = Sets.newHashSet(row.schema().fieldNames());
    FilterList result = new FilterList(Operator.MUST_PASS_ALL);
    for (Map.Entry<String, ColumnDef> entry : columns.entrySet()) {
        ColumnDef def = entry.getValue();
        if (def.cf.equals("rowkey") || !presentFields.contains(entry.getKey())) {
            continue;
        }
        byte[] expected = getColumnValueAsBytes(def.name, def.type, row);
        if (expected == null) {
            continue;
        }
        result.addFilter(new SingleColumnValueFilter(
                Bytes.toBytes(def.cf),
                Bytes.toBytes(def.name),
                CompareFilter.CompareOp.EQUAL,
                expected));
    }
    return result;
}
/**
 * Makes sure scan {@code s1} reads every column tested by a {@link SingleColumnValueFilter}
 * found in {@code filter}. Nested {@link FilterList}s are searched recursively; per the method
 * name this works around HBASE-2198.
 *
 * <p>If {@code this.qualifiers} is {@code null}, the whole family of each tested column is
 * added. Otherwise only the tested column itself is added. Other filter types are ignored.
 *
 * <h2>History</h2>
 * <ul>
 * <li><b>Nov 19th, 2014</b>: Fix for output of all qualifiers</li>
 * </ul>
 * @param s1 the scan to widen
 * @param filter the filter tree to inspect
 */
protected void workaroundHBASE2198(Scan s1, Filter filter) {
    if (filter instanceof FilterList) {
        for (Filter child : ((FilterList) filter).getFilters()) {
            workaroundHBASE2198(s1, child);
        }
        return;
    }
    if (!(filter instanceof SingleColumnValueFilter)) {
        return;
    }
    SingleColumnValueFilter valueFilter = (SingleColumnValueFilter) filter;
    byte[] family = valueFilter.getFamily();
    if (this.qualifiers != null) {
        s1.addColumn(family, valueFilter.getQualifier());
    } else {
        s1.addFamily(family);
    }
}
/**
 * Adds to {@code s1} the column (or whole family) tested by every
 * {@link SingleColumnValueFilter} reachable from {@code filter}, descending into
 * {@link FilterList}s. Per the method name this works around HBASE-2198.
 *
 * <p>With {@code this.qualifiers == null} the tested column's family is added; otherwise just
 * the tested family/qualifier pair.
 *
 * <h2>History</h2>
 * <ul>
 * <li><b>Nov 19th, 2014</b>: Fix for output of all qualifiers</li>
 * </ul>
 *
 * @param s1 scan that must read the tested columns
 * @param filter filter, or filter tree, to walk
 */
protected void workaroundHBASE2198(Scan s1, Filter filter) {
    if (filter instanceof SingleColumnValueFilter) {
        final SingleColumnValueFilter scvf = (SingleColumnValueFilter) filter;
        final boolean wholeFamily = this.qualifiers == null;
        if (wholeFamily) {
            s1.addFamily(scvf.getFamily());
        } else {
            s1.addColumn(scvf.getFamily(), scvf.getQualifier());
        }
    } else if (filter instanceof FilterList) {
        final FilterList group = (FilterList) filter;
        for (Filter member : group.getFilters()) {
            workaroundHBASE2198(s1, member);
        }
    }
}
/**
 * {@link Get} counterpart of the scan workaround. It makes {@code get} read every column
 * tested by a {@link SingleColumnValueFilter} in {@code filter}, recursing into nested
 * {@link FilterList}s. Other filter types are ignored.
 *
 * @param get        the Get to widen
 * @param filter     filter, or filter tree, to inspect
 * @param qualifiers when {@code null}, the whole family of each tested column is added;
 *                   otherwise only the tested column (the array's contents are not read here)
 */
protected static void workaroundHBASE2198(Get get, Filter filter, byte[][] qualifiers) {
    if (filter instanceof FilterList) {
        for (Filter nested : ((FilterList) filter).getFilters()) {
            workaroundHBASE2198(get, nested, qualifiers);
        }
    } else if (filter instanceof SingleColumnValueFilter) {
        SingleColumnValueFilter columnFilter = (SingleColumnValueFilter) filter;
        byte[] family = columnFilter.getFamily();
        if (qualifiers != null) {
            get.addColumn(family, columnFilter.getQualifier());
        } else {
            get.addFamily(family);
        }
    }
}
/**
 * Opens a scanner over the rows whose key starts with {@code keyPrefix} and whose
 * {@code trans-tags:qual2} equals {@code value}.
 *
 * <p>The scan reads the trans-blob, trans-type, trans-date, trans-tags and trans-group families.
 *
 * @param keyPrefix required row-key prefix
 * @param value     expected value of trans-tags:qual2
 * @param ht        table to scan
 * @return an open scanner; the caller must close it
 * @throws IOException if the scanner cannot be opened
 */
protected ResultScanner buildScanner(String keyPrefix, String value, Table ht)
        throws IOException {
    SingleColumnValueFilter tagFilter = new SingleColumnValueFilter(
            Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"),
            CompareOperator.EQUAL, Bytes.toBytes(value));
    // Rows without trans-tags:qual2 are dropped rather than passed through.
    tagFilter.setFilterIfMissing(true);

    FilterList allFilters = new FilterList();
    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
    allFilters.addFilter(tagFilter);

    String[] families = {"trans-blob", "trans-type", "trans-date", "trans-tags", "trans-group"};
    Scan scan = new Scan();
    for (String family : families) {
        scan.addFamily(Bytes.toBytes(family));
    }
    scan.setFilter(allFilters);
    return ht.getScanner(scan);
}
/**
 * Times a full scan that keeps only rows whose essential column equals {@code flag_yes}, then
 * logs the elapsed time.
 *
 * <p>The scanner is now closed via try-with-resources, so it is released even if
 * {@code next()} throws. The elapsed time is still measured before the scanner is closed, as
 * before.
 *
 * @param table table to scan
 * @param slow  when {@code true}, on-demand column-family loading is disabled
 * @throws Exception if the scan fails
 */
private void runScanner(Table table, boolean slow) throws Exception {
    long time = System.nanoTime();
    Scan scan = new Scan();
    scan.addColumn(cf_essential, col_name);
    scan.addColumn(cf_joined, col_name);
    SingleColumnValueFilter filter = new SingleColumnValueFilter(
            cf_essential, col_name, CompareOperator.EQUAL, flag_yes);
    filter.setFilterIfMissing(true);
    scan.setFilter(filter);
    scan.setLoadColumnFamiliesOnDemand(!slow);
    long rowsCount = 0;
    double timeSec;
    try (ResultScanner resultScanner = table.getScanner(scan)) {
        while (resultScanner.next() != null) {
            rowsCount++;
        }
        timeSec = (System.nanoTime() - time) / 1000000000.0;
    }
    // NOTE(review): the logged row count is rowsCount / 2, kept from the original; confirm this
    // halving is intentional.
    LOG.info((slow ? "Slow" : "Joined") + " scanner finished in " + Double.toString(timeSec)
            + " seconds, got " + Long.toString(rowsCount / 2) + " rows");
}
/**
 * Builds the benchmark scan: rows pass when FAMILY_ZERO:COLUMN_ZERO equals {@code valuePrefix}
 * (binary comparison). When {@code opts.filterAll} is set, a FilterAllFilter is also added.
 *
 * <p>When {@code opts.addColumns} is set, columns 0..opts.columns-1 are requested explicitly.
 * Column 0 is COLUMN_ZERO and the others are named by their decimal index. Otherwise the
 * whole FAMILY_ZERO family is requested.
 *
 * @param valuePrefix value that FAMILY_ZERO:COLUMN_ZERO must equal
 * @return the configured scan, with scan metrics enabled
 * @throws IOException declared for callers; not thrown by this body itself
 */
protected Scan constructScan(byte[] valuePrefix) throws IOException {
    FilterList filters = new FilterList();
    filters.addFilter(new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
            CompareOperator.EQUAL, new BinaryComparator(valuePrefix)));
    if (opts.filterAll) {
        filters.addFilter(new FilterAllFilter());
    }

    Scan scan = new Scan();
    scan.setCaching(opts.caching);
    scan.setCacheBlocks(opts.cacheBlocks);
    scan.setAsyncPrefetch(opts.asyncPrefetch);
    scan.setReadType(opts.scanReadType);
    scan.setScanMetricsEnabled(true);

    if (!opts.addColumns) {
        scan.addFamily(FAMILY_ZERO);
    } else {
        for (int idx = 0; idx < opts.columns; idx++) {
            byte[] qualifier;
            if (idx == 0) {
                qualifier = COLUMN_ZERO;
            } else {
                qualifier = Bytes.toBytes(String.valueOf(idx));
            }
            scan.addColumn(FAMILY_ZERO, qualifier);
        }
    }
    scan.setFilter(filters);
    return scan;
}
/**
 * Returns the row keys of the {@code searchclicks} table whose search-family
 * {@code querystring} column is present and differs from {@code "jaiblahblah"}.
 *
 * <p>Row keys are decoded as UTF-8 via {@code Bytes.toString}. The original used
 * {@code new String(byte[])}, which depends on the platform default charset.
 *
 * @return matching row keys, in scan order
 */
@Override
public List<String> getSearchClicksRowKeysWithValidQueryString() {
    LOG.debug("Checking getSearchClicksRowKeys searchclicks table content!");
    Scan scan = new Scan();
    scan.addFamily(HbaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES);
    SingleColumnValueFilter filter = new SingleColumnValueFilter(HbaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES,
            Bytes.toBytes("querystring"), CompareOp.NOT_EQUAL, Bytes.toBytes("jaiblahblah"));
    // Rows without a querystring column are excluded.
    filter.setFilterIfMissing(true);
    scan.setFilter(filter);
    List<String> rows = hbaseTemplate.find("searchclicks", scan,
            new RowMapper<String>() {
                @Override
                public String mapRow(Result result, int rowNum)
                        throws Exception {
                    // Decode explicitly as UTF-8 rather than with the platform charset.
                    return Bytes.toString(result.getRow());
                }
            });
    for (String row : rows) {
        LOG.debug("searchclicks table content, Table returned row key: {}", row);
    }
    LOG.debug("Checking getSearchClicksRowKeys searchclicks table content done!");
    return rows;
}
/**
 * Checks whether the given table/view has at least one child view. It does this by scanning
 * the table's metadata rows for a link of type CHILD_TABLE and stopping at the first match.
 * @param sysCatOrsysChildLink For older (pre-4.15.0) clients, we look for child links inside SYSTEM.CATALOG,
 * otherwise we look for them inside SYSTEM.CHILD_LINK
 * @param tenantId tenantId
 * @param schemaName table schema name
 * @param tableName table name
 * @param timestamp passed client-side timestamp, used as the upper time bound of the scan
 * @return true if the given table has at least one child view
 * @throws IOException if the scan of {@code sysCatOrsysChildLink} fails
 */
public static boolean hasChildViews(Table sysCatOrsysChildLink, byte[] tenantId, byte[] schemaName,
byte[] tableName, long timestamp) throws IOException {
// Row key of the table's metadata, built from tenant, schema and table name.
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
// NOTE(review): presumably scans the rows belonging to `key` with timestamps between
// MIN_TABLE_TIMESTAMP and `timestamp` -- confirm in MetaDataUtil.newTableRowsScan.
Scan scan = MetaDataUtil.newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP, timestamp);
// NOTE(review): this is an anonymous subclass of SingleColumnValueFilter. HBase filters are
// evaluated on the region servers, so the subclass must be loadable there. Restructuring this
// code could also change the synthetic class name -- keep it as is unless that is confirmed safe.
SingleColumnValueFilter linkFilter =
new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES,
CompareFilter.CompareOp.EQUAL,
LinkType.CHILD_TABLE.getSerializedValueAsByteArray()) {
// if we found a row with the CHILD_TABLE link type we are done and can
// terminate the scan
@Override
public boolean filterAllRemaining() {
// matchedColumn is the inherited flag recording that the tested column matched.
return matchedColumn;
}
};
// Rows without a link-type column are dropped instead of being returned.
linkFilter.setFilterIfMissing(true);
scan.setFilter(linkFilter);
// The tested column must be part of the scan input for the value filter to see it.
scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
try (ResultScanner scanner = sysCatOrsysChildLink.getScanner(scan)) {
// A single returned row is enough to answer the question.
Result result = scanner.next();
return result!=null;
}
}
/**
 * Creates a scan over flow rows that start with {@code rowPrefix}.
 *
 * <p>The scan starts at {@code rowPrefix}. It stops at the first row that no longer has the
 * prefix (WhileMatchFilter around a PrefixFilter). If {@code version} is non-empty, rows are
 * additionally restricted to those whose version column equals it.
 *
 * @param rowPrefix start row and required row-key prefix
 * @param limit     limit on scanned results; also caps scanner caching
 * @param version   version to match, or null/empty for any version
 * @return the configured Scan
 */
private Scan createFlowScan(byte[] rowPrefix, int limit, String version) {
    // All returned rows must carry the prefix; the WhileMatchFilter wrapper ends the scan
    // at the first row that does not.
    Filter rowPrefixFilter = new WhileMatchFilter(new PrefixFilter(rowPrefix));
    Filter effectiveFilter = rowPrefixFilter;
    boolean hasVersion = version != null && !version.isEmpty();
    if (hasVersion) {
        FilterList combined = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        combined.addFilter(rowPrefixFilter);
        combined.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
                Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,
                Bytes.toBytes(version)));
        effectiveFilter = combined;
    }

    Scan scan = new Scan();
    scan.setStartRow(rowPrefix);
    // A large caching value combined with a small limit would fetch far more rows than needed,
    // so caching is capped by the limit.
    scan.setCaching(Math.min(limit, defaultScannerCaching));
    scan.setFilter(effectiveFilter);
    return scan;
}
/**
 * Creates a filter that compares one entity field, stored in a single HBase column, against
 * {@code filterValue}.
 *
 * @param entitySchema     schema used to look up the field's column mapping
 * @param entitySerDe      serializer that turns {@code filterValue} into column bytes
 * @param fieldName        name of the field to compare
 * @param filterValue      value to compare against
 * @param equalityOperator comparison operator passed to the SingleColumnValueFilter
 * @throws DatasetException if the field is not mapped with MappingType.COLUMN
 */
public SingleFieldEntityFilter(EntitySchema entitySchema,
    EntitySerDe<?> entitySerDe, String fieldName, Object filterValue,
    CompareFilter.CompareOp equalityOperator) {
  FieldMapping mapping = entitySchema.getColumnMappingDescriptor().getFieldMapping(fieldName);
  boolean isColumnMapping = mapping.getMappingType() == MappingType.COLUMN;
  if (!isColumnMapping) {
    throw new DatasetException(
        "SingleColumnValueFilter only compatible with COLUMN mapping types.");
  }
  byte[] columnFamily = mapping.getFamily();
  byte[] columnQualifier = mapping.getQualifier();
  byte[] expectedBytes = entitySerDe.serializeColumnValueToBytes(fieldName, filterValue);
  this.filter = new SingleColumnValueFilter(columnFamily, columnQualifier,
      equalityOperator, expectedBytes);
}
/**
 * Attempts to push down a basic Filter predicate into HBase.
 *
 * <p>Only the first constraint that is a single, non-null value is pushed down. It becomes a
 * {@link SingleColumnValueFilter} with {@code CompareOp.EQUAL}. In the future, &gt; and &lt;
 * could be added for certain field types.
 *
 * <p>Column family and qualifier names are encoded with {@code Bytes.toBytes} (UTF-8). The
 * original used {@code String.getBytes()}, which depends on the platform default charset.
 *
 * @param isNative True if the values are stored in HBase using native byte[] vs being serialized as Strings.
 * @param constraints The constraints that we can attempt to push into HBase as part of the scan.
 * @return A filter if we found a predicate we can push down, null otherwise.
 */
private Filter pushdownPredicate(boolean isNative, Constraints constraints)
{
    for (Map.Entry<String, ValueSet> next : constraints.getSummary().entrySet()) {
        ValueSet valueSet = next.getValue();
        if (valueSet.isSingleValue() && !valueSet.isNullAllowed()) {
            String[] colParts = HbaseSchemaUtils.extractColumnParts(next.getKey());
            // NOTE(review): filterIfMissing is left false, so rows lacking the column still pass.
            // Enabling it is only safe if the scan always reads this column; otherwise every
            // row would be dropped.
            return new SingleColumnValueFilter(Bytes.toBytes(colParts[0]),
                    Bytes.toBytes(colParts[1]),
                    CompareFilter.CompareOp.EQUAL,
                    HbaseSchemaUtils.toBytes(isNative, valueSet.getSingleValue()));
        }
    }
    return null;
}
/**
 * Initializes the list of scans for one day of event logs.
 *
 * <p>The running date (yyyy-MM-dd) is read from the job configuration. The scan covers row keys
 * from that date's parsed timestamp (inclusive) to one day later (exclusive). Only launch events
 * are kept, and the columns are restricted to those the mapper reads.
 *
 * @param job the job whose configuration holds the running date
 * @return a single-element list containing the configured scan
 */
private List<Scan> initScans(Job job) {
    // Row keys start with a timestamp.
    Configuration conf = job.getConfiguration();
    // Running date, formatted yyyy-MM-dd.
    String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
    long startDate = TimeUtil.parseString2Long(date);
    long endDate = startDate + GlobalConstants.DAY_OF_MILLISECONDS;
    Scan scan = new Scan();
    // Row-key range of the scan: [startDate, endDate).
    scan.setStartRow(Bytes.toBytes("" + startDate));
    scan.setStopRow(Bytes.toBytes("" + endDate));
    FilterList filterList = new FilterList();
    // Only analyse launch events. setFilterIfMissing(true) also drops rows that lack the
    // event-name column; by default they would pass.
    SingleColumnValueFilter eventFilter = new SingleColumnValueFilter(
            Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME),
            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
            Bytes.toBytes(EventEnum.LAUNCH.alias));
    eventFilter.setFilterIfMissing(true);
    filterList.addFilter(eventFilter);
    // Columns the mapper reads; the event-name column is needed by the filter above.
    String[] columns = new String[] {
            EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME,
            EventLogConstants.LOG_COLUMN_NAME_UUID,
            EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,
            EventLogConstants.LOG_COLUMN_NAME_PLATFORM,
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME,
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION };
    filterList.addFilter(this.getColumnFilter(columns));
    // Names the table this scan reads (Scan.SCAN_ATTRIBUTES_TABLE_NAME).
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(EventLogConstants.HBASE_NAME_EVENT_LOGS));
    scan.setFilter(filterList);
    return Lists.newArrayList(scan);
}
/**
 * Builds the server-side filter for the inbound-source analysis, including the uuid column.
 *
 * <p>The listed columns are requested via {@code getColumnFilter}. Only page-view events are
 * kept.
 *
 * @return a FilterList (default operator) combining the column filter and the event-name
 *         value filter
 */
@Override
protected Filter fetchHbaseFilter() {
    FilterList list = new FilterList();
    String[] columns = new String[] {
            EventLogConstants.LOG_COLUMN_NAME_REFERRER_URL, // URL of the previous page
            EventLogConstants.LOG_COLUMN_NAME_UUID,         // uuid
            EventLogConstants.LOG_COLUMN_NAME_SESSION_ID,   // session id
            EventLogConstants.LOG_COLUMN_NAME_PLATFORM,     // platform name
            EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,  // server time
            EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME    // event name (needed by the value filter)
    };
    list.addFilter(this.getColumnFilter(columns));
    // Only page-view events; rows without an event-name column are dropped too.
    SingleColumnValueFilter eventFilter = new SingleColumnValueFilter(InboundMapper.family,
            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
            Bytes.toBytes(EventEnum.PAGEVIEW.alias));
    eventFilter.setFilterIfMissing(true);
    list.addFilter(eventFilter);
    return list;
}
/**
 * Builds the server-side filter for the inbound-source analysis (without uuid).
 *
 * <p>The listed columns are requested via {@code getColumnFilter}. Only page-view events are
 * kept.
 *
 * @return a FilterList (default operator) combining the column filter and the event-name
 *         value filter
 */
@Override
protected Filter fetchHbaseFilter() {
    FilterList list = new FilterList();
    String[] columns = new String[] {
            EventLogConstants.LOG_COLUMN_NAME_REFERRER_URL, // URL of the previous page
            EventLogConstants.LOG_COLUMN_NAME_SESSION_ID,   // session id
            EventLogConstants.LOG_COLUMN_NAME_PLATFORM,     // platform name
            EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,  // server time
            EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME    // event name (needed by the value filter)
    };
    list.addFilter(this.getColumnFilter(columns));
    // Only page-view events; rows without an event-name column are dropped too.
    SingleColumnValueFilter eventFilter = new SingleColumnValueFilter(InboundMapper.family,
            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
            Bytes.toBytes(EventEnum.PAGEVIEW.alias));
    eventFilter.setFilterIfMissing(true);
    list.addFilter(eventFilter);
    return list;
}
/**
 * Builds the server-side filter for the launch-event analysis.
 *
 * <p>Only launch events are kept, and only the listed columns are requested via
 * {@code getColumnFilter}.
 *
 * @return a FilterList (default operator) combining the event-name value filter and the
 *         column filter
 */
@Override
protected Filter fetchHbaseFilter() {
    FilterList filterList = new FilterList();
    // Only analyse launch events. setFilterIfMissing(true) also drops rows without an
    // event-name column, which would otherwise pass.
    SingleColumnValueFilter eventFilter = new SingleColumnValueFilter(
            Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME),
            Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
            Bytes.toBytes(EventEnum.LAUNCH.alias));
    eventFilter.setFilterIfMissing(true);
    filterList.addFilter(eventFilter);
    // Columns the mapper reads; the event-name column is needed by the filter above.
    String[] columns = new String[] {
            EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME,
            EventLogConstants.LOG_COLUMN_NAME_UUID,
            EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,
            EventLogConstants.LOG_COLUMN_NAME_PLATFORM,
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME,
            EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION };
    filterList.addFilter(this.getColumnFilter(columns));
    return filterList;
}
/**
 * Builds AND-combined filters restricting the timestamp column B_FAMILY:B_COLUMN_TS to
 * [lastModStart, lastModEndExclusive).
 *
 * <p>The lower bound is added only when {@code lastModStart >= 0}, because negative values do
 * not compare correctly in their binary form. The upper bound is added only when
 * {@code lastModEndExclusive != Long.MAX_VALUE}.
 *
 * @param visitFilter supplies the time bounds
 * @return the filter list, or {@code null} when neither bound applies
 */
private FilterList generateTimeFilterList(VisitFilter visitFilter) {
    boolean hasLowerBound = visitFilter.lastModStart >= 0;
    boolean hasUpperBound = visitFilter.lastModEndExclusive != Long.MAX_VALUE;
    FilterList timeFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    if (hasLowerBound) {
        timeFilters.addFilter(new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS,
                CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(visitFilter.lastModStart)));
    }
    if (hasUpperBound) {
        timeFilters.addFilter(new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS,
                CompareFilter.CompareOp.LESS, Bytes.toBytes(visitFilter.lastModEndExclusive)));
    }
    if (timeFilters.getFilters().isEmpty()) {
        return null;
    }
    return timeFilters;
}
/**
 * Handles simple column-operator-constant expressions.
 *
 * <p>Row-key columns become a {@link RowFilter}; all other columns become a
 * {@link SingleColumnValueFilter}. For a textual row key with an operand, start/stop keys are
 * also recorded so the scan range can be narrowed.
 *
 * @param hBaseColumn the HBase column
 * @param operator the simple column operator
 * @param data the optional operand
 * @return the {@link Filter} for the given simple column operator
 */
private Filter processSimpleColumnOperator(HBaseColumnDescriptor hBaseColumn,
                                           Operator operator,
                                           OperandNode data) {
    // The operand value must be captured after the operand child of this node was visited.
    ByteArrayComparable valueComparator = getComparator(hBaseColumn.columnTypeCode(), data);

    // A TEXT row key can additionally be pushed into the Scan's start/stop row keys.
    boolean textualKeyWithOperand = data != null && isTextualRowKey(hBaseColumn);
    if (textualKeyWithOperand) {
        storeStartEndKeys(operator, data.toString());
    }

    if (!hBaseColumn.isKeyColumn()) {
        return new SingleColumnValueFilter(
                hBaseColumn.columnFamilyBytes(),
                hBaseColumn.qualifierBytes(),
                OPERATORS_MAP.get(operator),
                valueComparator);
    }
    // The row key is not a stored column, so it is filtered with a RowFilter.
    return new RowFilter(OPERATORS_MAP.get(operator), valueComparator);
}
/**
 * Handles the IS NULL and IS NOT NULL operators.
 *
 * <p>IS NULL maps to {@code EQUAL} and any other operator to {@code NOT_EQUAL}, both compared
 * with a {@link NullComparator}.
 *
 * <p>NOTE(review): filterIfMissing stays at its default (false). Rows lacking the column
 * therefore pass for IS NOT NULL as well; presumably the caller re-checks the predicate.
 * Confirm before relying on this filter alone.
 *
 * @param hBaseColumn the HBase column
 * @param operator the IS NULL/IS NOT NULL operator
 * @return the filter for the given operator
 */
private Filter processNullOperator(HBaseColumnDescriptor hBaseColumn, Operator operator) {
    CompareFilter.CompareOp op;
    if (operator == Operator.IS_NULL) {
        op = CompareFilter.CompareOp.EQUAL;
    } else {
        op = CompareFilter.CompareOp.NOT_EQUAL;
    }
    NullComparator comparator = new NullComparator();
    return new SingleColumnValueFilter(
            hBaseColumn.columnFamilyBytes(),
            hBaseColumn.qualifierBytes(),
            op,
            comparator);
}
/**
 * Parsing the IS NULL expression for column 1 ("a1o8", per the test name) must yield a
 * SingleColumnValueFilter on that column using EQUAL with a NullComparator.
 */
@Test
public void parseIsNullExpression() throws Exception {
    Filter parsed = helper("a1o8", tupleDescription);

    assertTrue(parsed instanceof SingleColumnValueFilter);
    SingleColumnValueFilter columnFilter = (SingleColumnValueFilter) parsed;
    assertNotNull(columnFilter);

    // Filter targets column 1 of the tuple description.
    assertSame(families[1], columnFilter.getFamily());
    assertSame(qualifiers[1], columnFilter.getQualifier());
    // IS NULL is expressed as EQUAL against a NullComparator.
    assertEquals(CompareFilter.CompareOp.EQUAL, columnFilter.getOperator());
    assertTrue(columnFilter.getComparator() instanceof NullComparator);
}
/**
 * Parsing the IS NOT NULL expression for column 1 ("a1o9", per the test name) must yield a
 * SingleColumnValueFilter on that column using NOT_EQUAL with a NullComparator.
 */
@Test
public void parseIsNotNullExpression() throws Exception {
    Filter parsed = helper("a1o9", tupleDescription);

    assertTrue(parsed instanceof SingleColumnValueFilter);
    SingleColumnValueFilter columnFilter = (SingleColumnValueFilter) parsed;
    assertNotNull(columnFilter);

    // Filter targets column 1 of the tuple description.
    assertSame(families[1], columnFilter.getFamily());
    assertSame(qualifiers[1], columnFilter.getQualifier());
    // IS NOT NULL is expressed as NOT_EQUAL against a NullComparator.
    assertEquals(CompareFilter.CompareOp.NOT_EQUAL, columnFilter.getOperator());
    assertTrue(columnFilter.getComparator() instanceof NullComparator);
}
/**
 * Parsing {@code id > 5} must yield a SingleColumnValueFilter on column 0 using GREATER with an
 * HBaseIntegerComparator whose value compares equal to "5".
 */
@Test
public void testSimpleColumnOperator() throws Exception {
    // Filter string for: id > 5
    Filter parsed = helper("a0c20s1d5o2", tupleDescription);
    assertNotNull(parsed);
    assertTrue(parsed instanceof SingleColumnValueFilter);

    SingleColumnValueFilter columnFilter = (SingleColumnValueFilter) parsed;
    assertSame(families[0], columnFilter.getFamily());
    assertSame(qualifiers[0], columnFilter.getQualifier());
    assertEquals(CompareFilter.CompareOp.GREATER, columnFilter.getOperator());
    assertTrue(columnFilter.getComparator() instanceof HBaseIntegerComparator);
    // The comparator's stored value must equal the literal 5.
    assertEquals(0, columnFilter.getComparator().compareTo("5".getBytes()));
}