类org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles源码实例Demo

下面列出了如何使用 org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 的 API 类实例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: super-cloudops   文件: HfileBulkImporter.java
/**
 * Bulk-loads the HFiles found under an HDFS directory into an HBase table.
 * <p>
 * e.g.
 *
 * <pre>
 *  yarn jar super-devops-tool-hbase-migrator-master.jar \
 *  com.wl4g.devops.tool.hbase.migrator.HfileBulkImporter \
 *  -z emr-header-1:2181 \
 *  -t safeclound.tb_elec_power \
 *  -p /tmp-devops/safeclound.tb_elec_power
 * </pre>
 *
 * @param args
 *            command line options: {@code -z} ZooKeeper address, {@code -t}
 *            HBase table name, {@code -p} HDFS path of the data to import
 * @throws Exception
 *             if option parsing, connecting to HBase or the bulk load fails
 */
public static void main(String[] args) throws Exception {
	HbaseMigrateUtils.showBanner();

	CommandLine line = new Builder().option("z", "zkaddr", null, "Zookeeper address.")
			.option("t", "tabname", null, "Hbase table name.")
			.option("p", "path", null, "Data hdfs path to be import. e.g. hdfs://localhost:9000/bak/safeclound.tb_air")
			.build(args);

	Configuration cfg = HBaseConfiguration.create();
	cfg.set("hbase.zookeeper.quorum", line.getOptionValue("z"));

	// Resolve the target table and source directory once, before any cluster
	// resource is opened, so bad arguments fail without leaving a connection behind.
	TableName tableName = TableName.valueOf(line.getOptionValue("t"));
	Path hfileDir = new Path(line.getOptionValue("p"));

	// All four handles are closeable; release them even when the load fails.
	// The locator type is fully qualified so no additional import is required.
	try (Connection conn = ConnectionFactory.createConnection(cfg);
			Admin admin = conn.getAdmin();
			Table table = conn.getTable(tableName);
			org.apache.hadoop.hbase.client.RegionLocator locator = conn.getRegionLocator(tableName)) {
		LoadIncrementalHFiles load = new LoadIncrementalHFiles(cfg);
		load.doBulkLoad(hfileDir, admin, table, locator);
	}
}
 
源代码2 项目: kylin   文件: DstClusterUtil.java
/**
 * Runs {@link LoadIncrementalHFiles} once for every entry directly under the
 * table's root directory whose name does not start with ".".
 *
 * @param tableName name of the HTable to load into; also used to resolve the
 *                  root directory via {@code getRootDirOfHTable}
 * @throws IllegalStateException if any LoadIncrementalHFiles run returns a
 *                               non-zero exit code
 * @throws Exception             if listing the directory or running the tool fails
 */
public void bulkLoadTable(String tableName) throws Exception {
    Path rootPathOfTable = new Path(getRootDirOfHTable(tableName));
    FileStatus[] regionFiles = hbaseFS.listStatus(rootPathOfTable, new PathFilter() {
        @Override
        public boolean accept(Path path) {
            // Skip dot-prefixed entries
            return !path.getName().startsWith(".");
        }
    });

    for (FileStatus regionFileStatus : regionFiles) {
        String regionDir = regionFileStatus.getPath().toString();
        int exitCode = ToolRunner.run(new LoadIncrementalHFiles(hbaseConf),
                new String[] { regionDir, tableName });
        // A non-zero exit code means the load did not complete; do not report success.
        if (exitCode != 0) {
            throw new IllegalStateException("LoadIncrementalHFiles exited with code " + exitCode
                    + " for " + regionDir + " (table " + tableName + ")");
        }
    }

    logger.info("succeed to migrate htable {}", tableName);
}
 
/**
 * Perform the loading of Hfiles.
 *
 * <p>After the parent class has completed the import, the bulk load
 * directory is made world-accessible and its HFiles are loaded into the
 * HBase table named by the job options.
 *
 * @param job the job whose configuration is used to reach HDFS and HBase
 * @throws IOException if the file system or the table cannot be opened
 * @throws ImportException if the bulk load itself (or closing the table
 *         afterwards) fails
 */
@Override
protected void completeImport(Job job) throws IOException, ImportException {
  super.completeImport(job);

  FileSystem fileSystem = FileSystem.get(job.getConfiguration());

  // Make the bulk load files source directory accessible to the world
  // so that the hbase user can deal with it
  Path bulkLoadDir = getContext().getDestination();
  setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
    FsPermission.createImmutable((short) 00777));

  // Load generated HFiles into table; the table handle is closed on every
  // path so that its underlying resources are not leaked.
  try (HTable hTable = new HTable(job.getConfiguration(),
      options.getHBaseTable())) {
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
      job.getConfiguration());
    loader.doBulkLoad(bulkLoadDir, hTable);
  } catch (Exception e) {
    String errorMessage = String.format("Unrecoverable error while " +
      "performing the bulk load of files in [%s]",
      bulkLoadDir.toString());
    throw new ImportException(errorMessage, e);
  }
}
 
