Listed below are example usages of org.apache.hadoop.mapred.JobConf#setReducerClass(), collected from open-source projects.
public void run(String[] args) throws Exception
{
  JobConf conf = new JobConf(this.getClass());
  conf.setJobName("wordcount");
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  conf.setMapperClass(Map.class);
  conf.setCombinerClass(Reduce.class);
  conf.setReducerClass(Reduce.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);
  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  JobClient.runJob(conf);
}
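The Map and Reduce classes registered above are not shown on this page. As a rough sketch only -- the actual classes in the source project may differ -- a word-count Reducer for the old org.apache.hadoop.mapred API looks like this:

// Illustrative sketch of a word-count Reducer, not the project's actual
// Reduce class. Imports (java.util.Iterator, org.apache.hadoop.io.*,
// org.apache.hadoop.mapred.*) are omitted, matching the snippets above.
public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();  // add up the counts emitted for this word
    }
    output.collect(key, new IntWritable(sum));
  }
}

Note that setCombinerClass accepts the same Reducer type, which is why the example can reuse Reduce as both combiner and reducer.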
/**
 * Sets up a job conf for the given job using the given config object. Ensures
 * that the correct input format is set, along with the mapper and reducer
 * classes, the input and output key and value classes, and any other job
 * configuration.
 *
 * @param config the config object to pull job settings from
 * @return JobConf representing the job to be run
 * @throws IOException
 */
private JobConf getJob(ConfigExtractor config) throws IOException {
  JobConf job = new JobConf(config.getConfig(), SliveTest.class);
  job.setInputFormat(DummyInputFormat.class);
  FileOutputFormat.setOutputPath(job, config.getOutputPath());
  job.setMapperClass(SliveMapper.class);
  job.setPartitionerClass(SlivePartitioner.class);
  job.setReducerClass(SliveReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setOutputFormat(TextOutputFormat.class);
  TextOutputFormat.setCompressOutput(job, false);
  job.setNumReduceTasks(config.getReducerAmount());
  job.setNumMapTasks(config.getMapAmount());
  return job;
}
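A caller would then submit the returned JobConf in the usual way; a minimal sketch (hypothetical driver code, not from the SliveTest source):

// Hypothetical: build the job conf and block until the job finishes.
JobConf job = getJob(config);
RunningJob running = JobClient.runJob(job);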
public void testEmptyJoin() throws Exception {
  JobConf job = new JobConf();
  Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
  Path[] src = { new Path(base, "i0"), new Path("i1"), new Path("i2") };
  job.set("mapreduce.join.expr", CompositeInputFormat.compose("outer",
      Fake_IF.class, src));
  job.setInputFormat(CompositeInputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(base, "out"));
  job.setMapperClass(IdentityMapper.class);
  job.setReducerClass(IdentityReducer.class);
  job.setOutputKeyClass(IncomparableKey.class);
  job.setOutputValueClass(NullWritable.class);
  JobClient.runJob(job);
  base.getFileSystem(job).delete(base, true);
}
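For context, CompositeInputFormat.compose simply builds the join-expression string stored under mapreduce.join.expr. The call above produces a string of roughly this shape (simplified -- the real string uses fully qualified class names and fully qualified paths):

// Approximate shape of the composed join expression (illustrative only):
// outer(tbl(Fake_IF,"/empty/i0"),tbl(Fake_IF,"i1"),tbl(Fake_IF,"i2"))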
public static void readTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(READ_DIR, true);
  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);
  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(ReadMapper.class);
  job.setReducerClass(LongSumReducer.class);
  FileOutputFormat.setOutputPath(job, READ_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
static RunningJob runJobKill(JobConf conf, Path inDir, Path outDir)
    throws IOException {
  conf.setJobName("test-job-kill");
  conf.setMapperClass(KillMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  // wait until the job is actually running before killing it
  while (job.getJobState() != JobStatus.RUNNING) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      break;
    }
  }
  job.killJob();
  // wait for cleanup to begin after the kill
  while (job.cleanupProgress() == 0.0f) {
    try {
      Thread.sleep(10);
    } catch (InterruptedException ie) {
      break;
    }
  }
  return job;
}
static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
    throws IOException {
  conf.setJobName("test-job-fail");
  conf.setMapperClass(FailMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  // poll until the (intentionally failing) job completes
  while (!job.isComplete()) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      break;
    }
  }
  return job;
}
public void dedup(String solrUrl, boolean noCommit) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("SolrDeleteDuplicates: starting at " + sdf.format(start));
  LOG.info("SolrDeleteDuplicates: Solr url: " + solrUrl);
  JobConf job = new NutchJob(getConf());
  job.set(SolrConstants.SERVER_URL, solrUrl);
  job.setBoolean("noCommit", noCommit);
  job.setInputFormat(SolrInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(SolrRecord.class);
  job.setMapperClass(IdentityMapper.class);
  job.setReducerClass(SolrDeleteDuplicates.class);
  JobClient.runJob(job);
  long end = System.currentTimeMillis();
  LOG.info("SolrDeleteDuplicates: finished at " + sdf.format(end)
      + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
    throws IOException {
  conf.setJobName("test-job-succeed");
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  // poll until the job completes
  while (!job.isComplete()) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      break;
    }
  }
  return job;
}
public int run(final String[] args) throws Exception {
  log.info("run starting");
  final Configuration conf = getConf();
  JobConf job = new JobConf(conf, WordCountOutput.class);
  job.setJobName("AerospikeWordCountOutput");
  for (int ii = 0; ii < args.length; ++ii) {
    FileInputFormat.addInputPath(job, new Path(args[ii]));
  }
  job.setMapperClass(Map.class);
  job.setCombinerClass(Reduce.class);
  job.setReducerClass(Reduce.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setOutputFormat(MyOutputFormat.class);
  JobClient.runJob(job);
  log.info("finished");
  return 0;
}
public static void runJobFail(JobConf conf, Path inDir, Path outDir)
    throws IOException, InterruptedException {
  conf.setJobName("test-job-fail");
  conf.setMapperClass(FailMapper.class);
  conf.setJarByClass(FailMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  conf.setMaxMapAttempts(1);
  boolean success = runJob(conf, inDir, outDir, 1, 0);
  Assert.assertFalse("Job expected to fail succeeded", success);
}
public void delete(String crawldb, boolean noCommit) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("CleaningJob: starting at " + sdf.format(start));
  JobConf job = new NutchJob(getConf());
  FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
  job.setBoolean("noCommit", noCommit);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapOutputKeyClass(ByteWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setMapperClass(DBFilter.class);
  job.setReducerClass(DeleterReducer.class);
  job.setJobName("CleaningJob");
  // need to explicitly allow deletions
  job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
  JobClient.runJob(job);
  long end = System.currentTimeMillis();
  LOG.info("CleaningJob: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}
public static void filter(String alignpath, String outpath,
    int nummappers, int numreducers) throws Exception {
  System.out.println("NUM_FMAP_TASKS: " + nummappers);
  System.out.println("NUM_FREDUCE_TASKS: " + numreducers);
  JobConf conf = new JobConf(FilterAlignments.class);
  conf.setJobName("FilterAlignments");
  conf.setNumMapTasks(nummappers);
  conf.setNumReduceTasks(numreducers);
  FileInputFormat.addInputPath(conf, new Path(alignpath));
  conf.setMapperClass(FilterMapClass.class);
  conf.setInputFormat(SequenceFileInputFormat.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(BytesWritable.class);
  conf.setCombinerClass(FilterCombinerClass.class);
  conf.setReducerClass(FilterReduceClass.class);
  conf.setOutputKeyClass(IntWritable.class);
  conf.setOutputValueClass(BytesWritable.class);
  conf.setOutputFormat(SequenceFileOutputFormat.class);
  Path oPath = new Path(outpath);
  FileOutputFormat.setOutputPath(conf, oPath);
  System.err.println("Removing old results");
  FileSystem.get(conf).delete(oPath, true);  // recursive delete; the one-arg delete is deprecated
  JobClient.runJob(conf);
  System.err.println("FilterAlignments Finished");
}
public int run(String[] args) throws Exception
{
  // Create a configuration
  Configuration conf = getConf();
  // Create a job from the default configuration that will use the WordCount class
  JobConf job = new JobConf(conf, LogCountsPerHour.class);
  // Define our input path as the first command line argument and our output path as the second
  Path in = new Path(args[0]);
  Path out = new Path(args[1]);
  // Create File Input/Output formats for these paths (in the job)
  FileInputFormat.setInputPaths(job, in);
  FileOutputFormat.setOutputPath(job, out);
  // Configure the job: name, mapper, reducer, and combiner
  job.setJobName("LogAveragePerHour");
  job.setMapperClass(LogMapClass.class);
  job.setReducerClass(LogReduce.class);
  job.setCombinerClass(LogReduce.class);
  // Configure the output
  job.setOutputFormat(TextOutputFormat.class);
  job.setOutputKeyClass(DateWritable.class);
  job.setOutputValueClass(IntWritable.class);
  // Run the job
  JobClient.runJob(job);
  return 0;
}
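DateWritable above is a custom key type; any class used as an output key must implement WritableComparable. A minimal sketch of what such a class might look like (the actual DateWritable in the source may differ):

// Hypothetical minimal DateWritable: one long field holding the hour bucket.
// Output keys must implement org.apache.hadoop.io.WritableComparable.
public static class DateWritable implements WritableComparable<DateWritable> {
  private long timestamp;

  public void write(DataOutput out) throws IOException {
    out.writeLong(timestamp);  // serialize for the shuffle
  }

  public void readFields(DataInput in) throws IOException {
    timestamp = in.readLong();  // deserialize in the same field order
  }

  public int compareTo(DateWritable other) {
    return Long.compare(timestamp, other.timestamp);  // sort order for keys
  }
}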
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  JobConf conf = new JobConf(mrCluster.getConfig());
  int numMaps = 5;
  int numReds = 2;
  Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(in, out, numMaps);
  conf.setJobName("test-job-with-combiner");
  conf.setMapperClass(IdentityMapper.class);
  conf.setCombinerClass(MyCombinerToCheckReporter.class);
  //conf.setJarByClass(MyCombinerToCheckReporter.class);
  conf.setReducerClass(IdentityReducer.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(conf, in);
  FileOutputFormat.setOutputPath(conf, out);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  runJob(conf);
}
public int run(String[] args) throws Exception {
  GfxdDataSerializable.initTypes();
  JobConf conf = new JobConf(getConf());
  conf.setJobName("TradeSecurityHdfsDataVerifier");
  String hdfsHomeDir = args[0];
  String url = args[1];
  String tableName = args[2];
  System.out.println("TradeSecurityHdfsDataVerifier.run() invoked with "
      + " hdfsHomeDir = " + hdfsHomeDir
      + " url = " + url
      + " tableName = " + tableName);
  // Job-specific params
  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  conf.setInputFormat(RowInputFormat.class);
  conf.setMapperClass(HdfsDataMapper.class);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(TradeSecurityRow.class);
  conf.setReducerClass(HdfsDataReducer.class);
  conf.set(RowOutputFormat.OUTPUT_TABLE, tableName + "_HDFS");
  conf.set(RowOutputFormat.OUTPUT_URL, url);
  conf.setOutputFormat(RowOutputFormat.class);
  conf.setOutputKeyClass(Key.class);
  conf.setOutputValueClass(TradeSecurityOutputObject.class);
  StringBuffer aStr = new StringBuffer();
  aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
  aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
  aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
  aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
  System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
  FileOutputFormat.setOutputPath(conf, new Path("" + System.currentTimeMillis()));
  JobClient.runJob(conf);
  return 0;
}
public int run(String[] args) throws Exception {
  GfxdDataSerializable.initTypes();
  JobConf conf = new JobConf(getConf());
  conf.setJobName("Busy Airport Count");
  Path outputPath = new Path(args[0]);
  Path intermediateOutputPath = new Path(args[0] + "_int");
  String hdfsHomeDir = args[1];
  String tableName = args[2];
  outputPath.getFileSystem(conf).delete(outputPath, true);
  intermediateOutputPath.getFileSystem(conf).delete(intermediateOutputPath, true);
  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  conf.setInputFormat(RowInputFormat.class);
  conf.setMapperClass(SampleMapper.class);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(IntWritable.class);
  conf.setReducerClass(SampleReducer.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  FileOutputFormat.setOutputPath(conf, intermediateOutputPath);
  int rc = JobClient.runJob(conf).isSuccessful() ? 0 : 1;
  if (rc == 0) {
    // Second, chained job: read the intermediate counts and find the top airport
    JobConf topConf = new JobConf(getConf());
    topConf.setJobName("Top Busy Airport");
    // Only run a single reducer
    topConf.setNumReduceTasks(1);
    FileInputFormat.setInputPaths(topConf, intermediateOutputPath);
    topConf.setInputFormat(TextInputFormat.class);
    topConf.setMapperClass(TopBusyAirportMapper.class);
    topConf.setMapOutputKeyClass(Text.class);
    topConf.setMapOutputValueClass(StringIntPair.class);
    topConf.setReducerClass(TopBusyAirportReducer.class);
    topConf.setOutputKeyClass(Text.class);
    topConf.setOutputValueClass(IntWritable.class);
    FileOutputFormat.setOutputPath(topConf, outputPath);
    rc = JobClient.runJob(topConf).isSuccessful() ? 0 : 1;
  }
  return rc;
}
public int run(String[] args) throws Exception {
  // Get current configuration.
  Configuration conf = getConf();
  // Parse command line arguments.
  String inputPath = args[0];
  String outputPath = args[1];
  String maxArcFiles = "";
  if (args.length == 3)
    maxArcFiles = args[2];
  // Set the maximum number of arc files to process.
  conf.set(MAX_FILES_KEY, maxArcFiles);
  JobConf job = new JobConf(conf);
  // Set input path.
  if (inputPath.length() > 0) {
    LOG.info("Setting input path to " + inputPath);
    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileInputFormat.setInputPathFilter(job, FileCountFilter.class);
  } else {
    System.err.println("No input path found.");
    return 1;
  }
  // Set output path.
  if (outputPath.length() > 0) {
    LOG.info("Setting output path to " + outputPath);
    SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
    // Compress output to boost performance.
    SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setCompressOutput(job, true);
  } else {
    System.err.println("No output path found.");
    return 1;
  }
  // Load other classes from the same jar as this class.
  job.setJarByClass(WikiReverse.class);
  // Input is in WARC file format.
  job.setInputFormat(WarcFileInputFormat.class);
  // Output is Hadoop sequence file format.
  job.setOutputFormat(SequenceFileOutputFormat.class);
  // Set the output data types.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LinkArrayWritable.class);
  // Use custom mapper class.
  job.setMapRunnerClass(WikiReverseMapper.class);
  // Use custom reducer class.
  job.setReducerClass(LinkArrayReducer.class);
  // Allow 5 percent of map tasks to fail.
  job.setMaxMapTaskFailuresPercent(MAX_MAP_TASK_FAILURES_PERCENT);
  return JobClient.runJob(job).isSuccessful() ? 0 : 1;
}
/**
 * Updates the inlink scores from the web graph node database into the crawl
 * database.
 *
 * @param crawlDb The crawl database to update.
 * @param webGraphDb The webgraph database to use.
 *
 * @throws IOException If an error occurs while updating the scores.
 */
public void update(Path crawlDb, Path webGraphDb)
    throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("ScoreUpdater: starting at " + sdf.format(start));
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  // create a temporary crawldb with the new scores
  LOG.info("Running crawldb update " + crawlDb);
  Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
  Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
  Path newCrawlDb = new Path(crawlDb,
      Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  // run the updater job outputting to the temp crawl database
  JobConf updater = new NutchJob(conf);
  updater.setJobName("Update CrawlDb from WebGraph");
  FileInputFormat.addInputPath(updater, crawlDbCurrent);
  FileInputFormat.addInputPath(updater, nodeDb);
  FileOutputFormat.setOutputPath(updater, newCrawlDb);
  updater.setInputFormat(SequenceFileInputFormat.class);
  updater.setMapperClass(ScoreUpdater.class);
  updater.setReducerClass(ScoreUpdater.class);
  updater.setMapOutputKeyClass(Text.class);
  updater.setMapOutputValueClass(ObjectWritable.class);
  updater.setOutputKeyClass(Text.class);
  updater.setOutputValueClass(CrawlDatum.class);
  updater.setOutputFormat(MapFileOutputFormat.class);
  try {
    JobClient.runJob(updater);
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    // remove the temp crawldb on error
    if (fs.exists(newCrawlDb)) {
      fs.delete(newCrawlDb, true);
    }
    throw e;
  }
  // install the temp crawl database
  LOG.info("ScoreUpdater: installing new crawldb " + crawlDb);
  CrawlDb.install(updater, crawlDb);
  long end = System.currentTimeMillis();
  LOG.info("ScoreUpdater: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}
public static JobConf createDataJoinJob(String args[]) throws IOException {
  String inputDir = args[0];
  String outputDir = args[1];
  Class inputFormat = SequenceFileInputFormat.class;
  if (args[2].compareToIgnoreCase("text") != 0) {
    System.out.println("Using SequenceFileInputFormat: " + args[2]);
  } else {
    System.out.println("Using TextInputFormat: " + args[2]);
    inputFormat = TextInputFormat.class;
  }
  int numOfReducers = Integer.parseInt(args[3]);
  Class mapper = getClassByName(args[4]);
  Class reducer = getClassByName(args[5]);
  Class mapoutputValueClass = getClassByName(args[6]);
  Class outputFormat = TextOutputFormat.class;
  Class outputValueClass = Text.class;
  if (args[7].compareToIgnoreCase("text") != 0) {
    System.out.println("Using SequenceFileOutputFormat: " + args[7]);
    outputFormat = SequenceFileOutputFormat.class;
    outputValueClass = getClassByName(args[7]);
  } else {
    System.out.println("Using TextOutputFormat: " + args[7]);
  }
  long maxNumOfValuesPerGroup = 100;
  String jobName = "";
  if (args.length > 8) {
    maxNumOfValuesPerGroup = Long.parseLong(args[8]);
  }
  if (args.length > 9) {
    jobName = args[9];
  }
  Configuration defaults = new Configuration();
  JobConf job = new JobConf(defaults, DataJoinJob.class);
  job.setJobName("DataJoinJob: " + jobName);
  FileSystem fs = FileSystem.get(defaults);
  fs.delete(new Path(outputDir), true);  // recursive delete; the one-arg delete is deprecated
  FileInputFormat.setInputPaths(job, inputDir);
  job.setInputFormat(inputFormat);
  job.setMapperClass(mapper);
  FileOutputFormat.setOutputPath(job, new Path(outputDir));
  job.setOutputFormat(outputFormat);
  SequenceFileOutputFormat.setOutputCompressionType(job,
      SequenceFile.CompressionType.BLOCK);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(mapoutputValueClass);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(outputValueClass);
  job.setReducerClass(reducer);
  job.setNumMapTasks(1);
  job.setNumReduceTasks(numOfReducers);
  job.setLong("datajoin.maxNumOfValuesPerGroup", maxNumOfValuesPerGroup);
  return job;
}
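Since createDataJoinJob only configures the job, a driver still has to submit it. A minimal sketch (hypothetical -- the real DataJoinJob class may wrap submission differently):

// Hypothetical driver; the arguments follow the parsing above:
// inputDir outputDir text|seq numReducers mapper reducer
// mapOutputValueClass text|<outputValueClass> [maxValuesPerGroup] [jobName]
public static void main(String[] args) throws IOException {
  JobConf job = createDataJoinJob(args);
  JobClient.runJob(job);  // submit and block until the join job completes
}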
public int run(String[] args) throws Exception {
  // [email protected] -- why do we need this?
  GfxdDataSerializable.initTypes();
  JobConf conf = new JobConf(getConf());
  conf.setJobName("hdfsMapReduce");
  String hdfsHomeDir = args[0];
  String url = args[1];
  String tableName = args[2];
  System.out.println("VerifyHdfsData.run() invoked with "
      + " hdfsHomeDir = " + hdfsHomeDir
      + " url = " + url
      + " tableName = " + tableName);
  // Job-specific params
  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  conf.setInputFormat(RowInputFormat.class);
  conf.setMapperClass(HdfsDataMapper.class);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(MyRow.class);
  conf.setReducerClass(HdfsDataReducer.class);
  conf.set(RowOutputFormat.OUTPUT_TABLE, "TRADE.HDFS_CUSTOMERS");
  //conf.set(GfxdOutputFormat.OUTPUT_SCHEMA, "APP");
  conf.set(RowOutputFormat.OUTPUT_URL, url);
  conf.setOutputFormat(RowOutputFormat.class);
  conf.setOutputKeyClass(Key.class);
  conf.setOutputValueClass(DataObject.class);
  StringBuffer aStr = new StringBuffer();
  aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
  aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
  aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
  aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
  System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
  // not planning to use this, but I get an NPE without it
  FileOutputFormat.setOutputPath(conf, new Path("" + System.currentTimeMillis()));
  JobClient.runJob(conf);
  return 0;
}