The examples below show how to use the org.apache.hadoop.hbase.filter.PageFilter API class. Each example is taken from a real project; follow the link to view the source code on GitHub.
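Before the project examples, a minimal sketch of the basic pattern may help (the table name "t1" and the open Connection named connection are assumptions for illustration). Note that PageFilter caps rows per region server, not globally, so callers that need an exact limit still count rows on the client:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;

static void printFirstRows(Connection connection, int limit) throws IOException {
    Scan scan = new Scan();
    scan.setFilter(new PageFilter(limit)); // each region server returns at most `limit` rows
    try (Table table = connection.getTable(TableName.valueOf("t1"));
         ResultScanner scanner = table.getScanner(scan)) {
        int count = 0;
        for (Result result : scanner) {
            if (++count > limit) break; // enforce the limit again on the client
            System.out.println(Bytes.toString(result.getRow()));
        }
    }
}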
private Scan getVertexIndexScanWithLimit(String label, boolean isUnique, String key, Object from, int limit, boolean reversed) {
    byte[] prefix = serializeForRead(label, isUnique, key, null);
    byte[] startRow = from != null
            ? serializeForRead(label, isUnique, key, from)
            : prefix;
    byte[] stopRow = HConstants.EMPTY_END_ROW;
    if (graph.configuration().getInstanceType() == HBaseGraphConfiguration.InstanceType.BIGTABLE) {
        if (reversed) {
            throw new UnsupportedOperationException("Reverse scans not supported by Bigtable");
        } else {
            // PrefixFilter in Bigtable does not automatically stop
            // See https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/1087
            stopRow = HBaseGraphUtils.incrementBytes(prefix);
        }
    }
    if (reversed) startRow = HBaseGraphUtils.incrementBytes(startRow);
    Scan scan = new Scan(startRow, stopRow);
    FilterList filterList = new FilterList();
    filterList.addFilter(new PrefixFilter(prefix));
    filterList.addFilter(new PageFilter(limit));
    scan.setFilter(filterList);
    scan.setReversed(reversed);
    return scan;
}
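The Bigtable branch above turns the prefix into an explicit stop row with HBaseGraphUtils.incrementBytes, whose implementation is not shown. A common way to write such a helper (a sketch of the usual technique, not necessarily this project's exact code) is to bump the last byte that can still grow and truncate everything after it, which yields the smallest key strictly greater than every key sharing the prefix:

static byte[] incrementBytes(byte[] prefix) {
    for (int i = prefix.length - 1; i >= 0; i--) {
        if (prefix[i] != (byte) 0xFF) {                        // found a byte we can increment
            byte[] stopRow = java.util.Arrays.copyOf(prefix, i + 1);
            stopRow[i]++;
            return stopRow;                                    // exclusive upper bound for the prefix
        }
    }
    return new byte[0]; // all bytes are 0xFF: scan to the end of the table
}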
private static <T> void setLimit(
        final RangeReaderParams<T> readerParams,
        final FilterList filterList) {
    if ((readerParams.getLimit() != null) && (readerParams.getLimit() > 0)) {
        // @formatter:off
        // TODO in hbase 1.4.x there is a scan.getLimit() and
        // scan.setLimit() which is perfectly suited for this
        // if (readerParams.getLimit() < scanner.getLimit() || scanner.getLimit() <= 0) {
        //   // also in hbase 1.4.x ReadType.PREAD would make sense for limits
        //   scanner.setReadType(ReadType.PREAD);
        //   scanner.setLimit(readerParams.getLimit());
        // }
        // @formatter:on
        // however, to be compatible with earlier versions of hbase, for now
        // we are using a page filter
        filterList.addFilter(new PageFilter(readerParams.getLimit()));
    }
}
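For comparison, the HBase 1.4.x approach the TODO describes would look roughly like this (a sketch against a Scan named scan, which is an assumption here; the source comment states that Scan.setLimit and ReadType exist from 1.4.x on):

// Requires HBase >= 1.4.0; replaces the PageFilter fallback above.
if (readerParams.getLimit() != null && readerParams.getLimit() > 0) {
    scan.setReadType(Scan.ReadType.PREAD);  // pread is a better fit for short, limited scans
    scan.setLimit(readerParams.getLimit()); // stop after this many rows, across the whole scan
}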
/**
 * Gets the filter. If a page size has been set, a PageFilter for that size is added
 * first (the page size is then reset so the filter is only added once).
 *
 * @return the filter.
 */
public synchronized Filter getFilter() {
    if (pageSize > 0) {
        addFilter(new PageFilter(pageSize));
        pageSize = 0L;
    }
    return filters;
}
public Iterator<Edge> edges(Object fromId, int limit) {
    final EdgeReader parser = new EdgeReader(graph);
    Scan scan = fromId != null ? new Scan(ValueUtils.serializeWithSalt(fromId)) : new Scan();
    scan.setFilter(new PageFilter(limit));
    ResultScanner scanner = null;
    try {
        scanner = table.getScanner(scan);
        return CloseableIteratorUtils.limit(HBaseGraphUtils.mapWithCloseAtEnd(scanner, parser::parse), limit);
    } catch (IOException e) {
        throw new HBaseGraphException(e);
    }
}
public Iterator<Vertex> vertices(Object fromId, int limit) {
    final VertexReader parser = new VertexReader(graph);
    Scan scan = fromId != null ? new Scan(ValueUtils.serializeWithSalt(fromId)) : new Scan();
    scan.setFilter(new PageFilter(limit));
    ResultScanner scanner = null;
    try {
        scanner = table.getScanner(scan);
        return CloseableIteratorUtils.limit(HBaseGraphUtils.mapWithCloseAtEnd(scanner, parser::parse), limit);
    } catch (IOException e) {
        throw new HBaseGraphException(e);
    }
}
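Both methods above also show the standard defence against PageFilter's per-region-server semantics: the scanner is wrapped in CloseableIteratorUtils.limit so the client never yields more than limit elements. A hypothetical caller sketch (names assumed, and assuming the salted key serialization preserves iteration order) can page through results by feeding the last id seen back in as fromId:

Iterator<Vertex> page = vertices(null, 100);       // first page
Object lastId = null;
while (page.hasNext()) {
    lastId = page.next().id();
}
// Resume from the last id seen; since the start row is inclusive,
// the next page's first element repeats lastId and should be skipped.
Iterator<Vertex> nextPage = vertices(lastId, 100);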
@Override
boolean testRow(final int i) throws IOException {
    Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
            .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
            .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
            .setScanMetricsEnabled(true);
    FilterList list = new FilterList();
    for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        if (opts.addColumns) {
            for (int column = 0; column < opts.columns; column++) {
                byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
                scan.addColumn(familyName, qualifier);
            }
        } else {
            scan.addFamily(familyName);
        }
    }
    if (opts.filterAll) {
        list.addFilter(new FilterAllFilter());
    }
    list.addFilter(new WhileMatchFilter(new PageFilter(120)));
    scan.setFilter(list);
    ResultScanner s = this.table.getScanner(scan);
    try {
        for (Result rr; (rr = s.next()) != null;) {
            updateValueSize(rr);
        }
    } finally {
        updateScanMetrics(s.getScanMetrics());
        s.close();
    }
    return true;
}
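The WhileMatchFilter wrapper used here is worth noting: WhileMatchFilter's filterAllRemaining() returns true as soon as its inner filter rejects a row, turning the first rejection into an end-of-scan signal, so the scan terminates after roughly 120 rows rather than merely filtering later ones. The idiom in isolation looks like this (a sketch, assuming a Scan named scan):

// Stop the scan outright once the page is full, instead of just excluding rows.
scan.setFilter(new WhileMatchFilter(new PageFilter(120)));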
@Override
void testRow(final int i) throws IOException {
    Scan scan = new Scan().withStartRow(getRandomRow(this.rand, this.totalRows));
    scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
    scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
    ResultScanner s = this.table.getScanner(scan);
    s.close();
}
/**
 * Returns the row count limit of a PageFilter if one exists and there is no
 * WHERE clause filter.
 * @return the PageFilter's page size, or null if there is no PageFilter or a
 *         BooleanExpressionFilter is present.
 */
private static Long getUnfilteredPageLimit(Scan scan) {
    Long pageLimit = null;
    Iterator<Filter> filters = ScanUtil.getFilterIterator(scan);
    while (filters.hasNext()) {
        Filter filter = filters.next();
        if (filter instanceof BooleanExpressionFilter) {
            return null;
        }
        if (filter instanceof PageFilter) {
            pageLimit = ((PageFilter) filter).getPageSize();
        }
    }
    return pageLimit;
}
public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement, RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory) throws SQLException {
    super(context, tableRef, groupBy);
    this.splits = getSplits(context, tableRef, statement.getHint());
    this.iteratorFactory = iteratorFactory;
    Scan scan = context.getScan();
    PTable table = tableRef.getTable();
    if (projector.isProjectEmptyKeyValue()) {
        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
        // If nothing is projected into the scan and we only have one column family, just allow
        // everything to be projected and use a FirstKeyOnlyFilter to skip from row to row.
        // This turns out to be quite a bit faster.
        if (familyMap.isEmpty() && table.getColumnFamilies().size() == 1) {
            // Project the one column family. We must project a column family since it's possible
            // that there are other non-declared column families that we need to ignore.
            scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
            ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
        } else {
            byte[] ecf = SchemaUtil.getEmptyColumnFamily(table.getColumnFamilies());
            // Project the empty key value unless the column family containing it has
            // been projected in its entirety.
            if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
                scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
            }
        }
    }
    if (limit != null) {
        ScanUtil.andFilterAtEnd(scan, new PageFilter(limit));
    }
}
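ScanUtil.andFilterAtEnd is Phoenix-internal and not shown here. Conceptually (a sketch of the idea, not Phoenix's actual implementation) it ANDs the new filter onto whatever the scan already carries, keeping the PageFilter last so it only counts rows that survived the other filters:

static void andFilterAtEnd(Scan scan, Filter filter) {
    Filter current = scan.getFilter();
    if (current == null) {
        scan.setFilter(filter);
    } else {
        FilterList combined = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        combined.addFilter(current);
        combined.addFilter(filter); // evaluated last
        scan.setFilter(combined);
    }
}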
/**
 * List events for the given entity id in decreasing order of timestamp, from the given startKey. Returns n results.
 * @param entityId entity id
 * @param startKey key for the first event to be returned, used for pagination
 * @param n number of events to be returned
 * @return list of events
 * @throws AtlasException
 */
public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short n)
        throws AtlasException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n);
    }
    Table table = null;
    ResultScanner scanner = null;
    try {
        table = connection.getTable(tableName);
        /*
         * Scan details:
         * In HBase, the events are stored in increasing order of timestamp, so we do a reverse scan to get the latest event first.
         * A page filter is set to limit the number of results returned.
         * The stop row is set to the entity id to avoid going past the current entity while scanning.
         * small is set to true to optimise RPC calls, as the scanner is created per request.
         */
        Scan scan = new Scan().setReversed(true).setFilter(new PageFilter(n))
                              .setStopRow(Bytes.toBytes(entityId))
                              .setCaching(n)
                              .setSmall(true);
        if (StringUtils.isEmpty(startKey)) {
            // Set start row to entity id + max long value
            byte[] entityBytes = getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE);
            scan = scan.setStartRow(entityBytes);
        } else {
            scan = scan.setStartRow(Bytes.toBytes(startKey));
        }
        scanner = table.getScanner(scan);
        Result result;
        List<EntityAuditEvent> events = new ArrayList<>();
        // PageFilter doesn't ensure n results are returned: the filter is applied per region server.
        // So, add an extra check on n here.
        while ((result = scanner.next()) != null && events.size() < n) {
            EntityAuditEvent event = fromKey(result.getRow());
            // In case the user sets a random start key, guard against unrelated events
            if (!event.getEntityId().equals(entityId)) {
                continue;
            }
            event.setUser(getResultString(result, COLUMN_USER));
            event.setAction(EntityAuditEvent.EntityAuditAction.fromString(getResultString(result, COLUMN_ACTION)));
            event.setDetails(getResultString(result, COLUMN_DETAIL));
            if (persistEntityDefinition) {
                String colDef = getResultString(result, COLUMN_DEFINITION);
                if (colDef != null) {
                    event.setEntityDefinition(colDef);
                }
            }
            events.add(event);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size());
        }
        return events;
    } catch (IOException e) {
        throw new AtlasException(e);
    } finally {
        close(scanner);
        close(table);
    }
}
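On newer HBase versions, Scan.setSmall and Scan.setStartRow/setStopRow are deprecated. A hedged sketch of the same scan construction using the 1.4+/2.x API (same assumed helpers as above):

Scan scan = new Scan()
        .setReversed(true)
        .setFilter(new PageFilter(n))
        .withStopRow(Bytes.toBytes(entityId))                              // replaces setStopRow
        .withStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE)) // replaces setStartRow
        .setCaching(n)
        .setLimit(n)                        // global row limit, unlike the per-region-server PageFilter
        .setReadType(Scan.ReadType.PREAD);  // replaces setSmall(true)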
/**
 * List events for the given entity id in decreasing order of timestamp, from the given startKey. Returns n results.
 * @param entityId entity id
 * @param startKey key for the first event to be returned, used for pagination
 * @param n number of events to be returned
 * @return list of events
 * @throws AtlasException
 */
public List<EntityAuditEvent> listEvents(String entityId, String startKey, short n)
        throws AtlasException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n);
    }
    Table table = null;
    ResultScanner scanner = null;
    try {
        table = connection.getTable(tableName);
        /*
         * Scan details:
         * In HBase, the events are stored in increasing order of timestamp, so we do a reverse scan to get the latest event first.
         * A page filter is set to limit the number of results returned.
         * The stop row is set to the entity id to avoid going past the current entity while scanning.
         * small is set to true to optimise RPC calls, as the scanner is created per request.
         */
        Scan scan = new Scan().setReversed(true).setFilter(new PageFilter(n))
                              .setStopRow(Bytes.toBytes(entityId))
                              .setCaching(n)
                              .setSmall(true);
        if (StringUtils.isEmpty(startKey)) {
            // Set start row to entity id + max long value
            byte[] entityBytes = getKey(entityId, Long.MAX_VALUE);
            scan = scan.setStartRow(entityBytes);
        } else {
            scan = scan.setStartRow(Bytes.toBytes(startKey));
        }
        scanner = table.getScanner(scan);
        Result result;
        List<EntityAuditEvent> events = new ArrayList<>();
        // PageFilter doesn't ensure n results are returned: the filter is applied per region server.
        // So, add an extra check on n here.
        while ((result = scanner.next()) != null && events.size() < n) {
            EntityAuditEvent event = fromKey(result.getRow());
            // In case the user sets a random start key, guard against unrelated events
            if (!event.getEntityId().equals(entityId)) {
                continue;
            }
            event.setUser(getResultString(result, COLUMN_USER));
            event.setAction(EntityAuditEvent.EntityAuditAction.valueOf(getResultString(result, COLUMN_ACTION)));
            event.setDetails(getResultString(result, COLUMN_DETAIL));
            if (persistEntityDefinition) {
                String colDef = getResultString(result, COLUMN_DEFINITION);
                if (colDef != null) {
                    event.setEntityDefinition(colDef);
                }
            }
            events.add(event);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size());
        }
        return events;
    } catch (IOException e) {
        throw new AtlasException(e);
    } finally {
        close(scanner);
        close(table);
    }
}
public void setNumRows(int n) {
    this.filterList.addFilter(new PageFilter(n == 0 ? DEFAULT_NUM_ROWS : n));
}
@Test
public void testGetScanConfig() throws Exception {
    final List<Criteria.Field> fr = new ArrayList<>();
    fr.add(new CriteriaImpl.FieldImpl("notifierName", "test_notifier"));
    fr.add(new CriteriaImpl.FieldImpl("status", "NEW"));
    final List<byte[]> nnList = Arrays.asList("s".getBytes(CHARSET),
                                              "qs".getBytes(CHARSET),
                                              "NEW".getBytes(CHARSET));
    new Expectations() {
        {
            mockNotificationCriteria.clazz();
            times = 1;
            result = Notification.class;
            mockNotificationCriteria.fieldRestrictions();
            times = 1;
            result = fr;
            mockIndexMapper.getIndexedFieldNames(); times = 1;
            result = Arrays.asList("notifierName");
            mockNotificationCriteria.numRows(); times = 1;
            result = 5;
            mockIndexMapper.mapMemberValue("status", "NEW"); times = 1;
            result = nnList;
        }
    };
    hBaseScanConfigBuilder = new HBaseScanConfigBuilder();
    hBaseScanConfigBuilder.addMappers(Notification.class, Arrays.asList(mockIndexMapper));
    Criteria<Notification> eventCriteria = new CriteriaImpl<>(Notification.class);
    HBaseScanConfig<Notification> notificationScanConfig = hBaseScanConfigBuilder.getScanConfig(mockNotificationCriteria);
    System.out.println(notificationScanConfig);
    assertEquals(mockIndexMapper, notificationScanConfig.getMapper());
    assertArrayEquals("test_notifier|0".getBytes(CHARSET), notificationScanConfig.getStartRow());
    assertArrayEquals(("test_notifier|" + Long.MAX_VALUE).getBytes(CHARSET), notificationScanConfig.getStopRow());
    assertEquals(2, notificationScanConfig.filterList().getFilters().size());
    // column filter should be first
    Filter firstFilter = notificationScanConfig.filterList().getFilters().get(0);
    assertEquals(SingleColumnValueFilter.class, firstFilter.getClass());
    // page filter should be last
    Filter secondFilter = notificationScanConfig.filterList().getFilters().get(1);
    assertEquals(PageFilter.class, secondFilter.getClass());
}
@Test
public void testGetScanConfigWithTs() throws Exception {
    final List<Criteria.Field> fr = new ArrayList<>();
    fr.add(new CriteriaImpl.FieldImpl("notifierName", "test_notifier"));
    fr.add(new CriteriaImpl.FieldImpl("status", "NEW"));
    final long ts = System.currentTimeMillis();
    final long endts = ts + 100;
    final List<byte[]> nnList = Arrays.asList("s".getBytes(CHARSET),
                                              "qs".getBytes(CHARSET),
                                              "NEW".getBytes(CHARSET));
    new Expectations() {
        {
            mockNotificationCriteria.clazz();
            times = 1;
            result = Notification.class;
            mockNotificationCriteria.fieldRestrictions();
            times = 1;
            result = fr;
            mockIndexMapper.getIndexedFieldNames(); times = 1;
            result = Arrays.asList("notifierName");
            mockNotificationCriteria.numRows(); times = 1;
            result = 5;
            mockIndexMapper.mapMemberValue("status", "NEW"); times = 1;
            result = nnList;
            mockNotificationCriteria.startTs();
            result = ts;
            mockNotificationCriteria.endTs();
            result = endts;
        }
    };
    hBaseScanConfigBuilder = new HBaseScanConfigBuilder();
    hBaseScanConfigBuilder.addMappers(Notification.class, Arrays.asList(mockIndexMapper));
    Criteria<Notification> eventCriteria = new CriteriaImpl<>(Notification.class);
    HBaseScanConfig<Notification> notificationScanConfig = hBaseScanConfigBuilder.getScanConfig(mockNotificationCriteria);
    System.out.println(notificationScanConfig);
    assertEquals(mockIndexMapper, notificationScanConfig.getMapper());
    assertArrayEquals(("test_notifier|" + ts).getBytes(CHARSET), notificationScanConfig.getStartRow());
    assertArrayEquals(("test_notifier|" + endts).getBytes(CHARSET), notificationScanConfig.getStopRow());
    assertEquals(2, notificationScanConfig.filterList().getFilters().size());
    // column filter should be first
    Filter firstFilter = notificationScanConfig.filterList().getFilters().get(0);
    assertEquals(SingleColumnValueFilter.class, firstFilter.getClass());
    // page filter should be last
    Filter secondFilter = notificationScanConfig.filterList().getFilters().get(1);
    assertEquals(PageFilter.class, secondFilter.getClass());
}
private PageFilter getPageFilter(List<Filter> filterList) {
    for (Filter filter : filterList) {
        if (filter instanceof PageFilter) {
            return (PageFilter) filter;
        }
    }
    return null;
}
public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws SQLException {
    super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint());
    this.plan = plan;
    StatementContext context = plan.getContext();
    TableRef tableRef = plan.getTableRef();
    PTable table = tableRef.getTable();
    FilterableStatement statement = plan.getStatement();
    RowProjector projector = plan.getProjector();
    physicalTableName = table.getPhysicalName().getBytes();
    tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
    Scan scan = context.getScan();
    // Used to tie all the scans together during logging
    scanId = UUID.randomUUID().toString();
    Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
    boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty();
    if (projector.isProjectEmptyKeyValue()) {
        // If nothing is projected into the scan and we only have one column family, just allow
        // everything to be projected and use a FirstKeyOnlyFilter to skip from row to row.
        // This turns out to be quite a bit faster.
        // WHERE condition columns also get added into the familyMap; when WHERE conditions
        // are present, we cannot add a FirstKeyOnlyFilter at the beginning.
        if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
                && table.getColumnFamilies().size() == 1) {
            // Project the one column family. We must project a column family since it's possible
            // that there are other non-declared column families that we need to ignore.
            scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
        } else {
            byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
            // Project the empty key value unless the column family containing it has
            // been projected in its entirety.
            if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
                scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
            }
        }
    } else if (table.getViewType() == ViewType.MAPPED) {
        // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS.
        // But only the selected column values are returned back to the client.
        for (PColumnFamily family : table.getColumnFamilies()) {
            scan.addFamily(family.getName().getBytes());
        }
    }
    // Add FirstKeyOnlyFilter if there are no references to key value columns
    if (keyOnlyFilter) {
        ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
    }
    // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
    if (perScanLimit != null) {
        ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
    }
    doColumnProjectionOptimization(context, scan, table, statement);
    this.scans = getParallelScans();
    List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
    for (List<Scan> scanList : scans) {
        for (Scan aScan : scanList) {
            splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow()));
        }
    }
    this.splits = ImmutableList.copyOf(splitRanges);
    // If a split is detected, this will be more than one, but that's unlikely
    this.allFutures = Lists.newArrayListWithExpectedSize(1);
}
/**
 * This method will produce an Apache Arrow Schema for the given TableName and HBase connection
 * by scanning up to the requested number of rows and using basic schema inference to determine
 * the data types.
 *
 * @param client The HBase connection to use for the scan operation.
 * @param tableName The HBase TableName for which to produce an Apache Arrow Schema.
 * @param numToScan The number of records to scan as part of producing the Schema.
 * @return An Apache Arrow Schema representing the schema of the HBase table.
 */
public static Schema inferSchema(HBaseConnection client, TableName tableName, int numToScan)
{
    Scan scan = new Scan().setMaxResultSize(numToScan).setFilter(new PageFilter(numToScan));
    org.apache.hadoop.hbase.TableName hbaseTableName = org.apache.hadoop.hbase.TableName.valueOf(getQualifiedTableName(tableName));
    return client.scanTable(hbaseTableName, scan, (ResultScanner scanner) -> scanAndInferSchema(scanner));
}
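A hypothetical invocation (the client instance, catalog name "default", and table name "mytable" are assumptions for illustration), inferring a schema from the first ten rows:

// TableName here is the Athena federation SDK's TableName, not HBase's.
Schema schema = inferSchema(client, new TableName("default", "mytable"), 10);
schema.getFields().forEach(f -> System.out.println(f.getName() + ": " + f.getType()));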