下面列出了 org.apache.hadoop.mapred.Reporter#setStatus() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Writes {@code totalSize} bytes taken from {@code buffer} to {@code this.stream},
 * at most {@code bufferSize} bytes per call.
 * <p>
 * After each chunk the task status is set to the offset at which that chunk was
 * written, the total, and the host name.
 *
 * @param reporter receives a status line after every chunk
 * @param name file name, used only in the status text
 * @param totalSize number of bytes to write
 * @return {@code totalSize}, boxed
 * @throws IOException if the underlying write fails
 */
@Override // IOMapperBase
public Long doIO(Reporter reporter,
                 String name,
                 long totalSize // in bytes
                 ) throws IOException {
  // this.stream is expected to hold an OutputStream (the cast fails otherwise).
  final OutputStream target = (OutputStream) this.stream;
  long remaining = totalSize;
  while (remaining > 0) {
    // Full buffer while more than a buffer's worth is left, otherwise the tail.
    final int chunk = (int) Math.min(bufferSize, remaining);
    target.write(buffer, 0, chunk);
    reporter.setStatus("writing " + name + "@" + (totalSize - remaining) + "/" + totalSize
        + " ::host = " + hostName);
    remaining -= bufferSize;
  }
  return Long.valueOf(totalSize);
}
/**
 * Streams {@code totalSize} bytes of {@code buffer} into {@code this.stream} in pieces of
 * at most {@code bufferSize} bytes, publishing progress after each piece.
 *
 * @param reporter task reporter used for the per-piece status line
 * @param name file name shown in the status line
 * @param totalSize bytes to write
 * @return the number of bytes requested ({@code totalSize})
 * @throws IOException if writing to the stream fails
 */
@Override // IOMapperBase
public Long doIO(Reporter reporter, String name, long totalSize /* in bytes */)
    throws IOException {
  final OutputStream sink = (OutputStream) this.stream;
  for (long left = totalSize; left > 0; left -= bufferSize) {
    final long done = totalSize - left;
    final int len = left > bufferSize ? bufferSize : (int) left;
    sink.write(buffer, 0, len);
    final String status =
        "writing " + name + "@" + done + "/" + totalSize + " ::host = " + hostName;
    reporter.setStatus(status);
  }
  return totalSize; // autoboxed via Long.valueOf
}
/**
 * Emits random {@code BytesWritable} key/value records to {@code output} until
 * {@code numBytesToWrite} drops to zero or below.
 * <p>
 * Key length is {@code minKeySize} plus a random offset in {@code [0, keySizeRange)}
 * (no offset when the range is 0); values are sized the same way from
 * {@code minValueSize}/{@code valueSizeRange}. The incoming {@code key} and {@code value}
 * are not used. Byte and record counters are incremented per record, and a status
 * line is set every 200 records and once at the end.
 *
 * @throws IOException if collecting a record fails
 */
public void map(WritableComparable key,
                Writable value,
                OutputCollector<BytesWritable, BytesWritable> output,
                Reporter reporter) throws IOException {
  int records = 0;
  while (numBytesToWrite > 0) {
    final int keyLen = minKeySize
        + (keySizeRange == 0 ? 0 : random.nextInt(keySizeRange));
    randomKey.setSize(keyLen);
    randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());

    final int valueLen = minValueSize
        + (valueSizeRange == 0 ? 0 : random.nextInt(valueSizeRange));
    randomValue.setSize(valueLen);
    randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());

    output.collect(randomKey, randomValue);

    final int recordBytes = keyLen + valueLen;
    numBytesToWrite -= recordBytes;
    reporter.incrCounter(Counters.BYTES_WRITTEN, recordBytes);
    reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);

    records++;
    if (records % 200 == 0) {
      reporter.setStatus("wrote record " + records + ". " + numBytesToWrite + " bytes left.");
    }
  }
  reporter.setStatus("done with " + records + " records.");
}
/**
 * Runs one timed I/O operation and hands its result to the stats collector.
 * <p>
 * {@code key} carries the file name and {@code value} the numeric argument that is
 * passed to {@link #doIO(Reporter,String,long)}. The wall-clock time of that call, in
 * milliseconds, is passed together with its result to
 * {@link #collectStats(OutputCollector,String,long,Object)} so a later reducer can
 * aggregate it. Status lines are set before and after the operation.
 *
 * @throws IOException if the I/O operation or stats collection fails
 */
