下面列出了 org.apache.hadoop.hbase.protobuf.ResponseConverter 类的 API 用法示例代码,您也可以点击链接到 GitHub 查看完整源代码。
/**
 * Scans the region this coprocessor is attached to for the row range given in the request
 * and hands the collected rows to {@code done}.
 *
 * <p>When the request is flagged as salted, the salt is taken from the part of the region's
 * start key that precedes the first {@code "_"} and is prepended (followed by {@code "_"})
 * to both the start row and the end row. Both bounds use the same salt, so a region whose
 * end key is empty no longer ends up with a stop row beginning with the literal
 * {@code "null_"}. If the region start key is empty, no salt is applied.
 *
 * @param controller receives the failure via {@code ResponseConverter.setControllerException}
 *                   when the scan throws an {@link IOException}
 * @param request    carries the start row, end row, salting flag and inclusive-end flag
 * @param done       callback that is run with the built response, or with {@code null}
 *                   when the scan failed
 */
@Override
public void queryByStartRowAndEndRow(RpcController controller, DataProtos.DataQueryRequest request, RpcCallback<DataQueryResponse> done) {
    DataProtos.DataQueryResponse response = null;
    InternalScanner scanner = null;
    try {
        String startRow = request.getStartRow();
        String endRow = request.getEndRow();
        if (request.getIsSalting()) { // salted keys: prefix the requested rows with this region's salt
            String regionStartKey = Bytes.toString(this.env.getRegion().getRegionInfo().getStartKey());
            String salt = null;
            if (StrUtils.isNotEmpty(regionStartKey)) {
                salt = regionStartKey.split("_")[0]; // keys are salted as salt + "_", so take the part before "_"
            }
            if (salt != null && null != startRow) {
                startRow = salt + "_" + startRow;
                if (null != endRow) {
                    // Same salt as the start row; never concatenate a null salt or a null end row.
                    endRow = salt + "_" + endRow;
                }
            }
        }
        Scan scan = new Scan();
        if (null != startRow) {
            scan.setStartRow(Bytes.toBytes(startRow));
        }
        if (null != endRow) {
            if (request.getIncluedEnd()) {
                // Inclusive end: use a filter instead of the stop row.
                Filter filter = new InclusiveStopFilter(Bytes.toBytes(endRow));
                scan.setFilter(filter);
            } else {
                scan.setStopRow(Bytes.toBytes(endRow));
            }
        }
        scanner = this.env.getRegion().getScanner(scan);
        List<Cell> results = new ArrayList<>();
        boolean hasMore;
        DataProtos.DataQueryResponse.Builder responseBuilder = DataProtos.DataQueryResponse.newBuilder();
        do {
            hasMore = scanner.next(results);
            DataProtos.DataQueryResponse.Row.Builder rowBuilder = DataProtos.DataQueryResponse.Row.newBuilder();
            if (!results.isEmpty()) {
                rowBuilder.setRowKey(ByteString.copyFrom(results.get(0).getRow()));
                for (Cell kv : results) {
                    queryBuilder(rowBuilder, ByteString.copyFrom(kv.getFamily()), ByteString.copyFrom(kv.getQualifier()), ByteString.copyFrom(kv.getRow()), ByteString.copyFrom(kv.getValue()));
                }
            }
            // A row entry is appended for every scanner.next() call, even when it returned no cells.
            responseBuilder.addRowList(rowBuilder);
            results.clear();
        } while (hasMore);
        response = responseBuilder.build();
    } catch (IOException e) {
        // Report the failure through the controller; response stays null.
        ResponseConverter.setControllerException(controller, e);
    } finally {
        if (scanner != null) {
            try {
                scanner.close();
            } catch (IOException e) {
                _log.error(ExceptionUtils.errorInfo(e));
            }
        }
    }
    done.run(response);
}
/**
 * Looks up a single row in the region this coprocessor is attached to and hands the result
 * to {@code done}.
 *
 * <p>When the request is flagged as salted and the region start key is non-empty, the part
 * of the region start key preceding the first {@code "_"} is prepended (followed by
 * {@code "_"}) to the requested row key.
 *
 * <p>The callback is run on every path. If the resulting row key is empty, or the lookup
 * throws an {@link IOException}, {@code done} is run with {@code null}.
 *
 * @param controller receives the failure via {@code ResponseConverter.setControllerException}
 *                   when the lookup throws an {@link IOException}
 * @param request    carries the row key and the salting flag
 * @param done       callback run with the built response, or with {@code null} as described above
 */
