org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw() 源码实例 Demo

下面列出了 org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: kylin-on-parquet-v2   文件: CubeVisitService.java
public InnerScannerAsIterator(RegionScanner regionScanner) {
    this.regionScanner = regionScanner;

    try {
        hasMore = regionScanner.nextRaw(nextOne);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
 
源代码2 项目: kylin   文件: CubeVisitService.java
public InnerScannerAsIterator(RegionScanner regionScanner) {
    this.regionScanner = regionScanner;

    try {
        hasMore = regionScanner.nextRaw(nextOne);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
 
源代码3 项目: Kylin   文件: AggregationScanner.java
/**
 * Reads {@code innerScanner} to the end with {@code nextRaw} and aggregates every
 * non-empty row that passes {@code filter} into a new {@link ObserverAggregationCache}.
 *
 * @param innerScanner scanner to drain
 * @param type         row type used to build the tuple handed to the filter
 * @param projector    produces the aggregation key for a row's cells
 * @param aggregators  aggregators backing the cache and applied to each row
 * @param filter       optional row filter; {@code null} accepts every row
 * @param stats        optional statistics sink; {@code null} disables counting
 * @return the populated aggregation cache
 * @throws IOException if reading from the scanner fails
 */
@SuppressWarnings("rawtypes")
ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators aggregators, CoprocessorFilter filter, Stats stats) throws IOException {

    final ObserverAggregationCache cache = new ObserverAggregationCache(aggregators);
    final ObserverTuple rowTuple = new ObserverTuple(type);
    final List<Cell> rowCells = new ArrayList<Cell>();

    boolean moreRows;
    do {
        rowCells.clear();
        moreRows = innerScanner.nextRaw(rowCells);
        if (rowCells.isEmpty()) {
            // Nothing was returned for this call; re-check the loop condition.
            continue;
        }

        // Input rows are counted before the filter is applied.
        if (stats != null) {
            stats.countInputRow(rowCells);
        }

        // The tuple is pointed at the row key of the first cell.
        final Cell firstCell = rowCells.get(0);
        rowTuple.setUnderlying(firstCell.getRowArray(), firstCell.getRowOffset(), firstCell.getRowLength());
        if (filter != null && !filter.evaluate(rowTuple)) {
            continue;
        }

        final CoprocessorProjector.AggrKey groupKey = projector.getAggrKey(rowCells);
        final MeasureAggregator[] buffers = cache.getBuffer(groupKey);
        aggregators.aggregate(buffers, rowCells);

        cache.checkMemoryUsage();
    } while (moreRows);

    return cache;
}
 
源代码4 项目: phoenix   文件: PhoenixIndexBuilder.java
/**
 * Pre-reads the existing rows for the mutations in the batch so that they are
 * loaded into the block cache before index maintenance does a point scan per row.
 * The scan results themselves are discarded.
 *
 * @param miniBatchOp the batch of mutations about to be applied
 * @throws IOException if opening, reading or closing the scanner fails, or if the
 *         region operation cannot be started
 */
@Override
public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
    // The entire purpose of this method impl is to get the existing rows for the
    // table rows being indexed into the block cache, as the index maintenance code
    // does a point scan per row
    List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
    List<IndexMaintainer> maintainers = new ArrayList<IndexMaintainer>();
    for (int i = 0; i < miniBatchOp.size(); i++) {
        Mutation m = miniBatchOp.getOperation(i).getFirst();
        keys.add(PDataType.VARBINARY.getKeyRange(m.getRow()));
        maintainers.addAll(getCodec().getIndexMaintainers(m.getAttributesMap()));
    }
    Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
    ScanRanges scanRanges = ScanRanges.create(Collections.singletonList(keys), SchemaUtil.VAR_BINARY_SCHEMA);
    scanRanges.setScanStartStopRow(scan);
    scan.setFilter(scanRanges.getSkipScanFilter());
    HRegion region = this.env.getRegion();
    RegionScanner scanner = region.getScanner(scan);
    // From here on the scanner is open, so everything that can throw runs inside the
    // try block: previously a failure in startRegionOperation() left the scanner unclosed.
    boolean regionOperationStarted = false;
    try {
        // Run through the scanner using internal nextRaw method
        MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
        region.startRegionOperation();
        regionOperationStarted = true;
        boolean hasMore;
        do {
            List<KeyValue> results = Lists.newArrayList();
            // Results are potentially returned even when the return value of nextRaw is false
            // since this is an indication of whether or not there are more values after the
            // ones returned
            hasMore = scanner.nextRaw(results, null);
        } while (hasMore);
    } finally {
        try {
            scanner.close();
        } finally {
            // Only balance a region operation that was actually started.
            if (regionOperationStarted) {
                region.closeRegionOperation();
            }
        }
    }
}
 
源代码5 项目: phoenix   文件: PhoenixIndexBuilder.java
/**
 * Pre-reads the existing rows for the mutations in the batch so that they are
 * loaded into the block cache before index maintenance does a point scan per row.
 * Index maintainers that are both immutable-rows and local-index are skipped, and
 * only one maintainer is kept per index table name. If none remain, nothing is scanned.
 * The scan results themselves are discarded.
 *
 * @param miniBatchOp the batch of mutations about to be applied
 * @throws IOException if opening, reading or closing the scanner fails, or if the
 *         region operation cannot be started
 */
@Override
public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    // The entire purpose of this method impl is to get the existing rows for the
    // table rows being indexed into the block cache, as the index maintenance code
    // does a point scan per row
    List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
    Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
            new HashMap<ImmutableBytesWritable, IndexMaintainer>();
    // Reusable probe object for map lookups only; it is never stored in the map.
    ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
    for (int i = 0; i < miniBatchOp.size(); i++) {
        Mutation m = miniBatchOp.getOperation(i);
        keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
        List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap());

        for (IndexMaintainer indexMaintainer : indexMaintainers) {
            if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) {
                continue;
            }
            indexTableName.set(indexMaintainer.getIndexTableName());
            if (maintainers.get(indexTableName) != null) {
                continue;
            }
            // Store a dedicated key object: the probe above is mutated on every
            // iteration, and mutating an object while it is a HashMap key corrupts
            // the map's view of that entry.
            maintainers.put(new ImmutableBytesWritable(indexMaintainer.getIndexTableName()), indexMaintainer);
        }

    }
    if (maintainers.isEmpty()) {
        return;
    }
    Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
    ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
    scanRanges.initializeScan(scan);
    scan.setFilter(scanRanges.getSkipScanFilter());
    HRegion region = this.env.getRegion();
    RegionScanner scanner = region.getScanner(scan);
    // From here on the scanner is open, so startRegionOperation() runs inside the try
    // block: previously a failure there left the scanner unclosed.
    boolean regionOperationStarted = false;
    try {
        // Run through the scanner using internal nextRaw method
        region.startRegionOperation();
        regionOperationStarted = true;
        boolean hasMore;
        do {
            List<Cell> results = Lists.newArrayList();
            // Results are potentially returned even when the return value of nextRaw is false
            // since this is an indication of whether or not there are more values after the
            // ones returned
            hasMore = scanner.nextRaw(results);
        } while (hasMore);
    } finally {
        try {
            scanner.close();
        } finally {
            // Only balance a region operation that was actually started.
            if (regionOperationStarted) {
                region.closeRegionOperation();
            }
        }
    }
}
 