public void map(UTF8 key,
                LongWritable value,
                OutputCollector<UTF8, UTF8> output,
                Reporter reporter) throws IOException {
  final String fileName = key.toString();
  final long ioArgument = value.get();
  reporter.setStatus("starting " + fileName + " ::host = " + hostName);

  final long startMs = System.currentTimeMillis();
  final Object ioResult = doIO(reporter, fileName, ioArgument);
  final long elapsedMs = System.currentTimeMillis() - startMs;

  collectStats(output, fileName, elapsedMs, ioResult);
  reporter.setStatus("finished " + fileName + " ::host = " + hostName);
}
/**
 * Runs a {@code FileOperation} against {@code jobconf}.
 * <p>
 * On success, bumps {@code succeedcount} and the SUCCEED counter. On IOException, bumps
 * {@code failcount} and the FAIL counter, emits a "FAIL: ..." line with a null key and
 * logs it. In every case the status is set to {@code getCountString()} afterwards.
 */
public void map(Text key, FileOperation value,
    OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
    ) throws IOException {
  try {
    IOException failure = null;
    try {
      value.run(jobconf);
    } catch (IOException e) {
      failure = e;
    }

    if (failure == null) {
      ++succeedcount;
      reporter.incrCounter(Counter.SUCCEED, 1);
    } else {
      ++failcount;
      reporter.incrCounter(Counter.FAIL, 1);
      final String report = "FAIL: " + value + ", " + StringUtils.stringifyException(failure);
      out.collect(null, new Text(report));
      LOG.info(report);
    }
  } finally {
    reporter.setStatus(getCountString());
  }
}
/**
 * Executes the given {@code FileOperation} and records the outcome.
 * <p>
 * A failing operation (IOException) increments {@code failcount} and the FAIL counter,
 * is written to {@code out} as a "FAIL: ..." line with a null key, and is logged.
 * A successful one increments {@code succeedcount} and the SUCCEED counter.
 * The status is always refreshed from {@code getCountString()} on exit.
 */
public void map(Text key, FileOperation value,
                OutputCollector<WritableComparable<?>, Text> out,
                Reporter reporter) throws IOException {
  try {
    try {
      value.run(jobconf);
    } catch (IOException cause) {
      failcount++;
      reporter.incrCounter(Counter.FAIL, 1);
      String line = "FAIL: " + value + ", " + StringUtils.stringifyException(cause);
      out.collect(null, new Text(line));
      LOG.info(line);
      return; // status is still refreshed by the outer finally
    }
    succeedcount++;
    reporter.incrCounter(Counter.SUCCEED, 1);
  } finally {
    reporter.setStatus(getCountString());
  }
}
public void reduce(
Text key,
Iterator<Text> iter,
OutputCollector<Text, Text> oc,
Reporter reporter) throws IOException {
int tsSum = 0, erCnt = 0;
double erSum = 0.0;
while (iter.hasNext()) {
List<Text> vals = Library.splitLine(iter.next(), '');
try {
tsSum += Integer.valueOf(vals.get(0).toString());
erSum += Double.valueOf(vals.get(1).toString());
erCnt += Integer.valueOf(vals.get(2).toString());
} catch (NumberFormatException nfe) {
}
}
double erAvg = erSum / erCnt;
StringBuffer sb = new StringBuffer();
sb.append((new Integer(tsSum)).toString());
sb.append("");
sb.append((new Double(erAvg)).toString());
oc.collect(key, new Text(sb.toString()));
reporter.setStatus("OK");
}
/**
 * Joins the values that share a key.
 * <p>
 * A value whose first character is {@code '1'} belongs to the first input; any other
 * non-empty value belongs to the second. The tag character is stripped. Empty values
 * carry no tag and are skipped. For every (first, second) pair one line
 * {@code key \t first \t second} is emitted with a null output key. Nothing is emitted
 * if either side is empty.
 * <p>
 * Fix: the previous version emitted {@code key \t s1 \t key}, ignoring {@code s2}, so
 * the "cross product" produced identical duplicate lines.
 *
 * @throws IOException if collecting the output fails
 */
