org.apache.hadoop.mapred.Reporter#incrCounter() code examples

The examples below show how org.apache.hadoop.mapred.Reporter#incrCounter() is used in a number of open-source projects, with the project and file name given above each snippet.
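
Before the per-project examples, here is a minimal, self-contained sketch of the two incrCounter overloads exposed by the old org.apache.hadoop.mapred.Reporter interface: incrCounter(Enum<?> key, long amount) and incrCounter(String group, String counter, long amount). The CountingMapper class, its MyCounter enum, and the group/counter names are illustrative only and do not come from any of the projects listed below.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Illustrative mapper (not taken from the projects below) that exercises both
// incrCounter overloads while emitting each non-empty input line as a key.
public class CountingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  // Hypothetical enum-backed counter; Hadoop groups it under the enum's class name.
  enum MyCounter { RECORDS_SEEN }

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, LongWritable> output, Reporter reporter)
      throws IOException {
    // Enum overload: incrCounter(Enum<?> key, long amount)
    reporter.incrCounter(MyCounter.RECORDS_SEEN, 1);

    if (value.getLength() == 0) {
      // String overload: incrCounter(String group, String counter, long amount)
      reporter.incrCounter("CountingMapper", "EmptyLines", 1);
      return;
    }
    output.collect(value, new LongWritable(1));
  }
}

As in the examples that follow, the enum form is convenient when a job defines its counters up front (DistCh, DistCpV1, RowCounter), while the group/name form lets counter names be assembled at runtime (SleepJob, the wikireverse mappers).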

Example 1 (project: hadoop, file: DistCh.java)
/** Run a FileOperation */
public void map(Text key, FileOperation value,
    OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
    ) throws IOException {
  try {
    value.run(jobconf);
    ++succeedcount;
    reporter.incrCounter(Counter.SUCCEED, 1);
  } catch (IOException e) {
    ++failcount;
    reporter.incrCounter(Counter.FAIL, 1);

    String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
    out.collect(null, new Text(s));
    LOG.info(s);
  } finally {
    reporter.setStatus(getCountString());
  }
}
 
Example 2 (project: RDFS, file: DistCh.java)
/** Run a FileOperation */
public void map(Text key, FileOperation value,
    OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
    ) throws IOException {
  try {
    value.run(jobconf);
    ++succeedcount;
    reporter.incrCounter(Counter.SUCCEED, 1);
  } catch (IOException e) {
    ++failcount;
    reporter.incrCounter(Counter.FAIL, 1);

    String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
    out.collect(null, new Text(s));
    LOG.info(s);
  } finally {
    reporter.setStatus(getCountString());
  }
}
 
Example 3 (project: hive-dwrf, file: ReaderWriterProfiler.java)
public static void log(Reporter logReporter) {
  for (Counter c : Counter.values()) {
    LOG.info(c + " start (" + started[c.value] + "), end (" + ended[c.value] + "): " +  profileTimes[c.value]);
    if (logReporter != null) {
      logReporter.incrCounter(c, profileTimes[c.value]);
    }
  }

  long read = profileTypeTimes[Counter.Type.READ.ordinal()];
  long write = profileTypeTimes[Counter.Type.WRITE.ordinal()];
  if (logReporter != null) {
    LOG.info("read time: " + read);
    LOG.info("write time: " + write);
    logReporter.incrCounter(ReadWriteCounter.READ_TIME, read);
    logReporter.incrCounter(ReadWriteCounter.WRITE_TIME, write);
  }
}
 
Example 4 (project: RDFS, file: SleepJob.java)
public void reduce(IntWritable key, Iterator<NullWritable> values,
    OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
    throws IOException {
  List<String> counterNames = getCounterNames();
  for (String counterName : counterNames) {
    reporter.incrCounter("Counters from Reducers", counterName, 1);
  }
  try {
    reporter.setStatus("Sleeping... (" +
        (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
    Thread.sleep(reduceSleepDuration);
  }
  catch (InterruptedException ex) {
    throw (IOException)new IOException(
        "Interrupted while sleeping").initCause(ex);
  }
  count++;
}
 
Example 5 (project: RDFS, file: SleepJob.java)
public void map(IntWritable key, IntWritable value,
    OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
    throws IOException {

  List<String> counterNames = getCounterNames();
  for (String counterName : counterNames) {
    reporter.incrCounter("Counters from Mappers", counterName, 1);
  }
  // it is expected that every map processes mapSleepCount number of records.
  try {
    reporter.setStatus("Sleeping... (" +
        (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
    Thread.sleep(mapSleepDuration);
  }
  catch (InterruptedException ex) {
    throw (IOException)new IOException(
        "Interrupted while sleeping").initCause(ex);
  }
  ++count;
  // output reduceSleepCount * numReduce number of random values, so that
  // each reducer will get reduceSleepCount number of keys.
  int k = key.get();
  for (int i = 0; i < value.get(); ++i) {
    output.collect(new IntWritable(k + i), NullWritable.get());
  }
}
 
Example 6 (project: hadoop-solr, file: XMLIngestMapper.java)
@Override
protected LWDocument[] toDocuments(
  Writable key,
  Text text,
  Reporter reporter,
  Configuration configuration) throws IOException {
  LWDocument[] docs = null;
  try {
    docs = toDocumentsImpl(key, text);
  } catch (Exception exc) {
    log.error("Failed to process XML " + key + " due to: " + exc, exc);
    reporter.incrCounter("XMLIngestMapper", "BadDocs", 1);
  }

  if (docs != null && docs.length > 0) {
    reporter.incrCounter("XMLIngestMapper", "DocsCreated", docs.length);
  } else {
    log.warn("No documents added in: " + key);
    docs = new LWDocument[0];
  }
  return docs;
}
 
Example 7 (project: wikireverse, file: WikiMetadata.java)
public boolean isWikiPage(String url, Reporter reporter) throws URISyntaxException {
	boolean result = false;
	
	if (url != null && (url.indexOf(WIKIPEDIA_DOMAIN) >= 0 ||
			url.indexOf(WIKIMEDIA_DOMAIN) >= 0)) {
		URI pageUri = new URI(url);
		String pageHost = pageUri.getHost();
		
		if (pageHost != null && pageHost.endsWith(WIKIPEDIA_DOMAIN)) {
			LOG.info(url);
			reporter.incrCounter(COUNTER_GROUP, SKIP_WIKIPEDIA_PAGE, 1);
			result = true;
		}

		if (pageHost != null && pageHost.endsWith(WIKIMEDIA_DOMAIN)) {
			LOG.info(url);
			reporter.incrCounter(COUNTER_GROUP, SKIP_WIKIMEDIA_PAGE, 1);
			result = true;
		}
	}
	
	return result;
}
 
Example 8 (project: systemds, file: RemoteParForUtils.java)
public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations)
{
	//report parfor counters
	if( deltaTasks>0 )
		reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMTASKS.toString(), deltaTasks);
	if( deltaIterations>0 )
		reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMITERS.toString(), deltaIterations);
	
	JobConf job = ConfigurationManager.getCachedJobConf();
	if( DMLScript.STATISTICS  && !InfrastructureAnalyzer.isLocalMode(job) ) 
	{
		//report cache statistics
		reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime());
		reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount());
		reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime());
		reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime());
	
		//reset cache statistics to prevent overlapping reporting
		CacheStatistics.reset();
	}
}
 
Example 9 (project: hadoop, file: DistCpV1.java)
/**
 * Increment number of files copied and bytes copied and then report status
 */
void updateCopyStatus(FileStatus srcstat, Reporter reporter) {
  copycount++;
  reporter.incrCounter(Counter.BYTESCOPIED, srcstat.getLen());
  reporter.incrCounter(Counter.COPY, 1);
  updateStatus(reporter);
}
 
Example 10 (project: hadoop, file: DistCpV1.java)
/**
 * Copies single file to the path specified by tmpfile.
 * @param srcstat  src path and metadata
 * @param tmpfile  temporary file to which copy is to be done
 * @param absdst   actual destination path to which copy is to be done
 * @param reporter
 * @return Number of bytes copied
 */
private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst,
                        Reporter reporter) throws IOException {
  long bytesCopied = 0L;
  Path srcPath = srcstat.getPath();
  // open src file
  try (FSDataInputStream in = srcPath.getFileSystem(job).open(srcPath)) {
    reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
    // open tmp file
    try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) {
      LOG.info("Copying file " + srcPath + " of size " +
               srcstat.getLen() + " bytes...");
    
      // copy file
      for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) {
        out.write(buffer, 0, bytesRead);
        bytesCopied += bytesRead;
        reporter.setStatus(
            String.format("%.2f ", bytesCopied*100.0/srcstat.getLen())
            + absdst + " [ " +
            TraditionalBinaryPrefix.long2String(bytesCopied, "", 1) + " / "
            + TraditionalBinaryPrefix.long2String(srcstat.getLen(), "", 1)
            + " ]");
      }
    }
  }
  return bytesCopied;
}
 
Example 11 (project: nutch-htmlunit, file: DeduplicationJob.java)
private void writeOutAsDuplicate(CrawlDatum datum,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {
    datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);
    Text key = (Text) datum.getMetaData().remove(urlKey);
    reporter.incrCounter("DeduplicationJobStatus",
            "Documents marked as duplicate", 1);
    output.collect(key, datum);
}
 
Example 12 (project: RDFS, file: DistCp.java)
/** Map method. Copies one file from source file system to destination.
 * @param key src len
 * @param value FilePair (FileStatus src, Path dst)
 * @param out Log of failed copies
 * @param reporter
 */
public void map(LongWritable key,
                FilePairComparable value,
                OutputCollector<FilePairComparable, Text> out,
                Reporter reporter) throws IOException {
  final FileStatus srcstat = value.input;
  final Path relativedst = new Path(value.output);
  try {
    copy(value, out, reporter);
  } catch (IOException e) {
    ++failcount;
    reporter.incrCounter(Counter.FAIL, 1);
    updateStatus(reporter);
    final String sfailure = "FAIL " + relativedst + " : " +
                      StringUtils.stringifyException(e);
    out.collect(value, new Text(sfailure));
    LOG.info(sfailure);
    try {
      for (int i = 0; i < 3; ++i) {
        try {
          final Path tmp = new Path(attemptTmpRoot, relativedst);
          if (destFileSys.delete(tmp, true))
            break;
        } catch (Throwable ex) {
          // ignore, we are just cleaning up
          LOG.debug("Ignoring cleanup exception", ex);
        }
        // update status, so we don't get timed out
        updateStatus(reporter);
        Thread.sleep(3 * 1000);
      }
    } catch (InterruptedException inte) {
      throw (IOException)new IOException().initCause(inte);
    }
  } finally {
    updateStatus(reporter);
  }
}
 
Example 13 (project: hbase, file: RowCounter.java)
public void map(ImmutableBytesWritable row, Result values,
    OutputCollector<ImmutableBytesWritable, Result> output,
    Reporter reporter)
throws IOException {
    // Count every row containing data, whether it's in qualifiers or values
    reporter.incrCounter(Counters.ROWS, 1);
}
 
Example 14 (project: hadoop-gpu, file: RandomTextWriter.java)
/**
 * Given an output filename, write a bunch of random records to it.
 */
public void map(Text key, Text value,
                OutputCollector<Text, Text> output, 
                Reporter reporter) throws IOException {
  int itemCount = 0;
  while (numBytesToWrite > 0) {
    // Generate the key/value 
    int noWordsKey = minWordsInKey + 
      (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
    int noWordsValue = minWordsInValue + 
      (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
    Text keyWords = generateSentence(noWordsKey);
    Text valueWords = generateSentence(noWordsValue);
    
    // Write the sentence 
    output.collect(keyWords, valueWords);
    
    numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
    
    // Update counters, progress etc.
    reporter.incrCounter(Counters.BYTES_WRITTEN, 
                         (keyWords.getLength()+valueWords.getLength()));
    reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
    if (++itemCount % 200 == 0) {
      reporter.setStatus("wrote record " + itemCount + ". " + 
                         numBytesToWrite + " bytes left.");
    }
  }
  reporter.setStatus("done with " + itemCount + " records.");
}
 
Example 15
private void incrementRetryCounter(Reporter reporter, PrintCounter retryCounter) {
  if (reporter != null) {
    if (retryCounter != null) {
      reporter.incrCounter(retryCounter.getGroup(), retryCounter.getName(), 1);
    } else {
      reporter.progress();
    }
  }
}
 
Example 16 (project: hadoop-solr, file: TestIngestReducer.java)
@Override
public void reduce(Text key, Iterator<LWDocumentWritable> values,
    OutputCollector<Text, LWDocumentWritable> output, Reporter reporter) throws IOException {
  count++;
  super.reduce(key, values, output, reporter);
  reporter.incrCounter("TestIngestReducer", "count", count);
}
 
Example 17 (project: wikireverse, file: SegmentCombinerMapper.java)
public void map(Text key, LinkArrayWritable value, OutputCollector<Text, LinkArrayWritable> output, Reporter reporter)
        throws IOException {

	try {
		output.collect(key, value);

		reporter.incrCounter(COUNTER_GROUP, RECORDS_FETCHED, 1);
		reporter.incrCounter(COUNTER_GROUP, RESULTS_COUNTED, value.get().length);
		
	} catch (Exception e) {
		reporter.incrCounter(COUNTER_GROUP, MAP_EXCEPTION, 1);
		LOG.error(StringUtils.stringifyException(e));
	}
}
 
Example 18 (project: wikireverse, file: LinkArrayReducer.java)
public void reduce(Text key, Iterator<LinkArrayWritable> values, OutputCollector<Text, LinkArrayWritable> output, Reporter reporter) throws IOException {
	try {
		LinkArrayWritable value = new LinkArrayWritable();
		Writable[] allValues = new Writable[0];
		Writable[] combinedValues;
		Writable[] nextValues;
		
		while (values.hasNext()) {
			nextValues = values.next().get();
			combinedValues = new Writable[allValues.length + nextValues.length];
			
			System.arraycopy(allValues, 0, combinedValues, 0, allValues.length);
			System.arraycopy(nextValues, 0, combinedValues, allValues.length, nextValues.length);
			
			allValues = combinedValues;
		}
		
		value.set(allValues);
		output.collect(key, value);
		
		reporter.incrCounter(COUNTER_GROUP, URLS_REDUCED, 1);
		reporter.incrCounter(COUNTER_GROUP, RESULTS_COMBINED, allValues.length);		

	} catch (Exception e) {
		reporter.incrCounter(COUNTER_GROUP, REDUCE_EXCEPTION, 1);
		LOG.error(StringUtils.stringifyException(e));
	}
}
 
Example 19 (project: wikireverse, file: WikiReverseMapper.java)
public void run(RecordReader<LongWritable, WritableWarcRecord> input,
				OutputCollector<Text, LinkArrayWritable> output, Reporter reporter)
				throws IOException {
	try {
		WikiMetadata wikiMetadata = new WikiMetadata();
		
		LongWritable key = input.createKey();
		WritableWarcRecord value = input.createValue();
		
		while (input.next(key, value)) {
			map(key, value, output, reporter, wikiMetadata);
			reporter.incrCounter(COUNTER_GROUP, RECORDS_FETCHED, 1);
		}
		
	} catch(InterruptedException ie) {
		reporter.incrCounter(COUNTER_GROUP, MAPPER_INTERRUPTED, 1);
		LOG.error(StringUtils.stringifyException(ie));
	} catch(IOException io) {
		reporter.incrCounter(COUNTER_GROUP, RUN_IO_EXCEPTION, 1);
		LOG.error(StringUtils.stringifyException(io));
	} catch(Exception e) {
		reporter.incrCounter(COUNTER_GROUP, RUN_EXCEPTION, 1);
		LOG.error(StringUtils.stringifyException(e));
	} finally {
		input.close();
	}
}
 
Example 20 (project: hadoop, file: DistCpV1.java)
/** Map method. Copies one file from source file system to destination.
 * @param key src len
 * @param value FilePair (FileStatus src, Path dst)
 * @param out Log of failed copies
 * @param reporter
 */
public void map(LongWritable key,
                FilePair value,
                OutputCollector<WritableComparable<?>, Text> out,
                Reporter reporter) throws IOException {
  final FileStatus srcstat = value.input;
  final Path relativedst = new Path(value.output);
  try {
    copyWithRetries(srcstat, relativedst, out, reporter);
  } catch (IOException e) {
    ++failcount;
    reporter.incrCounter(Counter.FAIL, 1);
    updateStatus(reporter);
    final String sfailure = "FAIL " + relativedst + " : " +
                      StringUtils.stringifyException(e);
    out.collect(null, new Text(sfailure));
    LOG.info(sfailure);
    if (e instanceof FileNotFoundException) {
      final String s = "Possible Cause for failure: Either the filesystem "
                       + srcstat.getPath().getFileSystem(job)
                       + " is not accessible or the file is deleted";
      LOG.error(s);
      out.collect(null, new Text(s));
    }

    try {
      for (int i = 0; i < 3; ++i) {
        try {
          final Path tmp = new Path(job.get(TMP_DIR_LABEL), relativedst);
          if (destFileSys.delete(tmp, true))
            break;
        } catch (Throwable ex) {
          // ignore, we are just cleaning up
          LOG.debug("Ignoring cleanup exception", ex);
        }
        // update status, so we don't get timed out
        updateStatus(reporter);
        Thread.sleep(3 * 1000);
      }
    } catch (InterruptedException inte) {
      throw (IOException)new IOException().initCause(inte);
    }
  } finally {
    updateStatus(reporter);
  }
}