类org.apache.hadoop.hbase.mapreduce.TableInputFormat源码实例Demo

下面列出了如何使用 org.apache.hadoop.hbase.mapreduce.TableInputFormat 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。

源代码1 项目: presto-connectors   文件: HbaseClient.java
/**
 * Opens an initialized record reader over the rows described by one HbaseSplit.
 * <p>
 * The scan stored as a string in the split metadata is converted back to a {@code Scan},
 * handed to {@code buildScan} together with the session and the requested columns, and
 * then attached both to a fresh {@code TableInputFormat} and to the {@code TableSplit}
 * that the reader is created for.
 *
 * @param session Current session
 * @param split HbaseSplit naming the schema/table and carrying the split metadata
 * @param columnHandles List of HbaseColumnHandle passed on to {@code buildScan}
 * @return a {@link org.apache.hadoop.mapreduce.RecordReader} of
 *         {@code ImmutableBytesWritable} keys and {@code Result} values on which
 *         {@code initialize(null, null)} has already been called
 * @throws IllegalAccessException propagated from creating the TableInputFormat
 * @throws NoSuchFieldException propagated from creating the TableInputFormat
 * @throws IOException propagated from the scan conversion or the reader set-up
 * @throws InterruptedException propagated from the reader set-up
 */
public RecordReader<ImmutableBytesWritable, Result> execSplit(ConnectorSession session, HbaseSplit split, List<HbaseColumnHandle> columnHandles)
        throws IllegalAccessException, NoSuchFieldException, IOException, InterruptedException
{
    TableName tableName = TableName.valueOf(split.getSchema(), split.getTable());

    // Restore the scan carried by the split, then let buildScan adjust it for this query.
    Scan restoredScan = TabletSplitMetadata.convertStringToScan(split.getSplitMetadata().getScan());
    buildScan(restoredScan, session, columnHandles);

    TableInputFormat inputFormat = getNewTableInputFormat(connection, tableName);
    inputFormat.setScan(restoredScan);

    TableSplit tableSplit = new TableSplit(
            TableName.valueOf(split.getSplitMetadata().getTableName()),
            restoredScan,
            split.getSplitMetadata().getStartRow(),
            split.getSplitMetadata().getEndRow(),
            split.getSplitMetadata().getRegionLocation(),
            split.getSplitMetadata().getLength());

    // No task attempt context exists here, so null is passed for it in both calls.
    RecordReader<ImmutableBytesWritable, Result> reader = inputFormat.createRecordReader(tableSplit, null);
    reader.initialize(null, null);
    return reader;
}
 
源代码2 项目: hbase   文件: MobRefReporter.java
/**
 * Reads the table name and column family from the job configuration and derives the
 * fields used later: {@code table}, {@code mobRegion}, {@code mob}, {@code archive}
 * and {@code seperator}.
 *
 * @param context task context whose configuration is consulted
 * @throws IOException if the configuration lacks the input table or the column family
 */
@Override
public void setup(Context context) throws IOException, InterruptedException {
  final Configuration jobConf = context.getConfiguration();

  final String tableName = jobConf.get(TableInputFormat.INPUT_TABLE);
  if (tableName == null) {
    throw new IOException("Job configuration did not include table.");
  }
  table = TableName.valueOf(tableName);
  mobRegion = MobUtils.getMobRegionInfo(table).getEncodedName();

  final String family = jobConf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
  if (family == null) {
    throw new IOException("Job configuration did not include column family");
  }

  mob = MobUtils.getMobFamilyPath(jobConf, table, family);
  LOG.info("Using active mob area '{}'", mob);

  final String encodedMobRegion = MobUtils.getMobRegionInfo(table).getEncodedName();
  archive = HFileArchiveUtil.getStoreArchivePath(jobConf, table, encodedMobRegion, family);
  LOG.info("Using archive mob area '{}'", archive);

  // Falls back to a tab when no separator is configured.
  seperator = jobConf.get(TextOutputFormat.SEPERATOR, "\t");
}
 
源代码3 项目: cloud-bigtable-examples   文件: CellCounter.java
/**
 * Builds the {@code Scan} for the job from the configuration and the command-line arguments.
 *
 * @param conf configuration that may name a column family under
 *             {@code TableInputFormat.SCAN_COLUMN_FAMILY}
 * @param args arguments passed on to {@code getRowFilter} and {@code getTimeRange}
 * @return a scan over all versions with block caching disabled, restricted by family,
 *         row filter and time range where those are present
 * @throws IOException propagated from building the filter, time range or scan
 */
