下面列出了怎么用org.apache.hadoop.hbase.filter.MultiRowRangeFilter的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Collapses several range scans into one scan that is restricted by a
 * {@link MultiRowRangeFilter}.
 *
 * <p>Each input scan contributes one row range built from its start row (inclusive) and its
 * stop row (exclusive). Only the row boundaries of the input scans are read; nothing else
 * from them is copied onto the returned scan.
 *
 * @param rangeScans the scans whose start/stop rows define the ranges to merge
 * @return a new scan carrying a {@link MultiRowRangeFilter} over all collected ranges
 * @throws RuntimeException wrapping the {@link IOException} if the filter cannot be created
 */
public static Scan mergeRangeScans(List<Scan> rangeScans) {
  List<RowRange> rowRanges = Lists.newArrayList();
  for (Scan source : rangeScans) {
    // Start row is inclusive, stop row is exclusive.
    rowRanges.add(new RowRange(source.getStartRow(), true, source.getStopRow(), false));
  }

  Scan combined = new Scan();
  try {
    MultiRowRangeFilter rangeFilter = new MultiRowRangeFilter(rowRanges);
    combined.setFilter(rangeFilter);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return combined;
}
/**
 * Checks that merging two prefix-style range scans produces a scan whose
 * {@link MultiRowRangeFilter} holds both row ranges with the original boundaries.
 */
@Test
public void testMergePrefixScans() throws IOException {
  List<Scan> scans = Lists.newArrayList();

  byte[] startRow1 = Bytes.toBytes("hello");
  byte[] stopRow1 = Bytes.toBytes("hellp");
  Scan scan1 = new Scan(startRow1, stopRow1);
  scans.add(scan1);

  byte[] startRow2 = Bytes.toBytes("world");
  byte[] stopRow2 = Bytes.toBytes("worle");
  Scan scan2 = new Scan(startRow2, stopRow2);
  scans.add(scan2);

  Scan merged = HBaseUtils.mergeRangeScans(scans);

  assertEquals(MultiRowRangeFilter.class, merged.getFilter().getClass());
  MultiRowRangeFilter mergedFilter = (MultiRowRangeFilter) merged.getFilter();
  List<RowRange> ranges = mergedFilter.getRowRanges();
  assertEquals(2, ranges.size());

  // byte[] does not override equals(), so array.equals(other) is a reference comparison.
  // Compare the row-key contents instead, so the test checks the boundaries themselves
  // rather than whether the very same array instances happen to be handed back.
  assertTrue(Bytes.equals(ranges.get(0).getStartRow(), startRow1));
  assertTrue(Bytes.equals(ranges.get(0).getStopRow(), stopRow1));
  assertTrue(Bytes.equals(ranges.get(1).getStartRow(), startRow2));
  assertTrue(Bytes.equals(ranges.get(1).getStopRow(), stopRow2));
}
/**
 * Counts the rows of a table that fall into three hard-coded row-key ranges
 * ("001"-"002", "003"-"004", "005"-"006") and prints the count to standard output.
 *
 * @param args command-line arguments; {@code args[0]} is the name of the table to scan
 * @throws Exception if no table name is given or if the scan fails
 */
public static void main(String[] args) throws Exception {
  if (args.length < 1) {
    throw new Exception("Table name not specified.");
  }
  Configuration conf = HBaseConfiguration.create();
  HTable table = new HTable(conf, args[0]);
  try {
    Scan scan = new Scan();
    List<RowKeyRange> ranges = new ArrayList<RowKeyRange>();
    ranges.add(new RowKeyRange(Bytes.toBytes("001"), Bytes.toBytes("002")));
    ranges.add(new RowKeyRange(Bytes.toBytes("003"), Bytes.toBytes("004")));
    ranges.add(new RowKeyRange(Bytes.toBytes("005"), Bytes.toBytes("006")));
    Filter filter = new MultiRowRangeFilter(ranges);
    scan.setFilter(filter);

    int count = 0;
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r = scanner.next(); r != null; r = scanner.next()) {
        count++;
      }
    } finally {
      // Release the scanner even when iterating fails part-way through.
      scanner.close();
    }
    System.out
        .println("++ Scanning finished with count : " + count + " ++");
  } finally {
    // The table handle was previously never closed.
    table.close();
  }
}
/**
 * Parses a range argument of the form {@code start1,end1;start2,end2;...} into row ranges.
 *
 * <p>The argument is split on {@code ;}; empty pieces are skipped. Every remaining piece must
 * split on {@code ,} into exactly two parts, which are decoded with
 * {@link Bytes#toBytesBinary(String)} and used as inclusive start key and exclusive end key.
 *
 * @param arg the raw range specification (the text after the range switch)
 * @return the parsed ranges, in the order they appear in {@code arg}
 * @throws IllegalArgumentException if a piece does not consist of exactly two parts
 */