源代码4 项目: hgraphdb   文件: IndexTool.java
/**
 * Configures the job for HFile output, runs it to completion, bulk loads the
 * produced HFiles into the output table and finally removes the output path.
 *
 * @param job                job to configure and run
 * @param outputPath         directory the HFiles are loaded from; deleted recursively
 *                           once the load has finished
 * @param outputTableName    table that receives the HFiles
 * @param skipDependencyJars when {@code true}, the {@code tmpjars} setting is removed
 *                           from the job configuration before submission
 * @throws Exception if the job reports failure or any HBase/HDFS call fails
 */
private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath, TableName outputTableName,
                                               boolean skipDependencyJars) throws Exception {
    job.setMapperClass(getBulkMapperClass());
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    final Configuration jobConf = job.getConfiguration();
    try (Connection connection = ConnectionFactory.createConnection(jobConf);
         Admin hbaseAdmin = connection.getAdmin();
         Table targetTable = connection.getTable(outputTableName);
         RegionLocator locator = connection.getRegionLocator(outputTableName)) {

        HFileOutputFormat2.configureIncrementalLoad(job, targetTable, locator);
        if (skipDependencyJars) {
            job.getConfiguration().unset("tmpjars");
        }

        // Block until the MapReduce job ends; abort before loading anything on failure.
        if (!job.waitForCompletion(true)) {
            LOG.error("IndexTool job failed!");
            throw new Exception("IndexTool job failed: " + job.toString());
        }

        LOG.info("Loading HFiles from {}", outputPath);
        new LoadIncrementalHFiles(jobConf).doBulkLoad(outputPath, hbaseAdmin, targetTable, locator);
    }

    // Reached only when the load completed without throwing.
    FileSystem.get(jobConf).delete(outputPath, true);
}
 
源代码5 项目: Kylin   文件: IIBulkLoadJob.java
/**
 * Bulk loads the HFiles under the input path into the given HTable and, when
 * the load succeeds, marks the first segment of the inverted index as READY.
 *
 * @param args command line arguments carrying the input path, HTable name and II name
 * @return the exit code of the LoadIncrementalHFiles run
 * @throws Exception any failure; usage is printed before it is rethrown
 */
@Override
public int run(String[] args) throws Exception {
    Options options = new Options();

    try {
        options.addOption(OPTION_INPUT_PATH);
        options.addOption(OPTION_HTABLE_NAME);
        options.addOption(OPTION_II_NAME);
        parseOptions(options, args);

        String tableName = getOptionValue(OPTION_HTABLE_NAME);
        String input = getOptionValue(OPTION_INPUT_PATH);
        String iiname = getOptionValue(OPTION_II_NAME);

        // Open up the column family directory before handing it to the loader.
        FileSystem fs = FileSystem.get(getConf());
        FsPermission permission = new FsPermission((short) 0777);
        fs.setPermission(new Path(input, IIDesc.HBASE_FAMILY), permission);

        int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName });
        if (hbaseExitCode != 0) {
            // The load failed: leave the segment metadata untouched instead of
            // advertising a READY segment that has no data behind it.
            return hbaseExitCode;
        }

        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
        IIInstance ii = mgr.getII(iiname);
        IISegment seg = ii.getFirstSegment();
        seg.setStorageLocationIdentifier(tableName);
        seg.setStatus(SegmentStatusEnum.READY);
        mgr.updateII(ii);

        return hbaseExitCode;

    } catch (Exception e) {
        printUsage(options);
        throw e;
    }
}
 
源代码6 项目: zerowing   文件: BulkImportJob.java
/**
 * Bulk loads the generated HFiles into the target table and then removes the
 * HFile directory.
 *
 * @throws Exception if the table cannot be opened, the load fails, or the
 *                   directory cannot be deleted
 */
public void completeImport() throws Exception {
  LoadIncrementalHFiles loader = new LoadIncrementalHFiles(getConfiguration());
  // Close the table handle on every path so it is not leaked when the load throws.
  try (HTable table = new HTable(getConfiguration(), _tableName)) {
    loader.doBulkLoad(_hfilePath, table);
  }

  // Reached only after a successful load.
  FileSystem fs = _hfilePath.getFileSystem(getConfiguration());
  fs.delete(_hfilePath, true);
}
 
源代码7 项目: spliceengine   文件: HBasePlatformUtils.java
/**
 * Bulk loads the HFiles under {@code path} into the named table using the
 * connection obtained from {@code HBaseConnectionFactory}.
 *
 * @param conf          not read by this method; kept so existing callers compile
 * @param loader        loader that performs the bulk load
 * @param path          directory containing the HFiles
 * @param fullTableName full name of the target table
 * @throws IOException if obtaining HBase resources or the load fails
 */
