下面列出了 org.apache.hadoop.hbase.client.Result#size() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Counts the rows of the HBase table mapped to the given task.
 *
 * <p>The table is scanned with a {@link FirstKeyOnlyFilter} and the sizes of the returned
 * results are summed. An {@link IOException} raised while scanning is logged and the count
 * accumulated up to that point is returned.
 *
 * @param task key used to look up the mapping configuration in {@code hbaseMapping}
 * @return an insertion-ordered map holding {@code hbaseTable} (table name) and {@code count}
 */
@Override
public Map<String, Object> count(String task) {
  MappingConfig config = hbaseMapping.get(task);
  String hbaseTable = config.getHbaseMapping().getHbaseTable();
  long rowCount = 0L;
  Scan scan = new Scan();
  scan.setFilter(new FirstKeyOnlyFilter());
  // try-with-resources releases the scanner and then the table, even when the scan fails;
  // previously neither was ever closed.
  try (HTable table =
          (HTable) hbaseTemplate.getConnection().getTable(TableName.valueOf(hbaseTable));
      ResultScanner resultScanner = table.getScanner(scan)) {
    for (Result result : resultScanner) {
      rowCount += result.size();
    }
  } catch (IOException e) {
    logger.error(e.getMessage(), e);
  }
  Map<String, Object> res = new LinkedHashMap<>();
  res.put("hbaseTable", hbaseTable);
  res.put("count", rowCount);
  return res;
}
/**
 * Creates the table and checks that a scan combining a row-prefix filter with a column-value
 * filter returns exactly two cells in total.
 *
 * @param connection connection used to obtain the table
 * @param tableName name of the table to create and scan
 * @throws IOException if the table cannot be opened or scanned
 */
private void testScanWithFilters(Connection connection, String tableName) throws IOException {
  createTable(thriftAdmin, tableName);
  try (Table table = connection.getTable(TableName.valueOf(tableName))) {
    FilterList filterList = new FilterList();
    PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("testrow"));
    ColumnValueFilter columnValueFilter = new ColumnValueFilter(FAMILYA, QUALIFIER_1,
        CompareOperator.EQUAL, VALUE_1);
    filterList.addFilter(prefixFilter);
    filterList.addFilter(columnValueFilter);
    Scan scan = new Scan();
    scan.readVersions(2);
    scan.setFilter(filterList);
    // The scanner is now closed even when an assertion below fails; it used to leak.
    try (ResultScanner scanner = table.getScanner(scan)) {
      Iterator<Result> iterator = scanner.iterator();
      assertTrue(iterator.hasNext());
      int counter = 0;
      while (iterator.hasNext()) {
        counter += iterator.next().size();
      }
      assertEquals(2, counter);
    }
  }
}
/**
 * Builds the per-user permission multimap from the cells of a single result.
 *
 * <p>Each raw cell is handed to {@code parsePermissionRecord}; cells for which that helper
 * returns {@code null} are skipped. A {@code null} or empty result yields an empty multimap.
 *
 * @param entryName passed through to {@code parsePermissionRecord}
 * @param result result whose cells are parsed; may be {@code null}
 * @param cf column family passed through to {@code parsePermissionRecord}
 * @param cq column qualifier passed through to {@code parsePermissionRecord}
 * @param user user name passed through to {@code parsePermissionRecord}
 * @param hasFilterUser flag passed through to {@code parsePermissionRecord}
 * @return multimap from user name to the permissions parsed for that user
 */