public void reduce(
    Text key,
    Iterator<Text> iter,
    OutputCollector<Text, Text> oc,
    Reporter reporter) throws IOException {
  // For each value, figure out which input it's from and store it accordingly.
  List<String> first = new ArrayList<String>();
  List<String> second = new ArrayList<String>();
  while (iter.hasNext()) {
    String value = iter.next().toString();
    if (!value.isEmpty()) {
      List<String> side = (value.charAt(0) == '1') ? first : second;
      side.add(value.substring(1));
    }
    reporter.setStatus("OK");
  }
  reporter.setStatus("OK");
  if (first.isEmpty() || second.isEmpty()) {
    return;
  }
  // Emit the cross product of the two sides.
  String keyText = key.toString();
  for (String s1 : first) {
    for (String s2 : second) {
      oc.collect(null, new Text(keyText + "\t" + s1 + "\t" + s2));
    }
  }
}
/**
 * Opens a {@code LineRecordReader} over the split, after publishing the split's
 * description as the task status.
 *
 * @param genericSplit split to read; must be a {@code FileSplit} (it is cast)
 * @param job job configuration passed to the reader
 * @param reporter receives the split description as status
 * @return a line reader for the split
 * @throws IOException if the reader cannot be created
 */
public RecordReader<LongWritable, Text> getRecordReader(
    InputSplit genericSplit,
    JobConf job,
    Reporter reporter)
    throws IOException {
  final String splitDescription = genericSplit.toString();
  reporter.setStatus(splitDescription);
  final FileSplit fileSplit = (FileSplit) genericSplit;
  return new LineRecordReader(job, fileSplit);
}
/**
 * Writes {@code value} bytes of seeded pseudo-random data to {@code DATA_DIR/<key>}.
 * <p>
 * The file name is parsed as a {@code long} and used as the seed of {@code random}.
 * In fast-check mode each buffer is filled with a single random byte value; otherwise
 * every byte is random. Data goes to {@code <name><suffix>} first and is renamed into
 * place afterwards (to permit parallel execution). Finally ("bytes", bytesWritten) is
 * emitted.
 *
 * @throws IOException if writing fails, or if the rename to the final location fails
 *     ({@code FileSystem.rename} reports failure by returning {@code false})
 */
public void map(UTF8 key, LongWritable value,
                OutputCollector<UTF8, LongWritable> collector,
                Reporter reporter)
    throws IOException {
  String name = key.toString();
  long size = value.get();
  long seed = Long.parseLong(name);
  random.setSeed(seed);
  reporter.setStatus("creating " + name);

  // write to temp file initially to permit parallel execution
  Path tempFile = new Path(DATA_DIR, name + suffix);
  long written = 0;
  try (OutputStream out = fs.create(tempFile)) {
    while (written < size) {
      if (fastCheck) {
        Arrays.fill(buffer, (byte) random.nextInt(Byte.MAX_VALUE));
      } else {
        random.nextBytes(buffer);
      }
      long remains = size - written;
      int length = (remains <= buffer.length) ? (int) remains : buffer.length;
      out.write(buffer, 0, length);
      written += length;
      reporter.setStatus("writing " + name + "@" + written + "/" + size);
    }
  }

  // rename to final location; a false return means the file was NOT moved
  Path finalFile = new Path(DATA_DIR, name);
  if (!fs.rename(tempFile, finalFile)) {
    throw new IOException("Failed to rename " + tempFile + " to " + finalFile);
  }
  collector.collect(new UTF8("bytes"), new LongWritable(written));
  reporter.setStatus("wrote " + name);
}
/**
 * Emits random sentence pairs (key sentence, value sentence) to {@code output} until
 * {@code numBytesToWrite} drops to zero or below.
 * <p>
 * Word counts are {@code minWordsInKey}/{@code minWordsInValue} plus a random offset in
 * {@code [0, range)} (none when the range is 0). The incoming key/value are unused.
 * Byte and record counters are incremented per pair, and a status line is set every
 * 200 records and once at the end.
 *
 * @throws IOException if collecting a record fails
 */