private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
  Scan scan = new Scan();
  // Request every version and disable block caching for this scan.
  scan.setMaxVersions(Integer.MAX_VALUE);
  scan.setCacheBlocks(false);

  // Restrict to a single column family when one is configured.
  String family = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
  if (family != null) {
    scan.addFamily(Bytes.toBytes(family));
  }

  // Optional row filter (regex or prefix) derived from the arguments.
  Filter filter = getRowFilter(args);
  if (filter != null) {
    LOG.info("Setting Row Filter for counter.");
    scan.setFilter(filter);
  }

  // Optional time range derived from the arguments: [0] is the start, [1] the end.
  long[] range = getTimeRange(args);
  if (range != null) {
    LOG.info("Setting TimeRange for counter.");
    scan.setTimeRange(range[0], range[1]);
  }

  LOG.warn("Got the Scan: " + scan);
  return scan;
}
 
源代码4 项目: cloud-bigtable-examples   文件: CellCounter.java
/**
 * Parses the command line, prints usage when too few arguments remain, and otherwise
 * creates the job and waits for it to finish.
 *
 * @param args raw command-line arguments, including any generic Hadoop options
 * @return -1 when fewer than two arguments remain after generic option parsing,
 *         0 when the job completes successfully, 1 when it does not
 * @throws Exception if argument parsing, job creation or job execution fails
 */
@Override
public int run(String[] args) throws Exception {
  String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
  if (otherArgs.length < 2) {
    // Report the count that was actually validated: generic options such as -D have
    // already been stripped, so args.length would not match the condition above.
    System.err.println("ERROR: Wrong number of parameters: " + otherArgs.length);
    System.err.println("Usage: CellCounter ");
    System.err.println("       <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
      "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
    System.err.println("  Note: -D properties will be applied to the conf used. ");
    System.err.println("  Additionally, the following SCAN properties can be specified");
    System.err.println("  to get fine grained control on what is counted..");
    System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
    System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
        "string : used to separate the rowId/column family name and qualifier name.");
    System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
        "operation to a limited subset of rows from the table based on regex or prefix pattern.");
    return -1;
  }
  Job job = createSubmittableJob(getConf(), otherArgs);
  return (job.waitForCompletion(true) ? 0 : 1);
}
 
源代码5 项目: spliceengine   文件: SpliceTableMapReduceUtil.java
/**
 * Configures a job that reads from a Splice table through the given input format.
 * Call this before submitting the job.
 *
 * @param table  The Splice table name to read from; stored under
 *               {@code MRConstants.SPLICE_INPUT_TABLE_NAME}.
 * @param scan  The scan instance; stored in string form under {@code TableInputFormat.SCAN}.
 * @param mapper  The mapper class to use; left untouched on the job when null.
 * @param outputKeyClass  The map output key class; left untouched on the job when null.
 * @param outputValueClass  The map output value class; left untouched on the job when null.
 * @param job  The current job to adjust.  Make sure the passed job is
 * carrying all necessary HBase configuration.
 * @param addDependencyJars when true, {@code addDependencyJars(job)} is invoked.
 * @param inputFormatClass  The input format class set on the job.
 * @throws IOException When setting up the details fails.
 */
public static void initTableMapperJob(String table,Scan scan,
                                      Class<? extends Mapper> mapper,
                                      Class<? extends WritableComparable> outputKeyClass,
                                      Class<? extends Object> outputValueClass,Job job,
                                      boolean addDependencyJars,Class<? extends InputFormat> inputFormatClass)
        throws IOException{
    job.setInputFormatClass(inputFormatClass);

    // Each of the three classes is optional; null means "keep the job's current setting".
    if(outputValueClass!=null){
        job.setMapOutputValueClass(outputValueClass);
    }
    if(outputKeyClass!=null){
        job.setMapOutputKeyClass(outputKeyClass);
    }
    if(mapper!=null){
        job.setMapperClass(mapper);
    }

    job.getConfiguration().set(MRConstants.SPLICE_INPUT_TABLE_NAME,table);
    String serializedScan=convertScanToString(scan);
    job.getConfiguration().set(TableInputFormat.SCAN,serializedScan);

    if(addDependencyJars){
        addDependencyJars(job);
    }
}
 
源代码6 项目: spliceengine   文件: SMInputFormat.java
/**
 * Creates the record reader for this input format and binds it to the table.
 * <p>
 * When the {@code table} field is still unset, the table named by
 * {@code TableInputFormat.INPUT_TABLE} in {@code config} is looked up through the
 * driver's table factory and cached in the field first.
 *
 * @param split  the input split (not read by this method)
 * @param config configuration that receives this format's {@code conf} as a resource
 * @return the record reader, also stored in the {@code rr} field
 */
