类org.apache.hadoop.hbase.filter.WhileMatchFilter源码实例Demo

下面列出了 org.apache.hadoop.hbase.filter.WhileMatchFilter 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: hraven   文件: JobHistoryService.java
/**
 * Looks up a single {@link Flow} by scanning the job history rows whose keys
 * start with the cluster, user, application ID and encoded run ID joined by
 * {@code Constants.SEP_BYTES}.
 *
 * @param cluster the cluster identifier
 * @param user the user running the jobs
 * @param appId the application description
 * @param runId the specific run ID for the flow
 * @param populateTasks whether or not to populate the task details for each
 *          job
 * @return the first flow built from the scan, or {@code null} when the scan
 *         produces no flow
 * @throws IOException if building the flows from the scan fails
 */
public Flow getFlow(String cluster, String user, String appId, long runId,
    boolean populateTasks) throws IOException {
  // row key prefix: cluster!user!app!run!
  byte[] flowPrefix = ByteUtil.join(Constants.SEP_BYTES,
      Bytes.toBytes(cluster), Bytes.toBytes(user), Bytes.toBytes(appId),
      Bytes.toBytes(FlowKey.encodeRunId(runId)), Constants.EMPTY_BYTES);
  LOG.info(
      "Reading job_history rows start at " + Bytes.toStringBinary(flowPrefix));

  Scan flowScan = new Scan();
  flowScan.setStartRow(flowPrefix);
  // every returned row has to carry the flow prefix
  flowScan.setFilter(new WhileMatchFilter(new PrefixFilter(flowPrefix)));

  List<Flow> matches = createFromResults(flowScan, populateTasks, 1);
  return matches.isEmpty() ? null : matches.get(0);
}
 
源代码2 项目: hraven   文件: JobHistoryService.java
/**
 * Finds the {@link Flow} that contains the given job. The job ID is first
 * resolved to a {@code JobKey} through {@code idService}; the flow is then
 * read by scanning the rows sharing that key's cluster, user, application ID
 * and encoded run ID.
 *
 * @param cluster the cluster identifier
 * @param jobId the job identifier
 * @param populateTasks passed through to {@code createFromResults}
 * @return the first flow built from the scan, or {@code null} when the job ID
 *         cannot be resolved or the scan produces no flow
 * @throws IOException if the key lookup or the scan fails
 */
public Flow getFlowByJobID(String cluster, String jobId,
    boolean populateTasks) throws IOException {
  JobKey jobKey = idService.getJobKeyById(new QualifiedJobId(cluster, jobId));
  if (jobKey == null) {
    // unknown job: nothing to scan for
    return null;
  }

  // row key prefix: cluster!user!app!run!
  byte[] flowPrefix = ByteUtil.join(Constants.SEP_BYTES,
      Bytes.toBytes(jobKey.getCluster()), Bytes.toBytes(jobKey.getUserName()),
      Bytes.toBytes(jobKey.getAppId()),
      Bytes.toBytes(jobKey.getEncodedRunId()), Constants.EMPTY_BYTES);
  LOG.info("Reading job_history rows start at "
      + Bytes.toStringBinary(flowPrefix));

  Scan flowScan = new Scan();
  flowScan.setStartRow(flowPrefix);
  // every returned row has to carry the flow prefix
  flowScan.setFilter(new WhileMatchFilter(new PrefixFilter(flowPrefix)));

  List<Flow> matches = createFromResults(flowScan, populateTasks, 1);
  return matches.isEmpty() ? null : matches.get(0);
}
 
源代码3 项目: hraven   文件: JobHistoryService.java
/**
 * Builds a scan over flow rows that begin with the given prefix, optionally
 * restricted to rows whose version column equals {@code version}.
 *
 * @param rowPrefix start row of the scan and prefix every row must match
 * @param limit limit on scanned results; caps the scanner caching
 * @param version version to match; ignored when {@code null} or empty
 * @return the configured {@code Scan}
 */
