The following are example usages of org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles#doBulkLoad(), collected from open-source projects.
/**
* e.g.<br/>
*
* <pre>
* yarn jar super-devops-tool-hbase-migrator-master.jar \
* com.wl4g.devops.tool.hbase.migrator.HfileBulkImporter \
* -z emr-header-1:2181 \
* -t safeclound.tb_elec_power \
* -p /tmp-devops/safeclound.tb_elec_power
* </pre>
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
    HbaseMigrateUtils.showBanner();
    CommandLine line = new Builder().option("z", "zkaddr", null, "Zookeeper address.")
            .option("t", "tabname", null, "Hbase table name.")
            .option("p", "path", null, "Data hdfs path to be import. e.g. hdfs://localhost:9000/bak/safeclound.tb_air")
            .build(args);
    Configuration cfg = HBaseConfiguration.create();
    cfg.set("hbase.zookeeper.quorum", line.getOptionValue("z"));
    Connection conn = ConnectionFactory.createConnection(cfg);
    Admin admin = conn.getAdmin();
    Table table = conn.getTable(TableName.valueOf(line.getOptionValue("t")));
    LoadIncrementalHFiles load = new LoadIncrementalHFiles(cfg);
    load.doBulkLoad(new Path(line.getOptionValue("p")), admin, table,
            conn.getRegionLocator(TableName.valueOf(line.getOptionValue("t"))));
}
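The example above never closes its Connection, Admin, or Table. A minimal sketch of the same load using try-with-resources (same option names as above; it assumes an HBase client where these handles are AutoCloseable, which holds from 1.x on):

Configuration cfg = HBaseConfiguration.create();
cfg.set("hbase.zookeeper.quorum", line.getOptionValue("z"));
TableName tab = TableName.valueOf(line.getOptionValue("t"));
try (Connection conn = ConnectionFactory.createConnection(cfg);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tab);
        RegionLocator locator = conn.getRegionLocator(tab)) {
    // doBulkLoad moves the prepared HFiles into the table's regions
    new LoadIncrementalHFiles(cfg).doBulkLoad(new Path(line.getOptionValue("p")),
            admin, table, locator);
}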
/**
 * Perform the loading of Hfiles.
 */
@Override
protected void completeImport(Job job) throws IOException, ImportException {
    super.completeImport(job);
    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
    // Make the bulk load files source directory accessible to the world
    // so that the hbase user can deal with it
    Path bulkLoadDir = getContext().getDestination();
    setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
            FsPermission.createImmutable((short) 00777));
    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
    // Load generated HFiles into table
    try {
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
                job.getConfiguration());
        loader.doBulkLoad(bulkLoadDir, hTable);
    } catch (Exception e) {
        String errorMessage = String.format("Unrecoverable error while "
                + "performing the bulk load of files in [%s]", bulkLoadDir.toString());
        throw new ImportException(errorMessage, e);
    }
}
/**
 * Configures the job for bulk load, submits it, waits for completion, and
 * then bulk-loads the generated HFiles into the target table.
 * @param job job to submit
 * @param outputPath HFile output path
 * @param outputTableName table the HFiles are loaded into
 * @param skipDependencyJars whether to clear the "tmpjars" job setting
 * @throws Exception
 */
private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath,
        TableName outputTableName, boolean skipDependencyJars) throws Exception {
    job.setMapperClass(getBulkMapperClass());
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    final Configuration configuration = job.getConfiguration();
    try (Connection conn = ConnectionFactory.createConnection(configuration);
            Admin admin = conn.getAdmin();
            Table table = conn.getTable(outputTableName);
            RegionLocator regionLocator = conn.getRegionLocator(outputTableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
        if (skipDependencyJars) {
            job.getConfiguration().unset("tmpjars");
        }
        boolean status = job.waitForCompletion(true);
        if (!status) {
            LOG.error("IndexTool job failed!");
            throw new Exception("IndexTool job failed: " + job.toString());
        }
        LOG.info("Loading HFiles from {}", outputPath);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
        loader.doBulkLoad(outputPath, admin, table, regionLocator);
    }
    FileSystem.get(configuration).delete(outputPath, true);
}
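The snippet clears the Hadoop "tmpjars" property when skipDependencyJars is set; in the opposite case the dependency jars have to reach the cluster some other way. A sketch (not taken from this source) of the usual way to populate tmpjars via the HBase MapReduce helper:

// Ship the HBase client jars (and their transitive dependencies) with the
// job via the distributed cache; this fills the same "tmpjars" property
// that the snippet above optionally unsets.
TableMapReduceUtil.addDependencyJars(job);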
public void completeImport() throws Exception {
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(getConfiguration());
    HTable table = new HTable(getConfiguration(), _tableName);
    loader.doBulkLoad(_hfilePath, table);
    FileSystem fs = _hfilePath.getFileSystem(getConfiguration());
    fs.delete(_hfilePath, true);
}
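doBulkLoad(Path, HTable) is the old two-argument overload; on HBase 2.2+ the whole LoadIncrementalHFiles class is deprecated in favor of org.apache.hadoop.hbase.tool.BulkLoadHFiles. A rough equivalent of the method above on the newer API (a sketch; the field names _hfilePath and _tableName are reused from the snippet):

public void completeImport() throws Exception {
    // BulkLoadHFiles replaces LoadIncrementalHFiles in HBase 2.2+
    BulkLoadHFiles.create(getConfiguration())
            .bulkLoad(TableName.valueOf(_tableName), _hfilePath);
    FileSystem fs = _hfilePath.getFileSystem(getConfiguration());
    fs.delete(_hfilePath, true);
}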
public static void bulkLoad(Configuration conf, LoadIncrementalHFiles loader,
        Path path, String fullTableName) throws IOException {
    SConfiguration configuration = HConfiguration.getConfiguration();
    org.apache.hadoop.hbase.client.Connection conn =
            HBaseConnectionFactory.getInstance(configuration).getConnection();
    HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
    TableName tableName = TableName.valueOf(fullTableName);
    RegionLocator locator = conn.getRegionLocator(tableName);
    Table table = conn.getTable(tableName);
    loader.doBulkLoad(path, admin, table, locator);
}
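HConfiguration, SConfiguration, and HBaseConnectionFactory above are project-specific classes; with only the stock HBase client the same helper might look like this (a sketch that, unlike the original, also closes its handles):

public static void bulkLoad(Configuration conf, Path path, String fullTableName)
        throws IOException {
    TableName tableName = TableName.valueOf(fullTableName);
    try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            Table table = conn.getTable(tableName);
            RegionLocator locator = conn.getRegionLocator(tableName)) {
        // Move the HFiles under "path" into the table's regions
        new LoadIncrementalHFiles(conf).doBulkLoad(path, admin, table, locator);
    }
}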
@Override
public Boolean call() {
    LOG.info("Configuring HFile output path to {}", outputPath);
    try {
        Job job = new Job(conf, "Phoenix MapReduce import for " + tableName);
        // Allow overriding the job jar setting by using a -D system property at startup
        if (job.getJar() == null) {
            job.setJarByClass(CsvToKeyValueMapper.class);
        }
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setMapperClass(CsvToKeyValueMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        // Initialize credentials to possibly run in a secure env
        TableMapReduceUtil.initCredentials(job);
        HTable htable = new HTable(conf, tableName);
        // Auto configure partitioner and reducer according to the main data table
        HFileOutputFormat.configureIncrementalLoad(job, htable);
        LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
        boolean success = job.waitForCompletion(true);
        if (!success) {
            LOG.error("Import job failed, check JobTracker for details");
            htable.close();
            return false;
        }
        LOG.info("Loading HFiles from {}", outputPath);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(outputPath, htable);
        htable.close();
        LOG.info("Incremental load complete for table=" + tableName);
        LOG.info("Removing output directory {}", outputPath);
        if (!FileSystem.get(conf).delete(outputPath, true)) {
            LOG.error("Removing output directory {} failed", outputPath);
        }
        return true;
    } catch (Exception ex) {
        LOG.error("Import job on table=" + tableName + " failed due to exception:" + ex);
        return false;
    }
}
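Because the import runs as a Callable&lt;Boolean&gt;, a caller typically hands it to an executor and blocks on the Future. A generic usage sketch, where importTask stands for a hypothetical instance of the enclosing Callable:

ExecutorService pool = Executors.newSingleThreadExecutor();
try {
    // importTask: hypothetical instance of the Callable above
    Future<Boolean> result = pool.submit(importTask);
    if (!result.get()) {
        System.err.println("CSV bulk load failed; see the task log for details");
    }
} finally {
    pool.shutdown();
}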
public static void main(String[] args) throws Exception {
    if (args.length < 6) {
        System.out.println("PopulateSmallTable {numberOfMappers} {numberOfRecords} {tmpOutputPath} {tableName} {columnFamily} {runID}");
        return;
    }
    String numberOfMappers = args[0];
    String numberOfRecords = args[1];
    String outputPath = args[2];
    String tableName = args[3];
    String columnFamily = args[4];
    String runID = args[5];
    // Create job
    Job job = Job.getInstance();
    HBaseConfiguration.addHbaseResources(job.getConfiguration());
    job.setJarByClass(PopulateTable.class);
    job.setJobName("PopulateTable: " + runID);
    job.getConfiguration().set(NUMBER_OF_RECORDS, numberOfRecords);
    job.getConfiguration().set(TABLE_NAME, tableName);
    job.getConfiguration().set(COLUMN_FAMILY, columnFamily);
    job.getConfiguration().set(RUN_ID, runID);
    // Define input format and path
    job.setInputFormatClass(NMapInputFormat.class);
    NMapInputFormat.setNumMapTasks(job.getConfiguration(), Integer.parseInt(numberOfMappers));
    Configuration config = HBaseConfiguration.create();
    HTable hTable = new HTable(config, tableName);
    // Auto configure partitioner and reducer
    HFileOutputFormat.configureIncrementalLoad(job, hTable);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    // Define the mapper and reducer
    job.setMapperClass(CustomMapper.class);
    // job.setReducerClass(CustomReducer.class);
    // Define the key and value format
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    // Submit the job and wait for completion
    job.waitForCompletion(true);
    FileSystem hdfs = FileSystem.get(config);
    // HBase must have write access to the generated HFiles
    HFileUtils.changePermissionR(outputPath, hdfs);
    LoadIncrementalHFiles load = new LoadIncrementalHFiles(config);
    load.doBulkLoad(new Path(outputPath), hTable);
}
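HFileUtils.changePermissionR is not a stock Hadoop/HBase class. A hypothetical equivalent that recursively opens the output directory to the hbase user could look like the following; the helper name and the blanket 0777 mode are assumptions, mirroring the permission fix in the completeImport example earlier in this listing:

// Hypothetical recursive chmod so the hbase user can move the HFiles.
public static void changePermissionR(String dir, FileSystem fs) throws IOException {
    FsPermission mode = FsPermission.createImmutable((short) 0777);
    Path root = new Path(dir);
    fs.setPermission(root, mode);
    for (FileStatus status : fs.listStatus(root)) {
        if (status.isDirectory()) {
            changePermissionR(status.getPath().toString(), fs);
        } else {
            fs.setPermission(status.getPath(), mode);
        }
    }
}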