The following examples show how the org.apache.hadoop.hbase.mapreduce.TableOutputFormat API class is used in real code; you can also follow the links to view the full source on GitHub.
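Before the individual examples, here is a minimal sketch of the pattern they all share: the target table is set on the job configuration under TableOutputFormat.OUTPUT_TABLE, TableOutputFormat is used as the output format, and the map or reduce phase emits (ImmutableBytesWritable, Put) pairs. The table name, mapper class, and input path below are placeholders, not taken from any of the examples; imports and the surrounding class are omitted as in the snippets that follow.

// Minimal sketch of a map-only job writing Puts to HBase via TableOutputFormat.
// "my_table", MyPutMapper and "/input/data" are hypothetical placeholders.
Configuration conf = HBaseConfiguration.create();
conf.set(TableOutputFormat.OUTPUT_TABLE, "my_table");

Job job = Job.getInstance(conf, "write-to-hbase");
job.setJarByClass(MyPutMapper.class);
job.setMapperClass(MyPutMapper.class);            // emits (ImmutableBytesWritable, Put)
job.setOutputFormatClass(TableOutputFormat.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setNumReduceTasks(0);                         // map-only: Puts go straight to the table
FileInputFormat.addInputPath(job, new Path("/input/data"));
System.exit(job.waitForCompletion(true) ? 0 : 1);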
@Override
public void setStoreLocation(String location, Job job) throws IOException {
    if (location.startsWith("hbase://")) {
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location.substring(8));
    } else {
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
    }

    String serializedSchema = getUDFProperties().getProperty(contextSignature + "_schema");
    if (serializedSchema != null) {
        schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
    }

    m_conf = initializeLocalJobConfig(job);
    // Unlike setLocation, we do not use a UDF property to fetch the HBase delegation
    // token only once: setStoreLocation receives a different Job object on each call,
    // and only the last Job passed in is launched, so we end up obtaining multiple
    // HBase delegation tokens.
    addHBaseDelegationToken(m_conf, job);
}
public void run() throws Exception {
    long startTime = System.currentTimeMillis();
    Configuration conf = new Configuration();
    conf.set(TableOutputFormat.OUTPUT_TABLE, Constants.hbase_user_item_pref_table);
    Job job = Job.getInstance(conf, "hbasewriter" + System.currentTimeMillis());
    job.setJarByClass(UpdateCFJob.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(HBaseWriteReducer.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TableOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(input));
    boolean isFinish = job.waitForCompletion(true);
    // Capture the end time after the job has completed; taking it before
    // waitForCompletion would always report a duration of (nearly) zero.
    long endTime = System.currentTimeMillis();
    if (isFinish) {
        logger.info("UpdateCFJob job [" + job.getJobName() + "] finished; it took "
            + (endTime - startTime) / 1000 + "s.");
    } else {
        logger.error("UpdateCFJob job [" + job.getJobName() + "] failed.");
    }
}
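The HBaseWriteReducer referenced above is not part of this snippet. As a rough, hypothetical sketch, it would aggregate the mapper output (IntWritable user id, Text preferences) and emit Put mutations that TableOutputFormat writes to Constants.hbase_user_item_pref_table; the column family and qualifier names below are invented for illustration, and the real class may differ.

// Hypothetical sketch of the reducer used by the job above.
public static class HBaseWriteReducer
        extends Reducer<IntWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void reduce(IntWritable userId, Iterable<Text> prefs, Context context)
            throws IOException, InterruptedException {
        // Concatenate all preference strings for this user into one cell value.
        StringBuilder sb = new StringBuilder();
        for (Text pref : prefs) {
            if (sb.length() > 0) sb.append(',');
            sb.append(pref.toString());
        }
        // "cf" and "prefs" are placeholder column family / qualifier names.
        Put put = new Put(Bytes.toBytes(userId.get()));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("prefs"), Bytes.toBytes(sb.toString()));
        context.write(new ImmutableBytesWritable(put.getRow()), put);
    }
}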
private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception {
    job.setReducerClass(PhoenixIndexImportDirectReducer.class);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    // Set the physical table name for use in DirectHTableWriter#write(Mutation)
    conf.set(TableOutputFormat.OUTPUT_TABLE,
        PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration()));
    // Set the output classes
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    TableMapReduceUtil.addDependencyJars(job);
    job.setNumReduceTasks(1);
    return job;
}
protected void setConf(Configuration otherConf) {
    this.conf = HBaseConfiguration.create(otherConf);
    String tableName = this.conf.get(TableOutputFormat.OUTPUT_TABLE);
    if (tableName == null || tableName.length() <= 0) {
        throw new IllegalArgumentException("Must specify table name");
    }
    try {
        this.conn = ConnectionFactory.createConnection(this.conf);
        this.table = conn.getTable(TableName.valueOf(tableName));
        LOGGER.info("Created table instance for " + tableName);
    } catch (IOException e) {
        LOGGER.error("IOException : ", e);
        tryClosingResourceSilently(this.conn);
        throw new RuntimeException(e);
    }
}
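A setConf like the one above creates a Connection and a Table that the owner is responsible for closing. The original does not show the teardown; the sketch below is an assumption, and it presumes tryClosingResourceSilently accepts any Closeable.

// Hypothetical counterpart to setConf above: release the Table first,
// then the Connection, swallowing close failures the same way setConf does.
protected void cleanup() {
    tryClosingResourceSilently(this.table);
    tryClosingResourceSilently(this.conn);
}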
/**
 * Uses the HBase front-door API to write to the index table. Submits the job and either
 * returns immediately or waits for the job to complete, depending on the runForeground
 * parameter.
 *
 * @param job the job to configure and submit
 * @param outputPath output path, deleted once the job succeeds
 * @param outputTableName the physical index table written through TableOutputFormat
 * @param skipDependencyJars if true, dependency jars are not shipped with the job
 * @param runForeground if true, waits for job completion, else submits and returns
 *          immediately.
 * @throws Exception
 */
private void configureSubmittableJobUsingDirectApi(Job job, Path outputPath, TableName outputTableName,
        boolean skipDependencyJars, boolean runForeground)
        throws Exception {
    job.setMapperClass(getDirectMapperClass());
    job.setReducerClass(getDirectReducerClass());
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableOutputFormat.OUTPUT_TABLE, outputTableName.getNameAsString());
    // Set the output classes
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    if (!skipDependencyJars) {
        TableMapReduceUtil.addDependencyJars(job);
    }
    job.setNumReduceTasks(1);

    if (!runForeground) {
        LOG.info("Running Index Build in Background - Submit async and exit");
        job.submit();
        return;
    }
    LOG.info("Running Index Build in Foreground. Waits for the build to complete. This may take a long time!");
    boolean result = job.waitForCompletion(true);
    if (!result) {
        LOG.error("IndexTool job failed!");
        throw new Exception("IndexTool job failed: " + job.toString());
    }
    FileSystem.get(conf).delete(outputPath, true);
}
@Override
public OutputFormat getOutputFormat() throws IOException {
    if (outputFormat == null) {
        if (m_conf == null) {
            throw new IllegalStateException("setStoreLocation has not been called");
        } else {
            this.outputFormat = new TableOutputFormat();
            this.outputFormat.setConf(m_conf);
        }
    }
    return outputFormat;
}
@Override
public Boolean call() throws Exception {
    StringBuilder commandLineArgBuilder = new StringBuilder();
    commandLineArgBuilder.append(" -dt " + indexInfo.getDataTableName());
    commandLineArgBuilder.append(" -it " + indexInfo.getTableName());
    commandLineArgBuilder.append(" -direct");
    commandLineArgBuilder.append(" -op " + (basePath.endsWith("/") ? basePath : basePath + "/")
        + indexInfo.getTableName());
    if (indexInfo.getTableSchem() != null && indexInfo.getTableSchem().trim().length() > 0) {
        commandLineArgBuilder.append(" -s " + indexInfo.getTableSchem());
    }
    // Setting the configuration here again (in addition to IndexTool.java) to make
    // doubly sure the configuration is set.
    final String qDataTable =
        SchemaUtil.getTableName(indexInfo.getTableSchem(), indexInfo.getDataTableName());
    final String qIndexTable =
        SchemaUtil.getTableName(indexInfo.getTableSchem(), indexInfo.getTableName());
    String physicalIndexTable = qIndexTable;
    if (IndexType.LOCAL.equals(indexInfo.getIndexType())) {
        physicalIndexTable = MetaDataUtil.getLocalIndexTableName(qDataTable);
    }
    conf.set(TableOutputFormat.OUTPUT_TABLE, physicalIndexTable);
    IndexTool tool = new IndexTool();
    tool.setConf(conf);
    int result = tool.run(commandLineArgBuilder.toString().split(" "));
    return result == 0;
}
@Override
protected void setup(Context context) throws IOException,
        InterruptedException {
    config = context.getConfiguration();
    table = new HTable(config, Bytes.toBytes(config
        .get(TableOutputFormat.OUTPUT_TABLE)));
}
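The setup above uses the long-deprecated HTable constructor, which was removed in HBase 2.x. On current client versions the equivalent wiring goes through ConnectionFactory, roughly as sketched below; the connection field (and closing it later in cleanup) is an assumption, not part of the original.

// Rough modern equivalent of the setup above using the Connection API.
// The 'connection' field is hypothetical and must be closed in cleanup().
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    config = context.getConfiguration();
    connection = ConnectionFactory.createConnection(config);
    table = connection.getTable(
        TableName.valueOf(config.get(TableOutputFormat.OUTPUT_TABLE)));
}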
public static void main(String[] args) throws Exception {
    if (!parseParameters(args)) {
        return;
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = getTextDataSet(env);

    DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word, 1)
        text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .groupBy(0)
            .sum(1);

    // emit result
    Job job = Job.getInstance();
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
    // TODO is "mapred.output.dir" really useful?
    job.getConfiguration().set("mapred.output.dir", HBaseFlinkTestConstants.TMP_DIR);
    counts.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
        private transient Tuple2<Text, Mutation> reuse;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            reuse = new Tuple2<Text, Mutation>();
        }

        @Override
        public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
            reuse.f0 = new Text(t.f0);
            Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
            put.add(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
            reuse.f1 = put;
            return reuse;
        }
    }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

    // execute program
    env.execute("WordCount (HBase sink) Example");
}
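The Tokenizer used in the flatMap above is not shown in this snippet. A minimal sketch of what such a function typically looks like in the Flink WordCount examples is given below; the actual class in the source may differ slightly.

// Minimal sketch of the Tokenizer referenced above: split each line into
// lower-case words and emit (word, 1) pairs.
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        for (String word : value.toLowerCase().split("\\W+")) {
            if (word.length() > 0) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}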
/**
 * @param myHBaseConf configuration used to create and run the job. Should be an
 *          HBase configuration.
 * @param input path to the processFile
 * @param totalJobCount the total number of jobs that need to be run in this batch.
 *          Used in the job name.
 * @return whether all job confs were loaded properly.
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 */
private boolean runRawLoaderJob(Configuration myHBaseConf, String input,
        int totalJobCount)
        throws IOException, InterruptedException, ClassNotFoundException {
    boolean success;

    // Turn off speculative execution.
    // Note: must be BEFORE the job construction with the new mapreduce API.
    myHBaseConf.setBoolean("mapred.map.tasks.speculative.execution", false);

    // Set up job
    Job job = new Job(myHBaseConf, getJobName(totalJobCount));
    job.setJarByClass(JobFileRawLoader.class);

    Path inputPath = new Path(input);

    if (hdfs.exists(inputPath)) {
        // Set input
        job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(job, inputPath);

        job.setMapperClass(JobFileRawLoaderMapper.class);

        // Set the output format to push data into HBase.
        job.setOutputFormatClass(TableOutputFormat.class);
        TableMapReduceUtil.initTableReducerJob(Constants.HISTORY_RAW_TABLE, null,
            job);

        job.setOutputKeyClass(JobFileRawLoaderMapper.getOutputKeyClass());
        job.setOutputValueClass(JobFileRawLoaderMapper.getOutputValueClass());

        // This is a map-only job, so skip the reduce step.
        job.setNumReduceTasks(0);

        // Run the job
        success = job.waitForCompletion(true);

        if (success) {
            success = hdfs.delete(inputPath, false);
        }
    } else {
        System.err.println("Unable to find processFile: " + inputPath);
        success = false;
    }
    return success;
}
/**
 * Use this before submitting a TableReduce job. It will
 * appropriately set up the JobConf.
 *
 * @param table The output Splice table name; the format should be Schema.tableName.
 * @param reducer The reducer class to use.
 * @param job The current job to adjust. Make sure the passed job is
 *   carrying all necessary configuration.
 * @param partitioner Partitioner to use. Pass <code>null</code> to use the
 *   default partitioner.
 * @param quorumAddress Distant cluster to write to; default is null for
 *   output to the cluster that is designated in <code>hbase-site.xml</code>.
 *   Set this String to the zookeeper ensemble of an alternate remote cluster
 *   when you want the reduce to write to a cluster other than the default,
 *   e.g. when copying tables between clusters: the source would be
 *   designated by <code>hbase-site.xml</code> and this param would carry the
 *   ensemble address of the remote cluster. The format to pass is particular.
 *   Pass <code> <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>
 *   </code> such as <code>server,server2,server3:2181:/hbase</code>.
 * @param serverClass redefined hbase.regionserver.class
 * @param serverImpl redefined hbase.regionserver.client
 * @param addDependencyJars upload HBase jars and jars for any of the configured
 *   job classes via the distributed cache (tmpjars).
 * @throws IOException When determining the region count fails.
 * @throws SQLException
 */
public static void initTableReducerJob(String table,
        Class<? extends Reducer> reducer, Job job,
        Class partitioner,
        String quorumAddress,
        String serverClass,
        String serverImpl, boolean addDependencyJars,
        Class<? extends OutputFormat> outputformatClass) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputFormatClass(outputformatClass);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(MRConstants.SPLICE_OUTPUT_TABLE_NAME, table);
    if (sqlUtil == null)
        sqlUtil = SMSQLUtil.getInstance(conf.get(MRConstants.SPLICE_JDBC_STR));
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    String hbaseTableID = null;
    try {
        hbaseTableID = sqlUtil.getConglomID(table);
    } catch (SQLException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        throw new IOException(e);
    }
    conf.set(MRConstants.HBASE_OUTPUT_TABLE_NAME, table);

    if (quorumAddress != null) {
        // Calling this will validate the format
        ZKConfig.validateClusterKey(quorumAddress);
        conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
        conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
        conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Object.class);
    if (partitioner == HRegionPartitioner.class) {
        job.setPartitionerClass(HRegionPartitioner.class);
        // TODO Where are the keys?
        int regions = getReduceNumberOfRegions(hbaseTableID);
        if (job.getNumReduceTasks() > regions) {
            job.setNumReduceTasks(regions);
        }
    } else if (partitioner != null) {
        job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
        addDependencyJars(job);
    }
    // initCredentials(job);
}
/**
 * Use this before submitting a TableReduce job. It will
 * appropriately set up the JobConf.
 *
 * @param table The Splice output table.
 * @param reducer The reducer class to use.
 * @param job The current job to adjust. Make sure the passed job is
 *   carrying all necessary HBase configuration.
 * @param partitioner Partitioner to use. Pass <code>null</code> to use the
 *   default partitioner.
 * @param quorumAddress Distant cluster to write to; default is null for
 *   output to the cluster that is designated in <code>hbase-site.xml</code>.
 *   Set this String to the zookeeper ensemble of an alternate remote cluster
 *   when you want the reduce to write to a cluster other than the default,
 *   e.g. when copying tables between clusters: the source would be
 *   designated by <code>hbase-site.xml</code> and this param would carry the
 *   ensemble address of the remote cluster. The format to pass is particular.
 *   Pass <code> <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>
 *   </code> such as <code>server,server2,server3:2181:/hbase</code>.
 * @param serverClass redefined hbase.regionserver.class
 * @param serverImpl redefined hbase.regionserver.client
 * @throws IOException When determining the region count fails.
 * @throws SQLException
 */
public static void initTableReducerJob(String table,
        Class<? extends Reducer> reducer, Job job,
        Class partitioner, String quorumAddress, String serverClass,
        String serverImpl) throws IOException, SQLException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true, TableOutputFormat.class);
}