源代码6 项目: phoenix   文件: GroupedAggregateRegionObserver.java
/**
 * Handles an aggregate query whose row key order is not guaranteed to follow the
 * group-by key order. Every distinct group encountered in the region is accumulated
 * in a {@link GroupByCache} while scanning; the scanner returned is the one that
 * cache builds on top of {@code s}.
 *
 * @param c           observer context providing the environment, configuration and region
 * @param scan        the scan being served; read for annotations, tenant id and the
 *                    optional distinct-values estimate attribute
 * @param s           underlying scanner, read with {@code nextRaw}
 * @param expressions group-by expressions used to derive each row's group key
 * @param aggregators aggregators applied to the rows of each group
 * @param limit       scanning stops once the cache size is no longer below this value
 * @return the scanner produced by the group-by cache
 * @throws IOException if scanning or aggregation fails
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
        final RegionScanner s, final List<Expression> expressions,
        final ServerAggregators aggregators, long limit) throws IOException {
    if (logger.isDebugEnabled()) {
        final String message = "Grouped aggregation over unordered rows with scan " + scan
                + ", group by " + expressions + ", aggregators " + aggregators;
        logger.debug(LogUtil.addCustomAnnotations(message, ScanUtil.getCustomAnnotations(scan)));
    }
    final RegionCoprocessorEnvironment env = c.getEnvironment();
    final Configuration conf = env.getConfiguration();

    // Start from the configured estimate; a per-scan estimate, when present,
    // replaces it with 1.5x the supplied value (never below the minimum).
    final int configuredEstimate =
            conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
    final byte[] scanEstimate = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
    final int estDistVals = scanEstimate == null
            ? configuredEstimate
            : Math.max(MIN_DISTINCT_VALUES, (int) (Bytes.toInt(scanEstimate) * 1.5f));

    final boolean spillableEnabled =
            conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);

    final GroupByCache groupByCache = GroupByCacheFactory.INSTANCE.newCache(
            env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
            aggregators, estDistVals);

    // The cache is closed here only if it was not handed over to the returned scanner.
    boolean handedOver = false;
    try {
        final MultiKeyValueTuple rowTuple = new MultiKeyValueTuple();
        if (logger.isDebugEnabled()) {
            logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan)));
        }

        final HRegion region = env.getRegion();
        region.startRegionOperation();
        try {
            boolean moreRows;
            do {
                final List<Cell> rowCells = new ArrayList<Cell>();
                // nextRaw may fill rowCells even when it returns false: the flag only
                // says whether further values follow the ones just returned.
                moreRows = s.nextRaw(rowCells);
                if (rowCells.isEmpty()) {
                    continue;
                }
                rowTuple.setKeyValues(rowCells);
                final ImmutableBytesWritable groupKey =
                        TupleUtil.getConcatenatedValue(rowTuple, expressions);
                final Aggregator[] groupAggregators = groupByCache.cache(groupKey);
                // Fold this row into its group
                aggregators.aggregate(groupAggregators, rowTuple);
            } while (moreRows && groupByCache.size() < limit);
        } finally {
            region.closeRegionOperation();
        }

        final RegionScanner groupedScanner = groupByCache.getScanner(s);

        // No sorting happens here; sorting is left to the client. If the scan ever
        // extends beyond a region (possible when parallelization split points come
        // from old metadata), sorting here would give incorrect query results.
        handedOver = true;
        return groupedScanner;
    } finally {
        if (!handedOver) {
            Closeables.closeQuietly(groupByCache);
        }
    }
}
 
源代码7 项目: hbase   文件: Export.java
/**
 * Scans {@code region} with {@code scan} and appends every resulting row to a
 * {@link SecureWriter}, returning the number of rows and cells written.
 *
 * <p>Each loop iteration first calls {@code preScannerNext}; if that call asks to
 * bypass, the scanner is not read and the loop ends after writing whatever is in
 * {@code results}. Otherwise one {@code nextRaw} call is made, its cells are
 * checked to all belong to one row, wrapped in a {@link Result}, and
 * {@code postScannerNext} is invoked before writing.
 *
 * @param region       region to export
 * @param conf         configuration passed to the writer
 * @param userProvider user provider passed to the writer
 * @param scan         scan to run; its batch value is forwarded to the scanner hooks
 * @param userToken    token passed to the writer
 * @param opts         sequence-file writer options
 * @return a response carrying the row count and cell count
 * @throws IOException if scanning or writing fails, or if a single
 *         {@code nextRaw} call returns cells from more than one row
 */