private Scan createFlowScan(byte[] rowPrefix, int limit, String version) {
  Scan flowScan = new Scan();
  flowScan.setStartRow(rowPrefix);
  // a large caching value combined with a small limit would read far more
  // data than needed, so never cache more rows than the limit
  flowScan.setCaching(Math.min(limit, defaultScannerCaching));

  // rows must keep matching the requested prefix
  Filter whilePrefix = new WhileMatchFilter(new PrefixFilter(rowPrefix));

  boolean versionGiven = version != null && version.length() > 0;
  if (!versionGiven) {
    flowScan.setFilter(whilePrefix);
    return flowScan;
  }

  // a version was requested: rows must pass both the prefix and version checks
  FilterList combined = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  combined.addFilter(whilePrefix);
  combined.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
      Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,
      Bytes.toBytes(version)));
  flowScan.setFilter(combined);
  return flowScan;
}
 
源代码4 项目: hbase   文件: ThriftHBaseServiceHandler.java
/**
 * Opens a scanner that starts at {@code startAndPrefix} and is filtered by a
 * {@link WhileMatchFilter} wrapping a {@link PrefixFilter} on the same bytes.
 *
 * @param tableName name of the table to scan
 * @param startAndPrefix bytes used both as the start row and as the row prefix
 * @param columns columns to return, each parsed with
 *          {@code CellUtil.parseColumn}; a one-part entry selects a whole
 *          family; {@code null} or empty adds no column restriction
 * @param attributes attributes applied to the scan via {@code addAttributes}
 * @return the value returned by {@code addScanner} for the opened scanner
 * @throws IOError if an {@link IOException} occurs while opening the scanner
 * @throws TException declared by the overridden Thrift method
 */
@Override
public int scannerOpenWithPrefix(ByteBuffer tableName,
    ByteBuffer startAndPrefix,
    List<ByteBuffer> columns,
    Map<ByteBuffer, ByteBuffer> attributes)
    throws IOError, TException {

  Table htable = null;
  try {
    htable = getTable(tableName);
    Scan prefixScan = new Scan().withStartRow(getBytes(startAndPrefix));
    addAttributes(prefixScan, attributes);
    prefixScan.setFilter(
        new WhileMatchFilter(new PrefixFilter(getBytes(startAndPrefix))));
    if (columns != null) {
      for (ByteBuffer requested : columns) {
        byte[][] parts = CellUtil.parseColumn(getBytes(requested));
        if (parts.length != 1) {
          // family and qualifier were both given
          prefixScan.addColumn(parts[0], parts[1]);
        } else {
          // family only
          prefixScan.addFamily(parts[0]);
        }
      }
    }
    return addScanner(htable.getScanner(prefixScan), false);
  } catch (IOException ioe) {
    LOG.warn(ioe.getMessage(), ioe);
    throw getIOError(ioe);
  } finally {
    closeTable(htable);
  }
}
 
源代码5 项目: hbase   文件: PerformanceEvaluation.java
/**
 * Runs one scan starting at a random row, restricted to the configured
 * families (or individual columns when {@code opts.addColumns} is set) and
 * filtered by a {@link WhileMatchFilter} around {@code PageFilter(120)}.
 * Every returned row is passed to {@code updateValueSize}.
 *
 * @param i not used by this implementation
 * @return always {@code true}
 * @throws IOException if the scan fails
 */
@Override
boolean testRow(final int i) throws IOException {
  Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
      .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
      .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
      .setScanMetricsEnabled(true);

  // select either whole families or each column of each family
  for (int family = 0; family < opts.families; family++) {
    byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
    if (!opts.addColumns) {
      scan.addFamily(familyName);
      continue;
    }
    for (int column = 0; column < opts.columns; column++) {
      byte[] qualifier;
      if (column == 0) {
        qualifier = COLUMN_ZERO;
      } else {
        qualifier = Bytes.toBytes("" + column);
      }
      scan.addColumn(familyName, qualifier);
    }
  }

  FilterList filterList = new FilterList();
  if (opts.filterAll) {
    filterList.addFilter(new FilterAllFilter());
  }
  filterList.addFilter(new WhileMatchFilter(new PageFilter(120)));
  scan.setFilter(filterList);

  ResultScanner scanner = this.table.getScanner(scan);
  try {
    Result row = scanner.next();
    while (row != null) {
      updateValueSize(row);
      row = scanner.next();
    }
  } finally {
    // metrics are read before the scanner is closed
    updateScanMetrics(scanner.getScanMetrics());
    scanner.close();
  }
  return true;
}
 