public void map(Text key, Text value,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
  int written = 0;
  while (numBytesToWrite > 0) {
    final int keyWordCount = minWordsInKey
        + (wordsInKeyRange == 0 ? 0 : random.nextInt(wordsInKeyRange));
    final int valueWordCount = minWordsInValue
        + (wordsInValueRange == 0 ? 0 : random.nextInt(wordsInValueRange));
    final Text sentenceKey = generateSentence(keyWordCount);
    final Text sentenceValue = generateSentence(valueWordCount);

    output.collect(sentenceKey, sentenceValue);

    final int recordBytes = sentenceKey.getLength() + sentenceValue.getLength();
    numBytesToWrite -= recordBytes;
    reporter.incrCounter(Counters.BYTES_WRITTEN, recordBytes);
    reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);

    written += 1;
    if (written % 200 == 0) {
      reporter.setStatus("wrote record " + written + ". " + numBytesToWrite + " bytes left.");
    }
  }
  reporter.setStatus("done with " + written + " records.");
}
/**
 * Reports the split being opened, then returns a {@code LineRecordReader} for it.
 * The split must be a {@code FileSplit}; any other type fails the cast.
 */
public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
                                                        JobConf job,
                                                        Reporter reporter)
    throws IOException {
  reporter.setStatus(genericSplit.toString());
  FileSplit split = (FileSplit) genericSplit;
  LineRecordReader reader = new LineRecordReader(job, split);
  return reader;
}
/**
 * Open operation: opens each of the {@code numberOfFiles} files
 * {@code baseDir/dataDirName/<name>_<i>} and, when {@code readFile} is set, reads
 * {@code bytesToWrite} bytes from it.
 * <p>
 * Open time is added to {@code totalTimeAL1} and read time to {@code totalTimeAL2}.
 * Each IOException is logged and counted in {@code numOfExceptions`}; the same file is
 * retried until it succeeds or {@code MAX_OPERATION_EXCEPTIONS} is reached. The stream
 * is always closed, including when {@code readFully} fails. Previously a failed read
 * leaked the open stream on every retry.
 *
 * @param name prefix of the output files to be read
 * @param reporter an instance of {@link Reporter} used for status updates
 */
private void doOpenReadOp(String name,
                          Reporter reporter) {
  byte[] buffer = new byte[bytesToWrite];
  for (long l = 0L; l < numberOfFiles; l++) {
    Path filePath = new Path(new Path(baseDir, dataDirName),
                             name + "_" + l);
    boolean successfulOp = false;
    while (!successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
      try {
        // Set up timer for measuring AL
        startTimeAL = System.currentTimeMillis();
        try (FSDataInputStream input = filesystem.open(filePath)) {
          totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
          // If the file needs to be read (specified at command line)
          if (readFile) {
            startTimeAL = System.currentTimeMillis();
            input.readFully(buffer);
            totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
          }
        }
        successfulOp = true;
        successfulFileOps++;
        reporter.setStatus("Finish " + l + " files");
      } catch (IOException e) {
        LOG.info("Exception recorded in op: OpenRead " + e);
        numOfExceptions++;
      }
    }
  }
}
/**
 * Rename operation: renames each {@code <name>_<i>} to {@code <name>_r_<i>} under
 * {@code baseDir/dataDirName}, for {@code i} in {@code [0, numberOfFiles)}.
 * <p>
 * Rename time is added to {@code totalTimeAL1}. {@code FileSystem.rename} can report
 * failure either by throwing or by returning {@code false}. Both are logged and counted
 * in {@code numOfExceptions}, and the rename is retried until it succeeds or
 * {@code MAX_OPERATION_EXCEPTIONS} is reached. Previously a {@code false} return was
 * counted as a successful operation.
 *
 * @param name prefix of the files to be renamed
 * @param reporter an instance of {@link Reporter} used for status updates
 */
