下面列出了如何使用 org.apache.hadoop.mapred.OutputCollector 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Logs the statistics of one finished task and emits them through {@code output}.
 *
 * <p>Five records are collected, in this order: "tasks" (always 1), "size",
 * "time", "rate" and "sqrate". Every key is prefixed with one of the
 * {@code AccumulatingReducer.VALUE_TYPE_*} constants.
 *
 * @param output   collector that receives the statistic records
 * @param name     not used by this method
 * @param execTime execution time of the task; presumably milliseconds, given the
 *                 factor 1000 in the rate formula -- TODO confirm
 * @param objSize  number of bytes processed; must not be {@code null}
 * @throws IOException propagated from {@code output.collect}
 */
@Override // IOMapperBase
void collectStats(OutputCollector<Text, Text> output,
    String name,
    long execTime,
    Long objSize) throws IOException {
  final long bytesProcessed = objSize.longValue();
  final float rate = (float) bytesProcessed * 1000 / (execTime * MEGA);

  LOG.info("Number of bytes processed = " + bytesProcessed);
  LOG.info("Exec time = " + execTime);
  LOG.info("IO rate = " + rate);

  // Values that are reported in scaled form.
  final float scaledRate = rate * 1000;
  final float scaledSquaredRate = rate * rate * 1000;

  output.collect(
      new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
      new Text(String.valueOf(1)));
  output.collect(
      new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
      new Text(String.valueOf(bytesProcessed)));
  output.collect(
      new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
      new Text(String.valueOf(execTime)));
  output.collect(
      new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
      new Text(String.valueOf(scaledRate)));
  output.collect(
      new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
      new Text(String.valueOf(scaledSquaredRate)));
}
/**
 * Regroups the values of one key, joins the resulting groups and collects the
 * output, then closes every group iterator.
 *
 * <p>The first reporter seen is remembered in {@code this.reporter}. After the
 * join, the "groupCount" counter is increased by one.
 *
 * @param key      the key being reduced
 * @param values   all values for {@code key}
 * @param output   collector handed on to {@code joinAndCollect}
 * @param reporter reporter handed on to {@code regroup} and {@code joinAndCollect}
 * @throws IOException declared by this method; presumably raised by the helpers
 *                     or by closing an iterator -- TODO confirm
 */
public void reduce(Object key, Iterator values,
    OutputCollector output, Reporter reporter) throws IOException {
  if (null == this.reporter) {
    this.reporter = reporter;
  }

  SortedMap<Object, ResetableIterator> byTag = regroup(key, values, reporter);
  Object[] tags = byTag.keySet().toArray();

  // groupValues[i] holds the values that belong to tags[i].
  ResetableIterator[] groupValues = new ResetableIterator[tags.length];
  int slot = 0;
  for (Object tag : tags) {
    groupValues[slot++] = byTag.get(tag);
  }

  joinAndCollect(tags, groupValues, key, output, reporter);
  addLongValue("groupCount", 1);

  for (ResetableIterator group : groupValues) {
    group.close();
  }
}
/**
 * Sends one key and all of its values to the downlink. The application is
 * started first if it has not been started yet.
 *
 * <p>{@code isOk} is cleared on entry and only set again once the whole key
 * has been forwarded, so it stays {@code false} if anything here throws.
 *
 * @param key      the key to forward
 * @param values   the values to forward, in iteration order
 * @param output   collector passed to {@code startApplication}
 * @param reporter reporter passed to {@code startApplication}
 * @throws IOException declared by this method; presumably raised by the downlink
 *                     or by application start-up -- TODO confirm
 */
public void reduce(K2 key, Iterator<V2> values,
    OutputCollector<K3, V3> output, Reporter reporter)
    throws IOException {
  isOk = false;

  startApplication(output, reporter);
  downlink.reduceKey(key);
  while (values.hasNext()) {
    V2 value = values.next();
    downlink.reduceValue(value);
  }

  // In skip mode, flush after every input record so that no other records
  // are buffered around a bad one.
  if (skipping) {
    downlink.flush();
  }

  isOk = true;
}
/**
 * Creates the application on first use and asks its downlink to run a reduce.
 * Does nothing when {@code application} is already set.
 *
 * @param output   collector handed to the new application
 * @param reporter reporter handed to the new application
 * @throws IOException      declared by this method; presumably raised while creating
 *                          the application or talking to the downlink -- TODO confirm
 * @throws RuntimeException if the thread is interrupted while the application is
 *                          being created; the interrupt status is restored first
 */
