Listed below are example usages of org.apache.nutch.crawl.CrawlDatum taken from open source projects (one of them also exercises org.apache.hadoop.mapred.InvalidJobConfException); the full source code is available on GitHub.
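Before the individual examples, here is a minimal, hedged sketch of the CrawlDatum basics the snippets below rely on: the status constants, score, fetch time, and the MapWritable-backed metadata map. The metadata key/value and the 30-day fetch interval are purely illustrative choices.

import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;

public class CrawlDatumExample {
  public static void main(String[] args) {
    // A datum for a freshly injected URL: status, fetch interval (seconds), seed score.
    CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED, 30 * 24 * 3600);
    datum.setScore(1.0f);
    datum.setFetchTime(System.currentTimeMillis());
    // Arbitrary key/value metadata travels with the datum in its MapWritable.
    datum.getMetaData().put(new Text("example.key"), new Text("example value"));
    System.out.println(datum);
  }
}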
public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore)
throws ScoringFilterException {
NutchField tlds = doc.getField("tld");
float boost = 1.0f;
if(tlds != null) {
for(Object tld : tlds.getValues()) {
DomainSuffix entry = tldEntries.get(tld.toString());
if(entry != null)
boost *= entry.getBoost();
}
}
return initScore * boost;
}
public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
throws IndexingException {
// check if LANGUAGE found, possibly put there by HTMLLanguageParser
String lang = parse.getData().getParseMeta().get(Metadata.LANGUAGE);
// check if HTTP-header tells us the language
if (lang == null) {
lang = parse.getData().getContentMeta().get(Response.CONTENT_LANGUAGE);
}
if (lang == null || lang.length() == 0) {
lang = "unknown";
}
doc.add("lang", lang);
return doc;
}
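For context, here is a hedged usage sketch of the filter above. The class and package (LanguageIndexingFilter in org.apache.nutch.analysis.lang) are assumptions based on the language-identifier plugin, and the URL and language value are illustrative: build a parse whose parse metadata carries Metadata.LANGUAGE, run the filter, and read back the "lang" field.

import org.apache.hadoop.io.Text;
import org.apache.nutch.analysis.lang.LanguageIndexingFilter; // assumed plugin class
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.parse.Outlink;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseImpl;
import org.apache.nutch.parse.ParseStatus;
import org.apache.nutch.util.NutchConfiguration;

public class LangFilterSketch {
  public static void main(String[] args) throws Exception {
    // Parse metadata carrying the language detected earlier (e.g. by HTMLLanguageParser).
    Metadata parseMeta = new Metadata();
    parseMeta.set(Metadata.LANGUAGE, "en");
    ParseData parseData = new ParseData(new ParseStatus(), "title",
        new Outlink[0], new Metadata(), parseMeta);
    ParseImpl parse = new ParseImpl("body text", parseData);

    LanguageIndexingFilter filter = new LanguageIndexingFilter();
    filter.setConf(NutchConfiguration.create());
    NutchDocument doc = filter.filter(new NutchDocument(), parse,
        new Text("http://example.com/"), new CrawlDatum(), new Inlinks());
    System.out.println(doc.getFieldValue("lang")); // expected: "en"
  }
}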
/**
* Check a fixed sequence!
*/
public void testFixedSequence() throws Exception {
// Our test directory
Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
Path segment1 = new Path(testDir, "00001");
Path segment2 = new Path(testDir, "00002");
Path segment3 = new Path(testDir, "00003");
createSegment(segment1, CrawlDatum.STATUS_FETCH_GONE, false);
createSegment(segment2, CrawlDatum.STATUS_FETCH_GONE, true);
createSegment(segment3, CrawlDatum.STATUS_FETCH_SUCCESS, false);
// Merge the segments and get status
Path mergedSegment = merge(testDir, new Path[]{segment1, segment2, segment3});
Byte status = Byte.valueOf(checkMergedSegment(testDir, mergedSegment));
assertEquals(Byte.valueOf(CrawlDatum.STATUS_FETCH_SUCCESS), status);
}
/**
* This takes the metatags that you have listed in your "urlmeta.tags"
* property and looks for them inside the CrawlDatum object. If they exist,
* they are added as attributes inside the NutchDocument.
*
* @see IndexingFilter#filter
*/
public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
CrawlDatum datum, Inlinks inlinks) throws IndexingException {
if (conf != null)
this.setConf(conf);
if (urlMetaTags == null || doc == null)
return doc;
for (String metatag : urlMetaTags) {
Text metadata = (Text) datum.getMetaData().get(new Text(metatag));
if (metadata != null)
doc.add(metatag, metadata.toString());
}
return doc;
}
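A hedged usage sketch for the urlmeta filter above. The class name URLMetaIndexingFilter and its package are assumptions based on the urlmeta plugin, and the tag name "category" and its value are illustrative: list the tag in urlmeta.tags, store it in the CrawlDatum metadata, and the filter copies it into the NutchDocument.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.urlmeta.URLMetaIndexingFilter; // assumed plugin class
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseImpl;
import org.apache.nutch.util.NutchConfiguration;

public class UrlMetaIndexingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    // The tags named here are the ones the filter looks for on the datum.
    conf.set("urlmeta.tags", "category");
    URLMetaIndexingFilter filter = new URLMetaIndexingFilter();
    filter.setConf(conf);

    // Metadata injected at seed time travels inside the CrawlDatum.
    CrawlDatum datum = new CrawlDatum();
    datum.getMetaData().put(new Text("category"), new Text("news"));

    NutchDocument doc = filter.filter(new NutchDocument(),
        new ParseImpl("text", new ParseData()),
        new Text("http://example.com/"), datum, new Inlinks());
    System.out.println(doc.getFieldValue("category")); // expected: "news"
  }
}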
public void testIt() throws ProtocolException, ParseException {
String urlString;
Protocol protocol;
Content content;
Parse parse;
Configuration conf = NutchConfiguration.create();
for (int i = 0; i < sampleFiles.length; i++) {
urlString = "file:" + sampleDir + fileSeparator + sampleFiles[i];
protocol = new ProtocolFactory(conf).getProtocol(urlString);
content = protocol.getProtocolOutput(new Text(urlString), new CrawlDatum()).getContent();
parse = new ParseUtil(conf).parseByExtensionId("parse-zip",content).get(content.getUrl());
assertTrue(parse.getText().equals(expectedText));
}
}
/**
* @since NUTCH-901
*/
public void testNoParts(){
Configuration conf = NutchConfiguration.create();
conf.setBoolean("moreIndexingFilter.indexMimeTypeParts", false);
MoreIndexingFilter filter = new MoreIndexingFilter();
filter.setConf(conf);
assertNotNull(filter);
NutchDocument doc = new NutchDocument();
ParseImpl parse = new ParseImpl("foo bar", new ParseData());
try{
filter.filter(doc, parse, new Text("http://nutch.apache.org/index.html"), new CrawlDatum(), new Inlinks());
}
catch(Exception e){
e.printStackTrace();
fail(e.getMessage());
}
assertNotNull(doc);
assertTrue(doc.getFieldNames().contains("type"));
assertEquals(1, doc.getField("type").getValues().size());
assertEquals("text/html", doc.getFieldValue("type"));
}
/**
* The {@link AnchorIndexingFilter} filter object which supports boolean
* configuration settings for the deduplication of anchors.
* See {@code anchorIndexingFilter.deduplicate} in nutch-default.xml.
*
* @param doc The {@link NutchDocument} object
* @param parse The relevant {@link Parse} object passing through the filter
* @param url URL to be filtered for anchor text
* @param datum The {@link CrawlDatum} entry
* @param inlinks The {@link Inlinks} containing anchor text
* @return filtered NutchDocument
*/
public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum,
Inlinks inlinks) throws IndexingException {
String[] anchors = (inlinks != null ? inlinks.getAnchors()
: new String[0]);
HashSet<String> set = null;
for (int i = 0; i < anchors.length; i++) {
if (deduplicate) {
if (set == null) set = new HashSet<String>();
String lcAnchor = anchors[i].toLowerCase();
// Check whether this anchor has already been processed
if (!set.contains(lcAnchor)) {
doc.add("anchor", anchors[i]);
// Remember it in the set so later duplicates are skipped
set.add(lcAnchor);
}
} else {
doc.add("anchor", anchors[i]);
}
}
return doc;
}
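A hedged sketch of the deduplication behaviour described in the Javadoc above. The package org.apache.nutch.indexer.anchor is an assumption, and the inlink URLs and anchor strings are illustrative: with anchorIndexingFilter.deduplicate enabled, two anchors differing only in case produce a single "anchor" value.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Inlink;
import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.anchor.AnchorIndexingFilter; // assumed plugin package
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseImpl;
import org.apache.nutch.util.NutchConfiguration;

public class AnchorDedupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    conf.setBoolean("anchorIndexingFilter.deduplicate", true);
    AnchorIndexingFilter filter = new AnchorIndexingFilter();
    filter.setConf(conf);

    // Two inlinks whose anchor text differs only in case.
    Inlinks inlinks = new Inlinks();
    inlinks.add(new Inlink("http://a.example.com/", "Nutch"));
    inlinks.add(new Inlink("http://b.example.com/", "nutch"));

    NutchDocument doc = filter.filter(new NutchDocument(),
        new ParseImpl("text", new ParseData()),
        new Text("http://nutch.apache.org/"), new CrawlDatum(), inlinks);
    System.out.println(doc.getField("anchor").getValues().size()); // expected: 1
  }
}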
public void fetch(Path segment, int threads)
throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
if (LOG.isInfoEnabled()) {
LOG.info("OldFetcher: starting at " + sdf.format(start));
LOG.info("OldFetcher: segment: " + segment);
}
JobConf job = new NutchJob(getConf());
job.setJobName("fetch " + segment);
job.setInt("fetcher.threads.fetch", threads);
job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
// for politeness, don't permit parallel execution of a single task
job.setSpeculativeExecution(false);
FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
job.setInputFormat(InputFormat.class);
job.setMapRunnerClass(OldFetcher.class);
FileOutputFormat.setOutputPath(job, segment);
job.setOutputFormat(FetcherOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NutchWritable.class);
JobClient.runJob(job);
long end = System.currentTimeMillis();
LOG.info("OldFetcher: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
@Override
public void map(Text key, CrawlDatum value,
OutputCollector<ByteWritable, Text> output, Reporter reporter)
throws IOException {
if (value.getStatus() == CrawlDatum.STATUS_DB_GONE || value.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
output.collect(OUT, key);
}
}
public FetchItem(Text url, URL u, CrawlDatum datum, String queueID, int outlinkDepth) {
this.url = url;
this.u = u;
this.datum = datum;
this.queueID = queueID;
this.outlinkDepth = outlinkDepth;
}
@Override
public NutchDocument filterInternal(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) {
ParseData parseData = parse.getData();
String sku = parseData.getMeta("sku");
if (StringUtils.isBlank(sku)) {
return null;
}
doc.add("sku", sku);
doc.add("price", parseData.getMeta("price"));
return doc;
}
public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
Path out = FileOutputFormat.getOutputPath(job);
if ((out == null) && (job.getNumReduceTasks() != 0)) {
throw new InvalidJobConfException(
"Output directory not set in JobConf.");
}
if (fs == null) {
fs = out.getFileSystem(job);
}
if (fs.exists(new Path(out, CrawlDatum.PARSE_DIR_NAME)))
throw new IOException("Segment already parsed!");
}
@Override
public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum,
long prevFetchTime, long prevModifiedTime,
long fetchTime, long modifiedTime, int state) {
// Set defaults
INC_RATE = defaultIncRate;
DEC_RATE = defaultDecRate;
// Check if the Content-Type field is available in the CrawlDatum
if (datum.getMetaData().containsKey(HttpHeaders.WRITABLE_CONTENT_TYPE)) {
// Get the MIME-type of the current URL
String currentMime = datum.getMetaData().get(HttpHeaders.WRITABLE_CONTENT_TYPE).toString();
// Get rid of a trailing charset parameter, if present
int semicolon = currentMime.indexOf(';');
if (semicolon != -1) {
currentMime = currentMime.substring(0, semicolon);
}
// Check if this MIME-type exists in our map
if (mimeMap.containsKey(currentMime)) {
// Yes, set the INC and DEC rates for this MIME-type
INC_RATE = mimeMap.get(currentMime).inc;
DEC_RATE = mimeMap.get(currentMime).dec;
}
}
return super.setFetchSchedule(url, datum, prevFetchTime, prevModifiedTime,
fetchTime, modifiedTime, state);
}
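The schedule above (it reads like MimeAdaptiveFetchSchedule, though the snippet does not name its class) keys its adjustment rates off the MIME type stored in the datum metadata. Below is a minimal sketch of how that entry would be populated, assuming HttpHeaders is org.apache.nutch.metadata.HttpHeaders and that the fetcher stores the raw Content-Type header there.

import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.metadata.HttpHeaders; // assumed location of WRITABLE_CONTENT_TYPE

public class MimeScheduleInputSketch {
  public static void main(String[] args) {
    CrawlDatum datum = new CrawlDatum();
    // The raw header value; the schedule strips everything after ';' before
    // consulting its per-MIME-type rate map.
    datum.getMetaData().put(HttpHeaders.WRITABLE_CONTENT_TYPE,
        new Text("text/html; charset=UTF-8"));
    System.out.println(datum.getMetaData().get(HttpHeaders.WRITABLE_CONTENT_TYPE));
  }
}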
public void testContentDispositionTitle() throws IndexingException {
Configuration conf = NutchConfiguration.create();
Metadata metadata = new Metadata();
metadata.add(Response.CONTENT_DISPOSITION, "filename=filename.ext");
MoreIndexingFilter filter = new MoreIndexingFilter();
filter.setConf(conf);
NutchDocument doc = filter.filter(new NutchDocument(), new ParseImpl("text", new ParseData(
new ParseStatus(), "title", new Outlink[0], metadata)), new Text(
"http://www.example.com/"), new CrawlDatum(), new Inlinks());
assertEquals("content-disposition not detected", "filename.ext", doc.getFieldValue("title"));
}
public NutchDocument filter(NutchDocument doc, Parse parse, Text urlText, CrawlDatum datum, Inlinks inlinks)
throws IndexingException {
try {
URL url = new URL(urlText.toString());
DomainSuffix d = URLUtil.getDomainSuffix(url);
doc.add("tld", d.getDomain());
}catch (Exception ex) {
LOG.warn(ex.toString());
}
return doc;
}
public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
throws IndexingException {
Metadata metadata = parse.getData().getParseMeta();
// index the license
String licenseUrl = metadata.get(CreativeCommons.LICENSE_URL);
if (licenseUrl != null) {
if (LOG.isInfoEnabled()) {
LOG.info("CC: indexing " + licenseUrl + " for: " + url.toString());
}
// add the entire license as cc:license=xxx
addFeature(doc, "license=" + licenseUrl);
// index license attributes extracted of the license url
addUrlFeatures(doc, licenseUrl);
}
// index the license location as cc:meta=xxx
String licenseLocation = metadata.get(CreativeCommons.LICENSE_LOCATION);
if (licenseLocation != null) {
addFeature(doc, "meta=" + licenseLocation);
}
// index the work type cc:type=xxx
String workType = metadata.get(CreativeCommons.WORK_TYPE);
if (workType != null) {
addFeature(doc, workType);
}
return doc;
}
private void writeOutAsDuplicate(CrawlDatum datum,
OutputCollector<Text, CrawlDatum> output, Reporter reporter)
throws IOException {
datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);
Text key = (Text) datum.getMetaData().remove(urlKey);
reporter.incrCounter("DeduplicationJobStatus",
"Documents marked as duplicate", 1);
output.collect(key, datum);
}
public void reduce(Text key, Iterator<CrawlDatum> values,
OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws IOException {
while (values.hasNext()) {
CrawlDatum val = values.next();
output.collect(key, val);
}
}
public CrawlDatum getCrawlDatum(Text url) throws IOException {
synchronized (crawlLock) {
if (crawl == null)
crawl = getReaders(CrawlDatum.FETCH_DIR_NAME);
}
return (CrawlDatum)getEntry(crawl, url, new CrawlDatum());
}
private void assertContentType(Configuration conf, String source, String expected) throws IndexingException {
Metadata metadata = new Metadata();
metadata.add(Response.CONTENT_TYPE, source);
MoreIndexingFilter filter = new MoreIndexingFilter();
filter.setConf(conf);
NutchDocument doc = filter.filter(new NutchDocument(), new ParseImpl("text", new ParseData(
new ParseStatus(), "title", new Outlink[0], metadata)), new Text(
"http://www.example.com/"), new CrawlDatum(), new Inlinks());
assertEquals("mime type not detected", expected, doc.getFieldValue("type"));
}
@Override
public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
throws IndexingException {
LOG.debug("Invoking indexer {} for url: {}", this.getClass().getName(), url);
if (doc == null) {
LOG.debug("Skipped as NutchDocument doc is null");
return doc;
}
return filterInternal(doc, parse, url, datum, inlinks);
}
public void map(Text urlText, CrawlDatum datum, Context context) throws IOException, InterruptedException {
if(datum.getStatus() == CrawlDatum.STATUS_DB_FETCHED
|| datum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
try {
URL url = new URL(urlText.toString());
String out = null;
switch (mode) {
case MODE_HOST:
out = url.getHost();
break;
case MODE_DOMAIN:
out = URLUtil.getDomainName(url);
break;
case MODE_SUFFIX:
out = URLUtil.getDomainSuffix(url).getDomain();
break;
case MODE_TLD:
out = URLUtil.getTopLevelDomainName(url);
break;
}
if(out.trim().equals("")) {
LOG.info("url : " + url);
context.getCounter(MyCounter.EMPTY_RESULT).increment(1);
}
context.write(new Text(out), new LongWritable(1));
} catch (Exception ex) { }
context.getCounter(MyCounter.FETCHED).increment(1);
context.write(FETCHED_TEXT, new LongWritable(1));
}
else {
context.getCounter(MyCounter.NOT_FETCHED).increment(1);
context.write(NOT_FETCHED_TEXT, new LongWritable(1));
}
}
/**
* Takes the metadata, specified in your "urlmeta.tags" property, from the
* datum object and injects it into the content. This is transferred to the
* parseData object.
*
* @see ScoringFilter#passScoreBeforeParsing
* @see URLMetaScoringFilter#passScoreAfterParsing
*/
public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content) {
if (urlMetaTags == null || content == null || datum == null)
return;
for (String metatag : urlMetaTags) {
Text metaFromDatum = (Text) datum.getMetaData().get(new Text(metatag));
if (metaFromDatum == null)
continue;
content.getMetadata().set(metatag, metaFromDatum.toString());
}
}
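A hedged sketch of the scoring side. The Javadoc names URLMetaScoringFilter; its package org.apache.nutch.scoring.urlmeta is an assumption, and the tag name and URL are illustrative: a tag stored on the CrawlDatum is copied into the Content metadata before parsing.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.scoring.urlmeta.URLMetaScoringFilter; // assumed plugin package
import org.apache.nutch.util.NutchConfiguration;

public class PassScoreSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    conf.set("urlmeta.tags", "category");
    URLMetaScoringFilter scoringFilter = new URLMetaScoringFilter();
    scoringFilter.setConf(conf);

    CrawlDatum datum = new CrawlDatum();
    datum.getMetaData().put(new Text("category"), new Text("news"));
    Content content = new Content("http://example.com/", "http://example.com/",
        new byte[0], "text/html", new Metadata(), conf);

    // Copies "category" from the datum into the content metadata, where the
    // parser can later carry it over into the ParseData.
    scoringFilter.passScoreBeforeParsing(new Text("http://example.com/"), datum, content);
    System.out.println(content.getMetadata().get("category")); // expected: "news"
  }
}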
/** Calculate a sort value for Generate. */
public float generatorSortValue(Text url, CrawlDatum datum, float initSort) throws ScoringFilterException {
for (int i = 0; i < this.filters.length; i++) {
initSort = this.filters[i].generatorSortValue(url, datum, initSort);
}
return initSort;
}
public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) {
LOG.info("-------->>>>> WE ARE IN THE INDExer-------------------");
String containsSem = "false";
containsSem = parse.getData().getMeta(WdcParser.META_CONTAINS_SEM);
// we don't have to add the triples in a separate field as they are
// already in the content field
// String triples = "";
// triples = parse.getText();
// doc.add("triples", triples);
// // check if the father contains sem data
// boolean semFather = false;
// try {
// semFather =
// Boolean.parseBoolean(datum.getMetaData().get(WdcParser.META_CONTAINS_SEM_FATHER).toString());
//
// } catch (Exception e) {
// LOG.error("CANNOT PROCESS THE FATHER SEM FIELD" + e.getMessage());
// }
// adds the new field to the document
doc.add("containsSem", containsSem);
return doc;
}
public static void initMRJob(Path crawlDb, Path linkDb,
Collection<Path> segments,
JobConf job) {
LOG.info("IndexerMapReduce: crawldb: " + crawlDb);
if (linkDb!=null)
LOG.info("IndexerMapReduce: linkdb: " + linkDb);
for (final Path segment : segments) {
LOG.info("IndexerMapReduces: adding segment: " + segment);
FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
}
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
if (linkDb!=null)
FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(IndexerMapReduce.class);
job.setReducerClass(IndexerMapReduce.class);
job.setOutputFormat(IndexerOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(NutchWritable.class);
job.setOutputValueClass(NutchWritable.class);
}
@Override
public void map(Text key, CrawlDatum value,
OutputCollector<ByteWritable, Text> output, Reporter reporter)
throws IOException {
if (value.getStatus() == CrawlDatum.STATUS_DB_GONE) {
output.collect(OUT, key);
}
}
/** Increase the score by a sum of inlinked scores. */
public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum, List inlinked) throws ScoringFilterException {
float adjust = 0.0f;
for (int i = 0; i < inlinked.size(); i++) {
CrawlDatum linked = (CrawlDatum)inlinked.get(i);
adjust += linked.getScore();
}
if (old == null) old = datum;
datum.setScore(old.getScore() + adjust);
}
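Finally, a hedged sketch of the score update above. The snippet does not name its class; OPICScoringFilter from the scoring-opic plugin is assumed, and the interval, scores, and URL are illustrative: the new db score is the old score plus the sum of the inlinked contributions.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.scoring.opic.OPICScoringFilter; // assumed class
import org.apache.nutch.util.NutchConfiguration;

public class UpdateDbScoreSketch {
  public static void main(String[] args) throws Exception {
    OPICScoringFilter filter = new OPICScoringFilter();
    filter.setConf(NutchConfiguration.create());

    // Existing entry with score 1.0 and two inlinked contributions of 0.25 each.
    CrawlDatum old = new CrawlDatum(CrawlDatum.STATUS_DB_FETCHED, 30 * 24 * 3600, 1.0f);
    CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_FETCHED, 30 * 24 * 3600);
    List<CrawlDatum> inlinked = new ArrayList<CrawlDatum>();
    for (int i = 0; i < 2; i++) {
      CrawlDatum link = new CrawlDatum();
      link.setScore(0.25f);
      inlinked.add(link);
    }

    filter.updateDbScore(new Text("http://example.com/"), old, datum, inlinked);
    System.out.println(datum.getScore()); // expected: 1.5
  }
}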