下面列出了 org.apache.hadoop.mapred.Reporter#incrCounter() 的实例代码；你可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Runs one {@link FileOperation} and records whether it succeeded.
 *
 * <p>On success the success tally and the {@code SUCCEED} counter are bumped.
 * An {@link IOException} from the operation is not propagated: the failure
 * tally and the {@code FAIL} counter are bumped and a "FAIL: ..." line is
 * both collected (with a {@code null} key) and logged. The task status is
 * refreshed from {@code getCountString()} in every case.
 *
 * @param key      not read by this method
 * @param value    the operation to run
 * @param out      receives one text record per failed operation
 * @param reporter receives counter increments and the status string
 * @throws IOException if collecting the failure record fails
 */
public void map(Text key, FileOperation value,
    OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
    ) throws IOException {
  try {
    value.run(jobconf);
    succeedcount++;
    reporter.incrCounter(Counter.SUCCEED, 1);
  } catch (IOException ioe) {
    failcount++;
    reporter.incrCounter(Counter.FAIL, 1);
    final String failure =
        "FAIL: " + value + ", " + StringUtils.stringifyException(ioe);
    final Text failureRecord = new Text(failure);
    out.collect(null, failureRecord);
    LOG.info(failure);
  } finally {
    // Runs on both paths so the status always reflects the latest tallies.
    final String status = getCountString();
    reporter.setStatus(status);
  }
}
/**
 * Executes the given {@link FileOperation} against {@code jobconf} and
 * reports the result through counters.
 *
 * <p>A successful run increments the success tally and {@code SUCCEED}
 * counter. If the run throws {@link IOException}, the exception is absorbed:
 * the failure tally and {@code FAIL} counter are incremented, and a
 * description of the failure is emitted to {@code out} under a {@code null}
 * key and written to the log. Either way the reporter status is updated
 * afterwards.
 *
 * @param key      unused here
 * @param value    operation to execute
 * @param out      sink for failure descriptions
 * @param reporter target for counters and status
 * @throws IOException if emitting the failure description fails
 */
public void map(Text key, FileOperation value,
    OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
    ) throws IOException {
  try {
    value.run(jobconf);
    succeedcount += 1;
    reporter.incrCounter(Counter.SUCCEED, 1);
  } catch (IOException cause) {
    failcount += 1;
    reporter.incrCounter(Counter.FAIL, 1);
    final String message = new StringBuilder("FAIL: ")
        .append(value)
        .append(", ")
        .append(StringUtils.stringifyException(cause))
        .toString();
    out.collect(null, new Text(message));
    LOG.info(message);
  } finally {
    reporter.setStatus(getCountString());
  }
}
/**
 * Logs the recorded start/end/profile values for every {@code Counter} and,
 * when a reporter is supplied, publishes them as job counters.
 *
 * <p>The aggregated read and write times are logged and published only when
 * {@code logReporter} is non-null.
 *
 * @param logReporter reporter to publish to; may be {@code null}, in which
 *                    case only the per-counter log lines are written
 */
public static void log(Reporter logReporter) {
  final boolean publish = logReporter != null;
  for (Counter counter : Counter.values()) {
    LOG.info(counter + " start (" + started[counter.value] + "), end ("
        + ended[counter.value] + "): " + profileTimes[counter.value]);
    if (publish) {
      logReporter.incrCounter(counter, profileTimes[counter.value]);
    }
  }

  // Looked up before the null check, exactly as the per-type totals are
  // always read regardless of whether they end up being reported.
  final long read = profileTypeTimes[Counter.Type.READ.ordinal()];
  final long write = profileTypeTimes[Counter.Type.WRITE.ordinal()];
  if (!publish) {
    return;
  }
  LOG.info("read time: " + read);
  LOG.info("write time: " + write);
  logReporter.incrCounter(ReadWriteCounter.READ_TIME, read);
  logReporter.incrCounter(ReadWriteCounter.WRITE_TIME, write);
}
/**
 * Increments one counter per configured counter name, then sleeps for
 * {@code reduceSleepDuration} before counting the record as processed.
 *
 * @param key      not read by this method
 * @param values   not read by this method
 * @param output   not written by this method
 * @param reporter receives the counter increments and a status line showing
 *                 the remaining sleep time
 * @throws IOException wrapping the {@link InterruptedException} if the sleep
 *                     is interrupted
 */
