下面列出了 org.apache.hadoop.hbase.client.ResultScanner#iterator() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates {@code tableName}, scans it with a {@link PrefixFilter} on "testrow" combined
 * with a {@link ColumnValueFilter} on FAMILYA:QUALIFIER_1 == VALUE_1, and asserts that the
 * scan returns at least one row and exactly two cells in total.
 *
 * @param connection connection used to obtain the table
 * @param tableName  name of the table to create and scan
 * @throws IOException if the table cannot be created, opened or scanned
 */
private void testScanWithFilters(Connection connection, String tableName) throws IOException {
    createTable(thriftAdmin, tableName);
    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
        FilterList filterList = new FilterList();
        PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("testrow"));
        ColumnValueFilter columnValueFilter = new ColumnValueFilter(FAMILYA, QUALIFIER_1,
                CompareOperator.EQUAL, VALUE_1);
        filterList.addFilter(prefixFilter);
        filterList.addFilter(columnValueFilter);

        Scan scan = new Scan();
        scan.readVersions(2);
        scan.setFilter(filterList);

        // The scanner is closed even when an assertion below fails, so it cannot leak.
        try (ResultScanner scanner = table.getScanner(scan)) {
            Iterator<Result> iterator = scanner.iterator();
            assertTrue(iterator.hasNext());
            int counter = 0;
            while (iterator.hasNext()) {
                Result result = iterator.next();
                counter += result.size();
            }
            assertEquals(2, counter);
        }
    }
}
/**
 * Runs {@code scan} against {@code htable}, counts the rows it returns and asserts that
 * the count equals {@code expected}. The scanner is always closed.
 *
 * @param scan the scan to execute
 * @throws IOException if the scanner cannot be opened
 */
private void verify(Scan scan) throws IOException {
    ResultScanner scanner = htable.getScanner(scan);
    Iterator<Result> rows = scanner.iterator();
    int seen = 0;
    try {
        // Drain the scanner; only the number of rows matters here.
        for (; rows.hasNext(); seen++) {
            rows.next();
        }
    } finally {
        scanner.close();
    }
    assertEquals(expected, seen);
}
/**
 * Opens a scanner over the {@code HBaseLookupRowEncoder.CF} family of {@code table} and
 * keeps its iterator in {@code scannerIterator}.
 *
 * <p>An {@link IOException} while opening the scanner is logged and not rethrown; in that
 * case {@code scannerIterator} is not assigned by this constructor.
 *
 * @param table the HBase table to scan
 */
public HBaseScanBasedIterator(Table table) {
    try {
        Scan scan = new Scan();
        scan.setCaching(1000);
        // Previously the configured Scan was discarded and getScanner(CF) built its own
        // default scan, so the caching setting never took effect. Restrict the configured
        // scan to the same family and pass it to the table instead.
        scan.addFamily(HBaseLookupRowEncoder.CF);
        ResultScanner scanner = table.getScanner(scan);
        scannerIterator = scanner.iterator();
    } catch (IOException e) {
        logger.error("error when scan HBase", e);
    }
}
/**
 * Opens a scanner over the {@code HBaseLookupRowEncoder.CF} family of {@code table} and
 * keeps its iterator in {@code scannerIterator}.
 *
 * <p>An {@link IOException} while opening the scanner is logged and not rethrown; in that
 * case {@code scannerIterator} is not assigned by this constructor.
 *
 * @param table the HBase table to scan
 */
public HBaseScanBasedIterator(Table table) {
    try {
        Scan scan = new Scan();
        scan.setCaching(1000);
        // Previously the configured Scan was discarded and getScanner(CF) built its own
        // default scan, so the caching setting never took effect. Restrict the configured
        // scan to the same family and pass it to the table instead.
        scan.addFamily(HBaseLookupRowEncoder.CF);
        ResultScanner scanner = table.getScanner(scan);
        scannerIterator = scanner.iterator();
    } catch (IOException e) {
        logger.error("error when scan HBase", e);
    }
}
/**
 * Loads from HBase table 'TESTTABLE_1' using the HBaseBinary format, stores it into
 * 'TESTTABLE_2' using the HBaseBinary converter, then scans 'TESTTABLE_2' and checks the
 * row key and columns a, b and c of each of the 100 expected rows.
 *
 * @throws IOException
 */
