Listed below are examples of how to use the org.apache.hadoop.hbase.mapreduce.TableInputFormat API class; you can also follow the links to view the source code on GitHub.
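Before the project-specific snippets, here is a minimal, self-contained driver showing the most common pattern: TableMapReduceUtil serializes a Scan into the job configuration and sets TableInputFormat as the input format. The table name "my_table", the family "cf", and the RowKeyDumpDriver/RowKeyMapper class names are placeholders for illustration, not taken from any of the projects below.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class RowKeyDumpDriver {

    // Map-only job: emit every row key of the scanned table as text.
    static class RowKeyMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(Bytes.toStringBinary(key.copyBytes())), new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf"));   // placeholder column family
        scan.setCaching(500);
        scan.setCacheBlocks(false);            // recommended for full-table MR scans

        Job job = Job.getInstance(conf, "rowkey-dump");
        job.setJarByClass(RowKeyDumpDriver.class);
        // Wires TableInputFormat, the serialized Scan and the mapper into the job.
        TableMapReduceUtil.initTableMapperJob("my_table", scan,
                RowKeyMapper.class, Text.class, Text.class, job);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}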
/**
* Executes the HbaseSplit for a query against an Hbase table.
* <p>
* Does a whole bunch of fun stuff! Splitting on row ID ranges, applying secondary indexes, column pruning,
* all sorts of sweet optimizations. What you have here is an important method.
*
* @param session Current session
* @param split HbaseSplit
* @param columnHandles List of HbaseColumnHandle
* @return RecordReader<ImmutableBytesWritable, Result> for {@link org.apache.hadoop.mapreduce.RecordReader}
*/
public RecordReader<ImmutableBytesWritable, Result> execSplit(ConnectorSession session, HbaseSplit split, List<HbaseColumnHandle> columnHandles)
throws IllegalAccessException, NoSuchFieldException, IOException, InterruptedException
{
TableName tableName = TableName.valueOf(split.getSchema(), split.getTable());
Scan scan = TabletSplitMetadata.convertStringToScan(split.getSplitMetadata().getScan());
buildScan(scan, session, columnHandles);
TableInputFormat tableInputFormat = getNewTableInputFormat(connection, tableName);
tableInputFormat.setScan(scan);
RecordReader<ImmutableBytesWritable, Result> resultRecordReader = tableInputFormat.createRecordReader(new TableSplit(
TableName.valueOf(split.getSplitMetadata().getTableName()),
scan,
split.getSplitMetadata().getStartRow(),
split.getSplitMetadata().getEndRow(),
split.getSplitMetadata().getRegionLocation(),
split.getSplitMetadata().getLength()
), null);
resultRecordReader.initialize(null, null);
return resultRecordReader;
}
@Override
public void setup(Context context) throws IOException, InterruptedException {
final Configuration conf = context.getConfiguration();
final String tableName = conf.get(TableInputFormat.INPUT_TABLE);
if (null == tableName) {
throw new IOException("Job configuration did not include table.");
}
table = TableName.valueOf(tableName);
mobRegion = MobUtils.getMobRegionInfo(table).getEncodedName();
final String family = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
if (null == family) {
throw new IOException("Job configuration did not include column family");
}
mob = MobUtils.getMobFamilyPath(conf, table, family);
LOG.info("Using active mob area '{}'", mob);
archive = HFileArchiveUtil.getStoreArchivePath(conf, table,
MobUtils.getMobRegionInfo(table).getEncodedName(), family);
LOG.info("Using archive mob area '{}'", archive);
seperator = conf.get(TextOutputFormat.SEPERATOR, "\t");
}
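The setup() above only reads configuration; the keys it requires (TableInputFormat.INPUT_TABLE and TableInputFormat.SCAN_COLUMN_FAMILY) must be placed into the job configuration by whoever submits the job. A minimal sketch of that driver-side step, with the table and family passed in as arguments and the class name assumed:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapreduce.Job;

public class MobReportDriverSketch {
    public static Job newJob(String table, String family) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // The two keys the setup() method above checks for:
        conf.set(TableInputFormat.INPUT_TABLE, table);
        conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, family);
        return Job.getInstance(conf, "mob-ref-report-" + table);
    }
}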
private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
Scan s = new Scan();
// Set Scan Versions
s.setMaxVersions(Integer.MAX_VALUE);
s.setCacheBlocks(false);
// Set Scan Column Family
if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
}
// Set RowFilter or Prefix Filter if applicable.
Filter rowFilter = getRowFilter(args);
if (rowFilter != null) {
LOG.info("Setting Row Filter for counter.");
s.setFilter(rowFilter);
}
// Set TimeRange if defined
long timeRange[] = getTimeRange(args);
if (timeRange != null) {
LOG.info("Setting TimeRange for counter.");
s.setTimeRange(timeRange[0], timeRange[1]);
}
LOG.warn("Got the Scan: " + s);
return s;
}
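getConfiguredScanForJob only builds the Scan object; for TableInputFormat to pick it up, the scan still has to be serialized into the TableInputFormat.SCAN property (TableMapReduceUtil.initTableMapperJob does this internally). A minimal sketch of the manual equivalent, with table and family names as placeholders:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanToConfSketch {
    public static Configuration configure(String table, Scan scan) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, table);
        // Base64-encoded protobuf form of the Scan, which is what TableInputFormat expects.
        conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
        return conf;
    }

    public static void main(String[] args) throws Exception {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf"));   // placeholder family
        configure("my_table", scan);           // placeholder table
    }
}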
@Override
public int run(String[] args) throws Exception {
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("ERROR: Wrong number of parameters: " + args.length);
System.err.println("Usage: CellCounter ");
System.err.println(" <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
"[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" Additionally, the following SCAN properties can be specified");
System.err.println(" to get fine grained control on what is counted..");
System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
"string : used to separate the rowId/column family name and qualifier name.");
System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
"operation to a limited subset of rows from the table based on regex or prefix pattern.");
return -1;
}
Job job = createSubmittableJob(getConf(), otherArgs);
return (job.waitForCompletion(true) ? 0 : 1);
}
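The run(String[]) method above comes from Hadoop's Tool interface, so the class is normally launched through ToolRunner, which also applies the -D overrides mentioned in the usage text. A small launcher sketch (the stock CellCounter ships its own main; this only illustrates the pattern):
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.CellCounter;
import org.apache.hadoop.util.ToolRunner;

public class CellCounterLauncher {
    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options (-D key=value, -conf, ...) before calling run().
        int exitCode = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(), args);
        System.exit(exitCode);
    }
}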
/**
* Use this before submitting a TableMap job. It will appropriately set up
* the job.
*
* @param table The Splice table name to read from.
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(String table, Scan scan,
Class<? extends Mapper> mapper,
Class<? extends WritableComparable> outputKeyClass,
Class<? extends Object> outputValueClass, Job job,
boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
throws IOException {
job.setInputFormatClass(inputFormatClass);
if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
if (mapper != null) job.setMapperClass(mapper);
job.getConfiguration().set(MRConstants.SPLICE_INPUT_TABLE_NAME, table);
job.getConfiguration().set(TableInputFormat.SCAN, convertScanToString(scan));
if (addDependencyJars) {
addDependencyJars(job);
}
}
public SMRecordReaderImpl getRecordReader(InputSplit split, Configuration config) throws IOException,
InterruptedException {
config.addResource(conf);
if (LOG.isDebugEnabled())
SpliceLogUtils.debug(LOG, "getRecordReader with table=%s, inputTable=%s," +
"conglomerate=%s",
table,
config.get(TableInputFormat.INPUT_TABLE),
config.get(MRConstants.SPLICE_INPUT_CONGLOMERATE));
rr = new SMRecordReaderImpl(conf);
if(table == null){
TableName tableInfo = TableName.valueOf(config.get(TableInputFormat.INPUT_TABLE));
PartitionFactory tableFactory=SIDriver.driver().getTableFactory();
table = ((ClientPartition)tableFactory.getTable(tableInfo)).unwrapDelegate();
}
rr.setHTable(table);
if (LOG.isDebugEnabled())
SpliceLogUtils.debug(LOG, "returning record reader");
return rr;
}
@Override
public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
// a hack for MultiInputFormat to see that there is a child format
FileInputFormat.setInputPaths(conf, getPath());
if (quorumNames != null) {
conf.set("hbase.zookeeper.quorum", quorumNames);
}
LOG.debug("sourcing from table: {}", tableName);
conf.set(TableInputFormat.INPUT_TABLE, tableName);
if (null != base64Scan)
conf.set(TableInputFormat.SCAN, base64Scan);
super.sourceConfInit(process, conf);
}
/**
* Do HFile bulk exporting.
*
* @param line the parsed command line options
* @throws Exception
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void doExporting(CommandLine line) throws Exception {
// Configuration.
String tabname = line.getOptionValue("tabname");
String user = line.getOptionValue("user");
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", line.getOptionValue("zkaddr"));
conf.set("hbase.fs.tmp.dir", line.getOptionValue("T", DEFAULT_HBASE_MR_TMPDIR));
conf.set(TableInputFormat.INPUT_TABLE, tabname);
conf.set(TableInputFormat.SCAN_BATCHSIZE, line.getOptionValue("batchSize", DEFAULT_SCAN_BATCH_SIZE));
// Check directory.
String outputDir = line.getOptionValue("output", DEFAULT_HFILE_OUTPUT_DIR) + "/" + tabname;
FileSystem fs = FileSystem.get(new URI(outputDir), new Configuration(), user);
state(!fs.exists(new Path(outputDir)), format("HDFS temporary directory already has data, path: '%s'", outputDir));
// Set scan condition.(if necessary)
setScanIfNecessary(conf, line);
// Job.
Connection conn = ConnectionFactory.createConnection(conf);
TableName tab = TableName.valueOf(tabname);
Job job = Job.getInstance(conf);
job.setJobName(HfileBulkExporter.class.getSimpleName() + "@" + tab.getNameAsString());
job.setJarByClass(HfileBulkExporter.class);
job.setMapperClass((Class<Mapper>) ClassUtils.getClass(line.getOptionValue("mapperClass", DEFAULT_MAPPER_CLASS)));
job.setInputFormatClass(TableInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tab), conn.getRegionLocator(tab));
FileOutputFormat.setOutputPath(job, new Path(outputDir));
if (job.waitForCompletion(true)) {
long total = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_TOTAL).getValue();
long processed = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_PROCESSED).getValue();
log.info(String.format("Exported to successfully! with processed:(%d)/total:(%d)", processed, total));
}
}
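doExporting(CommandLine) expects the options it reads via getOptionValue to be defined and parsed beforehand. A hypothetical commons-cli front end is sketched below; the option names mirror the getOptionValue calls above and in setScanIfNecessary, but the single-dash form and the descriptions are assumptions, not the project's actual CLI.
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;

public class HfileBulkExporterCli {
    public static void main(String[] args) throws Exception {
        // Options passed as "-name value"; names mirror the getOptionValue(...) calls above.
        Options options = new Options();
        options.addOption("zkaddr", true, "ZooKeeper quorum address");
        options.addOption("tabname", true, "HBase table to export");
        options.addOption("user", true, "HDFS user owning the output directory");
        options.addOption("output", true, "HFile output directory");
        options.addOption("T", true, "hbase.fs.tmp.dir override");
        options.addOption("batchSize", true, "scan batch size");
        options.addOption("mapperClass", true, "mapper class name");
        options.addOption("startRow", true, "scan start row");
        options.addOption("endRow", true, "scan stop row");
        options.addOption("startTime", true, "scan time range start (epoch millis)");
        options.addOption("endTime", true, "scan time range end (epoch millis)");
        CommandLine line = new DefaultParser().parse(options, args);
        HfileBulkExporter.doExporting(line);   // the method shown above
    }
}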
private static TableInputFormat getNewTableInputFormat(Connection connection, TableName tableName)
throws IOException, NoSuchFieldException, IllegalAccessException
{
TableInputFormat tableInputFormat = new TableInputFormat();
HbaseClient.inject(TableInputFormatBase.class, tableInputFormat, "table", connection.getTable(tableName));
HbaseClient.inject(TableInputFormatBase.class, tableInputFormat, "regionLocator", connection.getRegionLocator(tableName));
HbaseClient.inject(TableInputFormatBase.class, tableInputFormat, "admin", connection.getAdmin());
return tableInputFormat;
}
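HbaseClient.inject is not shown in these snippets; judging from the calls above, it pushes pre-built Table, RegionLocator and Admin instances into the protected fields of TableInputFormatBase via reflection. A minimal sketch of what such a helper might look like (field handling is assumed, not the project's actual implementation):
import java.lang.reflect.Field;

public final class ReflectionInjector {
    private ReflectionInjector() {
    }

    // Minimal reflection-based setter in the spirit of HbaseClient.inject(...);
    // the real implementation is not part of this snippet.
    public static void inject(Class<?> declaringClass, Object target, String fieldName, Object value)
            throws NoSuchFieldException, IllegalAccessException {
        Field field = declaringClass.getDeclaredField(fieldName);
        field.setAccessible(true);   // the fields on TableInputFormatBase are not public
        field.set(target, value);
    }
}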
@Override
public InputFormat getInputFormat() {
TableInputFormat inputFormat = new HBaseTableIFBuilder()
.withLimit(limit_)
.withGt(gt_)
.withGte(gte_)
.withLt(lt_)
.withLte(lte_)
.withConf(m_conf)
.build();
inputFormat.setScan(scan);
return inputFormat;
}
@Override
public void setLocation(String location, Job job) throws IOException {
Properties udfProps = getUDFProperties();
job.getConfiguration().setBoolean("pig.noSplitCombination", true);
m_conf = initializeLocalJobConfig(job);
String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
if (delegationTokenSet == null) {
addHBaseDelegationToken(m_conf, job);
udfProps.setProperty(HBASE_TOKEN_SET, "true");
}
String tablename = location;
if (location.startsWith("hbase://")) {
tablename = location.substring(8);
}
m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
String projectedFields = udfProps.getProperty( projectedFieldsName() );
if (projectedFields != null) {
// update columnInfo_
pushProjection((RequiredFieldList) ObjectSerializer.deserialize(projectedFields));
}
addFiltersWithoutColumnPrefix(columnInfo_);
if (requiredFieldList != null) {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
new String[] {contextSignature});
p.setProperty(contextSignature + "_projectedFields", ObjectSerializer.serialize(requiredFieldList));
}
}
/**
* Allows subclasses to set the {@link HTable}.
*
* @param table The table to get the data from.
*/
protected void setHTable(Table table) {
if (table == null) throw new IllegalArgumentException("Unexpected null value for 'table'.");
this.table = table;
if (conf == null) throw new RuntimeException("Unexpected null value for 'conf'");
conf.set(TableInputFormat.INPUT_TABLE, table.getName().getNameAsString());
}
private TableInputFormat getDelegate(Configuration conf) throws IOException {
TableInputFormat delegate = new TableInputFormat();
String tableName = HBaseMetadataProvider.getTableName(dataset.getName());
conf.set(TableInputFormat.INPUT_TABLE, tableName);
if (view != null) {
Job tempJob = new Job();
Scan scan = ((BaseEntityScanner) view.newEntityScanner()).getScan();
TableMapReduceUtil.initTableMapperJob(tableName, scan, TableMapper.class, null,
null, tempJob);
Configuration tempConf = Hadoop.JobContext.getConfiguration.invoke(tempJob);
conf.set(SCAN, tempConf.get(SCAN));
}
delegate.setConf(conf);
return delegate;
}
/**
* Do HFile bulk exporting.
*
* @param line the parsed command line options
* @throws Exception
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void doRmdbExporting(CommandLine line) throws Exception {
// Configuration.
String tabname = line.getOptionValue("tabname");
String user = line.getOptionValue("user");
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", line.getOptionValue("zkaddr"));
conf.set("hbase.fs.tmp.dir", line.getOptionValue("T", DEFAULT_HBASE_MR_TMPDIR));
conf.set(TableInputFormat.INPUT_TABLE, tabname);
conf.set(TableInputFormat.SCAN_BATCHSIZE, line.getOptionValue("batchSize", DEFAULT_SCAN_BATCH_SIZE));
// Check directory.
String outputDir = line.getOptionValue("output", DEFAULT_HFILE_OUTPUT_DIR) + "/" + tabname;
FileSystem fs = FileSystem.get(new URI(outputDir), new Configuration(), user);
if (fs.exists(new Path(outputDir))) {
fs.delete(new Path(outputDir), true);
}
// Set scan condition.(if necessary)
HfileBulkExporter.setScanIfNecessary(conf, line);
// Job.
Connection conn = ConnectionFactory.createConnection(conf);
TableName tab = TableName.valueOf(tabname);
Job job = Job.getInstance(conf);
job.setJobName(HfileBulkExporter.class.getSimpleName() + "@" + tab.getNameAsString());
job.setJarByClass(HfileBulkExporter.class);
job.setMapperClass((Class<Mapper>) ClassUtils.getClass(line.getOptionValue("mapperClass", DEFAULT_MAPPER_CLASS)));
job.setInputFormatClass(TableInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tab), conn.getRegionLocator(tab));
FileOutputFormat.setOutputPath(job, new Path(outputDir));
if (job.waitForCompletion(true)) {
long total = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_TOTAL).getValue();
long processed = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_PROCESSED).getValue();
log.info(String.format("Exported to successfully! with processed:(%d)/total:(%d)", processed, total));
}
}
/**
* Setup scan condition if necessary.
*
* @param conf
* @param line
* @throws IOException
*/
public static void setScanIfNecessary(Configuration conf, CommandLine line) throws IOException {
String startRow = line.getOptionValue("startRow");
String endRow = line.getOptionValue("endRow");
String startTime = line.getOptionValue("startTime");
String endTime = line.getOptionValue("endTime");
boolean enabledScan = false;
Scan scan = new Scan();
// Row
if (isNotBlank(startRow)) {
conf.set(TableInputFormat.SCAN_ROW_START, startRow);
scan.setStartRow(Bytes.toBytes(startRow));
enabledScan = true;
}
if (isNotBlank(endRow)) {
Assert2.hasText(startRow, "Arguments startRow and endRow must be used simultaneously");
conf.set(TableInputFormat.SCAN_ROW_STOP, endRow);
scan.setStopRow(Bytes.toBytes(endRow));
enabledScan = true;
}
// Row TimeStamp
if (isNotBlank(startTime) && isNotBlank(endTime)) {
conf.set(TableInputFormat.SCAN_TIMERANGE_START, startTime);
conf.set(TableInputFormat.SCAN_TIMERANGE_END, endTime);
try {
Timestamp stime = new Timestamp(Long.parseLong(startTime));
Timestamp etime = new Timestamp(Long.parseLong(endTime));
scan.setTimeRange(stime.getTime(), etime.getTime());
enabledScan = true;
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Illegal startTime(%s) and endTime(%s)", startTime, endTime), e);
}
}
if (enabledScan) {
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
log.info("All other SCAN configuration are ignored if\n"
+ " * this is specified.See TableMapReduceUtil.convertScanToString(Scan)\n"
+ " * for more details.");
conf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));
}
}
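The ProtobufUtil/Base64 block at the end of setScanIfNecessary performs the same serialization as TableMapReduceUtil.convertScanToString, so the stock utility can be used instead. A small sketch of that equivalent:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

public final class ScanConfHelper {
    // Equivalent of the ProtobufUtil.toScan(...) + Base64.encodeBytes(...) lines above.
    public static void setScan(Configuration conf, Scan scan) throws IOException {
        conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
    }
}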
/**
* Fetches the TabletSplitMetadata for a query against an Hbase table.
* <p>
* Does a whole bunch of fun stuff! Splitting on row ID ranges, applying secondary indexes, column pruning,
* all sorts of sweet optimizations. What you have here is an important method.
*
* @param session Current session
* @param schema Schema name
* @param table Table Name
* @param rowIdDomain Domain for the row ID
* @param constraints Column constraints for the query
* @return List of TabletSplitMetadata objects for Presto
*/
public List<TabletSplitMetadata> getTabletSplits(
ConnectorSession session,
String schema,
String table,
Optional<Domain> rowIdDomain,
List<HbaseColumnConstraint> constraints) //HbaseRowSerializer serializer
{
try {
TableName tableName = TableName.valueOf(schema, table);
LOG.debug("Getting tablet splits for table %s", tableName);
// Get the initial Range based on the row ID domain
Collection<Range> rowIdRanges = getRangesFromDomain(rowIdDomain); //serializer
// Split the ranges on tablet boundaries, if enabled
// Create TabletSplitMetadata objects for each range
boolean fetchTabletLocations = HbaseSessionProperties.isOptimizeLocalityEnabled(session);
LOG.debug("Fetching tablet locations: %s", fetchTabletLocations);
ImmutableList.Builder<TabletSplitMetadata> builder = ImmutableList.builder();
if (rowIdRanges.size() == 0) { // no rowkey filter
LOG.warn("This request has no rowkey filter");
}
List<Scan> rowIdScans = rowIdRanges.size() == 0 ?
Arrays.asList(new Scan())
: rowIdRanges.stream().map(HbaseClient::getScanFromPrestoRange).collect(Collectors.toList());
for (Scan scan : rowIdScans) {
TableInputFormat tableInputFormat = getNewTableInputFormat(connection, tableName);
tableInputFormat.setConf(connection.getConfiguration());
tableInputFormat.setScan(scan);
JobContext context = new JobContextImpl(new JobConf(), null);
List<TableSplit> splits = tableInputFormat.getSplits(context)
.stream().map(x -> (TableSplit) x).collect(Collectors.toList());
for (TableSplit split : splits) {
TabletSplitMetadata metadata = new TabletSplitMetadata(
split.getTable().getName(),
split.getStartRow(),
split.getEndRow(),
TabletSplitMetadata.convertScanToString(split.getScan()),
split.getRegionLocation(),
split.getLength());
builder.add(metadata);
}
}
List<TabletSplitMetadata> tabletSplits = builder.build();
// Log some fun stuff and return the tablet splits
LOG.debug("Number of splits for table %s is %d with %d ranges", tableName, tabletSplits.size(), rowIdRanges.size());
return tabletSplits;
}
catch (Exception e) {
throw new PrestoException(UNEXPECTED_HBASE_ERROR, "Failed to get splits from Hbase", e);
}
}
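For comparison with the reflection-heavy code above, plain TableInputFormat can enumerate region-aligned splits on its own once INPUT_TABLE (and optionally SCAN) is set; setConf() builds the connection and scan from the configuration. A self-contained sketch, with "my_table" as a placeholder:
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;

public class ListRegionSplits {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, "my_table");   // placeholder table name

        Job job = Job.getInstance(conf);        // Job doubles as the JobContext for getSplits()
        TableInputFormat inputFormat = new TableInputFormat();
        inputFormat.setConf(job.getConfiguration());

        List<InputSplit> splits = inputFormat.getSplits(job);
        for (InputSplit split : splits) {
            TableSplit ts = (TableSplit) split;
            System.out.printf("%s [%s, %s) at %s%n",
                    ts.getTable(),
                    Bytes.toStringBinary(ts.getStartRow()),
                    Bytes.toStringBinary(ts.getEndRow()),
                    ts.getRegionLocation());
        }
    }
}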
public static void main(String[] args) throws Exception {
String rootDir = "hdfs://hadoop1:8020/hbase";
String zkServer = "hadoop1";
String port = "2181";
TwoLevelIndexBuilder conn = new TwoLevelIndexBuilder(rootDir, zkServer, port);
Configuration conf = conn.conf;
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//TwoLevelIndexBuilder: TableName, ColumnFamily, Qualifier
if (otherArgs.length < 3) {
System.exit(-1);
}
// table name
String tableName = otherArgs[0];
// column family
String columnFamily = otherArgs[1];
conf.set("tableName", tableName);
conf.set("columnFamily", columnFamily);
// qualifiers (there may be more than one)
String[] qualifiers = new String[otherArgs.length - 2];
System.arraycopy(otherArgs, 2, qualifiers, 0, qualifiers.length);
// set the qualifiers
conf.setStrings("qualifiers", qualifiers);
Job job = new Job(conf, tableName);
job.setJarByClass(TwoLevelIndexBuilder.class);
job.setMapperClass(TowLevelIndexMapper.class);
job.setNumReduceTasks(0); // no reduce phase is needed
job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(MultiTableOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tableName, new Scan(),
TowLevelIndexMapper.class, ImmutableBytesWritable.class, Put.class, job);
job.waitForCompletion(true);
}
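TowLevelIndexMapper itself is not included in this snippet. Purely as an illustration, the sketch below shows the shape of mapper that usually goes with MultiTableOutputFormat: for every configured qualifier it writes an index row into a table named "<tableName>-<qualifier>", keyed by the column value and pointing back at the original row key. The index-table naming and layout are assumptions, not the original class.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch of a secondary-index mapper; not the project's TowLevelIndexMapper.
public class IndexMapperSketch extends TableMapper<ImmutableBytesWritable, Put> {
    private String tableName;
    private byte[] family;
    private String[] qualifiers;

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        tableName = conf.get("tableName");
        family = Bytes.toBytes(conf.get("columnFamily"));
        qualifiers = conf.getStrings("qualifiers");
    }

    @Override
    protected void map(ImmutableBytesWritable rowKey, Result columns, Context context)
            throws IOException, InterruptedException {
        for (String qualifier : qualifiers) {
            Cell cell = columns.getColumnLatestCell(family, Bytes.toBytes(qualifier));
            if (cell == null) {
                continue;
            }
            // Index row: key = column value, one cell pointing back to the data row.
            Put put = new Put(CellUtil.cloneValue(cell));
            put.addColumn(family, Bytes.toBytes("rowkey"), rowKey.copyBytes());
            // MultiTableOutputFormat routes the Put by the table name carried in the output key.
            context.write(new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + qualifier)), put);
        }
    }
}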
/**
* Main method for the tool.
* @return 0 if success, 1 for bad args. 2 if job aborted with an exception,
* 3 if mr job was unsuccessful
*/
public int run(String[] args) throws IOException, InterruptedException {
// TODO make family and table optional
if (args.length != 3) {
printUsage();
return 1;
}
final String output = args[0];
final String tableName = args[1];
final String familyName = args[2];
final long reportStartTime = EnvironmentEdgeManager.currentTime();
Configuration conf = getConf();
try {
FileSystem fs = FileSystem.get(conf);
// check whether the current user is the same one with the owner of hbase root
String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
if (hbaseRootFileStat.length > 0) {
String owner = hbaseRootFileStat[0].getOwner();
if (!owner.equals(currentUserName)) {
String errorMsg = "The current user[" + currentUserName
+ "] does not have hbase root credentials."
+ " If this job fails due to an inability to read HBase's internal directories, "
+ "you will need to rerun as a user with sufficient permissions. The HBase superuser "
+ "is a safe choice.";
LOG.warn(errorMsg);
}
} else {
LOG.error("The passed configs point to an HBase dir does not exist: {}",
conf.get(HConstants.HBASE_DIR));
throw new IOException("The target HBase does not exist");
}
byte[] family;
int maxVersions;
TableName tn = TableName.valueOf(tableName);
try (Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin()) {
TableDescriptor htd = admin.getDescriptor(tn);
ColumnFamilyDescriptor hcd = htd.getColumnFamily(Bytes.toBytes(familyName));
if (hcd == null || !hcd.isMobEnabled()) {
throw new IOException("Column family " + familyName + " is not a MOB column family");
}
family = hcd.getName();
maxVersions = hcd.getMaxVersions();
}
String id = getClass().getSimpleName() + UUID.randomUUID().toString().replace("-", "");
Job job = null;
Scan scan = new Scan();
scan.addFamily(family);
// Do not retrieve the mob data when scanning
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
// If a scanner caching value isn't set, pick a smaller default since we know we're doing
// a full table scan and don't want to impact other clients badly.
scan.setCaching(conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 10000));
scan.setCacheBlocks(false);
scan.readVersions(maxVersions);
conf.set(REPORT_JOB_ID, id);
job = Job.getInstance(conf);
job.setJarByClass(getClass());
TableMapReduceUtil.initTableMapperJob(tn, scan,
MobRefMapper.class, Text.class, ImmutableBytesWritable.class, job);
job.setReducerClass(MobRefReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(output));
job.setJobName(getClass().getSimpleName() + "-" + tn + "-" + familyName);
// for use in the reducer. easier than re-parsing it out of the scan string.
job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
// Use when we start this job as the base point for file "recency".
job.getConfiguration().setLong(REPORT_START_DATETIME, reportStartTime);
if (job.waitForCompletion(true)) {
LOG.info("Finished creating report for '{}', family='{}'", tn, familyName);
} else {
System.err.println("Job was not successful");
return 3;
}
return 0;
} catch (ClassNotFoundException | RuntimeException | IOException | InterruptedException e) {
System.err.println("Job aborted due to exception " + e);
return 2; // job failed
}
}
public HBaseRecordReaderWrapper(TableInputFormat delegate,
EntityMapper<E> entityMapper) {
super(delegate);
this.entityMapper = entityMapper;
}
/**
* Use this before submitting a TableMap job. It will appropriately set up
* the job.
*
* @param table Binary representation of the Splice table name to read from.
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(byte[] table, Scan scan,
Class<? extends Mapper> mapper,
Class<? extends WritableComparable> outputKeyClass,
Class<? extends Object> outputValueClass, Job job,
boolean addDependencyJars)
throws IOException {
initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
outputValueClass, job, addDependencyJars, TableInputFormat.class);
}
/**
* Use this before submitting a TableMap job. It will appropriately set up
* the job.
*
* @param table The Splice table name to read from.
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @throws IOException When setting up the details fails.
*/
public static void initTableMapperJob(String table, Scan scan,
Class<? extends Mapper> mapper,
Class<? extends WritableComparable> outputKeyClass,
Class<? extends Object> outputValueClass, Job job,
boolean addDependencyJars)
throws IOException {
initTableMapperJob(table, scan, mapper, outputKeyClass,
outputValueClass, job, addDependencyJars, TableInputFormat.class);
}