下面列出了 org.apache.hadoop.hbase.mapreduce.TableSplit API 类的使用示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Opens a record reader over the row range described by an {@link HbaseSplit}.
 * <p>
 * The scan serialized in the split metadata is deserialized and then refined by
 * {@code buildScan} for the given session and columns. A fresh {@link TableInputFormat} for the
 * split's schema/table is configured with that scan. A {@link TableSplit} is rebuilt from the
 * split metadata (table name, start row, end row, region location, length) and used to create
 * the reader. The reader is initialized with a {@code null} split and context before it is
 * returned.
 *
 * @param session current connector session
 * @param split the split to read
 * @param columnHandles columns requested by the query
 * @return an initialized {@link RecordReader} of row keys and {@link Result}s
 */
public RecordReader<ImmutableBytesWritable, Result> execSplit(ConnectorSession session, HbaseSplit split, List<HbaseColumnHandle> columnHandles)
        throws IllegalAccessException, NoSuchFieldException, IOException, InterruptedException
{
    TableName tableName = TableName.valueOf(split.getSchema(), split.getTable());
    TabletSplitMetadata metadata = split.getSplitMetadata();

    Scan scan = TabletSplitMetadata.convertStringToScan(metadata.getScan());
    buildScan(scan, session, columnHandles);

    TableInputFormat inputFormat = getNewTableInputFormat(connection, tableName);
    inputFormat.setScan(scan);

    TableSplit regionSplit = new TableSplit(
            TableName.valueOf(metadata.getTableName()),
            scan,
            metadata.getStartRow(),
            metadata.getEndRow(),
            metadata.getRegionLocation(),
            metadata.getLength());
    RecordReader<ImmutableBytesWritable, Result> reader = inputFormat.createRecordReader(regionSplit, null);
    reader.initialize(null, null);
    return reader;
}
/**
 * Returns the parent's splits, minus any region that falls outside the configured row-key bounds.
 * <p>
 * Every split is cast to {@link TableSplit}. A split is removed when {@code skipRegion} reports
 * that its start row is out of range for the {@code lt_} or {@code lte_} bound, or that its end
 * row is out of range for the {@code gt_} or {@code gte_} bound. The list returned by
 * {@code super.getSplits} is filtered in place and returned.
 */
@Override
public List<InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context)
        throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    for (ListIterator<InputSplit> it = splits.listIterator(); it.hasNext(); ) {
        TableSplit regionSplit = (TableSplit) it.next();
        byte[] regionStart = regionSplit.getStartRow();
        byte[] regionEnd = regionSplit.getEndRow();
        // NOTE(review): gte_ is checked with CompareOp.GREATER, the same op as gt_ -- confirm intended.
        boolean outsideBounds = skipRegion(CompareOp.LESS, regionStart, lt_)
                || skipRegion(CompareOp.GREATER, regionEnd, gt_)
                || skipRegion(CompareOp.GREATER, regionEnd, gte_)
                || skipRegion(CompareOp.LESS_OR_EQUAL, regionStart, lte_);
        if (outsideBounds) {
            it.remove();
        }
    }
    return splits;
}
/**
 * Scans table {@code SMRecordReaderImplIT.A} end to end through {@link SMRecordReaderImpl}.
 * <p>
 * The reader is configured with a table scanner over COL1 and COL2. It is initialized with a
 * single split built from the start and stop rows of a default {@link Scan}, using a dummy
 * region location. Every row must have a non-null key and non-null columns 1 and 2, and exactly
 * 1000 rows must be returned.
 */
@Test
public void completeMemstoreScan() throws Exception {
    List<String> names = new ArrayList<>();
    names.add("COL1");
    names.add("COL2");
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".A", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".A"));
    // The Table handle is Closeable too; manage it alongside the connection so it is not leaked.
    try (Connection connection = ConnectionFactory.createConnection(config);
         Table table = connection.getTable(tableName)) {
        Scan scan = new Scan();
        rr.setHTable(table);
        rr.setScan(scan);
        SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
        rr.initialize(tableSplit, null);
        int i = 0;
        while (rr.nextKeyValue()) {
            i++;
            Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
            Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
            Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
        }
        Assert.assertEquals("incorrect results returned", 1000, i);
    }
}
/**
 * Scans table {@code SMRecordReaderImplIT.D} end to end through {@link SMRecordReaderImpl}.
 * <p>
 * The reader is configured with a table scanner over COL1 and COL2. It is initialized with a
 * single split built from the start and stop rows of a default {@link Scan}, using a dummy
 * region location. Every row must have a non-null key and non-null columns 1 and 2, and exactly
 * 1000 rows must be returned.
 */