public static void bulkLoad(Configuration conf, LoadIncrementalHFiles loader,
                            Path path, String fullTableName) throws IOException {
    SConfiguration configuration = HConfiguration.getConfiguration();
    // The connection comes from a factory singleton, so it is deliberately NOT closed here.
    org.apache.hadoop.hbase.client.Connection conn = HBaseConnectionFactory.getInstance(configuration).getConnection();
    TableName tableName = TableName.valueOf(fullTableName);
    // Admin, locator and table are per-call handles; release them when done.
    try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
         RegionLocator locator = conn.getRegionLocator(tableName);
         Table table = conn.getTable(tableName)) {
        loader.doBulkLoad(path, admin, table, locator);
    }
}
 
源代码8 项目: spliceengine   文件: BulkImportFunction.java
/**
 * For every conglomerate in {@code partitionMap}, gathers the files of all its
 * bulk import partitions into the first partition's directory, writes a token
 * there, bulk loads the parent of that directory into the "splice:"-prefixed
 * table, and deletes that parent directory afterwards.
 *
 * @param importPartitions partitions handed to {@code init}
 * @throws Exception if any file system or bulk load operation fails
 */
@Override
public void call(Iterator<BulkImportPartition> importPartitions) throws Exception {

    init(importPartitions);
    Configuration conf = HConfiguration.unwrapDelegate();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    FileSystem fs = FileSystem.get(URI.create(bulkImportDirectory), conf);
    PartitionFactory tableFactory = SIDriver.driver().getTableFactory();

    for (Long conglomId : partitionMap.keySet()) {
        Partition partition = tableFactory.getTable(Long.toString(conglomId));
        List<BulkImportPartition> batch = partitionMap.get(conglomId);

        // The first partition of the batch doubles as the staging area.
        Path stagingDir = new Path(batch.get(0).getFilePath());
        if (!fs.exists(stagingDir)) {
            fs.mkdirs(stagingDir);
        }

        // Fold the files of every remaining partition into the staging area.
        for (BulkImportPartition other : batch.subList(1, batch.size())) {
            Path sourceDir = new Path(other.getFilePath());
            if (!fs.exists(sourceDir)) {
                continue;
            }
            for (FileStatus status : fs.listStatus(sourceDir)) {
                Path from = status.getPath();
                Path to = new Path(stagingDir, from.getName());
                fs.rename(from, to);
                if (LOG.isDebugEnabled()) {
                    SpliceLogUtils.debug(LOG, "Move file %s to %s", from.toString(), to.toString());
                }
            }
            // The emptied partition is removed together with its parent directory.
            fs.delete(sourceDir.getParent(), true);
        }

        writeToken(fs, stagingDir);
        Path loadRoot = stagingDir.getParent();
        HBasePlatformUtils.bulkLoad(conf, loader, loadRoot, "splice:" + partition.getTableName());
        fs.delete(loadRoot, true);
    }
}
 
源代码9 项目: kylin-on-parquet-v2   文件: BulkLoadJob.java
/**
 * Makes the HFile input directory world-accessible and, if it contains at
 * least one directory other than the committer's temporary directory, runs
 * LoadIncrementalHFiles against the given HTable.
 *
 * @param args command line arguments carrying the input path, HTable name and cube name
 * @return the exit code of the LoadIncrementalHFiles run, or 0 when there is nothing to load
 * @throws IOException if the permissions cannot be changed after all retries
 * @throws Exception   on any other failure
 */
@Override
public int run(String[] args) throws Exception {
    Options options = new Options();

    options.addOption(OPTION_INPUT_PATH);
    options.addOption(OPTION_HTABLE_NAME);
    options.addOption(OPTION_CUBE_NAME);
    parseOptions(options, args);

    String tableName = getOptionValue(OPTION_HTABLE_NAME);
    // e.g
    // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
    // end with "/"
    String input = getOptionValue(OPTION_INPUT_PATH);

    Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
    FsShell shell = new FsShell(conf);

    int exitCode = -1;
    int retryCount = 10;
    while (exitCode != 0 && retryCount >= 1) {
        exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
        retryCount--;
        // Back off only when another attempt will follow; do not delay a
        // successful run or the final failure report by 5 seconds.
        if (exitCode != 0 && retryCount >= 1) {
            Thread.sleep(5000);
        }
    }

    if (exitCode != 0) {
        logger.error("Failed to change the file permissions: " + input);
        throw new IOException("Failed to change the file permissions: " + input);
    }

    String[] newArgs = new String[2];
    newArgs[0] = input;
    newArgs[1] = tableName;

    // Count the directories to load, dropping the output committer's temp dir on the way.
    int count = 0;
    Path inputPath = new Path(input);
    FileSystem fs = HadoopUtil.getFileSystem(inputPath);
    FileStatus[] fileStatuses = fs.listStatus(inputPath);

    for (FileStatus fileStatus : fileStatuses) {
        if (fileStatus.isDirectory()) {
            Path path = fileStatus.getPath();
            if (path.getName().equals(FileOutputCommitter.TEMP_DIR_NAME)) {
                logger.info("Delete temporary path: " + path);
                fs.delete(path, true);
            } else {
                count++;
            }
        }
    }

    if (count == 0) {
        logger.debug("Nothing to load, cube is empty");
        return 0;
    }

    logger.debug("Start to run LoadIncrementalHFiles");
    int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
    logger.debug("End to run LoadIncrementalHFiles");
    return ret;
}
 