public SMRecordReaderImpl getRecordReader(InputSplit split, Configuration config) throws IOException,
        InterruptedException {
    config.addResource(conf);
    if (LOG.isDebugEnabled()) {
        SpliceLogUtils.debug(LOG, "getRecordReader with table=%s, inputTable=%s," +
                "conglomerate=%s",
                table,
                config.get(TableInputFormat.INPUT_TABLE),
                config.get(MRConstants.SPLICE_INPUT_CONGLOMERATE));
    }

    rr = new SMRecordReaderImpl(conf);

    if (table == null) {
        // Resolve the table lazily from the configured input table name.
        TableName inputTableName = TableName.valueOf(config.get(TableInputFormat.INPUT_TABLE));
        PartitionFactory partitions = SIDriver.driver().getTableFactory();
        ClientPartition partition = (ClientPartition) partitions.getTable(inputTableName);
        table = partition.unwrapDelegate();
    }
    rr.setHTable(table);

    if (LOG.isDebugEnabled()) {
        SpliceLogUtils.debug(LOG, "returning record reader");
    }
    return rr;
}
 
源代码7 项目: SpyGlass   文件: HBaseRawTap.java
/**
 * Writes the source settings of this tap into the job configuration: the input path,
 * the ZooKeeper quorum (when set), the input table and the encoded scan (when set),
 * then delegates to the superclass.
 *
 * @param process the flow process, passed through to the superclass
 * @param conf    the job configuration to populate
 */
@Override
public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
	// a hack for MultiInputFormat to see that there is a child format
	FileInputFormat.setInputPaths(conf, getPath());

	if (quorumNames != null) {
		conf.set("hbase.zookeeper.quorum", quorumNames);
	}

	LOG.debug("sourcing from table: {}", tableName);
	conf.set(TableInputFormat.INPUT_TABLE, tableName);

	// The scan is optional; only publish it when one was supplied.
	if (base64Scan != null) {
		conf.set(TableInputFormat.SCAN, base64Scan);
	}

	super.sourceConfInit(process, conf);
}
 
源代码8 项目: super-cloudops   文件: HfileBulkExporter.java
/**
 * Runs the HFile bulk export MapReduce job for one HBase table.
 * <p>
 * Options read from the command line: {@code tabname}, {@code user}, {@code zkaddr},
 * {@code T}, {@code batchSize}, {@code output} and {@code mapperClass}, plus whatever
 * {@link #setScanIfNecessary(Configuration, CommandLine)} consumes.
 * 
 * @param line
 *            parsed command line carrying the export options
 * @throws Exception
 *             if the output directory already exists, or if setting up or
 *             running the job fails
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void doExporting(CommandLine line) throws Exception {
	// Configuration.
	String tabname = line.getOptionValue("tabname");
	String user = line.getOptionValue("user");
	Configuration conf = new Configuration();
	conf.set("hbase.zookeeper.quorum", line.getOptionValue("zkaddr"));
	conf.set("hbase.fs.tmp.dir", line.getOptionValue("T", DEFAULT_HBASE_MR_TMPDIR));
	conf.set(TableInputFormat.INPUT_TABLE, tabname);
	conf.set(TableInputFormat.SCAN_BATCHSIZE, line.getOptionValue("batchSize", DEFAULT_SCAN_BATCH_SIZE));

	// Check directory.
	String outputDir = line.getOptionValue("output", DEFAULT_HFILE_OUTPUT_DIR) + "/" + tabname;
	FileSystem fs = FileSystem.get(new URI(outputDir), new Configuration(), user);
	state(!fs.exists(new Path(outputDir)), format("HDFS temporary directory already has data, path: '%s'", outputDir));

	// Set scan condition.(if necessary)
	setScanIfNecessary(conf, line);

	// Job.
	TableName tab = TableName.valueOf(tabname);
	// The connection is only needed by this method, so close it on every exit path
	// (previously it was never closed).
	try (Connection conn = ConnectionFactory.createConnection(conf)) {
		Job job = Job.getInstance(conf);
		job.setJobName(HfileBulkExporter.class.getSimpleName() + "@" + tab.getNameAsString());
		job.setJarByClass(HfileBulkExporter.class);
		job.setMapperClass((Class<Mapper>) ClassUtils.getClass(line.getOptionValue("mapperClass", DEFAULT_MAPPER_CLASS)));
		job.setInputFormatClass(TableInputFormat.class);
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);

		HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tab), conn.getRegionLocator(tab));
		FileOutputFormat.setOutputPath(job, new Path(outputDir));
		if (job.waitForCompletion(true)) {
			long total = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_TOTAL).getValue();
			long processed = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_PROCESSED).getValue();
			log.info(String.format("Exported to successfully! with processed:(%d)/total:(%d)", processed, total));
		}
	}

}
 
源代码9 项目: presto-connectors   文件: HbaseClient.java
/**
 * Creates a {@code TableInputFormat} and uses {@code HbaseClient.inject} to set its
 * {@code table}, {@code regionLocator} and {@code admin} fields (declared on
 * {@code TableInputFormatBase}) from the given connection.
 *
 * @param connection connection supplying the table, region locator and admin
 * @param tableName  table the input format should read
 * @return the prepared input format
 */
