The following are example usages of the org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles API class collected from real projects; you can also follow the links to view the full source code on GitHub.
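Before the project-specific examples, here is a minimal, self-contained sketch of the typical bulk-load flow, assuming the HFiles have already been generated under an HDFS directory (for example by a MapReduce job configured with HFileOutputFormat2). The class name BulkLoadSketch, the table name my_namespace:my_table, and the path /tmp/bulkload/my_table are placeholders for illustration only and do not come from any of the projects below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders: point these at your own table and HFile output directory.
        TableName tableName = TableName.valueOf("my_namespace:my_table");
        Path hfileDir = new Path("hdfs:///tmp/bulkload/my_table");

        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
                Admin admin = conn.getAdmin();
                Table table = conn.getTable(tableName);
                RegionLocator locator = conn.getRegionLocator(tableName)) {
            // Moves the generated HFiles into the table's regions, splitting any
            // HFile that spans a region boundary.
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(hfileDir, admin, table, locator);
        }
    }
}

The same class can also be driven from the command line (or via ToolRunner, as several of the examples below do), passing the HFile directory and the table name as its two arguments.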
/**
* e.g.<br>
*
* <pre>
* yarn jar super-devops-tool-hbase-migrator-master.jar \
* com.wl4g.devops.tool.hbase.migrator.HfileBulkImporter \
* -z emr-header-1:2181 \
* -t safeclound.tb_elec_power \
* -p /tmp-devops/safeclound.tb_elec_power
* </pre>
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
HbaseMigrateUtils.showBanner();
CommandLine line = new Builder().option("z", "zkaddr", null, "Zookeeper address.")
.option("t", "tabname", null, "Hbase table name.")
.option("p", "path", null, "Data hdfs path to be import. e.g. hdfs://localhost:9000/bak/safeclound.tb_air")
.build(args);
Configuration cfg = HBaseConfiguration.create();
cfg.set("hbase.zookeeper.quorum", line.getOptionValue("z"));
Connection conn = ConnectionFactory.createConnection(cfg);
Admin admin = conn.getAdmin();
Table table = conn.getTable(TableName.valueOf(line.getOptionValue("t")));
LoadIncrementalHFiles load = new LoadIncrementalHFiles(cfg);
load.doBulkLoad(new Path(line.getOptionValue("p")), admin, table,
conn.getRegionLocator(TableName.valueOf(line.getOptionValue("t"))));
}
public void bulkLoadTable(String tableName) throws Exception {
Path rootPathOfTable = new Path(getRootDirOfHTable(tableName));
FileStatus[] regionFiles = hbaseFS.listStatus(rootPathOfTable, new PathFilter() {
@Override
public boolean accept(Path path) {
return !path.getName().startsWith(".");
}
});
for (FileStatus regionFileStatus : regionFiles) {
ToolRunner.run(new LoadIncrementalHFiles(hbaseConf),
new String[] { regionFileStatus.getPath().toString(), tableName });
}
logger.info("succeed to migrate htable {}", tableName);
}
/**
* Perform the loading of Hfiles.
*/
@Override
protected void completeImport(Job job) throws IOException, ImportException {
super.completeImport(job);
FileSystem fileSystem = FileSystem.get(job.getConfiguration());
// Make the bulk load files source directory accessible to the world
// so that the hbase user can deal with it
Path bulkLoadDir = getContext().getDestination();
setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
FsPermission.createImmutable((short) 00777));
HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
// Load generated HFiles into table
try {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
job.getConfiguration());
loader.doBulkLoad(bulkLoadDir, hTable);
}
catch (Exception e) {
String errorMessage = String.format("Unrecoverable error while " +
"performing the bulk load of files in [%s]",
bulkLoadDir.toString());
throw new ImportException(errorMessage, e);
}
}
/**
* Submits the job and waits for completion.
* @param job job
* @param outputPath output path
* @throws Exception
*/
private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath, TableName outputTableName,
boolean skipDependencyJars) throws Exception {
job.setMapperClass(getBulkMapperClass());
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
final Configuration configuration = job.getConfiguration();
try (Connection conn = ConnectionFactory.createConnection(configuration);
Admin admin = conn.getAdmin();
Table table = conn.getTable(outputTableName);
RegionLocator regionLocator = conn.getRegionLocator(outputTableName)) {
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
if (skipDependencyJars) {
job.getConfiguration().unset("tmpjars");
}
boolean status = job.waitForCompletion(true);
if (!status) {
LOG.error("IndexTool job failed!");
throw new Exception("IndexTool job failed: " + job.toString());
}
LOG.info("Loading HFiles from {}", outputPath);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
loader.doBulkLoad(outputPath, admin, table, regionLocator);
}
FileSystem.get(configuration).delete(outputPath, true);
}
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
try {
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
options.addOption(OPTION_II_NAME);
parseOptions(options, args);
String tableName = getOptionValue(OPTION_HTABLE_NAME);
String input = getOptionValue(OPTION_INPUT_PATH);
String iiname = getOptionValue(OPTION_II_NAME);
FileSystem fs = FileSystem.get(getConf());
FsPermission permission = new FsPermission((short) 0777);
fs.setPermission(new Path(input, IIDesc.HBASE_FAMILY), permission);
int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName });
IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
IIInstance ii = mgr.getII(iiname);
IISegment seg = ii.getFirstSegment();
seg.setStorageLocationIdentifier(tableName);
seg.setStatus(SegmentStatusEnum.READY);
mgr.updateII(ii);
return hbaseExitCode;
} catch (Exception e) {
printUsage(options);
throw e;
}
}
public void completeImport() throws Exception {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(getConfiguration());
HTable table = new HTable(getConfiguration(), _tableName);
loader.doBulkLoad(_hfilePath, table);
FileSystem fs = _hfilePath.getFileSystem(getConfiguration());
fs.delete(_hfilePath, true);
}
public static void bulkLoad(Configuration conf, LoadIncrementalHFiles loader,
Path path, String fullTableName) throws IOException {
SConfiguration configuration = HConfiguration.getConfiguration();
org.apache.hadoop.hbase.client.Connection conn = HBaseConnectionFactory.getInstance(configuration).getConnection();
HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
TableName tableName = TableName.valueOf(fullTableName);
RegionLocator locator = conn.getRegionLocator(tableName);
Table table = conn.getTable(tableName);
loader.doBulkLoad(path, admin, table, locator);
}
@Override
public void call(Iterator<BulkImportPartition> importPartitions) throws Exception {
init(importPartitions);
Configuration conf = HConfiguration.unwrapDelegate();
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
FileSystem fs = FileSystem.get(URI.create(bulkImportDirectory), conf);
PartitionFactory tableFactory= SIDriver.driver().getTableFactory();
for (Long conglomId : partitionMap.keySet()) {
Partition partition=tableFactory.getTable(Long.toString(conglomId));
List<BulkImportPartition> partitionList = partitionMap.get(conglomId);
// For each batch of BulkImportPartition, use the first partition as staging area
Path path = new Path(partitionList.get(0).getFilePath());
if (!fs.exists(path)) {
fs.mkdirs(path);
}
// Move files from all partitions to the first partition
for (int i = 1; i < partitionList.size(); ++i) {
Path sourceDir = new Path(partitionList.get(i).getFilePath());
if (fs.exists(sourceDir)) {
FileStatus[] statuses = fs.listStatus(sourceDir);
for (FileStatus status : statuses) {
Path filePath = status.getPath();
Path destPath = new Path(path, filePath.getName());
fs.rename(filePath, destPath);
if (LOG.isDebugEnabled()) {
SpliceLogUtils.debug(LOG, "Move file %s to %s", filePath.toString(), destPath.toString());
}
}
fs.delete(sourceDir.getParent(), true);
}
}
writeToken(fs, path);
HBasePlatformUtils.bulkLoad(conf, loader, path.getParent(), "splice:" + partition.getTableName());
fs.delete(path.getParent(), true);
}
}
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
options.addOption(OPTION_CUBE_NAME);
parseOptions(options, args);
String tableName = getOptionValue(OPTION_HTABLE_NAME);
// e.g
// /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
// end with "/"
String input = getOptionValue(OPTION_INPUT_PATH);
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
FsShell shell = new FsShell(conf);
int exitCode = -1;
int retryCount = 10;
while (exitCode != 0 && retryCount >= 1) {
exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
retryCount--;
Thread.sleep(5000);
}
if (exitCode != 0) {
logger.error("Failed to change the file permissions: " + input);
throw new IOException("Failed to change the file permissions: " + input);
}
String[] newArgs = new String[2];
newArgs[0] = input;
newArgs[1] = tableName;
int count = 0;
Path inputPath = new Path(input);
FileSystem fs = HadoopUtil.getFileSystem(inputPath);
FileStatus[] fileStatuses = fs.listStatus(inputPath);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
Path path = fileStatus.getPath();
if (path.getName().equals(FileOutputCommitter.TEMP_DIR_NAME)) {
logger.info("Delete temporary path: " + path);
fs.delete(path, true);
} else {
count++;
}
}
}
int ret = 0;
if (count > 0) {
logger.debug("Start to run LoadIncrementalHFiles");
ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
logger.debug("End to run LoadIncrementalHFiles");
return ret;
} else {
logger.debug("Nothing to load, cube is empty");
return ret;
}
}
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_TABLE_NAME);
options.addOption(OPTION_CUBING_JOB_ID);
options.addOption(OPTION_LOOKUP_SNAPSHOT_ID);
parseOptions(options, args);
String tableName = getOptionValue(OPTION_TABLE_NAME);
String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID);
String snapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig);
DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(cubingJobID);
ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
ExtTableSnapshotInfo snapshot = extTableSnapshotInfoManager.getSnapshot(tableName, snapshotID);
long srcTableRowCnt = Long.parseLong(job.findExtraInfoBackward(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName, "-1"));
logger.info("update table:{} snapshot row count:{}", tableName, srcTableRowCnt);
snapshot.setRowCnt(srcTableRowCnt);
snapshot.setLastBuildTime(System.currentTimeMillis());
extTableSnapshotInfoManager.updateSnapshot(snapshot);
String hTableName = snapshot.getStorageLocationIdentifier();
// e.g
// /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
// end with "/"
String input = getOptionValue(OPTION_INPUT_PATH);
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
FsShell shell = new FsShell(conf);
int exitCode = -1;
int retryCount = 10;
while (exitCode != 0 && retryCount >= 1) {
exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
retryCount--;
Thread.sleep(5000);
}
if (exitCode != 0) {
logger.error("Failed to change the file permissions: {}", input);
throw new IOException("Failed to change the file permissions: " + input);
}
String[] newArgs = new String[2];
newArgs[0] = input;
newArgs[1] = hTableName;
logger.debug("Start to run LoadIncrementalHFiles");
int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
logger.debug("End to run LoadIncrementalHFiles");
return ret;
}
@Override
public int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('t');
TableMapReduceUtil.addDependencyJars(getConf(),
HalyardExport.class,
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class,
HTable.class,
HBaseConfiguration.class,
AuthenticationProtos.class,
Trace.class,
Gauge.class);
HBaseConfiguration.addHbaseResources(getConf());
Job job = Job.getInstance(getConf(), "HalyardDelete " + source);
if (cmd.hasOption('s')) {
job.getConfiguration().set(SUBJECT, cmd.getOptionValue('s'));
}
if (cmd.hasOption('p')) {
job.getConfiguration().set(PREDICATE, cmd.getOptionValue('p'));
}
if (cmd.hasOption('o')) {
job.getConfiguration().set(OBJECT, cmd.getOptionValue('o'));
}
if (cmd.hasOption('g')) {
job.getConfiguration().setStrings(CONTEXTS, cmd.getOptionValues('g'));
}
job.setJarByClass(HalyardBulkDelete.class);
TableMapReduceUtil.initCredentials(job);
Scan scan = HalyardTableUtils.scan(null, null);
TableMapReduceUtil.initTableMapperJob(source,
scan,
DeleteMapper.class,
ImmutableBytesWritable.class,
LongWritable.class,
job);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setSpeculativeExecution(false);
job.setMapSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
try (HTable hTable = HalyardTableUtils.getTable(getConf(), source, false, 0)) {
HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
FileOutputFormat.setOutputPath(job, new Path(cmd.getOptionValue('f')));
TableMapReduceUtil.addDependencyJars(job);
if (job.waitForCompletion(true)) {
new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(cmd.getOptionValue('f')), hTable);
LOG.info("Bulk Delete Completed..");
return 0;
}
}
return -1;
}
@Override
protected int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('s');
String workdir = cmd.getOptionValue('w');
String target = cmd.getOptionValue('t');
getConf().setBoolean(SKIP_INVALID_PROPERTY, cmd.hasOption('i'));
getConf().setBoolean(VERIFY_DATATYPE_VALUES_PROPERTY, cmd.hasOption('d'));
getConf().setBoolean(TRUNCATE_PROPERTY, cmd.hasOption('r'));
getConf().setInt(SPLIT_BITS_PROPERTY, Integer.parseInt(cmd.getOptionValue('b', "3")));
if (cmd.hasOption('g')) getConf().set(DEFAULT_CONTEXT_PROPERTY, cmd.getOptionValue('g'));
getConf().setBoolean(OVERRIDE_CONTEXT_PROPERTY, cmd.hasOption('o'));
getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, Long.parseLong(cmd.getOptionValue('e', String.valueOf(System.currentTimeMillis()))));
if (cmd.hasOption('m')) getConf().setLong("mapreduce.input.fileinputformat.split.maxsize", Long.parseLong(cmd.getOptionValue('m')));
TableMapReduceUtil.addDependencyJars(getConf(),
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class);
HBaseConfiguration.addHbaseResources(getConf());
Job job = Job.getInstance(getConf(), "HalyardBulkLoad -> " + workdir + " -> " + target);
job.setJarByClass(HalyardBulkLoad.class);
job.setMapperClass(RDFMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setInputFormatClass(RioFileInputFormat.class);
job.setSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
try (HTable hTable = HalyardTableUtils.getTable(getConf(), target, true, getConf().getInt(SPLIT_BITS_PROPERTY, 3))) {
HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
FileInputFormat.setInputDirRecursive(job, true);
FileInputFormat.setInputPaths(job, source);
FileOutputFormat.setOutputPath(job, new Path(workdir));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
if (job.waitForCompletion(true)) {
if (getConf().getBoolean(TRUNCATE_PROPERTY, false)) {
HalyardTableUtils.truncateTable(hTable).close();
}
new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(workdir), hTable);
LOG.info("Bulk Load Completed..");
return 0;
}
}
return -1;
}
public int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('s');
String queryFiles = cmd.getOptionValue('q');
String workdir = cmd.getOptionValue('w');
getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, Long.parseLong(cmd.getOptionValue('e', String.valueOf(System.currentTimeMillis()))));
if (cmd.hasOption('i')) getConf().set(ELASTIC_INDEX_URL, cmd.getOptionValue('i'));
TableMapReduceUtil.addDependencyJars(getConf(),
HalyardExport.class,
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class,
HTable.class,
HBaseConfiguration.class,
AuthenticationProtos.class,
Trace.class,
Gauge.class);
HBaseConfiguration.addHbaseResources(getConf());
getConf().setStrings(TABLE_NAME_PROPERTY, source);
getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, getConf().getLong(DEFAULT_TIMESTAMP_PROPERTY, System.currentTimeMillis()));
int stages = 1;
for (int stage = 0; stage < stages; stage++) {
Job job = Job.getInstance(getConf(), "HalyardBulkUpdate -> " + workdir + " -> " + source + " stage #" + stage);
job.getConfiguration().setInt(STAGE_PROPERTY, stage);
job.setJarByClass(HalyardBulkUpdate.class);
job.setMapperClass(SPARQLUpdateMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setInputFormatClass(QueryInputFormat.class);
job.setSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
try (HTable hTable = HalyardTableUtils.getTable(getConf(), source, false, 0)) {
HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
QueryInputFormat.setQueriesFromDirRecursive(job.getConfiguration(), queryFiles, true, stage);
Path outPath = new Path(workdir, "stage"+stage);
FileOutputFormat.setOutputPath(job, outPath);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
if (stage == 0) { //count real number of stages
for (InputSplit is : new QueryInputFormat().getSplits(job)) {
QueryInputFormat.QueryInputSplit qis = (QueryInputFormat.QueryInputSplit)is;
int updates = QueryParserUtil.parseUpdate(QueryLanguage.SPARQL, qis.getQuery(), null).getUpdateExprs().size();
if (updates > stages) {
stages = updates;
}
LOG.log(Level.INFO, "{0} contains {1} stages of the update sequence.", new Object[]{qis.getQueryName(), updates});
}
LOG.log(Level.INFO, "Bulk Update will process {0} MapReduce stages.", stages);
}
if (job.waitForCompletion(true)) {
new LoadIncrementalHFiles(getConf()).doBulkLoad(outPath, hTable);
LOG.log(Level.INFO, "Stage #{0} of {1} completed..", new Object[]{stage, stages});
} else {
return -1;
}
}
}
LOG.info("Bulk Update Completed..");
return 0;
}
@Override
public Boolean call() {
LOG.info("Configuring HFile output path to {}", outputPath);
try{
Job job = new Job(conf, "Phoenix MapReduce import for " + tableName);
// Allow overriding the job jar setting by using a -D system property at startup
if (job.getJar() == null) {
job.setJarByClass(CsvToKeyValueMapper.class);
}
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.setMapperClass(CsvToKeyValueMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
// initialize credentials to possibly run in a secure env
TableMapReduceUtil.initCredentials(job);
HTable htable = new HTable(conf, tableName);
// Auto configure partitioner and reducer according to the Main Data table
HFileOutputFormat.configureIncrementalLoad(job, htable);
LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
boolean success = job.waitForCompletion(true);
if (!success) {
LOG.error("Import job failed, check JobTracker for details");
htable.close();
return false;
}
LOG.info("Loading HFiles from {}", outputPath);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(outputPath, htable);
htable.close();
LOG.info("Incremental load complete for table=" + tableName);
LOG.info("Removing output directory {}", outputPath);
if (!FileSystem.get(conf).delete(outputPath, true)) {
LOG.error("Removing output directory {} failed", outputPath);
}
return true;
} catch(Exception ex) {
LOG.error("Import job on table=" + tableName + " failed due to exception:" + ex);
return false;
}
}
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
try {
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
options.addOption(OPTION_CUBE_NAME);
parseOptions(options, args);
String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
// e.g
// /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
// end with "/"
String input = getOptionValue(OPTION_INPUT_PATH);
Configuration conf = HBaseConfiguration.create(getConf());
FileSystem fs = FileSystem.get(conf);
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
KylinConfig config = KylinConfig.getInstanceFromEnv();
CubeManager cubeMgr = CubeManager.getInstance(config);
CubeInstance cube = cubeMgr.getCube(cubeName);
CubeDesc cubeDesc = cube.getDescriptor();
FsPermission permission = new FsPermission((short) 0777);
for (HBaseColumnFamilyDesc cf : cubeDesc.getHBaseMapping().getColumnFamily()) {
String cfName = cf.getName();
fs.setPermission(new Path(input + cfName), permission);
}
String[] newArgs = new String[2];
newArgs[0] = input;
newArgs[1] = tableName;
log.debug("Start to run LoadIncrementalHFiles");
int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
log.debug("End to run LoadIncrementalHFiles");
return ret;
} catch (Exception e) {
printUsage(options);
throw e;
}
}
public static void main(String[] args) throws Exception {
if (args.length < 6) {
System.out.println("PopulateSmallTable {numberOfMappers} {numberOfRecords} {tmpOutputPath} {tableName} {columnFamily} {runID}");
return;
}
String numberOfMappers = args[0];
String numberOfRecords = args[1];
String outputPath = args[2];
String tableName = args[3];
String columnFamily = args[4];
String runID = args[5];
// Create job
Job job = Job.getInstance();
HBaseConfiguration.addHbaseResources(job.getConfiguration());
job.setJarByClass(PopulateTable.class);
job.setJobName("PopulateTable: " + runID);
job.getConfiguration().set(NUMBER_OF_RECORDS, numberOfRecords);
job.getConfiguration().set(TABLE_NAME, tableName);
job.getConfiguration().set(COLUMN_FAMILY, columnFamily);
job.getConfiguration().set(RUN_ID, runID);
// Define input format and path
job.setInputFormatClass(NMapInputFormat.class);
NMapInputFormat.setNumMapTasks(job.getConfiguration(), Integer.parseInt(numberOfMappers));
Configuration config = HBaseConfiguration.create();
HTable hTable = new HTable(config, tableName);
// Auto configure partitioner and reducer
HFileOutputFormat.configureIncrementalLoad(job, hTable);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
// Define the mapper and reducer
job.setMapperClass(CustomMapper.class);
// job.setReducerClass(CustomReducer.class);
// Define the key and value format
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
// Submit the job and wait for completion
job.waitForCompletion(true);
FileSystem hdfs = FileSystem.get(config);
// Must allow HBase to have write access to the HFiles
HFileUtils.changePermissionR(outputPath, hdfs);
LoadIncrementalHFiles load = new LoadIncrementalHFiles(config);
load.doBulkLoad(new Path(outputPath), hTable);
}