public void reduce(IntWritable key, Iterator<NullWritable> values,
    OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
    throws IOException {
  for (String name : getCounterNames()) {
    reporter.incrCounter("Counters from Reducers", name, 1);
  }

  try {
    final String status = "Sleeping... ("
        + (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left";
    reporter.setStatus(status);
    Thread.sleep(reduceSleepDuration);
  } catch (InterruptedException ex) {
    final IOException wrapped = new IOException("Interrupted while sleeping");
    wrapped.initCause(ex);
    throw wrapped;
  }

  ++count;
}
/**
 * Increments one counter per configured counter name, sleeps for
 * {@code mapSleepDuration}, and then emits {@code value.get()} consecutive
 * keys starting at {@code key.get()}.
 *
 * @param key      first key to emit
 * @param value    number of keys to emit
 * @param output   receives each emitted key paired with {@link NullWritable}
 * @param reporter receives the counter increments and a status line showing
 *                 the remaining sleep time
 * @throws IOException wrapping the {@link InterruptedException} if the sleep
 *                     is interrupted, or if collecting output fails
 */
public void map(IntWritable key, IntWritable value,
    OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
    throws IOException {
  for (String name : getCounterNames()) {
    reporter.incrCounter("Counters from Mappers", name, 1);
  }

  // Every map is expected to process mapSleepCount records.
  try {
    final String status = "Sleeping... ("
        + (mapSleepDuration * (mapSleepCount - count)) + ") ms left";
    reporter.setStatus(status);
    Thread.sleep(mapSleepDuration);
  } catch (InterruptedException ex) {
    final IOException wrapped = new IOException("Interrupted while sleeping");
    wrapped.initCause(ex);
    throw wrapped;
  }
  count++;

  // Emit reduceSleepCount * numReduce values so that each reducer
  // receives reduceSleepCount keys.
  final int base = key.get();
  int offset = 0;
  while (offset < value.get()) {
    output.collect(new IntWritable(base + offset), NullWritable.get());
    offset++;
  }
}
/**
 * Converts one input record into documents, never returning {@code null}.
 *
 * <p>Any exception from {@code toDocumentsImpl} is logged and counted under
 * {@code XMLIngestMapper/BadDocs} instead of being propagated. When at least
 * one document is produced, {@code XMLIngestMapper/DocsCreated} is increased
 * by the number of documents; otherwise a warning is logged and an empty
 * array is returned.
 *
 * @param key           record key, used for conversion and log messages
 * @param text          record content to convert
 * @param reporter      receives the counter increments
 * @param configuration not read by this method
 * @return the produced documents, or an empty array if none were produced
 */
@Override
protected LWDocument[] toDocuments(
    Writable key,
    Text text,
    Reporter reporter,
    Configuration configuration) throws IOException {
  LWDocument[] produced;
  try {
    produced = toDocumentsImpl(key, text);
  } catch (Exception exc) {
    log.error("Failed to process XML " + key + " due to: " + exc, exc);
    reporter.incrCounter("XMLIngestMapper", "BadDocs", 1);
    produced = null;
  }

  if (produced == null || produced.length == 0) {
    log.warn("No documents added in: " + key);
    return new LWDocument[0];
  }

  reporter.incrCounter("XMLIngestMapper", "DocsCreated", produced.length);
  return produced;
}
/**
 * Tells whether the URL's host ends with the Wikipedia or Wikimedia domain.
 *
 * <p>The URL is only parsed when it contains one of the two domain strings.
 * Each matching host check logs the URL and increments the corresponding
 * skip counter.
 *
 * @param url      URL to inspect; {@code null} yields {@code false}
 * @param reporter receives the skip counter increments
 * @return {@code true} if the host ends with either domain
 * @throws URISyntaxException if the URL contains one of the domains but
 *                            cannot be parsed as a URI
 */
public boolean isWikiPage(String url, Reporter reporter) throws URISyntaxException {
  if (url == null) {
    return false;
  }
  if (!url.contains(WIKIPEDIA_DOMAIN) && !url.contains(WIKIMEDIA_DOMAIN)) {
    return false;
  }

  final String host = new URI(url).getHost();
  if (host == null) {
    return false;
  }

  boolean matched = false;
  if (host.endsWith(WIKIPEDIA_DOMAIN)) {
    LOG.info(url);
    reporter.incrCounter(COUNTER_GROUP, SKIP_WIKIPEDIA_PAGE, 1);
    matched = true;
  }
  if (host.endsWith(WIKIMEDIA_DOMAIN)) {
    LOG.info(url);
    reporter.incrCounter(COUNTER_GROUP, SKIP_WIKIMEDIA_PAGE, 1);
    matched = true;
  }
  return matched;
}
/**
 * Publishes parfor task/iteration deltas and, when statistics are enabled
 * and the cached job is not in local mode, JVM and cache statistics.
 *
 * <p>After the cache statistics are published they are reset so that a later
 * call does not report the same values again.
 *
 * @param reporter        receives all counter increments
 * @param deltaTasks      number of tasks to add; ignored unless positive
 * @param deltaIterations number of iterations to add; ignored unless positive
 */
public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations)
{
  // parfor counters
  if (deltaTasks > 0) {
    reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME,
        Stat.PARFOR_NUMTASKS.toString(), deltaTasks);
  }
  if (deltaIterations > 0) {
    reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME,
        Stat.PARFOR_NUMITERS.toString(), deltaIterations);
  }

  JobConf job = ConfigurationManager.getCachedJobConf();
  if (!DMLScript.STATISTICS || InfrastructureAnalyzer.isLocalMode(job)) {
    return;
  }

  // JVM statistics
  final String parforGroup = ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME;
  reporter.incrCounter(parforGroup, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime());
  reporter.incrCounter(parforGroup, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount());
  reporter.incrCounter(parforGroup, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime());

  // cache statistics
  final String cacheGroup = CacheableData.CACHING_COUNTER_GROUP_NAME;
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime());
  reporter.incrCounter(cacheGroup, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime());

  // reset so the next report does not overlap with this one
  CacheStatistics.reset();
}
/**
 * Records one more copied file: bumps the local copy tally, adds the file's
 * length to the {@code BYTESCOPIED} counter, adds one to the {@code COPY}
 * counter, and then refreshes the reported status.
 *
 * @param srcstat  status of the file that was copied; its length is reported
 * @param reporter receives the counter increments and status update
 */
