The following examples show how to use the org.apache.hadoop.hbase.filter.PrefixFilter API class in real code; you can also follow the links through to GitHub to read each snippet's full source.
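To set the stage, here is a minimal, self-contained sketch of the basic pattern. The connection handling, the table name t1, and the printed column are illustrative assumptions, not taken from any snippet below.
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class PrefixFilterExample {
  // Prints the row key of every row whose key starts with the given prefix.
  static void scanByPrefix(Connection connection, String prefix) throws IOException {
    try (Table table = connection.getTable(TableName.valueOf("t1"))) {
      Scan scan = new Scan();
      // Start the scan at the prefix so rows sorting before it are never read;
      // the PrefixFilter then excludes any row whose key does not match.
      scan.withStartRow(Bytes.toBytes(prefix));
      scan.setFilter(new PrefixFilter(Bytes.toBytes(prefix)));
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          System.out.println(Bytes.toString(result.getRow()));
        }
      }
    }
  }
}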
/**
 * Queries all the outgoing calls (type=1) placed by a given user.
 */
@Test
public void getType() throws IOException {
  Scan scan = new Scan();
  // Create the filter list; every filter must pass
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  // Match rows where cf:type equals "1" (outgoing calls)
  SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("cf"),
      Bytes.toBytes("type"), CompareOperator.EQUAL, Bytes.toBytes("1"));
  filters.addFilter(filter1);
  // Match rows whose key starts with this user's phone number
  PrefixFilter filter2 = new PrefixFilter(Bytes.toBytes("15883348450"));
  filters.addFilter(filter2);
  scan.setFilter(filters);
  ResultScanner rss = table.getScanner(scan);
  for (Result rs : rss) {
    System.out.print(Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"), Bytes.toBytes("dnum")))));
    System.out.print("--" + Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"), Bytes.toBytes("type")))));
    System.out.print("--" + Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"), Bytes.toBytes("length")))));
    System.out.println("--" + Bytes.toString(CellUtil.cloneValue(rs.getColumnLatestCell(Bytes.toBytes("cf"), Bytes.toBytes("date")))));
  }
}
@Test
public void testScan() throws IOException {
  Connection connection = admin.getConnection();
  Table table = connection.getTable(TableName.valueOf("tbl_girls"));
  Scan scan = new Scan(Bytes.toBytes("0001"), Bytes.toBytes("0004"));
  // RowKeyFilter
  Filter filter = new PrefixFilter(Bytes.toBytes("000"));
  scan.setFilter(filter);
  Filter filter2 = new RowFilter(CompareOp.EQUAL, new SubstringComparator("000"));
  scan.setFilter(filter2);
  // BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes(29));
  Filter filter3 = new SingleColumnValueFilter(Bytes.toBytes("base_info"), Bytes.toBytes("age"), CompareOp.GREATER, Bytes.toBytes(29));
  scan.setFilter(filter3);
  ResultScanner resultScanner = table.getScanner(scan);
  for (Result result : resultScanner) {
    LOGGER.info(result.toString());
    int value = Bytes.toInt(result.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("age")));
    LOGGER.info(String.valueOf(value));
  }
}
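Note that every Scan.setFilter(...) call above replaces the previously installed filter, so when this scan executes only filter3 is in effect. A sketch of combining all three conditions with a FilterList instead, reusing the filters from the example:
FilterList all = new FilterList(FilterList.Operator.MUST_PASS_ALL);
// row key must start with "000"
all.addFilter(new PrefixFilter(Bytes.toBytes("000")));
// row key must contain "000"
all.addFilter(new RowFilter(CompareOp.EQUAL, new SubstringComparator("000")));
// base_info:age must be greater than 29
all.addFilter(new SingleColumnValueFilter(Bytes.toBytes("base_info"),
    Bytes.toBytes("age"), CompareOp.GREATER, Bytes.toBytes(29)));
scan.setFilter(all);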
private Scan getVertexIndexScanWithLimit(String label, boolean isUnique, String key, Object from, int limit, boolean reversed) {
  byte[] prefix = serializeForRead(label, isUnique, key, null);
  byte[] startRow = from != null
      ? serializeForRead(label, isUnique, key, from)
      : prefix;
  byte[] stopRow = HConstants.EMPTY_END_ROW;
  if (graph.configuration().getInstanceType() == HBaseGraphConfiguration.InstanceType.BIGTABLE) {
    if (reversed) {
      throw new UnsupportedOperationException("Reverse scans not supported by Bigtable");
    } else {
      // PrefixFilter in Bigtable does not automatically stop
      // See https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/1087
      stopRow = HBaseGraphUtils.incrementBytes(prefix);
    }
  }
  if (reversed) startRow = HBaseGraphUtils.incrementBytes(startRow);
  Scan scan = new Scan(startRow, stopRow);
  FilterList filterList = new FilterList();
  filterList.addFilter(new PrefixFilter(prefix));
  filterList.addFilter(new PageFilter(limit));
  scan.setFilter(filterList);
  scan.setReversed(reversed);
  return scan;
}
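HBaseGraphUtils.incrementBytes is a helper of this project (hgraphdb). The usual way to compute an exclusive stop row for a prefix, which is presumably what it does, can be sketched like this; an illustrative implementation, not the project's actual code:
// Smallest byte[] that sorts after every key sharing the prefix: increment
// the last non-0xFF byte and truncate the rest. An all-0xFF prefix has no
// finite successor, so fall back to the open-ended empty stop row.
static byte[] prefixSuccessor(byte[] prefix) {
  byte[] copy = Arrays.copyOf(prefix, prefix.length);
  for (int i = copy.length - 1; i >= 0; i--) {
    if (copy[i] != (byte) 0xFF) {
      copy[i]++;
      return Arrays.copyOf(copy, i + 1);
    }
  }
  return HConstants.EMPTY_END_ROW;
}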
private void testScanWithFilters(Connection connection, String tableName) throws IOException {
  createTable(thriftAdmin, tableName);
  try (Table table = connection.getTable(TableName.valueOf(tableName))) {
    FilterList filterList = new FilterList();
    PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("testrow"));
    ColumnValueFilter columnValueFilter = new ColumnValueFilter(FAMILYA, QUALIFIER_1,
        CompareOperator.EQUAL, VALUE_1);
    filterList.addFilter(prefixFilter);
    filterList.addFilter(columnValueFilter);
    Scan scan = new Scan();
    scan.readVersions(2);
    scan.setFilter(filterList);
    ResultScanner scanner = table.getScanner(scan);
    Iterator<Result> iterator = scanner.iterator();
    assertTrue(iterator.hasNext());
    int counter = 0;
    while (iterator.hasNext()) {
      Result result = iterator.next();
      counter += result.size();
    }
    assertEquals(2, counter);
  }
}
protected ResultScanner buildScanner(String keyPrefix, String value, Table ht)
    throws IOException {
  // OurFilterList allFilters = new OurFilterList();
  FilterList allFilters = new FilterList(/* FilterList.Operator.MUST_PASS_ALL */);
  allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
  SingleColumnValueFilter filter = new SingleColumnValueFilter(
      Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"),
      CompareOperator.EQUAL, Bytes.toBytes(value));
  filter.setFilterIfMissing(true);
  allFilters.addFilter(filter);
  // allFilters.addFilter(new
  // RowExcludingSingleColumnValueFilter(Bytes.toBytes("trans-tags"),
  // Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value)));
  Scan scan = new Scan();
  scan.addFamily(Bytes.toBytes("trans-blob"));
  scan.addFamily(Bytes.toBytes("trans-type"));
  scan.addFamily(Bytes.toBytes("trans-date"));
  scan.addFamily(Bytes.toBytes("trans-tags"));
  scan.addFamily(Bytes.toBytes("trans-group"));
  scan.setFilter(allFilters);
  return ht.getScanner(scan);
}
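The filter.setFilterIfMissing(true) call above is easy to overlook but changes the result set: by default, SingleColumnValueFilter lets a row through when it has no cell in the tested column at all. A minimal illustration (column and value names reused from the snippet):
SingleColumnValueFilter valueMatch = new SingleColumnValueFilter(
    Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"),
    CompareOperator.EQUAL, Bytes.toBytes("someValue"));
// Without this, rows lacking a trans-tags:qual2 cell would also be returned;
// with it, only rows that actually contain a matching value pass.
valueMatch.setFilterIfMissing(true);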
/**
 * Returns the {@link Flow} instance matching the application ID and run ID.
 *
 * @param cluster the cluster identifier
 * @param user the user running the jobs
 * @param appId the application description
 * @param runId the specific run ID for the flow
 * @param populateTasks whether or not to populate the task details for each
 *          job
 * @return the matching {@link Flow}, or {@code null} if none is found
 */
public Flow getFlow(String cluster, String user, String appId, long runId,
    boolean populateTasks) throws IOException {
  Flow flow = null;
  byte[] startRow = ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster),
      Bytes.toBytes(user), Bytes.toBytes(appId),
      Bytes.toBytes(FlowKey.encodeRunId(runId)), Constants.EMPTY_BYTES);
  LOG.info(
      "Reading job_history rows start at " + Bytes.toStringBinary(startRow));
  Scan scan = new Scan();
  // start scanning history at cluster!user!app!run!
  scan.setStartRow(startRow);
  // require that all results match this flow prefix
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(startRow)));
  List<Flow> flows = createFromResults(scan, populateTasks, 1);
  if (flows.size() > 0) {
    flow = flows.get(0);
  }
  return flow;
}
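The WhileMatchFilter(new PrefixFilter(startRow)) idiom used here (and in the hRaven snippets that follow) pairs two effects: the PrefixFilter decides which rows match, and WhileMatchFilter guarantees the whole scan ends at the first row that fails the wrapped filter, rather than relying on the wrapped filter's own behavior. A sketch of the contrast, reusing startRow from the method above:
Scan filtered = new Scan().withStartRow(startRow);
// Relies on PrefixFilter alone to exclude non-matching rows.
filtered.setFilter(new PrefixFilter(startRow));

Scan bounded = new Scan().withStartRow(startRow);
// WhileMatchFilter ends the entire scan at the first non-matching row.
bounded.setFilter(new WhileMatchFilter(new PrefixFilter(startRow)));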
/**
 * Returns the {@link Flow} instance containing the given job ID.
 *
 * @param cluster the cluster identifier
 * @param jobId the job identifier
 * @param populateTasks whether or not to populate the task details for each
 *          job
 * @return the matching {@link Flow}, or {@code null} if none is found
 */
public Flow getFlowByJobID(String cluster, String jobId,
    boolean populateTasks) throws IOException {
  Flow flow = null;
  JobKey key = idService.getJobKeyById(new QualifiedJobId(cluster, jobId));
  if (key != null) {
    byte[] startRow =
        ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(key.getCluster()),
            Bytes.toBytes(key.getUserName()), Bytes.toBytes(key.getAppId()),
            Bytes.toBytes(key.getEncodedRunId()), Constants.EMPTY_BYTES);
    LOG.info("Reading job_history rows start at "
        + Bytes.toStringBinary(startRow));
    Scan scan = new Scan();
    // start scanning history at cluster!user!app!run!
    scan.setStartRow(startRow);
    // require that all results match this flow prefix
    scan.setFilter(new WhileMatchFilter(new PrefixFilter(startRow)));
    List<Flow> flows = createFromResults(scan, populateTasks, 1);
    if (flows.size() > 0) {
      flow = flows.get(0);
    }
  }
  return flow;
}
/**
 * Creates a scan for flow data.
 * @param rowPrefix - start row prefix
 * @param limit - limit on scanned results
 * @param version - version to match
 * @return Scan
 */
private Scan createFlowScan(byte[] rowPrefix, int limit, String version) {
  Scan scan = new Scan();
  scan.setStartRow(rowPrefix);
  // using a large scanner caching value with a small limit can mean we scan a
  // lot more data than necessary, so lower the caching for low limits
  scan.setCaching(Math.min(limit, defaultScannerCaching));
  // require that all rows match the prefix we're looking for
  Filter prefixFilter = new WhileMatchFilter(new PrefixFilter(rowPrefix));
  // if version is passed, restrict the rows returned to that version
  if (version != null && version.length() > 0) {
    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    filters.addFilter(prefixFilter);
    filters.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
        Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,
        Bytes.toBytes(version)));
    scan.setFilter(filters);
  } else {
    scan.setFilter(prefixFilter);
  }
  return scan;
}
public final List<E> scanByRowPrefix(String prefix) {
  HTableInterface hTableInterface = getHTableInterface();
  try {
    Scan scan = new Scan();
    scan.setFilter(new PrefixFilter(Bytes.toBytes(prefix)));
    ResultScanner resultScanner = hTableInterface.getScanner(scan);
    return parse(resultScanner);
  } catch (Exception cause) {
    throw new RuntimeException(cause);
  } finally {
    closeHTableInterface(hTableInterface);
  }
}
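HTableInterface is the pre-1.0 client API (deprecated and since removed). A rough equivalent of the same DAO method against the modern Table API could look like the sketch below, where connection is an assumed Connection field and getTableName() is a hypothetical helper standing in for however the class resolves its table:
public final List<E> scanByRowPrefix(String prefix) {
  // try-with-resources closes the scanner and table in place of the
  // closeHTableInterface(...) cleanup above
  try (Table table = connection.getTable(getTableName());
       ResultScanner scanner = table.getScanner(
           new Scan().setFilter(new PrefixFilter(Bytes.toBytes(prefix))))) {
    return parse(scanner);
  } catch (IOException cause) {
    throw new RuntimeException(cause);
  }
}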
@Override
public int scannerOpenWithPrefix(ByteBuffer tableName,
    ByteBuffer startAndPrefix,
    List<ByteBuffer> columns,
    Map<ByteBuffer, ByteBuffer> attributes)
    throws IOError, TException {
  Table table = null;
  try {
    table = getTable(tableName);
    Scan scan = new Scan().withStartRow(getBytes(startAndPrefix));
    addAttributes(scan, attributes);
    Filter f = new WhileMatchFilter(
        new PrefixFilter(getBytes(startAndPrefix)));
    scan.setFilter(f);
    if (columns != null && !columns.isEmpty()) {
      for (ByteBuffer column : columns) {
        byte[][] famQf = CellUtil.parseColumn(getBytes(column));
        if (famQf.length == 1) {
          scan.addFamily(famQf[0]);
        } else {
          scan.addColumn(famQf[0], famQf[1]);
        }
      }
    }
    return addScanner(table.getScanner(scan), false);
  } catch (IOException e) {
    LOG.warn(e.getMessage(), e);
    throw getIOError(e);
  } finally {
    closeTable(table);
  }
}
private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
  if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
    String[] rowPrefixArray = rowPrefixes.split(",");
    Arrays.sort(rowPrefixArray);
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    for (String prefix : rowPrefixArray) {
      Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
      filterList.addFilter(filter);
    }
    scan.setFilter(filterList);
    byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
    byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
    setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
  }
}
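The setStartAndStopRows helper is not shown in this snippet. Because the prefix array is sorted, a plausible implementation bounds the scan from the first prefix to just past the last one, so the MUST_PASS_ONE filter list only examines rows inside the overall range. A hypothetical sketch; a robust version would handle a trailing 0xFF byte the way the prefixSuccessor sketch earlier does:
private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow,
    byte[] lastPrefixRow) {
  scan.withStartRow(startPrefixRow);
  byte[] stopRow = Arrays.copyOf(lastPrefixRow, lastPrefixRow.length);
  stopRow[stopRow.length - 1]++; // first key past the last prefix range
  scan.withStopRow(stopRow);
}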
@Test
public void testPrefixFilter() throws Exception {
  // Grab rows from group one (half of total)
  long expectedRows = numRows / 2;
  long expectedKeys = colsPerRow;
  Scan s = new Scan();
  s.setFilter(new PrefixFilter(Bytes.toBytes("testRowOne")));
  verifyScan(s, expectedRows, expectedKeys);
}
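This test leaves the scan unbounded and relies on the filter alone, which is fine for a small test table. When the prefix is known up front, Scan.setRowPrefixFilter(...) computes matching start and stop rows so that only the relevant key range is read at all:
Scan s2 = new Scan();
// Derives the start row (the prefix) and stop row (its successor) internally;
// no Filter object is needed for a pure prefix match.
s2.setRowPrefixFilter(Bytes.toBytes("testRowOne"));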
@Override
public List<String> getAllSearchQueryStringsByCustomerInLastOneMonth(final Long customerId) {
  LOG.debug("Calling getAllSearchQueryStringsByCustomerInLastOneMonth for customerid: {}", customerId);
  Scan scan = new Scan();
  scan.addColumn(HbaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES,
      Bytes.toBytes("querystring"));
  Filter filter = new PrefixFilter(Bytes.toBytes(customerId + "-"));
  scan.setFilter(filter);
  DateTime dateTime = new DateTime();
  try {
    scan.setTimeRange(dateTime.minusDays(30).getMillis(), dateTime.getMillis());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  List<String> rows = hbaseTemplate.find("searchclicks", scan, new RowMapper<String>() {
    @Override
    public String mapRow(Result result, int rowNum) throws Exception {
      LOG.debug("Row is: {}", new String(result.getRow()));
      byte[] value = result.getValue(
          HbaseJsonEventSerializer.COLUMFAMILY_SEARCH_BYTES,
          Bytes.toBytes("querystring"));
      String queryString = null;
      if (value != null) {
        queryString = new String(value);
        LOG.debug("Query String: {}", new Object[] { queryString });
      }
      return queryString;
    }
  });
  List<String> list = new ArrayList<>();
  for (String string : rows) {
    if (string != null) {
      list.add(string);
    }
  }
  LOG.debug("Checking getAllSearchQueryStringsByCustomerInLastOneMonth done with list:{}", list);
  return list;
}
/**
* Returns a Scan instance to retrieve all the task rows for a given job from
* the job_history_task table.
* @param jobKey the job key to match for all task rows
* @return a {@code Scan} instance for the job_history_task table
*/
private Scan getTaskScan(JobKey jobKey) {
  byte[] startKey =
      Bytes.add(jobKeyConv.toBytes(jobKey), Constants.SEP_BYTES);
  Scan scan = new Scan();
  scan.setStartRow(startKey);
  // only return tasks for this job
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(startKey)));
  // expect a lot of tasks on average
  scan.setCaching(500);
  return scan;
}
/**
 * Retrieves all the event rows matching a single
 * {@link com.twitter.hraven.Flow}.
 * @param flowKey the key of the flow whose events should be returned
 * @return the list of matching {@link FlowEvent}s
 */
public List<FlowEvent> getFlowEvents(FlowKey flowKey) throws IOException {
  byte[] startKey =
      Bytes.add(flowKeyConverter.toBytes(flowKey), Constants.SEP_BYTES);
  Scan scan = new Scan(startKey);
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(startKey)));
  List<FlowEvent> results = new ArrayList<FlowEvent>();
  ResultScanner scanner = null;
  Table eventTable = null;
  try {
    eventTable = hbaseConnection
        .getTable(TableName.valueOf(Constants.FLOW_EVENT_TABLE));
    scanner = eventTable.getScanner(scan);
    for (Result r : scanner) {
      FlowEvent event = createEventFromResult(r);
      if (event != null) {
        results.add(event);
      }
    }
  } finally {
    try {
      if (scanner != null) {
        scanner.close();
      }
    } finally {
      if (eventTable != null) {
        eventTable.close();
      }
    }
  }
  return results;
}
/**
 * Retrieves all events added after the given event key (with sequence numbers
 * greater than the given key). If no new events are found returns an empty
 * list.
 * @param lastSeen the last event key already processed
 * @return the list of events following {@code lastSeen}, or an empty list
 */
public List<FlowEvent> getFlowEventsSince(FlowEventKey lastSeen)
    throws IOException {
  // rows must match the FlowKey portion + SEP
  byte[] keyPrefix =
      Bytes.add(flowKeyConverter.toBytes(lastSeen), Constants.SEP_BYTES);
  // start at the next following sequence number
  FlowEventKey nextEvent = new FlowEventKey(lastSeen.getCluster(),
      lastSeen.getUserName(), lastSeen.getAppId(), lastSeen.getRunId(),
      lastSeen.getSequence() + 1);
  byte[] startKey = keyConverter.toBytes(nextEvent);
  Scan scan = new Scan(startKey);
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(keyPrefix)));
  List<FlowEvent> results = new ArrayList<FlowEvent>();
  ResultScanner scanner = null;
  Table eventTable = null;
  try {
    eventTable = hbaseConnection
        .getTable(TableName.valueOf(Constants.FLOW_EVENT_TABLE));
    scanner = eventTable.getScanner(scan);
    for (Result r : scanner) {
      FlowEvent event = createEventFromResult(r);
      if (event != null) {
        results.add(event);
      }
    }
  } finally {
    try {
      if (scanner != null) {
        scanner.close();
      }
    } finally {
      if (eventTable != null) {
        eventTable.close();
      }
    }
  }
  return results;
}
/**
 * Gets hdfs stats about all dirs on the given cluster.
 * @param cluster the cluster whose directories to look at
 * @param pathPrefix restricts results to paths starting with this prefix
 * @param limit the maximum number of rows to return
 * @param runId the run timestamp used to locate the stats
 * @return list of hdfs stats
 * @throws IOException
 */
public List<HdfsStats> getAllDirs(String cluster, String pathPrefix,
    int limit, long runId) throws IOException {
  long encodedRunId = getEncodedRunId(runId);
  String rowPrefixStr =
      Long.toString(encodedRunId) + HdfsConstants.SEP + cluster;
  if (StringUtils.isNotEmpty(pathPrefix)) {
    // path expected to be cleansed at collection/storage time as well
    rowPrefixStr += HdfsConstants.SEP + StringUtil.cleanseToken(pathPrefix);
  }
  LOG.info(" Getting all dirs for cluster " + cluster + " with pathPrefix: "
      + pathPrefix + " for runId " + runId + " encodedRunId: " + encodedRunId
      + " limit: " + limit + " row prefix : " + rowPrefixStr);
  byte[] rowPrefix = Bytes.toBytes(rowPrefixStr);
  Scan scan = createScanWithAllColumns();
  scan.setStartRow(rowPrefix);
  // require that all rows match the prefix we're looking for
  Filter prefixFilter = new WhileMatchFilter(new PrefixFilter(rowPrefix));
  scan.setFilter(prefixFilter);
  // using a large scanner caching value with a small limit can mean we scan a
  // lot more data than necessary, so lower the caching for low limits
  scan.setCaching(Math.min(limit, defaultScannerCaching));
  // we need only the latest cell version
  scan.setMaxVersions(1);
  return createFromScanResults(cluster, null, scan, limit, Boolean.FALSE, 0L,
      0L);
}
private static Filter buildTableFilter(final TableName tableName) {
  return new PrefixFilter(tableName.toBytes());
}
@Test
public void testScan() throws Exception {
  byte[] startRow = Bytes.toBytes("startRow");
  byte[] stopRow = Bytes.toBytes("stopRow");
  byte[] fam = Bytes.toBytes("fam");
  byte[] qf1 = Bytes.toBytes("qf1");
  long ts = System.currentTimeMillis();
  int maxVersions = 2;
  Scan scan = new Scan().withStartRow(startRow).withStopRow(stopRow);
  scan.addColumn(fam, qf1);
  scan.setTimeRange(ts, ts + 1);
  scan.readVersions(maxVersions);
  ClientProtos.Scan scanProto = ProtobufUtil.toScan(scan);
  Scan desScan = ProtobufUtil.toScan(scanProto);
  assertTrue(Bytes.equals(scan.getStartRow(), desScan.getStartRow()));
  assertTrue(Bytes.equals(scan.getStopRow(), desScan.getStopRow()));
  assertEquals(scan.getCacheBlocks(), desScan.getCacheBlocks());
  Set<byte[]> set = null;
  Set<byte[]> desSet = null;
  for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
    assertTrue(desScan.getFamilyMap().containsKey(entry.getKey()));
    set = entry.getValue();
    desSet = desScan.getFamilyMap().get(entry.getKey());
    for (byte[] column : set) {
      assertTrue(desSet.contains(column));
    }
    // Test filters are serialized properly.
    scan = new Scan().withStartRow(startRow);
    final String name = "testScan";
    byte[] prefix = Bytes.toBytes(name);
    scan.setFilter(new PrefixFilter(prefix));
    scanProto = ProtobufUtil.toScan(scan);
    desScan = ProtobufUtil.toScan(scanProto);
    Filter f = desScan.getFilter();
    assertTrue(f instanceof PrefixFilter);
  }
  assertEquals(scan.getMaxVersions(), desScan.getMaxVersions());
  TimeRange tr = scan.getTimeRange();
  TimeRange desTr = desScan.getTimeRange();
  assertEquals(tr.getMax(), desTr.getMax());
  assertEquals(tr.getMin(), desTr.getMin());
}
/**
* Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
* attempt with invalid values.
*/
@Test
public void testWithFilter() throws Throwable {
  // Create simple table to export
  TableDescriptor desc = TableDescriptorBuilder
      .newBuilder(TableName.valueOf(name.getMethodName()))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
          .setMaxVersions(5)
          .build())
      .build();
  UTIL.getAdmin().createTable(desc);
  Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
  Put p1 = new Put(ROW1);
  p1.addColumn(FAMILYA, QUAL, now, QUAL);
  p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
  p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
  p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
  p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
  // Having another row would actually test the filter.
  Put p2 = new Put(ROW2);
  p2.addColumn(FAMILYA, QUAL, now, QUAL);
  exportTable.put(Arrays.asList(p1, p2));
  // Export the simple table
  String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
  assertTrue(runExport(args));
  // Import to a new table
  final String IMPORT_TABLE = name.getMethodName() + "import";
  desc = TableDescriptorBuilder
      .newBuilder(TableName.valueOf(IMPORT_TABLE))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
          .setMaxVersions(5)
          .build())
      .build();
  UTIL.getAdmin().createTable(desc);
  Table importTable = UTIL.getConnection().getTable(desc.getTableName());
  args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
      FQ_OUTPUT_DIR, "1000" };
  assertTrue(runImport(args));
  // get the count of the source table for that time range
  PrefixFilter filter = new PrefixFilter(ROW1);
  int count = getCount(exportTable, filter);
  Assert.assertEquals("Unexpected row count between export and import tables", count,
      getCount(importTable, null));
  // and then test that a broken command doesn't bork everything - easier here because we don't
  // need to re-run the export job
  args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
      FQ_OUTPUT_DIR, "1000" };
  assertFalse(runImport(args));
  // cleanup
  exportTable.close();
  importTable.close();
}
@Test
public void testExportScan() throws Exception {
  int version = 100;
  long startTime = System.currentTimeMillis();
  long endTime = startTime + 1;
  String prefix = "row";
  String label_0 = "label_0";
  String label_1 = "label_1";
  String[] args = {
      "table",
      "outputDir",
      String.valueOf(version),
      String.valueOf(startTime),
      String.valueOf(endTime),
      prefix
  };
  Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
  assertEquals(version, scan.getMaxVersions());
  assertEquals(startTime, scan.getTimeRange().getMin());
  assertEquals(endTime, scan.getTimeRange().getMax());
  assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
  assertEquals(0, Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
  String[] argsWithLabels = {
      "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1,
      "table",
      "outputDir",
      String.valueOf(version),
      String.valueOf(startTime),
      String.valueOf(endTime),
      prefix
  };
  Configuration conf = new Configuration(UTIL.getConfiguration());
  // parse the "-D" options
  String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
  Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
  assertEquals(version, scanWithLabels.getMaxVersions());
  assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
  assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
  assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
  assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
  assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
  assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
  assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
}
/**
 * Returns the {@link Flow} runs' stats, summed up per flow. If the
 * {@code version} parameter is non-null, the returned results will be
 * restricted to those matching this app version.
 *
 * <p>
 * <strong>Note:</strong> this retrieval method will omit the configuration
 * data from all of the returned jobs.
 * </p>
 *
 * @param cluster the cluster where the jobs were run
 * @param user the user running the jobs
 * @param appId the application identifier for the jobs
 * @param version if non-null, only flows matching this application version
 *          will be returned
 * @param startTime the start time for the flows to be looked at
 * @param endTime the end time for the flows to be looked at
 * @param limit the maximum number of flows to return
 * @param startRow if non-null, start the scan at this row key
 * @return a list of up to {@code limit} matching {@link Flow} instances
 */
public List<Flow> getFlowTimeSeriesStats(String cluster, String user,
    String appId, String version, long startTime, long endTime, int limit,
    byte[] startRow) throws IOException {
  // app portion of row key
  byte[] rowPrefix = Bytes.toBytes((cluster + Constants.SEP + user
      + Constants.SEP + appId + Constants.SEP));
  byte[] scanStartRow;
  if (startRow != null) {
    scanStartRow = startRow;
  } else {
    if (endTime != 0) {
      // use end time in start row, if present
      long endRunId = FlowKey.encodeRunId(endTime);
      scanStartRow =
          Bytes.add(rowPrefix, Bytes.toBytes(endRunId), Constants.SEP_BYTES);
    } else {
      scanStartRow = rowPrefix;
    }
  }
  // TODO: use RunMatchFilter to limit scan on the server side
  Scan scan = new Scan();
  scan.setStartRow(scanStartRow);
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  if (startTime != 0) {
    // if limited by start time, early out as soon as we hit it
    long startRunId = FlowKey.encodeRunId(startTime);
    // zero byte at the end makes the startRunId inclusive
    byte[] scanEndRow = Bytes.add(rowPrefix, Bytes.toBytes(startRunId),
        Constants.ZERO_SINGLE_BYTE);
    scan.setStopRow(scanEndRow);
  } else {
    // require that all rows match the app prefix we're looking for
    filters.addFilter(new WhileMatchFilter(new PrefixFilter(rowPrefix)));
  }
  // if version is passed, restrict the rows returned to that version
  if (version != null && version.length() > 0) {
    filters.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
        Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,
        Bytes.toBytes(version)));
  }
  // filter out all config columns except the queue name
  filters.addFilter(new QualifierFilter(CompareFilter.CompareOp.NOT_EQUAL,
      new RegexStringComparator(
          "^c\\!((?!" + Constants.HRAVEN_QUEUE + ").)*$")));
  scan.setFilter(filters);
  LOG.info("scan : \n " + scan.toJSON() + " \n");
  return createFromResults(scan, false, limit);
}
/**
* Returns the flows currently listed in the given {@link Flow.Status}
* @param cluster The cluster where flows have run
* @param status The flows' status
* @param limit Return up to this many Flow instances
* @param user Filter flows returned to only this user (if present)
* @param startRow Start results at this key. Use this in combination with
* {@code limit} to support pagination through the results.
* @return a list of up to {@code limit} Flows
* @throws IOException in the case of an error retrieving the data
*/
public List<Flow> getFlowsForStatus(String cluster, Flow.Status status,
    int limit, String user, byte[] startRow) throws IOException {
  byte[] rowPrefix = ByteUtil.join(Constants.SEP_BYTES,
      Bytes.toBytes(cluster), status.code(), Constants.EMPTY_BYTES);
  if (startRow == null) {
    startRow = rowPrefix;
  }
  Scan scan = new Scan(startRow);
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  // early out when prefix ends
  filters.addFilter(new WhileMatchFilter(new PrefixFilter(rowPrefix)));
  if (user != null) {
    SingleColumnValueFilter userFilter = new SingleColumnValueFilter(
        Constants.INFO_FAM_BYTES, USER_NAME_COL_BYTES,
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(user));
    userFilter.setFilterIfMissing(true);
    filters.addFilter(userFilter);
  }
  scan.setFilter(filters);
  // TODO: need to constrain this by timerange as well to prevent unlimited
  // scans
  // get back the results in a single response
  scan.setCaching(limit);
  List<Flow> results = new ArrayList<Flow>(limit);
  ResultScanner scanner = null;
  Table flowQueueTable = null;
  try {
    flowQueueTable = hbaseConnection
        .getTable(TableName.valueOf(Constants.FLOW_QUEUE_TABLE));
    scanner = flowQueueTable.getScanner(scan);
    int cnt = 0;
    for (Result r : scanner) {
      Flow flow = createFlowFromResult(r);
      if (flow != null) {
        cnt++;
        results.add(flow);
      }
      if (cnt >= limit) {
        break;
      }
    }
  } finally {
    try {
      if (scanner != null) {
        scanner.close();
      }
    } finally {
      if (flowQueueTable != null) {
        flowQueueTable.close();
      }
    }
  }
  return results;
}
/**
 * Scans the app version table to look for jobs that showed up in the given
 * time range, and creates the flow keys that map to these apps.
 * @param jhs the {@link JobHistoryService} used to fetch the flows for each app
 * @param cluster the cluster to scan
 * @param user if non-blank, restricts the scan to this user's apps
 * @param startTime the start of the time range
 * @param endTime the end of the time range
 * @param limit the maximum number of apps to return
 * @return list of app summaries, one per new app found
 * @throws IOException
 * @throws ProcessingException
 */
public List<AppSummary> getNewApps(JobHistoryService jhs, String cluster,
    String user, long startTime, long endTime, int limit) throws IOException {
  byte[] startRow = null;
  if (StringUtils.isNotBlank(user)) {
    startRow = ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster),
        Bytes.toBytes(user));
  } else {
    startRow = ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster));
  }
  LOG.info(
      "Reading app version rows start at " + Bytes.toStringBinary(startRow));
  Scan scan = new Scan();
  // start scanning app version table at cluster!user!
  scan.setStartRow(startRow);
  // require that all results match this flow prefix
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  filters.addFilter(new WhileMatchFilter(new PrefixFilter(startRow)));
  scan.setFilter(filters);
  List<AppKey> newAppsKeys = new ArrayList<AppKey>();
  try {
    newAppsKeys =
        createNewAppKeysFromResults(scan, startTime, endTime, limit);
  } catch (IOException e) {
    LOG.error(
        "Caught exception while trying to scan, returning empty list of flows: "
            + e.toString());
  }
  List<AppSummary> newApps = new ArrayList<AppSummary>();
  for (AppKey ak : newAppsKeys) {
    AppSummary anApp = new AppSummary(ak);
    List<Flow> flows =
        jhs.getFlowSeries(ak.getCluster(), ak.getUserName(), ak.getAppId(),
            null, Boolean.FALSE, startTime, endTime, Integer.MAX_VALUE);
    for (Flow f : flows) {
      anApp.addFlow(f);
    }
    newApps.add(anApp);
  }
  return newApps;
}