private static ListMultimap<String, UserPermission> parsePermissions(byte[] entryName,
    Result result, byte[] cf, byte[] cq, String user, boolean hasFilterUser) {
  ListMultimap<String, UserPermission> parsed = ArrayListMultimap.create();
  if (result == null || result.size() <= 0) {
    return parsed;
  }
  for (Cell cell : result.rawCells()) {
    Pair<String, Permission> entry =
        parsePermissionRecord(entryName, cell, cf, cq, hasFilterUser, user);
    if (entry == null) {
      continue;
    }
    String owner = entry.getFirst();
    Permission granted = entry.getSecond();
    parsed.put(owner, new UserPermission(owner, granted));
  }
  return parsed;
}
/**
 * Polls {@code target} until the row reaches the expected replication state.
 *
 * <p>When {@code isDeleted} is false the method waits for a non-empty result and asserts its
 * value against the row key; when true it waits for the result to have no cells. The test
 * fails once the last retry index is reached.
 *
 * @param row row key to fetch
 * @param target table to poll
 * @param isDeleted whether the row is expected to be gone
 * @throws Exception if the get fails or the sleep is interrupted
 */
private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
  Get get = new Get(row);
  final String context = Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted;
  for (int attempt = 0; attempt < NB_RETRIES; attempt++) {
    if (attempt == NB_RETRIES - 1) {
      fail("Waited too much time for replication. Row:" + context);
    }
    Result res = target.get(get);
    boolean stillPending;
    if (isDeleted) {
      stillPending = res.size() > 0;
    } else {
      stillPending = res.isEmpty();
    }
    if (!stillPending) {
      if (!isDeleted) {
        assertArrayEquals(res.value(), row);
      }
      LOG.info("Obtained row:" + context);
      return;
    }
    LOG.info("Waiting for more time for replication. Row:" + context);
    Thread.sleep(SLEEP_TIME);
  }
}
/**
 * Waits, family by family, until a get on the row returns no cells from {@code target}.
 *
 * <p>Between attempts the thread sleeps for ten times {@code SLEEP_TIME}; the test fails once
 * the last retry index is reached for any family.
 *
 * @param target table to poll
 * @param row row key expected to disappear
 * @param families column families to check one after another
 * @throws Exception if the get fails or the sleep is interrupted
 */
private void ensureRowNotExisted(Table target, byte[] row, byte[]... families)
    throws Exception {
  for (byte[] family : families) {
    Get get = new Get(row);
    get.addFamily(family);
    int attempt = 0;
    while (attempt < NB_RETRIES) {
      if (attempt == NB_RETRIES - 1) {
        fail("Waited too much time for delete replication");
      }
      Result res = target.get(get);
      if (res.size() < 1) {
        break;
      }
      LOG.info("Row not deleted");
      Thread.sleep(10 * SLEEP_TIME);
      attempt++;
    }
  }
}
/**
 * Polls {@code table} until the row is present, then asserts its cell count.
 *
 * <p>The test fails once the last retry index is reached without the row showing up.
 *
 * @param row row key to fetch
 * @param count expected number of cells in the fetched result
 * @param table table to poll
 * @throws Exception if the get fails or the sleep is interrupted
 */
private void checkWithWait(byte[] row, int count, Table table) throws Exception {
  Get get = new Get(row);
  for (int i = 0; i < NB_RETRIES; i++) {
    if (i == NB_RETRIES - 1) {
      fail("Waited too much time while getting the row.");
    }
    Result res = table.get(get);
    if (res.size() >= 1) {
      LOG.info("Row is replicated");
      assertEquals("Table '" + table + "' did not have the expected number of results.",
          count, res.size());
      // Done: the former 'rowReplicated' flag was only ever read after this exit, so the
      // flag and its unreachable branch were removed.
      return;
    }
    Thread.sleep(SLEEP_TIME);
  }
}
/**
 * Looks up, through a meta scan limited to one row, the region and server for the given row key.
 *
 * <p>Returns {@code null} when the visitor never records a pair. The pair's second element is
 * whatever {@code CatalogFamilyFormat.getServerName} yields for the visited meta row.
 *
 * @param master master whose connection is used for the meta scan
 * @param tableName table the region must belong to
 * @param rowKey row key used to position the meta scan
 * @return the recorded region/server pair, or {@code null} if none was recorded
 * @throws IOException if the meta scan fails
 */