void updateCopyStatus(FileStatus srcstat, Reporter reporter) {
  ++copycount;
  final long copiedLength = srcstat.getLen();
  reporter.incrCounter(Counter.BYTESCOPIED, copiedLength);
  reporter.incrCounter(Counter.COPY, 1);
  updateStatus(reporter);
}
/**
 * Copies a single source file into {@code tmpfile}, reporting progress after
 * every buffer written.
 *
 * <p>The source length is added to the {@code BYTESEXPECTED} counter once the
 * source has been opened. Both streams are closed when the copy finishes or
 * fails.
 *
 * @param srcstat  source path and metadata
 * @param tmpfile  temporary file that receives the bytes
 * @param absdst   final destination path; used only in the status text
 * @param reporter receives the counter increment and progress status
 * @return number of bytes copied
 * @throws IOException if opening, reading, or writing fails
 */
private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst,
    Reporter reporter) throws IOException {
  final Path srcPath = srcstat.getPath();
  long bytesCopied = 0L;

  try (FSDataInputStream in = srcPath.getFileSystem(job).open(srcPath)) {
    final long srcLen = srcstat.getLen();
    reporter.incrCounter(Counter.BYTESEXPECTED, srcLen);

    try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) {
      LOG.info("Copying file " + srcPath + " of size " +
          srcLen + " bytes...");

      int chunk;
      while ((chunk = in.read(buffer)) >= 0) {
        out.write(buffer, 0, chunk);
        bytesCopied += chunk;

        final String percent =
            String.format("%.2f ", bytesCopied * 100.0 / srcLen);
        final String copiedText =
            TraditionalBinaryPrefix.long2String(bytesCopied, "", 1);
        final String totalText =
            TraditionalBinaryPrefix.long2String(srcLen, "", 1);
        reporter.setStatus(
            percent + absdst + " [ " + copiedText + " / " + totalText + " ]");
      }
    }
  }
  return bytesCopied;
}
/**
 * Marks the datum as a duplicate and emits it under the URL that was stored
 * in its metadata.
 *
 * <p>The URL entry is removed from the metadata before the datum is written,
 * and the "Documents marked as duplicate" counter is incremented.
 *
 * @param datum    datum to mark and emit
 * @param output   receives the (url, datum) pair
 * @param reporter receives the counter increment
 * @throws IOException if collecting the output fails
 */