@SuppressWarnings("unchecked")
private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
  if (application != null) {
    return;
  }
  try {
    LOG.info("starting application");
    application =
        new Application<K2, V2, K3, V3>(
            job, null, output, reporter,
            (Class<? extends K3>) job.getOutputKeyClass(),
            (Class<? extends V3>) job.getOutputValueClass());
    downlink = application.getDownlink();
  } catch (InterruptedException ie) {
    // Catching InterruptedException clears the interrupt flag; restore it so
    // code further up the stack can still see that the thread was interrupted.
    Thread.currentThread().interrupt();
    throw new RuntimeException("interrupted", ie);
  }
  int reduce = 0;
  downlink.runReduce(reduce, Submitter.getIsJavaRecordWriter(job));
}
/**
 * Combines the values of one key and emits the combiner output under the same key.
 *
 * <p>The part of the key before {@code ValueAggregatorDescriptor.TYPE_SEPARATOR}
 * selects the aggregator. Each combined output that is already a {@code Text}
 * is emitted as is; anything else is emitted as a {@code Text} of its
 * {@code toString()}.
 *
 * <p>NOTE(review): a key without the separator makes {@code substring} throw
 * because {@code indexOf} returns -1 -- confirm that callers always supply it.
 *
 * @param key      a Text whose prefix names the aggregation type
 * @param values   the values to combine
 * @param output   collector for the combined values
 * @param reporter not used by this method
 * @throws IOException propagated from {@code output.collect}
 */
public void reduce(Text key, Iterator<Text> values,
    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  final String fullKey = key.toString();
  final String type =
      fullKey.substring(0, fullKey.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR));
  ValueAggregator aggregator = ValueAggregatorBaseDescriptor.generateValueAggregator(type);

  while (values.hasNext()) {
    aggregator.addNextValue(values.next());
  }

  for (Iterator outputs = aggregator.getCombinerOutput().iterator(); outputs.hasNext();) {
    Object item = outputs.next();
    Text combined = (item instanceof Text) ? (Text) item : new Text(item.toString());
    output.collect(key, combined);
  }
}
/**
 * Appends every value as one UTF-8 line to {@code indexStream} and, each time
 * {@code written} reaches {@code numIndexes} lines, writes a master-index line
 * to {@code outStream}.
 *
 * <p>A master-index line has the form
 * {@code "<startIndex> <endIndex> <startPos> <indexStream position> \n"}.
 * Afterwards {@code startPos}, {@code startIndex} and {@code written} are reset
 * for the next batch.
 *
 * @param key      its int value is stored in {@code keyVal} and used as the end index
 * @param values   the lines to append to the index stream
 * @param out      not used by this method
 * @param reporter receives a status message and a progress ping per completed batch
 * @throws IOException declared by this method; presumably raised by the stream
 *                     writes or position lookups -- TODO confirm
 */
