The examples below show how to use the org.apache.hadoop.mapred.lib.NullOutputFormat API class; you can also follow the links to view the full source code on GitHub.
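NullOutputFormat consumes every record written to it and produces no output files, so jobs whose real work happens elsewhere (counters, external stores, or simply sleeping) can run without an output directory. Before the project examples, here is a minimal sketch of the typical pattern; the class name, the command-line input path, and the choice of IdentityMapper are illustrative assumptions, not code from any of the projects below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

// Hypothetical minimal job: reads text input, discards all output.
public class NullOutputFormatExample {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(NullOutputFormatExample.class);
    job.setJobName("null-output-example");
    // Read ordinary text input as (offset, line) pairs.
    job.setInputFormat(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0])); // input path is a placeholder
    // Discard every output record; no output directory is required.
    job.setOutputFormat(NullOutputFormat.class);
    job.setMapperClass(IdentityMapper.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(0); // map-only
    JobClient.runJob(job);
  }
}

Because NullOutputFormat never touches the filesystem, the usual FileOutputFormat.setOutputPath call is deliberately omitted; the same pattern appears throughout the examples that follow.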
public JobConf setupJobConf(int numMapper, int numReducer,
                            long mapSleepTime, int mapSleepCount,
                            long reduceSleepTime, int reduceSleepCount) {
  JobConf job = new JobConf(getConf(), SleepJob.class);
  job.setNumMapTasks(numMapper);
  job.setNumReduceTasks(numReducer);
  job.setMapperClass(SleepJob.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setReducerClass(SleepJob.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setInputFormat(SleepInputFormat.class);
  job.setPartitionerClass(SleepJob.class);
  job.setSpeculativeExecution(false);
  job.setJobName("Sleep job");
  FileInputFormat.addInputPath(job, new Path("ignored"));
  job.setLong("sleep.job.map.sleep.time", mapSleepTime);
  job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
  job.setInt("sleep.job.map.sleep.count", mapSleepCount);
  job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
  return job;
}
public void configure(JobConf job) {
  // Set the mapper and reducers
  job.setMapperClass(ReadDataJob.TestMapper.class);
  // Make sure this jar is included
  job.setJarByClass(ReadDataJob.TestMapper.class);
  // Specify the input and output data formats
  job.setInputFormat(TextInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  // Turn off speculative execution
  job.setMapSpeculativeExecution(false);
  job.setReduceSpeculativeExecution(false);
  // Add the job input path
  FileInputFormat.addInputPath(job, new Path(this.input_path));
}
public void configure(JobConf job) {
  // Set the mapper and reducers
  job.setMapperClass(TestMapper.class);
  // job.setReducerClass(TestReducer.class);
  // Set the output types of the mapper and reducer
  // job.setMapOutputKeyClass(IntWritable.class);
  // job.setMapOutputValueClass(NullWritable.class);
  // job.setOutputKeyClass(NullWritable.class);
  // job.setOutputValueClass(NullWritable.class);
  // Make sure this jar is included
  job.setJarByClass(TestMapper.class);
  // Specify the input and output data formats
  job.setInputFormat(TextInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  // Turn off speculative execution
  job.setMapSpeculativeExecution(false);
  job.setReduceSpeculativeExecution(false);
  // Add the job input path
  FileInputFormat.addInputPath(job, new Path(this.input_filename));
}
public void dedup(String solrUrl, boolean noCommit) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("SolrDeleteDuplicates: starting at " + sdf.format(start));
  LOG.info("SolrDeleteDuplicates: Solr url: " + solrUrl);
  JobConf job = new NutchJob(getConf());
  job.set(SolrConstants.SERVER_URL, solrUrl);
  job.setBoolean("noCommit", noCommit);
  job.setInputFormat(SolrInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(SolrRecord.class);
  job.setMapperClass(IdentityMapper.class);
  job.setReducerClass(SolrDeleteDuplicates.class);
  JobClient.runJob(job);
  long end = System.currentTimeMillis();
  LOG.info("SolrDeleteDuplicates: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}
public void delete(String crawldb, String solrUrl, boolean noCommit) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("SolrClean: starting at " + sdf.format(start));
  JobConf job = new NutchJob(getConf());
  FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
  job.setBoolean("noCommit", noCommit);
  job.set(SolrConstants.SERVER_URL, solrUrl);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapOutputKeyClass(ByteWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setMapperClass(DBFilter.class);
  job.setReducerClass(SolrDeleter.class);
  JobClient.runJob(job);
  long end = System.currentTimeMillis();
  LOG.info("SolrClean: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}
void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
  Configuration conf = UTIL.getConfiguration();
  final JobConf job = new JobConf(conf);
  job.setInputFormat(clazz);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapperClass(ExampleVerifier.class);
  job.setNumReduceTasks(0);
  LOG.debug("submitting job.");
  final RunningJob run = JobClient.runJob(job);
  assertTrue("job failed!", run.isSuccessful());
  assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter());
  assertEquals("Saw any instances of the filtered out row.", 0, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter());
  assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter());
  assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter());
  assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter());
  assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter());
}
private static void runJvmReuseTest(JobConf job,
                                    boolean reuse) throws IOException {
  // Set up a map-only job that reads the input and only sets the counters
  // based on how many times the JVM was reused.
  job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1);
  FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapperClass(ReuseDetector.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumMapTasks(24);
  job.setNumReduceTasks(0);
  RunningJob result = JobClient.runJob(job);
  long uses = result.getCounters().findCounter("jvm", "use").getValue();
  int maps = job.getNumMapTasks();
  if (reuse) {
    assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
  } else {
    assertEquals("uses should be number of maps", job.getNumMapTasks(), uses);
  }
}
private static void runTest(String name, int keylen, int vallen,
    int records, int ioSortMB, float recPer, float spillPer,
    boolean pedantic) throws Exception {
  JobConf conf = new JobConf(new Configuration(), SpillMapper.class);
  conf.setInt("io.sort.mb", ioSortMB);
  conf.set("io.sort.record.percent", Float.toString(recPer));
  conf.set("io.sort.spill.percent", Float.toString(spillPer));
  conf.setInt("test.keywritable.length", keylen);
  conf.setInt("test.valwritable.length", vallen);
  conf.setInt("test.spillmap.records", records);
  conf.setBoolean("test.pedantic.verification", pedantic);
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(1);
  conf.setInputFormat(FakeIF.class);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapperClass(SpillMapper.class);
  conf.setReducerClass(SpillReducer.class);
  conf.setMapOutputKeyClass(KeyWritable.class);
  conf.setMapOutputValueClass(ValWritable.class);
  LOG.info("Running " + name);
  JobClient.runJob(conf);
}
public JobConf setupJobConf(int numMapper, int numReducer,
    long mapSleepTime, int mapSleepCount,
    long reduceSleepTime, int reduceSleepCount,
    boolean doSpeculation, List<String> slowMaps,
    List<String> slowReduces, int slowRatio,
    int countersPerTask, List<String> hosts,
    int hostsPerSplit, boolean setup) {
  JobConf job = new JobConf(getConf(), SleepJob.class);
  job.setNumMapTasks(numMapper);
  job.setNumReduceTasks(numReducer);
  job.setMapperClass(SleepJob.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setReducerClass(SleepJob.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setJobSetupCleanupNeeded(setup);
  job.setInputFormat(SleepInputFormat.class);
  job.setPartitionerClass(SleepJob.class);
  job.setJobName("Sleep job");
  FileInputFormat.addInputPath(job, new Path("ignored"));
  job.setLong("sleep.job.map.sleep.time", mapSleepTime);
  job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
  job.setInt("sleep.job.map.sleep.count", mapSleepCount);
  job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
  job.setSpeculativeExecution(doSpeculation);
  job.setInt(SLOW_RATIO, slowRatio);
  job.setStrings(SLOW_MAPS, slowMaps.toArray(new String[slowMaps.size()]));
  job.setStrings(SLOW_REDUCES, slowReduces.toArray(new String[slowReduces.size()]));
  job.setInt("sleep.job.counters.per.task", countersPerTask);
  job.setStrings(HOSTS_FOR_LOCALITY, hosts.toArray(new String[hosts.size()]));
  job.setInt(HOSTS_PER_SPLIT, hostsPerSplit);
  return job;
}
private void runTest(String name, int keyLen, int valLen,
    int bigKeyLen, int bigValLen, int recordsNumPerMapper,
    int sortMb, float spillPer, int numMapperTasks,
    int numReducerTask, double[] reducerRecPercents,
    int[] numBigRecordsStart, int[] numBigRecordsMiddle,
    int[] numBigRecordsEnd) throws Exception {
  JobConf conf = mrCluster.createJobConf();
  conf.setInt("io.sort.mb", sortMb);
  conf.set("io.sort.spill.percent", Float.toString(spillPer));
  conf.setInt("test.key.length", keyLen);
  conf.setInt("test.value.length", valLen);
  conf.setInt("test.bigkey.length", bigKeyLen);
  conf.setInt("test.bigvalue.length", bigValLen);
  conf.setNumMapTasks(numMapperTasks);
  conf.setNumReduceTasks(numReducerTask);
  conf.setInputFormat(FakeIF.class);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapperClass(TestNewCollectorMapper.class);
  conf.setReducerClass(TestNewCollectorReducer.class);
  conf.setMapOutputKeyClass(TestNewCollectorKey.class);
  conf.setMapOutputValueClass(BytesWritable.class);
  conf.setBoolean("mapred.map.output.blockcollector", true);
  RecordNumStore.setJobConf(numReducerTask, numMapperTasks,
      recordsNumPerMapper, reducerRecPercents, numBigRecordsStart,
      numBigRecordsMiddle, numBigRecordsEnd, conf);
  RecordNumStore.getInst(conf);
  LOG.info("Running " + name);
  JobClient.runJob(conf);
}
private JobConf getControlledMapReduceJobConf(Configuration clusterConf,
    int numMapper, int numReducer)
    throws IOException {
  setConf(clusterConf);
  initialize();
  JobConf conf = new JobConf(getConf(), ControlledMapReduceJob.class);
  conf.setJobName("ControlledJob");
  conf.set("signal.dir.path", signalFileDir.toString());
  conf.setNumMapTasks(numMapper);
  conf.setNumReduceTasks(numReducer);
  conf.setMapperClass(ControlledMapReduceJob.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(NullWritable.class);
  conf.setReducerClass(ControlledMapReduceJob.class);
  conf.setOutputKeyClass(NullWritable.class);
  conf.setOutputValueClass(NullWritable.class);
  conf.setInputFormat(ControlledMapReduceJob.class);
  FileInputFormat.addInputPath(conf, new Path("ignored"));
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapSpeculativeExecution(false);
  conf.setReduceSpeculativeExecution(false);
  // Allow reduce tasks to start running immediately alongside the maps.
  conf.set("mapred.reduce.slowstart.completed.maps", String.valueOf(0));
  return conf;
}
public void delete(String crawldb, boolean noCommit) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("CleaningJob: starting at " + sdf.format(start));
  JobConf job = new NutchJob(getConf());
  FileInputFormat.addInputPath(job, new Path(crawldb,
      CrawlDb.CURRENT_NAME));
  job.setBoolean("noCommit", noCommit);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapOutputKeyClass(ByteWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setMapperClass(DBFilter.class);
  job.setReducerClass(DeleterReducer.class);
  job.setJobName("CleaningJob");
  // need to explicitly allow deletions
  job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
  JobClient.runJob(job);
  long end = System.currentTimeMillis();
  LOG.info("CleaningJob: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}
public int run(String[] argv) throws Exception {
  JobConf job = new JobConf(getConf());
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return -1;
  }
  if (null == FileOutputFormat.getOutputPath(job)) {
    // No output dir? No writes
    job.setOutputFormat(NullOutputFormat.class);
  }
  if (0 == FileInputFormat.getInputPaths(job).length) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (null != job.getClass(
      org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FORMAT,
      null)) {
    // specified IndirectInputFormat? Build src list
    JobClient jClient = new JobClient(job);
    Path tmpDir = new Path(jClient.getFs().getHomeDirectory(), ".staging");
    Random r = new Random();
    Path indirInputFile = new Path(tmpDir,
        Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
    job.set(
        org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FILE,
        indirInputFile.toString());
    SequenceFile.Writer writer = SequenceFile.createWriter(
        tmpDir.getFileSystem(job), job, indirInputFile,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path p : FileInputFormat.getInputPaths(job)) {
        FileSystem fs = p.getFileSystem(job);
        Stack<Path> pathstack = new Stack<Path>();
        pathstack.push(p);
        while (!pathstack.empty()) {
          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDirectory()) {
              if (!stat.getPath().getName().startsWith("_")) {
                pathstack.push(stat.getPath());
              }
            } else {
              writer.sync();
              writer.append(new LongWritable(stat.getLen()),
                  new Text(stat.getPath().toUri().toString()));
            }
          }
        }
      }
    } finally {
      writer.close();
    }
  }
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(job);
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " +
      (endTime.getTime() - startTime.getTime()) / 1000 +
      " seconds.");
  return 0;
}
private static void setupPipesJob(JobConf conf) throws IOException {
  // default map output types to Text
  if (!getIsJavaMapper(conf)) {
    conf.setMapRunnerClass(PipesMapRunner.class);
    // Save the user's partitioner and hook in ours.
    setJavaPartitioner(conf, conf.getPartitionerClass());
    conf.setPartitionerClass(PipesPartitioner.class);
  }
  if (!getIsJavaReducer(conf)) {
    conf.setReducerClass(PipesReducer.class);
    if (!getIsJavaRecordWriter(conf)) {
      conf.setOutputFormat(NullOutputFormat.class);
    }
  }
  String textClassname = Text.class.getName();
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_VALUE_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_VALUE_CLASS, textClassname);
  // Use PipesNonJavaInputFormat if necessary to handle progress reporting
  // from C++ RecordReaders ...
  if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
    conf.setClass(Submitter.INPUT_FORMAT,
        conf.getInputFormat().getClass(), InputFormat.class);
    conf.setInputFormat(PipesNonJavaInputFormat.class);
  }
  String exec = getExecutable(conf);
  if (exec == null) {
    throw new IllegalArgumentException("No application program defined.");
  }
  // add default debug script only when executable is expressed as
  // <path>#<executable>
  if (exec.contains("#")) {
    // set default gdb commands for map and reduce tasks
    String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
    setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT, defScript);
    setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT, defScript);
  }
  URI[] fileCache = DistributedCache.getCacheFiles(conf);
  if (fileCache == null) {
    fileCache = new URI[1];
  } else {
    URI[] tmp = new URI[fileCache.length + 1];
    System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
    fileCache = tmp;
  }
  try {
    fileCache[0] = new URI(exec);
  } catch (URISyntaxException e) {
    IOException ie = new IOException("Problem parsing executable URI " + exec);
    ie.initCause(e);
    throw ie;
  }
  DistributedCache.setCacheFiles(fileCache, conf);
}
@Override
public String getOutputFormat() {
  return NullOutputFormat.class.getName();
}
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {
  // create the table and snapshot
  createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
  if (shutdownCluster) {
    util.shutdownMiniHBaseCluster();
  }
  try {
    // create the job
    JobConf jobConf = new JobConf(util.getConfiguration());
    jobConf.setJarByClass(util.getClass());
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
        TestTableSnapshotInputFormat.class);
    if (numSplitsPerRegion > 1) {
      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, jobConf, true, tableDir, new RegionSplitter.UniformSplit(),
          numSplitsPerRegion);
    } else {
      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, jobConf, true, tableDir);
    }
    jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
    jobConf.setNumReduceTasks(1);
    jobConf.setOutputFormat(NullOutputFormat.class);
    RunningJob job = JobClient.runJob(jobConf);
    Assert.assertTrue(job.isSuccessful());
  } finally {
    if (!shutdownCluster) {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }
}
private static void setupPipesJob(JobConf conf) throws IOException {
  // default map output types to Text
  if (!getIsJavaMapper(conf)) {
    conf.setMapRunnerClass(PipesMapRunner.class);
    // Save the user's partitioner and hook in ours.
    setJavaPartitioner(conf, conf.getPartitionerClass());
    conf.setPartitionerClass(PipesPartitioner.class);
  }
  if (!getIsJavaReducer(conf)) {
    conf.setReducerClass(PipesReducer.class);
    if (!getIsJavaRecordWriter(conf)) {
      conf.setOutputFormat(NullOutputFormat.class);
    }
  }
  String textClassname = Text.class.getName();
  setIfUnset(conf, "mapred.mapoutput.key.class", textClassname);
  setIfUnset(conf, "mapred.mapoutput.value.class", textClassname);
  setIfUnset(conf, "mapred.output.key.class", textClassname);
  setIfUnset(conf, "mapred.output.value.class", textClassname);
  // Use PipesNonJavaInputFormat if necessary to handle progress reporting
  // from C++ RecordReaders ...
  if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
    conf.setClass("mapred.pipes.user.inputformat",
        conf.getInputFormat().getClass(), InputFormat.class);
    conf.setInputFormat(PipesNonJavaInputFormat.class);
  }
  String exec = getExecutable(conf);
  if (exec == null) {
    throw new IllegalArgumentException("No application program defined.");
  }
  // add default debug script only when executable is expressed as
  // <path>#<executable>
  if (exec.contains("#")) {
    DistributedCache.createSymlink(conf);
    // set default gdb commands for map and reduce tasks
    String defScript = "$HADOOP_HOME/src/c++/pipes/debug/pipes-default-script";
    setIfUnset(conf, "mapred.map.task.debug.script", defScript);
    setIfUnset(conf, "mapred.reduce.task.debug.script", defScript);
  }
  URI[] fileCache = DistributedCache.getCacheFiles(conf);
  if (fileCache == null) {
    fileCache = new URI[1];
  } else {
    URI[] tmp = new URI[fileCache.length + 1];
    System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
    fileCache = tmp;
  }
  try {
    fileCache[0] = new URI(exec);
  } catch (URISyntaxException e) {
    IOException ie = new IOException("Problem parsing executable URI " + exec);
    ie.initCause(e);
    throw ie;
  }
  DistributedCache.setCacheFiles(fileCache, conf);
}
public static JobConf createJob(String[] argv, boolean mapoutputCompressed,
    boolean outputCompressed) throws Exception {
  JobConf job = new JobConf();
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return null;
  }
  if (null == FileOutputFormat.getOutputPath(job)) {
    // No output dir? No writes
    job.setOutputFormat(NullOutputFormat.class);
  }
  if (0 == FileInputFormat.getInputPaths(job).length) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (null != job.getClass("mapred.indirect.input.format", null)) {
    // specified IndirectInputFormat? Build src list
    JobClient jClient = new JobClient(job);
    Path sysdir = jClient.getSystemDir();
    Random r = new Random();
    Path indirInputFile = new Path(sysdir,
        Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
    job.set("mapred.indirect.input.file", indirInputFile.toString());
    SequenceFile.Writer writer = SequenceFile.createWriter(
        sysdir.getFileSystem(job), job, indirInputFile,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path p : FileInputFormat.getInputPaths(job)) {
        FileSystem fs = p.getFileSystem(job);
        Stack<Path> pathstack = new Stack<Path>();
        pathstack.push(p);
        while (!pathstack.empty()) {
          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDir()) {
              if (!stat.getPath().getName().startsWith("_")) {
                pathstack.push(stat.getPath());
              }
            } else {
              writer.sync();
              writer.append(new LongWritable(stat.getLen()),
                  new Text(stat.getPath().toUri().toString()));
            }
          }
        }
      }
    } finally {
      writer.close();
    }
  }
  job.setCompressMapOutput(mapoutputCompressed);
  job.setBoolean("mapred.output.compress", outputCompressed);
  return job;
}
/**
 * Archive the given source paths into the dest.
 * @param parentPath the parent path of all the source paths
 * @param srcPaths the source paths to be archived
 * @param archiveName the name of the archive to create
 * @param dest the dest dir that will contain the archive
 */
void archive(Path parentPath, List<Path> srcPaths,
             String archiveName, Path dest) throws IOException {
  checkPaths(conf, srcPaths);
  int numFiles = 0;
  long totalSize = 0;
  FileSystem fs = parentPath.getFileSystem(conf);
  this.blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
  this.partSize = conf.getLong(HAR_PARTSIZE_LABEL, partSize);
  conf.setLong(HAR_BLOCKSIZE_LABEL, blockSize);
  conf.setLong(HAR_PARTSIZE_LABEL, partSize);
  conf.set(DST_HAR_LABEL, archiveName);
  conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
  Path outputPath = new Path(dest, archiveName);
  FileOutputFormat.setOutputPath(conf, outputPath);
  FileSystem outFs = outputPath.getFileSystem(conf);
  if (outFs.exists(outputPath) || outFs.isFile(dest)) {
    throw new IOException("Invalid Output: " + outputPath);
  }
  conf.set(DST_DIR_LABEL, outputPath.toString());
  final String randomId = DistCp.getRandomId();
  Path jobDirectory = new Path(new JobClient(conf).getSystemDir(),
      NAME + "_" + randomId);
  conf.set(JOB_DIR_LABEL, jobDirectory.toString());
  // get a tmp directory for input splits
  FileSystem jobfs = jobDirectory.getFileSystem(conf);
  jobfs.mkdirs(jobDirectory);
  Path srcFiles = new Path(jobDirectory, "_har_src_files");
  conf.set(SRC_LIST_LABEL, srcFiles.toString());
  SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
      srcFiles, LongWritable.class, HarEntry.class,
      SequenceFile.CompressionType.NONE);
  // get the list of files: create a single list of files and dirs
  try {
    // write the top level dirs in first
    writeTopLevelDirs(srcWriter, srcPaths, parentPath);
    srcWriter.sync();
    // these are the input paths passed from the command line;
    // we do a recursive ls on these paths and then write them
    // to the input file one at a time
    for (Path src : srcPaths) {
      ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
      recursivels(fs, src, allFiles);
      for (FileStatus stat : allFiles) {
        long len = stat.isDir() ? 0 : stat.getLen();
        String path = relPathToRoot(stat.getPath(), parentPath).toString();
        String[] children = null;
        if (stat.isDir()) {
          // get the children
          FileStatus[] list = fs.listStatus(stat.getPath());
          children = new String[list.length];
          for (int i = 0; i < list.length; i++) {
            children[i] = list[i].getPath().getName();
          }
        }
        srcWriter.append(new LongWritable(len), new HarEntry(path, children));
        srcWriter.sync();
        numFiles++;
        totalSize += len;
      }
    }
  } finally {
    srcWriter.close();
  }
  // increase the replication of src files
  jobfs.setReplication(srcFiles, (short) 10);
  conf.setInt(SRC_COUNT_LABEL, numFiles);
  conf.setLong(TOTAL_SIZE_LABEL, totalSize);
  int numMaps = (int) (totalSize / partSize);
  // run at least one map
  conf.setNumMapTasks(numMaps == 0 ? 1 : numMaps);
  conf.setNumReduceTasks(1);
  conf.setInputFormat(HArchiveInputFormat.class);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapperClass(HArchivesMapper.class);
  conf.setReducerClass(HArchivesReducer.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(Text.class);
  conf.set("hadoop.job.history.user.location", "none");
  FileInputFormat.addInputPath(conf, jobDirectory);
  // make sure no speculative execution is done
  conf.setSpeculativeExecution(false);
  JobClient.runJob(conf);
  // delete the tmp job directory
  try {
    jobfs.delete(jobDirectory, true);
  } catch (IOException ie) {
    LOG.info("Unable to clean tmp directory " + jobDirectory);
  }
}
public int run(String[] argv) throws Exception {
  JobConf job = new JobConf(getConf());
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return -1;
  }
  if (null == FileOutputFormat.getOutputPath(job)) {
    // No output dir? No writes
    job.setOutputFormat(NullOutputFormat.class);
  }
  if (0 == FileInputFormat.getInputPaths(job).length) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (null != job.getClass("mapred.indirect.input.format", null)) {
    // specified IndirectInputFormat? Build src list
    JobClient jClient = new JobClient(job);
    Path sysdir = jClient.getSystemDir();
    Random r = new Random();
    Path indirInputFile = new Path(sysdir,
        Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
    job.set("mapred.indirect.input.file", indirInputFile.toString());
    SequenceFile.Writer writer = SequenceFile.createWriter(
        sysdir.getFileSystem(job), job, indirInputFile,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path p : FileInputFormat.getInputPaths(job)) {
        FileSystem fs = p.getFileSystem(job);
        Stack<Path> pathstack = new Stack<Path>();
        pathstack.push(p);
        while (!pathstack.empty()) {
          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDir()) {
              if (!stat.getPath().getName().startsWith("_")) {
                pathstack.push(stat.getPath());
              }
            } else {
              writer.sync();
              writer.append(new LongWritable(stat.getLen()),
                  new Text(stat.getPath().toUri().toString()));
            }
          }
        }
      }
    } finally {
      writer.close();
    }
  }
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(job);
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " +
      (endTime.getTime() - startTime.getTime()) / 1000 +
      " seconds.");
  return 0;
}
/**
 * Archive the given source paths into the dest.
 * @param srcPaths the source paths to be archived
 * @param archiveName the name of the archive to create
 * @param dest the dest dir that will contain the archive
 */
public void archive(List<Path> srcPaths, String archiveName, Path dest)
    throws IOException {
  checkPaths(conf, srcPaths);
  int numFiles = 0;
  long totalSize = 0;
  conf.set(DST_HAR_LABEL, archiveName);
  Path outputPath = new Path(dest, archiveName);
  FileOutputFormat.setOutputPath(conf, outputPath);
  FileSystem outFs = outputPath.getFileSystem(conf);
  if (outFs.exists(outputPath) || outFs.isFile(dest)) {
    throw new IOException("Invalid Output.");
  }
  conf.set(DST_DIR_LABEL, outputPath.toString());
  final String randomId = DistCp.getRandomId();
  Path jobDirectory = new Path(new JobClient(conf).getSystemDir(),
      NAME + "_" + randomId);
  conf.set(JOB_DIR_LABEL, jobDirectory.toString());
  // get a tmp directory for input splits
  FileSystem jobfs = jobDirectory.getFileSystem(conf);
  jobfs.mkdirs(jobDirectory);
  Path srcFiles = new Path(jobDirectory, "_har_src_files");
  conf.set(SRC_LIST_LABEL, srcFiles.toString());
  SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
      srcFiles, LongWritable.class, Text.class,
      SequenceFile.CompressionType.NONE);
  // get the list of files: create a single list of files and dirs
  try {
    // write the top level dirs in first
    writeTopLevelDirs(srcWriter, srcPaths);
    srcWriter.sync();
    // these are the input paths passed from the command line;
    // we do a recursive ls on these paths and then write them
    // to the input file one at a time
    for (Path src : srcPaths) {
      FileSystem fs = src.getFileSystem(conf);
      ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
      recursivels(fs, src, allFiles);
      for (FileStatus stat : allFiles) {
        String toWrite = "";
        long len = stat.isDir() ? 0 : stat.getLen();
        if (stat.isDir()) {
          toWrite = "" + fs.makeQualified(stat.getPath()) + " dir ";
          // get the children
          FileStatus[] list = fs.listStatus(stat.getPath());
          StringBuffer sbuff = new StringBuffer();
          sbuff.append(toWrite);
          for (FileStatus stats : list) {
            sbuff.append(stats.getPath().getName() + " ");
          }
          toWrite = sbuff.toString();
        } else {
          toWrite += fs.makeQualified(stat.getPath()) + " file ";
        }
        srcWriter.append(new LongWritable(len), new Text(toWrite));
        srcWriter.sync();
        numFiles++;
        totalSize += len;
      }
    }
  } finally {
    srcWriter.close();
  }
  // increase the replication of src files
  jobfs.setReplication(srcFiles, (short) 10);
  conf.setInt(SRC_COUNT_LABEL, numFiles);
  conf.setLong(TOTAL_SIZE_LABEL, totalSize);
  int numMaps = (int) (totalSize / partSize);
  // run at least one map
  conf.setNumMapTasks(numMaps == 0 ? 1 : numMaps);
  conf.setNumReduceTasks(1);
  conf.setInputFormat(HArchiveInputFormat.class);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapperClass(HArchivesMapper.class);
  conf.setReducerClass(HArchivesReducer.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(Text.class);
  conf.set("hadoop.job.history.user.location", "none");
  FileInputFormat.addInputPath(conf, jobDirectory);
  // make sure no speculative execution is done
  conf.setSpeculativeExecution(false);
  JobClient.runJob(conf);
  // delete the tmp job directory
  try {
    jobfs.delete(jobDirectory, true);
  } catch (IOException ie) {
    LOG.info("Unable to clean tmp directory " + jobDirectory);
  }
}