private static ExportProtos.ExportResponse processData(final Region region,
    final Configuration conf, final UserProvider userProvider, final Scan scan,
    final Token userToken, final List<SequenceFile.Writer.Option> opts) throws IOException {
  ScanCoprocessor cp = new ScanCoprocessor(region);
  // Declared outside the try so the finally block can pass it to checkScannerClose;
  // it is still null there if checkScannerOpen (or a resource initializer) threw.
  RegionScanner scanner = null;
  try (RegionOp regionOp = new RegionOp(region);
          SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) {
    scanner = cp.checkScannerOpen(scan);
    ImmutableBytesWritable key = new ImmutableBytesWritable();
    long rowCount = 0;
    long cellCount = 0;
    List<Result> results = new ArrayList<>();
    List<Cell> cells = new ArrayList<>();
    boolean hasMore;
    do {
      boolean bypass = cp.preScannerNext(scanner, results, scan.getBatch());
      if (bypass) {
        // The hook asked to skip the real scan: stop after flushing results below.
        hasMore = false;
      } else {
        hasMore = scanner.nextRaw(cells);
        if (cells.isEmpty()) {
          // Jumps straight to the while (hasMore) check; postScannerNext, the write
          // loop and results.clear() below are all skipped for this iteration.
          continue;
        }
        // Sanity check: every cell must carry the same row key as the first one.
        Cell firstCell = cells.get(0);
        for (Cell cell : cells) {
          if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
              firstCell.getRowLength(), cell.getRowArray(), cell.getRowOffset(),
              cell.getRowLength()) != 0) {
            throw new IOException("Why the RegionScanner#nextRaw returns the data of different"
                + " rows?? first row="
                + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(),
                  firstCell.getRowLength())
                + ", current row="
                + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
          }
        }
        results.add(Result.create(cells));
        // cells is reused across iterations, so empty it once the Result is built.
        cells.clear();
        cp.postScannerNext(scanner, results, scan.getBatch(), hasMore);
      }
      // Write out everything collected in this iteration, keyed by row.
      for (Result r : results) {
        key.set(r.getRow());
        out.append(key, r);
        ++rowCount;
        cellCount += r.size();
      }
      results.clear();
    } while (hasMore);
    return ExportProtos.ExportResponse.newBuilder()
            .setRowCount(rowCount)
            .setCellCount(cellCount)
            .build();
  } finally {
    // NOTE(review): scanner may be null here; presumably checkScannerClose tolerates
    // that - confirm against ScanCoprocessor.
    cp.checkScannerClose(scanner);
  }
}
 