/**
 * Updates the row count and last build time of an external table snapshot,
 * makes the HFile input directory world-accessible and runs
 * LoadIncrementalHFiles against the snapshot's storage location.
 *
 * <p>Note that the snapshot metadata is written before the bulk load runs.
 *
 * @param args command line arguments carrying the input path, table name,
 *             cubing job id and lookup snapshot id
 * @return the exit code of the LoadIncrementalHFiles run
 * @throws IOException if the permissions cannot be changed after all retries
 * @throws Exception   on any other failure
 */
@Override
public int run(String[] args) throws Exception {
    Options options = new Options();

    options.addOption(OPTION_INPUT_PATH);
    options.addOption(OPTION_TABLE_NAME);
    options.addOption(OPTION_CUBING_JOB_ID);
    options.addOption(OPTION_LOOKUP_SNAPSHOT_ID);
    parseOptions(options, args);

    String tableName = getOptionValue(OPTION_TABLE_NAME);
    String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID);
    String snapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);

    KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
    ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig);
    DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(cubingJobID);

    ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
    ExtTableSnapshotInfo snapshot = extTableSnapshotInfoManager.getSnapshot(tableName, snapshotID);
    // Falls back to "-1" when the cubing job recorded no source row count.
    long srcTableRowCnt = Long.parseLong(job.findExtraInfoBackward(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName, "-1"));
    logger.info("update table:{} snapshot row count:{}", tableName, srcTableRowCnt);
    snapshot.setRowCnt(srcTableRowCnt);
    snapshot.setLastBuildTime(System.currentTimeMillis());
    extTableSnapshotInfoManager.updateSnapshot(snapshot);

    String hTableName = snapshot.getStorageLocationIdentifier();
    // e.g
    // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
    // end with "/"
    String input = getOptionValue(OPTION_INPUT_PATH);

    Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
    FsShell shell = new FsShell(conf);

    int exitCode = -1;
    int retryCount = 10;
    while (exitCode != 0 && retryCount >= 1) {
        exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
        retryCount--;
        // Back off only when another attempt will follow; do not delay a
        // successful run or the final failure report by 5 seconds.
        if (exitCode != 0 && retryCount >= 1) {
            Thread.sleep(5000);
        }
    }

    if (exitCode != 0) {
        logger.error("Failed to change the file permissions: {}", input);
        throw new IOException("Failed to change the file permissions: " + input);
    }

    String[] newArgs = new String[2];
    newArgs[0] = input;
    newArgs[1] = hTableName;

    logger.debug("Start to run LoadIncrementalHFiles");
    int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
    logger.debug("End to run LoadIncrementalHFiles");
    return ret;
}
 
源代码11 项目: Halyard   文件: HalyardBulkDelete.java
/**
 * Runs a MapReduce job over the table given by option {@code t} with
 * {@code DeleteMapper}, writes its output as HFiles to the path given by
 * option {@code f}, and bulk loads those HFiles back into the same table.
 * Options {@code s}, {@code p}, {@code o} and {@code g}, when present, are
 * copied into the job configuration.
 *
 * @param cmd parsed command line
 * @return 0 when the job succeeded and the HFiles were loaded, -1 when the job failed
 * @throws Exception on any configuration, job or load failure
 */
@Override
public int run(CommandLine cmd) throws Exception {
    final String tableName = cmd.getOptionValue('t');
    TableMapReduceUtil.addDependencyJars(getConf(),
        HalyardExport.class,
        NTriplesUtil.class,
        Rio.class,
        AbstractRDFHandler.class,
        RDFFormat.class,
        RDFParser.class,
        HTable.class,
        HBaseConfiguration.class,
        AuthenticationProtos.class,
        Trace.class,
        Gauge.class);
    HBaseConfiguration.addHbaseResources(getConf());

    final Job job = Job.getInstance(getConf(), "HalyardDelete " + tableName);

    // Optional filters: only forwarded when given on the command line.
    if (cmd.hasOption('s')) {
        job.getConfiguration().set(SUBJECT, cmd.getOptionValue('s'));
    }
    if (cmd.hasOption('p')) {
        job.getConfiguration().set(PREDICATE, cmd.getOptionValue('p'));
    }
    if (cmd.hasOption('o')) {
        job.getConfiguration().set(OBJECT, cmd.getOptionValue('o'));
    }
    if (cmd.hasOption('g')) {
        job.getConfiguration().setStrings(CONTEXTS, cmd.getOptionValues('g'));
    }

    job.setJarByClass(HalyardBulkDelete.class);
    TableMapReduceUtil.initCredentials(job);

    TableMapReduceUtil.initTableMapperJob(tableName,
        HalyardTableUtils.scan(null, null),
        DeleteMapper.class,
        ImmutableBytesWritable.class,
        LongWritable.class,
        job);

    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setSpeculativeExecution(false);
    job.setMapSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);

    try (HTable hTable = HalyardTableUtils.getTable(getConf(), tableName, false, 0)) {
        HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
        final Path hfileDir = new Path(cmd.getOptionValue('f'));
        FileOutputFormat.setOutputPath(job, hfileDir);
        TableMapReduceUtil.addDependencyJars(job);

        if (!job.waitForCompletion(true)) {
            return -1;
        }

        new LoadIncrementalHFiles(getConf()).doBulkLoad(hfileDir, hTable);
        LOG.info("Bulk Delete Completed..");
        return 0;
    }
}
 
