org.apache.hadoop.mapred.Mapper: source code examples

The following examples show how the org.apache.hadoop.mapred.Mapper API is used in practice; follow the project links to browse the full source on GitHub.

Example 1  Project: blog  File: PersonVersion.java
private static void runJobPv(String inputDir, String outputDir, String jobName, Class<? extends Mapper> mapClass,
                             Class<? extends Reducer> reduceClass) throws Exception {
    JobConf conf = new JobConf(PersonVersion.class);
    conf.setJobName(jobName);

    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(IntWritable.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(mapClass);
    conf.setCombinerClass(reduceClass);
    conf.setReducerClass(reduceClass);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, inputDir);
    FileOutputFormat.setOutputPath(conf, new Path(outputDir));

    JobClient.runJob(conf);
}
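
For reference, a call site for this helper might look like the sketch below; MyCountMapper and MyCountReducer are assumed placeholder classes implementing the old-API Mapper and Reducer interfaces, not classes from the blog project.

// Hypothetical driver entry point; the mapper and reducer classes
// are placeholders, not part of the original project.
public static void main(String[] args) throws Exception {
    runJobPv(args[0], args[1], "person-version-pv",
             MyCountMapper.class, MyCountReducer.class);
}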
 
Example 2  Project: hadoop  File: TestDFSIO.java
private void runIOTest(
        Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, 
        Path outputDir) throws IOException {
  JobConf job = new JobConf(config, TestDFSIO.class);

  FileInputFormat.setInputPaths(job, getControlDir(config));
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(mapperClass);
  job.setReducerClass(AccumulatingReducer.class);

  FileOutputFormat.setOutputPath(job, outputDir);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
 
Example 3  Project: hadoop  File: TestMultipleInputs.java
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
     MapClass.class);
  MultipleInputs.addInputPath(conf, new Path("/bar"),
     KeyValueTextInputFormat.class, MapClass2.class);
  final Map<Path, InputFormat> inputs = MultipleInputs
     .getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
     .getMapperTypeMap(conf);

  assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
  assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
     .getClass());
  assertEquals(MapClass.class, maps.get(new Path("/foo")));
  assertEquals(MapClass2.class, maps.get(new Path("/bar")));
}
 
Example 4  Project: big-c  File: TestDFSIO.java
private void runIOTest(
        Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, 
        Path outputDir) throws IOException {
  JobConf job = new JobConf(config, TestDFSIO.class);

  FileInputFormat.setInputPaths(job, getControlDir(config));
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(mapperClass);
  job.setReducerClass(AccumulatingReducer.class);

  FileOutputFormat.setOutputPath(job, outputDir);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
 
Example 5  Project: big-c  File: TestMultipleInputs.java
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
     MapClass.class);
  MultipleInputs.addInputPath(conf, new Path("/bar"),
     KeyValueTextInputFormat.class, MapClass2.class);
  final Map<Path, InputFormat> inputs = MultipleInputs
     .getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
     .getMapperTypeMap(conf);

  assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
  assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
     .getClass());
  assertEquals(MapClass.class, maps.get(new Path("/foo")));
  assertEquals(MapClass2.class, maps.get(new Path("/bar")));
}
 
Example 6  Project: RDFS  File: TestMultipleInputs.java
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
     MapClass.class);
  MultipleInputs.addInputPath(conf, new Path("/bar"),
     KeyValueTextInputFormat.class, MapClass2.class);
  final Map<Path, InputFormat> inputs = MultipleInputs
     .getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
     .getMapperTypeMap(conf);

  assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
  assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
     .getClass());
  assertEquals(MapClass.class, maps.get(new Path("/foo")));
  assertEquals(MapClass2.class, maps.get(new Path("/bar")));
}
 
Example 7  Project: RDFS  File: PipeReducer.java
public void configure(JobConf job) {
    super.configure(job);
    Class<?> c = job.getClass("stream.reduce.posthook", null, Mapper.class);
    if(c != null) {
        postMapper = (Mapper)ReflectionUtils.newInstance(c, job);
        LOG.info("PostHook="+c.getName());
    }

    c = job.getClass("stream.reduce.prehook", null, Reducer.class);
    if(c != null) {
        preReducer = (Reducer)ReflectionUtils.newInstance(c, job);
        oc = new InmemBufferingOutputCollector();
        LOG.info("PreHook="+c.getName());
    }
    this.ignoreKey = job.getBoolean("stream.reduce.ignoreKey", false);
}
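
Both hooks are resolved from plain JobConf entries, so enabling them only requires setting the keys that configure() reads above. A minimal sketch, assuming MyPostMapper and MyPreReducer are user-supplied classes:

JobConf job = new JobConf();
// Keys must match those read in configure(); the classes are placeholders.
job.setClass("stream.reduce.posthook", MyPostMapper.class, Mapper.class);
job.setClass("stream.reduce.prehook", MyPreReducer.class, Reducer.class);
job.setBoolean("stream.reduce.ignoreKey", true);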
 
Example 8  Project: hadoop-gpu  File: TestMultipleInputs.java
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
     MapClass.class);
  MultipleInputs.addInputPath(conf, new Path("/bar"),
     KeyValueTextInputFormat.class, MapClass2.class);
  final Map<Path, InputFormat> inputs = MultipleInputs
     .getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
     .getMapperTypeMap(conf);

  assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
  assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
     .getClass());
  assertEquals(MapClass.class, maps.get(new Path("/foo")));
  assertEquals(MapClass2.class, maps.get(new Path("/bar")));
}
 
Example 9  Project: Flink-CEPplus  File: HadoopMapFunction.java
/**
 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
 * The Hadoop Mapper is configured with the provided JobConf.
 *
 * @param hadoopMapper The Hadoop Mapper to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Mapper.
 */
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
	if (hadoopMapper == null) {
		throw new NullPointerException("Mapper may not be null.");
	}
	if (conf == null) {
		throw new NullPointerException("JobConf may not be null.");
	}

	this.mapper = hadoopMapper;
	this.jobConf = conf;
}
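
Once constructed, the wrapper plugs into a Flink DataSet program like any other FlatMapFunction. A minimal sketch, assuming Tokenizer is a user class implementing the mapred Mapper<LongWritable, Text, Text, LongWritable> interface:

public static void main(String[] args) throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	// Tiny in-memory input standing in for a real Hadoop source.
	DataSet<Tuple2<LongWritable, Text>> input = env.fromElements(
			new Tuple2<>(new LongWritable(0L), new Text("hello hadoop")));
	DataSet<Tuple2<Text, LongWritable>> words = input.flatMap(
			new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
					new Tokenizer(), new JobConf()));
	words.print();
}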
 
Example 10  Project: Flink-CEPplus  File: HadoopMapFunction.java
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
	Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
	Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);

	final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
	final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
	return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypeInfo);
}
 
Example 11  Project: Flink-CEPplus  File: HadoopMapFunction.java
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
	Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> mapperClass =
			(Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
	mapper = InstantiationUtil.instantiate(mapperClass);

	jobConf = new JobConf();
	jobConf.readFields(in);
}
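
This readObject only makes sense together with its serialization counterpart. The matching writeObject is not part of this listing, but given the read order above it presumably looks like the following sketch:

// Sketch of the assumed counterpart: mirrors the read order above
// (mapper class object first, then the serialized JobConf).
private void writeObject(final ObjectOutputStream out) throws IOException {
	out.writeObject(mapper.getClass());
	jobConf.write(out);
}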
 
Example 12  Project: flink  File: HadoopMapFunction.java
/**
 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
 * The Hadoop Mapper is configured with the provided JobConf.
 *
 * @param hadoopMapper The Hadoop Mapper to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Mapper.
 */
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
	if (hadoopMapper == null) {
		throw new NullPointerException("Mapper may not be null.");
	}
	if (conf == null) {
		throw new NullPointerException("JobConf may not be null.");
	}

	this.mapper = hadoopMapper;
	this.jobConf = conf;
}
 
Example 13  Project: flink  File: HadoopMapFunction.java
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
	Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
	Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);

	final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
	final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
	return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypeInfo);
}
 
Example 14  Project: flink  File: HadoopMapFunction.java
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
	Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> mapperClass =
			(Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
	mapper = InstantiationUtil.instantiate(mapperClass);

	jobConf = new JobConf();
	jobConf.readFields(in);
}
 
Example 15  Project: hadoop  File: DelegatingMapper.java
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {

  if (mapper == null) {
    // Find the Mapper from the TaggedInputSplit.
    TaggedInputSplit inputSplit = (TaggedInputSplit) reporter.getInputSplit();
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
       .getMapperClass(), conf);
  }
  mapper.map(key, value, outputCollector, reporter);
}
 
Example 16  Project: hadoop  File: TaggedInputSplit.java
/**
 * Creates a new TaggedInputSplit.
 * 
 * @param inputSplit The InputSplit to be tagged
 * @param conf The configuration to use
 * @param inputFormatClass The InputFormat class to use for this job
 * @param mapperClass The Mapper class to use for this job
 */
public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {
  this.inputSplitClass = inputSplit.getClass();
  this.inputSplit = inputSplit;
  this.conf = conf;
  this.inputFormatClass = inputFormatClass;
  this.mapperClass = mapperClass;
}
 
Example 17  Project: hadoop  File: TaggedInputSplit.java
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
  inputSplitClass = (Class<? extends InputSplit>) readClass(in);
  inputSplit = (InputSplit) ReflectionUtils
     .newInstance(inputSplitClass, conf);
  inputSplit.readFields(in);
  inputFormatClass = (Class<? extends InputFormat>) readClass(in);
  mapperClass = (Class<? extends Mapper>) readClass(in);
}
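
The symmetric write side is not shown in this listing; assuming readClass(in) reads a class name written with Text.writeString, it would emit the same fields in the same order, roughly:

// Assumed counterpart sketch; field order mirrors readFields above.
public void write(DataOutput out) throws IOException {
  Text.writeString(out, inputSplitClass.getName());
  inputSplit.write(out);
  Text.writeString(out, inputFormatClass.getName());
  Text.writeString(out, mapperClass.getName());
}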
 
Example 18  Project: hadoop  File: ChainMapper.java
/**
 * Chains the <code>map(...)</code> methods of the Mappers in the chain.
 */
@SuppressWarnings({"unchecked"})
public void map(Object key, Object value, OutputCollector output,
                Reporter reporter) throws IOException {
  Mapper mapper = chain.getFirstMap();
  if (mapper != null) {
    mapper.map(key, value, chain.getMapperCollector(0, output, reporter),
               reporter);
  }
}
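
The chain itself is assembled in the driver with ChainMapper.addMapper before the job is submitted. A minimal sketch, with AMap and BMap standing in for user mapper classes:

JobConf job = new JobConf(conf);
// Placeholder mapper classes; byValue=true copies keys/values between links.
ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
    Text.class, Text.class, true, new JobConf(false));
ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
    LongWritable.class, Text.class, true, new JobConf(false));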
 
Example 19  Project: hadoop  File: MultipleInputs.java
/**
 * Add a {@link Path} with a custom {@link InputFormat} and
 * {@link Mapper} to the list of inputs for the map-reduce job.
 * 
 * @param conf The configuration of the job
 * @param path {@link Path} to be added to the list of inputs for the job
 * @param inputFormatClass {@link InputFormat} class to use for this path
 * @param mapperClass {@link Mapper} class to use for this path
 */
public static void addInputPath(JobConf conf, Path path,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {

  addInputPath(conf, path, inputFormatClass);

  String mapperMapping = path.toString() + ";" + mapperClass.getName();
  String mappers = conf.get("mapreduce.input.multipleinputs.dir.mappers");
  conf.set("mapreduce.input.multipleinputs.dir.mappers", mappers == null ? mapperMapping
     : mappers + "," + mapperMapping);

  conf.setMapperClass(DelegatingMapper.class);
}
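
Note that addInputPath installs DelegatingMapper as the job-wide mapper, which later dispatches to the per-path mapper recorded in the configuration. A minimal driver sketch, with all class names below being placeholders:

JobConf job = new JobConf(MyDriver.class);
MultipleInputs.addInputPath(job, new Path("/logs/text"),
    TextInputFormat.class, TextLogMapper.class);
MultipleInputs.addInputPath(job, new Path("/logs/kv"),
    KeyValueTextInputFormat.class, KvLogMapper.class);
job.setReducerClass(LogReducer.class);
FileOutputFormat.setOutputPath(job, new Path("/logs/out"));
JobClient.runJob(job);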
 
Example 20  Project: big-c  File: DelegatingMapper.java
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {

  if (mapper == null) {
    // Find the Mapper from the TaggedInputSplit.
    TaggedInputSplit inputSplit = (TaggedInputSplit) reporter.getInputSplit();
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
       .getMapperClass(), conf);
  }
  mapper.map(key, value, outputCollector, reporter);
}
 
Example 21  Project: big-c  File: TaggedInputSplit.java
/**
 * Creates a new TaggedInputSplit.
 * 
 * @param inputSplit The InputSplit to be tagged
 * @param conf The configuration to use
 * @param inputFormatClass The InputFormat class to use for this job
 * @param mapperClass The Mapper class to use for this job
 */
public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {
  this.inputSplitClass = inputSplit.getClass();
  this.inputSplit = inputSplit;
  this.conf = conf;
  this.inputFormatClass = inputFormatClass;
  this.mapperClass = mapperClass;
}
 
Example 22  Project: big-c  File: TaggedInputSplit.java
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
  inputSplitClass = (Class<? extends InputSplit>) readClass(in);
  inputSplit = (InputSplit) ReflectionUtils
     .newInstance(inputSplitClass, conf);
  inputSplit.readFields(in);
  inputFormatClass = (Class<? extends InputFormat>) readClass(in);
  mapperClass = (Class<? extends Mapper>) readClass(in);
}
 
Example 23  Project: big-c  File: ChainMapper.java
/**
 * Chains the <code>map(...)</code> methods of the Mappers in the chain.
 */
@SuppressWarnings({"unchecked"})
public void map(Object key, Object value, OutputCollector output,
                Reporter reporter) throws IOException {
  Mapper mapper = chain.getFirstMap();
  if (mapper != null) {
    mapper.map(key, value, chain.getMapperCollector(0, output, reporter),
               reporter);
  }
}
 
Example 24  Project: big-c  File: MultipleInputs.java
/**
 * Add a {@link Path} with a custom {@link InputFormat} and
 * {@link Mapper} to the list of inputs for the map-reduce job.
 * 
 * @param conf The configuration of the job
 * @param path {@link Path} to be added to the list of inputs for the job
 * @param inputFormatClass {@link InputFormat} class to use for this path
 * @param mapperClass {@link Mapper} class to use for this path
 */
public static void addInputPath(JobConf conf, Path path,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {

  addInputPath(conf, path, inputFormatClass);

  String mapperMapping = path.toString() + ";" + mapperClass.getName();
  String mappers = conf.get("mapreduce.input.multipleinputs.dir.mappers");
  conf.set("mapreduce.input.multipleinputs.dir.mappers", mappers == null ? mapperMapping
     : mappers + "," + mapperMapping);

  conf.setMapperClass(DelegatingMapper.class);
}
 
Example 25  Project: flink  File: HadoopMapFunction.java
/**
 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
 * The Hadoop Mapper is configured with the provided JobConf.
 *
 * @param hadoopMapper The Hadoop Mapper to wrap.
 * @param conf The JobConf that is used to configure the Hadoop Mapper.
 */
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
	if (hadoopMapper == null) {
		throw new NullPointerException("Mapper may not be null.");
	}
	if (conf == null) {
		throw new NullPointerException("JobConf may not be null.");
	}

	this.mapper = hadoopMapper;
	this.jobConf = conf;
}
 
Example 26  Project: flink  File: HadoopMapFunction.java
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
	Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
	Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);

	final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
	final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
	return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypeInfo);
}
 
Example 27  Project: flink  File: HadoopMapFunction.java
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
	Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> mapperClass =
			(Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) in.readObject();
	mapper = InstantiationUtil.instantiate(mapperClass);

	jobConf = new JobConf();
	jobConf.readFields(in);
}
 
Example 28  Project: hiped2  File: PipelineTest.java
@Before
public void setUp() {
  mapper1 = new IdentityMapper<Text, Text>();
  reducer1 = new IdentityReducer<Text, Text>();
  mapper2 = new IdentityMapper<Text, Text>();
  reducer2 = new IdentityReducer<Text, Text>();
  driver = new PipelineMapReduceDriver<Text, Text, Text, Text>();
  driver.addMapReduce(new Pair<Mapper, Reducer>(mapper1, reducer1));
  driver.addMapReduce(new Pair<Mapper, Reducer>(mapper2, reducer2));
}
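
With the pipeline wired up, MRUnit can drive both map-reduce stages in memory. A minimal test sketch using the driver above; since both stages are identity, the input pair should come out unchanged:

@Test
public void testIdentityPipeline() throws IOException {
  driver.withInput(new Text("key"), new Text("value"))
        .withOutput(new Text("key"), new Text("value"))
        .runTest();
}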
 
源代码29 项目: RDFS   文件: DelegatingMapper.java
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {

  if (mapper == null) {
    // Find the Mapper from the TaggedInputSplit.
    TaggedInputSplit inputSplit = (TaggedInputSplit) reporter.getInputSplit();
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
       .getMapperClass(), conf);
  }
  mapper.map(key, value, outputCollector, reporter);
}
 
源代码30 项目: RDFS   文件: TaggedInputSplit.java
/**
 * Creates a new TaggedInputSplit.
 * 
 * @param inputSplit The InputSplit to be tagged
 * @param conf The configuration to use
 * @param inputFormatClass The InputFormat class to use for this job
 * @param mapperClass The Mapper class to use for this job
 */
public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {
  this.inputSplitClass = inputSplit.getClass();
  this.inputSplit = inputSplit;
  this.conf = conf;
  this.inputFormatClass = inputFormatClass;
  this.mapperClass = mapperClass;
}
 