private void doRenameOp(String name,
                        Reporter reporter) {
  for (long l = 0L; l < numberOfFiles; l++) {
    Path filePath = new Path(new Path(baseDir, dataDirName),
                             name + "_" + l);
    Path filePathR = new Path(new Path(baseDir, dataDirName),
                              name + "_r_" + l);
    boolean successfulOp = false;
    while (!successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
      try {
        // Set up timer for measuring AL
        startTimeAL = System.currentTimeMillis();
        boolean renamed = filesystem.rename(filePath, filePathR);
        totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
        if (renamed) {
          successfulOp = true;
          successfulFileOps++;
          reporter.setStatus("Finish " + l + " files");
        } else {
          LOG.info("Rename returned false in op: Rename " + filePath + " -> " + filePathR);
          numOfExceptions++;
        }
      } catch (IOException e) {
        // Include the exception, consistent with the OpenRead operation.
        LOG.info("Exception recorded in op: Rename " + e);
        numOfExceptions++;
      }
    }
  }
}
/**
 * Builds the formula for one frame.
 * <p>
 * For every hidden unit in {@code mFrameMap.get(frame)}, features are extracted with a
 * fresh {@code FeatureExtractor}. Each feature contributes a TIMES term of its value
 * (passed through {@code LDouble.convertToLogDomain}) and its lazily looked-up
 * parameter. Those terms are summed (PLUS), wrapped in EXP, and the per-unit EXP
 * formulas are summed (PLUS) into the result.
 * <p>
 * Differences from the superclass version: the dependency parse built from
 * {@code data} is passed to the extractor, and each feature is reported via
 * {@code reporter.setStatus} when a reporter is supplied.
 *
 * @param frame frame name, looked up in {@code mFrameMap}. NOTE(review): an unknown
 *     frame yields a null unit set and a NullPointerException in the loop; confirm
 *     callers guarantee presence
 * @param intTokNums token numbers, passed through to the feature extractor
 * @param data sentence data, used for the dependency parse and feature extraction
 * @param reporter optional status reporter; may be {@code null}
 * @return a PLUS formula over the per-unit EXP terms
 * @see LRIdentificationModelSingleNode#getFormulaForFrame(String, int[], String[][])
 */