源代码12 项目: Halyard   文件: HalyardBulkLoad.java
/**
 * Copies the command line options into the configuration, runs a MapReduce
 * job that maps the source files with {@code RDFMapper} into HFiles under
 * the work directory, and bulk loads those HFiles into the target table.
 * When the truncate property is set, the table is truncated after the job
 * succeeds and before the HFiles are loaded.
 *
 * @param cmd parsed command line
 * @return 0 when the job succeeded and the HFiles were loaded, -1 when the job failed
 * @throws Exception on any configuration, job or load failure
 */
@Override
protected int run(CommandLine cmd) throws Exception {
    final String sourcePaths = cmd.getOptionValue('s');
    final String workDirName = cmd.getOptionValue('w');
    final String targetTable = cmd.getOptionValue('t');

    getConf().setBoolean(SKIP_INVALID_PROPERTY, cmd.hasOption('i'));
    getConf().setBoolean(VERIFY_DATATYPE_VALUES_PROPERTY, cmd.hasOption('d'));
    getConf().setBoolean(TRUNCATE_PROPERTY, cmd.hasOption('r'));
    getConf().setInt(SPLIT_BITS_PROPERTY, Integer.parseInt(cmd.getOptionValue('b', "3")));
    if (cmd.hasOption('g')) {
        getConf().set(DEFAULT_CONTEXT_PROPERTY, cmd.getOptionValue('g'));
    }
    getConf().setBoolean(OVERRIDE_CONTEXT_PROPERTY, cmd.hasOption('o'));
    // Option 'e' defaults to the current time when absent.
    getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, Long.parseLong(cmd.getOptionValue('e', String.valueOf(System.currentTimeMillis()))));
    if (cmd.hasOption('m')) {
        getConf().setLong("mapreduce.input.fileinputformat.split.maxsize", Long.parseLong(cmd.getOptionValue('m')));
    }

    TableMapReduceUtil.addDependencyJars(getConf(),
            NTriplesUtil.class,
            Rio.class,
            AbstractRDFHandler.class,
            RDFFormat.class,
            RDFParser.class);
    HBaseConfiguration.addHbaseResources(getConf());

    final Job job = Job.getInstance(getConf(), "HalyardBulkLoad -> " + workDirName + " -> " + targetTable);
    job.setJarByClass(HalyardBulkLoad.class);
    job.setMapperClass(RDFMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setInputFormatClass(RioFileInputFormat.class);
    job.setSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);

    try (HTable hTable = HalyardTableUtils.getTable(getConf(), targetTable, true, getConf().getInt(SPLIT_BITS_PROPERTY, 3))) {
        HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
        FileInputFormat.setInputDirRecursive(job, true);
        FileInputFormat.setInputPaths(job, sourcePaths);
        final Path hfileDir = new Path(workDirName);
        FileOutputFormat.setOutputPath(job, hfileDir);
        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.initCredentials(job);

        if (!job.waitForCompletion(true)) {
            return -1;
        }

        if (getConf().getBoolean(TRUNCATE_PROPERTY, false)) {
            HalyardTableUtils.truncateTable(hTable).close();
        }
        new LoadIncrementalHFiles(getConf()).doBulkLoad(hfileDir, hTable);
        LOG.info("Bulk Load Completed..");
        return 0;
    }
}
 
源代码13 项目: Halyard   文件: HalyardBulkUpdate.java
/**
 * Runs one MapReduce job per update stage over the table given by option
 * {@code s}, using the queries from option {@code q}, and bulk loads each
 * stage's HFiles (written under {@code <workdir>/stage<N>}) back into that
 * table before the next stage starts.
 *
 * <p>The number of stages starts at 1 and is raised during stage 0 to the
 * largest number of update expressions found in any query input split.
 *
 * @param cmd parsed command line; options {@code s}, {@code q}, {@code w},
 *            {@code e} and {@code i} are read here
 * @return 0 when every stage completed, -1 as soon as one stage's job fails
 * @throws Exception on any configuration, job or load failure
 */