@Test
public void testStoreToHBase_1() throws IOException {
    prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
    prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);

    pig.getPigContext().getProperties()
            .setProperty(MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, "true");
    try {
        scanTable1(pig, DataFormat.HBaseBinary);
        pig.store("a", "hbase://" + TESTTABLE_2,
                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter')");

        // Table and scanner are released even when an assertion fails mid-scan.
        try (HTable table = new HTable(conf, TESTTABLE_2);
             ResultScanner scanner = table.getScanner(new Scan())) {
            Iterator<Result> iter = scanner.iterator();
            int i = 0;
            for (; iter.hasNext(); ++i) {
                Result result = iter.next();
                String v = i + "";
                String rowKey = Bytes.toString(result.getRow());
                int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
                double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
                String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));

                // Row keys are the row index left-padded with zeros to two digits.
                Assert.assertEquals("00".substring(v.length()) + v, rowKey);
                Assert.assertEquals(i, col_a);
                Assert.assertEquals(i + 0.0, col_b, 1e-6);
                Assert.assertEquals("Text_" + i, col_c);
            }
            Assert.assertEquals(100, i);
        }
    } finally {
        // Always restore the property so a failure here does not leak into other tests.
        pig.getPigContext().getProperties()
                .setProperty(MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, "false");
    }
}
/**
 * Loads from HBase table 'TESTTABLE_1' using the HBaseBinary format, projects out
 * column c, stores the result into 'TESTTABLE_2' using the HBaseBinary converter, then
 * scans 'TESTTABLE_2' and checks the row key and columns a and b of each of the 100
 * expected rows.
 *
 * @throws IOException
 */
@Test
public void testStoreToHBase_1_with_projection() throws IOException {
    // NOTE(review): this system property is set here and never restored - confirm that
    // is intended.
    System.getProperties().setProperty("pig.usenewlogicalplan", "false");
    prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
    prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
    scanTable1(pig, DataFormat.HBaseBinary);
    pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
    pig.store("b", TESTTABLE_2,
            "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
            + TESTCOLUMN_A + " " + TESTCOLUMN_B +
            "','-caster HBaseBinaryConverter')");

    // Table and scanner are released even when an assertion fails mid-scan.
    try (HTable table = new HTable(conf, TESTTABLE_2);
         ResultScanner scanner = table.getScanner(new Scan())) {
        Iterator<Result> iter = scanner.iterator();
        int i = 0;
        for (; iter.hasNext(); ++i) {
            Result result = iter.next();
            String v = String.valueOf(i);
            String rowKey = Bytes.toString(result.getRow());
            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));

            // Row keys are the row index left-padded with zeros to two digits.
            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
            Assert.assertEquals(i, col_a);
            Assert.assertEquals(i + 0.0, col_b, 1e-6);
        }
        Assert.assertEquals(100, i);
    }
}
/**
 * Loads from HBase table 'TESTTABLE_1' using the HBaseBinary format, stores it into
 * 'TESTTABLE_2' without a caster (plain-text values), then scans 'TESTTABLE_2' and checks
 * the row key and columns a, b and c of each of the 100 expected rows.
 *
 * @throws IOException
 */
@Test
public void testStoreToHBase_2() throws IOException {
    prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
    prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
    scanTable1(pig, DataFormat.HBaseBinary);
    pig.store("a", TESTTABLE_2,
            "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
            + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
            + TESTCOLUMN_C + "')");

    // Table and scanner are released even when an assertion fails mid-scan.
    try (HTable table = new HTable(conf, TESTTABLE_2);
         ResultScanner scanner = table.getScanner(new Scan())) {
        Iterator<Result> iter = scanner.iterator();
        int i = 0;
        for (; iter.hasNext(); ++i) {
            Result result = iter.next();
            String v = i + "";
            // Bytes.toString decodes as UTF-8 instead of the platform default charset
            // that new String(byte[]) would use.
            String rowKey = Bytes.toString(result.getRow());
            int col_a = Integer.parseInt(Bytes.toString(getColValue(result, TESTCOLUMN_A)));
            double col_b = Double.parseDouble(Bytes.toString(getColValue(result, TESTCOLUMN_B)));
            String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));

            // Row keys are the row index left-padded with zeros to two digits.
            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
            Assert.assertEquals(i, col_a);
            Assert.assertEquals(i + 0.0, col_b, 1e-6);
            Assert.assertEquals("Text_" + i, col_c);
        }
        Assert.assertEquals(100, i);
    }
}
/**
 * Loads from HBase table 'TESTTABLE_1' using the HBaseBinary format, projects out
 * column c, stores the result into 'TESTTABLE_2' without a caster (plain-text values),
 * then scans 'TESTTABLE_2' and checks the row key and columns a and b of each of the 100
 * expected rows.
 *
 * @throws IOException
 */