@Test
public void emptyMemstoreScan() throws Exception {
    List<String> names = new ArrayList<>();
    names.add("COL1");
    names.add("COL2");
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".D", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".D"));
    // The Table handle is Closeable too; manage it alongside the connection so it is not leaked.
    try (Connection connection = ConnectionFactory.createConnection(config);
         Table table = connection.getTable(tableName)) {
        Scan scan = new Scan();
        rr.setHTable(table);
        rr.setScan(scan);
        SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
        rr.initialize(tableSplit, null);
        int i = 0;
        while (rr.nextKeyValue()) {
            i++;
            Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
            Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
            Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
        }
        Assert.assertEquals("incorrect results returned", 1000, i);
    }
}
/**
 * Scans table {@code SMRecordReaderImplIT.B} end to end through {@link SMRecordReaderImpl}.
 * <p>
 * The reader is configured with a table scanner over COL1 and COL2. It is initialized with a
 * single split built from the start and stop rows of a default {@link Scan}, using a dummy
 * region location. Every row must have a non-null key and non-null columns 1 and 2, and exactly
 * 1000 rows must be returned.
 */
@Test
public void singleRegionScanWithOneStoreFileAndMemstore() throws Exception {
    List<String> names = new ArrayList<>();
    names.add("COL1");
    names.add("COL2");
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".B", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".B"));
    // The Table handle is Closeable too; manage it alongside the connection so it is not leaked.
    try (Connection connection = ConnectionFactory.createConnection(config);
         Table table = connection.getTable(tableName)) {
        Scan scan = new Scan();
        rr.setHTable(table);
        rr.setScan(scan);
        SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
        rr.initialize(tableSplit, null);
        int i = 0;
        while (rr.nextKeyValue()) {
            i++;
            Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
            Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
            Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
        }
        Assert.assertEquals("incorrect results returned", 1000, i);
    }
}
/**
 * Scans table {@code SMRecordReaderImplIT.C} end to end through {@link SMRecordReaderImpl}.
 * <p>
 * The reader is configured with a table scanner over COL1 and COL2. It is initialized with a
 * single split built from the start and stop rows of a default {@link Scan}, using a dummy
 * region location. Every row must have a non-null key and non-null columns 1 and 2, and exactly
 * 10000 rows must be returned.
 */
@Test
public void twoRegionsWithMemstores() throws Exception {
    List<String> names = new ArrayList<>();
    names.add("COL1");
    names.add("COL2");
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".C", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".C"));
    // The Table handle is Closeable too; manage it alongside the connection so it is not leaked.
    try (Connection connection = ConnectionFactory.createConnection(config);
         Table table = connection.getTable(tableName)) {
        Scan scan = new Scan();
        rr.setHTable(table);
        rr.setScan(scan);
        SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
        rr.initialize(tableSplit, null);
        int i = 0;
        while (rr.nextKeyValue()) {
            i++;
            Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
            Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
            Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
        }
        Assert.assertEquals("incorrect results returned", 10000, i);
    }
}
/**
 * Scans table {@code SMRecordReaderImplIT.E} end to end through {@link SMRecordReaderImpl}.
 * <p>
 * The reader is configured with a table scanner over COL1 and COL2. It is initialized with a
 * single split built from the start and stop rows of a default {@link Scan}, using a dummy
 * region location. Every row must have a non-null key and non-null columns 1 and 2, and exactly
 * 5000 rows must be returned.
 */
