Class org.apache.hadoop.mapred.OutputCollector: source code examples

The following examples show how to use the org.apache.hadoop.mapred.OutputCollector API in practice. Each snippet is taken from an open-source project; the full source can be viewed on GitHub.
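
For orientation before the project snippets: OutputCollector is the output channel handed to map() and reduce() in the classic org.apache.hadoop.mapred API, and every example below ultimately calls its single method, collect(key, value). Below is a minimal, self-contained word-count sketch (written for this page, not taken from any of the listed projects; class names are illustrative) showing the collector in both a mapper and a reducer, plus the JobConf wiring:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Illustrative word-count job built on the classic mapred API.
public class WordCountSketch {

  public static class TokenizerMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, ONE);  // emit (word, 1) through the collector
      }
    }
  }

  public static class SumReducer extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));  // emit (word, total)
    }
  }

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(WordCountSketch.class);
    conf.setJobName("wordcount-sketch");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(TokenizerMapper.class);
    conf.setCombinerClass(SumReducer.class);
    conf.setReducerClass(SumReducer.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}

The same pattern recurs throughout the snippets below: the framework owns the collector, and user code only calls collect().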

Example 1  Project: hadoop   File: TestDFSIO.java
@Override // IOMapperBase
void collectStats(OutputCollector<Text, Text> output, 
                  String name,
                  long execTime, 
                  Long objSize) throws IOException {
  long totalSize = objSize.longValue();
  float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
  LOG.info("Number of bytes processed = " + totalSize);
  LOG.info("Exec time = " + execTime);
  LOG.info("IO rate = " + ioRateMbSec);
  
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
      new Text(String.valueOf(1)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
      new Text(String.valueOf(totalSize)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
      new Text(String.valueOf(execTime)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
      new Text(String.valueOf(ioRateMbSec*1000)));
  output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
      new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
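
A note on example 1: the key prefixes VALUE_TYPE_LONG and VALUE_TYPE_FLOAT are consumed by the companion AccumulatingReducer, which inspects each key's prefix to decide how to fold the values together (long-typed and float-typed values are summed numerically), so TestDFSIO can funnel heterogeneous statistics through a single Text/Text collector.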
 
Example 2  Project: hadoop   File: DataJoinReducerBase.java
public void reduce(Object key, Iterator values,
                   OutputCollector output, Reporter reporter) throws IOException {
  if (this.reporter == null) {
    this.reporter = reporter;
  }

  SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
  Object[] tags = groups.keySet().toArray();
  ResetableIterator[] groupValues = new ResetableIterator[tags.length];
  for (int i = 0; i < tags.length; i++) {
    groupValues[i] = groups.get(tags[i]);
  }
  joinAndCollect(tags, groupValues, key, output, reporter);
  addLongValue("groupCount", 1);
  for (int i = 0; i < tags.length; i++) {
    groupValues[i].close();
  }
}
 
Example 3  Project: hadoop   File: PipesReducer.java
/**
 * Process all of the keys and values. Start up the application if we haven't
 * started it yet.
 */
public void reduce(K2 key, Iterator<V2> values, 
                   OutputCollector<K3, V3> output, Reporter reporter
                   ) throws IOException {
  isOk = false;
  startApplication(output, reporter);
  downlink.reduceKey(key);
  while (values.hasNext()) {
    downlink.reduceValue(values.next());
  }
  if(skipping) {
    //flush the streams on every record input if running in skip mode
    //so that we don't buffer other records surrounding a bad record.
    downlink.flush();
  }
  isOk = true;
}
 
Example 4  Project: hadoop   File: PipesReducer.java
@SuppressWarnings("unchecked")
private void startApplication(OutputCollector<K3, V3> output, Reporter reporter) throws IOException {
  if (application == null) {
    try {
      LOG.info("starting application");
      application = 
        new Application<K2, V2, K3, V3>(
            job, null, output, reporter, 
            (Class<? extends K3>) job.getOutputKeyClass(), 
            (Class<? extends V3>) job.getOutputValueClass());
      downlink = application.getDownlink();
    } catch (InterruptedException ie) {
      throw new RuntimeException("interrupted", ie);
    }
    int reduce=0;
    downlink.runReduce(reduce, Submitter.getIsJavaRecordWriter(job));
  }
}
 
Example 5  Project: hadoop   File: ValueAggregatorCombiner.java
/** Combines values for a given key.  
 * @param key the key is expected to be a Text object, whose prefix indicates
 * the type of aggregation to aggregate the values. 
 * @param values the values to combine
 * @param output to collect combined values
 */
public void reduce(Text key, Iterator<Text> values,
                   OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  String keyStr = key.toString();
  int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
  String type = keyStr.substring(0, pos);
  ValueAggregator aggregator = ValueAggregatorBaseDescriptor
    .generateValueAggregator(type);
  while (values.hasNext()) {
    aggregator.addNextValue(values.next());
  }
  Iterator outputs = aggregator.getCombinerOutput().iterator();

  while (outputs.hasNext()) {
    Object v = outputs.next();
    if (v instanceof Text) {
      output.collect(key, (Text)v);
    } else {
      output.collect(key, new Text(v.toString()));
    }
  }
}
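
Example 5's combiner chooses its aggregation strategy from a type prefix embedded in the Text key (everything before ValueAggregatorDescriptor.TYPE_SEPARATOR, which is ":"). As a hedged sketch of the producing side (a hand-rolled mapper written for this page; real aggregate-framework jobs normally go through ValueAggregatorJob and descriptor classes instead), a mapper can emit keys such as "LongValueSum:<token>" so that generateValueAggregator instantiates a long-sum aggregator for them:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Sketch only: emits keys in the "<aggregatorType>:<id>" form that
// ValueAggregatorCombiner/ValueAggregatorReducer expect. "LongValueSum"
// is one of the built-in aggregator type names.
public class AggregateEmittingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  private static final Text ONE = new Text("1");

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    for (String token : value.toString().split("\\s+")) {
      if (!token.isEmpty()) {
        // The prefix picks the aggregator; the combiner sums the "1"s as longs.
        output.collect(new Text("LongValueSum:" + token), ONE);
      }
    }
  }
}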
 
Example 6  Project: hadoop   File: HadoopArchives.java
public void reduce(IntWritable key, Iterator<Text> values,
    OutputCollector<Text, Text> out,
    Reporter reporter) throws IOException {
  keyVal = key.get();
  while(values.hasNext()) {
    Text value = values.next();
    String towrite = value.toString() + "\n";
    indexStream.write(towrite.getBytes(Charsets.UTF_8));
    written++;
    if (written > numIndexes -1) {
      // every 1000 indexes we report status
      reporter.setStatus("Creating index for archives");
      reporter.progress();
      endIndex = keyVal;
      String masterWrite = startIndex + " " + endIndex + " " + startPos 
                          +  " " + indexStream.getPos() + " \n" ;
      outStream.write(masterWrite.getBytes(Charsets.UTF_8));
      startPos = indexStream.getPos();
      startIndex = endIndex;
      written = 0;
    }
  }
}
 
Example 7  Project: Flink-CEPplus   File: HadoopMapredCompatWordCount.java
@Override
public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep)
		throws IOException {
	// normalize and split the line
	String line = v.toString();
	String[] tokens = line.toLowerCase().split("\\W+");

	// emit the pairs
	for (String token : tokens) {
		if (token.length() > 0) {
			out.collect(new Text(token), new LongWritable(1L));
		}
	}
}
 
Example 8  Project: Flink-CEPplus   File: HadoopMapredCompatWordCount.java
@Override
public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
		throws IOException {

	long cnt = 0;
	while (vs.hasNext()) {
		cnt += vs.next().get();
	}
	out.collect(k, new LongWritable(cnt));

}
 
Example 9  Project: Flink-CEPplus   File: HadoopReduceFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {
	int commentCnt = 0;
	while (vs.hasNext()) {
		String v = vs.next().toString();
		if (v.startsWith("Comment")) {
			commentCnt++;
		}
	}
	out.collect(k, new IntWritable(commentCnt));
}
 
Example 10  Project: hadoop   File: TestStreamingOutputKeyValueTypes.java
public void reduce(K key, Iterator<V> values,
    OutputCollector<LongWritable, Text> output, Reporter reporter)
    throws IOException {
  LongWritable l = new LongWritable();
  while (values.hasNext()) {
    output.collect(l, new Text(values.next().toString()));
  }
}
 
Example 11  Project: Flink-CEPplus   File: HadoopMapFunctionITCase.java
@Override
public void map(final IntWritable k, final Text v,
		final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
	if (v.toString().contains("bananas")) {
		out.collect(k, v);
	}
}
 
Example 12  Project: Flink-CEPplus   File: HadoopMapFunctionITCase.java
@Override
public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
		throws IOException {
	if (v.toString().startsWith(filterPrefix)) {
		out.collect(k, v);
	}
}
 
Example 13  Project: Flink-CEPplus   File: HadoopReduceCombineFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {

	int sum = 0;
	while (v.hasNext()) {
		sum += v.next().get();
	}
	out.collect(k, new IntWritable(sum));
}
 
Example 14  Project: Flink-CEPplus   File: HadoopReduceCombineFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {
	while (v.hasNext()) {
		out.collect(new IntWritable(k.get() % 4), v.next());
	}
}
 
Example 15  Project: Flink-CEPplus   File: HadoopReduceCombineFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {
	int commentCnt = 0;
	while (vs.hasNext()) {
		String v = vs.next().toString();
		if (v.startsWith(this.countPrefix)) {
			commentCnt++;
		}
	}
	out.collect(k, new IntWritable(commentCnt));
}
 
Example 16  Project: hadoop   File: DistCpV1.java
/**
 * Skip copying this file if already exists at the destination.
 * Updates counters and copy status if skipping this file.
 * @return true    if copy of this file can be skipped
 */
private boolean skipCopyFile(FileStatus srcstat, Path absdst,
                        OutputCollector<WritableComparable<?>, Text> outc,
                        Reporter reporter) throws IOException {
  if (destFileSys.exists(absdst) && !overwrite
      && !needsUpdate(srcstat, destFileSys, absdst)) {
    outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
    ++skipcount;
    reporter.incrCounter(Counter.SKIP, 1);
    updateStatus(reporter);
    return true;
  }
  return false;
}
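
Note the null key passed to outc.collect(): DistCpV1's copy mapper uses the collector as a status log rather than as a data channel, so with a text output format only the human-readable message ("SKIP: <path>") is written to the job output.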
 
Example 17  Project: flink   File: HadoopMapredCompatWordCount.java
@Override
public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
		throws IOException {

	long cnt = 0;
	while (vs.hasNext()) {
		cnt += vs.next().get();
	}
	out.collect(k, new LongWritable(cnt));

}
 
Example 18  Project: flink   File: HadoopReduceFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {
	int commentCnt = 0;
	while (vs.hasNext()) {
		String v = vs.next().toString();
		if (v.startsWith("Comment")) {
			commentCnt++;
		}
	}
	out.collect(k, new IntWritable(commentCnt));
}
 
Example 19  Project: flink   File: HadoopReduceFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {
	int commentCnt = 0;
	while (vs.hasNext()) {
		String v = vs.next().toString();
		if (v.startsWith(this.countPrefix)) {
			commentCnt++;
		}
	}
	out.collect(k, new IntWritable(commentCnt));
}
 
Example 20  Project: flink   File: HadoopMapFunctionITCase.java
@Override
public void map(final IntWritable k, final Text v,
		final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
	if (v.toString().contains("bananas")) {
		out.collect(k, v);
	}
}
 
Example 21  Project: flink   File: HadoopMapFunctionITCase.java
@Override
public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
		throws IOException {
	if (v.toString().startsWith(filterPrefix)) {
		out.collect(k, v);
	}
}
 
Example 22  Project: flink   File: HadoopReduceCombineFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {

	int sum = 0;
	while (v.hasNext()) {
		sum += v.next().get();
	}
	out.collect(k, new IntWritable(sum));
}
 
Example 23  Project: flink   File: HadoopReduceCombineFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {
	while (v.hasNext()) {
		out.collect(new IntWritable(k.get() % 4), v.next());
	}
}
 
Example 24  Project: flink   File: HadoopReduceCombineFunctionITCase.java
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
		throws IOException {
	int commentCnt = 0;
	while (vs.hasNext()) {
		String v = vs.next().toString();
		if (v.startsWith(this.countPrefix)) {
			commentCnt++;
		}
	}
	out.collect(k, new IntWritable(commentCnt));
}
 
Example 25  Project: blog   File: PersonVersion.java
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

    KPI kpi = KPI.filterPVs(value.toString());
    if (kpi.getValid() == 1 && kpi.getRemote_addr() != null) {
        word.set(kpi.getRemote_addr());
        output.collect(word, one);
    }
}
 
Example 26  Project: blog   File: PersonVersion.java
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

    KPI kpi = KPI.filterPVs(value.toString());
    if (kpi.getValid() == 0 && kpi.getRemote_addr() != null) {
        word.set(kpi.getRemote_addr());
        output.collect(word, one);
    }
}
 
Example 28  Project: hadoop   File: PipeMapRunner.java
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
       throws IOException {
  PipeMapper pipeMapper = (PipeMapper)getMapper();
  pipeMapper.startOutputThreads(output, reporter);
  super.run(input, output, reporter);
}
 
Example 29  Project: blog   File: PersonVersion.java
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

    if(value.toString() != null) {
        output.collect(word, one);
    }
}
 
Example 30  Project: blog   File: PersonVersion.java
@Override
public void reduce(Text key, Iterator<IntWritable> values,
                   OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
    Integer sum = 0;
    while (values.hasNext()) {
        sum += values.next().get();
    }
    result.set(sum);
    output.collect(key, result);
}
 