Code examples for the org.apache.hadoop.mapred.Reporter class

Listed below are example snippets showing how the org.apache.hadoop.mapred.Reporter API is used in real projects; you can also click through to GitHub to view the full source code.
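
Before the project examples, here is a minimal, self-contained sketch of the typical ways a map task uses Reporter: incrementing counters, setting a status string, and signalling progress. It is not taken from any of the projects below; the class name and counter names are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LineCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  @Override
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, LongWritable> output, Reporter reporter)
      throws IOException {
    // Group and counter names are arbitrary strings; Hadoop aggregates them per job.
    reporter.incrCounter("LineCountMapper", "LINES_SEEN", 1);

    // The status string is shown in the task UI and logs; keep it short.
    reporter.setStatus("processing offset " + key.get());

    // progress() only tells the framework the task is still alive; call it in
    // long-running loops so the task is not killed for inactivity.
    reporter.progress();

    output.collect(new Text("lines"), new LongWritable(1));
  }
}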

Example 1, project: big-c, file: StreamXmlRecordReader.java
public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
                             JobConf job, FileSystem fs) throws IOException {
  super(in, split, reporter, job, fs);

  beginMark_ = checkJobGet(CONF_NS + "begin");
  endMark_ = checkJobGet(CONF_NS + "end");

  maxRecSize_ = job_.getInt(CONF_NS + "maxrec", 50 * 1000);
  lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
  synched_ = false;

  slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false);
  if (slowMatch_) {
    beginPat_ = makePatternCDataOrMark(beginMark_);
    endPat_ = makePatternCDataOrMark(endMark_);
  }
  init();
}
 
Example 2
public String getBestFrame(String frameLine, String parseLine, Reporter reporter)
{
	String result = null;
	Set<String> set = mFrameMap.keySet();
	double maxVal = -Double.MIN_VALUE;
	for(String frame: set)
	{
		String[] toks = frameLine.split("\t");
		String newFrameLine = frame+"\t"+toks[1]+"\t"+toks[2];
		LogFormula formula = getNumeratorFormula(newFrameLine, parseLine, reporter);
		double val = formula.evaluate(this).exponentiate();
		if(val>maxVal)
		{
			maxVal = val;
			result=""+frame;
		}
		if(reporter!=null)
			reporter.setStatus("Considered "+frame+" for frameLine:"+frameLine);
		System.out.println("Considered "+frame+" for frameLine:"+frameLine);
	}
	return result;
}
 
Example 3, project: big-c, file: TestDFSIO.java
@Override // IOMapperBase
public Long doIO(Reporter reporter, 
                   String name, 
                   long totalSize // in bytes
                 ) throws IOException {
  OutputStream out = (OutputStream)this.stream;
  // write to the file
  long nrRemaining;
  for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
    int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
    out.write(buffer, 0, curSize);
    reporter.setStatus("writing " + name + "@" + 
                       (totalSize - nrRemaining) + "/" + totalSize 
                       + " ::host = " + hostName);
  }
  return Long.valueOf(totalSize);
}
 
Example 4, project: hadoop-gpu, file: PipesReducer.java
@SuppressWarnings("unchecked")
private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
  if (application == null) {
    try {
      LOG.info("starting application");
      application = 
        new Application<K2, V2, K3, V3>(
            job, null, output, reporter, 
            (Class<? extends K3>) job.getOutputKeyClass(), 
            (Class<? extends V3>) job.getOutputValueClass());
      downlink = application.getDownlink();
    } catch (InterruptedException ie) {
      throw new RuntimeException("interrupted", ie);
    }
    int reduce=0;
    downlink.runReduce(reduce, Submitter.getIsJavaRecordWriter(job));
  }
}
 
Example 5
@Override
public void close(Reporter reporter) throws IOException {
    try {
        LOG.warn("I was called : close");
        processRowErrors(session.close());
        shutdownClient();
    } catch (Exception e) {
        throw new IOException("Encountered an error while closing this task", e);
    } finally {
        if (reporter != null) {
            // This is the only place where we have access to the context in the record writer,
            // so set the counter here.
            reporter.getCounter(Counters.ROWS_WITH_ERRORS).setValue(rowsWithErrors.get());
        }
    }
}
 
Example 6, project: nutch-htmlunit, file: CleaningJob.java
@Override
public void reduce(ByteWritable key, Iterator<Text> values,
        OutputCollector<Text, ByteWritable> output, Reporter reporter)
        throws IOException {
    while (values.hasNext()) {
        Text document = values.next();
        writers.delete(document.toString());
        totalDeleted++;
        reporter.incrCounter("CleaningJobStatus", "Deleted documents",
                1);
        // if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
        // LOG.info("CleaningJob: deleting " + numDeletes
        // + " documents");
        // // TODO updateRequest.process(solr);
        // // TODO updateRequest = new UpdateRequest();
        // writers.delete(key.toString());
        // totalDeleted += numDeletes;
        // numDeletes = 0;
        // }
    }
}
 
Example 7, project: flink, file: HadoopOutputFormatTest.java
@Test
public void testCloseWithoutTaskCommit() throws Exception {
	OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
	DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
	when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(false);
	DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
	JobConf jobConf = mock(JobConf.class);

	HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
	outputFormat.recordWriter = recordWriter;
	outputFormat.outputCommitter = outputCommitter;

	outputFormat.close();

	verify(recordWriter, times(1)).close(any(Reporter.class));
	verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
}
 
Example 8, project: hadoop, file: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 9, project: big-c, file: ValueAggregatorCombiner.java
/** Combines values for a given key.  
 * @param key the key is expected to be a Text object, whose prefix indicates
 * the type of aggregation to aggregate the values. 
 * @param values the values to combine
 * @param output to collect combined values
 */
public void reduce(Text key, Iterator<Text> values,
                   OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  String keyStr = key.toString();
  int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
  String type = keyStr.substring(0, pos);
  ValueAggregator aggregator = ValueAggregatorBaseDescriptor
    .generateValueAggregator(type);
  while (values.hasNext()) {
    aggregator.addNextValue(values.next());
  }
  Iterator outputs = aggregator.getCombinerOutput().iterator();

  while (outputs.hasNext()) {
    Object v = outputs.next();
    if (v instanceof Text) {
      output.collect(key, (Text)v);
    } else {
      output.collect(key, new Text(v.toString()));
    }
  }
}
 
Example 10, project: wikireverse, file: WikiMetadata.java
public Hashtable<String, LinkWritable> createResults(Page page, Reporter reporter)
		throws IOException, JsonParseException, URISyntaxException {
	Hashtable<String, LinkWritable> results = new Hashtable<String, LinkWritable>();
	HashSet<String> linkUrls = page.getLinkUrls();
	
	if (linkUrls != null && linkUrls.isEmpty() == false) {
		List<WikiArticle> articles = filterArticles(linkUrls, reporter);

		for (WikiArticle article : articles) {
			results.put(article.getKey(), new LinkWritable(article.getArticleName(),
					formatField(page.getTitle(), TITLE_LENGTH),
					page.getWarcDate(),
					page.getUrl()));
		}
	}
	
	return results;
}
 
Example 11, project: hiped2, file: OptimizedDataJoinMapperBase.java
public void map(Object key, Object value,
                OutputCollector output, Reporter reporter) throws IOException {
  if (this.reporter == null) {
    this.reporter = reporter;
  }
  addLongValue("totalCount", 1);
  OutputValue aRecord = genMapOutputValue(value);
  if (aRecord == null) {
    addLongValue("discardedCount", 1);
    return;
  }
  aRecord.setSmaller(smaller);
  String groupKey = genGroupKey(key, aRecord);
  if (groupKey == null) {
    addLongValue("nullGroupKeyCount", 1);
    return;
  }
  outputKey.setKey(groupKey);
  output.collect(outputKey, aRecord);
  addLongValue("collectedCount", 1);
}
 
Example 12, project: anthelion, file: LinkDbMerger.java
public void reduce(Text key, Iterator<Inlinks> values, OutputCollector<Text, Inlinks> output, Reporter reporter) throws IOException {

    Inlinks result = new Inlinks();

    while (values.hasNext()) {
      Inlinks inlinks = values.next();

      int end = Math.min(maxInlinks - result.size(), inlinks.size());
      Iterator<Inlink> it = inlinks.iterator();
      int i = 0;
      while(it.hasNext() && i++ < end) {
        result.add(it.next());
      }
    }
    if (result.size() == 0) return;
    output.collect(key, result);
    
  }
 
Example 13, project: anthelion, file: NodeDumper.java
/**
 * Outputs the url with the appropriate number of inlinks, outlinks, or for
 * score.
 */
public void map(Text key, Node node,
  OutputCollector<FloatWritable, Text> output, Reporter reporter)
  throws IOException {

  float number = 0;
  if (inlinks) {
    number = node.getNumInlinks();
  }
  else if (outlinks) {
    number = node.getNumOutlinks();
  }
  else {
    number = node.getInlinkScore();
  }

  // number collected with negative to be descending
  output.collect(new FloatWritable(-number), key);
}
 
Example 14, project: big-c, file: TestDBInputFormat.java
/**
 * test DBInputFormat class. Class should split result for chunks
 * @throws Exception
 */
@Test(timeout = 10000)
public void testDBInputFormat() throws Exception {
  JobConf configuration = new JobConf();
  setupDriver(configuration);
  
  DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>();
  format.setConf(configuration);
  format.setConf(configuration);
  DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10);
  Reporter reporter = mock(Reporter.class);
  RecordReader<LongWritable, NullDBWritable> reader = format.getRecordReader(
      splitter, configuration, reporter);

  configuration.setInt(MRJobConfig.NUM_MAPS, 3);
  InputSplit[] lSplits = format.getSplits(configuration, 3);
  assertEquals(5, lSplits[0].getLength());
  assertEquals(3, lSplits.length);

  // test reader .Some simple tests
  assertEquals(LongWritable.class, reader.createKey().getClass());
  assertEquals(0, reader.getPos());
  assertEquals(0, reader.getProgress(), 0.001);
  reader.close();
}
 
Example 15, project: hadoop-gpu, file: DataJoinMapperBase.java
public void map(Object key, Object value,
                OutputCollector output, Reporter reporter) throws IOException {
  if (this.reporter == null) {
    this.reporter = reporter;
  }
  addLongValue("totalCount", 1);
  TaggedMapOutput aRecord = generateTaggedMapOutput(value);
  if (aRecord == null) {
    addLongValue("discardedCount", 1);
    return;
  }
  Text groupKey = generateGroupKey(aRecord);
  if (groupKey == null) {
    addLongValue("nullGroupKeyCount", 1);
    return;
  }
  output.collect(groupKey, aRecord);
  addLongValue("collectedCount", 1);
}
 
Example 16, project: systemds, file: ReaderTextLIBSVMParallel.java
@Override
public Object call() 
	throws Exception 
{
	RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
	LongWritable key = new LongWritable();
	Text oneLine = new Text();

	try {
		// count rows from the first row
		while (reader.next(key, oneLine)) {
			_nrows++;
		}
	} 
	catch (Exception e) {
		_rc = false;
		_errMsg = "RecordReader error libsvm format. split: "+ _split.toString() + e.getMessage();
		throw new IOException(_errMsg);
	} 
	finally {
		IOUtilFunctions.closeSilently(reader);
	}

	return null;
}
 
Example 17, project: ignite, file: HadoopV1OutputCollector.java
/**
 * @param jobConf Job configuration.
 * @param taskCtx Task context.
 * @param directWrite Direct write flag.
 * @param fileName File name.
 * @throws IOException In case of IO exception.
 */
HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
    @Nullable String fileName, TaskAttemptID attempt) throws IOException {
    this.jobConf = jobConf;
    this.taskCtx = taskCtx;
    this.attempt = attempt;

    if (directWrite) {
        jobConf.set("mapreduce.task.attempt.id", attempt.toString());

        OutputFormat outFormat = jobConf.getOutputFormat();

        writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
    }
    else
        writer = null;
}
 
Example 18, project: hadoop-book, file: RandomWriter.java
/**
 * Given an output filename, write a bunch of random records to it.
 */
public void map(WritableComparable key,
        Writable value,
        OutputCollector<BytesWritable, BytesWritable> output,
        Reporter reporter) throws IOException {
    int itemCount = 0;
    while (numBytesToWrite > 0) {
        int keyLength = minKeySize
                + (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize
                + (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue);
        numBytesToWrite -= keyLength + valueLength;
        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
        if (++itemCount % 200 == 0) {
            reporter.setStatus("wrote record " + itemCount + ". "
                    + numBytesToWrite + " bytes left.");
        }
    }
    reporter.setStatus("done with " + itemCount + " records.");
}
 
Example 19, project: RDFS, file: CompositeInputFormat.java
/**
 * Construct a CompositeRecordReader for the children of this InputFormat
 * as defined in the init expression.
 * The outermost join need only be composable, not necessarily a composite.
 * Mandating TupleWritable isn't strictly correct.
 */
@SuppressWarnings("unchecked") // child types unknown
public ComposableRecordReader<K,TupleWritable> getRecordReader(
    InputSplit split, JobConf job, Reporter reporter) throws IOException {
  setFormat(job);
  return root.getRecordReader(split, job, reporter);
}
 
Example 20, project: hadoop-solr, file: RegexIngestMapper.java
@Override
public LWDocument[] toDocuments(Writable key, Writable value, Reporter reporter,
    Configuration conf) throws IOException {
  if (key != null && value != null) {
    LWDocument doc = createDocument(key.toString() + "-" + System.currentTimeMillis(), null);
    Matcher matcher = regex.matcher(value.toString());
    if (matcher != null) {
      if (match) {
        if (matcher.matches()) {
          processMatch(doc, matcher);
        }
      } else {//
        while (matcher.find()) {
          processMatch(doc, matcher);
          reporter.progress();//do we really even need this?
        }
      }
    }
    // Adding the file path where this record was taken
    FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
    String originalLogFilePath = fileSplit.getPath().toUri().getPath();
    doc.addField(FIELD_PATH, originalLogFilePath);
    String docId = originalLogFilePath + "-" + doc.getId();
    doc.setId(docId);
    return new LWDocument[] {doc};
  }
  return null;
}
 
Example 21, project: hbase, file: TestTableMapReduce.java
/**
 * Pass the key, and reversed value to reduce
 */
public void map(ImmutableBytesWritable key, Result value,
  OutputCollector<ImmutableBytesWritable, Put> output,
  Reporter reporter)
throws IOException {
  output.collect(key, TestTableMapReduceBase.map(key, value));
}
 
Example 22, project: RDFS, file: RegexMapper.java
public void map(K key, Text value,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter)
  throws IOException {
  String text = value.toString();
  Matcher matcher = pattern.matcher(text);
  while (matcher.find()) {
    output.collect(new Text(matcher.group(group)), new LongWritable(1));
  }
}
 
Example 23, project: tez, file: TezGroupedSplitsInputFormat.java
public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
    Reporter reporter) throws IOException {
  this.groupedSplit = split;
  this.job = job;
  this.reporter = reporter;
  initNextRecordReader();
}
 
Example 24, project: anthelion, file: CrawlDbReader.java
public void map(Text key, CrawlDatum value, OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {
  output.collect(new Text("T"), COUNT_1);
  output.collect(new Text("status " + value.getStatus()), COUNT_1);
  output.collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
  output.collect(new Text("s"), new LongWritable((long) (value.getScore() * 1000.0)));
  if(sort){
    URL u = new URL(key.toString());
    String host = u.getHost();
    output.collect(new Text("status " + value.getStatus() + " " + host), COUNT_1);
  }
}
 
Example 25, project: hadoop-gpu, file: DBInputFormat.java
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
    JobConf job, Reporter reporter) throws IOException {

  Class inputClass = dbConf.getInputClass();
  try {
    return new DBRecordReader((DBInputSplit) split, inputClass, job);
  }
  catch (SQLException ex) {
    throw new IOException(ex.getMessage());
  }
}
 
Example 26, project: Flink-CEPplus, file: HadoopReduceFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {
	int commentCnt = 0;
	while (vs.hasNext()) {
		String v = vs.next().toString();
		if (v.startsWith("Comment")) {
			commentCnt++;
		}
	}
	out.collect(k, new IntWritable(commentCnt));
}
 
Example 27, project: Flink-CEPplus, file: HadoopMapFunctionITCase.java
@Override
public void map(final IntWritable k, final Text v,
		final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
	if (v.toString().contains("bananas")) {
		out.collect(k, v);
	}
}
 
Example 28, project: Flink-CEPplus, file: HadoopMapFunctionITCase.java
@Override
public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
		throws IOException {
	if (v.toString().startsWith(filterPrefix)) {
		out.collect(k, v);
	}
}
 
Example 29, project: hadoop, file: DBInputFormat.java
/** {@inheritDoc} */
public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
    JobConf job, Reporter reporter) throws IOException {

  // wrap the DBRR in a shim class to deal with API differences.
  return new DBRecordReaderWrapper<T>(
      (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>) 
      createDBRecordReader(
        (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
}
 
Example 30, project: RDFS, file: PipesReducer.java
/**
 * Handle the end of the input by closing down the application.
 */
public void close() throws IOException {
  // if we haven't started the application, we have nothing to do
  if (isOk) {
    OutputCollector<K3, V3> nullCollector = new OutputCollector<K3, V3>() {
      public void collect(K3 key, 
                          V3 value) throws IOException {
        // NULL
      }
    };
    startApplication(nullCollector, Reporter.NULL);
  }
  try {
    if (isOk) {
      application.getDownlink().endOfInput();
    } else {
      // send the abort to the application and let it clean up
      application.getDownlink().abort();
    }
    LOG.info("waiting for finish");
    application.waitForFinish();
    LOG.info("got done");
  } catch (Throwable t) {
    application.abort(t);
  } finally {
    application.cleanup();
  }
}
 