源代码6 项目: hbase   文件: PerformanceEvaluation.java
/**
 * Opens a scanner on a single column starting at a random row, filtered by a
 * {@link WhileMatchFilter} around {@code PageFilter(120)}, and closes it
 * right away without reading any result.
 *
 * @param i not used by this implementation
 * @throws IOException if the scanner cannot be opened
 */
@Override
void testRow(final int i) throws IOException {
  Scan pageScan = new Scan()
      .withStartRow(getRandomRow(this.rand, this.totalRows))
      .addColumn(FAMILY_NAME, QUALIFIER_NAME)
      .setFilter(new WhileMatchFilter(new PageFilter(120)));
  ResultScanner opened = this.table.getScanner(pageScan);
  opened.close();
}
 
源代码7 项目: hraven   文件: JobHistoryService.java
/**
 * Builds the scan used to read the task rows of one job. The scan starts at
 * the job key bytes followed by {@code Constants.SEP_BYTES} and only accepts
 * rows that keep that prefix.
 *
 * @param jobKey the job key to match for all task rows
 * @return a {@code Scan} instance for the job_history_task table
 */
private Scan getTaskScan(JobKey jobKey) {
  byte[] taskPrefix =
      Bytes.add(jobKeyConv.toBytes(jobKey), Constants.SEP_BYTES);

  Scan taskScan = new Scan();
  // a job is expected to have many tasks, so fetch them in large batches
  taskScan.setCaching(500);
  taskScan.setStartRow(taskPrefix);
  // restrict the results to the tasks of this job
  taskScan.setFilter(new WhileMatchFilter(new PrefixFilter(taskPrefix)));
  return taskScan;
}
 
源代码8 项目: hraven   文件: FlowEventService.java
/**
 * Reads every event row belonging to one
 * {@link com.twitter.hraven.Flow}. Rows are matched by the flow key bytes
 * followed by {@code Constants.SEP_BYTES}.
 *
 * @param flowKey key of the flow whose events are requested
 * @return the events built from the matching rows; rows for which
 *         {@code createEventFromResult} returns {@code null} are skipped
 * @throws IOException if the table cannot be opened or scanned
 */
public List<FlowEvent> getFlowEvents(FlowKey flowKey) throws IOException {
  byte[] eventPrefix =
      Bytes.add(flowKeyConverter.toBytes(flowKey), Constants.SEP_BYTES);
  Scan eventScan = new Scan(eventPrefix);
  eventScan.setFilter(new WhileMatchFilter(new PrefixFilter(eventPrefix)));

  List<FlowEvent> events = new ArrayList<FlowEvent>();
  Table eventTable = null;
  ResultScanner rows = null;
  try {
    eventTable = hbaseConnection
        .getTable(TableName.valueOf(Constants.FLOW_EVENT_TABLE));
    rows = eventTable.getScanner(eventScan);
    for (Result row : rows) {
      FlowEvent parsed = createEventFromResult(row);
      if (parsed == null) {
        continue;
      }
      events.add(parsed);
    }
  } finally {
    // close the scanner first; the table is closed even if that fails
    try {
      if (rows != null) {
        rows.close();
      }
    } finally {
      if (eventTable != null) {
        eventTable.close();
      }
    }
  }
  return events;
}
 
源代码9 项目: hraven   文件: FlowEventService.java
/**
 * Reads the events of a flow that follow the given event key, i.e. those with
 * a sequence number greater than that of {@code lastSeen}.
 *
 * @param lastSeen key of the last event already known to the caller
 * @return the newer events, or an empty list when there are none
 * @throws IOException if the table cannot be opened or scanned
 */