private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String arg) {
  final List<MultiRowRangeFilter.RowRange> parsed = new ArrayList<>();
  for (String spec : Splitter.on(";").splitToList(arg)) {
    if (spec == null || spec.isEmpty()) {
      continue;
    }
    final List<String> bounds = Splitter.on(",").splitToList(spec);
    final boolean wellFormed = bounds.size() == 2 && !bounds.get(1).contains(",");
    if (!wellFormed) {
      throw new IllegalArgumentException("Wrong range specification: " + spec);
    }
    final byte[] startKey = Bytes.toBytesBinary(bounds.get(0));
    final byte[] endKey = Bytes.toBytesBinary(bounds.get(1));
    parsed.add(new MultiRowRangeFilter.RowRange(startKey, true, endKey, false));
  }
  return parsed;
}
/**
 * Builds a single scan that covers all composite query ranges of the reader parameters.
 *
 * <p>When a multi-row-range filter can be created for the ranges, it is appended to
 * {@code filterList}, and the scan's start/stop rows are narrowed to the start of the first
 * range and the (exclusive) stop of the last range held by the filter.
 *
 * @param filterList the filter list that receives the multi-row-range filter, if one exists
 * @return the scan obtained from the scan provider, bounded as described above when a filter
 *     was created and otherwise returned as provided
 */
protected Scan getMultiScanner(final FilterList filterList) {
  // Single scan w/ multiple ranges
  final Scan multiScanner = scanProvider.get();
  final List<ByteArrayRange> ranges = readerParams.getQueryRanges().getCompositeQueryRanges();
  final MultiRowRangeFilter filter = operations.getMultiRowRangeFilter(ranges);
  if (filter == null) {
    return multiScanner;
  }

  filterList.addFilter(filter);
  final List<RowRange> rowRanges = filter.getRowRanges();
  multiScanner.setStartRow(rowRanges.get(0).getStartRow());

  final RowRange lastRange = rowRanges.get(rowRanges.size() - 1);
  // The scan's stop row is always exclusive, so an inclusive stop row has to be converted
  // into an exclusive one first (per the original note: by appending a trailing 0).
  final byte[] stopRowExclusive =
      lastRange.isStopRowInclusive()
          ? HBaseUtils.getInclusiveEndKey(lastRange.getStopRow())
          : lastRange.getStopRow();
  multiScanner.setStopRow(stopRowExclusive);
  return multiScanner;
}
/**
 * Reads and prints all rows of a table that fall into two fixed row-key ranges.
 *
 * <p>The ranges are {@code phone#4c410523#20190501} to {@code phone#4c410523#20190601} and
 * {@code phone#5c10102#20190501} to {@code phone#5c10102#20190601}, each with an inclusive
 * start and an exclusive end. An {@link IOException} is reported on standard output instead
 * of being propagated.
 *
 * @param projectId the project that owns the instance
 * @param instanceId the instance that holds the table
 * @param tableId the table to read from
 */
