org.apache.hadoop.hbase.client.Result#listCells ( )源码实例Demo

下面列出了org.apache.hadoop.hbase.client.Result#listCells ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hbase   文件: TestFuzzyRowAndColumnRangeFilter.java
private void runScanner(Table hTable, int expectedSize, Filter... filters) throws IOException {
  String cf = "f";
  Scan scan = new Scan();
  scan.addFamily(Bytes.toBytes(cf));
  FilterList filterList = new FilterList(filters);
  scan.setFilter(filterList);

  ResultScanner scanner = hTable.getScanner(scan);
  List<Cell> results = new ArrayList<>();
  Result result;
  long timeBeforeScan = System.currentTimeMillis();
  while ((result = scanner.next()) != null) {
    for (Cell kv : result.listCells()) {
      LOG.info("Got rk: " + Bytes.toStringBinary(CellUtil.cloneRow(kv)) + " cq: "
              + Bytes.toStringBinary(CellUtil.cloneQualifier(kv)));
      results.add(kv);
    }
  }
  long scanTime = System.currentTimeMillis() - timeBeforeScan;
  scanner.close();

  LOG.info("scan time = " + scanTime + "ms");
  LOG.info("found " + results.size() + " results");

  assertEquals(expectedSize, results.size());
}
 
源代码2 项目: cantor   文件: HBaseStorage.java
@Override
public List<Long> timeMeta() {
    List<Long> times = new ArrayList<>();

    Get get = (new Get(HBASE_LATTICE_KEY)).addFamily(INST_FAMILY);
    Result result;
    try {
        result = metaTable.get(get);
        List<Cell> cells = result.listCells();
        if (log.isDebugEnabled())
            log.debug("Time lattice is {}", cells.stream()
                                                 .map(c -> Bytes.toLong(c.getValueArray(),
                                                         c.getValueOffset()))
                                                 .collect(Collectors.toList()));
        for (Cell cell : cells) {
            long current = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
            times.add(current);
        }
    } catch (Exception e) {
        if (log.isErrorEnabled())
            log.error("get time lattice from hbase failed", e);
    }

    return times;
}
 
源代码3 项目: hbase   文件: TestTimeRangeMapRed.java
private void verify(final Table table) throws IOException {
  Scan scan = new Scan();
  scan.addColumn(FAMILY_NAME, COLUMN_NAME);
  scan.readVersions(1);
  ResultScanner scanner = table.getScanner(scan);
  for (Result r: scanner) {
    for (Cell kv : r.listCells()) {
      log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv))
          + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv))
          + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(CellUtil.cloneValue(kv)));
      org.junit.Assert.assertEquals(TIMESTAMP.get(kv.getTimestamp()),
        Bytes.toBoolean(CellUtil.cloneValue(kv)));
    }
  }
  scanner.close();
}
 
public List<SpanProtos.Span> getRootSpans() throws IOException {
  startClient();
  Scan scan = new Scan();
  scan.addColumn(this.icf, HBaseSpanReceiver.INDEX_SPAN_QUAL);
  List<SpanProtos.Span> spans = new ArrayList<SpanProtos.Span>();
  try {
    ResultScanner scanner = htable.getScanner(scan);
    Result result = null;
    while ((result = scanner.next()) != null) {
      for (Cell cell : result.listCells()) {
        InputStream in = new ByteArrayInputStream(cell.getValueArray(),
                                                  cell.getValueOffset(),
                                                  cell.getValueLength());
        spans.add(SpanProtos.Span.parseFrom(in));
      }
    }
  } catch (IOException e) {
    LOG.warn("Failed to get root spans from HBase. " + e.getMessage());
    stopClient();
  }
  return spans;
}
 