@Test
public void testScanAfterMajorCompaction() throws Exception {
    List<String> names = new ArrayList<>();
    names.add("COL1");
    names.add("COL2");
    config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".E", names).base64Encode());
    SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
    TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".E"));
    // The Table handle is Closeable too; manage it alongside the connection so it is not leaked.
    try (Connection connection = ConnectionFactory.createConnection(config);
         Table table = connection.getTable(tableName)) {
        Scan scan = new Scan();
        rr.setHTable(table);
        rr.setScan(scan);
        SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
        rr.initialize(tableSplit, null);
        int i = 0;
        while (rr.nextKeyValue()) {
            i++;
            Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
            Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
            Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
        }
        Assert.assertEquals("incorrect results returned", 5000, i);
    }
}
/**
 * Computes the input splits for the job.
 * <p>
 * Outside mock mode this simply defers to the parent implementation. In mock mode it first
 * initializes the format from the job context if no table is set yet. It then returns a
 * one-element list holding a {@link TableSplit} for the current table and scan, with empty start
 * and end rows, an empty region location and a length of 0.
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (!isMock()) {
        return super.getSplits(context);
    }
    if (table == null) {
        initialize(context);
    }
    List<InputSplit> result = new ArrayList<>(1);
    result.add(new TableSplit(getTable().getName(), getScan(),
            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, "", 0));
    return result;
}
/**
 * Wraps a {@link TableSplit} in a {@link TableSplitComparable} so that splits can be ordered.
 *
 * @param split the split to wrap; must be a {@link TableSplit}
 * @return a comparable wrapper around {@code split}
 * @throws RuntimeException if {@code split} is {@code null} or not a {@link TableSplit}
 */
@Override
public WritableComparable<TableSplit> getSplitComparable(InputSplit split) throws IOException {
    if (!(split instanceof TableSplit)) {
        // instanceof is false for null; describe it explicitly instead of NPE-ing on getClass().
        String actualType = (split == null) ? "null" : split.getClass().getName();
        throw new RuntimeException("LoadFunc expected split of type TableSplit but was " + actualType);
    }
    return new TableSplitComparable((TableSplit) split);
}
/**
 * Collects the HBase {@link TableSplit} behind each partition of the Spark data set into
 * {@code tableSplits}.
 * <p>
 * Every partition is cast to {@link NewHadoopPartition}, and its wrapped Hadoop split is cast to
 * {@link SMSplit}. A split whose start row is non-null, non-empty and equal to its end row
 * describes an empty key range, so it is left out.
 */
@Override
public void initialize() {
    Partition[] sparkPartitions = ((SparkDataSet) dataSet).rdd.rdd().partitions();
    tableSplits = new ArrayList<>(sparkPartitions.length);
    for (Partition partition : sparkPartitions) {
        SMSplit wrapper = (SMSplit) ((NewHadoopPartition) partition).serializableHadoopSplit().value();
        TableSplit candidate = wrapper.getSplit();
        byte[] start = candidate.getStartRow();
        boolean emptyRange = start != null
                && Bytes.equals(start, candidate.getEndRow())
                && start.length > 0;
        if (!emptyRange) {
            tableSplits.add(candidate);
        }
    }
}
/**
 * Converts storage partitions into {@link SMSplit}s, one per partition and in the same order.
 * <p>
 * Each split wraps a {@link TableSplit} carrying the partition's table info (resolved through
 * {@link HBaseTableInfoFactory}), its start key, its end key and the hostname of its owning
 * server.
 *
 * @param splits partitions to convert
 * @return a new list of SMSplit instances
 * @throws IOException if an SMSplit cannot be constructed
 */
private List<InputSplit> toSMSplits(List<Partition> splits) throws IOException {
    List<InputSplit> result = Lists.newArrayList();
    HBaseTableInfoFactory tableInfoFactory = HBaseTableInfoFactory.getInstance(HConfiguration.getConfiguration());
    for (Partition partition : splits) {
        TableSplit regionSplit = new TableSplit(
                tableInfoFactory.getTableInfo(partition.getTableName()),
                partition.getStartKey(),
                partition.getEndKey(),
                partition.owningServer().getHostname());
        result.add(new SMSplit(regionSplit));
    }
    return result;
}
/**
 * Computes the split metadata for a query against an HBase table.
 * <p>
 * The row-ID domain is first converted into row ranges. Each range becomes a {@link Scan}; with
 * no ranges, a single default Scan is used and a warning is logged. For each scan, a fresh
 * {@link TableInputFormat} computes the {@link TableSplit}s. Each split is recorded as a
 * {@link TabletSplitMetadata} holding the table name, start/end row, serialized scan, region
 * location and length.
 *
 * @param session current session; only the locality setting is read, and it is logged
 * @param schema schema (namespace) name
 * @param table table name
 * @param rowIdDomain domain for the row ID, if any
 * @param constraints column constraints for the query (not used by this method)
 * @return split metadata for every scan, in scan order
 * @throws PrestoException with {@code UNEXPECTED_HBASE_ERROR} if the splits cannot be computed
 */