private Pair<RegionInfo, ServerName> getTableRegionForRow(HMaster master, TableName tableName,
    byte[] rowKey) throws IOException {
  final AtomicReference<Pair<RegionInfo, ServerName>> found = new AtomicReference<>(null);
  ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
    @Override
    public boolean visit(Result metaRow) throws IOException {
      if (metaRow == null || metaRow.size() <= 0) {
        // Nothing to inspect in this row; keep scanning.
        return true;
      }
      RegionInfo regionInfo = CatalogFamilyFormat.getRegionInfo(metaRow);
      ServerName serverName = CatalogFamilyFormat.getServerName(metaRow, 0);
      Pair<RegionInfo, ServerName> candidate = new Pair<>(regionInfo, serverName);
      boolean sameTable = candidate.getFirst().getTable().equals(tableName);
      if (!sameTable) {
        return false;
      }
      found.set(candidate);
      return true;
    }
  };
  MetaTableAccessor.scanMeta(master.getConnection(), visitor, tableName, rowKey, 1);
  return found.get();
}
/**
 * Counts the rows of the HBase table mapped to the given task.
 *
 * <p>The table is scanned with a {@link FirstKeyOnlyFilter} and the sizes of the returned
 * results are summed. An {@link IOException} raised while scanning is logged and the count
 * accumulated up to that point is returned.
 *
 * @param task key used to look up the mapping configuration in {@code hbaseMapping}
 * @return an insertion-ordered map holding {@code hbaseTable} (table name) and {@code count}
 */
@Override
public Map<String, Object> count(String task) {
  MappingConfig config = hbaseMapping.get(task);
  String hbaseTable = config.getHbaseMapping().getHbaseTable();
  long rowCount = 0L;
  Scan scan = new Scan();
  scan.setFilter(new FirstKeyOnlyFilter());
  // try-with-resources releases the scanner and then the table, even when the scan fails;
  // previously neither was ever closed.
  try (HTable table =
          (HTable) hbaseTemplate.getConnection().getTable(TableName.valueOf(hbaseTable));
      ResultScanner resultScanner = table.getScanner(scan)) {
    for (Result result : resultScanner) {
      rowCount += result.size();
    }
  } catch (IOException e) {
    logger.error(e.getMessage(), e);
  }
  Map<String, Object> res = new LinkedHashMap<>();
  res.put("hbaseTable", hbaseTable);
  res.put("count", rowCount);
  return res;
}
/**
 * Shared mapper logic: builds a {@link Put} holding the reversed input value.
 *
 * <p>The input must contain exactly one cell and include {@code INPUT_FAMILY}. The value is
 * read from family {@code INPUT_FAMILY} with qualifier {@code INPUT_FAMILY}, reversed as a
 * string, and stored under {@code OUTPUT_FAMILY} with a {@code null} qualifier.
 *
 * @param key row key for the resulting {@link Put}
 * @param value input row
 * @return the {@link Put} carrying the reversed value
 * @throws IOException if the input has a cell count other than one or lacks the input family
 */
protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
  if (value.size() != 1) {
    throw new IOException("There should only be one input column");
  }
  boolean hasInputFamily = value.getMap().containsKey(INPUT_FAMILY);
  if (!hasInputFamily) {
    throw new IOException("Wrong input columns. Missing: '" +
        Bytes.toString(INPUT_FAMILY) + "'.");
  }
  // Read the stored value and flip its character order.
  String original = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
  String reversed = new StringBuilder(original).reverse().toString();
  // Emit the flipped text under the output family.
  Put outval = new Put(key.get());
  outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(reversed));
  return outval;
}
/**
 * Writes the row key together with a {@link Put} holding the reversed input value.
 *
 * <p>The input must contain exactly one cell and include {@code INPUT_FAMILY}. The value is
 * read from family {@code INPUT_FAMILY} with qualifier {@code INPUT_FAMILY}, reversed as a
 * string, and stored under {@code OUTPUT_FAMILY} with a {@code null} qualifier.
 *
 * @param key row key, used both for the {@link Put} and as the output key
 * @param value input row
 * @param context context that receives the key and the {@link Put}
 * @throws IOException if the input has a cell count other than one or lacks the input family
 * @throws InterruptedException declared for {@code context.write}
 */