@Test
public void testStoreToHBase_2_with_projection() throws IOException {
    prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
    prepareTable(TESTTABLE_2, false, DataFormat.UTF8PlainText);
    scanTable1(pig, DataFormat.HBaseBinary);
    pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
    pig.store("b", TESTTABLE_2,
            "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
            + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");

    // Table and scanner are released even when an assertion fails mid-scan.
    try (HTable table = new HTable(conf, TESTTABLE_2);
         ResultScanner scanner = table.getScanner(new Scan())) {
        Iterator<Result> iter = scanner.iterator();
        int i = 0;
        for (; iter.hasNext(); ++i) {
            Result result = iter.next();
            String v = i + "";
            // Bytes.toString decodes as UTF-8 instead of the platform default charset
            // that new String(byte[]) would use.
            String rowKey = Bytes.toString(result.getRow());
            int col_a = Integer.parseInt(Bytes.toString(getColValue(result, TESTCOLUMN_A)));
            double col_b = Double.parseDouble(Bytes.toString(getColValue(result, TESTCOLUMN_B)));

            // Row keys are the row index left-padded with zeros to two digits.
            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
            Assert.assertEquals(i, col_a);
            Assert.assertEquals(i + 0.0, col_b, 1e-6);
        }
        Assert.assertEquals(100, i);
    }
}
/**
 * Loads from HBase table 'TESTTABLE_1' using the UTF-8 plain-text format, projects out
 * column c, stores the result into 'TESTTABLE_2' without a caster, then scans
 * 'TESTTABLE_2' and checks the row key and the textual values of columns a and b of each
 * of the 100 expected rows.
 *
 * @throws IOException
 */
@Test
public void testStoreToHBase_3_with_projection_no_caster() throws IOException {
    prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
    prepareTable(TESTTABLE_2, false, DataFormat.UTF8PlainText);
    scanTable1(pig, DataFormat.UTF8PlainText);
    pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
    pig.store("b", TESTTABLE_2,
            "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
            + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");

    // Table and scanner are released even when an assertion fails mid-scan.
    try (HTable table = new HTable(conf, TESTTABLE_2);
         ResultScanner scanner = table.getScanner(new Scan())) {
        Iterator<Result> iter = scanner.iterator();
        int i = 0;
        for (; iter.hasNext(); ++i) {
            Result result = iter.next();
            String v = i + "";
            // Bytes.toString decodes as UTF-8 instead of the platform default charset
            // that new String(byte[]) would use.
            String rowKey = Bytes.toString(result.getRow());
            String col_a = Bytes.toString(getColValue(result, TESTCOLUMN_A));
            String col_b = Bytes.toString(getColValue(result, TESTCOLUMN_B));

            // Row keys are the row index left-padded with zeros to two digits.
            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
            Assert.assertEquals(i + "", col_a);
            Assert.assertEquals(i + 0.0 + "", col_b);
        }
        Assert.assertEquals(100, i);
    }
}
/**
 * Returns an iterator over the output-table rows whose key starts with the partial row
 * key generated from {@code ts} and {@code indexName}.
 *
 * <p>The underlying scanner is handed to the returned iterator and is not closed here.
 *
 * @param ts        timestamp component of the partial row key
 * @param indexName index-name component of the partial row key
 * @return an iterator wrapping the scanner's results
 * @throws IOException if the scanner cannot be opened
 */
public Iterator<IndexVerificationOutputRow> getOutputRowIterator(long ts, byte[] indexName)
        throws IOException {
    byte[] keyPrefix = generatePartialOutputTableRowKey(ts, indexName);
    // Scan from the prefix itself up to the closest next row key for that prefix.
    Scan prefixScan = new Scan()
            .withStartRow(keyPrefix)
            .withStopRow(ByteUtil.calculateTheClosestNextRowKeyForPrefix(keyPrefix));
    ResultScanner rows = outputTable.getScanner(prefixScan);
    return new IndexVerificationOutputRowIterator(rows.iterator());
}
/**
 * Scans the metric-data table between the keys built from {@code start} and {@code end}
 * for the given metric and window, and returns every row that
 * {@code parseMetricDataRow} turns into a non-null object.
 *
 * <p>Any exception during the scan is logged and an empty list is returned. The scanner
 * and the table are always released.
 *
 * @param metricId   metric id, parsed as a {@code long}
 * @param metricType metric type passed to the row parser
 * @param win        window used when building the scan keys
 * @param start      start value used to build the scan start key
 * @param end        end value used to build the scan stop key
 * @param size       initial capacity of the result list
 * @return the parsed rows, or an empty list when nothing was read or the scan failed
 */
@Override
public List<Object> getMetricData(String metricId, MetricType metricType, int win, long start, long end, int size) {
    final long id = Long.parseLong(metricId);
    final byte[] fromKey = MetricBaseData.makeKey(id, win, start);
    final byte[] toKey = MetricBaseData.makeKey(id, win, end);
    final HTableInterface table = getHTableInterface(TABLE_METRIC_DATA);

    final Scan scan = new Scan(fromKey, toKey);
    scan.setCaching(CACHE_SIZE);

    ResultScanner scanner = null;
    try {
        scanner = getScanner(table, scan);
        final Iterator<Result> rows = scanner.iterator();
        if (rows == null) {
            return new ArrayList<>(0);
        }

        final List<Object> parsed = new ArrayList<>(size);
        while (rows.hasNext()) {
            final Object item = parseMetricDataRow(rows.next(), metricType);
            if (item == null) {
                // Rows the parser rejects are skipped.
                continue;
            }
            parsed.add(item);
        }
        return parsed;
    } catch (Exception ex) {
        logger.error("Scan error, metric id:{}, metric type:{}", metricId, metricType, ex);
        return new ArrayList<>(0);
    } finally {
        if (scanner != null) {
            scanner.close();
        }
        closeTable(table);
    }
}
/**
 * Creates a row iterator backed by {@code resultScanner}, keeping both the scanner and
 * the iterator obtained from it.
 *
 * @param resultScanner the scanner whose results are iterated
 */
public RowIterator(ResultScanner resultScanner) {
    this.results = resultScanner.iterator();
    this.resultScanner = resultScanner;
}
/**
 * Rescan the given range directly from the source and target tables.
 * Count and log differences, and if this is not a dry run, output Puts and Deletes
 * to make the target table match the source table for this range.
 *
 * <p>Both scanners are closed when the comparison finishes, including when it is aborted
 * by an exception.
 *
 * @param context  context whose counters are incremented and which is passed on to
 *                 {@code syncRowCells}
 * @param startRow first row of the range to rescan
 * @param stopRow  stop row of the range to rescan
 * @throws IOException          if scanning or syncing a row fails
 * @throws InterruptedException if syncing a row is interrupted
 */
private void syncRange(Context context, ImmutableBytesWritable startRow,
        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
    Scan scan = sourceTableHash.initScan();
    scan.withStartRow(startRow.copyBytes());
    scan.withStopRow(stopRow.copyBytes());

    boolean rangeMatched = true;
    // try-with-resources closes both scanners on every exit path; before, an exception
    // thrown while comparing rows left them open.
    try (ResultScanner sourceScanner = sourceTable.getScanner(scan);
         ResultScanner targetScanner = targetTable.getScanner(new Scan(scan))) {
        CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
        CellScanner targetCells = new CellScanner(targetScanner.iterator());

        byte[] nextSourceRow = sourceCells.nextRow();
        byte[] nextTargetRow = targetCells.nextRow();

        while (nextSourceRow != null || nextTargetRow != null) {
            boolean rowMatched;
            int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
            if (rowComparison < 0) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Target missing row: " + Bytes.toString(nextSourceRow));
                }
                context.getCounter(Counter.TARGETMISSINGROWS).increment(1);

                rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
                nextSourceRow = sourceCells.nextRow(); // advance only source to next row
            } else if (rowComparison > 0) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Source missing row: " + Bytes.toString(nextTargetRow));
                }
                context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);

                rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
                nextTargetRow = targetCells.nextRow(); // advance only target to next row
            } else {
                // current row is the same on both sides, compare cell by cell
                rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
                nextSourceRow = sourceCells.nextRow();
                nextTargetRow = targetCells.nextRow();
            }

            if (!rowMatched) {
                rangeMatched = false;
            }
        }
    }

    context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
            .increment(1);
}
/**
 * Scans the input and output families of the mapreduce output table and checks, row by
 * row, that the second cell value is the byte-wise reverse of the first. Fails the test
 * on the first mismatch and when the table has no rows.
 *
 * @param table Table to scan.
 * @throws IOException if scanning fails, or if debug logging is enabled and a row holds
 *         more than two cells
 * @throws NullPointerException if we failed to find a cell value
 */