源代码5 项目: phoenix-tephra   文件: SecondaryIndexTable.java
public Result[] getByIndex(byte[] value) throws IOException {
  try {
    transactionContext.start();
    Scan scan = new Scan(value, Bytes.add(value, new byte[0]));
    scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
    ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);

    ArrayList<Get> gets = new ArrayList<Get>();
    for (Result result : indexScanner) {
      for (Cell cell : result.listCells()) {
        gets.add(new Get(cell.getValue()));
      }
    }
    Result[] results = transactionAwareHTable.get(gets);
    transactionContext.finish();
    return results;
  } catch (Exception e) {
    try {
      transactionContext.abort();
    } catch (TransactionFailureException e1) {
      throw new IOException("Could not rollback transaction", e1);
    }
  }
  return null;
}
 
源代码6 项目: examples   文件: EnrichedCDRHbaseInputOperator.java
@Override
public void emitTuples()
{
  try {
    ResultScanner scanner = getResultScanner();

    for (int i = 0; i < batchSize; ++i) {
      Result result = scanner.next();
      if (result == null) {
        break;
      }

      nameValueMap.clear();

      //row is imsi
      nameValueMap.put("imsi", result.getRow());

      List<Cell> cells = result.listCells();
      for (Cell cell : cells) {
        String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
        byte[] value = CellUtil.cloneValue(cell);
        nameValueMap.put(columnName, value);
      }
      EnrichedCDR cdr = new EnrichedCDR(nameValueMap);

      outputPort.emit(cdr);
    }

  } catch (Exception e) {
    logger.error("emitTuples() exception", e);
  }
}
 
@Override
public List<String> getUserAuths(byte[] user, boolean systemCall) throws IOException {
  assert (labelsRegion != null || systemCall);
  List<String> auths = new ArrayList<>();
  Get get = new Get(user);
  List<Cell> cells = null;
  if (labelsRegion == null) {
    Table table = null;
    Connection connection = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(VisibilityConstants.LABELS_TABLE_NAME);
      Result result = table.get(get);
      cells = result.listCells();
    } finally {
      if (table != null) {
        table.close();
      }
      if (connection != null){
        connection.close();
      }
    }
  } else {
    cells = this.labelsRegion.get(get, false);
  }
  if (cells != null) {
    for (Cell cell : cells) {
      String auth = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
        cell.getQualifierLength());
      auths.add(auth);
    }
  }
  return auths;
}
 
源代码8 项目: kylin   文件: Results.java
public static ByteBuffer getValueAsByteBuffer(Result hbaseRow, byte[] cf, byte[] cq) {
    List<Cell> cells = hbaseRow.listCells();
    if (cells == null || cells.size() == 0) {
        return null;
    } else {
        for (Cell c : cells) {
            if (Bytes.compareTo(cf, 0, cf.length, c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength()) == 0 && //
                    Bytes.compareTo(cq, 0, cq.length, c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()) == 0) {
                return ByteBuffer.wrap(c.getValueArray(), c.getValueOffset(), c.getValueLength());
            }
        }
    }
    return null;
}
 
源代码9 项目: kylin   文件: ExtendCubeToHybridCLI.java
private void copyAcl(String origCubeId, String newCubeId, String projectName) throws Exception {
    String projectResPath = ProjectInstance.concatResourcePath(projectName);
    Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
    ProjectInstance project = store.getResource(projectResPath, projectSerializer);
    String projUUID = project.getUuid();
    Table aclHtable = null;
    try {
        aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(TableName.valueOf(kylinConfig.getMetadataUrlPrefix() + "_acl"));

        // cube acl
        Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId)));
        if (result.listCells() != null) {
            for (Cell cell : result.listCells()) {
                byte[] family = CellUtil.cloneFamily(cell);
                byte[] column = CellUtil.cloneQualifier(cell);
                byte[] value = CellUtil.cloneValue(cell);

                // use the target project uuid as the parent
                if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) {
                    String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}";
                    value = Bytes.toBytes(valueString);
                }
                Put put = new Put(Bytes.toBytes(newCubeId));
                put.add(family, column, value);
                aclHtable.put(put);
            }
        }
    } finally {
        IOUtils.closeQuietly(aclHtable);
    }
}
 