private void writeOutAsDuplicate(CrawlDatum datum,
    OutputCollector<Text, CrawlDatum> output, Reporter reporter)
    throws IOException {
  datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);

  final Text url = (Text) datum.getMetaData().remove(urlKey);

  final String group = "DeduplicationJobStatus";
  final String counter = "Documents marked as duplicate";
  reporter.incrCounter(group, counter, 1);

  output.collect(url, datum);
}
/**
 * Map method. Copies one file from the source file system to the destination.
 *
 * <p>If the copy throws {@link IOException}, the failure is counted, emitted
 * to {@code out}, and logged, and up to three attempts are made to delete the
 * file's temporary copy, sleeping three seconds after each unsuccessful
 * attempt. The status is refreshed on every path out of the method.
 *
 * @param key      source length according to the original documentation;
 *                 not read here
 * @param value    file pair (source status, destination path)
 * @param out      log of failed copies
 * @param reporter receives counter increments and status updates
 * @throws IOException if emitting the failure record fails, or wrapping an
 *                     {@link InterruptedException} raised during cleanup
 */
public void map(LongWritable key,
    FilePairComparable value,
    OutputCollector<FilePairComparable, Text> out,
    Reporter reporter) throws IOException {
  final FileStatus srcstat = value.input;
  final Path relativedst = new Path(value.output);
  try {
    copy(value, out, reporter);
  } catch (IOException copyFailure) {
    failcount++;
    reporter.incrCounter(Counter.FAIL, 1);
    updateStatus(reporter);

    final String sfailure = "FAIL " + relativedst + " : " +
        StringUtils.stringifyException(copyFailure);
    out.collect(value, new Text(sfailure));
    LOG.info(sfailure);

    try {
      int attempt = 0;
      while (attempt < 3) {
        try {
          final Path tmp = new Path(attemptTmpRoot, relativedst);
          if (destFileSys.delete(tmp, true)) {
            break;
          }
        } catch (Throwable ex) {
          // ignore, we are just cleaning up
          LOG.debug("Ignoring cleanup exception", ex);
        }
        // update status, so we don't get timed out
        updateStatus(reporter);
        Thread.sleep(3 * 1000);
        attempt++;
      }
    } catch (InterruptedException inte) {
      final IOException wrapped = new IOException();
      wrapped.initCause(inte);
      throw wrapped;
    }
  } finally {
    updateStatus(reporter);
  }
}
/**
 * Counts the row by adding one to the {@code ROWS} counter; nothing is
 * emitted.
 *
 * @param row      not read by this method
 * @param values   not read by this method
 * @param output   not written by this method
 * @param reporter receives the counter increment
 */
public void map(ImmutableBytesWritable row, Result values,
    OutputCollector<ImmutableBytesWritable, Result> output,
    Reporter reporter)
    throws IOException {
  // Every row handed to this method counts, whether its data is in
  // qualifiers or values.
  final long oneRow = 1L;
  reporter.incrCounter(Counters.ROWS, oneRow);
}
/**
 * Writes random key/value sentences until {@code numBytesToWrite} is used up.
 *
 * <p>Each record's combined length is subtracted from the remaining budget
 * and added to {@code BYTES_WRITTEN}; {@code RECORDS_WRITTEN} is incremented
 * once per record. The status is refreshed every 200 records and once more
 * at the end.
 *
 * @param key      not read by this method
 * @param value    not read by this method
 * @param output   receives the generated records
 * @param reporter receives counter increments and status updates
 * @throws IOException if collecting a record fails
 */
public void map(Text key, Text value,
    OutputCollector<Text, Text> output,
    Reporter reporter) throws IOException {
  int written = 0;
  while (numBytesToWrite > 0) {
    // Pick the sentence sizes (key first, then value).
    int noWordsKey = minWordsInKey;
    if (wordsInKeyRange != 0) {
      noWordsKey += random.nextInt(wordsInKeyRange);
    }
    int noWordsValue = minWordsInValue;
    if (wordsInValueRange != 0) {
      noWordsValue += random.nextInt(wordsInValueRange);
    }

    final Text keyWords = generateSentence(noWordsKey);
    final Text valueWords = generateSentence(noWordsValue);

    // Emit the record.
    output.collect(keyWords, valueWords);
    final int recordBytes = keyWords.getLength() + valueWords.getLength();
    numBytesToWrite -= recordBytes;

    // Counters and periodic progress.
    reporter.incrCounter(Counters.BYTES_WRITTEN, recordBytes);
    reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
    written++;
    if (written % 200 == 0) {
      reporter.setStatus("wrote record " + written + ". " +
          numBytesToWrite + " bytes left.");
    }
  }
  reporter.setStatus("done with " + written + " records.");
}
/**
 * Records a retry on the reporter, if there is one.
 *
 * <p>With a retry counter, that counter is incremented by one; without one,
 * the reporter is only told that progress was made. A {@code null} reporter
 * makes this a no-op.
 *
 * @param reporter     reporter to notify; may be {@code null}
 * @param retryCounter counter to increment; may be {@code null}
 */