public void reduce(IntWritable key, Iterator<Text> values,
    OutputCollector<Text, Text> out,
    Reporter reporter) throws IOException {
  keyVal = key.get();
  while (values.hasNext()) {
    String indexLine = values.next().toString() + "\n";
    indexStream.write(indexLine.getBytes(Charsets.UTF_8));
    written++;
    if (written <= numIndexes - 1) {
      // Batch not full yet.
      continue;
    }

    // A full batch of index lines was written: report status and record it.
    reporter.setStatus("Creating index for archives");
    reporter.progress();
    endIndex = keyVal;
    String masterLine = startIndex + " " + endIndex + " " + startPos
        + " " + indexStream.getPos() + " \n";
    outStream.write(masterLine.getBytes(Charsets.UTF_8));

    // The next batch starts where this one ended.
    startPos = indexStream.getPos();
    startIndex = endIndex;
    written = 0;
  }
}
/**
 * Splits a line into lower-cased words and emits {@code (word, 1)} for each one.
 *
 * <p>Words are separated by runs of non-word characters ({@code \W+}). Empty
 * tokens, such as the one produced by a leading separator, are skipped.
 *
 * @param k   not used by this method
 * @param v   the line of text to tokenize
 * @param out collector that receives one pair per word
 * @param rep not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep)
    throws IOException {
  // Lower-case with a fixed locale so the emitted words do not depend on the
  // JVM default locale (for example, 'I' maps differently under a Turkish locale).
  final String normalized = v.toString().toLowerCase(java.util.Locale.ROOT);

  for (String token : normalized.split("\\W+")) {
    if (!token.isEmpty()) {
      out.collect(new Text(token), new LongWritable(1L));
    }
  }
}
/**
 * Adds up all values of a key and emits {@code (key, total)}.
 *
 * @param k   the key being reduced; emitted unchanged
 * @param vs  the partial counts for {@code k}
 * @param out collector that receives the single result pair
 * @param rep not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
    throws IOException {
  long total = 0L;
  while (vs.hasNext()) {
    LongWritable partial = vs.next();
    total = total + partial.get();
  }
  out.collect(k, new LongWritable(total));
}
/**
 * Counts the values of a key whose text starts with "Comment" and emits
 * {@code (key, count)}.
 *
 * @param k   the key being reduced; emitted unchanged
 * @param vs  the values for {@code k}
 * @param out collector that receives the single result pair
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
    throws IOException {
  int matches = 0;
  while (vs.hasNext()) {
    if (vs.next().toString().startsWith("Comment")) {
      matches += 1;
    }
  }
  out.collect(k, new IntWritable(matches));
}
/**
 * Emits every value as a {@code Text} of its {@code toString()}, all under one
 * shared, default-constructed {@code LongWritable} key.
 *
 * @param key      not used by this method
 * @param values   the values to convert and emit
 * @param output   collector that receives one pair per value
 * @param reporter not used by this method
 * @throws IOException propagated from {@code output.collect}
 */
public void reduce(K key, Iterator<V> values,
    OutputCollector<LongWritable, Text> output, Reporter reporter)
    throws IOException {
  final LongWritable outKey = new LongWritable();
  while (values.hasNext()) {
    String text = values.next().toString();
    output.collect(outKey, new Text(text));
  }
}
/**
 * Passes a record through unchanged when its text contains "bananas";
 * all other records are dropped.
 *
 * @param k   the record key; emitted unchanged
 * @param v   the record value that is inspected and emitted
 * @param out collector that receives matching records
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void map(final IntWritable k, final Text v,
    final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
  final String text = v.toString();
  if (!text.contains("bananas")) {
    return;
  }
  out.collect(k, v);
}
/**
 * Passes a record through unchanged when its text starts with
 * {@code filterPrefix}; all other records are dropped.
 *
 * @param k   the record key; emitted unchanged
 * @param v   the record value that is inspected and emitted
 * @param out collector that receives matching records
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
    throws IOException {
  final String text = v.toString();
  if (!text.startsWith(filterPrefix)) {
    return;
  }
  out.collect(k, v);
}
/**
 * Adds up all values of a key and emits {@code (key, total)}.
 *
 * @param k   the key being reduced; emitted unchanged
 * @param v   the values for {@code k}
 * @param out collector that receives the single result pair
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
    throws IOException {
  int total = 0;
  while (v.hasNext()) {
    IntWritable addend = v.next();
    total = total + addend.get();
  }
  out.collect(k, new IntWritable(total));
}
/**
 * Re-keys every value of a key to the key's remainder modulo 4 and emits it.
 *
 * @param k   the key being reduced; only {@code k.get() % 4} is emitted
 * @param v   the values for {@code k}; each one is emitted unchanged
 * @param out collector that receives one pair per value
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
    throws IOException {
  while (v.hasNext()) {
    // A fresh key object is created for every emitted pair.
    final IntWritable bucket = new IntWritable(k.get() % 4);
    out.collect(bucket, v.next());
  }
}
/**
 * Counts the values of a key whose text starts with {@code this.countPrefix}
 * and emits {@code (key, count)}.
 *
 * @param k   the key being reduced; emitted unchanged
 * @param vs  the values for {@code k}
 * @param out collector that receives the single result pair
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
    throws IOException {
  int matches = 0;
  while (vs.hasNext()) {
    if (vs.next().toString().startsWith(this.countPrefix)) {
      matches += 1;
    }
  }
  out.collect(k, new IntWritable(matches));
}
/**
 * Decides whether copying a file can be skipped and, if so, records the skip.
 *
 * <p>The copy is skipped only when the destination exists, {@code overwrite}
 * is off and {@code needsUpdate} reports that no update is needed. On a skip,
 * a "SKIP: &lt;source path&gt;" record is collected with a {@code null} key,
 * {@code skipcount} and the {@code Counter.SKIP} counter are incremented, and
 * the status is updated.
 *
 * @param srcstat  status of the source file
 * @param absdst   destination path
 * @param outc     collector that receives the skip record
 * @param reporter used for the skip counter and the status update
 * @return true if copy of this file can be skipped
 * @throws IOException declared by this method; presumably raised by the
 *                     file-system checks or the collector -- TODO confirm
 */