private static TableInputFormat getNewTableInputFormat(Connection connection, TableName tableName)
        throws IOException, NoSuchFieldException, IllegalAccessException
{
    TableInputFormat inputFormat = new TableInputFormat();

    // All three target fields live on the base class, not on TableInputFormat itself.
    Class<TableInputFormatBase> owner = TableInputFormatBase.class;
    HbaseClient.inject(owner, inputFormat, "table", connection.getTable(tableName));
    HbaseClient.inject(owner, inputFormat, "regionLocator", connection.getRegionLocator(tableName));
    HbaseClient.inject(owner, inputFormat, "admin", connection.getAdmin());

    return inputFormat;
}
 
源代码10 项目: spork   文件: HBaseStorage.java
/**
 * Builds the input format from the configured limit, the four row bounds
 * ({@code gt_}, {@code gte_}, {@code lt_}, {@code lte_}) and {@code m_conf}, and
 * attaches the current {@code scan} to it.
 *
 * @return the configured input format
 */
@Override
public InputFormat getInputFormat() {
    TableInputFormat tableInputFormat =
            new HBaseTableIFBuilder()
                    .withLimit(limit_)
                    .withGt(gt_)
                    .withGte(gte_)
                    .withLt(lt_)
                    .withLte(lte_)
                    .withConf(m_conf)
                    .build();

    tableInputFormat.setScan(scan);
    return tableInputFormat;
}
 
源代码11 项目: spork   文件: HBaseStorage.java
/**
 * Prepares the job for reading from the HBase table named by {@code location}.
 * <p>
 * Disables Pig split combination, initializes the local job configuration, adds the
 * HBase delegation token once (tracked through the {@code HBASE_TOKEN_SET} UDF
 * property), records the input table, re-applies a previously serialized projection,
 * adds the column filters and finally stores {@code requiredFieldList} in the UDF
 * properties when it is set.
 *
 * @param location table name, optionally prefixed with {@code hbase://}
 * @param job      job whose configuration is adjusted
 * @throws IOException propagated from the configuration or (de)serialization steps
 */
@Override
public void setLocation(String location, Job job) throws IOException {
    Properties udfProps = getUDFProperties();
    job.getConfiguration().setBoolean("pig.noSplitCombination", true);

    m_conf = initializeLocalJobConfig(job);
    if (udfProps.getProperty(HBASE_TOKEN_SET) == null) {
        addHBaseDelegationToken(m_conf, job);
        udfProps.setProperty(HBASE_TOKEN_SET, "true");
    }

    // Strip the optional scheme so only the bare table name is configured.
    final String scheme = "hbase://";
    String tablename = location.startsWith(scheme)
            ? location.substring(scheme.length())
            : location;
    m_conf.set(TableInputFormat.INPUT_TABLE, tablename);

    String serializedProjection = udfProps.getProperty( projectedFieldsName() );
    if (serializedProjection != null) {
        // update columnInfo_
        RequiredFieldList projection = (RequiredFieldList) ObjectSerializer.deserialize(serializedProjection);
        pushProjection(projection);
    }
    addFiltersWithoutColumnPrefix(columnInfo_);

    if (requiredFieldList != null) {
        String[] signature = new String[] {contextSignature};
        Properties signatureProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), signature);
        signatureProps.setProperty(contextSignature + "_projectedFields", ObjectSerializer.serialize(requiredFieldList));
    }
}
 
源代码12 项目: spliceengine   文件: AbstractSMInputFormat.java
/**
 * Allows subclasses to set the table to read from. The table's name is also written
 * to {@code conf} under {@code TableInputFormat.INPUT_TABLE}.
 * <p>
 * Note that the {@code table} field is assigned before {@code conf} is checked, so
 * it stays set even when the method fails on a null {@code conf}.
 *
 * @param table  The table to get the data from.
 * @throws IllegalArgumentException if {@code table} is null
 * @throws RuntimeException if {@code conf} is null
 */
protected void setHTable(Table table) {
    if (table == null) {
        throw new IllegalArgumentException("Unexpected null value for 'table'.");
    }
    this.table = table;

    if (conf == null) {
        throw new RuntimeException("Unexpected null value for 'conf'");
    }
    String tableName = table.getName().getNameAsString();
    conf.set(TableInputFormat.INPUT_TABLE, tableName);
}
 