@Override
public void map(ImmutableBytesWritable key, Result value,
    Context context)
    throws IOException, InterruptedException {
  if (value.size() != 1) {
    throw new IOException("There should only be one input column");
  }
  boolean hasInputFamily = value.getMap().containsKey(INPUT_FAMILY);
  if (!hasInputFamily) {
    throw new IOException("Wrong input columns. Missing: '" +
        Bytes.toString(INPUT_FAMILY) + "'.");
  }
  // Read the stored value and flip its character order.
  String original = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
  String reversed = new StringBuilder(original).reverse().toString();
  // Emit the flipped text under the output family.
  Put outval = new Put(key.get());
  outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(reversed));
  context.write(key, outval);
}
/**
 * Opens a key-only scan over {@code FAMILY} and reports whether the first result has cells.
 *
 * @return {@code true} if the scanner yields a first result containing at least one cell
 * @throws Exception if the scanner cannot be opened or read
 */
@Override
protected boolean doAction() throws Exception {
  Scan keyOnlyScan = new Scan();
  keyOnlyScan.setBatch(2);
  keyOnlyScan.addFamily(FAMILY);
  keyOnlyScan.setFilter(new KeyOnlyFilter());
  keyOnlyScan.readVersions(1);
  try (ResultScanner rs = table.getScanner(keyOnlyScan)) {
    Result first = rs.next();
    if (first == null) {
      return false;
    }
    return first.size() > 0;
  }
}
/**
 * Converts every cell of the row into an {@code AcceptApplication}.
 *
 * @param result row to convert
 * @param rowNum row index; not used by this method
 * @return one entry per cell in cell order, or an empty list when the result is empty
 * @throws Exception if creating an entry from a cell fails
 */
@Override
public List<AcceptApplication> mapRow(Result result, int rowNum) throws Exception {
  if (result.isEmpty()) {
    return Collections.emptyList();
  }
  final List<AcceptApplication> applications = new ArrayList<>(result.size());
  final Cell[] cells = result.rawCells();
  for (int i = 0; i < cells.length; i++) {
    applications.add(createAcceptedApplication(cells[i]));
  }
  return applications;
}
/**
 * Counts the cells returned by a full scan of the table with the given filter applied.
 *
 * @param table the table to scan
 * @param filter the filter set on the scan
 * @return the total number of cells across all returned results
 * @throws IOException if the scanner cannot be opened or read
 */
private int getCount(Table table, Filter filter) throws IOException {
  Scan scan = new Scan();
  scan.setFilter(filter);
  int count = 0;
  // try-with-resources closes the scanner even if iteration throws; it used to leak then.
  try (ResultScanner results = table.getScanner(scan)) {
    for (Result res : results) {
      count += res.size();
    }
  }
  return count;
}
/**
 * Returns the statistics bytes for a result, merging its cells when requested.
 *
 * @param result result holding one or more statistics cells
 * @param clientsideStatsMerge whether results with a cell count other than one are merged here
 * @return {@code result.value()} when merging is off or the result has exactly one cell;
 *     otherwise the binary form of the merged cells
 */
