The following lists example code showing how the org.apache.hadoop.mapred.Reporter API class is used in practice; you can also follow the links to view the original source on GitHub.
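Before the examples, a quick orientation: Reporter is the old-API (org.apache.hadoop.mapred) progress callback handed to every map, reduce, getRecordReader, and close call. The sketch below is illustrative only; the class and counter names are hypothetical and not taken from any of the examples that follow.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
// Hypothetical mapper showing the three Reporter calls used throughout this page.
public class ReportingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {
  public void map(LongWritable key, Text value,
      OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
    output.collect(value, new LongWritable(1));
    reporter.incrCounter("ReportingMapper", "RECORDS", 1); // named counter (group, counter, amount)
    reporter.setStatus("processed offset " + key.get());   // free-form status shown in the task UI
    reporter.progress();                                   // keeps a long-running task from timing out
  }
}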
public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
JobConf job, FileSystem fs) throws IOException {
super(in, split, reporter, job, fs);
beginMark_ = checkJobGet(CONF_NS + "begin");
endMark_ = checkJobGet(CONF_NS + "end");
maxRecSize_ = job_.getInt(CONF_NS + "maxrec", 50 * 1000);
lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
synched_ = false;
slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false);
if (slowMatch_) {
beginPat_ = makePatternCDataOrMark(beginMark_);
endPat_ = makePatternCDataOrMark(endMark_);
}
init();
}
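In Hadoop Streaming, CONF_NS above resolves to the "stream.recordreader." configuration namespace, so the begin/end marks and size limits come from the JobConf. A minimal configuration sketch, assuming that prefix:
JobConf job = new JobConf();
// assuming CONF_NS == "stream.recordreader." as in Hadoop Streaming's StreamBaseRecordReader
job.set("stream.recordreader.begin", "<page>");       // opening tag delimiting one record
job.set("stream.recordreader.end", "</page>");        // closing tag delimiting one record
job.setInt("stream.recordreader.maxrec", 50 * 1000);  // optional: record size cap, matching the default above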
public String getBestFrame(String frameLine, String parseLine, Reporter reporter)
{
String result = null;
Set<String> set = mFrameMap.keySet();
double maxVal = Double.NEGATIVE_INFINITY; // note: -Double.MIN_VALUE is a tiny negative value, not the most negative double
for(String frame: set)
{
String[] toks = frameLine.split("\t");
String newFrameLine = frame+"\t"+toks[1]+"\t"+toks[2];
LogFormula formula = getNumeratorFormula(newFrameLine, parseLine, reporter);
double val = formula.evaluate(this).exponentiate();
if(val>maxVal)
{
maxVal = val;
result=""+frame;
}
if(reporter!=null)
reporter.setStatus("Considered "+frame+" for frameLine:"+frameLine);
System.out.println("Considered "+frame+" for frameLine:"+frameLine);
}
return result;
}
@Override // IOMapperBase
public Long doIO(Reporter reporter,
String name,
long totalSize // in bytes
) throws IOException {
OutputStream out = (OutputStream)this.stream;
// write to the file
long nrRemaining;
for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
out.write(buffer, 0, curSize);
reporter.setStatus("writing " + name + "@" +
(totalSize - nrRemaining) + "/" + totalSize
+ " ::host = " + hostName);
}
return Long.valueOf(totalSize);
}
@SuppressWarnings("unchecked")
private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
if (application == null) {
try {
LOG.info("starting application");
application =
new Application<K2, V2, K3, V3>(
job, null, output, reporter,
(Class<? extends K3>) job.getOutputKeyClass(),
(Class<? extends V3>) job.getOutputValueClass());
downlink = application.getDownlink();
} catch (InterruptedException ie) {
throw new RuntimeException("interrupted", ie);
}
int reduce=0;
downlink.runReduce(reduce, Submitter.getIsJavaRecordWriter(job));
}
}
@Override
public void close(Reporter reporter) throws IOException {
try {
LOG.warn("I was called : close");
processRowErrors(session.close());
shutdownClient();
} catch (Exception e) {
throw new IOException("Encountered an error while closing this task", e);
} finally {
if (reporter != null) {
// This is the only place where we have access to the context in the record writer,
// so set the counter here.
reporter.getCounter(Counters.ROWS_WITH_ERRORS).setValue(rowsWithErrors.get());
}
}
}
@Override
public void reduce(ByteWritable key, Iterator<Text> values,
OutputCollector<Text, ByteWritable> output, Reporter reporter)
throws IOException {
while (values.hasNext()) {
Text document = values.next();
writers.delete(document.toString());
totalDeleted++;
reporter.incrCounter("CleaningJobStatus", "Deleted documents",
1);
// if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
// LOG.info("CleaningJob: deleting " + numDeletes
// + " documents");
// // TODO updateRequest.process(solr);
// // TODO updateRequest = new UpdateRequest();
// writers.delete(key.toString());
// totalDeleted += numDeletes;
// numDeletes = 0;
// }
}
}
@Test
public void testCloseWithoutTaskCommit() throws Exception {
OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
when(outputCommitter.needsTaskCommit(any(TaskAttemptContext.class))).thenReturn(false);
DummyRecordWriter recordWriter = mock(DummyRecordWriter.class);
JobConf jobConf = mock(JobConf.class);
HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
outputFormat.recordWriter = recordWriter;
outputFormat.outputCommitter = outputCommitter;
outputFormat.close();
verify(recordWriter, times(1)).close(any(Reporter.class));
verify(outputCommitter, times(0)).commitTask(any(TaskAttemptContext.class));
}
/**
* From each split sampled, take the first numSamples / numSplits records.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample;
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
samples.add(key);
key = reader.createKey();
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
}
}
reader.close();
}
return (K[])samples.toArray();
}
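This getSample implementation follows the old-API InputSampler.SplitSampler. A typical use, sketched here under the assumption that the enclosing class is such a sampler, is seeding a TotalOrderPartitioner before a sorting job:
// Sketch: sampling split keys to build a partition file for TotalOrderPartitioner (old mapred API).
JobConf job = new JobConf();
job.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.Sampler<Text, Text> sampler =
    new InputSampler.SplitSampler<Text, Text>(1000, 10); // numSamples, maxSplitsSampled
InputSampler.writePartitionFile(job, sampler);           // throws IOException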
/** Combines values for a given key.
 * @param key the key, expected to be a Text object whose prefix indicates
 * the aggregation type used to combine the values
 * @param values the values to combine
 * @param output to collect the combined values
 * @param reporter facility to report progress
 */
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String keyStr = key.toString();
int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
String type = keyStr.substring(0, pos);
ValueAggregator aggregator = ValueAggregatorBaseDescriptor
.generateValueAggregator(type);
while (values.hasNext()) {
aggregator.addNextValue(values.next());
}
Iterator outputs = aggregator.getCombinerOutput().iterator();
while (outputs.hasNext()) {
Object v = outputs.next();
if (v instanceof Text) {
output.collect(key, (Text)v);
} else {
output.collect(key, new Text(v.toString()));
}
}
}
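The type prefix the Javadoc refers to is an aggregator descriptor name followed by ValueAggregatorDescriptor.TYPE_SEPARATOR (":"). A hedged sketch of a mapper feeding this combiner, with a hypothetical counter name:
// Hypothetical mapper body: the "LongValueSum" prefix selects the summing aggregator.
public void map(LongWritable key, Text value,
    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  output.collect(new Text("LongValueSum:recordCount"), new Text("1"));
}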
public Hashtable<String, LinkWritable> createResults(Page page, Reporter reporter)
throws IOException, JsonParseException, URISyntaxException {
Hashtable<String, LinkWritable> results = new Hashtable<String, LinkWritable>();
HashSet<String> linkUrls = page.getLinkUrls();
if (linkUrls != null && !linkUrls.isEmpty()) {
List<WikiArticle> articles = filterArticles(linkUrls, reporter);
for (WikiArticle article : articles) {
results.put(article.getKey(), new LinkWritable(article.getArticleName(),
formatField(page.getTitle(), TITLE_LENGTH),
page.getWarcDate(),
page.getUrl()));
}
}
return results;
}
public void map(Object key, Object value,
OutputCollector output, Reporter reporter) throws IOException {
if (this.reporter == null) {
this.reporter = reporter;
}
addLongValue("totalCount", 1);
OutputValue aRecord = genMapOutputValue(value);
if (aRecord == null) {
addLongValue("discardedCount", 1);
return;
}
aRecord.setSmaller(smaller);
String groupKey = genGroupKey(key, aRecord);
if (groupKey == null) {
addLongValue("nullGroupKeyCount", 1);
return;
}
outputKey.setKey(groupKey);
output.collect(outputKey, aRecord);
addLongValue("collectedCount", 1);
}
public void reduce(Text key, Iterator<Inlinks> values, OutputCollector<Text, Inlinks> output, Reporter reporter) throws IOException {
Inlinks result = new Inlinks();
while (values.hasNext()) {
Inlinks inlinks = values.next();
int end = Math.min(maxInlinks - result.size(), inlinks.size());
Iterator<Inlink> it = inlinks.iterator();
int i = 0;
while(it.hasNext() && i++ < end) {
result.add(it.next());
}
}
if (result.size() == 0) return;
output.collect(key, result);
}
/**
 * Outputs the url keyed by its number of inlinks, its number of outlinks,
 * or its score.
 */
public void map(Text key, Node node,
OutputCollector<FloatWritable, Text> output, Reporter reporter)
throws IOException {
float number = 0;
if (inlinks) {
number = node.getNumInlinks();
}
else if (outlinks) {
number = node.getNumOutlinks();
}
else {
number = node.getInlinkScore();
}
// collect the negated number so an ascending sort yields descending order
output.collect(new FloatWritable(-number), key);
}
/**
 * Tests the DBInputFormat class: the format should split the result set into chunks.
 * @throws Exception
 */
@Test(timeout = 10000)
public void testDBInputFormat() throws Exception {
JobConf configuration = new JobConf();
setupDriver(configuration);
DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>();
format.setConf(configuration);
DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10);
Reporter reporter = mock(Reporter.class);
RecordReader<LongWritable, NullDBWritable> reader = format.getRecordReader(
splitter, configuration, reporter);
configuration.setInt(MRJobConfig.NUM_MAPS, 3);
InputSplit[] lSplits = format.getSplits(configuration, 3);
assertEquals(5, lSplits[0].getLength());
assertEquals(3, lSplits.length);
// test the reader: some simple sanity checks
assertEquals(LongWritable.class, reader.createKey().getClass());
assertEquals(0, reader.getPos());
assertEquals(0, reader.getProgress(), 0.001);
reader.close();
}
public void map(Object key, Object value,
OutputCollector output, Reporter reporter) throws IOException {
if (this.reporter == null) {
this.reporter = reporter;
}
addLongValue("totalCount", 1);
TaggedMapOutput aRecord = generateTaggedMapOutput(value);
if (aRecord == null) {
addLongValue("discardedCount", 1);
return;
}
Text groupKey = generateGroupKey(aRecord);
if (groupKey == null) {
addLongValue("nullGroupKeyCount", 1);
return;
}
output.collect(groupKey, aRecord);
addLongValue("collectedCount", 1);
}
@Override
public Object call()
throws Exception
{
RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
LongWritable key = new LongWritable();
Text oneLine = new Text();
try {
// count rows from the first row
while (reader.next(key, oneLine)) {
_nrows++;
}
}
catch (Exception e) {
_rc = false;
_errMsg = "RecordReader error libsvm format. split: "+ _split.toString() + e.getMessage();
throw new IOException(_errMsg);
}
finally {
IOUtilFunctions.closeSilently(reader);
}
return null;
}
/**
 * @param jobConf Job configuration.
 * @param taskCtx Task context.
 * @param directWrite Direct write flag.
 * @param fileName File name.
 * @param attempt Task attempt ID.
 * @throws IOException In case of IO exception.
 */
HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
@Nullable String fileName, TaskAttemptID attempt) throws IOException {
this.jobConf = jobConf;
this.taskCtx = taskCtx;
this.attempt = attempt;
if (directWrite) {
jobConf.set("mapreduce.task.attempt.id", attempt.toString());
OutputFormat outFormat = jobConf.getOutputFormat();
writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
}
else
writer = null;
}
/**
* Given an output filename, write a bunch of random records to it.
*/
public void map(WritableComparable key,
Writable value,
OutputCollector<BytesWritable, BytesWritable> output,
Reporter reporter) throws IOException {
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize
+ (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
randomKey.setSize(keyLength);
randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
int valueLength = minValueSize
+ (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
output.collect(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
if (++itemCount % 200 == 0) {
reporter.setStatus("wrote record " + itemCount + ". "
+ numBytesToWrite + " bytes left.");
}
}
reporter.setStatus("done with " + itemCount + " records.");
}
/**
* Construct a CompositeRecordReader for the children of this InputFormat
* as defined in the init expression.
* The outermost join need only be composable, not necessarily a composite.
* Mandating TupleWritable isn't strictly correct.
*/
@SuppressWarnings("unchecked") // child types unknown
public ComposableRecordReader<K,TupleWritable> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException {
setFormat(job);
return root.getRecordReader(split, job, reporter);
}
@Override
public LWDocument[] toDocuments(Writable key, Writable value, Reporter reporter,
Configuration conf) throws IOException {
if (key != null && value != null) {
LWDocument doc = createDocument(key.toString() + "-" + System.currentTimeMillis(), null);
Matcher matcher = regex.matcher(value.toString());
// Pattern.matcher() never returns null, so the original null check is unnecessary
if (match) {
  if (matcher.matches()) {
    processMatch(doc, matcher);
  }
} else {
  while (matcher.find()) {
    processMatch(doc, matcher);
    reporter.progress(); // signal liveness while scanning long values
  }
}
// Record the path of the file this record came from
FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
String originalLogFilePath = fileSplit.getPath().toUri().getPath();
doc.addField(FIELD_PATH, originalLogFilePath);
String docId = originalLogFilePath + "-" + doc.getId();
doc.setId(docId);
return new LWDocument[] {doc};
}
return null;
}
/**
 * Passes the key and the reversed value to reduce.
 */
public void map(ImmutableBytesWritable key, Result value,
OutputCollector<ImmutableBytesWritable, Put> output,
Reporter reporter)
throws IOException {
output.collect(key, TestTableMapReduceBase.map(key, value));
}
public void map(K key, Text value,
OutputCollector<Text, LongWritable> output,
Reporter reporter)
throws IOException {
String text = value.toString();
Matcher matcher = pattern.matcher(text);
while (matcher.find()) {
output.collect(new Text(matcher.group(group)), new LongWritable(1));
}
}
public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
Reporter reporter) throws IOException {
this.groupedSplit = split;
this.job = job;
this.reporter = reporter;
initNextRecordReader();
}
public void map(Text key, CrawlDatum value, OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
output.collect(new Text("T"), COUNT_1);
output.collect(new Text("status " + value.getStatus()), COUNT_1);
output.collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
output.collect(new Text("s"), new LongWritable((long) (value.getScore() * 1000.0)));
if(sort){
URL u = new URL(key.toString());
String host = u.getHost();
output.collect(new Text("status " + value.getStatus() + " " + host), COUNT_1);
}
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
Class inputClass = dbConf.getInputClass();
try {
return new DBRecordReader((DBInputSplit) split, inputClass, job);
}
catch (SQLException ex) {
throw new IOException(ex.getMessage(), ex); // preserve the SQLException as the cause
}
}
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int commentCnt = 0;
while (vs.hasNext()) {
String v = vs.next().toString();
if (v.startsWith("Comment")) {
commentCnt++;
}
}
out.collect(k, new IntWritable(commentCnt));
}
@Override
public void map(final IntWritable k, final Text v,
final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
if (v.toString().contains("bananas")) {
out.collect(k, v);
}
}
@Override
public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
throws IOException {
if (v.toString().startsWith(filterPrefix)) {
out.collect(k, v);
}
}
/** {@inheritDoc} */
public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
// wrap the DBRR in a shim class to deal with API differences.
return new DBRecordReaderWrapper<T>(
(org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>)
createDBRecordReader(
(org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
}
/**
* Handle the end of the input by closing down the application.
*/
public void close() throws IOException {
// make sure the application has started; startApplication is a no-op if it already has
if (isOk) {
OutputCollector<K3, V3> nullCollector = new OutputCollector<K3, V3>() {
public void collect(K3 key,
V3 value) throws IOException {
// NULL
}
};
startApplication(nullCollector, Reporter.NULL);
}
try {
if (isOk) {
application.getDownlink().endOfInput();
} else {
// send the abort to the application and let it clean up
application.getDownlink().abort();
}
LOG.info("waiting for finish");
application.waitForFinish();
LOG.info("got done");
} catch (Throwable t) {
application.abort(t);
} finally {
application.cleanup();
}
}