源代码13 项目: kite   文件: HBaseViewKeyInputFormat.java
/**
 * Creates the {@code TableInputFormat} this input format delegates to.
 * <p>
 * The table name derived from the dataset is written into {@code conf}. When a view
 * is present, its scan is run through {@code TableMapReduceUtil.initTableMapperJob}
 * on a throwaway job, and the resulting {@code SCAN} entry is copied into {@code conf}.
 *
 * @param conf configuration to update and hand to the delegate
 * @return a delegate configured with {@code conf}
 * @throws IOException propagated from creating or initializing the temporary job
 */
private TableInputFormat getDelegate(Configuration conf) throws IOException {
  String tableName = HBaseMetadataProvider.getTableName(dataset.getName());
  conf.set(TableInputFormat.INPUT_TABLE, tableName);

  if (view != null) {
    // The temporary job exists only so that initTableMapperJob can populate SCAN.
    Job scratchJob = new Job();
    Scan viewScan = ((BaseEntityScanner) view.newEntityScanner()).getScan();
    TableMapReduceUtil.initTableMapperJob(tableName, viewScan, TableMapper.class, null,
        null, scratchJob);
    Configuration scratchConf = Hadoop.JobContext.getConfiguration.invoke(scratchJob);
    conf.set(SCAN, scratchConf.get(SCAN));
  }

  TableInputFormat delegate = new TableInputFormat();
  delegate.setConf(conf);
  return delegate;
}
 
源代码14 项目: super-cloudops   文件: SimpleHfileToRmdbExporter.java
/**
 * Runs the export MapReduce job for one HBase table, replacing any existing output
 * directory.
 * <p>
 * Options read from the command line: {@code tabname}, {@code user}, {@code zkaddr},
 * {@code T}, {@code batchSize}, {@code output} and {@code mapperClass}, plus whatever
 * {@code HfileBulkExporter.setScanIfNecessary} consumes.
 * 
 * @param line
 *            parsed command line carrying the export options
 * @throws Exception
 *             if setting up or running the job fails
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void doRmdbExporting(CommandLine line) throws Exception {
	// Configuration.
	String tabname = line.getOptionValue("tabname");
	String user = line.getOptionValue("user");
	Configuration conf = new Configuration();
	conf.set("hbase.zookeeper.quorum", line.getOptionValue("zkaddr"));
	conf.set("hbase.fs.tmp.dir", line.getOptionValue("T", DEFAULT_HBASE_MR_TMPDIR));
	conf.set(TableInputFormat.INPUT_TABLE, tabname);
	conf.set(TableInputFormat.SCAN_BATCHSIZE, line.getOptionValue("batchSize", DEFAULT_SCAN_BATCH_SIZE));

	// Check directory: an existing output directory is deleted recursively.
	String outputDir = line.getOptionValue("output", DEFAULT_HFILE_OUTPUT_DIR) + "/" + tabname;
	FileSystem fs = FileSystem.get(new URI(outputDir), new Configuration(), user);
	if (fs.exists(new Path(outputDir))) {
		fs.delete(new Path(outputDir), true);
	}

	// Set scan condition.(if necessary)
	HfileBulkExporter.setScanIfNecessary(conf, line);

	// Job.
	TableName tab = TableName.valueOf(tabname);
	// The connection is only needed by this method, so close it on every exit path
	// (previously it was never closed).
	try (Connection conn = ConnectionFactory.createConnection(conf)) {
		Job job = Job.getInstance(conf);
		job.setJobName(HfileBulkExporter.class.getSimpleName() + "@" + tab.getNameAsString());
		job.setJarByClass(HfileBulkExporter.class);
		job.setMapperClass((Class<Mapper>) ClassUtils.getClass(line.getOptionValue("mapperClass", DEFAULT_MAPPER_CLASS)));
		job.setInputFormatClass(TableInputFormat.class);
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);

		HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tab), conn.getRegionLocator(tab));
		FileOutputFormat.setOutputPath(job, new Path(outputDir));
		if (job.waitForCompletion(true)) {
			long total = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_TOTAL).getValue();
			long processed = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_PROCESSED).getValue();
			log.info(String.format("Exported to successfully! with processed:(%d)/total:(%d)", processed, total));
		}
	}

}
 
源代码15 项目: super-cloudops   文件: HfileBulkExporter.java
/**
 * Applies the optional {@code startRow}, {@code endRow}, {@code startTime} and
 * {@code endTime} command-line options to the configuration.
 * <p>
 * Each accepted option is written to its own {@code TableInputFormat} key and to a
 * {@code Scan}. If at least one option was applied, the scan is additionally
 * stored Base64-encoded under {@code TableInputFormat.SCAN}. The time range is only
 * applied when both {@code startTime} and {@code endTime} are non-blank.
 * 
 * @param conf
 *            configuration to update
 * @param line
 *            parsed command line
 * @throws IOException
 *             propagated from converting the scan
 * @throws IllegalArgumentException
 *             if the time range values cannot be parsed or applied
 */