private byte[] getMergedStats(final Result result, final boolean clientsideStatsMerge) {
  final boolean needsMerge = clientsideStatsMerge && (result.size() != 1);
  if (needsMerge) {
    return URLClassloaderUtils.toBinary(HBaseUtils.getMergedStats(result.listCells()));
  }
  return result.value();
}
/**
 * Converts a client Result into a protocol buffer Result that carries no cell data.
 *
 * <p>Only the associated cell count and the stale flag are set on the built message. A result
 * with a non-null exists flag is delegated to {@code toResult}, and a result with no cells
 * maps to one of the shared empty instances.
 *
 * @param result the client Result to convert
 * @return the converted protocol buffer Result
 */
public static ClientProtos.Result toResultNoData(final Result result) {
  final Boolean exists = result.getExists();
  if (exists != null) {
    return toResult(exists, result.isStale());
  }
  final int cellCount = result.size();
  final boolean stale = result.isStale();
  if (cellCount == 0) {
    return stale ? EMPTY_RESULT_PB_STALE : EMPTY_RESULT_PB;
  }
  return ClientProtos.Result.newBuilder()
      .setAssociatedCellCount(cellCount)
      .setStale(stale)
      .build();
}
/**
 * Scans the region with the given scan, appends every produced Result to a SecureWriter and
 * reports how many rows and cells were written.
 *
 * @param region region to scan
 * @param conf configuration handed to the SecureWriter
 * @param userProvider user provider handed to the SecureWriter
 * @param scan scan definition; its batch value is passed to the scanner hooks
 * @param userToken token handed to the SecureWriter
 * @param opts sequence-file writer options handed to the SecureWriter
 * @return response carrying the number of rows and cells appended
 * @throws IOException if scanning or writing fails, or if one nextRaw call returns cells that
 *     belong to different rows
 */
private static ExportProtos.ExportResponse processData(final Region region,
final Configuration conf, final UserProvider userProvider, final Scan scan,
final Token userToken, final List<SequenceFile.Writer.Option> opts) throws IOException {
ScanCoprocessor cp = new ScanCoprocessor(region);
// Declared outside the try so the finally block can hand it to checkScannerClose.
RegionScanner scanner = null;
// RegionOp and SecureWriter are closed automatically when this try block exits.
try (RegionOp regionOp = new RegionOp(region);
SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) {
scanner = cp.checkScannerOpen(scan);
ImmutableBytesWritable key = new ImmutableBytesWritable();
long rowCount = 0;
long cellCount = 0;
// Both lists are reused across iterations and cleared after each use.
List<Result> results = new ArrayList<>();
List<Cell> cells = new ArrayList<>();
boolean hasMore;
do {
// When the pre-hook asks to bypass, the region scanner is not read and this is the final
// iteration; whatever is in 'results' at that point is still written below.
boolean bypass = cp.preScannerNext(scanner, results, scan.getBatch());
if (bypass) {
hasMore = false;
} else {
hasMore = scanner.nextRaw(cells);
if (cells.isEmpty()) {
// Nothing was returned: skip the write loop and go straight to the hasMore check.
continue;
}
// Sanity check: every cell from one nextRaw call must share the first cell's row.
Cell firstCell = cells.get(0);
for (Cell cell : cells) {
if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
firstCell.getRowLength(), cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength()) != 0) {
throw new IOException("Why the RegionScanner#nextRaw returns the data of different"
+ " rows?? first row="
+ Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(),
firstCell.getRowLength())
+ ", current row="
+ Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
}
}
results.add(Result.create(cells));
cells.clear();
cp.postScannerNext(scanner, results, scan.getBatch(), hasMore);
}
// Write each result keyed by its row and update the row and cell counters.
for (Result r : results) {
key.set(r.getRow());
out.append(key, r);
++rowCount;
cellCount += r.size();
}
results.clear();
} while (hasMore);
return ExportProtos.ExportResponse.newBuilder()
.setRowCount(rowCount)
.setCellCount(cellCount)
.build();
} finally {
// NOTE(review): 'scanner' is still null here if opening it failed; checkScannerClose is
// presumably null-safe - TODO confirm.
cp.checkScannerClose(scanner);
}
}
/**
 * Placeholder for output byte accounting; currently performs no counting.
 *
 * @param r result whose cells would be measured; {@code null} and empty results are ignored
 */
