下面列出了怎么用org.apache.hadoop.hbase.filter.FilterList的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected Filter fetchHbaseFilter() {
FilterList filterList = new FilterList();
String[] columns = new String[]{
EventLogConstants.LOG_COLUMN_NAME_REFERER_URL,
EventLogConstants.LOG_COLUMN_NAME_UUID,
EventLogConstants.LOG_COLUMN_NAME_SESSION_ID,
EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME,
EventLogConstants.LOG_COLUMN_NAME_PLATFORM,
EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME,
};
//过滤数据
filterList.addFilter(this.getColumnFilter(columns));
filterList.addFilter(
new SingleColumnValueExcludeFilter(Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME),
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
Bytes.toBytes(
EventEnum.PAGEVIEW.alias)));
return filterList;
}
@Override
protected Filter fetchHbaseFilter() {
//过滤数据,只分析launch事件
FilterList filterList = new FilterList();
filterList.addFilter(
new SingleColumnValueExcludeFilter(Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME),
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
Bytes.toBytes(
EventEnum.PAGEVIEW.alias)));
String[] columns = new String[]{
EventLogConstants.LOG_COLUMN_NAME_SESSION_ID, EventLogConstants.LOG_COLUMN_NAME_UUID,
EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, EventLogConstants.LOG_COLUMN_NAME_PLATFORM,
EventLogConstants.LOG_COLUMN_NAME_COUNTRY,
EventLogConstants.LOG_COLUMN_NAME_PROVINCE,
EventLogConstants.LOG_COLUMN_NAME_CITY,
EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME
};
filterList.addFilter(this.getColumnFilter(columns));
return filterList;
}
@Override
protected Filter fetchHbaseFilter() {
FilterList filterList = new FilterList();
// 定义mapper中需要获取的列名
String[] columns = new String[]{EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID, // 会员id
EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, // 服务器时间
EventLogConstants.LOG_COLUMN_NAME_PLATFORM, // 平台名称
EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, // 浏览器名称
EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, // 浏览器版本号
EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME
// 添加一个事件名称获取列,在使用singlecolumnvaluefilter的时候必须指定对应的列是一个返回列
};
filterList.addFilter(this.getColumnFilter(columns));
// 只需要page view事件,所以进行过滤
filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(EventLogConstants.EVENT_LOGS_FAMILY_NAME),
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL,
Bytes.toBytes(EventEnum.PAGEVIEW.alias)));
return filterList;
}
@Override
protected Filter fetchHbaseFilter() {
FilterList list = new FilterList();
String[] columns = new String[] { EventLogConstants.LOG_COLUMN_NAME_PLATFORM, // 平台
EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, // 服务器时间戳
EventLogConstants.LOG_COLUMN_NAME_UUID, // 用户id
EventLogConstants.LOG_COLUMN_NAME_SESSION_ID, // 会话id
EventLogConstants.LOG_COLUMN_NAME_COUNTRY, // 国家
EventLogConstants.LOG_COLUMN_NAME_PROVINCE, // 省份
EventLogConstants.LOG_COLUMN_NAME_CITY, // 城市
EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME, // 事件名称
};
list.addFilter(this.getColumnFilter(columns));
// 过滤只需要pageview事件
list.addFilter(new SingleColumnValueFilter(LocationMapper.family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL, Bytes.toBytes(EventEnum.PAGEVIEW.alias)));
return list;
}
@Override
protected Filter fetchHbaseFilter() {
FilterList filterList = new FilterList();
// 定义mapper中需要获取的列名
String[] columns = new String[] { EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID, // 会员id
EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, // 服务器时间
EventLogConstants.LOG_COLUMN_NAME_PLATFORM, // 平台名称
EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, // 浏览器名称
EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, // 浏览器版本号
EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME // 添加一个事件名称获取列,在使用singlecolumnvaluefilter的时候必须指定对应的列是一个返回列
};
filterList.addFilter(this.getColumnFilter(columns));
// 只需要page view事件,所以进行过滤
filterList.addFilter(new SingleColumnValueFilter(ActiveMemberMapper.family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), CompareOp.EQUAL, Bytes.toBytes(EventEnum.PAGEVIEW.alias)));
return filterList;
}
public static void applyFuzzyFilter(Scan scan, List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys) {
if (fuzzyKeys != null && fuzzyKeys.size() > 0) {
FuzzyRowFilter rowFilter = new FuzzyRowFilter(convertToHBasePair(fuzzyKeys));
Filter filter = scan.getFilter();
if (filter != null) {
// may have existed InclusiveStopFilter, see buildScan
FilterList filterList = new FilterList();
filterList.addFilter(filter);
filterList.addFilter(rowFilter);
scan.setFilter(filterList);
} else {
scan.setFilter(rowFilter);
}
}
}
public static void andFilterAtEnd(Scan scan, Filter andWithFilter) {
if (andWithFilter == null) {
return;
}
Filter filter = scan.getFilter();
if (filter == null) {
scan.setFilter(andWithFilter);
} else if (filter instanceof FilterList && ((FilterList)filter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
FilterList filterList = (FilterList)filter;
List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size() + 1);
allFilters.addAll(filterList.getFilters());
allFilters.add(andWithFilter);
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
} else {
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(filter, andWithFilter)));
}
}
private Filter buildScanFilter() {
if (scanTable == null && scanRegionState == null) {
return null;
}
final List<Filter> filters = new ArrayList<>(2);
if (scanTable != null) {
filters.add(buildTableFilter(scanTable));
}
if (scanRegionState != null) {
filters.add(buildScanRegionStateFilter(scanRegionState));
}
if (filters.size() == 1) {
return filters.get(0);
}
return new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);
}
/**
* If the result is of type {@link FilterList}, add the {@link Filter} to
* the filter list. Otherwise, set it as the resulting {@link Filter}
*
* @param filter the filter to process
*/
private void processFilter(Filter filter) {
Filter head = filterQueue.peek();
if (head instanceof FilterList) {
/*
* Handles operation between already calculated expressions.
* Currently only {@code AND}, in the future {@code OR} can be added.
* Four cases here:
* - Both are simple filters.
* - Left is a FilterList and right is a filter.
* - Left is a filter and right is a FilterList.
* - Both are FilterLists.
*/
((FilterList) head).addFilter(filter);
}
currentFilter = filter;
}
public static Filter stripSkipScanFilter(Filter filter) {
if (filter == null) {
return null;
}
if (!(filter instanceof FilterList)) {
return filter instanceof BooleanExpressionFilter ? filter : null;
}
FilterList filterList = (FilterList) filter;
if (filterList.getOperator() != FilterList.Operator.MUST_PASS_ALL) {
return filter;
}
List<Filter> list = new ArrayList<>();
for (Filter f : filterList.getFilters()) {
Filter stripped = stripSkipScanFilter(f);
if (stripped != null) {
list.add(stripped);
}
}
return list.isEmpty() ? null : (list.size() == 1 ? list.get(0) : new FilterList(FilterList.Operator.MUST_PASS_ALL, list));
}
/**
* creates a scan for flow data
* @param rowPrefix - start row prefix
* @param limit - limit on scanned results
* @param version - version to match
* @return Scan
*/
private Scan createFlowScan(byte[] rowPrefix, int limit, String version) {
Scan scan = new Scan();
scan.setStartRow(rowPrefix);
// using a large scanner caching value with a small limit can mean we scan a
// lot more data than necessary, so lower the caching for low limits
scan.setCaching(Math.min(limit, defaultScannerCaching));
// require that all rows match the prefix we're looking for
Filter prefixFilter = new WhileMatchFilter(new PrefixFilter(rowPrefix));
// if version is passed, restrict the rows returned to that version
if (version != null && version.length() > 0) {
FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filters.addFilter(prefixFilter);
filters.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,
Bytes.toBytes(version)));
scan.setFilter(filters);
} else {
scan.setFilter(prefixFilter);
}
return scan;
}
private Scan getVertexIndexScanWithLimit(String label, boolean isUnique, String key, Object from, int limit, boolean reversed) {
byte[] prefix = serializeForRead(label, isUnique, key, null);
byte[] startRow = from != null
? serializeForRead(label, isUnique, key, from)
: prefix;
byte[] stopRow = HConstants.EMPTY_END_ROW;
if (graph.configuration().getInstanceType() == HBaseGraphConfiguration.InstanceType.BIGTABLE) {
if (reversed) {
throw new UnsupportedOperationException("Reverse scans not supported by Bigtable");
} else {
// PrefixFilter in Bigtable does not automatically stop
// See https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/1087
stopRow = HBaseGraphUtils.incrementBytes(prefix);
}
}
if (reversed) startRow = HBaseGraphUtils.incrementBytes(startRow);
Scan scan = new Scan(startRow, stopRow);
FilterList filterList = new FilterList();
filterList.addFilter(new PrefixFilter(prefix));
filterList.addFilter(new PageFilter(limit));
scan.setFilter(filterList);
scan.setReversed(reversed);
return scan;
}
private void testScanWithFilters(Connection connection, String tableName) throws IOException {
createTable(thriftAdmin, tableName);
try (Table table = connection.getTable(TableName.valueOf(tableName))){
FilterList filterList = new FilterList();
PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("testrow"));
ColumnValueFilter columnValueFilter = new ColumnValueFilter(FAMILYA, QUALIFIER_1,
CompareOperator.EQUAL, VALUE_1);
filterList.addFilter(prefixFilter);
filterList.addFilter(columnValueFilter);
Scan scan = new Scan();
scan.readVersions(2);
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> iterator = scanner.iterator();
assertTrue(iterator.hasNext());
int counter = 0;
while (iterator.hasNext()) {
Result result = iterator.next();
counter += result.size();
}
assertEquals(2, counter);
}
}
protected static void workaroundHBASE2198(Get get, Filter filter, byte[][] qualifiers) {
if (filter instanceof SingleColumnValueFilter) {
if (qualifiers == null) {
get.addFamily(((SingleColumnValueFilter)filter).getFamily());
} else {
get.addColumn(((SingleColumnValueFilter)filter).getFamily(),
((SingleColumnValueFilter)filter).getQualifier());
}
return;
}
if (filter instanceof FilterList) {
for (Filter f : ((FilterList)filter).getFilters()) {
workaroundHBASE2198(get, f, qualifiers);
}
}
}
public static void filterLimitValueRange(String projectId, String instanceId, String tableId) {
// A filter that matches cells whose values are between the given values
ValueFilter valueGreaterFilter =
new ValueFilter(
CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("PQ2A.190405")));
ValueFilter valueLesserFilter =
new ValueFilter(
CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("PQ2A.190406")));
FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filter.addFilter(valueGreaterFilter);
filter.addFilter(valueLesserFilter);
Scan scan = new Scan().setFilter(filter);
readWithFilter(projectId, instanceId, tableId, scan);
}
public static boolean isReallyEmptyRegion(HConnection connection,
String tableName, HRegionInfo regionInfo) throws IOException {
boolean emptyRegion = false;
// verify really empty region by scanning records
try (HTableInterface table = connection.getTable(tableName)) {
Scan scan = new Scan(regionInfo.getStartKey(), regionInfo.getEndKey());
FilterList filterList = new FilterList();
filterList.addFilter(new KeyOnlyFilter());
filterList.addFilter(new FirstKeyOnlyFilter());
scan.setFilter(filterList);
scan.setCacheBlocks(false);
scan.setSmall(true);
scan.setCaching(1);
try (ResultScanner scanner = table.getScanner(scan)) {
if (scanner.next() == null) emptyRegion = true;
}
}
return emptyRegion;
}
@Override
public void run() {
try (HTableInterface table = connection.getTable(tableName.getBytes())) {
// Do not use Get not to increase read request count metric.
// Use Scan.
Scan scan = new Scan("".getBytes(), "".getBytes());
FilterList filterList = new FilterList();
filterList.addFilter(new KeyOnlyFilter());
filterList.addFilter(new FirstKeyOnlyFilter());
scan.setFilter(filterList);
//noinspection EmptyTryBlock
try(ResultScanner ignored = table.getScanner(scan)) {
}
return;
} catch (IOException ignore) {
}
clean(tableName);
}
protected ResultScanner buildScanner(String keyPrefix, String value, Table ht)
throws IOException {
// OurFilterList allFilters = new OurFilterList();
FilterList allFilters = new FilterList(/* FilterList.Operator.MUST_PASS_ALL */);
allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes
.toBytes("trans-tags"), Bytes.toBytes("qual2"), CompareOperator.EQUAL, Bytes
.toBytes(value));
filter.setFilterIfMissing(true);
allFilters.addFilter(filter);
// allFilters.addFilter(new
// RowExcludingSingleColumnValueFilter(Bytes.toBytes("trans-tags"),
// Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value)));
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("trans-blob"));
scan.addFamily(Bytes.toBytes("trans-type"));
scan.addFamily(Bytes.toBytes("trans-date"));
scan.addFamily(Bytes.toBytes("trans-tags"));
scan.addFamily(Bytes.toBytes("trans-group"));
scan.setFilter(allFilters);
return ht.getScanner(scan);
}
private static <T> void setLimit(
final RangeReaderParams<T> readerParams,
final FilterList filterList) {
if ((readerParams.getLimit() != null) && (readerParams.getLimit() > 0)) {
// @formatter:off
// TODO in hbase 1.4.x there is a scan.getLimit() and
// scan.setLimit() which is perfectly suited for this
// if (readerParams.getLimit() < scanner.getLimit() || scanner.getLimit() <= 0) {
// also in hbase 1.4.x readType.PREAD would make sense for
// limits
// scanner.setReadType(ReadType.PREAD);
// scanner.setLimit(
// readerParams.getLimit());
// }
// @formatter:on
// however, to be compatible with earlier versions of hbase, for now
// we are using a page filter
filterList.addFilter(new PageFilter(readerParams.getLimit()));
}
}
protected Scan constructScan(byte[] valuePrefix) throws IOException {
FilterList list = new FilterList();
Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
list.addFilter(filter);
if (opts.filterAll) {
list.addFilter(new FilterAllFilter());
}
Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
.setScanMetricsEnabled(true);
if (opts.addColumns) {
for (int column = 0; column < opts.columns; column++) {
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
scan.addColumn(FAMILY_ZERO, qualifier);
}
} else {
scan.addFamily(FAMILY_ZERO);
}
scan.setFilter(list);
return scan;
}
public static boolean isReallyEmptyRegion(HConnection connection,
String tableName, HRegionInfo regionInfo) throws IOException {
boolean emptyRegion = false;
// verify really empty region by scanning records
try (HTableInterface table = connection.getTable(tableName)) {
Scan scan = new Scan(regionInfo.getStartKey(), regionInfo.getEndKey());
FilterList filterList = new FilterList();
filterList.addFilter(new KeyOnlyFilter());
filterList.addFilter(new FirstKeyOnlyFilter());
scan.setFilter(filterList);
scan.setCacheBlocks(false);
scan.setSmall(true);
scan.setCaching(1);
try (ResultScanner scanner = table.getScanner(scan)) {
if (scanner.next() == null) emptyRegion = true;
}
}
return emptyRegion;
}
@Override
public void run() {
try (HTableInterface table = connection.getTable(tableName.getBytes())) {
// Do not use Get not to increase read request count metric.
// Use Scan.
Scan scan = new Scan("".getBytes(), "".getBytes());
FilterList filterList = new FilterList();
filterList.addFilter(new KeyOnlyFilter());
filterList.addFilter(new FirstKeyOnlyFilter());
scan.setFilter(filterList);
//noinspection EmptyTryBlock
try(ResultScanner ignored = table.getScanner(scan)) {
}
return;
} catch (IOException ignore) {
}
clean(tableName);
}
public static void andFilterAtBeginning(Scan scan, Filter andWithFilter) {
if (andWithFilter == null) {
return;
}
Filter filter = scan.getFilter();
if (filter == null) {
scan.setFilter(andWithFilter);
} else if (filter instanceof FilterList && ((FilterList)filter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
FilterList filterList = (FilterList)filter;
List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size() + 1);
allFilters.add(andWithFilter);
allFilters.addAll(filterList.getFilters());
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
} else {
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(andWithFilter, filter)));
}
}
public static void andFilterAtEnd(Scan scan, Filter andWithFilter) {
if (andWithFilter == null) {
return;
}
Filter filter = scan.getFilter();
if (filter == null) {
scan.setFilter(andWithFilter);
} else if (filter instanceof FilterList && ((FilterList)filter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
FilterList filterList = (FilterList)filter;
List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size() + 1);
allFilters.addAll(filterList.getFilters());
allFilters.add(andWithFilter);
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
} else {
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(filter, andWithFilter)));
}
}
public static void applyFuzzyFilter(Scan scan, List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys) {
if (fuzzyKeys != null && fuzzyKeys.size() > 0) {
FuzzyRowFilter rowFilter = new FuzzyRowFilter(convertToHBasePair(fuzzyKeys));
Filter filter = scan.getFilter();
if (filter != null) {
// may have existed InclusiveStopFilter, see buildScan
FilterList filterList = new FilterList();
filterList.addFilter(filter);
filterList.addFilter(rowFilter);
scan.setFilter(filterList);
} else {
scan.setFilter(rowFilter);
}
}
}
@Override
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
throws IOException {
if (!initialized) {
throw new VisibilityControllerNotReadyException("VisibilityController not yet initialized!");
}
// Nothing to do if authorization is not enabled
if (!authorizationEnabled) {
return;
}
Region region = e.getEnvironment().getRegion();
Authorizations authorizations = null;
try {
authorizations = scan.getAuthorizations();
} catch (DeserializationException de) {
throw new IOException(de);
}
if (authorizations == null) {
// No Authorizations present for this scan/Get!
// In case of system tables other than "labels" just scan with out visibility check and
// filtering. Checking visibility labels for META and NAMESPACE table is not needed.
TableName table = region.getRegionInfo().getTable();
if (table.isSystemTable() && !table.equals(LABELS_TABLE_NAME)) {
return;
}
}
Filter visibilityLabelFilter = VisibilityUtils.createVisibilityLabelFilter(region,
authorizations);
if (visibilityLabelFilter != null) {
Filter filter = scan.getFilter();
if (filter != null) {
scan.setFilter(new FilterList(filter, visibilityLabelFilter));
} else {
scan.setFilter(visibilityLabelFilter);
}
}
}
@Override
protected Filter fetchHbaseFilter() {
FilterList filterList = new FilterList();
// 定义mapper中需要获取的列名
String[] columns = new String[] { EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID, // 会员id
EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, // 服务器时间
EventLogConstants.LOG_COLUMN_NAME_PLATFORM, // 平台名称
EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, // 浏览器名称
EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION // 浏览器版本信息
};
filterList.addFilter(this.getColumnFilter(columns));
return filterList;
}
private void assertViewHeaderRowsHaveViewTTLRelatedCells(String schemaName, long minTimestamp,
boolean rawScan, int expectedRows) throws IOException, SQLException {
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
RowFilter schemaNameFilter = new RowFilter(
CompareFilter.CompareOp.EQUAL,
new SubstringComparator(schemaName)
);
QualifierFilter viewTTLQualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(PhoenixDatabaseMetaData.VIEW_TTL_BYTES));
filterList.addFilter(schemaNameFilter);
filterList.addFilter(viewTTLQualifierFilter);
try (Table tbl = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
Scan allRows = new Scan();
allRows.setRaw(rawScan);
allRows.setTimeRange(minTimestamp, HConstants.LATEST_TIMESTAMP);
allRows.setFilter(filterList);
ResultScanner scanner = tbl.getScanner(allRows);
int numMatchingRows = 0;
for (Result result = scanner.next(); result != null; result = scanner.next()) {
numMatchingRows +=
result.containsColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
PhoenixDatabaseMetaData.VIEW_TTL_BYTES) ? 1 : 0;
}
assertEquals(String.format("Expected rows do not match for table = %s at timestamp %d",
Bytes.toString(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES), minTimestamp), expectedRows, numMatchingRows);
}
}
@Override
protected Filter fetchHbaseFilter() {
FilterList filterList = new FilterList();
// 定义mapper中需要获取的列名
String[] columns = new String[] { EventLogConstants.LOG_COLUMN_NAME_SESSION_ID, // 会话id
EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, // 服务器时间
EventLogConstants.LOG_COLUMN_NAME_PLATFORM, // 平台名称
EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, // 浏览器名称
EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION // 浏览器版本号
};
filterList.addFilter(this.getColumnFilter(columns));
return filterList;
}
void addFilterByMapping( FilterList fl, CompareFilter.CompareOp comp, Class<?> comparatorClass, Object comparator,
Mapping.TupleMapping tupleMapping )
throws NoSuchMethodException, InstantiationException, IllegalAccessException,
java.lang.reflect.InvocationTargetException {
switch ( tupleMapping ) {
case KEY: {
addFilter( RowFilter.class, fl, comp, comparatorClass, comparator );
return;
}
case FAMILY: {
addFilter( FamilyFilter.class, fl, comp, comparatorClass, comparator );
return;
}
case COLUMN: {
//TODO Check if ColumnPrefixFilter works faster and suit more
addFilter( QualifierFilter.class, fl, comp, comparatorClass, comparator );
return;
}
case VALUE: {
addFilter( ValueFilter.class, fl, comp, comparatorClass, comparator );
return;
}
case TIMESTAMP: {
addFilter( TimestampsFilter.class, fl, comp, comparatorClass, comparator );
// Constructor<TimestampsFilter> columnFilterConstructor =
// TimestampsFilter.class.getConstructor( CompareFilter.CompareOp.class, comparatorClass );
// TimestampsFilter scf = columnFilterConstructor.newInstance( comp, comparator );
// fl.addFilter( scf );
return;
}
}
}