public List<FlowEvent> getFlowEventsSince(FlowEventKey lastSeen)
    throws IOException {
  // every row has to match the FlowKey portion followed by SEP
  byte[] flowPrefix =
      Bytes.add(flowKeyConverter.toBytes(lastSeen), Constants.SEP_BYTES);
  // scanning begins at the sequence number right after the last seen one
  FlowEventKey firstUnseen = new FlowEventKey(lastSeen.getCluster(),
      lastSeen.getUserName(), lastSeen.getAppId(), lastSeen.getRunId(),
      lastSeen.getSequence() + 1);
  Scan eventScan = new Scan(keyConverter.toBytes(firstUnseen));
  eventScan.setFilter(new WhileMatchFilter(new PrefixFilter(flowPrefix)));

  List<FlowEvent> events = new ArrayList<FlowEvent>();
  Table eventTable = null;
  ResultScanner rows = null;
  try {
    eventTable = hbaseConnection
        .getTable(TableName.valueOf(Constants.FLOW_EVENT_TABLE));
    rows = eventTable.getScanner(eventScan);
    for (Result row : rows) {
      FlowEvent parsed = createEventFromResult(row);
      if (parsed == null) {
        continue;
      }
      events.add(parsed);
    }
  } finally {
    // close the scanner first; the table is closed even if that fails
    try {
      if (rows != null) {
        rows.close();
      }
    } finally {
      if (eventTable != null) {
        eventTable.close();
      }
    }
  }
  return events;
}
 
源代码10 项目: hraven   文件: HdfsStatsService.java
/**
 * Reads hdfs stats for the directories of a cluster. Rows are selected by a
 * prefix made of the encoded run ID, the cluster and, when given, the
 * cleansed path prefix, joined by {@code HdfsConstants.SEP}.
 *
 * @param cluster cluster whose directories are requested
 * @param pathPrefix optional path prefix; skipped when empty or {@code null}
 * @param limit limit passed to the result construction; also caps the
 *          scanner caching
 * @param runId run ID, encoded with {@code getEncodedRunId} for the row key
 * @return list of hdfs stats
 * @throws IOException if reading the scan results fails
 */
public List<HdfsStats> getAllDirs(String cluster, String pathPrefix,
    int limit, long runId) throws IOException {
  long encodedRunId = getEncodedRunId(runId);

  String prefixText = Long.toString(encodedRunId) + HdfsConstants.SEP + cluster;
  if (StringUtils.isNotEmpty(pathPrefix)) {
    // the path is expected to have been cleansed the same way when stored
    prefixText =
        prefixText + HdfsConstants.SEP + StringUtil.cleanseToken(pathPrefix);
  }
  LOG.info(" Getting all dirs for cluster " + cluster + " with pathPrefix: "
      + pathPrefix + " for runId " + runId + " encodedRunId: " + encodedRunId
      + " limit: " + limit + " row prefix : " + prefixText);

  byte[] prefixBytes = Bytes.toBytes(prefixText);
  Scan dirScan = createScanWithAllColumns();
  dirScan.setStartRow(prefixBytes);
  // rows must keep matching the requested prefix
  dirScan.setFilter(new WhileMatchFilter(new PrefixFilter(prefixBytes)));
  // a large caching value combined with a small limit would read far more
  // data than needed, so never cache more rows than the limit
  dirScan.setCaching(Math.min(limit, defaultScannerCaching));
  // only the newest version of each cell is needed
  dirScan.setMaxVersions(1);

  return createFromScanResults(cluster, null, dirScan, limit, Boolean.FALSE,
      0L, 0L);
}
 
源代码11 项目: phoenix-omid   文件: TestUpdateScan.java
/**
 * Writes several rows in one transaction, reads one of them back with a
 * transactional {@code Get}, commits, and then checks that a scan starting at
 * the same key and bounded by a {@link WhileMatchFilter} returns exactly one
 * row.
 *
 * <p>Exceptions are deliberately not caught here: the method declares
 * {@code throws Exception}, so any failure fails the test instead of being
 * logged and ignored. The table and the scanner are closed on every path.
 *
 * @param context TestNG context used to obtain the transaction manager
 * @throws Exception if any table or transaction operation fails
 */