private void countOutputBytes(Result r) {
  final boolean hasCells = r != null && r.size() > 0;
  if (!hasCells) {
    return;
  }
  //TODO -sf- count the cell bytes
}
/**
 * Scans the input and output families and checks, row by row, that the first cell's value
 * equals the byte-reversed value of the second cell.
 *
 * @param table Table to scan.
 * @throws IOException if scanning fails, or if a row has more than two cells while debug
 *     logging is enabled
 * @throws NullPointerException if a row lacks a first or second cell value
 */
private void verifyAttempt(final Table table)
    throws IOException, NullPointerException {
  Scan scan = new Scan();
  scan.addFamily(INPUT_FAMILY);
  scan.addFamily(OUTPUT_FAMILY);
  ResultScanner scanner = table.getScanner(scan);
  try {
    Iterator<Result> rows = scanner.iterator();
    assertTrue(rows.hasNext());
    while (rows.hasNext()) {
      Result row = rows.next();
      // NOTE(review): this cell-count check only runs when debug logging is enabled.
      if (LOG.isDebugEnabled() && row.size() > 2) {
        throw new IOException("Too many results, expected 2 got " +
            row.size());
      }
      // Capture the values of the first two cells; anything beyond that is ignored.
      byte[] firstValue = null;
      byte[] secondValue = null;
      int index = 0;
      for (Cell cell : row.listCells()) {
        if (index >= 2) {
          break;
        }
        if (index == 0) {
          firstValue = CellUtil.cloneValue(cell);
        } else {
          secondValue = CellUtil.cloneValue(cell);
        }
        index++;
      }
      if (firstValue == null) {
        throw new NullPointerException(Bytes.toString(row.getRow()) +
            ": first value is null");
      }
      String first = Bytes.toString(firstValue);
      if (secondValue == null) {
        throw new NullPointerException(Bytes.toString(row.getRow()) +
            ": second value is null");
      }
      // Reverse the second value byte-wise before comparing it with the first.
      final int length = secondValue.length;
      byte[] secondReversed = new byte[length];
      for (int i = 0; i < length; i++) {
        secondReversed[i] = secondValue[length - 1 - i];
      }
      String second = Bytes.toString(secondReversed);
      if (!first.equals(second)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("second key is not the reverse of first. row=" +
              Bytes.toStringBinary(row.getRow()) + ", first value=" + first +
              ", second value=" + second);
        }
        fail();
      }
    }
  } finally {
    scanner.close();
  }
}
/**
 * Scans the app version history table and collects app keys from the non-empty results.
 *
 * <p>Scanning stops as soon as the collected list reaches {@code maxCount}. Results for which
 * {@code getNewAppKeyFromResult} returns {@code null} contribute no key.
 *
 * @param scan scan to run against the app version table
 * @param startTime passed through to {@code getNewAppKeyFromResult}
 * @param endTime passed through to {@code getNewAppKeyFromResult}
 * @param maxCount size at which collection stops
 * @return list of app keys
 * @throws IOException if the table cannot be opened, scanned or closed
 */