public int run(CommandLine cmd) throws Exception {
    String source = cmd.getOptionValue('s');
    String queryFiles = cmd.getOptionValue('q');
    String workdir = cmd.getOptionValue('w');
    // Option 'e' defaults to the current time when absent.
    getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, Long.parseLong(cmd.getOptionValue('e', String.valueOf(System.currentTimeMillis()))));
    if (cmd.hasOption('i')) getConf().set(ELASTIC_INDEX_URL, cmd.getOptionValue('i'));
    TableMapReduceUtil.addDependencyJars(getConf(),
           HalyardExport.class,
           NTriplesUtil.class,
           Rio.class,
           AbstractRDFHandler.class,
           RDFFormat.class,
           RDFParser.class,
           HTable.class,
           HBaseConfiguration.class,
           AuthenticationProtos.class,
           Trace.class,
           Gauge.class);
    HBaseConfiguration.addHbaseResources(getConf());
    getConf().setStrings(TABLE_NAME_PROPERTY, source);
    // NOTE(review): the property was already set above, so this re-stores its
    // current value; it looks redundant unless addHbaseResources can change it - confirm.
    getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, getConf().getLong(DEFAULT_TIMESTAMP_PROPERTY, System.currentTimeMillis()));
    // Provisional stage count; the real value is discovered while preparing stage 0.
    int stages = 1;
    for (int stage = 0; stage < stages; stage++) {
        Job job = Job.getInstance(getConf(), "HalyardBulkUpdate -> " + workdir + " -> " + source + " stage #" + stage);
        job.getConfiguration().setInt(STAGE_PROPERTY, stage);
        job.setJarByClass(HalyardBulkUpdate.class);
        job.setMapperClass(SPARQLUpdateMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setInputFormatClass(QueryInputFormat.class);
        job.setSpeculativeExecution(false);
        job.setReduceSpeculativeExecution(false);
        // The table handle is reopened for every stage and closed when the stage ends.
        try (HTable hTable = HalyardTableUtils.getTable(getConf(), source, false, 0)) {
            HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
            QueryInputFormat.setQueriesFromDirRecursive(job.getConfiguration(), queryFiles, true, stage);
            // Each stage writes its HFiles to its own sub-directory of the work dir.
            Path outPath = new Path(workdir, "stage"+stage);
            FileOutputFormat.setOutputPath(job, outPath);
            TableMapReduceUtil.addDependencyJars(job);
            TableMapReduceUtil.initCredentials(job);
            if (stage == 0) { //count real number of stages
                for (InputSplit is : new QueryInputFormat().getSplits(job)) {
                    QueryInputFormat.QueryInputSplit qis = (QueryInputFormat.QueryInputSplit)is;
                    // Number of update expressions in this split's query.
                    int updates = QueryParserUtil.parseUpdate(QueryLanguage.SPARQL, qis.getQuery(), null).getUpdateExprs().size();
                    // Keep the maximum; this extends the bound of the enclosing loop.
                    if (updates > stages) {
                        stages = updates;
                    }
                    LOG.log(Level.INFO, "{0} contains {1} stages of the update sequence.", new Object[]{qis.getQueryName(), updates});
                }
                LOG.log(Level.INFO, "Bulk Update will process {0} MapReduce stages.", stages);
            }
            if (job.waitForCompletion(true)) {
                // Load this stage's output before the next stage is started.
                new LoadIncrementalHFiles(getConf()).doBulkLoad(outPath, hTable);
                LOG.log(Level.INFO, "Stage #{0} of {1} completed..", new Object[]{stage, stages});
            } else {
                // Abort the whole update on the first failed stage.
                return -1;
            }
        }
    }
    LOG.info("Bulk Update Completed..");
    return 0;
}
 
源代码14 项目: phoenix   文件: CsvBulkLoadTool.java
/**
 * Runs the MapReduce import job that writes HFiles to {@code outputPath},
 * bulk loads them into {@code tableName} and removes the output directory.
 *
 * @return {@code true} when the job and the bulk load both succeeded;
 *         {@code false} when the job failed or any exception was thrown
 */
