The following code snippets show example usages of org.apache.hadoop.mapred.JobConf#setOutputValueClass().
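Before the collected snippets, here is a minimal, self-contained sketch of the common pattern they share (the job name and argument handling are illustrative only, not taken from any project below). With the old mapred API, setOutputValueClass() declares the value type of the job's final output; unless setMapOutputValueClass() is called separately, the same type is also used for the intermediate map output.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class SetOutputValueClassExample {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(SetOutputValueClassExample.class);
    job.setJobName("setOutputValueClass-example");
    // TextInputFormat produces <LongWritable offset, Text line> records; the
    // default identity mapper and reducer pass them through unchanged.
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    // Declare the key/value types of the job output. Because no map output
    // types are set explicitly, these also serve as the map output types.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    JobClient.runJob(job);
  }
}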
/**
* Run the test
*
* @throws IOException on error
*/
public static void runTests() throws IOException {
config.setLong("io.bytes.per.checksum", bytesPerChecksum);
JobConf job = new JobConf(config, NNBench.class);
job.setJobName("NNBench-" + operation);
FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
// Explicitly set number of max map attempts to 1.
job.setMaxMapAttempts(1);
// Explicitly turn off speculative execution
job.setSpeculativeExecution(false);
job.setMapperClass(NNBenchMapper.class);
job.setReducerClass(NNBenchReducer.class);
FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks((int) numberOfReduces);
JobClient.runJob(job);
}
private static void joinAs(String jointype,
Class<? extends SimpleCheckerBase> c) throws Exception {
final int srcs = 4;
Configuration conf = new Configuration();
JobConf job = new JobConf(conf, c);
Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
Path[] src = writeSimpleSrc(base, conf, srcs);
job.set("mapreduce.join.expr", CompositeInputFormat.compose(jointype,
SequenceFileInputFormat.class, src));
job.setInt("testdatamerge.sources", srcs);
job.setInputFormat(CompositeInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(base, "out"));
job.setMapperClass(c);
job.setReducerClass(c);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
JobClient.runJob(job);
base.getFileSystem(job).delete(base, true);
}
public static void readTest(FileSystem fs, boolean fastCheck)
throws Exception {
fs.delete(READ_DIR, true);
JobConf job = new JobConf(conf, TestFileSystem.class);
job.setBoolean("fs.test.fastCheck", fastCheck);
FileInputFormat.setInputPaths(job, CONTROL_DIR);
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(ReadMapper.class);
job.setReducerClass(LongSumReducer.class);
FileOutputFormat.setOutputPath(job, READ_DIR);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
@Test(timeout = 5000)
public void testOldAPI_SequenceFileOutputFormat() throws Exception {
JobConf conf = new JobConf();
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
DataSinkDescriptor dataSink = MROutput
.createConfigBuilder(conf,
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
tmpDir.getPath())
.build();
OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
new Configuration(false));
MROutput output = new MROutput(outputContext, 2);
output.initialize();
assertEquals(false, output.useNewApi);
assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass());
assertNull(output.newOutputFormat);
assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass());
assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass());
assertNull(output.newApiTaskAttemptContext);
assertNotNull(output.oldRecordWriter);
assertNull(output.newRecordWriter);
assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
}
private void updateJobConf(JobConf conf, Path inputPath, Path outputPath) {
// set specific job config
conf.setLong(NUMBER_OF_MAPS_KEY, nmaps);
conf.setLong(NUMBER_OF_THREADS_KEY, nthreads);
conf.setInt(BUFFER_SIZE_KEY, buffersize);
conf.setLong(WRITER_DATARATE_KEY, datarate);
conf.setLong("mapred.task.timeout", Long.MAX_VALUE);
conf.set(OUTPUT_DIR_KEY, output);
// set the output and input for the map reduce
FileInputFormat.setInputPaths(conf, inputPath);
FileOutputFormat.setOutputPath(conf, outputPath);
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setNumReduceTasks(1);
conf.setSpeculativeExecution(false);
}
public static void writeTest(FileSystem fs, boolean fastCheck)
throws Exception {
fs.delete(DATA_DIR, true);
fs.delete(WRITE_DIR, true);
JobConf job = new JobConf(conf, TestFileSystem.class);
job.setBoolean("fs.test.fastCheck", fastCheck);
FileInputFormat.setInputPaths(job, CONTROL_DIR);
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(WriteMapper.class);
job.setReducerClass(LongSumReducer.class);
FileOutputFormat.setOutputPath(job, WRITE_DIR);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
@Test(timeout = 5000)
public void testNewAPI_SequenceFileOutputFormat() throws Exception {
JobConf conf = new JobConf();
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
DataSinkDescriptor dataSink = MROutput
.createConfigBuilder(conf, SequenceFileOutputFormat.class,
tmpDir.getPath())
.build();
OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
new Configuration(false));
MROutput output = new MROutput(outputContext, 2);
output.initialize();
assertEquals(true, output.useNewApi);
assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass());
assertNull(output.oldOutputFormat);
assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass());
assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass());
assertNull(output.oldApiTaskAttemptContext);
assertNotNull(output.newRecordWriter);
assertNull(output.oldRecordWriter);
assertEquals(FileOutputCommitter.class, output.committer.getClass());
}
public int run(String[] args) throws Exception {
if (args.length < 2) {
printUsage();
return 1;
}
JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
job.setJobName("MultiFileWordCount");
//set the InputFormat of the job to our InputFormat
job.setInputFormat(MyInputFormat.class);
// the keys are words (strings)
job.setOutputKeyClass(Text.class);
// the values are counts (ints)
job.setOutputValueClass(LongWritable.class);
//use the defined mapper
job.setMapperClass(MapClass.class);
//use the WordCount Reducer
job.setCombinerClass(LongSumReducer.class);
job.setReducerClass(LongSumReducer.class);
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
JobClient.runJob(job);
return 0;
}
static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
int numReds) throws IOException {
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
if (!fs.exists(inDir)) {
fs.mkdirs(inDir);
}
String input = "The quick brown fox\n" + "has many silly\n"
+ "red fox sox\n";
for (int i = 0; i < numMaps; ++i) {
DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
file.writeBytes(input);
file.close();
}
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReds);
JobClient jobClient = new JobClient(conf);
RunningJob job = jobClient.submitJob(conf);
return job;
}
public int run(String[] args) throws Exception {
LOG.info("starting");
JobConf job = (JobConf) getConf();
Path inputDir = new Path(args[0]);
inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
Path partitionFile = new Path(inputDir, TeraInputFormat.PARTITION_FILENAME);
URI partitionUri = new URI(partitionFile.toString() +
"#" + TeraInputFormat.PARTITION_FILENAME);
TeraInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setJobName("TeraSort");
job.setJarByClass(TeraSort.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormat(TeraInputFormat.class);
job.setOutputFormat(TeraOutputFormat.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
TeraInputFormat.writePartitionFile(job, partitionFile);
DistributedCache.addCacheFile(partitionUri, job);
DistributedCache.createSymlink(job);
job.setInt("dfs.replication", 1);
TeraOutputFormat.setFinalSync(job, true);
long startTime = System.currentTimeMillis();
JobClient.runJob(job);
long endTime = System.currentTimeMillis();
System.out.println((float)(endTime-startTime)/1000);
LOG.info("done");
return 0;
}
public void processDumpJob(String crawlDb, String output, Configuration config, String format, String regex, String status, Integer retry) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb dump: starting");
LOG.info("CrawlDb db: " + crawlDb);
}
Path outFolder = new Path(output);
JobConf job = new NutchJob(config);
job.setJobName("dump " + crawlDb);
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
FileOutputFormat.setOutputPath(job, outFolder);
if (format.equals("csv")) {
job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
}
else if (format.equals("crawldb")) {
job.setOutputFormat(MapFileOutputFormat.class);
} else {
job.setOutputFormat(TextOutputFormat.class);
}
if (status != null) job.set("status", status);
if (regex != null) job.set("regex", regex);
if (retry != null) job.setInt("retry", retry);
job.setMapperClass(CrawlDbDumpMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
JobClient.runJob(job);
if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); }
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 4) {
printUsage();
}
Path userPath = new Path(args[0]);
Path commentPath = new Path(args[1]);
Path outputDir = new Path(args[2]);
String joinType = args[3];
JobConf conf = new JobConf("CompositeJoin");
conf.setJarByClass(CompositeUserJoin.class);
conf.setMapperClass(CompositeMapper.class);
conf.setNumReduceTasks(0);
// Set the input format class to a CompositeInputFormat class.
// The CompositeInputFormat will parse all of our input files and output
// records to our mapper.
conf.setInputFormat(CompositeInputFormat.class);
// The composite input format join expression will set how the records
// are going to be read in, and in what input format.
conf.set("mapred.join.expr", CompositeInputFormat.compose(joinType,
KeyValueTextInputFormat.class, userPath, commentPath));
TextOutputFormat.setOutputPath(conf, outputDir);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
Thread.sleep(1000);
}
return job.isSuccessful() ? 0 : 1;
}
private static JobConf createJobConf(Configuration conf) {
JobConf jobconf = new JobConf(conf, DistCh.class);
jobconf.setJobName(NAME);
jobconf.setMapSpeculativeExecution(false);
jobconf.setInputFormat(ChangeInputFormat.class);
jobconf.setOutputKeyClass(Text.class);
jobconf.setOutputValueClass(Text.class);
jobconf.setMapperClass(ChangeFilesMapper.class);
jobconf.setNumReduceTasks(0);
return jobconf;
}
@SuppressWarnings("deprecation")
public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, StratosphereTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
super(new HadoopOutputFormatWrapper<K,V>(hadoopFormat, jobConf, conv),input, name);
Preconditions.checkNotNull(hadoopFormat);
Preconditions.checkNotNull(jobConf);
this.name = name;
this.jobConf = jobConf;
jobConf.setOutputKeyClass(keyClass);
jobConf.setOutputValueClass(valueClass);
}
public int run(String[] args) throws Exception {
// Get current configuration.
Configuration conf = getConf();
// Parse command line arguments.
String inputPaths = args[0];
String outputPath = args[1];
JobConf job = new JobConf(conf);
// Set input path.
if (inputPaths.length() > 0) {
List<String> segmentPaths = Lists.newArrayList(Splitter.on(",")
.split(inputPaths));
for (String segmentPath : segmentPaths) {
LOG.info("Adding input path " + segmentPath);
FileInputFormat.addInputPath(job, new Path(segmentPath));
}
} else {
System.err.println("No input path found.");
return 1;
}
// Set output path.
if (outputPath.length() > 0) {
LOG.info("Setting output path to " + outputPath);
SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
// Compress output to boost performance.
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
} else {
System.err.println("No output path found.");
return 1;
}
// Load other classes from same jar as this class.
job.setJarByClass(SegmentCombiner.class);
// Input is Hadoop sequence file format.
job.setInputFormat(SequenceFileInputFormat.class);
// Output is Hadoop sequence file format.
job.setOutputFormat(SequenceFileOutputFormat.class);
// Set the output data types.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LinkArrayWritable.class);
// Use custom mapper class.
job.setMapperClass(SegmentCombinerMapper.class);
// Use custom reducer class.
job.setReducerClass(LinkArrayReducer.class);
if (JobClient.runJob(job).isSuccessful())
return 0;
else
return 1;
}
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
JobConf conf = new JobConf(getConf());
conf.setJobName("TradesHdfsDataVerifier");
String hdfsHomeDir = args[0];
String url = args[1];
String tableName = args[2];
System.out.println("TradesHdfsDataVerifier.run() invoked with "
+ " hdfsHomeDir = " + hdfsHomeDir
+ " url = " + url
+ " tableName = " + tableName);
// Job-specific params
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
conf.setInputFormat(RowInputFormat.class);
conf.setMapperClass(HdfsDataMapper.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(TradesRow.class);
conf.setReducerClass(HdfsDataReducer.class);
conf.set(RowOutputFormat.OUTPUT_TABLE, tableName + "_HDFS");
//conf.set(GfxdOutputFormat.OUTPUT_SCHEMA, "APP");
conf.set(RowOutputFormat.OUTPUT_URL, url);
conf.setOutputFormat(RowOutputFormat.class);
conf.setOutputKeyClass(Key.class);
conf.setOutputValueClass(TradeOutputObject.class);
StringBuffer aStr = new StringBuffer();
aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
FileOutputFormat.setOutputPath(conf, new Path("" + System.currentTimeMillis()));
JobClient.runJob(conf);
return 0;
}
/**
* Based on args we submit the LoadGenerator as MR job.
* Number of MapTasks is numMapTasks
* @return exitCode for job submission
*/
private int submitAsMapReduce() {
System.out.println("Running as a MapReduce job with " +
numMapTasks + " mapTasks; Output to file " + mrOutDir);
Configuration conf = new Configuration(getConf());
// First set all the args of LoadGenerator as Conf vars to pass to MR tasks
conf.set(LG_ROOT , root.toString());
conf.setInt(LG_MAXDELAYBETWEENOPS, maxDelayBetweenOps);
conf.setInt(LG_NUMOFTHREADS, numOfThreads);
conf.set(LG_READPR, readProbs[0]+""); //Pass Double as string
conf.set(LG_WRITEPR, writeProbs[0]+""); //Pass Double as string
conf.setLong(LG_SEED, seed); // seed for the generator's random number source
conf.setInt(LG_NUMMAPTASKS, numMapTasks);
if (scriptFile == null && durations[0] <=0) {
System.err.println("When run as a MapReduce job, elapsed Time or ScriptFile must be specified");
System.exit(-1);
}
conf.setLong(LG_ELAPSEDTIME, durations[0]);
conf.setLong(LG_STARTTIME, startTime);
if (scriptFile != null) {
conf.set(LG_SCRIPTFILE , scriptFile);
}
conf.set(LG_FLAGFILE, flagFile.toString());
// Now set the necessary conf variables that apply to run MR itself.
JobConf jobConf = new JobConf(conf, LoadGenerator.class);
jobConf.setJobName("NNLoadGeneratorViaMR");
jobConf.setNumMapTasks(numMapTasks);
jobConf.setNumReduceTasks(1); // 1 reducer to collect the results
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);
jobConf.setMapperClass(MapperThatRunsNNLoadGenerator.class);
jobConf.setReducerClass(ReducerThatCollectsLGdata.class);
jobConf.setInputFormat(DummyInputFormat.class);
jobConf.setOutputFormat(TextOutputFormat.class);
// Explicitly set number of max map attempts to 1.
jobConf.setMaxMapAttempts(1);
// Explicitly turn off speculative execution
jobConf.setSpeculativeExecution(false);
// This mapReduce job has no input but has output
FileOutputFormat.setOutputPath(jobConf, new Path(mrOutDir));
try {
JobClient.runJob(jobConf);
} catch (IOException e) {
System.err.println("Failed to run job: " + e.getMessage());
return -1;
}
return 0;
}
/**
* Run a map/reduce job for estimating Pi.
*
* @return the estimated value of Pi
*/
public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
) throws IOException {
//setup job conf
jobConf.setJobName(PiEstimator.class.getSimpleName());
jobConf.setInputFormat(SequenceFileInputFormat.class);
jobConf.setOutputKeyClass(BooleanWritable.class);
jobConf.setOutputValueClass(LongWritable.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setMapperClass(PiMapper.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setReducerClass(PiReducer.class);
jobConf.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
jobConf.setSpeculativeExecution(false);
//setup input/output directories
final Path inDir = new Path(TMP_DIR, "in");
final Path outDir = new Path(TMP_DIR, "out");
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outDir);
final FileSystem fs = FileSystem.get(jobConf);
if (fs.exists(TMP_DIR)) {
throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
+ " already exists. Please remove it first.");
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Cannot create input directory " + inDir);
}
try {
//generate an input file for each map task
for(int i=0; i < numMaps; ++i) {
final Path file = new Path(inDir, "part"+i);
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
fs, jobConf, file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+i);
}
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
JobClient.runJob(jobConf);
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
//read outputs
Path inFile = new Path(outDir, "reduce-out");
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
try {
reader.next(numInside, numOutside);
} finally {
reader.close();
}
//compute estimated value
return BigDecimal.valueOf(4).setScale(20)
.multiply(BigDecimal.valueOf(numInside.get()))
.divide(BigDecimal.valueOf(numMaps))
.divide(BigDecimal.valueOf(numPoints));
} finally {
fs.delete(TMP_DIR, true);
}
}
/**
* Runs the counter job. The counter job determines the number of links in the
* webgraph. This is used during analysis.
*
* @param fs The job file system.
* @param webGraphDb The web graph database to use.
*
* @return The number of nodes in the web graph.
* @throws IOException If an error occurs while running the counter job.
*/
private int runCounter(FileSystem fs, Path webGraphDb)
throws IOException {
// configure the counter job
Path numLinksPath = new Path(webGraphDb, NUM_NODES);
Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
JobConf counter = new NutchJob(getConf());
counter.setJobName("LinkRank Counter");
FileInputFormat.addInputPath(counter, nodeDb);
FileOutputFormat.setOutputPath(counter, numLinksPath);
counter.setInputFormat(SequenceFileInputFormat.class);
counter.setMapperClass(Counter.class);
counter.setCombinerClass(Counter.class);
counter.setReducerClass(Counter.class);
counter.setMapOutputKeyClass(Text.class);
counter.setMapOutputValueClass(LongWritable.class);
counter.setOutputKeyClass(Text.class);
counter.setOutputValueClass(LongWritable.class);
counter.setNumReduceTasks(1);
counter.setOutputFormat(TextOutputFormat.class);
counter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
// run the counter job, outputs to a single reduce task and file
LOG.info("Starting link counter job");
try {
JobClient.runJob(counter);
}
catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
LOG.info("Finished link counter job");
// read the first (and only) line from the file which should be the
// number of links in the web graph
LOG.info("Reading numlinks temp file");
FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
String numLinksLine = buffer.readLine();
readLinks.close();
// check if there are links to process, if none, webgraph might be empty
if (numLinksLine == null || numLinksLine.length() == 0) {
fs.delete(numLinksPath, true);
throw new IOException("No links to process, is the webgraph empty?");
}
// delete temp file and convert and return the number of links as an int
LOG.info("Deleting numlinks temp file");
fs.delete(numLinksPath, true);
String numLinks = numLinksLine.split("\\s+")[1];
return Integer.parseInt(numLinks);
}