The following examples show how to use the org.apache.hadoop.hbase.filter.WhileMatchFilter API class, or you can click through to GitHub to view the source code.
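Before the project-specific snippets, here is a minimal, self-contained sketch of the pattern that recurs throughout this page: wrapping a PrefixFilter in a WhileMatchFilter so that a scan started at a row prefix ends as soon as rows stop matching the prefix, instead of running on to the end of the table. The table name and row prefix below are placeholders, not taken from any example on this page.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class WhileMatchFilterExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("example_table"))) { // placeholder table
      byte[] prefix = Bytes.toBytes("cluster1!user1!"); // placeholder row prefix
      Scan scan = new Scan().withStartRow(prefix);
      // end the entire scan once a row no longer matches the prefix
      scan.setFilter(new WhileMatchFilter(new PrefixFilter(prefix)));
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          System.out.println(Bytes.toStringBinary(result.getRow()));
        }
      }
    }
  }
}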
/**
 * Returns the {@link Flow} instance matching the application ID and run ID.
 *
 * @param cluster the cluster identifier
 * @param user the user running the jobs
 * @param appId the application description
 * @param runId the specific run ID for the flow
 * @param populateTasks whether or not to populate the task details for each
 *          job
 * @return the matching {@link Flow}, or {@code null} if none is found
 */
public Flow getFlow(String cluster, String user, String appId, long runId,
    boolean populateTasks) throws IOException {
  Flow flow = null;
  byte[] startRow = ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster),
      Bytes.toBytes(user), Bytes.toBytes(appId),
      Bytes.toBytes(FlowKey.encodeRunId(runId)), Constants.EMPTY_BYTES);
  LOG.info(
      "Reading job_history rows start at " + Bytes.toStringBinary(startRow));
  Scan scan = new Scan();
  // start scanning history at cluster!user!app!run!
  scan.setStartRow(startRow);
  // require that all results match this flow prefix
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(startRow)));
  List<Flow> flows = createFromResults(scan, populateTasks, 1);
  if (flows.size() > 0) {
    flow = flows.get(0);
  }
  return flow;
}
/**
 * Returns the {@link Flow} instance containing the given job ID.
 *
 * @param cluster the cluster identifier
 * @param jobId the job identifier
 * @param populateTasks whether or not to populate the task details for each
 *          job
 * @return the {@link Flow} containing the job, or {@code null} if none is
 *         found
 */
public Flow getFlowByJobID(String cluster, String jobId,
    boolean populateTasks) throws IOException {
  Flow flow = null;
  JobKey key = idService.getJobKeyById(new QualifiedJobId(cluster, jobId));
  if (key != null) {
    byte[] startRow =
        ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(key.getCluster()),
            Bytes.toBytes(key.getUserName()), Bytes.toBytes(key.getAppId()),
            Bytes.toBytes(key.getEncodedRunId()), Constants.EMPTY_BYTES);
    LOG.info("Reading job_history rows start at "
        + Bytes.toStringBinary(startRow));
    Scan scan = new Scan();
    // start scanning history at cluster!user!app!run!
    scan.setStartRow(startRow);
    // require that all results match this flow prefix
    scan.setFilter(new WhileMatchFilter(new PrefixFilter(startRow)));
    List<Flow> flows = createFromResults(scan, populateTasks, 1);
    if (flows.size() > 0) {
      flow = flows.get(0);
    }
  }
  return flow;
}
/**
 * Creates a scan for flow data.
 * @param rowPrefix - start row prefix
 * @param limit - limit on scanned results
 * @param version - version to match
 * @return Scan
 */
private Scan createFlowScan(byte[] rowPrefix, int limit, String version) {
  Scan scan = new Scan();
  scan.setStartRow(rowPrefix);
  // using a large scanner caching value with a small limit can mean we scan a
  // lot more data than necessary, so lower the caching for low limits
  scan.setCaching(Math.min(limit, defaultScannerCaching));
  // require that all rows match the prefix we're looking for
  Filter prefixFilter = new WhileMatchFilter(new PrefixFilter(rowPrefix));
  // if version is passed, restrict the rows returned to that version
  if (version != null && version.length() > 0) {
    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    filters.addFilter(prefixFilter);
    filters.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
        Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,
        Bytes.toBytes(version)));
    scan.setFilter(filters);
  } else {
    scan.setFilter(prefixFilter);
  }
  return scan;
}
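One behavior worth flagging in the version check above: by default, a SingleColumnValueFilter lets through any row that lacks the tested column entirely, because HBase skips the comparison when the column is missing. If rows without a version column should be excluded, the filter needs setFilterIfMissing(true), as the getFlowsForStatus example further down does for its user filter. A sketch of the stricter variant, reusing the names from createFlowScan:

SingleColumnValueFilter versionFilter = new SingleColumnValueFilter(
    Constants.INFO_FAM_BYTES, Constants.VERSION_COLUMN_BYTES,
    CompareFilter.CompareOp.EQUAL, Bytes.toBytes(version));
// without this call, rows missing the version column would also be returned
versionFilter.setFilterIfMissing(true);
filters.addFilter(versionFilter);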
@Override
public int scannerOpenWithPrefix(ByteBuffer tableName,
    ByteBuffer startAndPrefix,
    List<ByteBuffer> columns,
    Map<ByteBuffer, ByteBuffer> attributes)
    throws IOError, TException {
  Table table = null;
  try {
    table = getTable(tableName);
    Scan scan = new Scan().withStartRow(getBytes(startAndPrefix));
    addAttributes(scan, attributes);
    Filter f = new WhileMatchFilter(
        new PrefixFilter(getBytes(startAndPrefix)));
    scan.setFilter(f);
    if (columns != null && !columns.isEmpty()) {
      for (ByteBuffer column : columns) {
        byte[][] famQf = CellUtil.parseColumn(getBytes(column));
        if (famQf.length == 1) {
          scan.addFamily(famQf[0]);
        } else {
          scan.addColumn(famQf[0], famQf[1]);
        }
      }
    }
    return addScanner(table.getScanner(scan), false);
  } catch (IOException e) {
    LOG.warn(e.getMessage(), e);
    throw getIOError(e);
  } finally {
    closeTable(table);
  }
}
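For completeness, here is a rough sketch of how a Java client might call this Thrift endpoint through the generated Hbase.Client API (imports from org.apache.thrift.transport, org.apache.thrift.protocol, and org.apache.hadoop.hbase.thrift.generated assumed); the host, port, table name, and prefix are placeholders, and exception handling is reduced to a throws clause:

public void scanWithPrefix() throws Exception {
  TTransport transport = new TSocket("localhost", 9090); // placeholder host/port
  transport.open();
  Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));
  int scannerId = client.scannerOpenWithPrefix(
      ByteBuffer.wrap(Bytes.toBytes("example_table")), // placeholder table
      ByteBuffer.wrap(Bytes.toBytes("row-prefix")),    // placeholder prefix
      null,                                            // null means all columns
      Collections.<ByteBuffer, ByteBuffer>emptyMap());
  try {
    List<TRowResult> rows;
    // fetch in batches of 100 until the prefix-bounded scanner is exhausted
    while (!(rows = client.scannerGetList(scannerId, 100)).isEmpty()) {
      for (TRowResult row : rows) {
        System.out.println(Bytes.toStringBinary(row.getRow()));
      }
    }
  } finally {
    client.scannerClose(scannerId);
    transport.close();
  }
}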
@Override
boolean testRow(final int i) throws IOException {
  Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
      .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
      .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
      .setScanMetricsEnabled(true);
  FilterList list = new FilterList();
  for (int family = 0; family < opts.families; family++) {
    byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
    if (opts.addColumns) {
      for (int column = 0; column < opts.columns; column++) {
        byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
        scan.addColumn(familyName, qualifier);
      }
    } else {
      scan.addFamily(familyName);
    }
  }
  if (opts.filterAll) {
    list.addFilter(new FilterAllFilter());
  }
  list.addFilter(new WhileMatchFilter(new PageFilter(120)));
  scan.setFilter(list);
  ResultScanner s = this.table.getScanner(scan);
  try {
    for (Result rr; (rr = s.next()) != null;) {
      updateValueSize(rr);
    }
  } finally {
    updateScanMetrics(s.getScanMetrics());
    s.close();
  }
  return true;
}
@Override
void testRow(final int i) throws IOException {
  Scan scan = new Scan().withStartRow(getRandomRow(this.rand, this.totalRows));
  scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
  scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
  ResultScanner s = this.table.getScanner(scan);
  s.close();
}
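A note on the WhileMatchFilter(new PageFilter(120)) combination used in both benchmark methods above: WhileMatchFilter ends a scan as soon as its wrapped filter rejects a row or signals that no further rows should be scanned, so wrapping PageFilter(120) caps each of these random-start benchmark scans at roughly 120 rows instead of letting them run to the end of the table. Because filters are instantiated separately on each region server, neither filter can guarantee an exact global count across region boundaries (the HBase PageFilter javadoc calls this out), but for a benchmark the approximation is fine. The same idea in isolation, with an arbitrary limit:

Scan scan = new Scan();
// end the scan once the wrapped PageFilter stops matching; the 25-row cap
// is approximate across region boundaries
scan.setFilter(new WhileMatchFilter(new PageFilter(25)));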
/**
 * Returns a Scan instance to retrieve all the task rows for a given job from
 * the job_history_task table.
 * @param jobKey the job key to match for all task rows
 * @return a {@code Scan} instance for the job_history_task table
 */
private Scan getTaskScan(JobKey jobKey) {
  byte[] startKey =
      Bytes.add(jobKeyConv.toBytes(jobKey), Constants.SEP_BYTES);
  Scan scan = new Scan();
  scan.setStartRow(startKey);
  // only return tasks for this job
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(startKey)));
  // expect a lot of tasks on average
  scan.setCaching(500);
  return scan;
}
/**
 * Retrieves all the event rows matching a single
 * {@link com.twitter.hraven.Flow}.
 * @param flowKey the key of the flow whose events should be returned
 * @return the list of matching {@link FlowEvent} instances
 */
public List<FlowEvent> getFlowEvents(FlowKey flowKey) throws IOException {
  byte[] startKey =
      Bytes.add(flowKeyConverter.toBytes(flowKey), Constants.SEP_BYTES);
  Scan scan = new Scan(startKey);
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(startKey)));
  List<FlowEvent> results = new ArrayList<FlowEvent>();
  ResultScanner scanner = null;
  Table eventTable = null;
  try {
    eventTable = hbaseConnection
        .getTable(TableName.valueOf(Constants.FLOW_EVENT_TABLE));
    scanner = eventTable.getScanner(scan);
    for (Result r : scanner) {
      FlowEvent event = createEventFromResult(r);
      if (event != null) {
        results.add(event);
      }
    }
  } finally {
    try {
      if (scanner != null) {
        scanner.close();
      }
    } finally {
      if (eventTable != null) {
        eventTable.close();
      }
    }
  }
  return results;
}
/**
 * Retrieves all events added after the given event key (with sequence numbers
 * greater than the given key). If no new events are found returns an empty
 * list.
 * @param lastSeen the last event key previously processed
 * @return the list of {@link FlowEvent} instances added since {@code lastSeen}
 */
public List<FlowEvent> getFlowEventsSince(FlowEventKey lastSeen)
    throws IOException {
  // rows must match the FlowKey portion + SEP
  byte[] keyPrefix =
      Bytes.add(flowKeyConverter.toBytes(lastSeen), Constants.SEP_BYTES);
  // start at the next following sequence number
  FlowEventKey nextEvent = new FlowEventKey(lastSeen.getCluster(),
      lastSeen.getUserName(), lastSeen.getAppId(), lastSeen.getRunId(),
      lastSeen.getSequence() + 1);
  byte[] startKey = keyConverter.toBytes(nextEvent);
  Scan scan = new Scan(startKey);
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(keyPrefix)));
  List<FlowEvent> results = new ArrayList<FlowEvent>();
  ResultScanner scanner = null;
  Table eventTable = null;
  try {
    eventTable = hbaseConnection
        .getTable(TableName.valueOf(Constants.FLOW_EVENT_TABLE));
    scanner = eventTable.getScanner(scan);
    for (Result r : scanner) {
      FlowEvent event = createEventFromResult(r);
      if (event != null) {
        results.add(event);
      }
    }
  } finally {
    try {
      if (scanner != null) {
        scanner.close();
      }
    } finally {
      if (eventTable != null) {
        eventTable.close();
      }
    }
  }
  return results;
}
/**
 * Gets hdfs stats about all dirs on the given cluster.
 * @param cluster the cluster identifier
 * @param pathPrefix optional path prefix to restrict the directories returned
 * @param limit the maximum number of rows to return
 * @param runId the run timestamp whose stats should be read
 * @return list of hdfs stats
 * @throws IOException
 */
public List<HdfsStats> getAllDirs(String cluster, String pathPrefix,
    int limit, long runId) throws IOException {
  long encodedRunId = getEncodedRunId(runId);
  String rowPrefixStr =
      Long.toString(encodedRunId) + HdfsConstants.SEP + cluster;
  if (StringUtils.isNotEmpty(pathPrefix)) {
    // path expected to be cleansed at collection/storage time as well
    rowPrefixStr += HdfsConstants.SEP + StringUtil.cleanseToken(pathPrefix);
  }
  LOG.info(" Getting all dirs for cluster " + cluster + " with pathPrefix: "
      + pathPrefix + " for runId " + runId + " encodedRunId: " + encodedRunId
      + " limit: " + limit + " row prefix : " + rowPrefixStr);
  byte[] rowPrefix = Bytes.toBytes(rowPrefixStr);
  Scan scan = createScanWithAllColumns();
  scan.setStartRow(rowPrefix);
  // require that all rows match the prefix we're looking for
  Filter prefixFilter = new WhileMatchFilter(new PrefixFilter(rowPrefix));
  scan.setFilter(prefixFilter);
  // using a large scanner caching value with a small limit can mean we scan a
  // lot more data than necessary, so lower the caching for low limits
  scan.setCaching(Math.min(limit, defaultScannerCaching));
  // we need only the latest cell version
  scan.setMaxVersions(1);
  return createFromScanResults(cluster, null, scan, limit, Boolean.FALSE, 0L,
      0L);
}
@Test(timeOut = 10_000)
public void testGet(ITestContext context) throws Exception {
  try {
    TransactionManager tm = newTransactionManager(context);
    TTable table = new TTable(connection, TEST_TABLE);
    Transaction t = tm.begin();
    int[] lInts = new int[] { 100, 243, 2342, 22, 1, 5, 43, 56 };
    for (int i = 0; i < lInts.length; i++) {
      byte[] data = Bytes.toBytes(lInts[i]);
      Put put = new Put(data);
      put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
      table.put(t, put);
    }
    int startKeyValue = lInts[3];
    int stopKeyValue = lInts[3];
    byte[] startKey = Bytes.toBytes(startKeyValue);
    byte[] stopKey = Bytes.toBytes(stopKeyValue);
    Get g = new Get(startKey);
    Result r = table.get(t, g);
    if (!r.isEmpty()) {
      int tmp = Bytes.toInt(r.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
      LOG.info("Result:" + tmp);
      assertTrue(tmp == startKeyValue, "Bad value, should be " + startKeyValue + " but is " + tmp);
    } else {
      Assert.fail("Bad result");
    }
    tm.commit(t);
    Scan s = new Scan(startKey);
    CompareFilter.CompareOp op = CompareFilter.CompareOp.LESS_OR_EQUAL;
    RowFilter toFilter = new RowFilter(op, new BinaryPrefixComparator(stopKey));
    boolean startInclusive = true;
    if (!startInclusive) {
      FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
      filters.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryPrefixComparator(startKey)));
      filters.addFilter(new WhileMatchFilter(toFilter));
      s.setFilter(filters);
    } else {
      s.setFilter(new WhileMatchFilter(toFilter));
    }
    t = tm.begin();
    ResultScanner res = table.getScanner(t, s);
    Result rr;
    int count = 0;
    while ((rr = res.next()) != null) {
      int iTmp = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
      LOG.info("Result: " + iTmp);
      count++;
    }
    assertEquals(count, 1, "Count is wrong");
    LOG.info("Rows found " + count);
    tm.commit(t);
    table.close();
  } catch (Exception e) {
    LOG.error("Exception in test", e);
  }
}
private void initScan() throws IOException {
  scan = new Scan();
  scan.setCacheBlocks(cacheBlocks_);
  scan.setCaching(caching_);
  // Set filters, if any.
  if (configuredOptions_.hasOption("gt")) {
    gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
    addRowFilter(CompareOp.GREATER, gt_);
    scan.setStartRow(gt_);
  }
  if (configuredOptions_.hasOption("lt")) {
    lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
    addRowFilter(CompareOp.LESS, lt_);
    scan.setStopRow(lt_);
  }
  if (configuredOptions_.hasOption("gte")) {
    gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
    scan.setStartRow(gte_);
  }
  if (configuredOptions_.hasOption("lte")) {
    lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
    byte[] lt = increment(lte_);
    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("Incrementing lte value of %s from bytes %s to %s to set stop row",
          Bytes.toString(lte_), toString(lte_), toString(lt)));
    }
    if (lt != null) {
      scan.setStopRow(increment(lte_));
    }
    // The WhileMatchFilter will short-circuit the scan after we no longer match. The
    // setStopRow call will limit the number of regions we need to scan
    addFilter(new WhileMatchFilter(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(lte_))));
  }
  if (configuredOptions_.hasOption("regex")) {
    regex_ = Utils.slashisize(configuredOptions_.getOptionValue("regex"));
    addFilter(new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regex_)));
  }
  if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")) {
    scan.setTimeRange(minTimestamp_, maxTimestamp_);
  }
  if (configuredOptions_.hasOption("timestamp")) {
    scan.setTimeStamp(timestamp_);
  }
  // if the group of columnInfos for this family doesn't contain a prefix, we don't need
  // to set any filters, we can just call addColumn or addFamily. See javadocs below.
  boolean columnPrefixExists = false;
  for (ColumnInfo columnInfo : columnInfo_) {
    if (columnInfo.getColumnPrefix() != null) {
      columnPrefixExists = true;
      break;
    }
  }
  if (!columnPrefixExists) {
    addFiltersWithoutColumnPrefix(columnInfo_);
  } else {
    addFiltersWithColumnPrefix(columnInfo_);
  }
}
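A note on the lte branch above: increment is a helper defined elsewhere in this class that computes the next possible row key, so an inclusive upper bound can be expressed through HBase's exclusive setStopRow. It returns null when no such key exists (the key is all 0xFF bytes), which is why the WhileMatchFilter is still added unconditionally as the actual correctness guard, while the stop row merely limits how many regions the scan has to touch. What such a helper typically looks like, as a hypothetical sketch rather than this class's real implementation:

// hypothetical sketch of an increment()-style helper: treat the key as an
// unsigned big-endian number, add one, and signal overflow with null
private static byte[] incrementBytes(byte[] key) {
  byte[] next = java.util.Arrays.copyOf(key, key.length);
  for (int i = next.length - 1; i >= 0; i--) {
    if (++next[i] != 0) { // stop once a byte doesn't wrap around
      return next;
    }
  }
  return null; // every byte was 0xFF; no next key of the same length exists
}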
/**
 * Returns the {@link Flow} runs' stats, summed up per flow. If the
 * {@code version} parameter is non-null, the returned results will be
 * restricted to those matching this app version.
 *
 * <p>
 * <strong>Note:</strong> this retrieval method will omit the configuration
 * data from all of the returned jobs.
 * </p>
 *
 * @param cluster the cluster where the jobs were run
 * @param user the user running the jobs
 * @param appId the application identifier for the jobs
 * @param version if non-null, only flows matching this application version
 *          will be returned
 * @param startTime the start time for the flows to be looked at
 * @param endTime the end time for the flows to be looked at
 * @param limit the maximum number of flows to return
 * @return a list of up to {@code limit} matching {@link Flow} instances
 */
public List<Flow> getFlowTimeSeriesStats(String cluster, String user,
    String appId, String version, long startTime, long endTime, int limit,
    byte[] startRow) throws IOException {
  // app portion of row key
  byte[] rowPrefix = Bytes.toBytes((cluster + Constants.SEP + user
      + Constants.SEP + appId + Constants.SEP));
  byte[] scanStartRow;
  if (startRow != null) {
    scanStartRow = startRow;
  } else {
    if (endTime != 0) {
      // use end time in start row, if present
      long endRunId = FlowKey.encodeRunId(endTime);
      scanStartRow =
          Bytes.add(rowPrefix, Bytes.toBytes(endRunId), Constants.SEP_BYTES);
    } else {
      scanStartRow = rowPrefix;
    }
  }
  // TODO: use RunMatchFilter to limit scan on the server side
  Scan scan = new Scan();
  scan.setStartRow(scanStartRow);
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  if (startTime != 0) {
    // if limited by start time, early out as soon as we hit it
    long startRunId = FlowKey.encodeRunId(startTime);
    // zero byte at the end makes the startRunId inclusive
    byte[] scanEndRow = Bytes.add(rowPrefix, Bytes.toBytes(startRunId),
        Constants.ZERO_SINGLE_BYTE);
    scan.setStopRow(scanEndRow);
  } else {
    // require that all rows match the app prefix we're looking for
    filters.addFilter(new WhileMatchFilter(new PrefixFilter(rowPrefix)));
  }
  // if version is passed, restrict the rows returned to that version
  if (version != null && version.length() > 0) {
    filters.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
        Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,
        Bytes.toBytes(version)));
  }
  // filter out all config columns except the queue name
  filters.addFilter(new QualifierFilter(CompareFilter.CompareOp.NOT_EQUAL,
      new RegexStringComparator(
          "^c\\!((?!" + Constants.HRAVEN_QUEUE + ").)*$")));
  scan.setFilter(filters);
  LOG.info("scan : \n " + scan.toJSON() + " \n");
  return createFromResults(scan, false, limit);
}
/**
 * Returns the flows currently listed in the given {@link Flow.Status}.
 * @param cluster The cluster where flows have run
 * @param status The flows' status
 * @param limit Return up to this many Flow instances
 * @param user Filter flows returned to only this user (if present)
 * @param startRow Start results at this key. Use this in combination with
 *          {@code limit} to support pagination through the results.
 * @return a list of up to {@code limit} Flows
 * @throws IOException in the case of an error retrieving the data
 */
public List<Flow> getFlowsForStatus(String cluster, Flow.Status status,
    int limit, String user, byte[] startRow) throws IOException {
  byte[] rowPrefix = ByteUtil.join(Constants.SEP_BYTES,
      Bytes.toBytes(cluster), status.code(), Constants.EMPTY_BYTES);
  if (startRow == null) {
    startRow = rowPrefix;
  }
  Scan scan = new Scan(startRow);
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  // early out when prefix ends
  filters.addFilter(new WhileMatchFilter(new PrefixFilter(rowPrefix)));
  if (user != null) {
    SingleColumnValueFilter userFilter = new SingleColumnValueFilter(
        Constants.INFO_FAM_BYTES, USER_NAME_COL_BYTES,
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(user));
    userFilter.setFilterIfMissing(true);
    filters.addFilter(userFilter);
  }
  scan.setFilter(filters);
  // TODO: need to constrain this by timerange as well to prevent unlimited
  // scans
  // get back the results in a single response
  scan.setCaching(limit);
  List<Flow> results = new ArrayList<Flow>(limit);
  ResultScanner scanner = null;
  Table flowQueueTable = null;
  try {
    flowQueueTable = hbaseConnection
        .getTable(TableName.valueOf(Constants.FLOW_QUEUE_TABLE));
    scanner = flowQueueTable.getScanner(scan);
    int cnt = 0;
    for (Result r : scanner) {
      Flow flow = createFlowFromResult(r);
      if (flow != null) {
        cnt++;
        results.add(flow);
      }
      if (cnt >= limit) {
        break;
      }
    }
  } finally {
    try {
      if (scanner != null) {
        scanner.close();
      }
    } finally {
      if (flowQueueTable != null) {
        flowQueueTable.close();
      }
    }
  }
  return results;
}
/**
 * Scans the app version table to look for jobs that showed up in the given
 * time range and creates the flow keys that map to these apps.
 * @param jhs the {@link JobHistoryService} used to look up flows per app
 * @param cluster the cluster identifier
 * @param user the user running the jobs (optional)
 * @param startTime the start of the time range to consider
 * @param endTime the end of the time range to consider
 * @param limit the maximum number of flow keys to return
 * @return list of flow keys
 * @throws IOException
 * @throws ProcessingException
 */
public List<AppSummary> getNewApps(JobHistoryService jhs, String cluster,
    String user, long startTime, long endTime, int limit) throws IOException {
  byte[] startRow = null;
  if (StringUtils.isNotBlank(user)) {
    startRow = ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster),
        Bytes.toBytes(user));
  } else {
    startRow = ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster));
  }
  LOG.info(
      "Reading app version rows start at " + Bytes.toStringBinary(startRow));
  Scan scan = new Scan();
  // start scanning app version table at cluster!user!
  scan.setStartRow(startRow);
  // require that all results match this flow prefix
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  filters.addFilter(new WhileMatchFilter(new PrefixFilter(startRow)));
  scan.setFilter(filters);
  List<AppKey> newAppsKeys = new ArrayList<AppKey>();
  try {
    newAppsKeys =
        createNewAppKeysFromResults(scan, startTime, endTime, limit);
  } catch (IOException e) {
    LOG.error(
        "Caught exception while trying to scan, returning empty list of flows: "
            + e.toString());
  }
  List<AppSummary> newApps = new ArrayList<AppSummary>();
  for (AppKey ak : newAppsKeys) {
    AppSummary anApp = new AppSummary(ak);
    List<Flow> flows =
        jhs.getFlowSeries(ak.getCluster(), ak.getUserName(), ak.getAppId(),
            null, Boolean.FALSE, startTime, endTime, Integer.MAX_VALUE);
    for (Flow f : flows) {
      anApp.addFlow(f);
    }
    newApps.add(anApp);
  }
  return newApps;
}