@Override
public void queryByRowKey(RpcController controller, DataProtos.DataQueryRequest request, RpcCallback<DataQueryResponse> done) {
    DataProtos.DataQueryResponse response = null;
    try {
        String rowKey = request.getRowKey();
        if (request.getIsSalting()) { // salted keys: prefix the requested key with this region's salt
            String regionStartKey = Bytes.toString(this.env.getRegion().getRegionInfo().getStartKey());
            if (StrUtils.isNotEmpty(regionStartKey) && null != rowKey) {
                // keys are salted as salt + "_", so take the part before "_"
                String startSalt = regionStartKey.split("_")[0];
                rowKey = startSalt + "_" + rowKey;
            }
        }
        // With an empty key there is nothing to look up; fall through so the callback still runs.
        if (!StrUtils.isEmpty(rowKey)) {
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = this.env.getRegion().get(get);
            DataProtos.DataQueryResponse.Builder responseBuilder = DataProtos.DataQueryResponse.newBuilder();
            DataProtos.DataQueryResponse.Row.Builder rowBuilder = DataProtos.DataQueryResponse.Row.newBuilder();
            if (result != null && !result.isEmpty()) {
                List<KeyValue> list = result.list();
                if (null != list && !list.isEmpty()) {
                    rowBuilder.setRowKey(ByteString.copyFrom(list.get(0).getRow()));
                    for (KeyValue kv : list) {
                        queryBuilder(rowBuilder, ByteString.copyFrom(kv.getFamily()), ByteString.copyFrom(kv.getQualifier()), ByteString.copyFrom(kv.getRow()), ByteString.copyFrom(kv.getValue()));
                    }
                }
            }
            // A row entry is appended even when the lookup found nothing.
            responseBuilder.addRowList(rowBuilder);
            response = responseBuilder.build();
        }
    } catch (IOException e) {
        // Report the failure through the controller; response stays null.
        ResponseConverter.setControllerException(controller, e);
    }
    done.run(response);
}
/**
 * Scans the region this coprocessor is attached to, decodes the scanned key-values into
 * slices and hands either an aggregated or a non-aggregated response to {@code done}.
 *
 * <p>The row type, projector, aggregators and filter are deserialized from the request.
 * When the aggregators are empty the non-aggregated response is built; otherwise the
 * aggregated one is.
 *
 * <p>The region operation is started before the scanner is opened, and
 * {@code closeRegionOperation()} is only called when {@code startRegionOperation()}
 * returned normally, so the two calls always stay paired.
 *
 * @param controller receives the failure via {@code ResponseConverter.setControllerException}
 *                   when an {@link IOException} occurs
 * @param request    carries the serialized type, projector, aggregator and filter
 * @param done       callback run with the built response, or with {@code null} on failure
 * @throws RuntimeException if closing the region operation fails with an {@link IOException}
 */
@Override
public void getRows(RpcController controller, IIProtos.IIRequest request, RpcCallback<IIProtos.IIResponse> done) {
    CoprocessorRowType type = CoprocessorRowType.deserialize(request.getType().toByteArray());
    CoprocessorProjector projector = CoprocessorProjector.deserialize(request.getProjector().toByteArray());
    EndpointAggregators aggregators = EndpointAggregators.deserialize(request.getAggregator().toByteArray());
    CoprocessorFilter filter = CoprocessorFilter.deserialize(request.getFilter().toByteArray());
    TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfoDigest();
    IIProtos.IIResponse response = null;
    RegionScanner innerScanner = null;
    HRegion region = null;
    // Tracks whether startRegionOperation() succeeded, so finally only closes what was started.
    boolean operationStarted = false;
    try {
        region = env.getRegion();
        region.startRegionOperation();
        operationStarted = true;
        innerScanner = region.getScanner(buildScan());
        synchronized (innerScanner) {
            IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfoDigest);
            //TODO pass projector to codec to skip loading columns
            Iterable<Slice> slices = codec.decodeKeyValue(new HbaseServerKVIterator(innerScanner));
            if (aggregators.isEmpty()) {
                response = getNonAggregatedResponse(slices, filter, type);
            } else {
                response = getAggregatedResponse(slices, filter, type, projector, aggregators);
            }
        }
    } catch (IOException ioe) {
        System.out.println(ioe.toString());
        // Report the failure through the controller; response stays null.
        ResponseConverter.setControllerException(controller, ioe);
    } finally {
        IOUtils.closeQuietly(innerScanner);
        if (operationStarted) {
            try {
                region.closeRegionOperation();
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
    }
    done.run(response);
}