protected LogFormula getFormulaForFrame(String frame, int[] intTokNums, String[][] data,
    Reporter reporter) {
  final THashSet<String> units = mFrameMap.get(frame);
  final LogFormula total = getFormulaObject(LogFormula.Op.PLUS);
  final DependencyParse depParse = DependencyParse.processFN(data, 0.0);
  for (String unit : units) {
    final FeatureExtractor extractor = new FeatureExtractor();
    // last arg (the parse) differs from the superclass method's call
    final IntCounter<String> featureValues = extractor.extractFeatures(frame, intTokNums,
        unit, data, mWNR, mTrainOrTest, mWnRelationsCache, null, depParse);
    final LogFormula weightedSum = getFormulaObject(LogFormula.Op.PLUS);
    for (String feature : featureValues.keySet()) {
      final double raw = featureValues.getT(feature);
      final LogFormula term = getFormulaObject(LogFormula.Op.TIMES);
      final LogFormula valueFormula = getFormulaObject(LDouble.convertToLogDomain(raw));
      term.add_arg(valueFormula);
      final LogFormula weight = getLazyLookupParam(feature, mTrainOrTest);
      term.add_arg(weight);
      weightedSum.add_arg(term);
      if (reporter != null) {
        reporter.setStatus("Found feature:" + feature); // not in superclass method
      }
    }
    final LogFormula exp = getFormulaObject(LogFormula.Op.EXP);
    exp.add_arg(weightedSum);
    total.add_arg(exp);
  }
  return total;
}
/**
 * Copies the contents of {@code srcstat}'s path into {@code tmpfile}.
 * <p>
 * Increments the BYTESEXPECTED counter by the source length, then copies through
 * {@code buffer}. After every read, the status is set to the percentage copied, the
 * final destination, and human-readable copied/total sizes. Both streams are closed
 * by try-with-resources.
 *
 * @param srcstat source path and metadata (its length is the expected size)
 * @param tmpfile temporary file the data is written to
 * @param absdst actual destination path, shown in the status line
 * @param reporter receives the counter increment and per-read progress status
 * @return number of bytes copied
 * @throws IOException if opening, reading, or writing fails
 */
private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst,
    Reporter reporter) throws IOException {
  final Path source = srcstat.getPath();
  long copied = 0L;
  try (FSDataInputStream in = source.getFileSystem(job).open(source)) {
    reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
    try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) {
      LOG.info("Copying file " + source + " of size " + srcstat.getLen() + " bytes...");
      int n;
      while ((n = in.read(buffer)) >= 0) {
        out.write(buffer, 0, n);
        copied += n;
        final String percent = String.format("%.2f ", copied * 100.0 / srcstat.getLen());
        final String doneText = TraditionalBinaryPrefix.long2String(copied, "", 1);
        final String totalText = TraditionalBinaryPrefix.long2String(srcstat.getLen(), "", 1);
        reporter.setStatus(percent + absdst + " [ " + doneText + " / " + totalText + " ]");
      }
    }
  }
  return copied;
}
/**
 * Map method: draws {@code size} points from a {@code HaltonSequence} positioned at
 * {@code offset} and counts how many fall inside the circle inscribed in the unit square.
 *
 * @param offset samples start from the (offset+1)th sample
 * @param size the number of samples for this map
 * @param out output {true -> numInside, false -> numOutside}
 * @param reporter receives a status line every 1000 samples
 * @throws IOException if collecting the output fails
 */
public void map(LongWritable offset,
                LongWritable size,
                OutputCollector<BooleanWritable, LongWritable> out,
                Reporter reporter) throws IOException {
  final HaltonSequence sequence = new HaltonSequence(offset.get());
  long inside = 0L;
  long outside = 0L;
  long generated = 0;
  while (generated < size.get()) {
    final double[] p = sequence.nextPoint();
    // Centre the point on the square's middle; the inscribed circle has radius 0.5.
    final double dx = p[0] - 0.5;
    final double dy = p[1] - 0.5;
    final boolean isOutside = dx * dx + dy * dy > 0.25;
    if (isOutside) {
      outside++;
    } else {
      inside++;
    }
    generated++;
    if (generated % 1000 == 0) {
      reporter.setStatus("Generated " + generated + " samples.");
    }
  }
  out.collect(new BooleanWritable(true), new LongWritable(inside));
  out.collect(new BooleanWritable(false), new LongWritable(outside));
}
/**
 * Raids the file named by {@code key} under {@code policy}.
 * <p>
 * Stores {@code reporter} in {@code this.reporter}, initializes codecs, clears
 * {@code st}, and calls {@code RaidNode.doRaid}. On success the processed/meta/saving
 * counters are taken from {@code st}. An IOException is counted as a failed file,
 * emitted as a "FAIL: ..." line with a null key, and logged. The status is always set
 * to {@code getCountString()} on exit.
 */