public static void readRowRanges(String projectId, String instanceId, String tableId) {
  // Initialize client that will be used to send requests. This client only needs to be created
  // once, and can be reused for multiple requests. After completing all of your requests, call
  // the "close" method on the client to safely clean up any remaining background resources.
  // The table handle is managed here as well so that it is closed before the connection.
  try (Connection connection = BigtableConfiguration.connect(projectId, instanceId);
      Table table = connection.getTable(TableName.valueOf(tableId))) {
    List<RowRange> ranges = new ArrayList<>();
    ranges.add(
        new RowRange(
            Bytes.toBytes("phone#4c410523#20190501"),
            true,
            Bytes.toBytes("phone#4c410523#20190601"),
            false));
    ranges.add(
        new RowRange(
            Bytes.toBytes("phone#5c10102#20190501"),
            true,
            Bytes.toBytes("phone#5c10102#20190601"),
            false));
    Filter filter = new MultiRowRangeFilter(ranges);
    Scan scan = new Scan().setFilter(filter);

    // Close the scanner once all rows are consumed, or if printing a row fails.
    try (ResultScanner rows = table.getScanner(scan)) {
      for (Result row : rows) {
        printRow(row);
      }
    }
  } catch (IOException e) {
    System.out.println(
        "Unable to initialize service client, as a network error occurred: \n" + e.toString());
  }
}
/**
 * Configures the filter, and where applicable the row boundaries, of the given {@link Scan}.
 * <ul>
 *   <li>More than one range: the scan gets a {@link MultiRowRangeFilter} over all ranges.</li>
 *   <li>Exactly one range: the scan gets a {@link FirstKeyOnlyFilter}, and the range's start
 *       and stop rows are applied to the scan.</li>
 *   <li>No range ({@code null} or empty list): the scan gets a {@link FirstKeyOnlyFilter}
 *       only.</li>
 * </ul>
 * @param scan the scan to configure
 * @param rowRangeList the row ranges to restrict the scan to; may be {@code null} or empty
 */
private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
  final int rangeCount = (rowRangeList != null) ? rowRangeList.size() : 0;

  if (rangeCount > 1) {
    scan.setFilter(new MultiRowRangeFilter(rowRangeList));
    return;
  }

  scan.setFilter(new FirstKeyOnlyFilter());
  if (rangeCount == 1) {
    final MultiRowRangeFilter.RowRange onlyRange = rowRangeList.get(0);
    scan.withStartRow(onlyRange.getStartRow()); // inclusive
    scan.withStopRow(onlyRange.getStopRow()); // exclusive
  }
}
/**
 * Creates a {@link MultiRowRangeFilter} for the given byte-array ranges.
 *
 * <p>With no ranges ({@code null} or empty), a single range from the empty start row to the
 * empty stop row is used. Otherwise every range with a non-null start becomes a row range
 * with an inclusive start and an exclusive stop; the stop is the next prefix of the range
 * end, or of the start itself for single-value ranges. Ranges with a null start are skipped.
 *
 * @param ranges the ranges to cover; may be {@code null} or empty
 * @return the filter, or {@code null} if creating it failed with an {@link IOException}
 *     (the error is logged)
 */
public MultiRowRangeFilter getMultiRowRangeFilter(final List<ByteArrayRange> ranges) {
  final List<RowRange> rowRanges = new ArrayList<>();
  final boolean noRanges = (ranges == null) || ranges.isEmpty();
  if (noRanges) {
    rowRanges.add(
        new RowRange(HConstants.EMPTY_BYTE_ARRAY, true, HConstants.EMPTY_BYTE_ARRAY, false));
  } else {
    for (final ByteArrayRange range : ranges) {
      final byte[] startRow = range.getStart();
      if (startRow == null) {
        continue;
      }
      final byte[] stopRow =
          range.isSingleValue()
              ? ByteArrayUtils.getNextPrefix(startRow)
              : range.getEndAsNextPrefix();
      rowRanges.add(new RowRange(startRow, true, stopRow, false));
    }
  }

  // Create the multi-range filter; callers receive null when that fails.
  try {
    return new MultiRowRangeFilter(rowRanges);
  } catch (final IOException e) {
    LOGGER.error("Error creating range filter.", e);
    return null;
  }
}
/**
 * Configures and runs the statistics MapReduce job.
 *
 * <p>Options read from the command line: {@code s} (source table), {@code t} (target),
 * {@code g} (target graph), {@code c} (graph context) and {@code r} (threshold). When a graph
 * context is given, the table scan is limited by a {@link MultiRowRangeFilter} to the
 * CSPO/CPOS/COSP row ranges of that context; without a target (update mode) the range of the
 * statistics context is scanned as well.
 *
 * @param cmd the parsed command line
 * @return {@code 0} if the job completed successfully, {@code -1} otherwise
 * @throws Exception if setting up or running the job fails
 */
