

源代码1 项目: BigDataArchitect   文件: HBaseDemo.java
 * Query all outgoing (caller-side) calls (type=1) of a given user
 *  a given user
 *  type=1
// Builds a scan together with two filters -- a column-value filter on cf:type == "1"
// and a row-key prefix filter for "15883348450" -- then iterates the scanner results.
public void getType() throws IOException {
    Scan scan = new Scan();
    // MUST_PASS_ALL: a row is kept only if every filter in the list accepts it.
    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    // Matches rows whose cf:type cell equals the string bytes "1".
    SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("cf"),Bytes.toBytes("type"),CompareOperator.EQUAL,Bytes.toBytes("1"));
    // Matches rows whose key starts with this phone number.
    PrefixFilter filter2 = new PrefixFilter(Bytes.toBytes("15883348450"));
    // NOTE(review): no visible line adds filter1/filter2 to `filters` or sets `filters` on
    // `scan`, so as shown the scan is unfiltered. The excerpt looks truncated -- confirm
    // against the original file.
    ResultScanner rss = table.getScanner(scan);
    for (Result rs:rss) {
源代码2 项目: xxhadoop   文件: HBaseTest.java
// Scans table "tbl_girls" between row keys "0001" and "0004" and reads base_info:age
// from each result as an int.
public void testScan() throws IOException {
	Connection connection = admin.getConnection();
	Table table = connection.getTable(TableName.valueOf("tbl_girls"));
	// Two-argument constructor: first argument is the start row, second the stop row.
	Scan scan = new Scan(Bytes.toBytes("0001"), Bytes.toBytes("0004"));
	// RowKeyFilter
	Filter filter = new PrefixFilter(Bytes.toBytes("000"));
	Filter filter2 = new RowFilter(CompareOp.EQUAL, new SubstringComparator("000"));
	//BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes(29));		
	// Compares against the 4-byte int encoding of 29 (Bytes.toBytes(int)).
	Filter filter3 = new SingleColumnValueFilter(Bytes.toBytes("base_info"), Bytes.toBytes("age"), CompareOp.GREATER, Bytes.toBytes(29));
	// NOTE(review): none of filter/filter2/filter3 is attached to `scan` in the visible
	// lines; the excerpt appears to be missing a setFilter call -- confirm.
	ResultScanner resultScanner = table.getScanner(scan);
	for (Result result : resultScanner) {
		// Decodes the stored age as an int; getValue returns null when the cell is absent,
		// which Bytes.toInt would not accept -- presumably every row has base_info:age.
		int value = Bytes.toInt(result.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("age")));
源代码3 项目: hgraphdb   文件: VertexIndexModel.java
// Builds a Scan over vertex-index rows for (label, isUnique, key), optionally starting at
// the serialized `from` value, and prepares a prefix + page filter limited to `limit`.
private Scan getVertexIndexScanWithLimit(String label, boolean isUnique, String key, Object from, int limit, boolean reversed) {
    // Row-key prefix shared by all index entries for this label/key (value passed as null).
    byte[] prefix = serializeForRead(label, isUnique, key, null);
    // Start at `from` when given, otherwise at the beginning of the prefix range.
    byte[] startRow = from != null
            ? serializeForRead(label, isUnique, key, from)
            : prefix;
    byte[] stopRow = HConstants.EMPTY_END_ROW;
    if (graph.configuration().getInstanceType() == HBaseGraphConfiguration.InstanceType.BIGTABLE) {
        if (reversed) {
            throw new UnsupportedOperationException("Reverse scans not supported by Bigtable");
        } else {
            // PrefixFilter in Bigtable does not automatically stop
            // See https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/1087
            stopRow = HBaseGraphUtils.incrementBytes(prefix);
    // For reversed scans the start row is bumped via incrementBytes; presumably so the
    // scan begins just past the requested key -- TODO confirm incrementBytes semantics.
    if (reversed) startRow = HBaseGraphUtils.incrementBytes(startRow);
    Scan scan = new Scan(startRow, stopRow);
    FilterList filterList = new FilterList();
    filterList.addFilter(new PrefixFilter(prefix));
    filterList.addFilter(new PageFilter(limit));
    // NOTE(review): the visible lines never set `filterList` on `scan` nor mark the scan
    // as reversed; the excerpt appears truncated -- confirm against the original file.
    return scan;
源代码4 项目: hbase   文件: TestThriftConnection.java
// Creates the table through thriftAdmin, scans it, sums the number of cells across all
// returned results, and asserts that the total is 2.
private void testScanWithFilters(Connection connection, String tableName) throws IOException {
  createTable(thriftAdmin, tableName);
  // try-with-resources closes the Table when the block exits.
  try (Table table = connection.getTable(TableName.valueOf(tableName))){
    FilterList filterList = new FilterList();
    PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("testrow"));
    ColumnValueFilter columnValueFilter = new ColumnValueFilter(FAMILYA, QUALIFIER_1,
        CompareOperator.EQUAL, VALUE_1);
    // NOTE(review): the visible lines neither add the two filters to `filterList` nor set
    // it on `scan`; the excerpt appears to be missing those lines -- confirm.
    Scan scan = new Scan();
    ResultScanner scanner = table.getScanner(scan);
    Iterator<Result> iterator = scanner.iterator();
    int counter = 0;
    while (iterator.hasNext()) {
      Result result = iterator.next();
      // result.size() is the number of cells in this row's result.
      counter += result.size();
    assertEquals(2, counter);
源代码5 项目: hbase   文件: FromClientSideBase.java
// Opens a scanner on `ht`; the visible code prepares a filter list containing a row-key
// prefix filter for `keyPrefix` and starts building a value filter on trans-tags:qual2.
protected ResultScanner buildScanner(String keyPrefix, String value, Table ht)
  throws IOException {
  // OurFilterList allFilters = new OurFilterList();
  FilterList allFilters = new FilterList(/* FilterList.Operator.MUST_PASS_ALL */);
  allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
  // NOTE(review): this statement is cut off in the excerpt; presumably it compares the
  // cell against Bytes.toBytes(value) -- confirm against the original file.
  SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes
    .toBytes("trans-tags"), Bytes.toBytes("qual2"), CompareOperator.EQUAL, Bytes

  // allFilters.addFilter(new
  // RowExcludingSingleColumnValueFilter(Bytes.toBytes("trans-tags"),
  // Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value)));

  Scan scan = new Scan();

  // NOTE(review): no visible line sets `allFilters` on `scan` -- confirm.
  return ht.getScanner(scan);
源代码6 项目: hraven   文件: JobHistoryService.java
 * Returns the {@link Flow} instance matching the application ID and run ID.
 * @param cluster the cluster identifier
 * @param user the user running the jobs
 * @param appId the application description
 * @param runId the specific run ID for the flow
 * @param populateTasks whether or not to populate the task details for each
 *          job
 * @return the first matching {@link Flow}, or {@code null} if no flow matches
// Looks up a single Flow by cluster/user/appId/runId: builds the row-key prefix, scans
// with a while-match prefix filter, and returns the first flow found or null.
public Flow getFlow(String cluster, String user, String appId, long runId,
    boolean populateTasks) throws IOException {
  Flow flow = null;

  // Row-key prefix: cluster, user, appId and the encoded run ID joined by SEP_BYTES; the
  // trailing EMPTY_BYTES element makes the joined key end with a separator.
  byte[] startRow = ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster),
      Bytes.toBytes(user), Bytes.toBytes(appId),
      Bytes.toBytes(FlowKey.encodeRunId(runId)), Constants.EMPTY_BYTES);

  // NOTE(review): the opening of the call this argument belongs to is missing from the
  // excerpt (presumably a LOG.info call) -- confirm.
      "Reading job_history rows start at " + Bytes.toStringBinary(startRow));
  Scan scan = new Scan();
  // start scanning history at cluster!user!app!run!
  // require that all results match this flow prefix
  // NOTE(review): no visible line sets `startRow` as the scan's start row -- confirm.
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(startRow)));

  // Third argument is 1; presumably the maximum number of flows to build -- TODO confirm.
  List<Flow> flows = createFromResults(scan, populateTasks, 1);
  if (flows.size() > 0) {
    flow = flows.get(0);

  return flow;
源代码7 项目: hraven   文件: JobHistoryService.java
 * Returns the {@link Flow} instance containing the given job ID.
 * @param cluster the cluster identifier
 * @param jobId the job identifier
 * @return the first matching {@link Flow}, or {@code null} if the job ID is unknown or no flow matches
// Resolves the JobKey for (cluster, jobId) and, when found, scans for the flow whose
// row-key prefix is built from that key; returns the first flow or null.
public Flow getFlowByJobID(String cluster, String jobId,
    boolean populateTasks) throws IOException {
  Flow flow = null;
  JobKey key = idService.getJobKeyById(new QualifiedJobId(cluster, jobId));
  // A null key leaves `flow` as null.
  if (key != null) {
    // Row-key prefix: cluster, user, appId and encoded run ID joined by SEP_BYTES; the
    // trailing EMPTY_BYTES element makes the joined key end with a separator.
    byte[] startRow =
        ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(key.getCluster()),
            Bytes.toBytes(key.getUserName()), Bytes.toBytes(key.getAppId()),
            Bytes.toBytes(key.getEncodedRunId()), Constants.EMPTY_BYTES);

    LOG.info("Reading job_history rows start at "
        + Bytes.toStringBinary(startRow));
    Scan scan = new Scan();
    // start scanning history at cluster!user!app!run!
    // require that all results match this flow prefix
    // NOTE(review): no visible line sets `startRow` as the scan's start row -- confirm.
    scan.setFilter(new WhileMatchFilter(new PrefixFilter(startRow)));

    // Third argument is 1; presumably the maximum number of flows to build -- TODO confirm.
    List<Flow> flows = createFromResults(scan, populateTasks, 1);
    if (flows.size() > 0) {
      flow = flows.get(0);
  return flow;
源代码8 项目: hraven   文件: JobHistoryService.java
 * creates a scan for flow data
 * @param rowPrefix - start row prefix
 * @param limit - limit on scanned results
 * @param version - version to match
 * @return Scan
// Creates a Scan for flow rows: caps scanner caching at `limit`, builds a while-match
// prefix filter for `rowPrefix`, and (when `version` is non-empty) a version filter list.
private Scan createFlowScan(byte[] rowPrefix, int limit, String version) {
  Scan scan = new Scan();

  // using a large scanner caching value with a small limit can mean we scan a
  // lot more data than necessary, so lower the caching for low limits
  scan.setCaching(Math.min(limit, defaultScannerCaching));
  // require that all rows match the prefix we're looking for
  Filter prefixFilter = new WhileMatchFilter(new PrefixFilter(rowPrefix));
  // if version is passed, restrict the rows returned to that version
  if (version != null && version.length() > 0) {
    FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    // NOTE(review): this statement is cut off in the excerpt; presumably it compares the
    // version column against Bytes.toBytes(version) -- confirm.
    filters.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
        Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,
  } else {
  // NOTE(review): neither branch visibly sets a filter on `scan` and the start row is
  // never set from `rowPrefix`; the excerpt appears truncated -- confirm.
  return scan;
源代码9 项目: zxl   文件: BaseDao.java
// Scans the table for rows whose key starts with `prefix` and returns the parsed results.
// Any exception is rethrown wrapped in a RuntimeException with the cause preserved.
public final List<E> scanByRowPrefix(String prefix) {
	HTableInterface hTableInterface = getHTableInterface();
	try {
		Scan scan = new Scan();
		// No start row is set here, so the scan relies on the PrefixFilter alone.
		scan.setFilter(new PrefixFilter(Bytes.toBytes(prefix)));
		ResultScanner resultScanner = hTableInterface.getScanner(scan);
		// NOTE(review): the scanner is not visibly closed here; whether parse() or the
		// truncated finally block closes it cannot be seen -- confirm.
		return parse(resultScanner);
	} catch (Exception cause) {
		throw new RuntimeException(cause);
	} finally {
源代码10 项目: hbase   文件: ThriftHBaseServiceHandler.java
// Thrift handler: opens a scanner on `tableName` that starts at `startAndPrefix`,
// restricted to the requested columns, and returns the id produced by addScanner.
// IOExceptions are logged and converted via getIOError.
public int scannerOpenWithPrefix(ByteBuffer tableName,
    ByteBuffer startAndPrefix,
    List<ByteBuffer> columns,
    Map<ByteBuffer, ByteBuffer> attributes)
    throws IOError, TException {

  Table table = null;
  try {
    table = getTable(tableName);
    // The same bytes serve as both the start row and the row-key prefix.
    Scan scan = new Scan().withStartRow(getBytes(startAndPrefix));
    addAttributes(scan, attributes);
    // WhileMatchFilter wraps the prefix filter so the scan can stop once a row no longer
    // matches the prefix.
    Filter f = new WhileMatchFilter(
        new PrefixFilter(getBytes(startAndPrefix)));
    // NOTE(review): `f` is not visibly set on `scan`; the excerpt appears to be missing a
    // setFilter call -- confirm.
    if (columns != null && !columns.isEmpty()) {
      for(ByteBuffer column : columns) {
        // Splits "family:qualifier" into its parts; a single element means family only.
        byte [][] famQf = CellUtil.parseColumn(getBytes(column));
        if(famQf.length == 1) {
        // NOTE(review): the family-only branch body is missing from the excerpt -- confirm.
        } else {
          scan.addColumn(famQf[0], famQf[1]);
    return addScanner(table.getScanner(scan), false);
  } catch (IOException e) {
    LOG.warn(e.getMessage(), e);
    throw getIOError(e);
  } finally{
源代码11 项目: hbase   文件: VerifyReplication.java
// Splits the comma-separated `rowPrefixes`, builds one PrefixFilter per prefix for a
// MUST_PASS_ONE filter list, and sets the scan's start/stop rows from the first and last
// prefix. Does nothing when `rowPrefixes` is null or empty.
private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
  if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
    String[] rowPrefixArray = rowPrefixes.split(",");
    // MUST_PASS_ONE: a row is kept if any one of the prefix filters accepts it.
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    for (String prefix : rowPrefixArray) {
      Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
      // NOTE(review): adding `filter` to `filterList` and setting the list on `scan` are
      // not visible in the excerpt -- confirm.
    // Uses the first and last array elements as range bounds; this presumably assumes the
    // prefixes are supplied in sorted order -- TODO confirm.
    byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
    byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]);
    setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
源代码12 项目: hbase   文件: TestScannersWithFilters.java
// Verifies that a PrefixFilter on "testRowOne" returns numRows / 2 rows with colsPerRow
// keys each.
public void testPrefixFilter() throws Exception {
  // Grab rows from group one (half of total)
  long expectedRows = numRows / 2;
  long expectedKeys = colsPerRow;
  Scan s = new Scan();
  s.setFilter(new PrefixFilter(Bytes.toBytes("testRowOne")));
  // verifyScan is defined elsewhere; presumably it checks row and per-row key counts.
  verifyScan(s, expectedRows, expectedKeys);
源代码13 项目: searchanalytics-bigdata   文件: HbaseServiceImpl.java
// Scans table "searchclicks" over the last 30 days, maps each row to a query string
// (or null), and returns the resulting list.
public List<String> getAllSearchQueryStringsByCustomerInLastOneMonth(final Long customerId) {
	LOG.debug("Calling getAllSearchQueryStringsByCustomerInLastOneMonth for customerid: {}", customerId);
	Scan scan = new Scan();
	// Row keys are matched on the prefix "<customerId>-".
	Filter filter = new PrefixFilter(Bytes.toBytes(customerId + "-"));
	// NOTE(review): `filter` is not visibly set on `scan` -- confirm against the original.
	DateTime dateTime = new DateTime();
	try {
		// Restricts cells to timestamps in [now - 30 days, now).
		scan.setTimeRange(dateTime.minusDays(30).getMillis(), dateTime.getMillis());
	} catch (IOException e) {
		throw new RuntimeException(e);
	List<String> rows = hbaseTemplate.find("searchclicks", scan, new RowMapper<String>() {
		public String mapRow(Result result, int rowNum) throws Exception {
			// new String(byte[]) without a charset uses the platform default encoding.
			LOG.debug("Row is: {}", new String(result.getRow()));
			// NOTE(review): the getValue arguments (family/qualifier) are cut off here.
			byte[] value = result.getValue(
			String queryString = null;
			if (value != null) {
				queryString = new String(value);
				LOG.debug("Query String: {}",
						new Object[] { queryString });
			// Returns null for rows that lack the column.
			return queryString;
	List<String> list = new ArrayList<>();
	for (String string : rows) {
		// NOTE(review): the statement guarded by this null check is missing from the
		// excerpt; presumably it adds the string to `list` -- confirm.
		if(string !=null )
	LOG.debug("Checking getAllSearchQueryStringsByCustomerInLastOneMonth done with list:{}", list);
	return list;
源代码14 项目: hraven   文件: JobHistoryService.java
 * Returns a Scan instance to retrieve all the task rows for a given job from
 * the job_history_task table.
 * @param jobKey the job key to match for all task rows
 * @return a {@code Scan} instance for the job_history_task table
// Builds a Scan whose while-match prefix filter is the serialized job key followed by
// the key separator.
private Scan getTaskScan(JobKey jobKey) {
  // Appending SEP_BYTES keeps the prefix from matching a longer key that merely starts
  // with this job key's bytes.
  byte[] startKey =
      Bytes.add(jobKeyConv.toBytes(jobKey), Constants.SEP_BYTES);
  Scan scan = new Scan();
  // only return tasks for this job
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(startKey)));
  // expect a lot of tasks on average
  // NOTE(review): the statement this comment refers to (presumably a caching setting) and
  // any start-row assignment are not visible in the excerpt -- confirm.
  return scan;
源代码15 项目: hraven   文件: FlowEventService.java
 * Retrieves all the event rows matching a single
 * {@link com.twitter.hraven.Flow}.
 * @param flowKey
 * @return
// Scans the event table for all rows whose key starts with the serialized flow key plus
// separator, converts each result to a FlowEvent, and returns the collected list.
public List<FlowEvent> getFlowEvents(FlowKey flowKey) throws IOException {
  byte[] startKey =
      Bytes.add(flowKeyConverter.toBytes(flowKey), Constants.SEP_BYTES);
  // The same bytes are used as the scan start row and as the required key prefix.
  Scan scan = new Scan(startKey);
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(startKey)));

  List<FlowEvent> results = new ArrayList<FlowEvent>();
  ResultScanner scanner = null;
  Table eventTable = null;
  try {
    // NOTE(review): the rest of this assignment (the getTable call) is cut off.
    eventTable = hbaseConnection
    scanner = eventTable.getScanner(scan);
    for (Result r : scanner) {
      FlowEvent event = createEventFromResult(r);
      // NOTE(review): the body of this null check is missing; presumably it adds the
      // event to `results` -- confirm.
      if (event != null) {
  } finally {
    // Nested try/finally; the bodies of the null checks (presumably close() calls) are
    // missing from the excerpt.
    try {
      if (scanner != null) {
    } finally {
      if (eventTable != null) {
  return results;
源代码16 项目: hraven   文件: FlowEventService.java
 * Retrieves all events added after the given event key (with sequence numbers
 * greater than the given key). If no new events are found returns an empty
 * list.
 * @param lastSeen
 * @return
// Scans the event table starting at the key with sequence lastSeen.getSequence() + 1,
// requiring every row to share the flow-key prefix of `lastSeen`, and returns the
// converted events.
public List<FlowEvent> getFlowEventsSince(FlowEventKey lastSeen)
    throws IOException {
  // rows must match the FlowKey portion + SEP
  byte[] keyPrefix =
      Bytes.add(flowKeyConverter.toBytes(lastSeen), Constants.SEP_BYTES);
  // start at the next following sequence number
  FlowEventKey nextEvent = new FlowEventKey(lastSeen.getCluster(),
      lastSeen.getUserName(), lastSeen.getAppId(), lastSeen.getRunId(),
      lastSeen.getSequence() + 1);
  byte[] startKey = keyConverter.toBytes(nextEvent);
  // Start row is the next event key; the filter prefix is the shared flow key.
  Scan scan = new Scan(startKey);
  scan.setFilter(new WhileMatchFilter(new PrefixFilter(keyPrefix)));

  List<FlowEvent> results = new ArrayList<FlowEvent>();
  ResultScanner scanner = null;
  Table eventTable = null;
  try {
    // NOTE(review): the rest of this assignment (the getTable call) is cut off.
    eventTable = hbaseConnection
    scanner = eventTable.getScanner(scan);
    for (Result r : scanner) {
      FlowEvent event = createEventFromResult(r);
      // NOTE(review): the body of this null check is missing; presumably it adds the
      // event to `results` -- confirm.
      if (event != null) {
  } finally {
    // Nested try/finally; the bodies of the null checks (presumably close() calls) are
    // missing from the excerpt.
    try {
      if (scanner != null) {
    } finally {
      if (eventTable != null) {
  return results;
源代码17 项目: hraven   文件: HdfsStatsService.java
 * Gets hdfs stats about all dirs on the given cluster
 * @param cluster
 * @param pathPrefix
 * @param limit
 * @param runId
 * @return list of hdfs stats
 * @throws IOException
// Builds the row prefix "<encodedRunId><SEP><cluster>[<SEP><cleansed pathPrefix>]",
// prepares a scan with caching capped at `limit`, and delegates to
// createFromScanResults.
public List<HdfsStats> getAllDirs(String cluster, String pathPrefix,
    int limit, long runId) throws IOException {
  long encodedRunId = getEncodedRunId(runId);
  String rowPrefixStr =
      Long.toString(encodedRunId) + HdfsConstants.SEP + cluster;
  // The path component is appended only when pathPrefix is non-empty.
  if (StringUtils.isNotEmpty(pathPrefix)) {
    // path expected to be cleansed at collection/storage time as well
    rowPrefixStr += HdfsConstants.SEP + StringUtil.cleanseToken(pathPrefix);
  LOG.info(" Getting all dirs for cluster " + cluster + " with pathPrefix: "
      + pathPrefix + " for runId " + runId + " encodedRunId: " + encodedRunId
      + " limit: " + limit + " row prefix : " + rowPrefixStr);
  byte[] rowPrefix = Bytes.toBytes(rowPrefixStr);
  Scan scan = createScanWithAllColumns();

  // require that all rows match the prefix we're looking for
  Filter prefixFilter = new WhileMatchFilter(new PrefixFilter(rowPrefix));
  // NOTE(review): `prefixFilter` is not visibly set on `scan`, and no start row is set
  // from `rowPrefix` in the visible lines -- confirm against the original file.
  // using a large scanner caching value with a small limit can mean we scan a
  // lot more data than
  // necessary, so lower the caching for low limits
  scan.setCaching(Math.min(limit, defaultScannerCaching));
  // we need only the latest cell version
  // NOTE(review): the statement for the comment above is missing from the excerpt.

  // NOTE(review): the remaining arguments of this call are cut off in the excerpt.
  return createFromScanResults(cluster, null, scan, limit, Boolean.FALSE, 0l,

源代码18 项目: hbase   文件: MetaBrowser.java
// Returns a PrefixFilter that matches row keys beginning with the bytes of `tableName`.
private static Filter buildTableFilter(final TableName tableName) {
  return new PrefixFilter(tableName.toBytes());
源代码19 项目: hbase   文件: TestSerialization.java
// Round-trips a Scan through its protobuf form (ProtobufUtil.toScan in both directions)
// and asserts that start/stop rows, cache-blocks flag, family map, filter type, max
// versions and time range survive.
public void testScan() throws Exception {

  byte[] startRow = Bytes.toBytes("startRow");
  byte[] stopRow = Bytes.toBytes("stopRow");
  byte[] fam = Bytes.toBytes("fam");
  byte[] qf1 = Bytes.toBytes("qf1");

  long ts = System.currentTimeMillis();
  // NOTE(review): `maxVersions` is not used in the visible lines; the statement applying
  // it to the scan appears to be missing -- confirm.
  int maxVersions = 2;

  Scan scan = new Scan().withStartRow(startRow).withStopRow(stopRow);
  scan.addColumn(fam, qf1);
  scan.setTimeRange(ts, ts + 1);

  // Serialize to the protobuf message, then deserialize back into a Scan.
  ClientProtos.Scan scanProto = ProtobufUtil.toScan(scan);
  Scan desScan = ProtobufUtil.toScan(scanProto);

  assertTrue(Bytes.equals(scan.getStartRow(), desScan.getStartRow()));
  assertTrue(Bytes.equals(scan.getStopRow(), desScan.getStopRow()));
  assertEquals(scan.getCacheBlocks(), desScan.getCacheBlocks());
  Set<byte[]> set = null;
  Set<byte[]> desSet = null;

  // Walks each family in the original scan alongside the same family in the
  // deserialized scan.
  for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
    set = entry.getValue();
    desSet = desScan.getFamilyMap().get(entry.getKey());
    // NOTE(review): the loop body (presumably a containment assertion against desSet) is
    // missing from the excerpt.
    for (byte[] column : set) {

    // Test filters are serialized properly.
    scan = new Scan().withStartRow(startRow);
    final String name = "testScan";
    byte[] prefix = Bytes.toBytes(name);
    scan.setFilter(new PrefixFilter(prefix));
    scanProto = ProtobufUtil.toScan(scan);
    desScan = ProtobufUtil.toScan(scanProto);
    Filter f = desScan.getFilter();
    assertTrue(f instanceof PrefixFilter);

  assertEquals(scan.getMaxVersions(), desScan.getMaxVersions());
  TimeRange tr = scan.getTimeRange();
  TimeRange desTr = desScan.getTimeRange();
  assertEquals(tr.getMax(), desTr.getMax());
  assertEquals(tr.getMin(), desTr.getMin());
源代码20 项目: hbase   文件: TestImportExport.java
 * Create a simple table, run an Export Job on it, Import with filtering on,  verify counts,
 * attempt with invalid values.
// Writes two rows to an export table, prepares Export/Import argument arrays with a
// PrefixFilter on ROW1, and asserts that the filtered source count equals the import
// table count; then prepares arguments using an invalid filter class.
public void testWithFilter() throws Throwable {
  // Create simple table to export
  // NOTE(review): the builder chain for `desc` is cut off in the excerpt.
  TableDescriptor desc = TableDescriptorBuilder
  Table exportTable = UTIL.getConnection().getTable(desc.getTableName());

  // Five versions of the same cell in ROW1, at timestamps now .. now + 4.
  Put p1 = new Put(ROW1);
  p1.addColumn(FAMILYA, QUAL, now, QUAL);
  p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
  p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
  p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
  p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);

  // Having another row would actually test the filter.
  Put p2 = new Put(ROW2);
  p2.addColumn(FAMILYA, QUAL, now, QUAL);

  exportTable.put(Arrays.asList(p1, p2));

  // Export the simple table
  String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
  // NOTE(review): the call that runs the export with `args` is not visible.

  // Import to a new table
  final String IMPORT_TABLE = name.getMethodName() + "import";
  // NOTE(review): the builder chain for the import table descriptor is cut off.
  desc = TableDescriptorBuilder

  Table importTable = UTIL.getConnection().getTable(desc.getTableName());
  // Passes the filter class and its argument (ROW1 as a string) as -D options.
  args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
      "1000" };
  // NOTE(review): the remaining Import arguments and the call that runs the import are
  // not visible in the excerpt.

  // get the count of the source table for that time range
  PrefixFilter filter = new PrefixFilter(ROW1);
  int count = getCount(exportTable, filter);

  Assert.assertEquals("Unexpected row count between export and import tables", count,
    getCount(importTable, null));

  // and then test that a broken command doesn't bork everything - easier here because we don't
  // need to re-run the export job

  // Filter.class is the abstract base filter type, used here as the "broken" filter class.
  args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
      FQ_OUTPUT_DIR, "1000" };

  // cleanup
源代码21 项目: hbase   文件: TestImportExport.java
// Checks that ExportUtils.getScanFromCommandLine produces a Scan with the expected max
// versions, time range and PrefixFilter, both without and with visibility labels
// supplied through a -D option.
public void testExportScan() throws Exception {
  int version = 100;
  long startTime = System.currentTimeMillis();
  long endTime = startTime + 1;
  String prefix = "row";
  String label_0 = "label_0";
  String label_1 = "label_1";
  // NOTE(review): the array contents are cut off in the excerpt.
  String[] args = {
  Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
  assertEquals(version, scan.getMaxVersions());
  assertEquals(startTime, scan.getTimeRange().getMin());
  assertEquals(endTime, scan.getTimeRange().getMax());
  assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
  // compareTo == 0 means the filter's prefix bytes equal the expected prefix bytes.
  assertEquals(0, Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
  // NOTE(review): the remaining elements of this array are cut off in the excerpt.
  String[] argsWithLabels = {
    "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1,
  // Copy of the test configuration, so parsing -D options works on a separate instance.
  Configuration conf = new Configuration(UTIL.getConfiguration());
  // parse the "-D" options
  String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
  Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
  assertEquals(version, scanWithLabels.getMaxVersions());
  assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
  assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
  assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
  assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
  // Both labels must be present on the scan, in the order they were given.
  assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
  assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
  assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
源代码22 项目: hraven   文件: JobHistoryService.java
 * Returns the {@link Flow} runs' stats - summed up per flow If the
 * {@code version} parameter is non-null, the returned results will be
 * restricted to those matching this app version.
 * <p>
 * <strong>Note:</strong> this retrieval method will omit the configuration
 * data from all of the returned jobs.
 * </p>
 * @param cluster the cluster where the jobs were run
 * @param user the user running the jobs
 * @param appId the application identifier for the jobs
 * @param version if non-null, only flows matching this application version
 *          will be returned
 * @param startTime the start time for the flows to be looked at
 * @param endTime the end time for the flows to be looked at
 * @param limit the maximum number of flows to return
 * @return
// Builds a scan over flow rows for cluster/user/appId, choosing the start row from
// `startRow`, `endTime` or the bare app prefix, assembles a MUST_PASS_ALL filter list
// (prefix, optional version, config-column exclusion) and delegates to
// createFromResults.
public List<Flow> getFlowTimeSeriesStats(String cluster, String user,
    String appId, String version, long startTime, long endTime, int limit,
    byte[] startRow) throws IOException {

  // app portion of row key
  byte[] rowPrefix = Bytes.toBytes((cluster + Constants.SEP + user
      + Constants.SEP + appId + Constants.SEP));
  byte[] scanStartRow;

  // An explicit startRow takes precedence; otherwise derive it from endTime or the prefix.
  if (startRow != null) {
    scanStartRow = startRow;
  } else {
    if (endTime != 0) {
      // use end time in start row, if present
      long endRunId = FlowKey.encodeRunId(endTime);
      scanStartRow =
          Bytes.add(rowPrefix, Bytes.toBytes(endRunId), Constants.SEP_BYTES);
    } else {
      scanStartRow = rowPrefix;

  // TODO: use RunMatchFilter to limit scan on the server side
  // NOTE(review): `scanStartRow` is not visibly applied to `scan` -- confirm.
  Scan scan = new Scan();
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);

  if (startTime != 0) {
    // if limited by start time, early out as soon as we hit it
    long startRunId = FlowKey.encodeRunId(startTime);
    // zero byte at the end makes the startRunId inclusive
    // NOTE(review): this statement is cut off, and the use of `scanEndRow` (presumably as
    // the scan's stop row) is not visible -- confirm.
    byte[] scanEndRow = Bytes.add(rowPrefix, Bytes.toBytes(startRunId),
  } else {
    // require that all rows match the app prefix we're looking for
    filters.addFilter(new WhileMatchFilter(new PrefixFilter(rowPrefix)));

  // if version is passed, restrict the rows returned to that version
  if (version != null && version.length() > 0) {
    // NOTE(review): this statement is cut off; presumably it compares the version column
    // against Bytes.toBytes(version) -- confirm.
    filters.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
        Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,

  // filter out all config columns except the queue name
  // The regex matches qualifiers starting with "c!" that do not contain HRAVEN_QUEUE;
  // NOT_EQUAL keeps only the qualifiers that do not match it.
  filters.addFilter(new QualifierFilter(CompareFilter.CompareOp.NOT_EQUAL,
      new RegexStringComparator(
          "^c\\!((?!" + Constants.HRAVEN_QUEUE + ").)*$")));

  // NOTE(review): no visible line sets `filters` on `scan` -- confirm.

  LOG.info("scan : \n " + scan.toJSON() + " \n");
  // populateTasks is passed as false here.
  return createFromResults(scan, false, limit);
源代码23 项目: hraven   文件: FlowQueueService.java
 * Returns the flows currently listed in the given {@link Flow.Status}
 * @param cluster The cluster where flows have run
 * @param status The flows' status
 * @param limit Return up to this many Flow instances
 * @param user Filter flows returned to only this user (if present)
 * @param startRow Start results at this key. Use this in combination with
 *          {@code limit} to support pagination through the results.
 * @return a list of up to {@code limit} Flows
 * @throws IOException in the case of an error retrieving the data
// Scans the flow-queue table for rows under the cluster + status prefix, starting at
// `startRow` (or the prefix itself when null), and converts results into Flow objects.
public List<Flow> getFlowsForStatus(String cluster, Flow.Status status,
    int limit, String user, byte[] startRow) throws IOException {
  // Prefix is cluster and status code joined by SEP_BYTES; the trailing EMPTY_BYTES
  // element makes the joined key end with a separator.
  byte[] rowPrefix = ByteUtil.join(Constants.SEP_BYTES,
      Bytes.toBytes(cluster), status.code(), Constants.EMPTY_BYTES);
  // Without a pagination key, start at the beginning of the prefix range.
  if (startRow == null) {
    startRow = rowPrefix;
  Scan scan = new Scan(startRow);
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  // early out when prefix ends
  filters.addFilter(new WhileMatchFilter(new PrefixFilter(rowPrefix)));
  if (user != null) {
    // NOTE(review): the family/qualifier arguments of this constructor, and the line that
    // adds `userFilter` to `filters`, are missing from the excerpt -- confirm.
    SingleColumnValueFilter userFilter = new SingleColumnValueFilter(
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(user));
  // TODO: need to constrain this by timerange as well to prevent unlimited
  // scans

  // NOTE(review): no visible line sets `filters` on `scan` -- confirm.
  // get back the results in a single response
  // `limit` is used as the list's initial capacity.
  List<Flow> results = new ArrayList<Flow>(limit);
  ResultScanner scanner = null;
  Table flowQueueTable = null;
  try {
    // NOTE(review): the rest of this assignment (the getTable call) is cut off.
    flowQueueTable = hbaseConnection
    scanner = flowQueueTable.getScanner(scan);
    int cnt = 0;
    for (Result r : scanner) {
      Flow flow = createFlowFromResult(r);
      // NOTE(review): the bodies of the two checks below are missing; presumably the
      // first adds the flow and increments `cnt`, the second breaks out -- confirm.
      if (flow != null) {
      if (cnt >= limit) {
  } finally {
    // Nested try/finally; the bodies of the null checks (presumably close() calls) are
    // missing from the excerpt.
    try {
    if (scanner != null) {
    } finally {
      if (flowQueueTable != null) {
  return results;
源代码24 项目: hraven   文件: AppSummaryService.java
 * scans the app version table to look for jobs that showed up in the given
 * time range creates the flow key that maps to these apps
 * @param cluster
 * @param user
 * @param startTime
 * @param endTime
 * @param limit
 * @return list of flow keys
 * @throws IOException
 * @throws ProcessingException
// Scans the app version table under a cluster (and optionally user) prefix to collect
// AppKeys, then builds an AppSummary per key and fetches its flows from `jhs`.
public List<AppSummary> getNewApps(JobHistoryService jhs, String cluster,
    String user, long startTime, long endTime, int limit) throws IOException {
  byte[] startRow = null;
  // With a non-blank user the prefix is built from the cluster plus further parts;
  // otherwise from the cluster alone.
  if (StringUtils.isNotBlank(user)) {
    // NOTE(review): the remaining arguments of this join call are cut off.
    startRow = ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster),
  } else {
    startRow = ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster));
  // NOTE(review): the opening of the call this argument belongs to is missing from the
  // excerpt (presumably a LOG.info call) -- confirm.
      "Reading app version rows start at " + Bytes.toStringBinary(startRow));
  Scan scan = new Scan();
  // start scanning app version table at cluster!user!
  // require that all results match this flow prefix
  FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  filters.addFilter(new WhileMatchFilter(new PrefixFilter(startRow)));
  // NOTE(review): no visible line sets `filters` or a start row on `scan` -- confirm.


  List<AppKey> newAppsKeys = new ArrayList<AppKey>();
  try {
    newAppsKeys =
        createNewAppKeysFromResults(scan, startTime, endTime, limit);
  } catch (IOException e) {
    // The scan failure is handled here, leaving `newAppsKeys` as the empty list; the
    // opening of the logging call around this message is missing from the excerpt.
        "Caught exception while trying to scan, returning empty list of flows: "
            + e.toString());

  List<AppSummary> newApps = new ArrayList<AppSummary>();
  for (AppKey ak : newAppsKeys) {
    AppSummary anApp = new AppSummary(ak);
    // Version is passed as null and the limit as Integer.MAX_VALUE.
    List<Flow> flows =
        jhs.getFlowSeries(ak.getCluster(), ak.getUserName(), ak.getAppId(),
            null, Boolean.FALSE, startTime, endTime, Integer.MAX_VALUE);
    // NOTE(review): the loop body and the line that adds `anApp` to `newApps` are missing
    // from the excerpt -- confirm.
    for (Flow f : flows) {
  return newApps;