@Test(timeOut = 10_000)
public void testGet(ITestContext context) throws Exception {
    TransactionManager tm = newTransactionManager(context);
    try (TTable table = new TTable(connection, TEST_TABLE)) {
        byte[] family = Bytes.toBytes(TEST_FAMILY);
        byte[] qualifier = Bytes.toBytes(TEST_COL);

        // each value is used both as the row key and as the cell value
        Transaction t = tm.begin();
        int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
        for (int value : lInts) {
            byte[] data = Bytes.toBytes(value);
            Put put = new Put(data);
            put.addColumn(family, qualifier, data);
            table.put(t, put);
        }

        // start and stop key are the same row, so one result is expected
        int startKeyValue = lInts[3];
        int stopKeyValue = lInts[3];
        byte[] startKey = Bytes.toBytes(startKeyValue);
        byte[] stopKey = Bytes.toBytes(stopKeyValue);

        Result r = table.get(t, new Get(startKey));
        if (r.isEmpty()) {
            Assert.fail("Bad result");
        }
        int tmp = Bytes.toInt(r.getValue(family, qualifier));
        LOG.info("Result:" + tmp);
        assertTrue(tmp == startKeyValue, "Bad value, should be " + startKeyValue + " but is " + tmp);
        tm.commit(t);

        // the scan starts at startKey; the filter ends it once a row no
        // longer compares less-or-equal against the stop key prefix
        Scan s = new Scan(startKey);
        RowFilter toFilter = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
                new BinaryPrefixComparator(stopKey));
        s.setFilter(new WhileMatchFilter(toFilter));

        t = tm.begin();
        int count = 0;
        try (ResultScanner res = table.getScanner(t, s)) {
            Result rr;
            while ((rr = res.next()) != null) {
                int iTmp = Bytes.toInt(rr.getValue(family, qualifier));
                LOG.info("Result: " + iTmp);
                count++;
            }
        }
        assertEquals(count, 1, "Count is wrong");
        LOG.info("Rows found " + count);
        tm.commit(t);
    }
}
 
源代码12 项目: spork   文件: HBaseStorage.java
/**
 * Creates the {@code scan} field and configures it from the parsed options:
 * cache settings, the {@code gt}/{@code lt}/{@code gte}/{@code lte} row
 * bounds, the {@code regex} row filter, timestamp restrictions, and finally
 * the column selection derived from {@code columnInfo_}.
 *
 * @throws IOException declared for the helpers invoked while building the
 *           scan
 */
private void initScan() throws IOException{
    scan = new Scan();

    scan.setCacheBlocks(cacheBlocks_);
    scan.setCaching(caching_);

    // Set filters, if any.
    if (configuredOptions_.hasOption("gt")) {
        gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
        addRowFilter(CompareOp.GREATER, gt_);
        scan.setStartRow(gt_);
    }
    if (configuredOptions_.hasOption("lt")) {
        lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
        addRowFilter(CompareOp.LESS, lt_);
        scan.setStopRow(lt_);
    }
    if (configuredOptions_.hasOption("gte")) {
        gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
        scan.setStartRow(gte_);
    }
    if (configuredOptions_.hasOption("lte")) {
        lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
        byte[] lt = increment(lte_);
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Incrementing lte value of %s from bytes %s to %s to set stop row",
                      Bytes.toString(lte_), toString(lte_), toString(lt)));
        }

        if (lt != null) {
            // reuse the value computed above instead of incrementing again
            scan.setStopRow(lt);
        }

        // The WhileMatchFilter will short-circuit the scan after we no longer match. The
        // setStopRow call will limit the number of regions we need to scan
        addFilter(new WhileMatchFilter(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(lte_))));
    }
    if (configuredOptions_.hasOption("regex")) {
        regex_ = Utils.slashisize(configuredOptions_.getOptionValue("regex"));
        addFilter(new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regex_)));
    }
    if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")){
        scan.setTimeRange(minTimestamp_, maxTimestamp_);
    }
    if (configuredOptions_.hasOption("timestamp")){
        scan.setTimeStamp(timestamp_);
    }

    // if the group of columnInfos for this family doesn't contain a prefix, we don't need
    // to set any filters, we can just call addColumn or addFamily. See javadocs below.
    boolean columnPrefixExists = false;
    for (ColumnInfo columnInfo : columnInfo_) {
        if (columnInfo.getColumnPrefix() != null) {
            columnPrefixExists = true;
            break;
        }
    }

    if (!columnPrefixExists) {
        addFiltersWithoutColumnPrefix(columnInfo_);
    }
    else {
        addFiltersWithColumnPrefix(columnInfo_);
    }
}
 