private void incrementRetryCounter(Reporter reporter, PrintCounter retryCounter) {
  if (reporter == null) {
    return;
  }
  if (retryCounter == null) {
    reporter.progress();
    return;
  }
  reporter.incrCounter(retryCounter.getGroup(), retryCounter.getName(), 1);
}
/**
 * Counts the invocation, delegates the reduction to the superclass, and then
 * reports the running invocation count.
 *
 * <p>NOTE(review): the counter is incremented by the cumulative
 * {@code count} on every call, not by one — confirm this is what the test
 * expects.
 *
 * @param key      key being reduced
 * @param values   values grouped under {@code key}
 * @param output   collector handed to the superclass implementation
 * @param reporter receives the {@code TestIngestReducer/count} increment
 * @throws IOException if the superclass implementation fails
 */
@Override
public void reduce(Text key, Iterator<LWDocumentWritable> values,
    OutputCollector<Text, LWDocumentWritable> output, Reporter reporter) throws IOException {
  count++;
  // Delegate explicitly to the parent: an unqualified reduce(...) call with
  // these arguments resolves to this override and recurses without end.
  // Assumes the superclass provides a concrete reduce with this signature.
  super.reduce(key, values, output, reporter);
  reporter.incrCounter("TestIngestReducer", "count", count);
}
/**
 * Passes the record through unchanged and counts it.
 *
 * <p>{@code RECORDS_FETCHED} is incremented by one and
 * {@code RESULTS_COUNTED} by the number of entries in {@code value}. Any
 * exception is absorbed: {@code MAP_EXCEPTION} is incremented and the stack
 * trace is logged.
 *
 * @param key      record key, emitted as is
 * @param value    record value, emitted as is
 * @param output   receives the record
 * @param reporter receives the counter increments
 */
public void map(Text key, LinkArrayWritable value, OutputCollector<Text, LinkArrayWritable> output, Reporter reporter)
    throws IOException {
  try {
    output.collect(key, value);
    reporter.incrCounter(COUNTER_GROUP, RECORDS_FETCHED, 1);
    // Read the length only after the record has been emitted and counted.
    final int resultCount = value.get().length;
    reporter.incrCounter(COUNTER_GROUP, RESULTS_COUNTED, resultCount);
  } catch (Exception failure) {
    reporter.incrCounter(COUNTER_GROUP, MAP_EXCEPTION, 1);
    LOG.error(StringUtils.stringifyException(failure));
  }
}
/**
 * Concatenates all link arrays received for a key into one array and emits
 * it.
 *
 * <p>{@code URLS_REDUCED} is incremented by one and {@code RESULTS_COMBINED}
 * by the length of the combined array. Any exception is absorbed:
 * {@code REDUCE_EXCEPTION} is incremented and the stack trace is logged.
 *
 * <p>Entries are accumulated in a list and converted to an array once, so
 * the work grows linearly with the total number of entries instead of
 * re-copying everything gathered so far for each incoming value.
 *
 * @param key      key being reduced, emitted as is
 * @param values   link arrays to concatenate, in iteration order
 * @param output   receives the combined record
 * @param reporter receives the counter increments
 */
public void reduce(Text key, Iterator<LinkArrayWritable> values, OutputCollector<Text, LinkArrayWritable> output, Reporter reporter) throws IOException {
  try {
    // Fully-qualified names keep this method independent of the file's imports.
    java.util.List<Writable> collected = new java.util.ArrayList<Writable>();
    while (values.hasNext()) {
      Writable[] nextValues = values.next().get();
      java.util.Collections.addAll(collected, nextValues);
    }
    Writable[] allValues = collected.toArray(new Writable[0]);

    LinkArrayWritable value = new LinkArrayWritable();
    value.set(allValues);
    output.collect(key, value);
    reporter.incrCounter(COUNTER_GROUP, URLS_REDUCED, 1);
    reporter.incrCounter(COUNTER_GROUP, RESULTS_COMBINED, allValues.length);
  } catch (Exception e) {
    reporter.incrCounter(COUNTER_GROUP, REDUCE_EXCEPTION, 1);
    LOG.error(StringUtils.stringifyException(e));
  }
}
/**
 * Drives the mapper over every record of the input, counting each processed
 * record under {@code RECORDS_FETCHED}.
 *
 * <p>Exceptions raised while reading or mapping are not propagated. Each kind
 * is counted under its own counter ({@code MAPPER_INTERRUPTED},
 * {@code RUN_IO_EXCEPTION}, or {@code RUN_EXCEPTION}) and logged, after which
 * processing stops. The input is always closed.
 *
 * @param input    record source; closed before this method returns
 * @param output   collector handed to {@code map}
 * @param reporter receives the counter increments
 * @throws IOException if closing the input fails
 */