private void verifyAttempt(final Table table)
        throws IOException, NullPointerException {
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILY);
    scan.addFamily(OUTPUT_FAMILY);
    ResultScanner scanner = table.getScanner(scan);
    try {
        Iterator<Result> rows = scanner.iterator();
        assertTrue(rows.hasNext());
        while (rows.hasNext()) {
            Result row = rows.next();
            // The cell-count sanity check only runs when debug logging is on.
            if (LOG.isDebugEnabled() && row.size() > 2) {
                throw new IOException("Too many results, expected 2 got " +
                        row.size());
            }

            // Pick up the values of the first two cells; anything beyond is ignored.
            byte[] firstValue = null;
            byte[] secondValue = null;
            int seen = 0;
            for (Cell cell : row.listCells()) {
                if (seen == 2) {
                    break;
                }
                if (seen == 0) {
                    firstValue = CellUtil.cloneValue(cell);
                } else {
                    secondValue = CellUtil.cloneValue(cell);
                }
                seen++;
            }

            if (firstValue == null) {
                throw new NullPointerException(Bytes.toString(row.getRow()) +
                        ": first value is null");
            }
            if (secondValue == null) {
                throw new NullPointerException(Bytes.toString(row.getRow()) +
                        ": second value is null");
            }

            // Reverse the second value so it can be compared with the first.
            int length = secondValue.length;
            byte[] reversed = new byte[length];
            for (int i = 0; i < length; i++) {
                reversed[i] = secondValue[length - 1 - i];
            }

            String first = Bytes.toString(firstValue);
            String second = Bytes.toString(reversed);
            if (!first.equals(second)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("second key is not the reverse of first. row=" +
                            Bytes.toStringBinary(row.getRow()) + ", first value=" + first +
                            ", second value=" + second);
                }
                fail();
            }
        }
    } finally {
        scanner.close();
    }
}
/**
 * Scans the configured columns of the mapreduce output table and checks, row by row,
 * that the second cell value is the byte-wise reverse of the first. Fails the test on
 * the first mismatch and when the table has no rows.
 * @param table Table to scan.
 * @throws IOException if scanning fails, or if debug logging is enabled and a row holds
 *         more than two cells
 * @throws NullPointerException if we failed to find a cell value
 */