public List<TabletSplitMetadata> getTabletSplits(
        ConnectorSession session,
        String schema,
        String table,
        Optional<Domain> rowIdDomain,
        List<HbaseColumnConstraint> constraints)
{
    try {
        TableName tableName = TableName.valueOf(schema, table);
        LOG.debug("Getting tablet splits for table %s", tableName);

        // Get the initial ranges based on the row ID domain
        Collection<Range> rowIdRanges = getRangesFromDomain(rowIdDomain);

        // The locality setting is only logged here; nothing below acts on it.
        boolean fetchTabletLocations = HbaseSessionProperties.isOptimizeLocalityEnabled(session);
        LOG.debug("Fetching tablet locations: %s", fetchTabletLocations);

        List<Scan> rowIdScans;
        if (rowIdRanges.isEmpty()) {
            // No row-key filter: fall back to a single default scan.
            LOG.warn("This request has no rowkey filter");
            rowIdScans = Arrays.asList(new Scan());
        }
        else {
            rowIdScans = rowIdRanges.stream()
                    .map(HbaseClient::getScanFromPrestoRange)
                    .collect(Collectors.toList());
        }

        // The job context wraps a fresh default JobConf with nothing scan-specific in it,
        // so build it once instead of once per scan.
        JobContext context = new JobContextImpl(new JobConf(), null);
        ImmutableList.Builder<TabletSplitMetadata> builder = ImmutableList.builder();
        for (Scan scan : rowIdScans) {
            TableInputFormat tableInputFormat = getNewTableInputFormat(connection, tableName);
            tableInputFormat.setConf(connection.getConfiguration());
            // Applied after setConf so this range's scan is the one in effect for getSplits.
            tableInputFormat.setScan(scan);
            List<TableSplit> splits = tableInputFormat.getSplits(context)
                    .stream().map(x -> (TableSplit) x).collect(Collectors.toList());
            for (TableSplit split : splits) {
                builder.add(new TabletSplitMetadata(
                        split.getTable().getName(),
                        split.getStartRow(),
                        split.getEndRow(),
                        TabletSplitMetadata.convertScanToString(split.getScan()),
                        split.getRegionLocation(),
                        split.getLength()));
            }
        }
        List<TabletSplitMetadata> tabletSplits = builder.build();

        LOG.debug("Number of splits for table %s is %d with %d ranges", tableName, tabletSplits.size(), rowIdRanges.size());
        return tabletSplits;
    }
    catch (Exception e) {
        throw new PrestoException(UNEXPECTED_HBASE_ERROR, "Failed to get splits from Hbase", e);
    }
}
/**
 * Creates a wrapper around a new, empty {@link TableSplit}.
 * <p>
 * NOTE(review): presumably needed so the Writable can be instantiated reflectively before its
 * fields are read -- confirm.
 */
public TableSplitComparable() {
    this(new TableSplit());
}

/**
 * Creates a wrapper around the given split.
 *
 * @param tsplit the split to wrap; stored by reference, not copied
 */
public TableSplitComparable(TableSplit tsplit) {
    this.tsplit = tsplit;
}
/**
 * Compares the wrapped split with {@code split} by delegating to
 * {@link TableSplit#compareTo(TableSplit)}.
 *
 * @param split the split to compare against
 * @return a negative, zero or positive value as the wrapped split sorts before, equal to, or
 *         after {@code split}
 */
@Override
public int compareTo(TableSplit split) {
    return tsplit.compareTo(split);
}
/**
 * Returns the {@link TableSplit} held by the wrapped {@code split}, by delegating to its
 * {@code getSplit()}.
 *
 * @return the underlying HBase table split
 */
