The following examples show how to use org.apache.hadoop.mapred.SequenceFileOutputFormat. They are extracted from open-source projects, and each snippet shows one way of wiring the class into a MapReduce or Spark job.
private JobConf createJobConf() {
JobConf jobConf = new JobConf(getConf());
String jobName = NAME + " " + dateForm.format(new Date(System.currentTimeMillis()));
jobConf.setJobName(jobName);
jobConf.setMapSpeculativeExecution(false);
jobConf.setJarByClass(DataFsck.class);
jobConf.setInputFormat(DataFsckInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(Text.class);
jobConf.setMapperClass(DataFsckMapper.class);
jobConf.setNumReduceTasks(0);
return jobConf;
}
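For context, a minimal submission sketch might look like the following; it assumes createJobConf() above is visible from the caller, and the output path is a placeholder.
// Minimal driver sketch: set an output path for the SequenceFile results and submit the job.
JobConf jobConf = createJobConf();
FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/datafsck-out"));  // placeholder path
JobClient.runJob(jobConf);  // blocks until the job completes; throws IOException if it fails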
protected static void run(JavaSparkContext sparkContext) {
JavaPairRDD<Text, IntWritable> javaPairRDD = sparkContext.sequenceFile("url", Text.class, IntWritable.class);
JavaPairRDD<String, Integer> pairRDD = javaPairRDD.mapToPair(new sequenceToConvert());
// write the converted pairs back out as a SequenceFile
pairRDD.saveAsHadoopFile("url",Text.class,IntWritable.class,SequenceFileOutputFormat.class);
}
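Note that the old-API SequenceFileOutputFormat writes Writable keys and values, so the pair RDD that is saved must still hold Text/IntWritable instances. A minimal round-trip sketch under that assumption (paths are placeholders, and the value transformation is purely illustrative):
// Read (Text, IntWritable) pairs, copy them (Hadoop reuses Writable objects), and write them back.
JavaPairRDD<Text, IntWritable> input =
    sparkContext.sequenceFile("/tmp/seq-in", Text.class, IntWritable.class);
JavaPairRDD<Text, IntWritable> result = input.mapToPair(
    kv -> new scala.Tuple2<>(new Text(kv._1()), new IntWritable(kv._2().get() + 1)));
result.saveAsHadoopFile("/tmp/seq-out", Text.class, IntWritable.class, SequenceFileOutputFormat.class);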
@Override
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
JobConf job,
String name,
Progressable arg3)
throws IOException {
if (theSequenceFileOutputFormat == null) {
theSequenceFileOutputFormat = new SequenceFileOutputFormat<K,V>();
}
return theSequenceFileOutputFormat.getRecordWriter(fs, job, name, arg3);
}
/**
* Runs the inverter job. The inverter job flips outlinks to inlinks to be
* passed into the analysis job.
*
* The inverter job takes a link loops database if it exists. It is an
* optional component of link analysis due to its extreme computational and
* space requirements, but it can be very useful in weeding out and eliminating
* link farms and other spam pages.
*
* @param nodeDb The node database to use.
* @param outlinkDb The outlink database to use.
* @param loopDb The loop database to use if it exists.
* @param output The output directory.
*
* @throws IOException If an error occurs while running the inverter job.
*/
private void runInverter(Path nodeDb, Path outlinkDb, Path loopDb, Path output)
throws IOException {
// configure the inverter
JobConf inverter = new NutchJob(getConf());
inverter.setJobName("LinkAnalysis Inverter");
FileInputFormat.addInputPath(inverter, nodeDb);
FileInputFormat.addInputPath(inverter, outlinkDb);
// add the loop database if it exists (i.e. is not null)
if (loopDb != null) {
FileInputFormat.addInputPath(inverter, loopDb);
}
FileOutputFormat.setOutputPath(inverter, output);
inverter.setInputFormat(SequenceFileInputFormat.class);
inverter.setMapperClass(Inverter.class);
inverter.setReducerClass(Inverter.class);
inverter.setMapOutputKeyClass(Text.class);
inverter.setMapOutputValueClass(ObjectWritable.class);
inverter.setOutputKeyClass(Text.class);
inverter.setOutputValueClass(LinkDatum.class);
inverter.setOutputFormat(SequenceFileOutputFormat.class);
inverter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
// run the inverter job
LOG.info("Starting inverter job");
try {
JobClient.runJob(inverter);
}
catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
LOG.info("Finished inverter job.");
}
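A hypothetical call site, with placeholder paths inside a WebGraph directory; the loop database argument may be null because the method treats it as optional:
// Hypothetical invocation; the output path is a placeholder and loopDb is omitted.
Path webGraphDb = new Path("crawl/webgraphdb");
Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
Path outlinkDb = new Path(webGraphDb, "outlinks");  // placeholder for the outlink database directory
runInverter(nodeDb, outlinkDb, null, new Path(webGraphDb, "linkrank-inverted"));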
/**
* Create a {@link RecordWriter} from path.
*/
public RecordWriter createRecordWriter(Path path) {
try {
checkInitialize();
JobConf conf = new JobConf(confWrapper.conf());
if (isCompressed) {
String codecStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC.varname);
if (!StringUtils.isNullOrWhitespaceOnly(codecStr)) {
//noinspection unchecked
Class<? extends CompressionCodec> codec =
(Class<? extends CompressionCodec>) Class.forName(codecStr, true,
Thread.currentThread().getContextClassLoader());
FileOutputFormat.setOutputCompressorClass(conf, codec);
}
String typeStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE.varname);
if (!StringUtils.isNullOrWhitespaceOnly(typeStr)) {
SequenceFile.CompressionType style = SequenceFile.CompressionType.valueOf(typeStr);
SequenceFileOutputFormat.setOutputCompressionType(conf, style);
}
}
return hiveShim.getHiveRecordWriter(
conf,
hiveOutputFormatClz,
recordSerDe.getSerializedClass(),
isCompressed,
tableProperties,
path);
} catch (Exception e) {
throw new FlinkHiveException(e);
}
}
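Outside the Hive shim, the same compression options map onto plain JobConf calls when SequenceFileOutputFormat is used directly; a minimal sketch (the Gzip/BLOCK choice is illustrative):
// Enable block-compressed SequenceFile output with a Gzip codec (illustrative choices).
JobConf conf = new JobConf();
FileOutputFormat.setCompressOutput(conf, true);
FileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);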
List<SequenceFile.Reader> getOutputs(List<JobContext> submitted) throws IOException {
List<SequenceFile.Reader> outputs = new ArrayList<SequenceFile.Reader>();
for (JobContext ctx: submitted) {
SequenceFile.Reader[] jobOutputs = SequenceFileOutputFormat.getReaders(
getConf(),
SequenceFileOutputFormat.getOutputPath(ctx.jobConf));
for (SequenceFile.Reader r: jobOutputs) {
outputs.add(r);
}
}
return outputs;
}
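The readers returned by getOutputs can then be drained generically, because every SequenceFile records its own key and value classes; a short consumption sketch, assuming it runs in the same Configured class:
// Iterate every (key, value) pair from each reader, creating key/value instances reflectively.
for (SequenceFile.Reader reader : outputs) {
  Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), getConf());
  Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), getConf());
  while (reader.next(key, value)) {
    System.out.println(key + "\t" + value);
  }
  reader.close();
}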
private OutputFormat<K,V> getOutputFormat( JobConf job )
{
return ReflectionUtils.newInstance( job.getClass( "permap.output.format.class",
SequenceFileOutputFormat.class,
OutputFormat.class ),
job );
}
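Callers can therefore override the per-map output format through that configuration key, with SequenceFileOutputFormat remaining the default when nothing is set; for example:
// Swap the per-map output format (key name taken from the method above; the chosen format is illustrative).
JobConf job = new JobConf();
job.setClass("permap.output.format.class", TextOutputFormat.class, OutputFormat.class);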
public static void filter(String alignpath,
String outpath,
int nummappers,
int numreducers) throws IOException, Exception
{
System.out.println("NUM_FMAP_TASKS: " + nummappers);
System.out.println("NUM_FREDUCE_TASKS: " + numreducers);
JobConf conf = new JobConf(FilterAlignments.class);
conf.setJobName("FilterAlignments");
conf.setNumMapTasks(nummappers);
conf.setNumReduceTasks(numreducers);
FileInputFormat.addInputPath(conf, new Path(alignpath));
conf.setMapperClass(FilterMapClass.class);
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(BytesWritable.class);
conf.setCombinerClass(FilterCombinerClass.class);
conf.setReducerClass(FilterReduceClass.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(BytesWritable.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
Path oPath = new Path(outpath);
FileOutputFormat.setOutputPath(conf, oPath);
System.err.println(" Removing old results");
FileSystem.get(conf).delete(oPath);
JobClient.runJob(conf);
System.err.println("FilterAlignments Finished");
}
/**
* Converts a libsvm text input file into two binary block matrices for features
* and labels, and saves these to the specified output files. This call also deletes
* existing files at the specified output locations, as well as determines and
* writes the meta data files of both output matrices.
* <p>
* Note: We use {@code org.apache.spark.mllib.util.MLUtils.loadLibSVMFile} for parsing
* the libsvm input files in order to ensure consistency with Spark.
*
* @param sc java spark context
* @param pathIn path to libsvm input file
* @param pathX path to binary block output file of features
* @param pathY path to binary block output file of labels
* @param mcOutX matrix characteristics of output matrix X
*/
public static void libsvmToBinaryBlock(JavaSparkContext sc, String pathIn,
String pathX, String pathY, DataCharacteristics mcOutX)
{
if( !mcOutX.dimsKnown() )
throw new DMLRuntimeException("Matrix characteristics "
+ "required to convert sparse input representation.");
try {
//cleanup existing output files
HDFSTool.deleteFileIfExistOnHDFS(pathX);
HDFSTool.deleteFileIfExistOnHDFS(pathY);
//convert libsvm to labeled points
int numFeatures = (int) mcOutX.getCols();
int numPartitions = SparkUtils.getNumPreferredPartitions(mcOutX, null);
JavaRDD<org.apache.spark.mllib.regression.LabeledPoint> lpoints =
MLUtils.loadLibSVMFile(sc.sc(), pathIn, numFeatures, numPartitions).toJavaRDD();
//append row index and best-effort caching to avoid repeated text parsing
JavaPairRDD<org.apache.spark.mllib.regression.LabeledPoint,Long> ilpoints =
lpoints.zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK());
//extract labels and convert to binary block
DataCharacteristics mc1 = new MatrixCharacteristics(mcOutX.getRows(), 1, mcOutX.getBlocksize(), -1);
LongAccumulator aNnz1 = sc.sc().longAccumulator("nnz");
JavaPairRDD<MatrixIndexes,MatrixBlock> out1 = ilpoints
.mapPartitionsToPair(new LabeledPointToBinaryBlockFunction(mc1, true, aNnz1));
int numPartitions2 = SparkUtils.getNumPreferredPartitions(mc1, null);
out1 = RDDAggregateUtils.mergeByKey(out1, numPartitions2, false);
out1.saveAsHadoopFile(pathY, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
mc1.setNonZeros(aNnz1.value()); //update nnz after triggered save
HDFSTool.writeMetaDataFile(pathY+".mtd", ValueType.FP64, mc1, OutputInfo.BinaryBlockOutputInfo);
//extract data and convert to binary block
DataCharacteristics mc2 = new MatrixCharacteristics(mcOutX.getRows(), mcOutX.getCols(), mcOutX.getBlocksize(), -1);
LongAccumulator aNnz2 = sc.sc().longAccumulator("nnz");
JavaPairRDD<MatrixIndexes,MatrixBlock> out2 = ilpoints
.mapPartitionsToPair(new LabeledPointToBinaryBlockFunction(mc2, false, aNnz2));
out2 = RDDAggregateUtils.mergeByKey(out2, numPartitions, false);
out2.saveAsHadoopFile(pathX, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
mc2.setNonZeros(aNnz2.value()); //update nnz after triggered save
HDFSTool.writeMetaDataFile(pathX+".mtd", ValueType.FP64, mc2, OutputInfo.BinaryBlockOutputInfo);
//asynchronous cleanup of cached intermediates
ilpoints.unpersist(false);
}
catch(IOException ex) {
throw new DMLRuntimeException(ex);
}
}
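A hypothetical invocation, assuming the method is visible from the caller; the paths and matrix dimensions below are placeholders:
// Convert a libsvm file into binary-block feature (X) and label (Y) matrices (placeholder paths/dimensions).
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("libsvmToBinaryBlock"));
DataCharacteristics mcX = new MatrixCharacteristics(1000000L, 784L, 1000, -1L);
libsvmToBinaryBlock(sc, "hdfs:/tmp/data.libsvm", "hdfs:/tmp/X", "hdfs:/tmp/Y", mcX);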
public static JobConf createDataJoinJob(String args[]) throws IOException {
String inputDir = args[0];
String outputDir = args[1];
Class inputFormat = SequenceFileInputFormat.class;
if (args[2].compareToIgnoreCase("text") != 0) {
System.out.println("Using SequenceFileInputFormat: " + args[2]);
} else {
System.out.println("Using TextInputFormat: " + args[2]);
inputFormat = TextInputFormat.class;
}
int numOfReducers = Integer.parseInt(args[3]);
Class mapper = getClassByName(args[4]);
Class reducer = getClassByName(args[5]);
Class mapoutputValueClass = getClassByName(args[6]);
Class outputFormat = TextOutputFormat.class;
Class outputValueClass = Text.class;
if (args[7].compareToIgnoreCase("text") != 0) {
System.out.println("Using SequenceFileOutputFormat: " + args[7]);
outputFormat = SequenceFileOutputFormat.class;
outputValueClass = getClassByName(args[7]);
} else {
System.out.println("Using TextOutputFormat: " + args[7]);
}
long maxNumOfValuesPerGroup = 100;
String jobName = "";
if (args.length > 8) {
maxNumOfValuesPerGroup = Long.parseLong(args[8]);
}
if (args.length > 9) {
jobName = args[9];
}
Configuration defaults = new Configuration();
JobConf job = new JobConf(defaults, DataJoinJob.class);
job.setJobName("DataJoinJob: " + jobName);
FileSystem fs = FileSystem.get(defaults);
fs.delete(new Path(outputDir), true);
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormat(inputFormat);
job.setMapperClass(mapper);
FileOutputFormat.setOutputPath(job, new Path(outputDir));
job.setOutputFormat(outputFormat);
SequenceFileOutputFormat.setOutputCompressionType(job,
SequenceFile.CompressionType.BLOCK);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(mapoutputValueClass);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(outputValueClass);
job.setReducerClass(reducer);
job.setNumMapTasks(1);
job.setNumReduceTasks(numOfReducers);
job.setLong("datajoin.maxNumOfValuesPerGroup", maxNumOfValuesPerGroup);
return job;
}
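An illustrative driver for this factory method; the argument order follows the parsing above (input dir, output dir, input format, number of reducers, mapper, reducer, map-output value class, output spec), and all values below are placeholders:
// Build and submit a data-join job (all argument values are placeholders).
String[] args = {
    "/data/join-in", "/data/join-out", "seq", "4",
    "sample.SampleTaggedMapper", "sample.SampleTaggedReducer",
    "sample.SampleTaggedMapOutput", "text"
};
JobConf job = createDataJoinJob(args);
JobClient.runJob(job);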
public int run(String[] args) throws Exception {
// Get current configuration.
Configuration conf = getConf();
// Parse command line arguments.
String inputPath = args[0];
String outputPath = args[1];
String maxArcFiles = "";
if (args.length == 3)
maxArcFiles = args[2];
// Set the maximum number of arc files to process.
conf.set(MAX_FILES_KEY, maxArcFiles);
JobConf job = new JobConf(conf);
// Set input path.
if (inputPath.length() > 0) {
LOG.info("Setting input path to " + inputPath);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileInputFormat.setInputPathFilter(job, FileCountFilter.class);
} else {
System.err.println("No input path found.");
return 1;
}
// Set output path.
if (outputPath.length() > 0) {
LOG.info("Setting output path to " + outputPath);
SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
// Compress output to boost performance.
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
} else {
System.err.println("No output path found.");
return 1;
}
// Load other classes from same jar as this class.
job.setJarByClass(WikiReverse.class);
// Input is in WARC file format.
job.setInputFormat(WarcFileInputFormat.class);
// Output is Hadoop sequence file format.
job.setOutputFormat(SequenceFileOutputFormat.class);
// Set the output data types.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LinkArrayWritable.class);
// Use custom mapper class.
job.setMapRunnerClass(WikiReverseMapper.class);
// Use custom reducer class.
job.setReducerClass(LinkArrayReducer.class);
// Allow 5 percent of map tasks to fail.
job.setMaxMapTaskFailuresPercent(MAX_MAP_TASK_FAILURES_PERCENT);
if (JobClient.runJob(job).isSuccessful())
return 0;
else
return 1;
}
public int run(String[] args) throws Exception {
// Get current configuration.
Configuration conf = getConf();
// Parse command line arguments.
String inputPaths = args[0];
String outputPath = args[1];
JobConf job = new JobConf(conf);
// Set input path.
if (inputPaths.length() > 0) {
List<String> segmentPaths = Lists.newArrayList(Splitter.on(",")
.split(inputPaths));
for (String segmentPath : segmentPaths) {
LOG.info("Adding input path " + segmentPath);
FileInputFormat.addInputPath(job, new Path(segmentPath));
}
} else {
System.err.println("No input path found.");
return 1;
}
// Set output path.
if (outputPath.length() > 0) {
LOG.info("Setting output path to " + outputPath);
SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
// Compress output to boost performance.
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
} else {
System.err.println("No output path found.");
return 1;
}
// Load other classes from same jar as this class.
job.setJarByClass(SegmentCombiner.class);
// Input is Hadoop sequence file format.
job.setInputFormat(SequenceFileInputFormat.class);
// Output is Hadoop sequence file format.
job.setOutputFormat(SequenceFileOutputFormat.class);
// Set the output data types.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LinkArrayWritable.class);
// Use custom mapper class.
job.setMapperClass(SegmentCombinerMapper.class);
// Use custom reducer class.
job.setReducerClass(LinkArrayReducer.class);
if (JobClient.runJob(job).isSuccessful())
return 0;
else
return 1;
}
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
final JobConf job,
final String name,
final Progressable progress) throws IOException {
Path out = FileOutputFormat.getOutputPath(job);
final Path fetch =
new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
final Path content =
new Path(new Path(out, Content.DIR_NAME), name);
final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
final MapFile.Writer fetchOut =
new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
compType, progress);
return new RecordWriter<Text, NutchWritable>() {
private MapFile.Writer contentOut;
private RecordWriter<Text, Parse> parseOut;
{
if (Fetcher.isStoringContent(job)) {
contentOut = new MapFile.Writer(job, fs, content.toString(),
Text.class, Content.class,
compType, progress);
}
if (Fetcher.isParsing(job)) {
parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
}
}
public void write(Text key, NutchWritable value)
throws IOException {
Writable w = value.get();
if (w instanceof CrawlDatum)
fetchOut.append(key, w);
else if (w instanceof Content)
contentOut.append(key, w);
else if (w instanceof Parse)
parseOut.write(key, (Parse)w);
}
public void close(Reporter reporter) throws IOException {
fetchOut.close();
if (contentOut != null) {
contentOut.close();
}
if (parseOut != null) {
parseOut.close(reporter);
}
}
};
}
public void processTopNJob(String crawlDb, long topN, float min, String output, Configuration config) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
LOG.info("CrawlDb db: " + crawlDb);
}
Path outFolder = new Path(output);
Path tempDir =
new Path(config.get("mapred.temp.dir", ".") +
"/readdb-topN-temp-"+
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
JobConf job = new NutchJob(config);
job.setJobName("topN prepare " + crawlDb);
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(CrawlDbTopNMapper.class);
job.setReducerClass(IdentityReducer.class);
FileOutputFormat.setOutputPath(job, tempDir);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(FloatWritable.class);
job.setOutputValueClass(Text.class);
// XXX hmmm, no setFloat() in the API ... :(
job.setLong("db.reader.topn.min", Math.round(1000000.0 * min));
JobClient.runJob(job);
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb topN: collecting topN scores.");
}
job = new NutchJob(config);
job.setJobName("topN collect " + crawlDb);
job.setLong("db.reader.topn", topN);
FileInputFormat.addInputPath(job, tempDir);
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(CrawlDbTopNReducer.class);
FileOutputFormat.setOutputPath(job, outFolder);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(FloatWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1); // create a single file.
JobClient.runJob(job);
FileSystem fs = FileSystem.get(config);
fs.delete(tempDir, true);
if (LOG.isInfoEnabled()) { LOG.info("CrawlDb topN: done"); }
}
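Because the comment above notes there is no setFloat in this old API, the minimum score is scaled by 1,000,000 and stored as a long; the mapper side can recover the threshold along these lines (a sketch, not the actual CrawlDbTopNMapper source):
// Recover the float threshold from the scaled long inside the mapper's configure(JobConf job).
float min = job.getLong("db.reader.topn.min", 0) / 1000000.0f;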
public int run(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: FreeGenerator <inputDir> <segmentsDir> [-filter] [-normalize]");
System.err.println("\tinputDir\tinput directory containing one or more input files.");
System.err.println("\t\tEach text file contains a list of URLs, one URL per line");
System.err.println("\tsegmentsDir\toutput directory, where new segment will be created");
System.err.println("\t-filter\trun current URLFilters on input URLs");
System.err.println("\t-normalize\trun current URLNormalizers on input URLs");
return -1;
}
boolean filter = false;
boolean normalize = false;
if (args.length > 2) {
for (int i = 2; i < args.length; i++) {
if (args[i].equals("-filter")) {
filter = true;
} else if (args[i].equals("-normalize")) {
normalize = true;
} else {
LOG.error("Unknown argument: " + args[i] + ", exiting ...");
return -1;
}
}
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("FreeGenerator: starting at " + sdf.format(start));
JobConf job = new NutchJob(getConf());
job.setBoolean(FILTER_KEY, filter);
job.setBoolean(NORMALIZE_KEY, normalize);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormat(TextInputFormat.class);
job.setMapperClass(FG.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Generator.SelectorEntry.class);
job.setPartitionerClass(URLPartitioner.class);
job.setReducerClass(FG.class);
String segName = Generator.generateSegmentName();
job.setNumReduceTasks(job.getNumMapTasks());
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
job.setOutputKeyComparatorClass(Generator.HashComparator.class);
FileOutputFormat.setOutputPath(job, new Path(args[1],
new Path(segName, CrawlDatum.GENERATE_DIR_NAME)));
try {
JobClient.runJob(job);
} catch (Exception e) {
LOG.error("FAILED: " + StringUtils.stringifyException(e));
return -1;
}
long end = System.currentTimeMillis();
LOG.info("FreeGenerator: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
return 0;
}
/**
* Runs the process to dump the top urls out to a text file.
*
* @param webGraphDb The WebGraph from which to pull values.
*
* @param topN
* @param output
*
* @throws IOException If an error occurs while dumping the top values.
*/
public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output, boolean asEff, NameType nameType, AggrType aggrType, boolean asSequenceFile)
throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("NodeDumper: starting at " + sdf.format(start));
Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
Configuration conf = getConf();
JobConf dumper = new NutchJob(conf);
dumper.setJobName("NodeDumper: " + webGraphDb);
FileInputFormat.addInputPath(dumper, nodeDb);
dumper.setInputFormat(SequenceFileInputFormat.class);
if (nameType == null) {
dumper.setMapperClass(Sorter.class);
dumper.setReducerClass(Sorter.class);
dumper.setMapOutputKeyClass(FloatWritable.class);
dumper.setMapOutputValueClass(Text.class);
} else {
dumper.setMapperClass(Dumper.class);
dumper.setReducerClass(Dumper.class);
dumper.setMapOutputKeyClass(Text.class);
dumper.setMapOutputValueClass(FloatWritable.class);
}
dumper.setOutputKeyClass(Text.class);
dumper.setOutputValueClass(FloatWritable.class);
FileOutputFormat.setOutputPath(dumper, output);
if (asSequenceFile) {
dumper.setOutputFormat(SequenceFileOutputFormat.class);
} else {
dumper.setOutputFormat(TextOutputFormat.class);
}
dumper.setNumReduceTasks(1);
dumper.setBoolean("inlinks", type == DumpType.INLINKS);
dumper.setBoolean("outlinks", type == DumpType.OUTLINKS);
dumper.setBoolean("scores", type == DumpType.SCORES);
dumper.setBoolean("host", nameType == NameType.HOST);
dumper.setBoolean("domain", nameType == NameType.DOMAIN);
dumper.setBoolean("sum", aggrType == AggrType.SUM);
dumper.setBoolean("max", aggrType == AggrType.MAX);
dumper.setLong("topn", topN);
// Set equals-sign as separator for Solr's ExternalFileField
if (asEff) {
dumper.set("mapred.textoutputformat.separator", "=");
}
try {
LOG.info("NodeDumper: running");
JobClient.runJob(dumper);
}
catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
long end = System.currentTimeMillis();
LOG.info("NodeDumper: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
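A hypothetical call that dumps the 1000 highest-scoring nodes as plain text in the url=score form expected by Solr's ExternalFileField (paths and counts are placeholders):
// Placeholder invocation: text output (asSequenceFile=false) with the "=" separator (asEff=true).
dumpNodes(new Path("crawl/webgraphdb"), DumpType.SCORES, 1000L,
    new Path("crawl/nodedump"), true, null, null, false);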
/**
* Converts a libsvm text input file into two binary block matrices for features
* and labels, and saves these to the specified output files. This call also deletes
* existing files at the specified output locations, as well as determines and
* writes the meta data files of both output matrices.
* <p>
* Note: We use {@code org.apache.spark.mllib.util.MLUtils.loadLibSVMFile} for parsing
* the libsvm input files in order to ensure consistency with Spark.
*
* @param sc java spark context
* @param pathIn path to libsvm input file
* @param pathX path to binary block output file of features
* @param pathY path to binary block output file of labels
* @param mcOutX matrix characteristics of output matrix X
*/
public static void libsvmToBinaryBlock(JavaSparkContext sc, String pathIn,
String pathX, String pathY, DataCharacteristics mcOutX)
{
if( !mcOutX.dimsKnown() )
throw new DMLRuntimeException("Matrix characteristics "
+ "required to convert sparse input representation.");
try {
//cleanup existing output files
HDFSTool.deleteFileIfExistOnHDFS(pathX);
HDFSTool.deleteFileIfExistOnHDFS(pathY);
//convert libsvm to labeled points
int numFeatures = (int) mcOutX.getCols();
int numPartitions = SparkUtils.getNumPreferredPartitions(mcOutX, null);
JavaRDD<org.apache.spark.mllib.regression.LabeledPoint> lpoints =
MLUtils.loadLibSVMFile(sc.sc(), pathIn, numFeatures, numPartitions).toJavaRDD();
//append row index and best-effort caching to avoid repeated text parsing
JavaPairRDD<org.apache.spark.mllib.regression.LabeledPoint,Long> ilpoints =
lpoints.zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK());
//extract labels and convert to binary block
DataCharacteristics mc1 = new MatrixCharacteristics(mcOutX.getRows(), 1, mcOutX.getBlocksize(), -1);
LongAccumulator aNnz1 = sc.sc().longAccumulator("nnz");
JavaPairRDD<MatrixIndexes,MatrixBlock> out1 = ilpoints
.mapPartitionsToPair(new LabeledPointToBinaryBlockFunction(mc1, true, aNnz1));
int numPartitions2 = SparkUtils.getNumPreferredPartitions(mc1, null);
out1 = RDDAggregateUtils.mergeByKey(out1, numPartitions2, false);
out1.saveAsHadoopFile(pathY, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
mc1.setNonZeros(aNnz1.value()); //update nnz after triggered save
HDFSTool.writeMetaDataFile(pathY+".mtd", ValueType.FP64, mc1, FileFormat.BINARY);
//extract data and convert to binary block
DataCharacteristics mc2 = new MatrixCharacteristics(mcOutX.getRows(), mcOutX.getCols(), mcOutX.getBlocksize(), -1);
LongAccumulator aNnz2 = sc.sc().longAccumulator("nnz");
JavaPairRDD<MatrixIndexes,MatrixBlock> out2 = ilpoints
.mapPartitionsToPair(new LabeledPointToBinaryBlockFunction(mc2, false, aNnz2));
out2 = RDDAggregateUtils.mergeByKey(out2, numPartitions, false);
out2.saveAsHadoopFile(pathX, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
mc2.setNonZeros(aNnz2.value()); //update nnz after triggered save
HDFSTool.writeMetaDataFile(pathX+".mtd", ValueType.FP64, mc2, FileFormat.BINARY);
//asynchronous cleanup of cached intermediates
ilpoints.unpersist(false);
}
catch(IOException ex) {
throw new DMLRuntimeException(ex);
}
}
/**
* This is the main routine for launching a distributed random write job.
* It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
* The reduce doesn't do anything.
*
* @throws IOException
*/
public int run(String[] args) throws Exception {
if (args.length == 0) {
return printUsage();
}
JobConf job = new JobConf(getConf());
job.setJarByClass(RandomTextWriter.class);
job.setJobName("random-text-writer");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormat(RandomWriter.RandomInputFormat.class);
job.setMapperClass(Map.class);
JobClient client = new JobClient(job);
ClusterStatus cluster = client.getClusterStatus();
int numMapsPerHost = job.getInt("test.randomtextwrite.maps_per_host", 10);
long numBytesToWritePerMap = job.getLong("test.randomtextwrite.bytes_per_map",
1*1024*1024*1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have test.randomtextwrite.bytes_per_map set to 0");
return -2;
}
long totalBytesToWrite = job.getLong("test.randomtextwrite.total_bytes",
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
job.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
}
Class<? extends OutputFormat> outputFormatClass =
SequenceFileOutputFormat.class;
List<String> otherArgs = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
if ("-outFormat".equals(args[i])) {
outputFormatClass =
Class.forName(args[++i]).asSubclass(OutputFormat.class);
} else {
otherArgs.add(args[i]);
}
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage(); // exits
}
}
job.setOutputFormat(outputFormatClass);
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0)));
job.setNumMapTasks(numMaps);
System.out.println("Running " + numMaps + " maps.");
// reducer NONE
job.setNumReduceTasks(0);
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(job);
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
return 0;
}
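The defaults described in the comment (10 maps per host, 1 GB per map) are ordinary configuration properties, so a caller can tune them before launching the tool; a sketch with illustrative values, assuming RandomTextWriter implements Tool as its run(String[])/getConf() usage suggests:
// Tune the data volume before running the tool (property names come from run() above; values are illustrative).
JobConf conf = new JobConf();
conf.setInt("test.randomtextwrite.maps_per_host", 5);
conf.setLong("test.randomtextwrite.bytes_per_map", 256L * 1024 * 1024);  // 256 MB per map
int exitCode = ToolRunner.run(conf, new RandomTextWriter(), new String[] { "/tmp/random-text" });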
/**
* Run a map/reduce job for estimating Pi.
*
* @return the estimated value of Pi
*/
public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
) throws IOException {
//setup job conf
jobConf.setJobName(PiEstimator.class.getSimpleName());
jobConf.setInputFormat(SequenceFileInputFormat.class);
jobConf.setOutputKeyClass(BooleanWritable.class);
jobConf.setOutputValueClass(LongWritable.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setMapperClass(PiMapper.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setReducerClass(PiReducer.class);
jobConf.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
jobConf.setSpeculativeExecution(false);
//setup input/output directories
final Path inDir = new Path(TMP_DIR, "in");
final Path outDir = new Path(TMP_DIR, "out");
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outDir);
final FileSystem fs = FileSystem.get(jobConf);
if (fs.exists(TMP_DIR)) {
throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
+ " already exists. Please remove it first.");
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Cannot create input directory " + inDir);
}
try {
//generate an input file for each map task
for(int i=0; i < numMaps; ++i) {
final Path file = new Path(inDir, "part"+i);
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
fs, jobConf, file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+i);
}
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
JobClient.runJob(jobConf);
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
//read outputs
Path inFile = new Path(outDir, "reduce-out");
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
try {
reader.next(numInside, numOutside);
} finally {
reader.close();
}
//compute estimated value
return BigDecimal.valueOf(4).setScale(20)
.multiply(BigDecimal.valueOf(numInside.get()))
.divide(BigDecimal.valueOf(numMaps))
.divide(BigDecimal.valueOf(numPoints));
} finally {
fs.delete(TMP_DIR, true);
}
}
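The final expression implements the usual Monte Carlo estimate: each map task samples numPoints points in the unit square, the reducer totals how many land inside the circular region, and pi is approximated as 4 * numInside / (numMaps * numPoints). A tiny local (non-MapReduce) sketch of the same idea, using the quarter-circle variant, for intuition only:
// Sample random points in the unit square and count those inside the quarter circle x^2 + y^2 <= 1.
long inside = 0, total = 1_000_000;
java.util.Random rnd = new java.util.Random(42);
for (long i = 0; i < total; i++) {
  double x = rnd.nextDouble(), y = rnd.nextDouble();
  if (x * x + y * y <= 1.0) inside++;
}
System.out.println("pi ~= " + 4.0 * inside / total);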
/**
* This is the main routine for launching a distributed random write job.
* It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
* The reduce doesn't do anything.
*
* @throws IOException
*/
public int run(String[] args) throws Exception {
if (args.length == 0) {
System.out.println("Usage: writer <out-dir>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
Path outDir = new Path(args[0]);
JobConf job = new JobConf(getConf());
job.setJarByClass(RandomWriter.class);
job.setJobName("random-writer");
FileOutputFormat.setOutputPath(job, outDir);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setInputFormat(RandomInputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(IdentityReducer.class);
job.setOutputFormat(SequenceFileOutputFormat.class);
JobClient client = new JobClient(job);
ClusterStatus cluster = client.getClusterStatus();
int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
1*1024*1024*1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
return -2;
}
long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
}
job.setNumMapTasks(numMaps);
System.out.println("Running " + numMaps + " maps.");
// reducer NONE
job.setNumReduceTasks(0);
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(job);
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
return 0;
}
public static JobConf createDataJoinJob(String args[]) throws IOException {
String inputDir = args[0];
String outputDir = args[1];
Class inputFormat = SequenceFileInputFormat.class;
if (args[2].compareToIgnoreCase("text") != 0) {
System.out.println("Using SequenceFileInputFormat: " + args[2]);
} else {
System.out.println("Using TextInputFormat: " + args[2]);
inputFormat = TextInputFormat.class;
}
int numOfReducers = Integer.parseInt(args[3]);
Class mapper = getClassByName(args[4]);
Class reducer = getClassByName(args[5]);
Class mapoutputValueClass = getClassByName(args[6]);
Class outputFormat = TextOutputFormat.class;
Class outputValueClass = Text.class;
if (args[7].compareToIgnoreCase("text") != 0) {
System.out.println("Using SequenceFileOutputFormat: " + args[7]);
outputFormat = SequenceFileOutputFormat.class;
outputValueClass = getClassByName(args[7]);
} else {
System.out.println("Using TextOutputFormat: " + args[7]);
}
long maxNumOfValuesPerGroup = 100;
String jobName = "";
if (args.length > 8) {
maxNumOfValuesPerGroup = Long.parseLong(args[8]);
}
if (args.length > 9) {
jobName = args[9];
}
Configuration defaults = new Configuration();
JobConf job = new JobConf(defaults, DataJoinJob.class);
job.setJobName("DataJoinJob: " + jobName);
FileSystem fs = FileSystem.get(defaults);
fs.delete(new Path(outputDir));
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormat(inputFormat);
job.setMapperClass(mapper);
FileOutputFormat.setOutputPath(job, new Path(outputDir));
job.setOutputFormat(outputFormat);
SequenceFileOutputFormat.setOutputCompressionType(job,
SequenceFile.CompressionType.BLOCK);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(mapoutputValueClass);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(outputValueClass);
job.setReducerClass(reducer);
job.setNumMapTasks(1);
job.setNumReduceTasks(numOfReducers);
job.setLong("datajoin.maxNumOfValuesPerGroup", maxNumOfValuesPerGroup);
return job;
}
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
final JobConf job,
final String name,
final Progressable progress) throws IOException {
Path out = FileOutputFormat.getOutputPath(job);
final Path fetch =
new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
final Path content =
new Path(new Path(out, Content.DIR_NAME), name);
final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
final MapFile.Writer fetchOut =
new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
compType, progress);
return new RecordWriter<Text, NutchWritable>() {
private MapFile.Writer contentOut;
private RecordWriter<Text, Parse> parseOut;
{
if (Fetcher.isStoringContent(job)) {
contentOut = new MapFile.Writer(job, fs, content.toString(),
Text.class, Content.class,
compType, progress);
}
if (Fetcher.isParsing(job)) {
parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
}
}
public void write(Text key, NutchWritable value)
throws IOException {
Writable w = value.get();
if (w instanceof CrawlDatum)
fetchOut.append(key, w);
else if (w instanceof Content && contentOut != null)
contentOut.append(key, w);
else if (w instanceof Parse && parseOut != null)
parseOut.write(key, (Parse)w);
}
public void close(Reporter reporter) throws IOException {
fetchOut.close();
if (contentOut != null) {
contentOut.close();
}
if (parseOut != null) {
parseOut.close(reporter);
}
}
};
}