源代码10 项目: hbase   文件: ProtobufStreamingOutput.java
private CellSetModel createModelFromResults(Result[] results) {
  CellSetModel cellSetModel = new CellSetModel();
  for (Result rs : results) {
    byte[] rowKey = rs.getRow();
    RowModel rModel = new RowModel(rowKey);
    List<Cell> kvs = rs.listCells();
    for (Cell kv : kvs) {
      rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv
          .getTimestamp(), CellUtil.cloneValue(kv)));
    }
    cellSetModel.addRow(rModel);
  }
  return cellSetModel;
}
 
源代码11 项目: hbase   文件: AcidGuaranteesTestTool.java
private void gotFailure(byte[] expected, Result res) {
  StringBuilder msg = new StringBuilder();
  msg.append("Failed after ").append(numRowsScanned).append("!");
  msg.append("Expected=").append(Bytes.toStringBinary(expected));
  msg.append("Got:\n");
  for (Cell kv : res.listCells()) {
    msg.append(kv.toString());
    msg.append(" val= ");
    msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
    msg.append("\n");
  }
  throw new RuntimeException(msg.toString());
}
 
源代码12 项目: hbase   文件: TestTableInputFormat.java
@Override
public void map(ImmutableBytesWritable key, Result value, Context context)
    throws IOException {
  for (Cell cell : value.listCells()) {
    context.getCounter(TestTableInputFormat.class.getName() + ":row",
        Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
        .increment(1l);
    context.getCounter(TestTableInputFormat.class.getName() + ":family",
        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
        .increment(1l);
    context.getCounter(TestTableInputFormat.class.getName() + ":value",
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
        .increment(1l);
  }
}
 
源代码13 项目: hbase   文件: MobTestUtil.java
public static void assertCellsValue(Table table, Scan scan,
    byte[] expectedValue, int expectedCount) throws IOException {
  ResultScanner results = table.getScanner(scan);
  int count = 0;
  for (Result res : results) {
    List<Cell> cells = res.listCells();
    for(Cell cell : cells) {
      // Verify the value
      Assert.assertArrayEquals(expectedValue, CellUtil.cloneValue(cell));
      count++;
    }
  }
  results.close();
  Assert.assertEquals(expectedCount, count);
}
 
源代码14 项目: examples   文件: HBaseAvroToSOLRMapper.java
@Override
public void map(ImmutableBytesWritable row, Result values, Context context)
    throws InterruptedException, IOException {

  context.getCounter("HBaseAvro", "Total size").increment(values.size());
  context.getCounter("HBaseAvro", "Uniq size").increment(1);
  for (Cell cell: values.listCells()) {
    try {
      // tag::SETUP[]
      event = util.cellToEvent(cell, event); // <1>

      inputDocument.clear(); // <2>
      inputDocument.addField("id", UUID.randomUUID().toString()); // <3> 
      inputDocument.addField("rowkey", row.get());
      inputDocument.addField("eventId", event.getEventId().toString());
      inputDocument.addField("docType", event.getDocType().toString());
      inputDocument.addField("partName", event.getPartName().toString());
      inputDocument.addField("partNumber", event.getPartNumber().toString());
      inputDocument.addField("version", event.getVersion());
      inputDocument.addField("payload", event.getPayload().toString());

      context.write(new Text(cell.getRowArray()),
                        new SolrInputDocumentWritable(inputDocument)); // <4>
      // end::SETUP[]
      context.getCounter("HBaseAvro", "passed").increment(1);
    } catch (Exception e) {
      context.getCounter("HBaseAvro", "failed").increment(1);
      LOG.error("Issue for "  + Bytes.toStringBinary(cell.getValueArray()), e);
    }
  }
}
 
源代码15 项目: hgraphdb   文件: VertexReader.java
@Override
public void load(Vertex vertex, Result result) {
    if (result.isEmpty()) {
        throw new HBaseGraphNotFoundException(vertex, "Vertex does not exist: " + vertex.id());
    }
    String label = null;
    Long createdAt = null;
    Long updatedAt = null;
    Map<String, byte[]> rawProps = new HashMap<>();
    for (Cell cell : result.listCells()) {
        String key = Bytes.toString(CellUtil.cloneQualifier(cell));
        if (!Graph.Hidden.isHidden(key)) {
            rawProps.put(key, CellUtil.cloneValue(cell));
        } else if (key.equals(Constants.LABEL)) {
            label = ValueUtils.deserialize(CellUtil.cloneValue(cell));
        } else if (key.equals(Constants.CREATED_AT)) {
            createdAt = ValueUtils.deserialize(CellUtil.cloneValue(cell));
        } else if (key.equals(Constants.UPDATED_AT)) {
            updatedAt = ValueUtils.deserialize(CellUtil.cloneValue(cell));
        }
    }
    final String labelStr = label;
    Map<String, Object> props = rawProps.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
            e -> ValueUtils.deserializePropertyValue(graph, ElementType.VERTEX, labelStr, e.getKey(), e.getValue())));
    HBaseVertex newVertex = new HBaseVertex(graph, vertex.id(), label, createdAt, updatedAt, props);
    ((HBaseVertex) vertex).copyFrom(newVertex);
}
 
源代码16 项目: hbase   文件: TestMobStoreScanner.java
private void testMobThreshold(boolean reversed) throws Exception {
  TableName tn = TableName.valueOf("testMobThreshold" + reversed);
  setUp(defaultThreshold, tn);
  byte [] valueLess = generateMobValue((int)defaultThreshold-1);
  byte [] valueEqual = generateMobValue((int)defaultThreshold);
  byte [] valueGreater = generateMobValue((int)defaultThreshold+1);
  long ts1 = System.currentTimeMillis();
  long ts2 = ts1 + 1;
  long ts3 = ts1 + 2;

  Put put1 = new Put(row1);
  put1.addColumn(family, qf1, ts3, valueLess);
  put1.addColumn(family, qf2, ts2, valueEqual);
  put1.addColumn(family, qf3, ts1, valueGreater);
  table.put(put1);

  admin.flush(tn);

  Scan scan = new Scan();
  setScan(scan, reversed, true);

  Cell cellLess= null;
  Cell cellEqual = null;
  Cell cellGreater = null;
  ResultScanner results = table.getScanner(scan);
  int count = 0;
  for (Result res : results) {
    List<Cell> cells = res.listCells();
    for(Cell cell : cells) {
      // Verify the value
      String qf = Bytes.toString(CellUtil.cloneQualifier(cell));
      if(qf.equals(Bytes.toString(qf1))) {
        cellLess = cell;
      }
      if(qf.equals(Bytes.toString(qf2))) {
        cellEqual = cell;
      }
      if(qf.equals(Bytes.toString(qf3))) {
        cellGreater = cell;
      }
      count++;
    }
  }
  Assert.assertEquals(3, count);
  assertNotMobReference(cellLess, row1, family, valueLess);
  assertNotMobReference(cellEqual, row1, family, valueEqual);
  assertIsMobReference(cellGreater, row1, family, valueGreater, tn);
  results.close();
}
 
源代码17 项目: phoenix-omid   文件: TestCompaction.java
@Test(timeOut = 60_000)
public void testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs() throws Throwable {
    String TEST_TABLE = "testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs";
    createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
    TTable txTable = new TTable(connection, TEST_TABLE);

    Table table = txTable.getHTable();

    // Configure the environment to create a minor compaction

    // Write first a value transactionally
    HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
    byte[] rowId = Bytes.toBytes("row1");
    Put p0 = new Put(rowId);
    p0.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
    txTable.put(tx0, p0);
    tm.commit(tx0);

    // create the first hfile
    manualFlush(TEST_TABLE);

    // Write another value transactionally
    HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
    Put p1 = new Put(rowId);
    p1.addColumn(fam, qual, Bytes.toBytes("testValue-1"));
    txTable.put(tx1, p1);
    tm.commit(tx1);

    // create the second hfile
    manualFlush(TEST_TABLE);

    // Write yet another value transactionally
    HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
    Put p2 = new Put(rowId);
    p2.addColumn(fam, qual, Bytes.toBytes("testValue-2"));
    txTable.put(tx2, p2);
    tm.commit(tx2);

    // create a third hfile
    manualFlush(TEST_TABLE);

    // Then perform a non-transactional Delete
    Delete d = new Delete(rowId);
    d.addColumn(fam, qual);
    table.delete(d);

    // create the fourth hfile
    manualFlush(TEST_TABLE);

    // Trigger the minor compaction
    HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
    setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
    admin.compact(TableName.valueOf(TEST_TABLE));
    Thread.sleep(5000);

    // Then perform a non-tx (raw) scan...
    Scan scan = new Scan();
    scan.setRaw(true);
    ResultScanner scannerResults = table.getScanner(scan);

    // ...and test the deleted cell is still there
    int count = 0;
    Result scanResult;
    List<Cell> listOfCellsScanned = new ArrayList<>();
    while ((scanResult = scannerResults.next()) != null) {
        listOfCellsScanned = scanResult.listCells(); // equivalent to rawCells()
        count++;
    }
    assertEquals(count, 1, "There should be only one result in scan results");
    assertEquals(listOfCellsScanned.size(), 3, "There should be 3 cell entries in scan results (2 puts, 1 del)");
    boolean wasDeletedCellFound = false;
    int numberOfDeletedCellsFound = 0;
    for (Cell cell : listOfCellsScanned) {
        if (CellUtil.isDelete(cell)) {
            wasDeletedCellFound = true;
            numberOfDeletedCellsFound++;
        }
    }
    assertTrue(wasDeletedCellFound, "We should have found a non-transactionally deleted cell");
    assertEquals(numberOfDeletedCellsFound, 1, "There should be only only one deleted cell");

    table.close();
}
 
protected void readRecordsAndVerify()
{
  int[] rowIds = new int[ TEST_SIZE ];
  for ( int i = 1; i <= TEST_SIZE; ++i ) {
    rowIds[i - 1] = 1;
  }
  try {
    HTable table = operator.getStore().getTable();
    Scan scan = new Scan();
    ResultScanner resultScanner = table.getScanner(scan);

    int recordCount = 0;
    while ( true ) {
      Result result = resultScanner.next();
      if ( result == null ) {
        break;
      }
      int rowId = Integer.valueOf( Bytes.toString( result.getRow() ) );
      Assert.assertTrue( "rowId=" + rowId + " aut of range", ( rowId > 0 && rowId <= TEST_SIZE ) );
      Assert.assertTrue( "the rowId=" + rowId + " already processed.", rowIds[rowId - 1] == 1 );
      rowIds[rowId - 1] = 0;

      List<Cell> cells = result.listCells();

      Map<String, byte[]> map = new HashMap<>();
      for ( Cell cell : cells ) {
        String columnName = Bytes.toString( CellUtil.cloneQualifier(cell) );
        byte[] value = CellUtil.cloneValue(cell);
        map.put(columnName, value);
      }
      TestPOJO read = TestPOJO.from(map);
      read.setRowId((long)rowId);
      TestPOJO expected = new TestPOJO( rowId );

      Assert.assertTrue( String.format( "expected %s, get %s ", expected.toString(), read.toString() ), expected.completeEquals(read) );
      recordCount++;
    }

    int missedCount = 0;
    if ( recordCount != TEST_SIZE ) {
      logger.error( "unsaved records: " );
      StringBuilder sb = new StringBuilder();
      for ( int i = 0; i < TEST_SIZE; ++i ) {
        if ( rowIds[i] != 0 ) {
          sb.append(i + 1).append(", ");
          missedCount++;
        }
        if ( missedCount > 0 && ( missedCount % 20 == 0 ) ) {
          logger.error( sb.toString() );
          sb.delete( 0, sb.length() );
        }
      }
      logger.error( sb.toString() );
      logger.error( "End of unsaved records" );
    }
    Assert.assertTrue( "expected total records = " + TEST_SIZE + ", got " + recordCount + ", missed " + missedCount, recordCount == TEST_SIZE );
  } catch ( Exception e ) {
    throw new RuntimeException( "exception", e );
  }
}
 
源代码19 项目: hbase   文件: TestMultithreadedTableMapper.java
/**
 * Looks at every value of the mapreduce output and verifies that indeed
 * the values have been reversed.
 *
 * @param table Table to scan.
 * @throws IOException
 * @throws NullPointerException if we failed to find a cell value
 */
private void verifyAttempt(final Table table)
    throws IOException, NullPointerException {
  Scan scan = new Scan();
  scan.addFamily(INPUT_FAMILY);
  scan.addFamily(OUTPUT_FAMILY);
  ResultScanner scanner = table.getScanner(scan);
  try {
    Iterator<Result> itr = scanner.iterator();
    assertTrue(itr.hasNext());
    while(itr.hasNext()) {
      Result r = itr.next();
      if (LOG.isDebugEnabled()) {
        if (r.size() > 2 ) {
          throw new IOException("Too many results, expected 2 got " +
              r.size());
        }
      }
      byte[] firstValue = null;
      byte[] secondValue = null;
      int count = 0;
      for(Cell kv : r.listCells()) {
        if (count == 0) {
          firstValue = CellUtil.cloneValue(kv);
        }else if (count == 1) {
          secondValue = CellUtil.cloneValue(kv);
        }else if (count == 2) {
          break;
        }
        count++;
      }
      String first = "";
      if (firstValue == null) {
        throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": first value is null");
      }
      first = Bytes.toString(firstValue);
      String second = "";
      if (secondValue == null) {
        throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": second value is null");
      }
      byte[] secondReversed = new byte[secondValue.length];
      for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
        secondReversed[i] = secondValue[j];
      }
      second = Bytes.toString(secondReversed);
      if (first.compareTo(second) != 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("second key is not the reverse of first. row=" +
              Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
              ", second value=" + second);
        }
        fail();
      }
    }
  } finally {
    scanner.close();
  }
}
 
