Java code examples for class org.apache.hadoop.mapreduce.Reducer

The following examples show how the org.apache.hadoop.mapreduce.Reducer API is used in real projects; follow the links to GitHub to view the full source code.
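
Before the project examples, here is a minimal sketch of the contract they all share. This word-count reducer is illustrative and not taken from any of the projects below: the framework calls reduce() once per key, handing over every value grouped under that key, and output is emitted through the nested Reducer.Context.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sketch of Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
public class WordCountSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get(); // accumulate the counts grouped under this word
        }
        result.set(sum);
        context.write(key, result); // one (word, total) pair per key
    }
}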

Example 1  Project: rya   File: CountPlan.java
@Override
public void reduce(final IntermediateProspect prospect, final Iterable<LongWritable> counts, final Date timestamp, final Reducer.Context context) throws IOException, InterruptedException {
    long sum = 0;
    for(final LongWritable count : counts) {
        sum += count.get();
    }

    final String indexType = prospect.getTripleValueType().getIndexType();

    // not sure if this is the best idea..
    if ((sum >= 0) || indexType.equals(TripleValueType.PREDICATE.getIndexType())) {
        final Mutation m = new Mutation(indexType + DELIM + prospect.getData() + DELIM + ProspectorUtils.getReverseIndexDateTime(timestamp));

        final String dataType = prospect.getDataType();
        final ColumnVisibility visibility = new ColumnVisibility(prospect.getVisibility());
        final Value sumValue = new Value(("" + sum).getBytes(StandardCharsets.UTF_8));
        m.put(COUNT, dataType, visibility, timestamp.getTime(), sumValue);

        context.write(null, m);
    }
}
 
Example 2  Project: jumbune   File: DataValidationReducer.java
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) throws IOException, InterruptedException {
	super.setup(context);
	maxViolationsInReport = context.getConfiguration().getInt(DataValidationConstants.DV_NUM_REPORT_VIOLATION, 1000);
	String dir = context.getConfiguration().get(SLAVE_FILE_LOC);
	dirPath = JobUtil.getAndReplaceHolders(dir);
	fileHandlerMap = new DVLRUCache(DataValidationConstants.TEN);
	
	offsetLinesMap = new TreeMap<>();
	
	ViolationPersistenceBean bean = new ViolationPersistenceBean();
	bean.setLineNum(Integer.MAX_VALUE);
	
	nullMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
	dataTypeMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
	regexMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
	numFieldsMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport); 
	fileNames = new HashSet<String>();
}
 
Example 3  Project: kylin-on-parquet-v2   File: MapReduceUtil.java
private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
    double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
    double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();

    // number of reduce tasks
    int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);

    // at least 1 reducer by default
    numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
    // no more than 500 reducer by default
    numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);

    logger.info("Having total map input MB " + Math.round(totalSizeInM));
    logger.info("Having per reduce MB " + perReduceInputMB);
    logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
    return numReduceTasks;
}
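A count computed this way is then applied when the job is built. A minimal companion sketch, assuming it sits in the same MapReduceUtil class so it can reach the private helper above; the method and job name are made up for illustration:

public static Job createJobWithTunedReducers(Configuration conf, double totalMapInputMB,
        KylinConfig kylinConfig) throws IOException {
    // Job name assumed; real callers configure much more than this.
    Job job = Job.getInstance(conf, "kylin-cube-build");
    job.setNumReduceTasks(getReduceTaskNum(totalMapInputMB, kylinConfig));
    return job;
}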
 
Example 4  Project: geowave   File: UpdateCentroidCostMapReduce.java
@Override
public void reduce(
    final GroupIDText key,
    final Iterable<CountofDoubleWritable> values,
    final Reducer<GroupIDText, CountofDoubleWritable, GroupIDText, CountofDoubleWritable>.Context context)
    throws IOException, InterruptedException {

  double expectation = 0;
  double ptCount = 0;
  for (final CountofDoubleWritable value : values) {
    expectation += value.getValue();
    ptCount += value.getCount();
  }
  outputValue.set(expectation, ptCount);
  context.write(key, outputValue);
}
 
Example 5  Project: xxhadoop   File: FlowPartitionReducer.java
@Override
protected void reduce(Text key, Iterable<FlowBean> values,
		Reducer<Text, FlowBean, Text, FlowBean>.Context context)
		throws IOException, InterruptedException {

	//super.reduce(arg0, arg1, arg2);
	long upFlow = 0;
	long downFlow = 0;
	//long flowSum = 0;
	for (FlowBean flowBean : values) {
		upFlow += flowBean.getUpFlow();
		downFlow += flowBean.getDownFlow();
		//flowSum += flowBean.getSumFlow();
	}
	result.setPhoneNum(key.toString());
	result.setUpFlow(upFlow);
	result.setDownFlow(downFlow);
	//result.setSumFlow(flowSum);
	result.setSumFlow(upFlow + downFlow);
	context.write(key, result);
}
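The class name suggests this reducer runs behind a custom Partitioner that routes each phone number to a province-specific reduce task. xxhadoop's actual partitioner is not shown on this page; the sketch below is only an assumed, typical shape for one (the prefix table and class name are invented):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical sketch, not from xxhadoop: pick the reduce task (and thus
// the output file) from the first three digits of the phone number.
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    private static final Map<String, Integer> PROVINCES = new HashMap<>();
    static {
        PROVINCES.put("136", 0);
        PROVINCES.put("137", 1);
        PROVINCES.put("138", 2);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String phone = key.toString();
        Integer p = phone.length() >= 3 ? PROVINCES.get(phone.substring(0, 3)) : null;
        return p == null ? numPartitions - 1 : p; // catch-all bucket for unknown prefixes
    }
}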
 
Example 6  Project: xxhadoop   File: FlowSumReducer.java
@Override
protected void reduce(Text key, Iterable<FlowBean> values,
		Reducer<Text, FlowBean, Text, FlowBean>.Context context)
		throws IOException, InterruptedException {

	//super.reduce(arg0, arg1, arg2);
	long upFlow = 0;
	long downFlow = 0;
	//long flowSum = 0;
	for (FlowBean flowBean : values) {
		upFlow += flowBean.getUpFlow();
		downFlow += flowBean.getDownFlow();
		//flowSum += flowBean.getSumFlow();
	}
	result.setPhoneNum(key.toString());
	result.setUpFlow(upFlow);
	result.setDownFlow(downFlow);
	//result.setSumFlow(flowSum);
	result.setSumFlow(upFlow + downFlow);
	context.write(key, result);
}
 
Example 7  Project: flink-perf   File: KMeansDriver.java
public static void convertCentersSequenceFileToText (Configuration conf, FileSystem fs, String seqFilePath, String outputPath) throws Exception {

		Path seqFile = new Path (seqFilePath);
		Path output = new Path (outputPath);
		if (fs.exists(output)) {
			fs.delete(output, true);
		}
		Job job = Job.getInstance(conf);
		job.setMapperClass(CenterSequenceToTextConverter.class);
		job.setReducerClass(Reducer.class);
		job.setNumReduceTasks(0);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setInputFormatClass(SequenceFileInputFormat.class);
		FileInputFormat.addInputPath(job, seqFile);
		FileOutputFormat.setOutputPath(job, output);
		job.waitForCompletion(true);
	}
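Passing the base Reducer class to setReducerClass, here and in several later examples, leans on the fact that its default reduce() is an identity pass-through. Paraphrased from the Hadoop base class:

// Default reduce() in org.apache.hadoop.mapreduce.Reducer: every
// (key, value) pair is written through unchanged, so the base class
// behaves as an identity reducer. With setNumReduceTasks(0), as in the
// job above, the reduce phase is skipped entirely and this never runs.
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
    for (VALUEIN value : values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}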
 
Example 8  Project: hadoop   File: MapReduceTestUtil.java
/**
 * Creates a simple fail job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Comma separated input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {
  FileSystem fs = outdir.getFileSystem(conf);
  if (fs.exists(outdir)) {
    fs.delete(outdir, true);
  }
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Fail-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(FailMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
 
Example 9  Project: hadoop   File: Chain.java
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {

  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) inputContext, rConf);
  ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
  threads.add(runner);
}
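addReducer is internal plumbing; user code normally assembles a chain through the public ChainMapper and ChainReducer helpers instead. A sketch, with the mapper and reducer classes as hypothetical placeholders:

// Sketch: configure a [MAP+ / REDUCE MAP*] chain. TokenizerMapper,
// SumReducer and UppercaseMapper are invented names for illustration.
Job job = Job.getInstance(new Configuration(), "chain-example");
ChainMapper.addMapper(job, TokenizerMapper.class,
        LongWritable.class, Text.class, Text.class, LongWritable.class, null);
ChainReducer.setReducer(job, SumReducer.class,
        Text.class, LongWritable.class, Text.class, LongWritable.class, null);
ChainReducer.addMapper(job, UppercaseMapper.class,
        Text.class, LongWritable.class, Text.class, LongWritable.class, null);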
 
Example 10  Project: geowave   File: StoreCopyReducer.java
@Override
protected void reduceNativeValues(
    final GeoWaveInputKey key,
    final Iterable<Object> values,
    final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, Object>.Context context)
    throws IOException, InterruptedException {
  final Iterator<Object> objects = values.iterator();
  while (objects.hasNext()) {
    final AdapterToIndexMapping mapping = store.getIndicesForAdapter(key.getInternalAdapterId());
    context.write(
        new GeoWaveOutputKey<>(
            internalAdapterStore.getTypeName(mapping.getAdapterId()),
            mapping.getIndexNames()),
        objects.next());
  }
}
 
Example 11  Project: big-c   File: MapReduceTestUtil.java
/**
 * Creates a simple kill job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Comma separated input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {

  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Kill-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(KillMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
 
Example 12  Project: big-c   File: Chain.java
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {

  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) inputContext, rConf);
  ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
  threads.add(runner);
}
 
Example 13  Project: kylin   File: MapReduceUtil.java
private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
    double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
    double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();

    // number of reduce tasks
    int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);

    // at least 1 reducer by default
    numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
    // no more than 500 reducer by default
    numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);

    logger.info("Having total map input MB " + Math.round(totalSizeInM));
    logger.info("Having per reduce MB " + perReduceInputMB);
    logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
    return numReduceTasks;
}
 
Example 14  Project: BigDataPlatform   File: TransformBaseRunner.java
public void setupRunner(String jobName, Class<?> runnerClass,
    Class<? extends TableMapper<?, ?>> mapperClass, Class<? extends Reducer<?, ?, ?, ?>> reducerClass,
    Class<? extends WritableComparable<?>> outputKeyClass,
    Class<? extends Writable> outputValueClass,
    Class<? extends OutputFormat<?, ?>> outputFormatClass) {
    this.setupRunner(jobName, runnerClass, mapperClass, reducerClass, outputKeyClass, outputValueClass, outputKeyClass, outputValueClass, outputFormatClass);

}
 
Example 15  Project: geowave   File: RasterTileResizeReducer.java
@Override
protected void reduceNativeValues(
    final GeoWaveInputKey key,
    final Iterable<Object> values,
    final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, GridCoverage>.Context context)
    throws IOException, InterruptedException {
  final GridCoverage mergedCoverage = helper.getMergedCoverage(key, values);
  if (mergedCoverage != null) {
    context.write(helper.getGeoWaveOutputKey(), mergedCoverage);
  }
}
 
Example 16  Project: hbase   File: Import.java
protected void reduce(
    CellWritableComparable row,
    Iterable<Cell> kvs,
    Reducer<CellWritableComparable,
      Cell, ImmutableBytesWritable, Cell>.Context context)
    throws java.io.IOException, InterruptedException {
  int index = 0;
  for (Cell kv : kvs) {
    context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
      new MapReduceExtendedCell(kv));
    if (++index % 100 == 0) {
      context.setStatus("Wrote " + index + " KeyValues; current row key: "
          + Bytes.toString(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
    }
  }
}
 
Example 17  Project: BigDataArchitect   File: TransformerBaseRunner.java
/**
 * Sets the job parameters.
 * 
 * @param jobName
 * @param runnerClass
 * @param mapperClass
 * @param reducerClass
 * @param mapOutputKeyClass
 * @param mapOutputValueClass
 * @param outputKeyClass
 * @param outputValueClass
 * @param outputFormatClass
 */
public void setupRunner(String jobName, Class<?> runnerClass, Class<? extends TableMapper<?, ?>> mapperClass, Class<? extends Reducer<?, ?, ?, ?>> reducerClass, Class<? extends WritableComparable<?>> mapOutputKeyClass, Class<? extends Writable> mapOutputValueClass, Class<? extends WritableComparable<?>> outputKeyClass, Class<? extends Writable> outputValueClass,
        Class<? extends OutputFormat<?, ?>> outputFormatClass) {
    this.jobName = jobName;
    this.runnerClass = runnerClass;
    this.mapperClass = mapperClass;
    this.reducerClass = reducerClass;
    this.mapOutputKeyClass = mapOutputKeyClass;
    this.mapOutputValueClass = mapOutputValueClass;
    this.outputKeyClass = outputKeyClass;
    this.outputValueClass = outputValueClass;
    this.outputFormatClass = outputFormatClass;
    this.isCallSetUpRunnerMethod = true;
}
 
Example 18  Project: geowave   File: DBScanMapReduce.java
@Override
protected void setup(
    final Reducer<PartitionDataWritable, AdapterWithObjectWritable, KEYOUT, VALUEOUT>.Context context)
    throws IOException, InterruptedException {
  super.setup(context);
  final ScopedJobConfiguration config =
      new ScopedJobConfiguration(context.getConfiguration(), NNMapReduce.class);

  // first run must at least form a triangle
  minOwners = config.getInt(ClusteringParameters.Clustering.MINIMUM_SIZE, 2);

  LOGGER.info("Minumum owners = {}", minOwners);
}
 
Example 19  Project: geowave   File: NNMapReduce.java
@Override
protected void processSummary(
    final PartitionData partitionData,
    final Boolean summary,
    final org.apache.hadoop.mapreduce.Reducer.Context context) {
  // do nothing
}
 
Example 20  Project: incubator-iotdb   File: TSFMRReadExample.java
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values,
    Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
    throws IOException, InterruptedException {

  double sum = 0;
  for (DoubleWritable value : values) {
    sum = sum + value.get();
  }
  context.write(key, new DoubleWritable(sum));
}
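Because this sum is associative and commutative, the same class can usually double as a combiner and pre-aggregate map output before the shuffle. A driver-side sketch; SumReducer stands in for the reducer class above, whose real name in TSFMRReadExample may differ:

// Sketch (names assumed): reuse the sum reducer as a combiner. Safe here
// because the map-output and reduce-output types match
// (Text, DoubleWritable) and summation is order-insensitive.
Job job = Job.getInstance(new Configuration(), "tsfile-sum");
job.setReducerClass(SumReducer.class);
job.setCombinerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);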
 
Example 21  File: EthereumBlockReducer test
@Test
public void reduce(@Mocked final Reducer.Context defaultContext) throws IOException, InterruptedException {
	EthereumBlockReducer reducer = new EthereumBlockReducer();
	final Text defaultKey = new Text("Transaction Count:");
	final IntWritable oneInt = new IntWritable(1);
	final IntWritable twoInt = new IntWritable(2);
	final LongWritable resultLong = new LongWritable(3);
	final List<IntWritable> al = new ArrayList<>();
	al.add(oneInt);
	al.add(twoInt);
	new Expectations() {{
		defaultContext.write(defaultKey, resultLong); times = 1;
	}};
	reducer.reduce(defaultKey, al, defaultContext);
}
 
Example 22  Project: geowave   File: StoreCopyReducer.java
@Override
protected void setup(
    final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, Object>.Context context)
    throws IOException, InterruptedException {
  super.setup(context);
  store = GeoWaveOutputFormat.getJobContextAdapterIndexMappingStore(context);
  internalAdapterStore = GeoWaveOutputFormat.getJobContextInternalAdapterStore(context);
}
 
Example 23  Project: datawave   File: MockReduceDriver.java
List<MRPair<KEYOUT,VALUEOUT>> run() throws IOException, InterruptedException {
    TaskAttemptID id = new TaskAttemptID("testJob", 0, TaskType.REDUCE, 0, 0);
    final MockReduceContext context = new MockReduceContext(this.conf, id, BulkIngestKey.class, Value.class);
    context.reduceInput = this.input.iterator();
    
    Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context con = new WrappedReducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>().getReducerContext(context);
    this.reducer.run(con);
    return context.output;
}
 
Example 24  Project: xxhadoop   File: WordCountReducer.java
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
		Reducer<Text, LongWritable, Text, LongWritable>.Context context)
		throws IOException, InterruptedException {

	//super.reduce(arg0, arg1, arg2);
	long count = 0;
	for (LongWritable value : values) {
		count += value.get();
	}
	result.set(count);
	context.write(key, result);
}
 
Example 25  Project: halvade   File: GATKTools.java
public void setContext(Reducer.Context context) {
    this.context = context;
    // mem = context.getConfiguration().get("mapreduce.reduce.java.opts");
    mem = "-Xmx" + (int) (0.8 * Integer.parseInt(context.getConfiguration().get("mapreduce.reduce.memory.mb"))) + "m";
    String customArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "java", "");
    if (customArgs != null) {
        java.add(customArgs);
    }
}
 
Example 26  Project: hbase   File: PutSortReducer.java
@Override
protected void
    setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
        throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  this.kvCreator = new CellCreator(conf);
}
 
Example 27  Project: hadoop   File: TestLineRecordReaderJobs.java
/**
 * Creates and runs an MR job
 *
 * @param conf
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 */
public void createAndRunJob(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job job = Job.getInstance(conf);
  job.setJarByClass(TestLineRecordReaderJobs.class);
  job.setMapperClass(Mapper.class);
  job.setReducerClass(Reducer.class);
  FileInputFormat.addInputPath(job, inputDir);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.waitForCompletion(true);
}
 
Example 28  Project: rya   File: Upgrade322Tool.java
@Override
public int run(String[] strings) throws Exception {
    conf.set(MRUtils.JOB_NAME_PROP, "Upgrade to Rya 3.2.2");
    //faster
    init();

    Job job = new Job(conf);
    job.setJarByClass(Upgrade322Tool.class);

    setupAccumuloInput(job);
    AccumuloInputFormat.setInputTableName(job, MRUtils.getTablePrefix(conf) + TBL_OSP_SUFFIX);

    //we do not need to change any row that is a string, custom, or iri type
    IteratorSetting regex = new IteratorSetting(30, "regex",
                                                RegExFilter.class);
    RegExFilter.setRegexs(regex, "\\w*" + TYPE_DELIM + "[\u0003|\u0008|\u0002]", null, null, null, false);
    RegExFilter.setNegate(regex, true);

    // set input output of the particular job
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Mutation.class);

    setupAccumuloOutput(job, MRUtils.getTablePrefix(conf) +
                           TBL_SPO_SUFFIX);

    // set mapper and reducer classes
    job.setMapperClass(Upgrade322Mapper.class);
    job.setReducerClass(Reducer.class);

    // Submit the job
    return job.waitForCompletion(true) ? 0 : 1;
}
 
Example 29  Project: geowave   File: GeoWaveWritableOutputReducer.java
@Override
protected void reduce(
    final KEYIN key,
    final Iterable<VALUEIN> values,
    final Reducer<KEYIN, VALUEIN, GeoWaveInputKey, ObjectWritable>.Context context)
    throws IOException, InterruptedException {
  reduceWritableValues(key, values, context);
}
 
Example 30  Project: geowave   File: BulkIngestInputGenerationIT.java
@Override
public int run(final String[] args) throws Exception {

  final Configuration conf = getConf();
  conf.set("fs.defaultFS", "file:///");

  final Job job = Job.getInstance(conf, JOB_NAME);
  job.setJarByClass(getClass());

  FileInputFormat.setInputPaths(job, new Path(TEST_DATA_LOCATION));
  FileOutputFormat.setOutputPath(job, cleanPathForReuse(conf, OUTPUT_PATH));

  job.setMapperClass(SimpleFeatureToAccumuloKeyValueMapper.class);
  job.setReducerClass(Reducer.class); // (Identity Reducer)

  job.setInputFormatClass(GeonamesDataFileInputFormat.class);
  job.setOutputFormatClass(AccumuloFileOutputFormat.class);

  job.setMapOutputKeyClass(Key.class);
  job.setMapOutputValueClass(Value.class);
  job.setOutputKeyClass(Key.class);
  job.setOutputValueClass(Value.class);

  job.setNumReduceTasks(1);
  job.setSpeculativeExecution(false);

  final boolean result = job.waitForCompletion(true);

  mapInputRecords =
      job.getCounters().findCounter(TASK_COUNTER_GROUP_NAME, MAP_INPUT_RECORDS).getValue();

  mapOutputRecords =
      job.getCounters().findCounter(TASK_COUNTER_GROUP_NAME, MAP_OUTPUT_RECORDS).getValue();

  return result ? 0 : 1;
}
 