The following examples show how to use the org.apache.hadoop.hbase.mapreduce.HFileOutputFormat API class; you can also follow the links to GitHub to view the full source code.
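Before the individual examples, here is a minimal, self-contained sketch of the pattern they all share: map input records into ImmutableBytesWritable/KeyValue pairs, let HFileOutputFormat.configureIncrementalLoad wire the job to the target table, then hand the finished HFiles to LoadIncrementalHFiles. The class name, the SketchMapper placeholder, the f:q column, and the argument order are illustrative assumptions, not taken from any of the projects below.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HFileBulkLoadSketch {

  // Placeholder mapper: uses the whole input line as row key and value.
  static class SketchMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context ctx)
        throws IOException, InterruptedException {
      byte[] row = Bytes.toBytes(line.toString());
      ctx.write(new ImmutableBytesWritable(row),
          new KeyValue(row, Bytes.toBytes("f"), Bytes.toBytes("q"), row));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "hfile-bulk-load-sketch");
    job.setJarByClass(HFileBulkLoadSketch.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path hfileDir = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, hfileDir);
    job.setMapperClass(SketchMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    // Wires up the reducer, the TotalOrderPartitioner and the reduce-task
    // count from the target table's region layout.
    HTable table = new HTable(conf, args[2]);
    HFileOutputFormat.configureIncrementalLoad(job, table);
    if (job.waitForCompletion(true)) {
      // Move the finished HFiles into the table's regions.
      new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, table);
    }
    table.close();
  }
}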
@Override
protected void jobSetup(Job job) throws IOException, ImportException {
super.jobSetup(job);
// we shouldn't have gotten here if the bulk load dir is not set,
// so throw an ImportException
if(getContext().getDestination() == null){
throw new ImportException("Can't run HBaseBulkImportJob without a " +
"valid destination directory.");
}
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
FileOutputFormat.setOutputPath(job, getContext().getDestination());
HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
HFileOutputFormat.configureIncrementalLoad(job, hTable);
}
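configureIncrementalLoad is what keeps these snippets short: it sets HFileOutputFormat as the job's output format, picks KeyValueSortReducer or PutSortReducer based on the map output value class, installs a TotalOrderPartitioner whose split points follow the table's region start keys, and sets the number of reduce tasks to the number of regions.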
@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.hadoopConfig = getHadoopConfig(this.config);
/**
* PLEASE NOTE:
* If you are an Eclipse+Maven integration user and you have two (or more) warnings here, please
* close the pact-hbase project OR set the maven profile to hadoop_yarn
*
* pact-hbase requires hadoop_yarn, but Eclipse is not able to parse maven profiles properly. Therefore,
* it imports the pact-hbase project even if it is not included in the standard profile (hadoop_v1)
*/
final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP, taskNumber - 1, 0);
this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig, attemptId);
final HFileOutputFormat outFormat = new HFileOutputFormat();
try {
this.writer = outFormat.getRecordWriter(this.context);
} catch (InterruptedException iex) {
throw new IOException("Opening the writer was interrupted.", iex);
}
}
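The RecordWriter obtained in open() must later be closed with the same TaskAttemptContext so the pending KeyValues are flushed into the HFile; a matching close() might look like this (a sketch, not part of the original source):

@Override
public void close() throws IOException {
  if (this.writer != null) {
    try {
      // Closing flushes the pending KeyValues into the HFile.
      this.writer.close(this.context);
    } catch (InterruptedException iex) {
      throw new IOException("Closing the writer was interrupted.", iex);
    }
  }
}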
public int run(String[] args) throws Exception {
Options options = new Options();
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_II_NAME);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
parseOptions(options, args);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
setJobClasspath(job);
addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
FileOutputFormat.setOutputPath(job, output);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(IICreateHFileMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
String tableName = getOptionValue(OPTION_HTABLE_NAME);
HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
HFileOutputFormat.configureIncrementalLoad(job, htable);
this.deletePath(job.getConfiguration(), output);
return waitForCompletion(job);
} catch (Exception e) {
printUsage(options);
throw e;
}
}
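The deletePath call just before submission presumably clears a leftover output directory from a previous run, so the job does not fail the usual "output directory already exists" check.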
@Override
public Boolean call() {
LOG.info("Configuring HFile output path to {}", outputPath);
try{
Job job = new Job(conf, "Phoenix MapReduce import for " + tableName);
// Allow overriding the job jar setting by using a -D system property at startup
if (job.getJar() == null) {
job.setJarByClass(CsvToKeyValueMapper.class);
}
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.setMapperClass(CsvToKeyValueMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
// initialize credentials to possibly run in a secure environment
TableMapReduceUtil.initCredentials(job);
HTable htable = new HTable(conf, tableName);
// Auto configure partitioner and reducer according to the Main Data table
HFileOutputFormat.configureIncrementalLoad(job, htable);
LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
boolean success = job.waitForCompletion(true);
if (!success) {
LOG.error("Import job failed, check JobTracker for details");
htable.close();
return false;
}
LOG.info("Loading HFiles from {}", outputPath);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(outputPath, htable);
htable.close();
LOG.info("Incremental load complete for table=" + tableName);
LOG.info("Removing output directory {}", outputPath);
if (!FileSystem.get(conf).delete(outputPath, true)) {
LOG.error("Removing output directory {} failed", outputPath);
}
return true;
} catch (Exception ex) {
LOG.error("Import job on table=" + tableName + " failed due to exception", ex);
return false;
}
}
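Note that doBulkLoad moves the finished HFiles into the table's region directories, so the delete afterwards only removes the now-empty job output directory.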
/**
* Sets up the actual job.
*
* @param conf
* The current configuration.
* @param args
* The command line parameters.
* @return The newly created job.
* @throws IOException
* When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException, ClassNotFoundException {
// Support non-XML supported characters
// by re-encoding the passed separator as a Base64 string.
String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
if (actualSeparator != null) {
conf.set(SEPARATOR_CONF_KEY,
Base64.encodeBytes(actualSeparator.getBytes()));
}
// See if a non-default Mapper was set
String mapperClassName = conf.get(MAPPER_CONF_KEY);
Class mapperClass = mapperClassName != null ? Class
.forName(mapperClassName) : DEFAULT_MAPPER;
String tableName = args[0];
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(mapperClass);
FileInputFormat.setInputPaths(job, inputDir);
String inputCodec = conf.get(INPUT_LZO_KEY);
if (inputCodec == null) {
FileInputFormat.setMaxInputSplitSize(job, 67108864L); // max split size = 64 MB
job.setInputFormatClass(TextInputFormat.class);
} else {
if (inputCodec.equalsIgnoreCase("lzo")) {
  job.setInputFormatClass(LzoTextInputFormat.class);
} else {
  usage("unsupported compression codec!");
  System.exit(-1);
}
}
job.setMapperClass(mapperClass);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
HTable table = new HTable(conf, tableName);
job.setReducerClass(PutSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
HFileOutputFormat.configureIncrementalLoad(job, table);
} else {
// No reducers. Just write straight to table. Call
// initTableReducerJob to set up the TableOutputFormat.
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);
}
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Function.class /* Guava used by TsvParser */);
return job;
}
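The Base64 round-trip above has a decode side in the mapper's setup; in HBase's TSV importer it is reversed roughly like this (a sketch using the same org.apache.hadoop.hbase.util.Base64 helper; the surrounding setup code is assumed):

// In the mapper's setup(): undo the encoding done in createSubmittableJob.
String separator = conf.get(SEPARATOR_CONF_KEY);
if (separator == null) {
  separator = "\t"; // default: tab-separated input
} else {
  separator = new String(Base64.decode(separator));
}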
public int run(String[] args) throws Exception {
Options options = new Options();
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
parseOptions(options, args);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
setJobClasspath(job);
addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
FileOutputFormat.setOutputPath(job, output);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(CubeHFileMapper.class);
job.setReducerClass(KeyValueSortReducer.class);
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
Configuration conf = HBaseConfiguration.create(getConf());
// add metadata to distributed cache
attachKylinPropsAndMetadata(cube, job.getConfiguration());
String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
HTable htable = new HTable(conf, tableName);
// Auto configure partitioner and reducer according to the target table
HFileOutputFormat.configureIncrementalLoad(job, htable);
// set block replication to 3 for hfiles
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
this.deletePath(job.getConfiguration(), output);
return waitForCompletion(job);
} catch (Exception e) {
logger.error("error in CubeHFileJob", e);
printUsage(options);
throw e;
}
}
public static void main(String[] args) throws Exception {
if (args.length < 6) {
System.out.println("PopulateSmallTable {numberOfMappers} {numberOfRecords} {tmpOutputPath} {tableName} {columnFamily} {runID}");
return;
}
String numberOfMappers = args[0];
String numberOfRecords = args[1];
String outputPath = args[2];
String tableName = args[3];
String columnFamily = args[4];
String runID = args[5];
// Create job
Job job = Job.getInstance();
HBaseConfiguration.addHbaseResources(job.getConfiguration());
job.setJarByClass(PopulateTable.class);
job.setJobName("PopulateTable: " + runID);
job.getConfiguration().set(NUMBER_OF_RECORDS, numberOfRecords);
job.getConfiguration().set(TABLE_NAME, tableName);
job.getConfiguration().set(COLUMN_FAMILY, columnFamily);
job.getConfiguration().set(RUN_ID, runID);
// Define input format and path
job.setInputFormatClass(NMapInputFormat.class);
NMapInputFormat.setNumMapTasks(job.getConfiguration(), Integer.parseInt(numberOfMappers));
Configuration config = HBaseConfiguration.create();
HTable hTable = new HTable(config, tableName);
// Auto configure partitioner and reducer
HFileOutputFormat.configureIncrementalLoad(job, hTable);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
// Define the mapper and reducer
job.setMapperClass(CustomMapper.class);
// job.setReducerClass(CustomReducer.class);
// Define the key and value format
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
// Submit the job and wait for completion
job.waitForCompletion(true);
FileSystem hdfs = FileSystem.get(config);
// Must allow HBase to have write access to the HFiles
HFileUtils.changePermissionR(outputPath, hdfs);
LoadIncrementalHFiles load = new LoadIncrementalHFiles(config);
load.doBulkLoad(new Path(outputPath), hTable);
}
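HFileUtils.changePermissionR is a project-local helper whose source is not shown here; a hypothetical sketch of what such a recursive permission change might look like (the method body below is an assumption, not the project's code):

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

// Hypothetical: recursively open up the HFiles so the HBase service user
// can move them during the bulk load.
public static void changePermissionR(String path, FileSystem fs) throws IOException {
  FsPermission rwx = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
  Path root = new Path(path);
  fs.setPermission(root, rwx);
  for (FileStatus status : fs.listStatus(root)) {
    if (status.isDirectory()) {
      changePermissionR(status.getPath().toString(), fs);
    } else {
      fs.setPermission(status.getPath(), rwx);
    }
  }
}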
private void configureBulkImport() throws IOException {
HTable table = new HTable(getConfiguration(), _tableName);
HFileOutputFormat.configureIncrementalLoad(_job, table);
HFileOutputFormat.setOutputPath(_job, _hfilePath);
}
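HFileOutputFormat.setOutputPath here is the setOutputPath inherited from FileOutputFormat; as in the longer examples above, the HFiles written to _hfilePath still have to be handed to HBase afterwards, for example with LoadIncrementalHFiles.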