源代码13 项目: hraven   文件: JobHistoryService.java
/**
 * Reads the {@link Flow} runs of one application so their stats can be
 * summed up per flow. When {@code version} is non-empty, only rows whose
 * version column equals it are returned.
 *
 * <p>
 * <strong>Note:</strong> configuration columns other than the queue name are
 * filtered out of the returned jobs.
 * </p>
 *
 * @param cluster the cluster where the jobs were run
 * @param user the user running the jobs
 * @param appId the application identifier for the jobs
 * @param version if non-null and non-empty, only flows matching this
 *          application version will be returned
 * @param startTime the start time for the flows to be looked at; when
 *          non-zero it determines the stop row of the scan
 * @param endTime the end time for the flows to be looked at; when non-zero
 *          (and no {@code startRow} is given) it determines the start row
 * @param limit the maximum number of flows to return
 * @param startRow explicit row to start scanning at; takes precedence over
 *          {@code endTime} when non-null
 * @return the flows built from the scan results
 * @throws IOException if building the flows from the scan fails
 */
public List<Flow> getFlowTimeSeriesStats(String cluster, String user,
    String appId, String version, long startTime, long endTime, int limit,
    byte[] startRow) throws IOException {

  // application part of the row key
  byte[] appPrefix = Bytes.toBytes((cluster + Constants.SEP + user
      + Constants.SEP + appId + Constants.SEP));

  byte[] firstRow;
  if (startRow != null) {
    firstRow = startRow;
  } else if (endTime == 0) {
    firstRow = appPrefix;
  } else {
    // an end time is present, so it becomes part of the start row
    long endRunId = FlowKey.encodeRunId(endTime);
    firstRow =
        Bytes.add(appPrefix, Bytes.toBytes(endRunId), Constants.SEP_BYTES);
  }

  // TODO: use RunMatchFilter to limit scan on the server side
  Scan seriesScan = new Scan();
  seriesScan.setStartRow(firstRow);
  FilterList allOf = new FilterList(FilterList.Operator.MUST_PASS_ALL);

  if (startTime == 0) {
    // no start time: rows must keep matching the application prefix
    allOf.addFilter(new WhileMatchFilter(new PrefixFilter(appPrefix)));
  } else {
    // bounded by start time: the scan ends as soon as that run ID is reached
    long startRunId = FlowKey.encodeRunId(startTime);
    // the trailing zero byte makes the startRunId inclusive
    byte[] lastRow = Bytes.add(appPrefix, Bytes.toBytes(startRunId),
        Constants.ZERO_SINGLE_BYTE);
    seriesScan.setStopRow(lastRow);
  }

  // restrict the rows to the requested version, when one was passed
  boolean versionGiven = version != null && version.length() > 0;
  if (versionGiven) {
    allOf.addFilter(new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
        Constants.VERSION_COLUMN_BYTES, CompareFilter.CompareOp.EQUAL,
        Bytes.toBytes(version)));
  }

  // drop every config column except the queue name
  RegexStringComparator configButQueue = new RegexStringComparator(
      "^c\\!((?!" + Constants.HRAVEN_QUEUE + ").)*$");
  allOf.addFilter(new QualifierFilter(CompareFilter.CompareOp.NOT_EQUAL,
      configButQueue));

  seriesScan.setFilter(allOf);

  LOG.info("scan : \n " + seriesScan.toJSON() + " \n");
  return createFromResults(seriesScan, false, limit);
}
 
源代码14 项目: hraven   文件: FlowQueueService.java
/**
 * Lists the flows recorded under the given {@link Flow.Status} for a cluster.
 *
 * @param cluster The cluster where flows have run
 * @param status The flows' status
 * @param limit Return up to this many Flow instances
 * @param user When non-null, only flows whose user name column equals this
 *          value are returned
 * @param startRow Start results at this key; when {@code null} the scan
 *          starts at the cluster/status prefix. Use this in combination with
 *          {@code limit} to support pagination through the results.
 * @return a list of up to {@code limit} Flows
 * @throws IOException in the case of an error retrieving the data
 */
