Source code examples for class org.apache.hadoop.hbase.mapreduce.TableSplit

The following examples show how the org.apache.hadoop.hbase.mapreduce.TableSplit API class is used in practice; follow each project's link to GitHub to read the full source.
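
A minimal, self-contained sketch (not taken from any of the projects below) of constructing a TableSplit by hand and reading its fields; the table name, row keys, and region location are made up for illustration:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;

public class TableSplitDemo {
    public static void main(String[] args) {
        // Hypothetical table, key range and region location, for illustration only.
        TableSplit split = new TableSplit(
                TableName.valueOf("demo_table"),
                Bytes.toBytes("row-0000"),               // start row (inclusive)
                Bytes.toBytes("row-9999"),               // end row (exclusive)
                "regionserver-1.example.com");

        System.out.println(split.getTable());                           // demo_table
        System.out.println(Bytes.toStringBinary(split.getStartRow()));  // row-0000
        System.out.println(Bytes.toStringBinary(split.getEndRow()));    // row-9999
        System.out.println(split.getRegionLocation());                  // regionserver-1.example.com
    }
}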

Example 1  Project: presto-connectors   File: HbaseClient.java
/**
 * Exec the HbaseSplit for a query against an Hbase table.
 * <p>
 * Does a whole bunch of fun stuff! Splitting on row ID ranges, applying secondary indexes, column pruning,
 * all sorts of sweet optimizations. What you have here is an important method.
 *
 * @param session Current session
 * @param split HbaseSplit
 * @param columnHandles List of HbaseColumnHandle
 * @return RecordReader<ImmutableBytesWritable, Result> for {@link org.apache.hadoop.mapreduce.RecordReader}
 */
public RecordReader<ImmutableBytesWritable, Result> execSplit(ConnectorSession session, HbaseSplit split, List<HbaseColumnHandle> columnHandles)
        throws IllegalAccessException, NoSuchFieldException, IOException, InterruptedException
{
    TableName tableName = TableName.valueOf(split.getSchema(), split.getTable());
    Scan scan = TabletSplitMetadata.convertStringToScan(split.getSplitMetadata().getScan());
    buildScan(scan, session, columnHandles);

    TableInputFormat tableInputFormat = getNewTableInputFormat(connection, tableName);
    tableInputFormat.setScan(scan);

    RecordReader<ImmutableBytesWritable, Result> resultRecordReader = tableInputFormat.createRecordReader(new TableSplit(
            TableName.valueOf(split.getSplitMetadata().getTableName()),
            scan,
            split.getSplitMetadata().getStartRow(),
            split.getSplitMetadata().getEndRow(),
            split.getSplitMetadata().getRegionLocation(),
            split.getSplitMetadata().getLength()
    ), null);
    resultRecordReader.initialize(null, null);
    return resultRecordReader;
}
 
Example 2  Project: spork   File: HBaseTableInputFormat.java
@Override
public List<InputSplit> getSplits(org.apache.hadoop.mapreduce.JobContext context) throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    ListIterator<InputSplit> splitIter = splits.listIterator();
    while (splitIter.hasNext()) {
        TableSplit split = (TableSplit) splitIter.next();
        byte[] startKey = split.getStartRow();
        byte[] endKey = split.getEndRow();
        // Skip if the region doesn't satisfy configured options.
        if ((skipRegion(CompareOp.LESS, startKey, lt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gt_)) ||
                (skipRegion(CompareOp.GREATER, endKey, gte_)) ||
                (skipRegion(CompareOp.LESS_OR_EQUAL, startKey, lte_)) )  {
            splitIter.remove();
        }
    }
    return splits;
}
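
The skipRegion helper used above is not included in this snippet; lt_, gt_, gte_ and lte_ are the key bounds parsed from the loader options. A plausible sketch of the check, assuming plain lexicographic comparison (the actual spork implementation may differ, for example by delegating to a RowFilter):

// Hedged sketch, not the actual spork code. Returns true when the region
// boundary key already violates the configured bound, so no row in that
// region can match the filter and the split may be dropped.
private boolean skipRegion(CompareOp op, byte[] regionKey, byte[] bound) {
    if (bound == null || regionKey.length == 0) {
        return false;   // no bound configured, or an open-ended region boundary
    }
    int cmp = Bytes.compareTo(regionKey, bound);
    switch (op) {
        case LESS:             return cmp >= 0;   // keep only if key <  bound
        case LESS_OR_EQUAL:    return cmp >  0;   // keep only if key <= bound
        case GREATER:          return cmp <= 0;   // keep only if key >  bound
        case GREATER_OR_EQUAL: return cmp <  0;   // keep only if key >= bound
        default:               return false;
    }
}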
 
Example 3  Project: spliceengine   File: SMRecordReaderImplIT.java
@Test
public void completeMemstoreScan() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".A", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".A"));
  try (Connection connection = ConnectionFactory.createConnection(config)) {
    Table table = connection.getTable(tableName);
    Scan scan = new Scan();
    rr.setHTable(table);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
      i++;
      Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
      Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
      Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
    }
    Assert.assertEquals("incorrect results returned", 1000, i);
  }
}
 
Example 4  Project: spliceengine   File: SMRecordReaderImplIT.java
@Test
public void emptyMemstoreScan() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".D", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".D"));
  try (Connection connection = ConnectionFactory.createConnection(config)) {
    Table table = connection.getTable(tableName);
    Scan scan = new Scan();
    rr.setHTable(table);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
      i++;
      Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
      Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
      Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
    }
    Assert.assertEquals("incorrect results returned", 1000, i);
  }
}
 
Example 5  Project: spliceengine   File: SMRecordReaderImplIT.java
@Test
public void singleRegionScanWithOneStoreFileAndMemstore() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".B", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".B"));
  try (Connection connection = ConnectionFactory.createConnection(config)) {
    Table table = connection.getTable(tableName);
    Scan scan = new Scan();
    rr.setHTable(table);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
      i++;
      Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
      Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
      Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
    }
    Assert.assertEquals("incorrect results returned", 1000, i);
  }
}
 
Example 6  Project: spliceengine   File: SMRecordReaderImplIT.java
@Test
public void twoRegionsWithMemstores() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".C", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".C"));
  try (Connection connection = ConnectionFactory.createConnection(config)) {
    Table table = connection.getTable(tableName);
    Scan scan = new Scan();
    rr.setHTable(table);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
      i++;
      Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
      Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
      Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
    }
    Assert.assertEquals("incorrect results returned", 10000, i);
  }
}
 
Example 7  Project: spliceengine   File: SMRecordReaderImplIT.java
@Test
public void testScanAfterMajorCompaction() throws Exception {
  List<String> names = new ArrayList<String>();
  names.add("COL1");
  names.add("COL2");
  config.set(MRConstants.SPLICE_SCAN_INFO, sqlUtil.getTableScannerBuilder(SMRecordReaderImplIT.class.getSimpleName() + ".E", names).base64Encode());
  SMRecordReaderImpl rr = new SMRecordReaderImpl(config);
  TableName tableName = TableName.valueOf(sqlUtil.getConglomID(SMRecordReaderImplIT.class.getSimpleName() + ".E"));
  try (Connection connection = ConnectionFactory.createConnection(config)) {
    Table table = connection.getTable(tableName);
    Scan scan = new Scan();
    rr.setHTable(table);
    rr.setScan(scan);
    SMSplit tableSplit = new SMSplit(new TableSplit(tableName, scan.getStartRow(), scan.getStopRow(), "sdfsdf"));
    rr.initialize(tableSplit, null);
    int i = 0;
    while (rr.nextKeyValue()) {
      i++;
      Assert.assertNotNull("Column 1 is null", rr.getCurrentValue().getColumn(1));
      Assert.assertNotNull("Column 2 is null", rr.getCurrentValue().getColumn(2));
      Assert.assertNotNull("Current Key is null", rr.getCurrentKey());
    }
    Assert.assertEquals("incorrect results returned", 5000, i);
  }
}
 
Example 8  Project: hgraphdb   File: TableInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (isMock()) {
        if (table == null) {
            initialize(context);
        }
        List<InputSplit> splits = new ArrayList<>(1);
        TableSplit split = new TableSplit(getTable().getName(), getScan(),
                HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, "", 0);
        splits.add(split);
        return splits;
    } else {
        return super.getSplits(context);
    }
}
 
Example 9  Project: spork   File: HBaseStorage.java
@Override
public WritableComparable<TableSplit> getSplitComparable(InputSplit split) throws IOException {
    if (split instanceof TableSplit) {
        return new TableSplitComparable((TableSplit) split);
    } else {
        throw new RuntimeException("LoadFunc expected split of type TableSplit but was " + split.getClass().getName());
    }
}
 
Example 10  Project: spliceengine   File: HBasePartitioner.java
@Override
public void initialize() {
    List<Partition> partitions = Arrays.asList(((SparkDataSet) dataSet).rdd.rdd().partitions());
    tableSplits = new ArrayList<>(partitions.size());
    for (Partition p : partitions) {
        NewHadoopPartition nhp = (NewHadoopPartition) p;
        SMSplit sms = (SMSplit) nhp.serializableHadoopSplit().value();
        TableSplit ts = sms.getSplit();
        if (ts.getStartRow() != null && Bytes.equals(ts.getStartRow(),ts.getEndRow()) && ts.getStartRow().length > 0) {
            // this would be an empty partition, with the same start and end key, so don't add it
            continue;
        }
        tableSplits.add(ts);
    }
}
 
Example 11  Project: spliceengine   File: AbstractSMInputFormat.java
private List<InputSplit> toSMSplits (List<Partition> splits) throws IOException {
    List<InputSplit> sMSplits = Lists.newArrayList();
    HBaseTableInfoFactory infoFactory = HBaseTableInfoFactory.getInstance(HConfiguration.getConfiguration());
    for(Partition split:splits) {
        SMSplit smSplit = new SMSplit(
                new TableSplit(
                        infoFactory.getTableInfo(split.getTableName()),
                        split.getStartKey(),
                        split.getEndKey(),
                        split.owningServer().getHostname()));
        sMSplits.add(smSplit);
    }
    return sMSplits;
}
 
Example 12  Project: presto-connectors   File: HbaseClient.java
/**
 * Fetches the TabletSplitMetadata for a query against an Hbase table.
 * <p>
 * Does a whole bunch of fun stuff! Splitting on row ID ranges, applying secondary indexes, column pruning,
 * all sorts of sweet optimizations. What you have here is an important method.
 *
 * @param session Current session
 * @param schema Schema name
 * @param table Table Name
 * @param rowIdDomain Domain for the row ID
 * @param constraints Column constraints for the query
 * @return List of TabletSplitMetadata objects for Presto
 */
public List<TabletSplitMetadata> getTabletSplits(
        ConnectorSession session,
        String schema,
        String table,
        Optional<Domain> rowIdDomain,
        List<HbaseColumnConstraint> constraints) //HbaseRowSerializer serializer
{
    try {
        TableName tableName = TableName.valueOf(schema, table);
        LOG.debug("Getting tablet splits for table %s", tableName);

        // Get the initial Range based on the row ID domain
        Collection<Range> rowIdRanges = getRangesFromDomain(rowIdDomain);  //serializer

        // Split the ranges on tablet boundaries, if enabled
        // Create TabletSplitMetadata objects for each range
        boolean fetchTabletLocations = HbaseSessionProperties.isOptimizeLocalityEnabled(session);

        LOG.debug("Fetching tablet locations: %s", fetchTabletLocations);

        ImmutableList.Builder<TabletSplitMetadata> builder = ImmutableList.builder();
        if (rowIdRanges.size() == 0) {  // no row key filter
            LOG.warn("This request has no rowkey filter");
        }
        List<Scan> rowIdScans = rowIdRanges.size() == 0 ?
                Arrays.asList(new Scan())
                : rowIdRanges.stream().map(HbaseClient::getScanFromPrestoRange).collect(Collectors.toList());

        for (Scan scan : rowIdScans) {
            TableInputFormat tableInputFormat = getNewTableInputFormat(connection, tableName);
            tableInputFormat.setConf(connection.getConfiguration());
            tableInputFormat.setScan(scan);

            JobContext context = new JobContextImpl(new JobConf(), null);
            List<TableSplit> splits = tableInputFormat.getSplits(context)
                    .stream().map(x -> (TableSplit) x).collect(Collectors.toList());

            for (TableSplit split : splits) {
                TabletSplitMetadata metadata = new TabletSplitMetadata(
                        split.getTable().getName(),
                        split.getStartRow(),
                        split.getEndRow(),
                        TabletSplitMetadata.convertScanToString(split.getScan()),
                        split.getRegionLocation(),
                        split.getLength());
                builder.add(metadata);
            }
        }
        List<TabletSplitMetadata> tabletSplits = builder.build();

        // Log some fun stuff and return the tablet splits
        LOG.debug("Number of splits for table %s is %d with %d ranges", tableName, tabletSplits.size(), rowIdRanges.size());
        return tabletSplits;
    }
    catch (Exception e) {
        throw new PrestoException(UNEXPECTED_HBASE_ERROR, "Failed to get splits from Hbase", e);
    }
}
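
Examples 1 and 12 are two halves of the same path through HbaseClient: getTabletSplits enumerates region-aligned splits at planning time, and execSplit later opens a RecordReader over one of them on a worker. A hedged usage sketch; the client, session, hbaseSplit and columnHandles objects are assumed to already exist:

// Planning side: one TabletSplitMetadata per region (or per row ID range).
List<TabletSplitMetadata> tabletSplits = client.getTabletSplits(
        session, "demo_schema", "demo_table",
        Optional.empty(),            // no row ID constraint
        Collections.emptyList());    // no column constraints

// Worker side: each TabletSplitMetadata travels inside an HbaseSplit, which
// execSplit converts back into a TableSplit and a RecordReader for that region.
RecordReader<ImmutableBytesWritable, Result> reader =
        client.execSplit(session, hbaseSplit, columnHandles);
while (reader.nextKeyValue()) {
    Result row = reader.getCurrentValue();   // one HBase row per iteration
}
reader.close();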
 
Example 13  Project: spork   File: TableSplitComparable.java
public TableSplitComparable() {
    tsplit = new TableSplit();
}
 
Example 14  Project: spork   File: TableSplitComparable.java
public TableSplitComparable(TableSplit tsplit) {
    this.tsplit = tsplit;
}
 
Example 15  Project: spork   File: TableSplitComparable.java
@Override
public int compareTo(TableSplit split) {
    return tsplit.compareTo((TableSplit) split);
}
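
Examples 13 through 15 come from the same small wrapper class; assembled, a minimal version looks roughly like the following, with serialization simply delegated to TableSplit (which itself implements Writable). Treat it as a consolidation sketch, not the verbatim spork source:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.io.WritableComparable;

public class TableSplitComparable implements WritableComparable<TableSplit> {

    private TableSplit tsplit;

    public TableSplitComparable() {
        tsplit = new TableSplit();
    }

    public TableSplitComparable(TableSplit tsplit) {
        this.tsplit = tsplit;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        tsplit.write(out);        // TableSplit handles its own serialization
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        tsplit.readFields(in);
    }

    @Override
    public int compareTo(TableSplit split) {
        return tsplit.compareTo(split);
    }
}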
 
Example 16  Project: spliceengine   File: SMHiveSplit.java
public TableSplit getSplit() {
  return this.split.getSplit();
}
 
Example 17  Project: spliceengine   File: SMRecordReaderImpl.java
public void init(Configuration config, InputSplit split) throws IOException, InterruptedException {	
	if (LOG.isDebugEnabled())
		SpliceLogUtils.debug(LOG, "init");
	if (TaskContext.get() != null) {
		TaskContext.get().addTaskFailureListener(this);
	}
	String tableScannerAsString = config.get(MRConstants.SPLICE_SCAN_INFO);
	if (tableScannerAsString == null)
		throw new IOException("splice scan info was not serialized to task, failing");
	byte[] scanStartKey = null;
	byte[] scanStopKey = null;
	try {
		builder = TableScannerBuilder.getTableScannerBuilderFromBase64String(tableScannerAsString);
		if (LOG.isTraceEnabled())
			SpliceLogUtils.trace(LOG, "config loaded builder=%s", builder);
		TableSplit tSplit = ((SMSplit) split).getSplit();
		token = builder.getToken();
		DataScan scan = builder.getScan();
		scanStartKey = scan.getStartKey();
		scanStopKey = scan.getStopKey();
		if (Bytes.startComparator.compare(scanStartKey, tSplit.getStartRow()) < 0) {
			// the split itself is more restrictive
			scan.startKey(tSplit.getStartRow());
		}
		if (Bytes.endComparator.compare(scanStopKey, tSplit.getEndRow()) > 0) {
			// the split itself is more restrictive
			scan.stopKey(tSplit.getEndRow());
		}
		setScan(((HScan) scan).unwrapDelegate());
		// TODO (wjk): this seems weird (added with DB-4483)
		this.statisticsRun = AbstractSMInputFormat.oneSplitPerRegion(config);
		Double sampling = AbstractSMInputFormat.sampling(config);
		if (sampling != null) {
			this.sampling = true;
			this.samplingRate = sampling;
		}
		restart(scan.getStartKey());
	} catch (IOException ioe) {
		LOG.error(String.format("Received exception with scan %s, original start key %s, original stop key %s, split %s",
				scan, Bytes.toStringBinary(scanStartKey), Bytes.toStringBinary(scanStopKey), split), ioe);
		throw ioe;
	} catch (StandardException e) {
		throw new IOException(e);
	}
}
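
The two comparisons in init() intersect the builder's scan range with the split's region range, always keeping the more restrictive key on each side. A simplified, hedged illustration of that step; the variable names are stand-ins, and this version ignores the empty-byte-array convention for "unbounded" that the real Bytes.startComparator / Bytes.endComparator handle:

// Keep the later of the two start keys and the earlier of the two stop keys.
byte[] effectiveStart = Bytes.compareTo(scanStartKey, splitStartRow) < 0
        ? splitStartRow      // split starts later, so it is more restrictive
        : scanStartKey;
byte[] effectiveStop = Bytes.compareTo(scanStopKey, splitEndRow) > 0
        ? splitEndRow        // split ends earlier, so it is more restrictive
        : scanStopKey;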
 
Example 18  Project: spliceengine   File: SMSplit.java
public SMSplit() throws IOException{
  super(FSUtils.getRootDir(HConfiguration.unwrapDelegate()), 0, 0, null);
  split = new TableSplit();
}
 
Example 19  Project: spliceengine   File: SMSplit.java
public SMSplit(TableSplit split) throws IOException{
  super(FSUtils.getRootDir(HConfiguration.unwrapDelegate()), 0, 0, null);
  this.split = split;
}
 
Example 20  Project: spliceengine   File: SMSplit.java
public TableSplit getSplit() {
  return this.split;
}
 