源代码20 项目: hbase   文件: TableScanResource.java
@GET
@Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON })
public CellSetModelStream get(final @Context UriInfo uriInfo) {
  if (LOG.isTraceEnabled()) {
    LOG.trace("GET " + uriInfo.getAbsolutePath());
  }
  servlet.getMetrics().incrementRequests(1);
  final int rowsToSend = userRequestedLimit;
  servlet.getMetrics().incrementSucessfulScanRequests(1);
  final Iterator<Result> itr = results.iterator();
  return new CellSetModelStream(new ArrayList<RowModel>() {
    @Override
    public Iterator<RowModel> iterator() {
      return new Iterator<RowModel>() {
        int count = rowsToSend;

        @Override
        public boolean hasNext() {
          return count > 0 && itr.hasNext();
        }

        @Override
        public RowModel next() {
          Result rs = itr.next();
          if ((rs == null) || (count <= 0)) {
            return null;
          }
          byte[] rowKey = rs.getRow();
          RowModel rModel = new RowModel(rowKey);
          List<Cell> kvs = rs.listCells();
          for (Cell kv : kvs) {
            rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
                kv.getTimestamp(), CellUtil.cloneValue(kv)));
          }
          count--;
          if (count == 0) {
            results.close();
          }
          return rModel;
        }
      };
    }
  });
}