private void verifyAttempt(final Table table) throws IOException, NullPointerException {
    Scan scan = new Scan();
    TableInputFormat.addColumns(scan, columns);
    ResultScanner scanner = table.getScanner(scan);
    try {
        Iterator<Result> rows = scanner.iterator();
        assertTrue(rows.hasNext());
        while (rows.hasNext()) {
            Result row = rows.next();
            // The cell-count sanity check only runs when debug logging is on.
            if (getLog().isDebugEnabled() && row.size() > 2) {
                throw new IOException("Too many results, expected 2 got " +
                        row.size());
            }

            // Capture the values of the first two cells, then stop looking.
            byte[] firstValue = null;
            byte[] secondValue = null;
            int index = 0;
            for (Cell cell : row.listCells()) {
                byte[] value = CellUtil.cloneValue(cell);
                if (index == 0) {
                    firstValue = value;
                } else {
                    secondValue = value;
                }
                if (++index == 2) {
                    break;
                }
            }

            if (firstValue == null) {
                throw new NullPointerException(Bytes.toString(row.getRow()) +
                        ": first value is null");
            }
            if (secondValue == null) {
                throw new NullPointerException(Bytes.toString(row.getRow()) +
                        ": second value is null");
            }

            // Reverse the second value so it can be compared with the first.
            int length = secondValue.length;
            byte[] reversed = new byte[length];
            for (int i = 0; i < length; i++) {
                reversed[i] = secondValue[length - 1 - i];
            }

            String first = Bytes.toString(firstValue);
            String second = Bytes.toString(reversed);
            if (!first.equals(second)) {
                if (getLog().isDebugEnabled()) {
                    getLog().debug("second key is not the reverse of first. row=" +
                            Bytes.toStringBinary(row.getRow()) + ", first value=" + first +
                            ", second value=" + second);
                }
                fail();
            }
        }
    } finally {
        scanner.close();
    }
}