public List<Flow> getFlowsForStatus(String cluster, Flow.Status status,
    int limit, String user, byte[] startRow) throws IOException {
  byte[] rowPrefix = ByteUtil.join(Constants.SEP_BYTES,
      Bytes.toBytes(cluster), status.code(), Constants.EMPTY_BYTES);
  byte[] scanStart = (startRow == null) ? rowPrefix : startRow;

  Scan queueScan = new Scan(scanStart);
  FilterList allOf = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  // finish once the rows no longer carry the prefix
  allOf.addFilter(new WhileMatchFilter(new PrefixFilter(rowPrefix)));
  if (user != null) {
    SingleColumnValueFilter byUser = new SingleColumnValueFilter(
        Constants.INFO_FAM_BYTES, USER_NAME_COL_BYTES,
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(user));
    // rows without a user name column are rejected as well
    byUser.setFilterIfMissing(true);
    allOf.addFilter(byUser);
  }
  queueScan.setFilter(allOf);
  // TODO: need to constrain this by timerange as well to prevent unlimited
  // scans

  // fetch all requested rows in a single response
  queueScan.setCaching(limit);
  List<Flow> results = new ArrayList<Flow>(limit);
  Table flowQueueTable = null;
  ResultScanner rows = null;
  try {
    flowQueueTable = hbaseConnection
        .getTable(TableName.valueOf(Constants.FLOW_QUEUE_TABLE));
    rows = flowQueueTable.getScanner(queueScan);
    for (Result row : rows) {
      Flow parsed = createFlowFromResult(row);
      if (parsed != null) {
        results.add(parsed);
      }
      // the list size is the number of flows collected so far
      if (results.size() >= limit) {
        break;
      }
    }
  } finally {
    // close the scanner first; the table is closed even if that fails
    try {
      if (rows != null) {
        rows.close();
      }
    } finally {
      if (flowQueueTable != null) {
        flowQueueTable.close();
      }
    }
  }
  return results;
}
 
源代码15 项目: hraven   文件: AppSummaryService.java
/**
 * Scans the app version table for the applications of a cluster (and user,
 * when given) that showed up in the time range, then builds an
 * {@code AppSummary} for each one from its flow series.
 *
 * <p>
 * An {@link IOException} raised by the app version scan is logged and treated
 * as "no applications found"; exceptions from the flow series lookup
 * propagate.
 * </p>
 *
 * @param jhs service used to fetch the flow series of every application
 * @param cluster cluster to look at
 * @param user user to restrict the scan to; ignored when blank
 * @param startTime start of the time range
 * @param endTime end of the time range
 * @param limit limit passed to the app version scan
 * @return one summary per application key found, each populated with its
 *         flows
 * @throws IOException if fetching a flow series fails
 */
public List<AppSummary> getNewApps(JobHistoryService jhs, String cluster,
    String user, long startTime, long endTime, int limit) throws IOException {
  byte[] startRow = StringUtils.isNotBlank(user)
      ? ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster),
          Bytes.toBytes(user))
      : ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes(cluster));
  LOG.info(
      "Reading app version rows start at " + Bytes.toStringBinary(startRow));

  // the app version table is scanned from cluster!user!
  Scan appScan = new Scan();
  appScan.setStartRow(startRow);
  // every returned row has to carry that prefix
  FilterList allOf = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  allOf.addFilter(new WhileMatchFilter(new PrefixFilter(startRow)));
  appScan.setFilter(allOf);

  List<AppKey> appKeys = new ArrayList<AppKey>();
  try {
    appKeys = createNewAppKeysFromResults(appScan, startTime, endTime, limit);
  } catch (IOException e) {
    // a failed scan is reported and leaves the key list empty
    LOG.error(
        "Caught exception while trying to scan, returning empty list of flows: "
            + e.toString());
  }

  List<AppSummary> summaries = new ArrayList<AppSummary>();
  for (AppKey appKey : appKeys) {
    AppSummary summary = new AppSummary(appKey);
    List<Flow> series = jhs.getFlowSeries(appKey.getCluster(),
        appKey.getUserName(), appKey.getAppId(), null, Boolean.FALSE,
        startTime, endTime, Integer.MAX_VALUE);
    for (Flow flow : series) {
      summary.addFlow(flow);
    }
    summaries.add(summary);
  }
  return summaries;
}
 
 类所在包
 类方法
 同包方法