源代码8 项目: phoenix   文件: GroupedAggregateRegionObserver.java
/**
 * Used for an aggregate query in which the key order does not necessarily match the group by
 * key order. In this case, we must collect all distinct groups within a region into a map,
 * aggregating as we go.
 *
 * @param c           observer context providing the environment, configuration and region
 * @param scan        the scan being served; read for annotations, tenant id, qualifier
 *                    encoding information and the optional distinct-values estimate
 * @param scanner     underlying scanner, read with {@code nextRaw} while synchronized on it
 * @param expressions group-by expressions used to derive each row's group key
 * @param aggregators aggregators applied to the rows of each group
 * @param limit       scanning stops once the group-by cache size is no longer below this value
 * @return the scanner produced by the group-by cache over the collected groups
 * @throws IOException if scanning or aggregation fails
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
        final RegionScanner scanner, final List<Expression> expressions,
        final ServerAggregators aggregators, long limit) throws IOException {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(LogUtil.addCustomAnnotations(
                "Grouped aggregation over unordered rows with scan " + scan
                + ", group by " + expressions + ", aggregators " + aggregators,
                ScanUtil.getCustomAnnotations(scan)));
    }
    RegionCoprocessorEnvironment env = c.getEnvironment();
    Configuration conf = env.getConfiguration();
    int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
    byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
    if (estDistValsBytes != null) {
        // Allocate 1.5x estimation
        estDistVals = Math.max(MIN_DISTINCT_VALUES,
                        (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
    }

    // Extract the qualifier range once and reuse it, rather than re-reading it from the scan.
    final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
    final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
    final boolean spillableEnabled =
            conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
    final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);

    GroupByCache groupByCache =
            GroupByCacheFactory.INSTANCE.newCache(
                    env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
                    aggregators, estDistVals);
    // The cache is closed in the outer finally unless it was handed to the returned scanner.
    boolean success = false;
    try {
        boolean hasMore;
        Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(LogUtil.addCustomAnnotations(
                    "Spillable groupby enabled: " + spillableEnabled,
                    ScanUtil.getCustomAnnotations(scan)));
        }
        Region region = c.getEnvironment().getRegion();
        // Tracks whether startRegionOperation() succeeded so that the finally block
        // only closes an operation that was actually started.
        boolean acquiredLock = false;
        try {
            region.startRegionOperation();
            acquiredLock = true;
            synchronized (scanner) {
                do {
                    List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
                    // Results are potentially returned even when the return
                    // value of nextRaw is false, since that value only indicates
                    // whether or not there are more values after the ones returned
                    hasMore = scanner.nextRaw(results);
                    if (!results.isEmpty()) {
                        result.setKeyValues(results);
                        ImmutableBytesPtr key =
                            TupleUtil.getConcatenatedValue(result, expressions);
                        Aggregator[] rowAggregators = groupByCache.cache(key);
                        // Aggregate values here
                        aggregators.aggregate(rowAggregators, result);
                    }
                } while (hasMore && groupByCache.size() < limit);
            }
        } finally {
            if (acquiredLock) {
                region.closeRegionOperation();
            }
        }

        RegionScanner regionScanner = groupByCache.getScanner(scanner);

        // Do not sort here, but sort back on the client instead
        // The reason is that if the scan ever extends beyond a region
        // (which can happen if we're basing our parallelization split
        // points on old metadata), we'll get incorrect query results.
        success = true;
        return regionScanner;
    } finally {
        if (!success) {
            Closeables.closeQuietly(groupByCache);
        }
    }
}
 
源代码9 项目: phoenix   文件: GroupedAggregateRegionObserver.java
/**
 * Used for an aggregate query in which the key order match the group by key order. In this
 * case, we can do the aggregation as we scan, by detecting when the group by key changes.
 *
 * @param c           observer context providing the region the scan runs against
 * @param scan        the scan being served; read for annotations and qualifier encoding information
 * @param scanner     underlying scanner, read with {@code nextRaw} while synchronized on it
 * @param expressions group-by expressions used to derive each row's group key
 * @param aggregators aggregators applied to the rows of each group
 * @param limit       upper bound compared against the number of group boundaries seen so far
 *                    (plus one when there are no aggregators); once reached, {@code next}
 *                    stops reading and returns {@code false}
 * @return a scanner whose {@code next} emits one aggregated cell per call
 * @throws IOException declared by the signature; no statement outside the anonymous
 *         scanner's {@code next} is visibly throwing here
 */
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
        final ServerAggregators aggregators, final long limit) throws IOException {

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(LogUtil.addCustomAnnotations(
                "Grouped aggregation over ordered rows with scan " + scan + ", group by "
                + expressions + ", aggregators " + aggregators,
                ScanUtil.getCustomAnnotations(scan)));
    }
    final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
    final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
    final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
    return new BaseRegionScanner(scanner) {
        // Number of group boundaries crossed so far; compared against limit.
        private long rowCount = 0;
        // Key of the group currently being accumulated; null before the first row
        // and again after next() has returned false.
        private ImmutableBytesPtr currentKey = null;

        /**
         * Reads rows until the group key changes, the scanner is exhausted or the limit
         * is reached, then appends one cell holding the aggregated value of the current
         * group to {@code results}.
         *
         * @param results list receiving the aggregated cell, if a group was accumulated
         * @return {@code true} if the limit has not been reached and either the scanner
         *         reported more rows or a group boundary was just crossed
         */
        @Override
        public boolean next(List<Cell> results) throws IOException {
            boolean hasMore;
            boolean atLimit;
            boolean aggBoundary = false;
            Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
            ImmutableBytesPtr key = null;
            Aggregator[] rowAggregators = aggregators.getAggregators();
            // If we're calculating no aggregate functions, we can exit at the
            // start of a new row. Otherwise, we have to wait until an aggregation
            // boundary is seen (NOTE(review): original comment was cut off after
            // "until an agg" - wording completed from context, please confirm).
            int countOffset = rowAggregators.length == 0 ? 1 : 0;
            Region region = c.getEnvironment().getRegion();
            // Set only after startRegionOperation() succeeds, so the finally block
            // never closes an operation that was not started.
            boolean acquiredLock = false;
            try {
                region.startRegionOperation();
                acquiredLock = true;
                synchronized (scanner) {
                    do {
                        List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
                        // Results are potentially returned even when the return
                        // value of nextRaw is false, since that value only indicates
                        // whether or not there are more values after the ones returned
                        hasMore = scanner.nextRaw(kvs);
                        if (!kvs.isEmpty()) {
                            result.setKeyValues(kvs);
                            key = TupleUtil.getConcatenatedValue(result, expressions);
                            // A boundary is a row whose key differs from the group in progress.
                            aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
                            if (!aggBoundary) {
                                aggregators.aggregate(rowAggregators, result);
                                if (LOGGER.isDebugEnabled()) {
                                    LOGGER.debug(LogUtil.addCustomAnnotations(
                                        "Row passed filters: " + kvs
                                        + ", aggregated values: "
                                        + Arrays.asList(rowAggregators),
                                        ScanUtil.getCustomAnnotations(scan)));
                                }
                                currentKey = key;
                            }
                        }
                        atLimit = rowCount + countOffset >= limit;
                        // Do rowCount + 1 b/c we don't have to wait for a complete
                        // row in the case of a DISTINCT with a LIMIT
                    } while (hasMore && !aggBoundary && !atLimit);
                }
            } finally {
                if (acquiredLock) region.closeRegionOperation();
            }

            if (currentKey != null) {
                // Emit the finished group as a single cell keyed by the group key.
                byte[] value = aggregators.toBytes(rowAggregators);
                Cell keyValue =
                        PhoenixKeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
                            currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
                            AGG_TIMESTAMP, value, 0, value.length);
                results.add(keyValue);
                // If we're at an aggregation boundary, reset the
                // aggregators and
                // aggregate with the current result (which is not a part of
                // the returned result).
                if (aggBoundary) {
                    aggregators.reset(rowAggregators);
                    aggregators.aggregate(rowAggregators, result);
                    currentKey = key;
                    rowCount++;
                    atLimit |= rowCount >= limit;
                }
            }
            // Continue if there are more
            if (!atLimit && (hasMore || aggBoundary)) {
                return true;
            }
            // Finished: drop the group in progress so later calls emit nothing for it.
            currentKey = null;
            return false;
        }
    };
}
 