@Override
public int run(CommandLine cmd) throws Exception {
  final String source = cmd.getOptionValue('s');
  final String target = cmd.getOptionValue('t');
  final String targetGraph = cmd.getOptionValue('g');
  final String graphContext = cmd.getOptionValue('c');
  final String threshold = cmd.getOptionValue('r');

  TableMapReduceUtil.addDependencyJars(getConf(),
      HalyardExport.class,
      NTriplesUtil.class,
      Rio.class,
      AbstractRDFHandler.class,
      RDFFormat.class,
      RDFParser.class,
      HTable.class,
      HBaseConfiguration.class,
      AuthenticationProtos.class,
      Trace.class,
      Gauge.class);
  HBaseConfiguration.addHbaseResources(getConf());

  final String jobName =
      "HalyardStats " + source + (target == null ? " update" : " -> " + target);
  final Job job = Job.getInstance(getConf(), jobName);
  job.getConfiguration().set(SOURCE, source);
  if (target != null) {
    job.getConfiguration().set(TARGET, target);
  }
  if (targetGraph != null) {
    job.getConfiguration().set(TARGET_GRAPH, targetGraph);
  }
  if (graphContext != null) {
    job.getConfiguration().set(GRAPH_CONTEXT, graphContext);
  }
  if (threshold != null) {
    job.getConfiguration().setLong(THRESHOLD, Long.parseLong(threshold));
  }
  job.setJarByClass(HalyardStats.class);
  TableMapReduceUtil.initCredentials(job);

  final Scan scan = HalyardTableUtils.scan(null, null);
  if (graphContext != null) {
    // Restrict the statistics to the rows of the given graph context only.
    final List<RowRange> contextRanges = new ArrayList<>();
    final byte[] contextHash =
        HalyardTableUtils.hashKey(SimpleValueFactory.getInstance().createIRI(graphContext));
    contextRanges.add(rowRange(HalyardTableUtils.CSPO_PREFIX, contextHash));
    contextRanges.add(rowRange(HalyardTableUtils.CPOS_PREFIX, contextHash));
    contextRanges.add(rowRange(HalyardTableUtils.COSP_PREFIX, contextHash));
    if (target == null) {
      // Update mode: also scan the stats context so the related stats can be deleted
      // during MapReduce.
      contextRanges.add(rowRange(HalyardTableUtils.CSPO_PREFIX, HalyardTableUtils.hashKey(targetGraph == null ? HALYARD.STATS_GRAPH_CONTEXT : SimpleValueFactory.getInstance().createIRI(targetGraph))));
    }
    scan.setFilter(new MultiRowRangeFilter(contextRanges));
  }

  TableMapReduceUtil.initTableMapperJob(
      source,
      scan,
      StatsMapper.class,
      ImmutableBytesWritable.class,
      LongWritable.class,
      job);
  job.setPartitionerClass(StatsPartitioner.class);
  job.setReducerClass(StatsReducer.class);
  job.setOutputFormatClass(NullOutputFormat.class);

  if (!job.waitForCompletion(true)) {
    return -1;
  }
  LOG.info("Stats Generation Completed..");
  return 0;
}
/**
 * Counts the rows of the configured table that fall into the row ranges selected for the
 * current time window, and prints the count.
 *
 * <p>The time window index is derived from the minutes elapsed since {@code START_TIME}
 * divided by {@code KEY_VIZ_WINDOW_MINUTES}. If no ranges apply to that index, nothing is
 * read. Read errors are reported on standard output/error and do not fail the element.
 *
 * @param po the pipeline options; read as {@code ReadDataOptions}
 */