private boolean skipCopyFile(FileStatus srcstat, Path absdst,
    OutputCollector<WritableComparable<?>, Text> outc,
    Reporter reporter) throws IOException {
  // Evaluation order is kept: existence check, overwrite flag, update check.
  final boolean skippable = destFileSys.exists(absdst)
      && !overwrite
      && !needsUpdate(srcstat, destFileSys, absdst);
  if (!skippable) {
    return false;
  }

  outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
  ++skipcount;
  reporter.incrCounter(Counter.SKIP, 1);
  updateStatus(reporter);
  return true;
}
/**
 * Adds up all values of a key and emits {@code (key, total)}.
 *
 * @param k   the key being reduced; emitted unchanged
 * @param vs  the partial counts for {@code k}
 * @param out collector that receives the single result pair
 * @param rep not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
    throws IOException {
  long total = 0L;
  while (vs.hasNext()) {
    LongWritable partial = vs.next();
    total = total + partial.get();
  }
  out.collect(k, new LongWritable(total));
}
/**
 * Counts the values of a key whose text starts with "Comment" and emits
 * {@code (key, count)}.
 *
 * @param k   the key being reduced; emitted unchanged
 * @param vs  the values for {@code k}
 * @param out collector that receives the single result pair
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
    throws IOException {
  int matches = 0;
  while (vs.hasNext()) {
    if (vs.next().toString().startsWith("Comment")) {
      matches += 1;
    }
  }
  out.collect(k, new IntWritable(matches));
}
/**
 * Counts the values of a key whose text starts with {@code this.countPrefix}
 * and emits {@code (key, count)}.
 *
 * @param k   the key being reduced; emitted unchanged
 * @param vs  the values for {@code k}
 * @param out collector that receives the single result pair
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
    throws IOException {
  int matches = 0;
  while (vs.hasNext()) {
    if (vs.next().toString().startsWith(this.countPrefix)) {
      matches += 1;
    }
  }
  out.collect(k, new IntWritable(matches));
}
/**
 * Passes a record through unchanged when its text contains "bananas";
 * all other records are dropped.
 *
 * @param k   the record key; emitted unchanged
 * @param v   the record value that is inspected and emitted
 * @param out collector that receives matching records
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void map(final IntWritable k, final Text v,
    final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
  final String text = v.toString();
  if (!text.contains("bananas")) {
    return;
  }
  out.collect(k, v);
}
/**
 * Passes a record through unchanged when its text starts with
 * {@code filterPrefix}; all other records are dropped.
 *
 * @param k   the record key; emitted unchanged
 * @param v   the record value that is inspected and emitted
 * @param out collector that receives matching records
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
    throws IOException {
  final String text = v.toString();
  if (!text.startsWith(filterPrefix)) {
    return;
  }
  out.collect(k, v);
}
/**
 * Adds up all values of a key and emits {@code (key, total)}.
 *
 * @param k   the key being reduced; emitted unchanged
 * @param v   the values for {@code k}
 * @param out collector that receives the single result pair
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
    throws IOException {
  int total = 0;
  while (v.hasNext()) {
    IntWritable addend = v.next();
    total = total + addend.get();
  }
  out.collect(k, new IntWritable(total));
}
/**
 * Re-keys every value of a key to the key's remainder modulo 4 and emits it.
 *
 * @param k   the key being reduced; only {@code k.get() % 4} is emitted
 * @param v   the values for {@code k}; each one is emitted unchanged
 * @param out collector that receives one pair per value
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
    throws IOException {
  while (v.hasNext()) {
    // A fresh key object is created for every emitted pair.
    final IntWritable bucket = new IntWritable(k.get() % 4);
    out.collect(bucket, v.next());
  }
}
/**
 * Counts the values of a key whose text starts with {@code this.countPrefix}
 * and emits {@code (key, count)}.
 *
 * @param k   the key being reduced; emitted unchanged
 * @param vs  the values for {@code k}
 * @param out collector that receives the single result pair
 * @param r   not used by this method
 * @throws IOException propagated from {@code out.collect}
 */
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
    throws IOException {
  int matches = 0;
  while (vs.hasNext()) {
    if (vs.next().toString().startsWith(this.countPrefix)) {
      matches += 1;
    }
  }
  out.collect(k, new IntWritable(matches));
}
/**
 * Parses a line with {@code KPI.filterPVs} and emits {@code (remote address, one)}
 * when the parsed record's {@code getValid()} equals 1 and its remote address
 * is not {@code null}.
 *
 * @param key      not used by this method
 * @param value    the input line to parse
 * @param output   collector that receives the pair
 * @param reporter not used by this method
 * @throws IOException propagated from {@code output.collect}
 */
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
  final KPI kpi = KPI.filterPVs(value.toString());
  if (kpi.getValid() != 1 || kpi.getRemote_addr() == null) {
    return;
  }
  // The shared 'word' field is reused as the output key.
  word.set(kpi.getRemote_addr());
  output.collect(word, one);
}
/**
 * Parses a line with {@code KPI.filterPVs} and emits {@code (remote address, one)}
 * when the parsed record's {@code getValid()} equals 0 and its remote address
 * is not {@code null}.
 *
 * @param key      not used by this method
 * @param value    the input line to parse
 * @param output   collector that receives the pair
 * @param reporter not used by this method
 * @throws IOException propagated from {@code output.collect}
 */
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
  final KPI kpi = KPI.filterPVs(value.toString());
  if (kpi.getValid() != 0 || kpi.getRemote_addr() == null) {
    return;
  }
  // The shared 'word' field is reused as the output key.
  word.set(kpi.getRemote_addr());
  output.collect(word, one);
}
/**
 * Parses a line with {@code KPI.filterPVs} and emits {@code (remote address, one)}
 * when the parsed record's {@code getValid()} equals 0 and its remote address
 * is not {@code null}.
 *
 * @param key      not used by this method
 * @param value    the input line to parse
 * @param output   collector that receives the pair
 * @param reporter not used by this method
 * @throws IOException propagated from {@code output.collect}
 */
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
  final KPI kpi = KPI.filterPVs(value.toString());
  if (kpi.getValid() != 0 || kpi.getRemote_addr() == null) {
    return;
  }
  // The shared 'word' field is reused as the output key.
  word.set(kpi.getRemote_addr());
  output.collect(word, one);
}
/**
 * Starts the output threads of the underlying {@code PipeMapper}, then delegates
 * to the superclass {@code run}.
 *
 * @param input    record reader handed on to {@code super.run}
 * @param output   collector given to the output threads and to {@code super.run}
 * @param reporter reporter given to the output threads and to {@code super.run}
 * @throws IOException declared by this method; presumably raised by
 *                     {@code super.run} -- TODO confirm
 */
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
    Reporter reporter)
    throws IOException {
  // The mapper of this runner is expected to be a PipeMapper; the cast fails otherwise.
  ((PipeMapper) getMapper()).startOutputThreads(output, reporter);
  super.run(input, output, reporter);
}
/**
 * Emits the pair {@code (word, one)} when the string form of the input value
 * is not {@code null}.
 *
 * <p>NOTE(review): {@code word} is never assigned in this method, so the key
 * that is emitted is whatever the field currently holds -- confirm this is
 * intended.
 *
 * @param key      not used by this method
 * @param value    the input value; only its {@code toString()} is null-checked
 * @param output   collector that receives the pair
 * @param reporter not used by this method
 * @throws IOException propagated from {@code output.collect}
 */
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
  final String text = value.toString();
  if (text == null) {
    return;
  }
  output.collect(word, one);
}
/**
 * Adds up all values of a key, stores the total in the shared {@code result}
 * field and emits {@code (key, result)}.
 *
 * @param key      the key being reduced; emitted unchanged
 * @param values   the partial counts for {@code key}
 * @param output   collector that receives the single result pair
 * @param reporter not used by this method
 * @throws IOException propagated from {@code output.collect}
 */
@Override
public void reduce(Text key, Iterator<IntWritable> values,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
  // Primitive accumulator: a boxed Integer would be unboxed and re-boxed on
  // every addition without changing the result.
  int sum = 0;
  while (values.hasNext()) {
    sum += values.next().get();
  }
  result.set(sum);
  output.collect(key, result);
}