The following are example usages of org.apache.hadoop.mapred.FileOutputFormat#setOutputPath(), collected from open-source Hadoop, Nutch, and related projects.
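Before the full examples, here is a minimal sketch of the usual driver pattern with the old mapred API. The class names (WordCount, MyMapper, MyReducer) and the argument paths are hypothetical placeholders for illustration, not taken from the examples below:

public static void main(String[] args) throws Exception {
  // WordCount, MyMapper and MyReducer are placeholder classes for this sketch.
  JobConf job = new JobConf(WordCount.class);
  job.setJobName("wordcount");
  job.setMapperClass(MyMapper.class);
  job.setReducerClass(MyReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  // setOutputPath names the directory the job writes its output files into;
  // it must not already exist when the job is submitted, or submission fails.
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  JobClient.runJob(job);
}

The first example is the driver from Hadoop's TeraValidate, which points setOutputPath at a directory taken from the command line and forces a single reducer and a single input split: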
public int run(String[] args) throws Exception {
  JobConf job = (JobConf) getConf();
  TeraInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraValidate");
  job.setJarByClass(TeraValidate.class);
  job.setMapperClass(ValidateMapper.class);
  job.setReducerClass(ValidateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  // force a single reducer
  job.setNumReduceTasks(1);
  // force a single split
  job.setLong("mapred.min.split.size", Long.MAX_VALUE);
  job.setInputFormat(TeraInputFormat.class);
  JobClient.runJob(job);
  return 0;
}
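The next example, from Nutch's LinkDb merge code, sends the job's output to a randomly named linkdb-merge-* directory: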
public static JobConf createMergeJob(Configuration config, Path linkDb,
    boolean normalize, boolean filter) {
  Path newLinkDb = new Path("linkdb-merge-" +
      Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  JobConf job = new NutchJob(config);
  job.setJobName("linkdb merge " + linkDb);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(LinkDbFilter.class);
  job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
  job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
  job.setReducerClass(LinkDbMerger.class);
  FileOutputFormat.setOutputPath(job, newLinkDb);
  job.setOutputFormat(MapFileOutputFormat.class);
  job.setBoolean("mapred.output.compress", true);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Inlinks.class);
  // https://issues.apache.org/jira/browse/NUTCH-1069
  job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
  return job;
}
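A minimal inverted-index driver that hard-codes the "input" and "output" directories: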
public static void main(String[] args) {
  JobClient client = new JobClient();
  JobConf conf = new JobConf(InvertedIndex.class);
  conf.setJobName("InvertedIndex");
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.addInputPath(conf, new Path("input"));
  FileOutputFormat.setOutputPath(conf, new Path("output"));
  conf.setMapperClass(InvertedIndexMapper.class);
  conf.setReducerClass(InvertedIndexReducer.class);
  client.setConf(conf);
  try {
    JobClient.runJob(conf);
  } catch (Exception e) {
    e.printStackTrace(System.out);
  }
}
/**
 * Sets up a job conf for the given job using the given config object. Ensures
 * that the correct input format is set, along with the mapper and reducer
 * classes and the input and output key and value classes, plus any other job
 * configuration.
 *
 * @param config the configuration to pull the job settings from
 * @return a JobConf representing the job to be run
 * @throws IOException
 */
private JobConf getJob(ConfigExtractor config) throws IOException {
  JobConf job = new JobConf(config.getConfig(), SliveTest.class);
  job.setInputFormat(DummyInputFormat.class);
  FileOutputFormat.setOutputPath(job, config.getOutputPath());
  job.setMapperClass(SliveMapper.class);
  job.setPartitionerClass(SlivePartitioner.class);
  job.setReducerClass(SliveReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setOutputFormat(TextOutputFormat.class);
  TextOutputFormat.setCompressOutput(job, false);
  job.setNumReduceTasks(config.getReducerAmount());
  job.setNumMapTasks(config.getMapAmount());
  return job;
}
/**
 * @param args the cli arguments
 */
public int run(String[] args) throws IOException {
  JobConf job = (JobConf) getConf();
  setNumberOfRows(job, Long.parseLong(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraGen");
  job.setJarByClass(TeraGen.class);
  job.setMapperClass(SortGenMapper.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormat(RangeInputFormat.class);
  job.setOutputFormat(TeraOutputFormat.class);
  JobClient.runJob(job);
  return 0;
}
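A join test that writes its output under a qualified base path and deletes the whole tree once the job finishes: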
public void testEmptyJoin() throws Exception {
  JobConf job = new JobConf();
  Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
  Path[] src = { new Path(base, "i0"), new Path("i1"), new Path("i2") };
  job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
      Fake_IF.class, src));
  job.setInputFormat(CompositeInputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(base, "out"));
  job.setMapperClass(IdentityMapper.class);
  job.setReducerClass(IdentityReducer.class);
  job.setOutputKeyClass(IncomparableKey.class);
  job.setOutputValueClass(NullWritable.class);
  JobClient.runJob(job);
  base.getFileSystem(job).delete(base, true);
}
/**
 * The actual main() method for our program; this is the
 * "driver" for the MapReduce job.
 */
public static void main(String[] args) {
  JobClient client = new JobClient();
  JobConf conf = new JobConf(LineIndexer.class);
  conf.setJobName("LineIndexer");
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.addInputPath(conf, new Path("input"));
  FileOutputFormat.setOutputPath(conf, new Path("output"));
  conf.setMapperClass(LineIndexMapper.class);
  conf.setReducerClass(LineIndexReducer.class);
  client.setConf(conf);
  try {
    JobClient.runJob(conf);
  } catch (Exception e) {
    e.printStackTrace();
  }
}
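A test helper that generates small input files, submits the job, and monitors it; note that the output directory is deleted up front so setOutputPath can point at a clean location: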
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
    int numReds) throws IOException, InterruptedException {
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  JobClient jobClient = new JobClient(conf);
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
/**
 * Runs the link analysis job. The link analysis job applies the link rank
 * formula to create a score per url and stores that score in the NodeDb.
 *
 * Typically the link analysis job is run a number of times to allow the link
 * rank scores to converge.
 *
 * @param nodeDb The node database from which we are getting previous link
 *          rank scores.
 * @param inverted The inverted inlinks
 * @param output The link analysis output.
 * @param iteration The current iteration number.
 * @param numIterations The total number of link analysis iterations
 *
 * @throws IOException If an error occurs during link analysis.
 */
private void runAnalysis(Path nodeDb, Path inverted, Path output,
    int iteration, int numIterations, float rankOne)
    throws IOException {
  JobConf analyzer = new NutchJob(getConf());
  analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
  analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
      + " of " + numIterations);
  FileInputFormat.addInputPath(analyzer, nodeDb);
  FileInputFormat.addInputPath(analyzer, inverted);
  FileOutputFormat.setOutputPath(analyzer, output);
  analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
  analyzer.setMapOutputKeyClass(Text.class);
  analyzer.setMapOutputValueClass(ObjectWritable.class);
  analyzer.setInputFormat(SequenceFileInputFormat.class);
  analyzer.setMapperClass(Analyzer.class);
  analyzer.setReducerClass(Analyzer.class);
  analyzer.setOutputKeyClass(Text.class);
  analyzer.setOutputValueClass(Node.class);
  analyzer.setOutputFormat(MapFileOutputFormat.class);
  analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
      false);
  LOG.info("Starting analysis job");
  try {
    JobClient.runJob(analyzer);
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  LOG.info("Finished analysis job.");
}
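A combiner test that builds its input and output paths from the test cluster's working directory: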
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  JobConf conf = new JobConf(mrCluster.getConfig());
  int numMaps = 5;
  int numReds = 2;
  Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(in, out, numMaps);
  conf.setJobName("test-job-with-combiner");
  conf.setMapperClass(IdentityMapper.class);
  conf.setCombinerClass(MyCombinerToCheckReporter.class);
  //conf.setJarByClass(MyCombinerToCheckReporter.class);
  conf.setReducerClass(IdentityReducer.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(conf, in);
  FileOutputFormat.setOutputPath(conf, out);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  runJob(conf);
}
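A driver that reads rows from a GemFire XD table through RowInputFormat; it deletes any previous output before calling setOutputPath: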
public int run(String[] args) throws Exception {
  GfxdDataSerializable.initTypes();
  JobConf conf = new JobConf(getConf());
  conf.setJobName("Busy Leg Count");
  Path outputPath = new Path(args[0]);
  String hdfsHomeDir = args[1];
  String tableName = args[2];
  outputPath.getFileSystem(conf).delete(outputPath, true);
  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  // Configure Mapper
  conf.setInputFormat(RowInputFormat.class);
  conf.setMapperClass(SampleMapper.class);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(IntWritable.class);
  // Configure Reducer
  conf.setReducerClass(SampleReducer.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  FileOutputFormat.setOutputPath(conf, outputPath);
  JobClient.runJob(conf);
  return 0;
}
/**
 * Runs the counter job. The counter job determines the number of links in the
 * webgraph. This is used during analysis.
 *
 * @param fs The job file system.
 * @param webGraphDb The web graph database to use.
 *
 * @return The number of nodes in the web graph.
 * @throws IOException If an error occurs while running the counter job.
 */
private int runCounter(FileSystem fs, Path webGraphDb)
    throws IOException {
  // configure the counter job
  Path numLinksPath = new Path(webGraphDb, NUM_NODES);
  Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
  JobConf counter = new NutchJob(getConf());
  counter.setJobName("LinkRank Counter");
  FileInputFormat.addInputPath(counter, nodeDb);
  FileOutputFormat.setOutputPath(counter, numLinksPath);
  counter.setInputFormat(SequenceFileInputFormat.class);
  counter.setMapperClass(Counter.class);
  counter.setCombinerClass(Counter.class);
  counter.setReducerClass(Counter.class);
  counter.setMapOutputKeyClass(Text.class);
  counter.setMapOutputValueClass(LongWritable.class);
  counter.setOutputKeyClass(Text.class);
  counter.setOutputValueClass(LongWritable.class);
  counter.setNumReduceTasks(1);
  counter.setOutputFormat(TextOutputFormat.class);
  counter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
      false);
  // run the counter job, outputs to a single reduce task and file
  LOG.info("Starting link counter job");
  try {
    JobClient.runJob(counter);
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  LOG.info("Finished link counter job");
  // read the first (and only) line from the file which should be the
  // number of links in the web graph
  LOG.info("Reading numlinks temp file");
  FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
  BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
  String numLinksLine = buffer.readLine();
  readLinks.close();
  // check if there are links to process, if none, webgraph might be empty
  if (numLinksLine == null || numLinksLine.length() == 0) {
    fs.delete(numLinksPath, true);
    throw new IOException("No links to process, is the webgraph empty?");
  }
  // delete temp file and convert and return the number of links as an int
  LOG.info("Deleting numlinks temp file");
  fs.delete(numLinksPath, true);
  String numLinks = numLinksLine.split("\\s+")[1];
  return Integer.parseInt(numLinks);
}
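A two-pass Nutch CrawlDb job: the first pass writes scored entries to a temporary directory, and the second pass collects the topN scores into the final output folder: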
public void processTopNJob(String crawlDb, long topN, float min,
    String output, Configuration config) throws IOException {
  if (LOG.isInfoEnabled()) {
    LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
    LOG.info("CrawlDb db: " + crawlDb);
  }
  Path outFolder = new Path(output);
  Path tempDir =
      new Path(config.get("mapred.temp.dir", ".") +
          "/readdb-topN-temp-" +
          Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  JobConf job = new NutchJob(config);
  job.setJobName("topN prepare " + crawlDb);
  FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(CrawlDbTopNMapper.class);
  job.setReducerClass(IdentityReducer.class);
  FileOutputFormat.setOutputPath(job, tempDir);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(FloatWritable.class);
  job.setOutputValueClass(Text.class);
  // XXX hmmm, no setFloat() in the API ... :(
  job.setLong("db.reader.topn.min", Math.round(1000000.0 * min));
  JobClient.runJob(job);
  if (LOG.isInfoEnabled()) {
    LOG.info("CrawlDb topN: collecting topN scores.");
  }
  job = new NutchJob(config);
  job.setJobName("topN collect " + crawlDb);
  job.setLong("db.reader.topn", topN);
  FileInputFormat.addInputPath(job, tempDir);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(IdentityMapper.class);
  job.setReducerClass(CrawlDbTopNReducer.class);
  FileOutputFormat.setOutputPath(job, outFolder);
  job.setOutputFormat(TextOutputFormat.class);
  job.setOutputKeyClass(FloatWritable.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(1); // create a single file
  JobClient.runJob(job);
  FileSystem fs = FileSystem.get(config);
  fs.delete(tempDir, true);
  if (LOG.isInfoEnabled()) { LOG.info("CrawlDb topN: done"); }
}
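A driver whose real output goes through RowOutputFormat; as its own in-code comment notes, setOutputPath is called here only to satisfy the framework: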
public int run(String[] args) throws Exception {
  // [email protected] -- why do we need this?
  GfxdDataSerializable.initTypes();
  JobConf conf = new JobConf(getConf());
  conf.setJobName("hdfsMapReduce");
  String hdfsHomeDir = args[0];
  String url = args[1];
  String tableName = args[2];
  System.out.println("VerifyHdfsData.run() invoked with "
      + " hdfsHomeDir = " + hdfsHomeDir
      + " url = " + url
      + " tableName = " + tableName);
  // Job-specific params
  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  conf.setInputFormat(RowInputFormat.class);
  conf.setMapperClass(HdfsDataMapper.class);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(MyRow.class);
  conf.setReducerClass(HdfsDataReducer.class);
  conf.set(RowOutputFormat.OUTPUT_TABLE, "TRADE.HDFS_CUSTOMERS");
  //conf.set(GfxdOutputFormat.OUTPUT_SCHEMA, "APP");
  conf.set(RowOutputFormat.OUTPUT_URL, url);
  conf.setOutputFormat(RowOutputFormat.class);
  conf.setOutputKeyClass(Key.class);
  conf.setOutputValueClass(DataObject.class);
  StringBuffer aStr = new StringBuffer();
  aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
  aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
  aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
  aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
  System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
  // not planning to use this, but I get an NPE without it
  FileOutputFormat.setOutputPath(conf, new Path("" + System.currentTimeMillis()));
  JobClient.runJob(conf);
  return 0;
}
/**
 * This is the main routine for launching a distributed random write job.
 * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
 * The reduce doesn't do anything.
 *
 * @throws IOException
 */
public int run(String[] args) throws Exception {
  if (args.length == 0) {
    System.out.println("Usage: writer <out-dir>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }
  Path outDir = new Path(args[0]);
  JobConf job = new JobConf(getConf());
  job.setJarByClass(RandomWriter.class);
  job.setJobName("random-writer");
  FileOutputFormat.setOutputPath(job, outDir);
  job.setOutputKeyClass(BytesWritable.class);
  job.setOutputValueClass(BytesWritable.class);
  job.setInputFormat(RandomInputFormat.class);
  job.setMapperClass(Map.class);
  job.setReducerClass(IdentityReducer.class);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  JobClient client = new JobClient(job);
  ClusterStatus cluster = client.getClusterStatus();
  int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
  long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
      1 * 1024 * 1024 * 1024);
  if (numBytesToWritePerMap == 0) {
    System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
    return -2;
  }
  long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
      numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
  int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
  if (numMaps == 0 && totalBytesToWrite > 0) {
    numMaps = 1;
    job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
  }
  job.setNumMapTasks(numMaps);
  System.out.println("Running " + numMaps + " maps.");
  // reducer NONE
  job.setNumReduceTasks(0);
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(job);
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " +
      (endTime.getTime() - startTime.getTime()) / 1000 +
      " seconds.");
  return 0;
}
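A test harness for KeyFieldBasedComparator that submits the job by hand and then checks the order of the two output lines: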
public void configure(String keySpec, int expect) throws Exception {
  Path testdir = new Path("build/test/test.mapred.spill");
  Path inDir = new Path(testdir, "in");
  Path outDir = new Path(testdir, "out");
  FileSystem fs = getFileSystem();
  fs.delete(testdir, true);
  conf.setInputFormat(TextInputFormat.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(LongWritable.class);
  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(2);
  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
  conf.setKeyFieldComparatorOptions(keySpec);
  conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
  conf.set("map.output.key.field.separator", " ");
  conf.setMapperClass(InverseMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  if (!fs.mkdirs(testdir)) {
    throw new IOException("Mkdirs failed to create " + testdir.toString());
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Mkdirs failed to create " + inDir.toString());
  }
  // set up the input data: two lines in a single file
  Path inFile = new Path(inDir, "part0");
  FileOutputStream fos = new FileOutputStream(inFile.toString());
  fos.write((line1 + "\n").getBytes());
  fos.write((line2 + "\n").getBytes());
  fos.close();
  JobClient jc = new JobClient(conf);
  RunningJob r_job = jc.submitJob(conf);
  while (!r_job.isComplete()) {
    Thread.sleep(1000);
  }
  if (!r_job.isSuccessful()) {
    fail("Oops! The job broke due to an unexpected error");
  }
  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(outDir,
          new Utils.OutputFileUtils.OutputFilesFilter()));
  if (outputFiles.length > 0) {
    InputStream is = getFileSystem().open(outputFiles[0]);
    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
    String line = reader.readLine();
    // make sure we get what we expect as the first line, and also
    // that we have two lines (both lines must end up in the same
    // reducer, since the partitioner takes the same key spec for all
    // lines)
    if (expect == 1) {
      assertTrue(line.startsWith(line1));
    } else if (expect == 2) {
      assertTrue(line.startsWith(line2));
    }
    line = reader.readLine();
    if (expect == 1) {
      assertTrue(line.startsWith(line2));
    } else if (expect == 2) {
      assertTrue(line.startsWith(line1));
    }
    reader.close();
  }
}
/**
 * Archive the given source paths into the dest.
 * @param parentPath the parent path of all the source paths
 * @param srcPaths the src paths to be archived
 * @param archiveName the name of the archive
 * @param dest the dest dir that will contain the archive
 */
void archive(Path parentPath, List<Path> srcPaths,
    String archiveName, Path dest) throws IOException {
  checkPaths(conf, srcPaths);
  int numFiles = 0;
  long totalSize = 0;
  FileSystem fs = parentPath.getFileSystem(conf);
  this.blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
  this.partSize = conf.getLong(HAR_PARTSIZE_LABEL, partSize);
  conf.setLong(HAR_BLOCKSIZE_LABEL, blockSize);
  conf.setLong(HAR_PARTSIZE_LABEL, partSize);
  conf.set(DST_HAR_LABEL, archiveName);
  conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
  Path outputPath = new Path(dest, archiveName);
  FileOutputFormat.setOutputPath(conf, outputPath);
  FileSystem outFs = outputPath.getFileSystem(conf);
  if (outFs.exists(outputPath) || outFs.isFile(dest)) {
    throw new IOException("Invalid Output: " + outputPath);
  }
  conf.set(DST_DIR_LABEL, outputPath.toString());
  final String randomId = DistCp.getRandomId();
  Path jobDirectory = new Path(new JobClient(conf).getSystemDir(),
      NAME + "_" + randomId);
  conf.set(JOB_DIR_LABEL, jobDirectory.toString());
  // get a tmp directory for input splits
  FileSystem jobfs = jobDirectory.getFileSystem(conf);
  jobfs.mkdirs(jobDirectory);
  Path srcFiles = new Path(jobDirectory, "_har_src_files");
  conf.set(SRC_LIST_LABEL, srcFiles.toString());
  SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
      srcFiles, LongWritable.class, HarEntry.class,
      SequenceFile.CompressionType.NONE);
  // get the list of files: create a single list of files and dirs
  try {
    // write the top level dirs in first
    writeTopLevelDirs(srcWriter, srcPaths, parentPath);
    srcWriter.sync();
    // these are the input paths passed from the command line;
    // we do a recursive ls on these paths and then write them
    // to the input file one at a time
    for (Path src : srcPaths) {
      ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
      recursivels(fs, src, allFiles);
      for (FileStatus stat : allFiles) {
        long len = stat.isDir() ? 0 : stat.getLen();
        String path = relPathToRoot(stat.getPath(), parentPath).toString();
        String[] children = null;
        if (stat.isDir()) {
          // get the children
          FileStatus[] list = fs.listStatus(stat.getPath());
          children = new String[list.length];
          for (int i = 0; i < list.length; i++) {
            children[i] = list[i].getPath().getName();
          }
        }
        srcWriter.append(new LongWritable(len), new HarEntry(path, children));
        srcWriter.sync();
        numFiles++;
        totalSize += len;
      }
    }
  } finally {
    srcWriter.close();
  }
  // increase the replication of src files
  jobfs.setReplication(srcFiles, (short) 10);
  conf.setInt(SRC_COUNT_LABEL, numFiles);
  conf.setLong(TOTAL_SIZE_LABEL, totalSize);
  int numMaps = (int) (totalSize / partSize);
  // run at least one map
  conf.setNumMapTasks(numMaps == 0 ? 1 : numMaps);
  conf.setNumReduceTasks(1);
  conf.setInputFormat(HArchiveInputFormat.class);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapperClass(HArchivesMapper.class);
  conf.setReducerClass(HArchivesReducer.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(Text.class);
  conf.set("hadoop.job.history.user.location", "none");
  FileInputFormat.addInputPath(conf, jobDirectory);
  // make sure no speculative execution is done
  conf.setSpeculativeExecution(false);
  JobClient.runJob(conf);
  // delete the tmp job directory
  try {
    jobfs.delete(jobDirectory, true);
  } catch (IOException ie) {
    LOG.info("Unable to clean tmp directory " + jobDirectory);
  }
}
/**
 * Run a map/reduce job for estimating Pi.
 *
 * @return the estimated value of Pi
 */
public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf)
    throws IOException {
  // setup job conf
  jobConf.setJobName(PiEstimator.class.getSimpleName());
  jobConf.setInputFormat(SequenceFileInputFormat.class);
  jobConf.setOutputKeyClass(BooleanWritable.class);
  jobConf.setOutputValueClass(LongWritable.class);
  jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  jobConf.setMapperClass(PiMapper.class);
  jobConf.setNumMapTasks(numMaps);
  jobConf.setReducerClass(PiReducer.class);
  jobConf.setNumReduceTasks(1);
  // turn off speculative execution, because DFS doesn't handle
  // multiple writers to the same file.
  jobConf.setSpeculativeExecution(false);
  // setup input/output directories
  final Path inDir = new Path(TMP_DIR, "in");
  final Path outDir = new Path(TMP_DIR, "out");
  FileInputFormat.setInputPaths(jobConf, inDir);
  FileOutputFormat.setOutputPath(jobConf, outDir);
  final FileSystem fs = FileSystem.get(jobConf);
  if (fs.exists(TMP_DIR)) {
    throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
        + " already exists. Please remove it first.");
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Cannot create input directory " + inDir);
  }
  try {
    // generate an input file for each map task
    for (int i = 0; i < numMaps; ++i) {
      final Path file = new Path(inDir, "part" + i);
      final LongWritable offset = new LongWritable(i * numPoints);
      final LongWritable size = new LongWritable(numPoints);
      final SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, jobConf, file,
          LongWritable.class, LongWritable.class, CompressionType.NONE);
      try {
        writer.append(offset, size);
      } finally {
        writer.close();
      }
      System.out.println("Wrote input for Map #" + i);
    }
    // start a map/reduce job
    System.out.println("Starting Job");
    final long startTime = System.currentTimeMillis();
    JobClient.runJob(jobConf);
    final double duration = (System.currentTimeMillis() - startTime) / 1000.0;
    System.out.println("Job Finished in " + duration + " seconds");
    // read outputs
    Path inFile = new Path(outDir, "reduce-out");
    LongWritable numInside = new LongWritable();
    LongWritable numOutside = new LongWritable();
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
    try {
      reader.next(numInside, numOutside);
    } finally {
      reader.close();
    }
    // compute the estimated value: the fraction of sample points that fell
    // inside the quarter circle approximates pi/4, so
    // pi ~= 4 * numInside / (numMaps * numPoints)
    return BigDecimal.valueOf(4).setScale(20)
        .multiply(BigDecimal.valueOf(numInside.get()))
        .divide(BigDecimal.valueOf(numMaps))
        .divide(BigDecimal.valueOf(numPoints));
  } finally {
    fs.delete(TMP_DIR, true);
  }
}
/**
 * Runs the process to dump the top urls out to a text file.
 *
 * @param webGraphDb The WebGraph from which to pull values.
 *
 * @param topN The number of top urls to dump.
 * @param output The path to dump the output to.
 *
 * @throws IOException If an error occurs while dumping the top values.
 */
public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output,
    boolean asEff, NameType nameType, AggrType aggrType,
    boolean asSequenceFile) throws Exception {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("NodeDumper: starting at " + sdf.format(start));
  Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
  Configuration conf = getConf();
  JobConf dumper = new NutchJob(conf);
  dumper.setJobName("NodeDumper: " + webGraphDb);
  FileInputFormat.addInputPath(dumper, nodeDb);
  dumper.setInputFormat(SequenceFileInputFormat.class);
  if (nameType == null) {
    dumper.setMapperClass(Sorter.class);
    dumper.setReducerClass(Sorter.class);
    dumper.setMapOutputKeyClass(FloatWritable.class);
    dumper.setMapOutputValueClass(Text.class);
  } else {
    dumper.setMapperClass(Dumper.class);
    dumper.setReducerClass(Dumper.class);
    dumper.setMapOutputKeyClass(Text.class);
    dumper.setMapOutputValueClass(FloatWritable.class);
  }
  dumper.setOutputKeyClass(Text.class);
  dumper.setOutputValueClass(FloatWritable.class);
  FileOutputFormat.setOutputPath(dumper, output);
  if (asSequenceFile) {
    dumper.setOutputFormat(SequenceFileOutputFormat.class);
  } else {
    dumper.setOutputFormat(TextOutputFormat.class);
  }
  dumper.setNumReduceTasks(1);
  dumper.setBoolean("inlinks", type == DumpType.INLINKS);
  dumper.setBoolean("outlinks", type == DumpType.OUTLINKS);
  dumper.setBoolean("scores", type == DumpType.SCORES);
  dumper.setBoolean("host", nameType == NameType.HOST);
  dumper.setBoolean("domain", nameType == NameType.DOMAIN);
  dumper.setBoolean("sum", aggrType == AggrType.SUM);
  dumper.setBoolean("max", aggrType == AggrType.MAX);
  dumper.setLong("topn", topN);
  // Set equals-sign as separator for Solr's ExternalFileField
  if (asEff) {
    dumper.set("mapred.textoutputformat.separator", "=");
  }
  try {
    LOG.info("NodeDumper: running");
    JobClient.runJob(dumper);
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  long end = System.currentTimeMillis();
  LOG.info("NodeDumper: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}