public static void setScanIfNecessary(Configuration conf, CommandLine line) throws IOException {
	final String startRow = line.getOptionValue("startRow");
	final String endRow = line.getOptionValue("endRow");
	final String startTime = line.getOptionValue("startTime");
	final String endTime = line.getOptionValue("endTime");

	final Scan scan = new Scan();
	boolean customized = false;

	// Row range
	if (isNotBlank(startRow)) {
		conf.set(TableInputFormat.SCAN_ROW_START, startRow);
		scan.setStartRow(Bytes.toBytes(startRow));
		customized = true;
	}
	if (isNotBlank(endRow)) {
		// An end row is only accepted together with a start row.
		Assert2.hasText(startRow, "Argument for startRow and endRow are used simultaneously");
		conf.set(TableInputFormat.SCAN_ROW_STOP, endRow);
		scan.setStopRow(Bytes.toBytes(endRow));
		customized = true;
	}

	// Timestamp range
	if (isNotBlank(startTime) && isNotBlank(endTime)) {
		conf.set(TableInputFormat.SCAN_TIMERANGE_START, startTime);
		conf.set(TableInputFormat.SCAN_TIMERANGE_END, endTime);
		try {
			long from = new Timestamp(Long.parseLong(startTime)).getTime();
			long until = new Timestamp(Long.parseLong(endTime)).getTime();
			scan.setTimeRange(from, until);
			customized = true;
		} catch (Exception e) {
			throw new IllegalArgumentException(String.format("Illegal startTime(%s) and endTime(%s)", startTime, endTime), e);
		}
	}

	if (!customized) {
		return;
	}

	ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
	log.info("All other SCAN configuration are ignored if\n"
			+ "		 * this is specified.See TableMapReduceUtil.convertScanToString(Scan)\n"
			+ "		 * for more details.");
	conf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));
}
 
源代码16 项目: presto-connectors   文件: HbaseClient.java
/**
 * Fetches the TabletSplitMetadata for a query against an Hbase table.
 * <p>
 * The row ID domain is turned into ranges, each range into a {@code Scan} (a single
 * unrestricted scan when there are no ranges), and for every scan the splits reported
 * by a {@code TableInputFormat} are converted into TabletSplitMetadata objects.
 *
 * @param session Current session
 * @param schema Schema name
 * @param table Table Name
 * @param rowIdDomain Domain for the row ID
 * @param constraints Column constraints for the query (not read by this method)
 * @return List of TabletSplitMetadata objects for Presto
 * @throws PrestoException with {@code UNEXPECTED_HBASE_ERROR} wrapping any exception
 *         raised while computing the splits
 */
public List<TabletSplitMetadata> getTabletSplits(
        ConnectorSession session,
        String schema,
        String table,
        Optional<Domain> rowIdDomain,
        List<HbaseColumnConstraint> constraints) //HbaseRowSerializer serializer
{
    try {
        TableName tableName = TableName.valueOf(schema, table);
        LOG.debug("Getting tablet splits for table %s", tableName);

        // Get the initial Range based on the row ID domain
        Collection<Range> rowIdRanges = getRangesFromDomain(rowIdDomain);  //serializer

        // The locality flag is only logged here.
        boolean fetchTabletLocations = HbaseSessionProperties.isOptimizeLocalityEnabled(session);
        LOG.debug("Fetching tablet locations: %s", fetchTabletLocations);

        ImmutableList.Builder<TabletSplitMetadata> collected = ImmutableList.builder();

        List<Scan> rowIdScans;
        if (rowIdRanges.size() == 0) {
            // No row-key filter: fall back to one unrestricted scan.
            LOG.warn("This request has no rowkey filter");
            rowIdScans = Arrays.asList(new Scan());
        }
        else {
            rowIdScans = rowIdRanges.stream()
                    .map(HbaseClient::getScanFromPrestoRange)
                    .collect(Collectors.toList());
        }

        for (Scan rowIdScan : rowIdScans) {
            TableInputFormat inputFormat = getNewTableInputFormat(connection, tableName);
            inputFormat.setConf(connection.getConfiguration());
            inputFormat.setScan(rowIdScan);

            JobContext jobContext = new JobContextImpl(new JobConf(), null);
            List<TableSplit> tableSplits = inputFormat.getSplits(jobContext)
                    .stream()
                    .map(inputSplit -> (TableSplit) inputSplit)
                    .collect(Collectors.toList());

            // Create one TabletSplitMetadata object per table split
            for (TableSplit tableSplit : tableSplits) {
                collected.add(new TabletSplitMetadata(
                        tableSplit.getTable().getName(),
                        tableSplit.getStartRow(),
                        tableSplit.getEndRow(),
                        TabletSplitMetadata.convertScanToString(tableSplit.getScan()),
                        tableSplit.getRegionLocation(),
                        tableSplit.getLength()));
            }
        }
        List<TabletSplitMetadata> tabletSplits = collected.build();

        // Log the totals and return the tablet splits
        LOG.debug("Number of splits for table %s is %d with %d ranges", tableName, tabletSplits.size(), rowIdRanges.size());
        return tabletSplits;
    }
    catch (Exception e) {
        throw new PrestoException(UNEXPECTED_HBASE_ERROR, "Failed to get splits from Hbase", e);
    }
}
 
