Listed below are example usages of org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil. Follow the links to view each snippet's source on GitHub, or leave a comment on the right.
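Most of the snippets follow the same pattern: build a Job, then let TableMapReduceUtil wire in the HBase-specific input/output formats, serializations, credentials, and dependency jars. As orientation, here is a minimal, self-contained sketch of a table-to-table copy job; the class and table names (ExampleDriver, "source", "target") are hypothetical and not taken from any snippet below:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class ExampleDriver {
// Copies each row unchanged; a real mapper would transform it.
public static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable row, Result value, Context context)
throws IOException, InterruptedException {
Put put = new Put(row.get());
for (Cell cell : value.rawCells()) {
put.add(cell);
}
context.write(row, put);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "example-copy");
job.setJarByClass(ExampleDriver.class);
Scan scan = new Scan(); // full-table scan; real jobs should restrict it
// Read "source" through CopyMapper...
TableMapReduceUtil.initTableMapperJob("source", scan, CopyMapper.class,
ImmutableBytesWritable.class, Put.class, job);
// ...and write to "target" via the default IdentityTableReducer.
TableMapReduceUtil.initTableReducerJob("target", null, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}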
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
this.processArgs(conf, args);
Job job = Job.getInstance(conf, "analyser_logdata");
job.setJarByClass(AnalyserLogDataRunner.class);
job.setMapperClass(AnalyserLogDataMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);
// Configure the reducer.
// 1. Run on the cluster as a packaged jar (requires addDependencyJars to be true, which is the default):
// TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job);
TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job,
null, null, null, null, true);
// 2. Run locally (requires addDependencyJars to be false):
// TableMapReduceUtil
// .initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job, null, null, null,
// null, false);
job.setNumReduceTasks(0);
// Set the input paths
this.setJobInputPaths(job);
return job.waitForCompletion(true) ? 0 : -1;
}
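This job declares (NullWritable, Put) as the map output and runs with zero reduce tasks, so each Put is written straight to the table named by EventLogConstants.HBASE_NAME_EVENT_LOGS. The real AnalyserLogDataMapper is not part of this listing; a hypothetical skeleton compatible with the job above (imports omitted, row key and column names are placeholders) would be:
// Hypothetical skeleton only; the real AnalyserLogDataMapper parses event-log
// lines into typed HBase columns.
public static class AnalyserLogDataMapper
extends Mapper<LongWritable, Text, NullWritable, Put> {
private static final byte[] FAMILY = Bytes.toBytes("info");
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
Put put = new Put(Bytes.toBytes(key.get())); // placeholder row key
put.addColumn(FAMILY, Bytes.toBytes("line"), Bytes.toBytes(value.toString()));
context.write(NullWritable.get(), put); // map-only: goes straight to HBase
}
}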
protected Job initJob(Configuration conf) throws IOException {
Job job = Job.getInstance(conf, this.jobName);
job.setJarByClass(this.runnerClass);
// Run locally (addDependencyJars = false):
// TableMapReduceUtil.initTableMapperJob(initScans(job), this.mapperClass, this.mapOutputKeyClass, this.mapOutputValueClass, job, false);
TableMapReduceUtil.initTableMapperJob(initScans(job), this.mapperClass, this.mapOutputKeyClass, this.mapOutputValueClass, job, true);
// Run on the cluster: works for both local submission and packaged (jar) submission
// TableMapReduceUtil.initTableMapperJob(initScans(job),
// this.mapperClass, this.mapOutputKeyClass, this.mapOutputValueClass,
// job);
job.setReducerClass(this.reducerClass);
job.setOutputKeyClass(this.outputKeyClass);
job.setOutputValueClass(this.outputValueClass);
job.setOutputFormatClass(this.outputFormatClass);
return job;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
this.processArgs(conf, args);
Job job = Job.getInstance(conf, "analyser_logdata");
job.setJarByClass(AnalyserLogDataRunner.class);
job.setMapperClass(AnalyserLogDataMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);
TableMapReduceUtil.initTableReducerJob(
EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job, null, null,
null, null, false);
job.setNumReduceTasks(0);
// Set the input paths
this.setJobInputPaths(job);
return job.waitForCompletion(true) ? 0 : -1;
}
/**
* Creates and configures the job.
*
* @param conf the job configuration
* @return the configured job
* @throws IOException if the job cannot be created
*/
protected Job initJob(Configuration conf) throws IOException {
Job job = Job.getInstance(conf, this.jobName);
job.setJarByClass(this.runnerClass);
// Run locally (addDependencyJars = false)
TableMapReduceUtil.initTableMapperJob(initScans(job), this.mapperClass, this.mapOutputKeyClass, this.mapOutputValueClass, job, false);
// Run on the cluster: works for both local submission and packaged (jar) submission
// TableMapReduceUtil.initTableMapperJob(initScans(job),
// this.mapperClass, this.mapOutputKeyClass, this.mapOutputValueClass,
// job);
job.setReducerClass(this.reducerClass);
job.setOutputKeyClass(this.outputKeyClass);
job.setOutputValueClass(this.outputValueClass);
job.setOutputFormatClass(this.outputFormatClass);
return job;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(true);
conf.set("hbase.zookeeper.quorum","node04,node02,node03");
conf.set("mapreduce.app-submission.cross-platform","true");
conf.set("mapreduce.framework.name","local");
// Create the job
Job job = Job.getInstance(conf);
job.setJarByClass(WCRunner.class);
// Set the mapper class
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Set the reducer class (wired through TableMapReduceUtil below)
// job.setReducerClass();
// TableMapReduceUtil.initTableMapperJob();
TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job, null, null, null, null, false);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
// Input directory on HDFS
FileInputFormat.addInputPath(job, new Path("/wc/wc"));
job.waitForCompletion(true);
}
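For the reducer side of this word-count, WCReducer (not shown in the listing) must extend TableReducer so that initTableReducerJob can route its Puts into the "wc" table. A hypothetical sketch, with the column family and qualifier as placeholders:
// Hypothetical sketch of WCReducer: sums counts and writes one Put per word.
public static class WCReducer
extends TableReducer<Text, IntWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable v : values) {
sum += v.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
context.write(NullWritable.get(), put);
}
}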
public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat3.class);
// Set compression algorithms based on column families
configureCompression(conf, table.getTableDescriptor());
configureBloomType(table.getTableDescriptor(), conf);
configureBlockSize(table.getTableDescriptor(), conf);
HTableDescriptor tableDescriptor = table.getTableDescriptor();
configureDataBlockEncoding(tableDescriptor, conf);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + table.getName() + " output configured.");
}
@Override
protected void jobSetup(Job job) throws IOException, ImportException {
super.jobSetup(job);
// We shouldn't have gotten here if the bulk load dir is not set,
// so throw an ImportException.
if(getContext().getDestination() == null){
throw new ImportException("Can't run HBaseBulkImportJob without a " +
"valid destination directory.");
}
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
FileOutputFormat.setOutputPath(job, getContext().getDestination());
HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
HFileOutputFormat.configureIncrementalLoad(job, hTable);
}
private void runTestOnTable() throws InterruptedException, ClassNotFoundException {
Job job = null;
try {
Configuration conf = graph.configuration().toHBaseConfiguration();
job = Job.getInstance(conf, "test123");
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
Scan scan = new Scan();
scan.addColumn(FAMILY_NAME, COLUMN_NAME);
scan.setTimeRange(MINSTAMP, MAXSTAMP);
scan.setMaxVersions();
TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(),
scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job,
true, TableInputFormat.class);
job.waitForCompletion(true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (job != null) {
FileUtil.fullyDelete(
new File(job.getConfiguration().get("hadoop.tmp.dir")));
}
}
}
public static void setupHBaseJob(Job job, String sourceTable, String cf) throws IOException {
Scan scan = new Scan();
if(cf != null) {
scan.addFamily(Bytes.toBytes(cf));
}
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
PrunerMapper.class, // mapper class
null, // mapper output key
null, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
sourceTable, // output table
null, // reducer class
job);
}
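Passing null as the reducer class to initTableReducerJob installs the built-in IdentityTableReducer, which writes through whatever Mutations the mapper emits. PrunerMapper itself is not shown in this listing; a hypothetical mapper compatible with that wiring (the predicate is a placeholder) could look like:
// Hypothetical: emits a Delete for rows failing some predicate;
// IdentityTableReducer passes each Mutation through to the output table.
public static class PrunerMapper
extends TableMapper<ImmutableBytesWritable, Delete> {
@Override
protected void map(ImmutableBytesWritable row, Result value, Context context)
throws IOException, InterruptedException {
if (value.isEmpty()) { // placeholder predicate
context.write(row, new Delete(row.get()));
}
}
}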
private Job doVerify(Configuration conf, TableDescriptor tableDescriptor, String... auths)
throws IOException, InterruptedException, ClassNotFoundException {
Path outputDir = getTestDir(TEST_NAME, "verify-output");
Job job = new Job(conf);
job.setJarByClass(this.getClass());
job.setJobName(TEST_NAME + " Verification for " + tableDescriptor.getTableName());
setJobScannerConf(job);
Scan scan = new Scan();
scan.setAuthorizations(new Authorizations(auths));
TableMapReduceUtil.initTableMapperJob(tableDescriptor.getTableName().getNameAsString(), scan,
VerifyMapper.class, NullWritable.class, NullWritable.class, job);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
TableMapReduceUtil.setScannerCaching(job, scannerCaching);
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, outputDir);
assertTrue(job.waitForCompletion(true));
return job;
}
protected Job doLoad(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
Path outputDir = getTestDir(TEST_NAME, "load-output");
LOG.info("Load output dir: " + outputDir);
NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
conf.set(TABLE_NAME_KEY, tableDescriptor.getTableName().getNameAsString());
Job job = Job.getInstance(conf);
job.setJobName(TEST_NAME + " Load for " + tableDescriptor.getTableName());
job.setJarByClass(this.getClass());
setMapperClass(job);
job.setInputFormatClass(NMapInputFormat.class);
job.setNumReduceTasks(0);
setJobScannerConf(job);
FileOutputFormat.setOutputPath(job, outputDir);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
TableMapReduceUtil.initCredentials(job);
assertTrue(job.waitForCompletion(true));
return job;
}
protected void doVerify(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
Path outputDir = getTestDir(TEST_NAME, "verify-output");
LOG.info("Verify output dir: " + outputDir);
Job job = Job.getInstance(conf);
job.setJarByClass(this.getClass());
job.setJobName(TEST_NAME + " Verification for " + tableDescriptor.getTableName());
setJobScannerConf(job);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(
tableDescriptor.getTableName().getNameAsString(), scan, VerifyMapper.class,
BytesWritable.class, BytesWritable.class, job);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
TableMapReduceUtil.setScannerCaching(job, scannerCaching);
job.setReducerClass(VerifyReducer.class);
job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
FileOutputFormat.setOutputPath(job, outputDir);
assertTrue(job.waitForCompletion(true));
long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
assertEquals(0, numOutputRecords);
}
/**
* Sets up the actual job.
*
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
Path outputDir = new Path(args[1]);
String reportSeparatorString = (args.length > 2) ? args[2]: ":";
conf.set("ReportSeparator", reportSeparatorString);
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(CellCounter.class);
Scan scan = getConfiguredScanForJob(conf, args);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(job, outputDir);
job.setReducerClass(IntSumReducer.class);
return job;
}
public void run() {
try {
Job job = Job.getInstance(HBaseContext.config, "UpdateClusterJob");
job.setJarByClass(UpdateClusterJob.class);
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
TableMapReduceUtil.initTableMapperJob(
Constants.hbase_cluster_model_table, scan,
HBaseReadMapper.class, Text.class, Text.class, job);
TableMapReduceUtil.initTableReducerJob(
Constants.hbase_cluster_model_table,
HBaseWriteReducer.class, job);
job.setNumReduceTasks(4);
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
private Job configureSubmittableJob(Job job, Path outputPath, Class<IndexScrutinyMapperForTest> mapperClass) throws Exception {
Configuration conf = job.getConfiguration();
conf.setBoolean("mapreduce.job.user.classpath.first", true);
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
job.setJarByClass(IndexScrutinyTool.class);
job.setOutputFormatClass(NullOutputFormat.class);
if (outputInvalidRows && OutputFormat.FILE.equals(outputFormat)) {
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, outputPath);
}
job.setMapperClass((mapperClass == null ? IndexScrutinyMapper.class : mapperClass));
job.setNumReduceTasks(0);
// Set the Output classes
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TableMapReduceUtil.addDependencyJars(job);
return job;
}
private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception {
job.setReducerClass(PhoenixIndexImportDirectReducer.class);
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
// Set the Physical Table name for use in DirectHTableWriter#write(Mutation)
conf.set(TableOutputFormat.OUTPUT_TABLE,
PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration()));
//Set the Output classes
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
TableMapReduceUtil.addDependencyJars(job);
job.setNumReduceTasks(1);
return job;
}
/**
* @param conf to use to create and run the job
* @param scan to be used to scan the raw table.
* @param totalJobCount the total number of jobs that need to be run in this
* batch. Used in job name.
* @return The job to be submitted to the cluster.
* @throws IOException
*/
private Job getProcessingJob(Configuration conf, Scan scan, int totalJobCount)
throws IOException {
Configuration confClone = new Configuration(conf);
// Turn off speculative execution.
// Note: must be BEFORE the job construction with the new mapreduce API.
confClone.setBoolean("mapred.map.tasks.speculative.execution", false);
// Set up job
Job job = new Job(confClone, getJobName(totalJobCount));
// This is a map-only class, skip reduce step
job.setNumReduceTasks(0);
job.setJarByClass(JobFileProcessor.class);
job.setOutputFormatClass(MultiTableOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(Constants.HISTORY_RAW_TABLE, scan,
JobFileTableMapper.class, JobFileTableMapper.getOutputKeyClass(),
JobFileTableMapper.getOutputValueClass(), job);
return job;
}
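The distinguishing piece here is MultiTableOutputFormat: the map output key is interpreted as the destination table name, so a single map-only job can fan records out to several tables. JobFileTableMapper is not shown; a hypothetical mapper illustrating the pattern (table, family, and qualifier names are placeholders):
// Hypothetical: with MultiTableOutputFormat the output key names the table.
public static class FanOutMapper
extends TableMapper<ImmutableBytesWritable, Put> {
private static final ImmutableBytesWritable HISTORY =
new ImmutableBytesWritable(Bytes.toBytes("job_history"));
@Override
protected void map(ImmutableBytesWritable row, Result value, Context context)
throws IOException, InterruptedException {
Put put = new Put(row.get());
put.addColumn(Bytes.toBytes("i"), Bytes.toBytes("q"),
Bytes.toBytes(value.size()));
context.write(HISTORY, put); // the key chooses the destination table
}
}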
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
this.processArgs(conf, args);
Job job = Job.getInstance(conf, "analyser_logdata");
// Code needed when the job is submitted locally but runs on the cluster:
// File jarFile = EJob.createTempJar("target/classes");
// ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
// End of local-submit / cluster-run code
job.setJarByClass(AnalyserLogDataRunner.class);
job.setMapperClass(AnalyserLogDataMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);
// Configure the reducer.
// 1. Run on the cluster as a packaged jar (requires addDependencyJars to be true, which is the default):
// TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS,
// null, job);
// 2. Run locally (requires addDependencyJars to be false):
TableMapReduceUtil.initTableReducerJob(
EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job, null, null,
null, null, false);
job.setNumReduceTasks(0);
// Set the input paths
this.setJobInputPaths(job);
return job.waitForCompletion(true) ? 0 : -1;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
// Initialize the arguments
this.processArgs(conf, args);
// Create the job
Job job = Job.getInstance(conf, "active_user");
// Configure job parameters
job.setJarByClass(ActiveUserRunner.class);
// HBase input mapper settings
// 1. Run locally (addDependencyJars = false):
TableMapReduceUtil.initTableMapperJob(this.initScans(job), ActiveUserMapper.class, StatsUserDimension.class, TimeOutputValue.class, job, false);
// 2. Run on the cluster:
// TableMapReduceUtil.initTableMapperJob(null, ActiveUserMapper.class,
// StatsUserDimension.class, TimeOutputValue.class, job);
// Configure the reducer
job.setReducerClass(ActiveUserReducer.class);
job.setOutputKeyClass(StatsUserDimension.class);
job.setOutputValueClass(MapWritableValue.class);
// Configure the output
job.setOutputFormatClass(TransformerOutputFormat.class);
// Start time in milliseconds
long startTime = System.currentTimeMillis();
try {
return job.waitForCompletion(true) ? 0 : -1;
} finally {
// End time in milliseconds
long endTime = System.currentTimeMillis();
logger.info("Job<" + job.getJobName() + "> succeeded: " + job.isSuccessful() + "; start: " + startTime + "; end: " + endTime + "; elapsed: " + (endTime - startTime) + "ms");
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
// Process the arguments
this.processArgs(conf, args);
Job job = Job.getInstance(conf, "new_install_user");
job.setJarByClass(NewInstallUserRunner.class);
// Run locally (addDependencyJars = false)
TableMapReduceUtil.initTableMapperJob(
initScans(job),
NewInstallUserMapper.class,
StatsUserDimension.class,
TimeOutputValue.class,
job,
false);
// Run on the cluster: works for both local submission and packaged (jar) submission
// TableMapReduceUtil.initTableMapperJob(initScans(job), NewInstallUserMapper.class, StatsUserDimension.class, TimeOutputValue.class, job);
job.setReducerClass(NewInstallUserReducer.class);
job.setOutputKeyClass(StatsUserDimension.class);
job.setOutputValueClass(MapWritableValue.class);
// job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TransformerOutputFormat.class);
if (job.waitForCompletion(true)) {
// The job succeeded, so compute the total number of users
this.calculateTotalUsers(conf);
return 0;
} else {
return -1;
}
}
static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator,
Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(cls);
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(TextSortReducer.class);
} else {
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(),
ResultSerialization.class.getName(), KeyValueSerialization.class.getName());
// Use the table's region boundaries as TotalOrderPartitioner (TOP) split points.
LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count");
job.setNumReduceTasks(startKeys.size());
configurePartitioner(job, startKeys);
// Set compression algorithms based on column families
configureCompression(conf, tableDescriptor);
configureBloomType(tableDescriptor, conf);
configureBlockSize(tableDescriptor, conf);
configureDataBlockEncoding(tableDescriptor, conf);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
}
/**
* Uses the HBase Front Door Api to write to index table. Submits the job and either returns or
* waits for the job completion based on runForeground parameter.
*
* @param job job
* @param outputPath output path
* @param runForeground - if true, waits for job completion, else submits and returns
* immediately.
* @throws Exception
*/
private void configureSubmittableJobUsingDirectApi(Job job, Path outputPath, TableName outputTableName,
boolean skipDependencyJars, boolean runForeground)
throws Exception {
job.setMapperClass(getDirectMapperClass());
job.setReducerClass(getDirectReducerClass());
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
conf.set(TableOutputFormat.OUTPUT_TABLE, outputTableName.getNameAsString());
//Set the Output classes
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
if (!skipDependencyJars) {
TableMapReduceUtil.addDependencyJars(job);
}
job.setNumReduceTasks(1);
if (!runForeground) {
LOG.info("Running Index Build in Background - Submit async and exit");
job.submit();
return;
}
LOG.info("Running Index Build in Foreground. Waits for the build to complete. This may take a long time!.");
boolean result = job.waitForCompletion(true);
if (!result) {
LOG.error("IndexTool job failed!");
throw new Exception("IndexTool job failed: " + job.toString());
}
FileSystem.get(conf).delete(outputPath, true);
}
@Override
public void setLocation(String location, Job job) throws IOException {
final Configuration configuration = job.getConfiguration();
//explicitly turning off combining splits.
configuration.setBoolean("pig.noSplitCombination", true);
//to have phoenix working on a secured cluster
TableMapReduceUtil.initCredentials(job);
this.initializePhoenixPigConfiguration(location, configuration);
}
/**
* Sets up the actual job.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public Job createSubmittableJob(String[] args) throws IOException {
Configuration conf = getConf();
String inputDirs = args[0];
String tabName = args[1];
conf.setStrings(TABLES_KEY, tabName);
conf.set(FileInputFormat.INPUT_DIR, inputDirs);
Job job =
Job.getInstance(conf,
conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
job.setJarByClass(MapReduceHFileSplitterJob.class);
job.setInputFormatClass(HFileInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
TableName tableName = TableName.valueOf(tabName);
job.setMapperClass(HFileCellMapper.class);
job.setReducerClass(CellSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(MapReduceExtendedCell.class);
try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
}
LOG.debug("success configuring load incremental job");
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
} else {
throw new IOException("No bulk output directory specified");
}
return job;
}
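The HFiles this job writes under BULK_OUTPUT_CONF_KEY still need to be moved into the table's regions after the job finishes. A sketch of that follow-up step, assuming conf, tableName, and outputDir as above (BulkLoadHFiles is the HBase 2.x API; older code uses LoadIncrementalHFiles):
// Hypothetical follow-up after job.waitForCompletion(true) succeeds:
// atomically bulk-load the generated HFiles into the target table.
org.apache.hadoop.hbase.tool.BulkLoadHFiles.create(conf)
.bulkLoad(tableName, outputDir);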
@Override
public int run(String[] args) throws Exception {
if (args.length > 0) {
System.err.println("Usage: hbase mapredcp [-Dtmpjars=...]");
System.err.println(" Construct a CLASSPATH containing dependency jars required to run a mapreduce");
System.err.println(" job. By default, includes any jars detected by TableMapReduceUtils. Provide");
System.err.println(" additional entries by specifying a comma-separated list in tmpjars.");
return 0;
}
TableMapReduceUtil.addHBaseDependencyJars(getConf());
System.out.println(TableMapReduceUtil.buildDependencyClasspath(getConf()));
return 0;
}
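In practice the printed classpath is fed back into job submission, typically along the lines of HADOOP_CLASSPATH=$(hbase mapredcp) hadoop jar MyApp.jar MyMainClass (command shape per the HBase reference guide; the jar and class names here are placeholders).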
/**
* Execute compaction, using a Map-Reduce job.
*/
private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
final boolean compactOnce, final boolean major) throws Exception {
Configuration conf = getConf();
conf.setBoolean(CONF_COMPACT_ONCE, compactOnce);
conf.setBoolean(CONF_COMPACT_MAJOR, major);
Job job = new Job(conf);
job.setJobName("CompactionTool");
job.setJarByClass(CompactionTool.class);
job.setMapperClass(CompactionMapper.class);
job.setInputFormatClass(CompactionInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setMapSpeculativeExecution(false);
job.setNumReduceTasks(0);
// add dependencies (including HBase ones)
TableMapReduceUtil.addDependencyJars(job);
Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
FileSystem stagingFs = stagingDir.getFileSystem(conf);
try {
// Create input file with the store dirs
Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
List<Path> storeDirs = CompactionInputFormat.createInputFile(fs, stagingFs,
inputPath, toCompactDirs);
CompactionInputFormat.addInputPath(job, inputPath);
// Initialize credential for secure cluster
TableMapReduceUtil.initCredentials(job);
// Despite the method name this will get delegation token for the filesystem
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
storeDirs.toArray(new Path[0]), conf);
// Start the MR Job and wait
return job.waitForCompletion(true) ? 0 : 1;
} finally {
fs.delete(stagingDir, true);
}
}
private int doVerify(Path outputDir, int numReducers) throws IOException, InterruptedException,
ClassNotFoundException {
job = new Job(getConf());
job.setJobName("Link Verifier");
job.setNumReduceTasks(numReducers);
job.setJarByClass(getClass());
setJobScannerConf(job);
Scan scan = new Scan();
scan.addColumn(FAMILY_NAME, COLUMN_PREV);
scan.setCaching(10000);
scan.setCacheBlocks(false);
String[] split = labels.split(COMMA);
scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2],
split[(this.labelIndex * 2) + 1]));
TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
BytesWritable.class, BytesWritable.class, job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
job.setReducerClass(VerifyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputDir);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
Integer width, Integer wrapMultiplier, Integer numWalkers)
throws Exception {
LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
+ ", numNodes=" + numNodes);
Job job = Job.getInstance(getConf());
job.setJobName("Random Input Generator");
job.setNumReduceTasks(0);
job.setJarByClass(getClass());
job.setInputFormatClass(GeneratorInputFormat.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(NullWritable.class);
setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
job.setMapperClass(Mapper.class); //identity mapper
FileOutputFormat.setOutputPath(job, tmpOutput);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
boolean success = jobCompletion(job);
return success ? 0 : 1;
}