@ProcessElement
public void processElement(PipelineOptions po) {
  // Determine which column will be drawn based on runtime of job.
  long timestampDiff = System.currentTimeMillis() - START_TIME;
  long minutes = (timestampDiff / 1000) / 60;
  int timeOffsetIndex = Math.toIntExact(minutes / KEY_VIZ_WINDOW_MINUTES);

  ReadDataOptions options = po.as(ReadDataOptions.class);
  long count = 0;

  List<RowRange> ranges = getRangesForTimeIndex(timeOffsetIndex, getNumRows(options));
  if (ranges.isEmpty()) {
    return;
  }

  try {
    // Scan with a filter that will only return the first key from each row. This filter is used
    // to more efficiently perform row count operations.
    Filter rangeFilters = new MultiRowRangeFilter(ranges);
    FilterList firstKeyFilterWithRanges = new FilterList(
        rangeFilters,
        new FirstKeyOnlyFilter(),
        new KeyOnlyFilter());
    Scan scan =
        new Scan()
            .addFamily(Bytes.toBytes(COLUMN_FAMILY))
            .setFilter(firstKeyFilterWithRanges);

    // The table handle and the scanner were previously never closed; release both here
    // (the connection itself is left open).
    try (Table table = getConnection().getTable(TableName.valueOf(options.getBigtableTableId()));
        ResultScanner imageData = table.getScanner(scan)) {
      // Iterate over stream of rows to count them.
      for (Result ignored : imageData) {
        count++;
      }
    }
  } catch (Exception e) {
    System.out.println("Error reading.");
    e.printStackTrace();
  }
  System.out.printf("got %d rows\n", count);
}
/**
 * Sets up the actual job.
 *
 * <p>{@code args[0]} is the table name. The remaining arguments may be any of the switches
 * {@code --range=}, {@code --starttime=}, {@code --endtime=} and {@code --expected-count=};
 * every other argument is taken as a column name ({@code family} or
 * {@code family:qualifier}). An end time of {@code 0} (the default) means "no upper bound".
 *
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job, or {@code null} if the range specification is malformed or
 *     the end time is lower than the start time.
 * @throws IOException When setting up the job fails.
 * @deprecated as of release 2.3.0. Will be removed on 4.0.0. Please use main method instead.
 */
@Deprecated
public static Job createSubmittableJob(Configuration conf, String[] args)
    throws IOException {
  String tableName = args[0];
  List<MultiRowRangeFilter.RowRange> rowRangeList = null;
  long startTime = 0;
  long endTime = 0;

  StringBuilder sb = new StringBuilder();

  final String rangeSwitch = "--range=";
  final String startTimeArgKey = "--starttime=";
  final String endTimeArgKey = "--endtime=";
  final String expectedCountArg = "--expected-count=";

  // First argument is table name, starting from second
  for (int i = 1; i < args.length; i++) {
    if (args[i].startsWith(rangeSwitch)) {
      try {
        // The value starts right after the switch in THIS argument. The previous code
        // looked the switch up in args[1], which yields a wrong offset (and a leading '='
        // in the first start key) whenever --range= is not the second argument.
        rowRangeList = parseRowRangeParameter(args[i].substring(rangeSwitch.length()));
      } catch (IllegalArgumentException e) {
        // Report why no job is created instead of failing silently.
        printUsage(e.getMessage());
        return null;
      }
      continue;
    }
    if (args[i].startsWith(startTimeArgKey)) {
      startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
      continue;
    }
    if (args[i].startsWith(endTimeArgKey)) {
      endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
      continue;
    }
    if (args[i].startsWith(expectedCountArg)) {
      conf.setLong(EXPECTED_COUNT_KEY,
          Long.parseLong(args[i].substring(expectedCountArg.length())));
      continue;
    }
    // if no switch, assume column names
    sb.append(args[i]);
    sb.append(" ");
  }
  // An end time of 0 means "not set" (it is mapped to LATEST_TIMESTAMP below), so it must
  // not be compared against the start time; otherwise a lone --starttime is rejected.
  if (endTime != 0 && endTime < startTime) {
    printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
    return null;
  }

  Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
  job.setJarByClass(RowCounter.class);
  Scan scan = new Scan();
  scan.setCacheBlocks(false);
  setScanFilter(scan, rowRangeList);
  if (sb.length() > 0) {
    for (String columnName : sb.toString().trim().split(" ")) {
      String family = StringUtils.substringBefore(columnName, ":");
      String qualifier = StringUtils.substringAfter(columnName, ":");

      // A bare family name selects the whole family, family:qualifier a single column.
      if (StringUtils.isBlank(qualifier)) {
        scan.addFamily(Bytes.toBytes(family));
      } else {
        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
      }
    }
  }
  scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
  job.setOutputFormatClass(NullOutputFormat.class);
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
  job.setNumReduceTasks(0);
  return job;
}