源代码17 项目: yuzhouwan   文件: TwoLevelIndexBuilder.java
/**
 * Command-line entry point: runs a map-only job that reads the given table and writes
 * through {@code MultiTableOutputFormat}.
 * <p>
 * Expected arguments after generic Hadoop options: table name, column family and one
 * or more qualifiers. The process exits with -1 when fewer than three arguments are
 * given, 0 when the job succeeds and 1 when it fails.
 *
 * @param args command-line arguments
 * @throws Exception if the job cannot be set up or run
 */
public static void main(String[] args) throws Exception {

        // NOTE(review): cluster addresses are hard-coded here.
        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";

        TwoLevelIndexBuilder conn = new TwoLevelIndexBuilder(rootDir, zkServer, port);

        Configuration conf = conn.conf;
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        //TwoLevelIndexBuilder: TableName, ColumnFamily, Qualifier
        if (otherArgs.length < 3) {
            // Explain why the tool is stopping instead of exiting silently.
            System.err.println("Usage: TwoLevelIndexBuilder <tableName> <columnFamily> <qualifier> [<qualifier> ...]");
            System.exit(-1);
        }
        // Table name
        String tableName = otherArgs[0];
        // Column family
        String columnFamily = otherArgs[1];

        conf.set("tableName", tableName);
        conf.set("columnFamily", columnFamily);

        // Qualifiers (there may be more than one)
        String[] qualifiers = new String[otherArgs.length - 2];
        System.arraycopy(otherArgs, 2, qualifiers, 0, qualifiers.length);

        // Publish the qualifiers
        conf.setStrings("qualifiers", qualifiers);

        Job job = new Job(conf, tableName);

        job.setJarByClass(TwoLevelIndexBuilder.class);
        job.setMapperClass(TowLevelIndexMapper.class);
        job.setNumReduceTasks(0);       // no reduce phase is needed
        job.setInputFormatClass(TableInputFormat.class);
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(),
                TowLevelIndexMapper.class, ImmutableBytesWritable.class, Put.class, job);

        // Surface the job outcome through the exit code; previously the result was
        // ignored and a failed job still exited with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
 
源代码18 项目: hbase   文件: MobRefReporter.java
/**
 * Main method for the tool. Expects exactly three arguments: output path, table name
 * and column family name.
 * @return 0 if success, 1 for bad args. 2 if job aborted with an exception,
 *   3 if mr job was unsuccessful
 */