public TableSplit getSplit() {
    return split.getSplit();
}
/**
 * Prepares this reader to read the rows covered by {@code split}.
 * <p>
 * The method performs these steps in order:
 * <ol>
 * <li>Registers this object as a Spark task-failure listener when a TaskContext is present.</li>
 * <li>Rebuilds the {@code TableScannerBuilder} from the Base64 string stored under
 * {@code MRConstants.SPLICE_SCAN_INFO}, and takes the token from it.</li>
 * <li>Narrows the builder's scan start/stop keys to the split's rows wherever the split is more
 * restrictive (using {@code Bytes.startComparator} / {@code Bytes.endComparator}).</li>
 * <li>Installs the unwrapped HBase scan via {@code setScan}.</li>
 * <li>Reads the one-split-per-region and sampling settings from {@code config}.</li>
 * <li>Calls {@code restart} at the scan's start key.</li>
 * </ol>
 *
 * @param config job configuration; must contain {@code MRConstants.SPLICE_SCAN_INFO}
 * @param split the split to read; cast to {@code SMSplit}, so any other type fails with a
 *        ClassCastException
 * @throws IOException if the scan info is missing from {@code config}; if an IOException is
 *         raised inside the try block (logged with the scan and original keys, then rethrown);
 *         or wrapping a {@code StandardException} raised inside the try block
 * @throws InterruptedException declared; not thrown directly by the visible code (may come from
 *         {@code restart} -- confirm)
 */
public void init(Configuration config, InputSplit split) throws IOException, InterruptedException {
if (LOG.isDebugEnabled())
SpliceLogUtils.debug(LOG, "init");
// TaskContext.get() is null when not running inside a Spark task; only register the listener then.
if (TaskContext.get() != null) {
TaskContext.get().addTaskFailureListener(this);
}
String tableScannerAsString = config.get(MRConstants.SPLICE_SCAN_INFO);
if (tableScannerAsString == null)
throw new IOException("splice scan info was not serialized to task, failing");
// Declared outside the try so the IOException handler can log the scan's original bounds.
byte[] scanStartKey = null;
byte[] scanStopKey = null;
try {
builder = TableScannerBuilder.getTableScannerBuilderFromBase64String(tableScannerAsString);
if (LOG.isTraceEnabled())
SpliceLogUtils.trace(LOG, "config loaded builder=%s", builder);
TableSplit tSplit = ((SMSplit) split).getSplit();
token = builder.getToken();
DataScan scan = builder.getScan();
scanStartKey = scan.getStartKey();
scanStopKey = scan.getStopKey();
if (Bytes.startComparator.compare(scanStartKey, tSplit.getStartRow()) < 0) {
// the split itself is more restrictive
scan.startKey(tSplit.getStartRow());
}
if (Bytes.endComparator.compare(scanStopKey, tSplit.getEndRow()) > 0) {
// the split itself is more restrictive
scan.stopKey(tSplit.getEndRow());
}
// Hand the underlying HBase Scan (unwrapped from the HScan delegate) to this reader.
setScan(((HScan) scan).unwrapDelegate());
// TODO (wjk): this seems weird (added with DB-4483)
// statisticsRun is set from the one-split-per-region setting.
this.statisticsRun = AbstractSMInputFormat.oneSplitPerRegion(config);
// A non-null sampling value enables sampling at that rate.
Double sampling = AbstractSMInputFormat.sampling(config);
if (sampling != null) {
this.sampling = true;
this.samplingRate = sampling;
}
restart(scan.getStartKey());
} catch (IOException ioe) {
// NOTE(review): the try-local `DataScan scan` is out of scope here, so `scan` below resolves
// to a member declared outside this view (presumably the field assigned by setScan) -- confirm.
LOG.error(String.format("Received exception with scan %s, original start key %s, original stop key %s, split %s",
scan, Bytes.toStringBinary(scanStartKey), Bytes.toStringBinary(scanStopKey), split), ioe);
throw ioe;
} catch (StandardException e) {
throw new IOException(e);
}
}
/**
 * Creates a split wrapping a new, empty {@link TableSplit}.
 * <p>
 * The FileSplit superclass is given the HBase root directory, a zero start and length, and
 * {@code null} hosts.
 *
 * @throws IOException if the root directory cannot be resolved
 */
public SMSplit() throws IOException {
    this(new TableSplit());
}

/**
 * Creates a split wrapping the given {@link TableSplit}.
 * <p>
 * The FileSplit superclass is given the HBase root directory, a zero start and length, and
 * {@code null} hosts.
 *
 * @param split the table split to wrap; stored by reference
 * @throws IOException if the root directory cannot be resolved
 */
public SMSplit(TableSplit split) throws IOException {
    super(FSUtils.getRootDir(HConfiguration.unwrapDelegate()), 0, 0, null);
    this.split = split;
}
/**
 * Returns the wrapped HBase {@link TableSplit}.
 *
 * @return the table split held by this SMSplit (not a copy)
 */
public TableSplit getSplit() {
    return split;
}