源代码10 项目: phoenix   文件: GroupedAggregateRegionObserver.java
/**
 * Used for an aggregate query in which the key order does not necessarily match the group by
 * key order. In this case, we must collect all distinct groups within a region into a map,
 * aggregating as we go.
 *
 * @param c           observer context providing the environment, configuration and region
 * @param scan        the scan being served; read for the tenant id and the optional
 *                    distinct-values estimate attribute
 * @param s           underlying scanner, read to exhaustion with {@code nextRaw}
 * @param expressions group-by expressions used to derive each row's group key
 * @param aggregators aggregators applied to the rows of each group
 * @return the scanner produced by the group-by cache over the collected groups
 * @throws IOException if scanning or aggregation fails
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
        final RegionScanner s, final List<Expression> expressions,
        final ServerAggregators aggregators) throws IOException {
    if (logger.isDebugEnabled()) {
        logger.debug("Grouped aggregation over unordered rows with scan " + scan
                + ", group by " + expressions + ", aggregators " + aggregators);
    }
    int estDistVals = DEFAULT_ESTIMATED_DISTINCT_VALUES;
    byte[] estDistValsBytes = scan.getAttribute(ESTIMATED_DISTINCT_VALUES);
    if (estDistValsBytes != null) {
        // Allocate 1.5x estimation, but never less than the minimum. This must be
        // Math.max: Math.min would cap every estimate at MIN_DISTINCT_VALUES and
        // discard larger estimates supplied with the scan.
        estDistVals = Math.max(MIN_DISTINCT_VALUES,
                        (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
    }

    RegionCoprocessorEnvironment env = c.getEnvironment();
    Configuration conf = env.getConfiguration();
    final boolean spillableEnabled =
            conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);

    GroupByCache groupByCache =
            GroupByCacheFactory.INSTANCE.newCache(
                    env, ScanUtil.getTenantId(scan),
                    aggregators, estDistVals);

    // The cache is closed in the outer finally unless it was handed to the returned scanner.
    boolean success = false;
    try {
        boolean hasMore;

        MultiKeyValueTuple result = new MultiKeyValueTuple();
        if (logger.isDebugEnabled()) {
            logger.debug("Spillable groupby enabled: " + spillableEnabled);
        }

        HRegion region = c.getEnvironment().getRegion();
        MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
        region.startRegionOperation();
        try {
            do {
                List<KeyValue> results = new ArrayList<KeyValue>();
                // Results are potentially returned even when the return
                // value of nextRaw is false, since that value only indicates
                // whether or not there are more values after the ones returned
                hasMore = s.nextRaw(results, null);
                if (!results.isEmpty()) {
                    result.setKeyValues(results);
                    ImmutableBytesWritable key =
                            TupleUtil.getConcatenatedValue(result, expressions);
                    Aggregator[] rowAggregators = groupByCache.cache(key);
                    // Aggregate values here
                    aggregators.aggregate(rowAggregators, result);
                }
            } while (hasMore);
        } finally {
            region.closeRegionOperation();
        }

        RegionScanner regionScanner = groupByCache.getScanner(s);

        // Do not sort here, but sort back on the client instead
        // The reason is that if the scan ever extends beyond a region
        // (which can happen if we're basing our parallelization split
        // points on old metadata), we'll get incorrect query results.
        success = true;
        return regionScanner;
    } finally {
        if (!success) {
            Closeables.closeQuietly(groupByCache);
        }
    }
}
 
源代码11 项目: phoenix   文件: GroupedAggregateRegionObserver.java
/**
 * Used for an aggregate query in which the key order match the group by key order. In this
 * case, we can do the aggregation as we scan, by detecting when the group by key changes.
 *
 * @param c           observer context providing the region the scan runs against
 * @param scan        the scan being served; only used in the debug message here
 * @param s           underlying scanner, read with {@code nextRaw}; also the target of
 *                    {@code getRegionInfo} and {@code close} on the returned scanner
 * @param expressions group-by expressions used to derive each row's group key
 * @param aggregators aggregators applied to the rows of each group
 * @return a scanner whose {@code next} emits one aggregated key value per call
 */
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
        Scan scan, final RegionScanner s, final List<Expression> expressions,
        final ServerAggregators aggregators) {

    if (logger.isDebugEnabled()) {
        logger.debug("Grouped aggregation over ordered rows with scan " + scan + ", group by "
                + expressions + ", aggregators " + aggregators);
    }
    return new BaseRegionScanner() {
        // Key of the group currently being accumulated; null before the first row
        // and again after next() has returned false.
        private ImmutableBytesWritable currentKey = null;

        /** Returns the region info of the wrapped scanner. */
        @Override
        public HRegionInfo getRegionInfo() {
            return s.getRegionInfo();
        }

        /** Closes the wrapped scanner. */
        @Override
        public void close() throws IOException {
            s.close();
        }

        /**
         * Reads rows until the group key changes or the scanner is exhausted, then
         * appends one key value holding the aggregated value of the current group
         * to {@code results}.
         *
         * @param results list receiving the aggregated key value, if a group was accumulated
         * @return {@code true} if the scanner reported more rows or a group boundary
         *         was just crossed
         */
        @Override
        public boolean next(List<KeyValue> results) throws IOException {
            boolean hasMore;
            boolean aggBoundary = false;
            MultiKeyValueTuple result = new MultiKeyValueTuple();
            ImmutableBytesWritable key = null;
            Aggregator[] rowAggregators = aggregators.getAggregators();
            HRegion region = c.getEnvironment().getRegion();
            MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
            region.startRegionOperation();
            try {
                do {
                    List<KeyValue> kvs = new ArrayList<KeyValue>();
                    // Results are potentially returned even when the return
                    // value of nextRaw is false, since that value only indicates
                    // whether or not there are more values after the ones returned
                    hasMore = s.nextRaw(kvs, null);
                    if (!kvs.isEmpty()) {
                        result.setKeyValues(kvs);
                        key = TupleUtil.getConcatenatedValue(result, expressions);
                        // A boundary is a row whose key differs from the group in progress.
                        aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
                        if (!aggBoundary) {
                            aggregators.aggregate(rowAggregators, result);
                            if (logger.isDebugEnabled()) {
                                logger.debug("Row passed filters: " + kvs
                                        + ", aggregated values: "
                                        + Arrays.asList(rowAggregators));
                            }
                            currentKey = key;
                        }
                    }
                } while (hasMore && !aggBoundary);
            } finally {
                region.closeRegionOperation();
            }

            if (currentKey != null) {
                // Emit the finished group as a single key value keyed by the group key.
                byte[] value = aggregators.toBytes(rowAggregators);
                KeyValue keyValue =
                        KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
                            currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
                            AGG_TIMESTAMP, value, 0, value.length);
                results.add(keyValue);
                if (logger.isDebugEnabled()) {
                    logger.debug("Adding new aggregate row: "
                            + keyValue
                            + ",for current key "
                            + Bytes.toStringBinary(currentKey.get(), currentKey.getOffset(),
                                currentKey.getLength()) + ", aggregated values: "
                            + Arrays.asList(rowAggregators));
                }
                // If we're at an aggregation boundary, reset the
                // aggregators and
                // aggregate with the current result (which is not a part of
                // the returned result).
                if (aggBoundary) {
                    aggregators.reset(rowAggregators);
                    aggregators.aggregate(rowAggregators, result);
                    currentKey = key;
                }
            }
            // Continue if there are more
            if (hasMore || aggBoundary) {
                return true;
            }
            // Finished: drop the group in progress so later calls emit nothing for it.
            currentKey = null;
            return false;
        }
    };
}