public void run(RecordReader<LongWritable, WritableWarcRecord> input,
    OutputCollector<Text, LinkArrayWritable> output, Reporter reporter)
    throws IOException {
  try {
    final WikiMetadata metadata = new WikiMetadata();
    final LongWritable recordKey = input.createKey();
    final WritableWarcRecord record = input.createValue();
    for (; input.next(recordKey, record); ) {
      map(recordKey, record, output, reporter, metadata);
      reporter.incrCounter(COUNTER_GROUP, RECORDS_FETCHED, 1);
    }
  } catch (InterruptedException interrupted) {
    reporter.incrCounter(COUNTER_GROUP, MAPPER_INTERRUPTED, 1);
    LOG.error(StringUtils.stringifyException(interrupted));
  } catch (IOException ioFailure) {
    reporter.incrCounter(COUNTER_GROUP, RUN_IO_EXCEPTION, 1);
    LOG.error(StringUtils.stringifyException(ioFailure));
  } catch (Exception other) {
    reporter.incrCounter(COUNTER_GROUP, RUN_EXCEPTION, 1);
    LOG.error(StringUtils.stringifyException(other));
  } finally {
    input.close();
  }
}
/**
 * Map method. Copies one file from the source file system to the destination,
 * delegating the copy to {@code copyWithRetries}.
 *
 * <p>If the copy throws {@link IOException}, the failure is counted, emitted
 * to {@code out} under a {@code null} key, and logged; a
 * {@link FileNotFoundException} additionally produces a "possible cause"
 * message. Up to three attempts are then made to delete the file's temporary
 * copy, sleeping three seconds after each unsuccessful attempt. The status is
 * refreshed on every path out of the method.
 *
 * @param key      source length according to the original documentation;
 *                 not read here
 * @param value    file pair (source status, destination path)
 * @param out      log of failed copies
 * @param reporter receives counter increments and status updates
 * @throws IOException if emitting a failure record fails, or wrapping an
 *                     {@link InterruptedException} raised during cleanup
 */
public void map(LongWritable key,
    FilePair value,
    OutputCollector<WritableComparable<?>, Text> out,
    Reporter reporter) throws IOException {
  final FileStatus srcstat = value.input;
  final Path relativedst = new Path(value.output);
  try {
    copyWithRetries(srcstat, relativedst, out, reporter);
  } catch (IOException copyFailure) {
    failcount++;
    reporter.incrCounter(Counter.FAIL, 1);
    updateStatus(reporter);

    final String sfailure = "FAIL " + relativedst + " : " +
        StringUtils.stringifyException(copyFailure);
    out.collect(null, new Text(sfailure));
    LOG.info(sfailure);

    if (copyFailure instanceof FileNotFoundException) {
      final String hint = "Possible Cause for failure: Either the filesystem "
          + srcstat.getPath().getFileSystem(job)
          + " is not accessible or the file is deleted";
      LOG.error(hint);
      out.collect(null, new Text(hint));
    }

    try {
      int attempt = 0;
      while (attempt < 3) {
        try {
          final Path tmp = new Path(job.get(TMP_DIR_LABEL), relativedst);
          if (destFileSys.delete(tmp, true)) {
            break;
          }
        } catch (Throwable ex) {
          // ignore, we are just cleaning up
          LOG.debug("Ignoring cleanup exception", ex);
        }
        // update status, so we don't get timed out
        updateStatus(reporter);
        Thread.sleep(3 * 1000);
        attempt++;
      }
    } catch (InterruptedException inte) {
      final IOException wrapped = new IOException();
      wrapped.initCause(inte);
      throw wrapped;
    }
  } finally {
    updateStatus(reporter);
  }
}