public void map(Text key, PolicyInfo policy,
    OutputCollector<WritableComparable, Text> out, Reporter reporter)
    throws IOException {
  this.reporter = reporter;
  try {
    Codec.initializeCodecs(jobconf);
    final String target = key.toString();
    LOG.info("Raiding file=" + target + " policy=" + policy);
    final Path path = new Path(target);
    final FileStatus status = path.getFileSystem(jobconf).getFileStatus(path);
    st.clear();
    RaidNode.doRaid(jobconf, policy, status, st, reporter);
    succeedcount++;
    reporter.incrCounter(Counter.PROCESSED_BLOCKS, st.numProcessedBlocks);
    reporter.incrCounter(Counter.PROCESSED_SIZE, st.processedSize);
    reporter.incrCounter(Counter.META_BLOCKS, st.numMetaBlocks);
    reporter.incrCounter(Counter.META_SIZE, st.metaSize);
    final long saved = st.processedSize - st.remainingSize - st.metaSize;
    reporter.incrCounter(Counter.SAVING_SIZE, saved);
    reporter.incrCounter(Counter.FILES_SUCCEEDED, 1);
  } catch (IOException ioe) {
    failcount++;
    reporter.incrCounter(Counter.FILES_FAILED, 1);
    final String failure = "FAIL: " + policy + ", " + key + " "
        + StringUtils.stringifyException(ioe);
    out.collect(null, new Text(failure));
    LOG.info(failure);
  } finally {
    reporter.setStatus(getCountString());
  }
}
/**
 * Verifies {@code SEEKS_PER_FILE} random bytes of {@code DATA_DIR/<key>}.
 * <p>
 * The file name is parsed as the {@code long} seed. For each check, a random position
 * in {@code [0, size)} is chosen and the byte there is read. The generator is then
 * re-seeded and advanced chunk by chunk ({@code check.length} bytes per step) up to that
 * position, and the regenerated byte must equal the one read. Empty files are skipped.
 * <p>
 * Fixes versus the previous version:
 * <ul>
 *   <li>{@code Math.abs(random.nextLong()) % size} is negative when {@code nextLong()}
 *       returns {@code Long.MIN_VALUE}. Taking the remainder first keeps the position
 *       in range and gives the same value for every other draw.</li>
 *   <li>The chunk counter was an {@code int}. It overflowed, and looped forever, for
 *       positions beyond {@code Integer.MAX_VALUE}. It is now a {@code long}.</li>
 * </ul>
 *
 * @throws IOException if opening, seeking, or reading the file fails
 */
public void map(WritableComparable key, LongWritable value,
                OutputCollector<K, LongWritable> collector,
                Reporter reporter)
    throws IOException {
  String name = key.toString();
  long size = value.get();
  long seed = Long.parseLong(name);

  if (size == 0) {
    return;
  }

  reporter.setStatus("opening " + name);

  try (FSDataInputStream in = fs.open(new Path(DATA_DIR, name))) {
    for (int i = 0; i < SEEKS_PER_FILE; i++) {
      // generate a random position; |r % size| < |size| never overflows
      long position = Math.abs(random.nextLong() % size);

      // seek file to that position
      reporter.setStatus("seeking " + name);
      in.seek(position);
      byte b = in.readByte();

      // check that byte matches
      byte checkByte = 0;
      // advance random state to that position
      random.setSeed(seed);
      for (long p = 0; p <= position; p += check.length) {
        reporter.setStatus("generating data for " + name);
        if (fastCheck) {
          checkByte = (byte) random.nextInt(Byte.MAX_VALUE);
        } else {
          random.nextBytes(check);
          checkByte = check[(int) (position % check.length)];
        }
      }
      assertEquals(b, checkByte);
    }
  }
}
/**
 * Publishes a message as the task status and then records it in the internal logger
 * at info level.
 *
 * @param reporter the reporter whose status is set
 * @param message text used both as the status and as the log line
 */
private void logAndSetStatus(Reporter reporter, String message) {
  reporter.setStatus(message);
  LOG.info(message);
}