public int run(String[] args) throws IOException, InterruptedException {
  // TODO make family and table optional
  if (args.length != 3) {
    printUsage();
    return 1;
  }
  final String output = args[0];
  final String tableName = args[1];
  final String familyName = args[2];
  final long reportStartTime = EnvironmentEdgeManager.currentTime();
  Configuration conf = getConf();
  try {
    FileSystem fs = FileSystem.get(conf);
    // check whether the current user is the same one with the owner of hbase root
    String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
    FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
    if (hbaseRootFileStat.length == 0) {
      LOG.error("The passed configs point to an HBase dir does not exist: {}",
          conf.get(HConstants.HBASE_DIR));
      throw new IOException("The target HBase does not exist");
    }
    // Only a warning: the job is still attempted when the owner differs.
    String owner = hbaseRootFileStat[0].getOwner();
    if (!owner.equals(currentUserName)) {
      String errorMsg = "The current user[" + currentUserName
          + "] does not have hbase root credentials."
          + " If this job fails due to an inability to read HBase's internal directories, "
          + "you will need to rerun as a user with sufficient permissions. The HBase superuser "
          + "is a safe choice.";
      LOG.warn(errorMsg);
    }

    // Look up the column family and insist that it is MOB-enabled.
    final byte[] family;
    final int maxVersions;
    TableName tn = TableName.valueOf(tableName);
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      TableDescriptor htd = admin.getDescriptor(tn);
      ColumnFamilyDescriptor hcd = htd.getColumnFamily(Bytes.toBytes(familyName));
      if (hcd == null || !hcd.isMobEnabled()) {
        throw new IOException("Column family " + familyName + " is not a MOB column family");
      }
      family = hcd.getName();
      maxVersions = hcd.getMaxVersions();
    }

    String id = getClass().getSimpleName() + UUID.randomUUID().toString().replace("-", "");

    Scan scan = new Scan();
    scan.addFamily(family);
    // Do not retrieve the mob data when scanning
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
    scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
    // If a scanner caching value isn't set, pick a smaller default since we know we're doing
    // a full table scan and don't want to impact other clients badly.
    scan.setCaching(conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 10000));
    scan.setCacheBlocks(false);
    scan.readVersions(maxVersions);
    conf.set(REPORT_JOB_ID, id);

    Job job = Job.getInstance(conf);
    job.setJarByClass(getClass());
    TableMapReduceUtil.initTableMapperJob(tn, scan,
        MobRefMapper.class, Text.class, ImmutableBytesWritable.class, job);

    job.setReducerClass(MobRefReducer.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(output));

    job.setJobName(getClass().getSimpleName() + "-" + tn + "-" + familyName);
    // for use in the reducer. easier than re-parsing it out of the scan string.
    job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);

    // Use when we start this job as the base point for file "recency".
    job.getConfiguration().setLong(REPORT_START_DATETIME, reportStartTime);

    if (!job.waitForCompletion(true)) {
      System.err.println("Job was not successful");
      return 3;
    }
    LOG.info("Finished creating report for '{}', family='{}'", tn, familyName);
    return 0;

  } catch (ClassNotFoundException | RuntimeException | IOException | InterruptedException e) {
    System.err.println("Job aborted due to exception " + e);
    return 2; // job failed
  }
}
 
源代码19 项目: kite   文件: HBaseViewKeyInputFormat.java
/**
 * Creates a wrapper that hands {@code delegate} to the superclass constructor and
 * keeps {@code entityMapper} in a field.
 *
 * @param delegate     the input format passed to the superclass
 * @param entityMapper the mapper stored on this wrapper
 */
public HBaseRecordReaderWrapper(TableInputFormat delegate, EntityMapper<E> entityMapper) {
  super(delegate);

  this.entityMapper = entityMapper;
}
 
源代码20 项目: spliceengine   文件: SpliceTableMapReduceUtil.java
/**
 * Convenience overload taking the table name as bytes. The name is decoded with
 * {@code Bytes.toString} and the call is forwarded to the String-based overload with
 * {@code TableInputFormat} as the input format class.
 *
 * @param table Binary representation of the Splice table name to read from.
 * @param scan  The scan instance with the columns, time range etc.
 * @param mapper  The mapper class to use.
 * @param outputKeyClass  The class of the output key.
 * @param outputValueClass  The class of the output value.
 * @param job  The current job to adjust.  Make sure the passed job is
 * carrying all necessary HBase configuration.
 * @param addDependencyJars passed through unchanged to the forwarded call.
 * @throws IOException When setting up the details fails.
 */
public static void initTableMapperJob(byte[] table,Scan scan,
                                      Class<? extends Mapper> mapper,
                                      Class<? extends WritableComparable> outputKeyClass,
                                      Class<? extends Object> outputValueClass,Job job,
                                      boolean addDependencyJars)
        throws IOException{
    String tableName=Bytes.toString(table);
    initTableMapperJob(
            tableName,
            scan,
            mapper,
            outputKeyClass,
            outputValueClass,
            job,
            addDependencyJars,
            TableInputFormat.class);
}
 
源代码21 项目: spliceengine   文件: SpliceTableMapReduceUtil.java
/**
 * Convenience overload that forwards to the variant taking an input format class,
 * supplying {@code TableInputFormat} for it.
 *
 * @param table The Splice table name to read from.
 * @param scan  The scan instance with the columns, time range etc.
 * @param mapper  The mapper class to use.
 * @param outputKeyClass  The class of the output key.
 * @param outputValueClass  The class of the output value.
 * @param job  The current job to adjust.  Make sure the passed job is
 * carrying all necessary HBase configuration.
 * @param addDependencyJars passed through unchanged to the forwarded call.
 * @throws IOException When setting up the details fails.
 */
public static void initTableMapperJob(String table,Scan scan,
                                      Class<? extends Mapper> mapper,
                                      Class<? extends WritableComparable> outputKeyClass,
                                      Class<? extends Object> outputValueClass,Job job,
                                      boolean addDependencyJars)
        throws IOException{
    initTableMapperJob(
            table,
            scan,
            mapper,
            outputKeyClass,
            outputValueClass,
            job,
            addDependencyJars,
            TableInputFormat.class);
}
 
 类所在包
 类方法
 同包方法