@Override
public Boolean call() {
    LOG.info("Configuring HFile output path to {}", outputPath);
    try {
        Job job = new Job(conf, "Phoenix MapReduce import for " + tableName);

        // Allow overriding the job jar setting by using a -D system property at startup
        if (job.getJar() == null) {
            job.setJarByClass(CsvToKeyValueMapper.class);
        }
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setMapperClass(CsvToKeyValueMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        // initialize credentials to possibly run in a secure env
        TableMapReduceUtil.initCredentials(job);

        // The table handle is closed on every path, including when the job
        // or the bulk load throws.
        try (HTable htable = new HTable(conf, tableName)) {
            // Auto configure partitioner and reducer according to the Main Data table
            HFileOutputFormat.configureIncrementalLoad(job, htable);

            LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
            boolean success = job.waitForCompletion(true);
            if (!success) {
                LOG.error("Import job failed, check JobTracker for details");
                return false;
            }

            LOG.info("Loading HFiles from {}", outputPath);
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(outputPath, htable);
        }

        LOG.info("Incremental load complete for table=" + tableName);

        LOG.info("Removing output directory {}", outputPath);
        if (!FileSystem.get(conf).delete(outputPath, true)) {
            LOG.error("Removing output directory {} failed", outputPath);
        }

        return true;
    } catch (Exception ex) {
        // Pass the exception itself so the stack trace is not lost.
        LOG.error("Import job on table=" + tableName + " failed due to exception", ex);
        return false;
    }
}
 
源代码15 项目: kylin   文件: BulkLoadJob.java
/**
 * Makes the HFile input directory world-accessible and, if it contains at
 * least one directory other than the committer's temporary directory, runs
 * LoadIncrementalHFiles against the given HTable.
 *
 * @param args command line arguments carrying the input path, HTable name and cube name
 * @return the exit code of the LoadIncrementalHFiles run, or 0 when there is nothing to load
 * @throws IOException if the permissions cannot be changed after all retries
 * @throws Exception   on any other failure
 */
@Override
public int run(String[] args) throws Exception {
    Options options = new Options();

    options.addOption(OPTION_INPUT_PATH);
    options.addOption(OPTION_HTABLE_NAME);
    options.addOption(OPTION_CUBE_NAME);
    parseOptions(options, args);

    String tableName = getOptionValue(OPTION_HTABLE_NAME);
    // e.g
    // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
    // end with "/"
    String input = getOptionValue(OPTION_INPUT_PATH);

    Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
    FsShell shell = new FsShell(conf);

    int exitCode = -1;
    int retryCount = 10;
    while (exitCode != 0 && retryCount >= 1) {
        exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
        retryCount--;
        // Back off only when another attempt will follow; do not delay a
        // successful run or the final failure report by 5 seconds.
        if (exitCode != 0 && retryCount >= 1) {
            Thread.sleep(5000);
        }
    }

    if (exitCode != 0) {
        logger.error("Failed to change the file permissions: " + input);
        throw new IOException("Failed to change the file permissions: " + input);
    }

    String[] newArgs = new String[2];
    newArgs[0] = input;
    newArgs[1] = tableName;

    // Count the directories to load, dropping the output committer's temp dir on the way.
    int count = 0;
    Path inputPath = new Path(input);
    FileSystem fs = HadoopUtil.getFileSystem(inputPath);
    FileStatus[] fileStatuses = fs.listStatus(inputPath);

    for (FileStatus fileStatus : fileStatuses) {
        if (fileStatus.isDirectory()) {
            Path path = fileStatus.getPath();
            if (path.getName().equals(FileOutputCommitter.TEMP_DIR_NAME)) {
                logger.info("Delete temporary path: " + path);
                fs.delete(path, true);
            } else {
                count++;
            }
        }
    }

    if (count == 0) {
        logger.debug("Nothing to load, cube is empty");
        return 0;
    }

    logger.debug("Start to run LoadIncrementalHFiles");
    int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
    logger.debug("End to run LoadIncrementalHFiles");
    return ret;
}
 
源代码16 项目: kylin   文件: LookupTableHFilesBulkLoadJob.java
/**
 * Updates the row count and last build time of an external table snapshot,
 * makes the HFile input directory world-accessible and runs
 * LoadIncrementalHFiles against the snapshot's storage location.
 *
 * <p>Note that the snapshot metadata is written before the bulk load runs.
 *
 * @param args command line arguments carrying the input path, table name,
 *             cubing job id and lookup snapshot id
 * @return the exit code of the LoadIncrementalHFiles run
 * @throws IOException if the permissions cannot be changed after all retries
 * @throws Exception   on any other failure
 */
@Override
public int run(String[] args) throws Exception {
    Options options = new Options();

    options.addOption(OPTION_INPUT_PATH);
    options.addOption(OPTION_TABLE_NAME);
    options.addOption(OPTION_CUBING_JOB_ID);
    options.addOption(OPTION_LOOKUP_SNAPSHOT_ID);
    parseOptions(options, args);

    String tableName = getOptionValue(OPTION_TABLE_NAME);
    String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID);
    String snapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);

    KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
    ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig);
    DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(cubingJobID);

    ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
    ExtTableSnapshotInfo snapshot = extTableSnapshotInfoManager.getSnapshot(tableName, snapshotID);
    // Falls back to "-1" when the cubing job recorded no source row count.
    long srcTableRowCnt = Long.parseLong(job.findExtraInfoBackward(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName, "-1"));
    logger.info("update table:{} snapshot row count:{}", tableName, srcTableRowCnt);
    snapshot.setRowCnt(srcTableRowCnt);
    snapshot.setLastBuildTime(System.currentTimeMillis());
    extTableSnapshotInfoManager.updateSnapshot(snapshot);

    String hTableName = snapshot.getStorageLocationIdentifier();
    // e.g
    // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
    // end with "/"
    String input = getOptionValue(OPTION_INPUT_PATH);

    Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
    FsShell shell = new FsShell(conf);

    int exitCode = -1;
    int retryCount = 10;
    while (exitCode != 0 && retryCount >= 1) {
        exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
        retryCount--;
        // Back off only when another attempt will follow; do not delay a
        // successful run or the final failure report by 5 seconds.
        if (exitCode != 0 && retryCount >= 1) {
            Thread.sleep(5000);
        }
    }

    if (exitCode != 0) {
        logger.error("Failed to change the file permissions: {}", input);
        throw new IOException("Failed to change the file permissions: " + input);
    }

    String[] newArgs = new String[2];
    newArgs[0] = input;
    newArgs[1] = hTableName;

    logger.debug("Start to run LoadIncrementalHFiles");
    int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
    logger.debug("End to run LoadIncrementalHFiles");
    return ret;
}
 