public List<AppKey> createNewAppKeysFromResults(Scan scan, long startTime,
    long endTime, int maxCount) throws IOException {
  List<AppKey> newAppsKeys = new ArrayList<AppKey>();
  Table versionsTable = null;
  ResultScanner scanner = null;
  try {
    Stopwatch timer = new Stopwatch().start();
    int rowsSeen = 0;
    long columnsSeen = 0;
    long resultSize = 0;
    TableName versionTableName = TableName.valueOf(Constants.HISTORY_APP_VERSION_TABLE);
    versionsTable = hbaseConnection.getTable(versionTableName);
    scanner = versionsTable.getScanner(scan);
    for (Result result : scanner) {
      if (result == null || result.isEmpty()) {
        continue;
      }
      rowsSeen++;
      columnsSeen += result.size();
      // TODO dogpiledays resultSize += result.getWritableSize();
      AppKey appKey = getNewAppKeyFromResult(result, startTime, endTime);
      if (appKey != null) {
        newAppsKeys.add(appKey);
      }
      if (newAppsKeys.size() >= maxCount) {
        break;
      }
    }
    timer.stop();
    LOG.info(" Fetched from hbase " + rowsSeen + " rows, " + columnsSeen
        + " columns, " + resultSize + " bytes ( " + resultSize / (1024 * 1024)
        + ") MB, in total time of " + timer);
  } finally {
    if (scanner != null) {
      scanner.close();
    }
    if (versionsTable != null) {
      versionsTable.close();
    }
  }
  return newAppsKeys;
}
/**
 * Returns a list of {@link Flow} instances generated from the given results.
 * For the moment, this assumes that the given scanner provides results
 * ordered first by flow ID.
 *
 * <p>A new flow is started whenever the current flow does not contain the job key of the row,
 * or the key's run ID is 0 (each such job is treated as its own flow). Scanning stops when a
 * new flow would be needed and {@code maxCount} flows already exist.
 *
 * @param scan the Scan instance setup for retrieval
 * @param populateTasks whether {@code populateTasks} is invoked on the collected flows
 * @param maxCount maximum number of flows to collect
 * @return the flows built from the scan, in scan order
 * @throws IOException if the history table cannot be opened, scanned or closed
 */
private List<Flow> createFromResults(Scan scan, boolean populateTasks,
    int maxCount) throws IOException {
  List<Flow> flows = new ArrayList<Flow>();
  Stopwatch timer = new Stopwatch().start();
  Stopwatch timerJob = new Stopwatch();
  int rowCount = 0;
  long colCount = 0;
  long resultSize = 0;
  int jobCount = 0;
  // try-with-resources closes the scanner and then the table on every exit path; the table
  // used to be closed only when the scan completed without an exception.
  try (Table historyTable =
          hbaseConnection.getTable(TableName.valueOf(Constants.HISTORY_TABLE));
      ResultScanner scanner = historyTable.getScanner(scan)) {
    Flow currentFlow = null;
    for (Result result : scanner) {
      if (result != null && !result.isEmpty()) {
        rowCount++;
        colCount += result.size();
        // TODO dogpiledays resultSize += result.getWritableSize();
        JobKey currentKey = jobKeyConv.fromBytes(result.getRow());
        // empty runId is special cased -- we need to treat each job as it's
        // own flow
        if (currentFlow == null || !currentFlow.contains(currentKey)
            || currentKey.getRunId() == 0) {
          // return if we've already hit the limit
          if (flows.size() >= maxCount) {
            break;
          }
          currentFlow = new Flow(new FlowKey(currentKey));
          flows.add(currentFlow);
        }
        timerJob.start();
        JobDetails job = new JobDetails(currentKey);
        job.populate(result);
        currentFlow.addJob(job);
        jobCount++;
        timerJob.stop();
      }
    }
    timer.stop();
    LOG.info("Fetched from hbase " + rowCount + " rows, " + colCount
        + " columns, " + flows.size() + " flows and " + jobCount
        + " jobs taking up " + resultSize + " bytes ( "
        + resultSize / (1024.0 * 1024.0) + " atomic double: "
        + new AtomicDouble(resultSize / (1024.0 * 1024.0))
        + ") MB, in total time of " + timer + " with " + timerJob
        + " spent inJobDetails & Flow population");
    // export the size of data fetched from hbase as a metric
    HravenResponseMetrics.FLOW_HBASE_RESULT_SIZE_VALUE
        .set(resultSize / (1024.0 * 1024.0));
  }
  if (populateTasks) {
    populateTasks(flows);
  }
  return flows;
}