源代码17 项目: Kylin   文件: BulkLoadJob.java
/**
 * Opens up the permissions of every column family directory of the cube
 * under the input path and runs LoadIncrementalHFiles against the HTable.
 *
 * @param args command line arguments carrying the input path, HTable name and cube name
 * @return the exit code of the LoadIncrementalHFiles run
 * @throws Exception any failure; usage is printed before it is rethrown
 */
@Override
public int run(String[] args) throws Exception {
    Options options = new Options();

    try {
        options.addOption(OPTION_INPUT_PATH);
        options.addOption(OPTION_HTABLE_NAME);
        options.addOption(OPTION_CUBE_NAME);
        parseOptions(options, args);

        String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
        // e.g
        // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
        // (a trailing "/" is no longer required, see below)
        String input = getOptionValue(OPTION_INPUT_PATH);

        Configuration conf = HBaseConfiguration.create(getConf());
        FileSystem fs = FileSystem.get(conf);

        String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        CubeManager cubeMgr = CubeManager.getInstance(config);
        CubeInstance cube = cubeMgr.getCube(cubeName);
        CubeDesc cubeDesc = cube.getDescriptor();
        FsPermission permission = new FsPermission((short) 0777);
        for (HBaseColumnFamilyDesc cf : cubeDesc.getHBaseMapping().getColumnFamily()) {
            String cfName = cf.getName();
            // Resolve the child against the parent instead of concatenating
            // strings, so the path is correct with or without a trailing "/".
            fs.setPermission(new Path(input, cfName), permission);
        }

        String[] newArgs = new String[2];
        newArgs[0] = input;
        newArgs[1] = tableName;

        log.debug("Start to run LoadIncrementalHFiles");
        int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
        log.debug("End to run LoadIncrementalHFiles");
        return ret;
    } catch (Exception e) {
        printUsage(options);
        throw e;
    }
}
 
源代码18 项目: HBase-ToHDFS   文件: PopulateTable.java
/**
 * Runs a MapReduce job that generates HFiles under the temporary output path
 * and bulk loads them into the given HBase table.
 *
 * @param args {numberOfMappers} {numberOfRecords} {tmpOutputPath} {tableName}
 *             {columnFamily} {runID}; usage is printed when fewer than six
 *             arguments are supplied
 * @throws IllegalStateException if the MapReduce job does not complete successfully
 * @throws Exception             on any other job, file system or HBase failure
 */
public static void main(String[] args) throws Exception {

    // All six positional arguments are read below, so anything shorter is a usage error.
    if (args.length < 6) {
      System.out.println("PopulateSmallTable {numberOfMappers} {numberOfRecords} {tmpOutputPath} {tableName} {columnFamily} {runID}");
      return;
    }

    String numberOfMappers = args[0];
    String numberOfRecords = args[1];
    String outputPath = args[2];
    String tableName = args[3];
    String columnFamily = args[4];
    String runID = args[5];

    // Create job
    Job job = Job.getInstance();
    HBaseConfiguration.addHbaseResources(job.getConfiguration());

    job.setJarByClass(PopulateTable.class);
    job.setJobName("PopulateTable: " + runID);
    job.getConfiguration().set(NUMBER_OF_RECORDS, numberOfRecords);

    job.getConfiguration().set(TABLE_NAME, tableName);
    job.getConfiguration().set(COLUMN_FAMILY, columnFamily);
    job.getConfiguration().set(RUN_ID, runID);

    // Define input format and path
    job.setInputFormatClass(NMapInputFormat.class);
    NMapInputFormat.setNumMapTasks(job.getConfiguration(), Integer.parseInt(numberOfMappers));

    Configuration config = HBaseConfiguration.create();

    // Close the table handle on every path, including job or load failure.
    try (HTable hTable = new HTable(config, tableName)) {

      // Auto configure partitioner and reducer
      HFileOutputFormat.configureIncrementalLoad(job, hTable);
      FileOutputFormat.setOutputPath(job, new Path(outputPath));

      // Define the mapper and reducer
      job.setMapperClass(CustomMapper.class);
      // job.setReducerClass(CustomReducer.class);

      // Define the key and value format
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);

      // Do not try to load HFiles from a job that did not finish successfully.
      if (!job.waitForCompletion(true)) {
        throw new IllegalStateException("PopulateTable job failed; HFiles under " + outputPath + " were not loaded");
      }
      FileSystem hdfs = FileSystem.get(config);

      // Must allow HBase to have write access to HFiles
      HFileUtils.changePermissionR(outputPath, hdfs);

      LoadIncrementalHFiles load = new LoadIncrementalHFiles(config);
      load.doBulkLoad(new Path(outputPath), hTable);
    }

}
 
 类所在包
 类方法
 同包方法