下面列出了如何使用 org.apache.hadoop.mapred.Mapper 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Configures and synchronously runs a text-in/text-out job through the mapred API.
 *
 * <p>Both the map output and the final output are declared as {@code Text} keys
 * with {@code IntWritable} values, and {@code reduceClass} is installed as the
 * combiner as well as the reducer.
 *
 * @param inputDir input path string handed to {@code FileInputFormat.setInputPaths}
 * @param outputDir directory the job writes its output to
 * @param jobName name assigned to the job
 * @param mapClass mapper implementation to run
 * @param reduceClass implementation used for both the combine and the reduce phase
 * @throws Exception propagated from job configuration or execution
 */
private static void runJobPv(String inputDir, String outputDir, String jobName, Class<? extends Mapper> mapClass,
    Class<? extends Reducer> reduceClass) throws Exception {
  final JobConf jobConf = new JobConf(PersonVersion.class);
  jobConf.setJobName(jobName);

  // Processing stages: the reducer doubles as the combiner.
  jobConf.setMapperClass(mapClass);
  jobConf.setCombinerClass(reduceClass);
  jobConf.setReducerClass(reduceClass);

  // Intermediate and final record types.
  jobConf.setMapOutputKeyClass(Text.class);
  jobConf.setMapOutputValueClass(IntWritable.class);
  jobConf.setOutputKeyClass(Text.class);
  jobConf.setOutputValueClass(IntWritable.class);

  // Plain-text formats and their locations.
  jobConf.setInputFormat(TextInputFormat.class);
  jobConf.setOutputFormat(TextOutputFormat.class);
  FileInputFormat.setInputPaths(jobConf, inputDir);
  FileOutputFormat.setOutputPath(jobConf, new Path(outputDir));

  JobClient.runJob(jobConf);
}
/**
 * Runs a job that reads the control directory with the given mapper and
 * funnels all map output through a single {@code AccumulatingReducer} task.
 *
 * @param mapperClass mapper implementation to execute
 * @param outputDir directory that receives the {@code Text}/{@code Text} job output
 * @throws IOException propagated from job configuration or execution
 */
private void runIOTest(
    Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
    Path outputDir) throws IOException {
  final JobConf ioJob = new JobConf(config, TestDFSIO.class);

  // Input: sequence files found in the control directory.
  FileInputFormat.setInputPaths(ioJob, getControlDir(config));
  ioJob.setInputFormat(SequenceFileInputFormat.class);

  // Processing: caller-supplied mapper, one accumulating reducer.
  ioJob.setMapperClass(mapperClass);
  ioJob.setReducerClass(AccumulatingReducer.class);
  ioJob.setNumReduceTasks(1);

  // Output: Text keys and values written under outputDir.
  FileOutputFormat.setOutputPath(ioJob, outputDir);
  ioJob.setOutputKeyClass(Text.class);
  ioJob.setOutputValueClass(Text.class);

  JobClient.runJob(ioJob);
}
/**
 * Registers two paths through {@code MultipleInputs.addInputPath} and checks
 * that the input format and the mapper class recorded for each path match
 * what was registered.
 */
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  final Path fooPath = new Path("/foo");
  final Path barPath = new Path("/bar");

  MultipleInputs.addInputPath(conf, fooPath, TextInputFormat.class, MapClass.class);
  MultipleInputs.addInputPath(conf, barPath, KeyValueTextInputFormat.class, MapClass2.class);

  final Map<Path, InputFormat> formatsByPath = MultipleInputs.getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> mappersByPath = MultipleInputs.getMapperTypeMap(conf);

  // Input formats come back as instances, so compare their runtime classes.
  assertEquals(TextInputFormat.class, formatsByPath.get(fooPath).getClass());
  assertEquals(KeyValueTextInputFormat.class, formatsByPath.get(barPath).getClass());

  // Mappers come back as class objects.
  assertEquals(MapClass.class, mappersByPath.get(fooPath));
  assertEquals(MapClass2.class, mappersByPath.get(barPath));
}
/**
 * Runs a job that reads the control directory with the given mapper and
 * funnels all map output through a single {@code AccumulatingReducer} task.
 *
 * @param mapperClass mapper implementation to execute
 * @param outputDir directory that receives the {@code Text}/{@code Text} job output
 * @throws IOException propagated from job configuration or execution
 */
private void runIOTest(
    Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
    Path outputDir) throws IOException {
  final JobConf ioJob = new JobConf(config, TestDFSIO.class);

  // Input: sequence files found in the control directory.
  FileInputFormat.setInputPaths(ioJob, getControlDir(config));
  ioJob.setInputFormat(SequenceFileInputFormat.class);

  // Processing: caller-supplied mapper, one accumulating reducer.
  ioJob.setMapperClass(mapperClass);
  ioJob.setReducerClass(AccumulatingReducer.class);
  ioJob.setNumReduceTasks(1);

  // Output: Text keys and values written under outputDir.
  FileOutputFormat.setOutputPath(ioJob, outputDir);
  ioJob.setOutputKeyClass(Text.class);
  ioJob.setOutputValueClass(Text.class);

  JobClient.runJob(ioJob);
}
/**
 * Registers two paths through {@code MultipleInputs.addInputPath} and checks
 * that the input format and the mapper class recorded for each path match
 * what was registered.
 */
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  final Path fooPath = new Path("/foo");
  final Path barPath = new Path("/bar");

  MultipleInputs.addInputPath(conf, fooPath, TextInputFormat.class, MapClass.class);
  MultipleInputs.addInputPath(conf, barPath, KeyValueTextInputFormat.class, MapClass2.class);

  final Map<Path, InputFormat> formatsByPath = MultipleInputs.getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> mappersByPath = MultipleInputs.getMapperTypeMap(conf);

  // Input formats come back as instances, so compare their runtime classes.
  assertEquals(TextInputFormat.class, formatsByPath.get(fooPath).getClass());
  assertEquals(KeyValueTextInputFormat.class, formatsByPath.get(barPath).getClass());

  // Mappers come back as class objects.
  assertEquals(MapClass.class, mappersByPath.get(fooPath));
  assertEquals(MapClass2.class, mappersByPath.get(barPath));
}
/**
 * Registers two paths through {@code MultipleInputs.addInputPath} and checks
 * that the input format and the mapper class recorded for each path match
 * what was registered.
 */
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  final Path fooPath = new Path("/foo");
  final Path barPath = new Path("/bar");

  MultipleInputs.addInputPath(conf, fooPath, TextInputFormat.class, MapClass.class);
  MultipleInputs.addInputPath(conf, barPath, KeyValueTextInputFormat.class, MapClass2.class);

  final Map<Path, InputFormat> formatsByPath = MultipleInputs.getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> mappersByPath = MultipleInputs.getMapperTypeMap(conf);

  // Input formats come back as instances, so compare their runtime classes.
  assertEquals(TextInputFormat.class, formatsByPath.get(fooPath).getClass());
  assertEquals(KeyValueTextInputFormat.class, formatsByPath.get(barPath).getClass());

  // Mappers come back as class objects.
  assertEquals(MapClass.class, mappersByPath.get(fooPath));
  assertEquals(MapClass2.class, mappersByPath.get(barPath));
}
/**
 * Configures this instance from the job after delegating to the superclass.
 *
 * <p>Looks up two optional hook classes: {@code stream.reduce.posthook}
 * (a {@code Mapper}) and {@code stream.reduce.prehook} (a {@code Reducer}).
 * Each hook that is set is instantiated reflectively with the job and logged;
 * the pre-hook additionally gets a fresh in-memory buffering collector.
 * Finally reads the {@code stream.reduce.ignoreKey} flag, defaulting to false.
 *
 * @param job job configuration to read the settings from
 */
public void configure(JobConf job) {
  super.configure(job);

  final Class<?> postHookClass = job.getClass("stream.reduce.posthook", null, Mapper.class);
  if (postHookClass != null) {
    postMapper = (Mapper) ReflectionUtils.newInstance(postHookClass, job);
    LOG.info("PostHook=" + postHookClass.getName());
  }

  final Class<?> preHookClass = job.getClass("stream.reduce.prehook", null, Reducer.class);
  if (preHookClass != null) {
    preReducer = (Reducer) ReflectionUtils.newInstance(preHookClass, job);
    oc = new InmemBufferingOutputCollector();
    LOG.info("PreHook=" + preHookClass.getName());
  }

  this.ignoreKey = job.getBoolean("stream.reduce.ignoreKey", false);
}
/**
 * Registers two paths through {@code MultipleInputs.addInputPath} and checks
 * that the input format and the mapper class recorded for each path match
 * what was registered.
 */
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  final Path fooPath = new Path("/foo");
  final Path barPath = new Path("/bar");

  MultipleInputs.addInputPath(conf, fooPath, TextInputFormat.class, MapClass.class);
  MultipleInputs.addInputPath(conf, barPath, KeyValueTextInputFormat.class, MapClass2.class);

  final Map<Path, InputFormat> formatsByPath = MultipleInputs.getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> mappersByPath = MultipleInputs.getMapperTypeMap(conf);

  // Input formats come back as instances, so compare their runtime classes.
  assertEquals(TextInputFormat.class, formatsByPath.get(fooPath).getClass());
  assertEquals(KeyValueTextInputFormat.class, formatsByPath.get(barPath).getClass());

  // Mappers come back as class objects.
  assertEquals(MapClass.class, mappersByPath.get(fooPath));
  assertEquals(MapClass2.class, mappersByPath.get(barPath));
}
/**
 * Wraps a Hadoop Mapper (mapred API) as a Flink FlatMapFunction.
 * The provided JobConf is kept and used to configure the Hadoop Mapper.
 *
 * @param hadoopMapper The Hadoop Mapper to wrap; must not be null.
 * @param conf The JobConf that is used to configure the Hadoop Mapper; must not be null.
 * @throws NullPointerException if either argument is null.
 */
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
  // Validate both arguments before touching any field.
  java.util.Objects.requireNonNull(hadoopMapper, "Mapper may not be null.");
  java.util.Objects.requireNonNull(conf, "JobConf may not be null.");

  this.mapper = hadoopMapper;
  this.jobConf = conf;
}
/**
 * Builds the type information for the tuples this function produces.
 *
 * <p>The output key and value classes are resolved from the type arguments at
 * positions 2 and 3 of {@code Mapper}, as seen from the wrapped mapper's
 * runtime class.
 *
 * @return tuple type information combining the key and value type information
 */
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
  final Class<?> mapperType = mapper.getClass();

  final Class<KEYOUT> keyOutType =
      (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 2);
  final Class<VALUEOUT> valueOutType =
      (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 3);

  final TypeInformation<KEYOUT> keyInfo = TypeExtractor.getForClass(keyOutType);
  final TypeInformation<VALUEOUT> valueInfo = TypeExtractor.getForClass(valueOutType);
  return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyInfo, valueInfo);
}
/**
 * Custom deserialization hook. Reads the mapper class from the stream and
 * instantiates it, then creates a fresh JobConf and populates it from the
 * remaining stream content.
 *
 * @param in stream to restore the state from
 * @throws IOException propagated from reading the stream
 * @throws ClassNotFoundException propagated from reading the mapper class object
 */
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
  // The mapper travels as its Class object and is re-created here.
  final Object serializedType = in.readObject();
  final Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> mapperType =
      (Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) serializedType;
  mapper = InstantiationUtil.instantiate(mapperType);

  jobConf = new JobConf();
  jobConf.readFields(in);
}
/**
 * Wraps a Hadoop Mapper (mapred API) as a Flink FlatMapFunction.
 * The provided JobConf is kept and used to configure the Hadoop Mapper.
 *
 * @param hadoopMapper The Hadoop Mapper to wrap; must not be null.
 * @param conf The JobConf that is used to configure the Hadoop Mapper; must not be null.
 * @throws NullPointerException if either argument is null.
 */
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
  // Validate both arguments before touching any field.
  java.util.Objects.requireNonNull(hadoopMapper, "Mapper may not be null.");
  java.util.Objects.requireNonNull(conf, "JobConf may not be null.");

  this.mapper = hadoopMapper;
  this.jobConf = conf;
}
/**
 * Builds the type information for the tuples this function produces.
 *
 * <p>The output key and value classes are resolved from the type arguments at
 * positions 2 and 3 of {@code Mapper}, as seen from the wrapped mapper's
 * runtime class.
 *
 * @return tuple type information combining the key and value type information
 */
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
  final Class<?> mapperType = mapper.getClass();

  final Class<KEYOUT> keyOutType =
      (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 2);
  final Class<VALUEOUT> valueOutType =
      (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 3);

  final TypeInformation<KEYOUT> keyInfo = TypeExtractor.getForClass(keyOutType);
  final TypeInformation<VALUEOUT> valueInfo = TypeExtractor.getForClass(valueOutType);
  return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyInfo, valueInfo);
}
/**
 * Custom deserialization hook. Reads the mapper class from the stream and
 * instantiates it, then creates a fresh JobConf and populates it from the
 * remaining stream content.
 *
 * @param in stream to restore the state from
 * @throws IOException propagated from reading the stream
 * @throws ClassNotFoundException propagated from reading the mapper class object
 */
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
  // The mapper travels as its Class object and is re-created here.
  final Object serializedType = in.readObject();
  final Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> mapperType =
      (Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) serializedType;
  mapper = InstantiationUtil.instantiate(mapperType);

  jobConf = new JobConf();
  jobConf.readFields(in);
}
/**
 * Forwards the record to the delegate mapper, creating the delegate on first use.
 *
 * <p>While no delegate exists yet, the reporter's input split is treated as a
 * {@code TaggedInputSplit} and its mapper class is instantiated reflectively
 * with this object's configuration.
 *
 * @param key input key
 * @param value input value
 * @param outputCollector collector handed through to the delegate
 * @param reporter reporter handed through to the delegate; also supplies the input split
 * @throws IOException propagated from the delegate's {@code map} call
 */
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {
  if (mapper == null) {
    // The tagged split supplies the Mapper class to instantiate.
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(
        ((TaggedInputSplit) reporter.getInputSplit()).getMapperClass(), conf);
  }
  mapper.map(key, value, outputCollector, reporter);
}
/**
* Creates a new TaggedInputSplit.
*
* @param inputSplit The InputSplit to be tagged
* @param conf The configuration to use
* @param inputFormatClass The InputFormat class to use for this job
* @param mapperClass The Mapper class to use for this job
*/
public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
Class<? extends InputFormat> inputFormatClass,
Class<? extends Mapper> mapperClass) {
this.inputSplitClass = inputSplit.getClass();
this.inputSplit = inputSplit;
this.conf = conf;
this.inputFormatClass = inputFormatClass;
this.mapperClass = mapperClass;
}
/**
 * Restores this split's state from the given input.
 *
 * <p>Values are consumed in a fixed order: the wrapped split's class, the
 * wrapped split's own fields, the input format class, and the mapper class.
 * NOTE(review): this order presumably mirrors the matching write method,
 * which is not visible here -- confirm before changing it.
 *
 * @param in input to read the serialized state from
 * @throws IOException propagated from reading the input
 */
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
// The split class comes first so an instance can be created to read into.
inputSplitClass = (Class<? extends InputSplit>) readClass(in);
inputSplit = (InputSplit) ReflectionUtils
.newInstance(inputSplitClass, conf);
// The wrapped split deserializes its own fields from the same input.
inputSplit.readFields(in);
inputFormatClass = (Class<? extends InputFormat>) readClass(in);
mapperClass = (Class<? extends Mapper>) readClass(in);
}
/**
 * Chains the <code>map(...)</code> methods of the Mappers in the chain.
 *
 * <p>The record is handed to the chain's first mapper together with the
 * collector the chain provides for index 0. When the chain has no first
 * mapper, the call does nothing.
 *
 * @param key input key
 * @param value input value
 * @param output collector that the chain's collector is built around
 * @param reporter reporter passed to the mapper and to the chain's collector
 * @throws IOException propagated from the first mapper's {@code map} call
 */
@SuppressWarnings({"unchecked"})
public void map(Object key, Object value, OutputCollector output,
    Reporter reporter) throws IOException {
  final Mapper firstMapper = chain.getFirstMap();
  if (firstMapper == null) {
    // No first mapper available: nothing to invoke for this record.
    return;
  }
  final OutputCollector chainedCollector = chain.getMapperCollector(0, output, reporter);
  firstMapper.map(key, value, chainedCollector, reporter);
}
/**
 * Add a {@link Path} with a custom {@link InputFormat} and
 * {@link Mapper} to the list of inputs for the map-reduce job.
 *
 * <p>The path/mapper pair is appended as {@code path;mapperClassName} to the
 * comma-separated {@code mapreduce.input.multipleinputs.dir.mappers} setting,
 * and the job's mapper is set to {@link DelegatingMapper}.
 *
 * @param conf The configuration of the job
 * @param path {@link Path} to be added to the list of inputs for the job
 * @param inputFormatClass {@link InputFormat} class to use for this path
 * @param mapperClass {@link Mapper} class to use for this path
 */
public static void addInputPath(JobConf conf, Path path,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {
  addInputPath(conf, path, inputFormatClass);

  final String mappersKey = "mapreduce.input.multipleinputs.dir.mappers";
  final String entry = path.toString() + ";" + mapperClass.getName();
  final String existing = conf.get(mappersKey);

  final String updated;
  if (existing == null) {
    updated = entry;
  } else {
    updated = existing + "," + entry;
  }
  conf.set(mappersKey, updated);

  conf.setMapperClass(DelegatingMapper.class);
}
/**
 * Forwards the record to the delegate mapper, creating the delegate on first use.
 *
 * <p>While no delegate exists yet, the reporter's input split is treated as a
 * {@code TaggedInputSplit} and its mapper class is instantiated reflectively
 * with this object's configuration.
 *
 * @param key input key
 * @param value input value
 * @param outputCollector collector handed through to the delegate
 * @param reporter reporter handed through to the delegate; also supplies the input split
 * @throws IOException propagated from the delegate's {@code map} call
 */
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {
  if (mapper == null) {
    // The tagged split supplies the Mapper class to instantiate.
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(
        ((TaggedInputSplit) reporter.getInputSplit()).getMapperClass(), conf);
  }
  mapper.map(key, value, outputCollector, reporter);
}
/**
* Creates a new TaggedInputSplit.
*
* @param inputSplit The InputSplit to be tagged
* @param conf The configuration to use
* @param inputFormatClass The InputFormat class to use for this job
* @param mapperClass The Mapper class to use for this job
*/
public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
Class<? extends InputFormat> inputFormatClass,
Class<? extends Mapper> mapperClass) {
this.inputSplitClass = inputSplit.getClass();
this.inputSplit = inputSplit;
this.conf = conf;
this.inputFormatClass = inputFormatClass;
this.mapperClass = mapperClass;
}
/**
 * Restores this split's state from the given input.
 *
 * <p>Values are consumed in a fixed order: the wrapped split's class, the
 * wrapped split's own fields, the input format class, and the mapper class.
 * NOTE(review): this order presumably mirrors the matching write method,
 * which is not visible here -- confirm before changing it.
 *
 * @param in input to read the serialized state from
 * @throws IOException propagated from reading the input
 */
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
// The split class comes first so an instance can be created to read into.
inputSplitClass = (Class<? extends InputSplit>) readClass(in);
inputSplit = (InputSplit) ReflectionUtils
.newInstance(inputSplitClass, conf);
// The wrapped split deserializes its own fields from the same input.
inputSplit.readFields(in);
inputFormatClass = (Class<? extends InputFormat>) readClass(in);
mapperClass = (Class<? extends Mapper>) readClass(in);
}
/**
 * Chains the <code>map(...)</code> methods of the Mappers in the chain.
 *
 * <p>The record is handed to the chain's first mapper together with the
 * collector the chain provides for index 0. When the chain has no first
 * mapper, the call does nothing.
 *
 * @param key input key
 * @param value input value
 * @param output collector that the chain's collector is built around
 * @param reporter reporter passed to the mapper and to the chain's collector
 * @throws IOException propagated from the first mapper's {@code map} call
 */
@SuppressWarnings({"unchecked"})
public void map(Object key, Object value, OutputCollector output,
    Reporter reporter) throws IOException {
  final Mapper firstMapper = chain.getFirstMap();
  if (firstMapper == null) {
    // No first mapper available: nothing to invoke for this record.
    return;
  }
  final OutputCollector chainedCollector = chain.getMapperCollector(0, output, reporter);
  firstMapper.map(key, value, chainedCollector, reporter);
}
/**
 * Add a {@link Path} with a custom {@link InputFormat} and
 * {@link Mapper} to the list of inputs for the map-reduce job.
 *
 * <p>The path/mapper pair is appended as {@code path;mapperClassName} to the
 * comma-separated {@code mapreduce.input.multipleinputs.dir.mappers} setting,
 * and the job's mapper is set to {@link DelegatingMapper}.
 *
 * @param conf The configuration of the job
 * @param path {@link Path} to be added to the list of inputs for the job
 * @param inputFormatClass {@link InputFormat} class to use for this path
 * @param mapperClass {@link Mapper} class to use for this path
 */
public static void addInputPath(JobConf conf, Path path,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {
  addInputPath(conf, path, inputFormatClass);

  final String mappersKey = "mapreduce.input.multipleinputs.dir.mappers";
  final String entry = path.toString() + ";" + mapperClass.getName();
  final String existing = conf.get(mappersKey);

  final String updated;
  if (existing == null) {
    updated = entry;
  } else {
    updated = existing + "," + entry;
  }
  conf.set(mappersKey, updated);

  conf.setMapperClass(DelegatingMapper.class);
}
/**
 * Wraps a Hadoop Mapper (mapred API) as a Flink FlatMapFunction.
 * The provided JobConf is kept and used to configure the Hadoop Mapper.
 *
 * @param hadoopMapper The Hadoop Mapper to wrap; must not be null.
 * @param conf The JobConf that is used to configure the Hadoop Mapper; must not be null.
 * @throws NullPointerException if either argument is null.
 */
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
  // Validate both arguments before touching any field.
  java.util.Objects.requireNonNull(hadoopMapper, "Mapper may not be null.");
  java.util.Objects.requireNonNull(conf, "JobConf may not be null.");

  this.mapper = hadoopMapper;
  this.jobConf = conf;
}
/**
 * Builds the type information for the tuples this function produces.
 *
 * <p>The output key and value classes are resolved from the type arguments at
 * positions 2 and 3 of {@code Mapper}, as seen from the wrapped mapper's
 * runtime class.
 *
 * @return tuple type information combining the key and value type information
 */
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
  final Class<?> mapperType = mapper.getClass();

  final Class<KEYOUT> keyOutType =
      (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 2);
  final Class<VALUEOUT> valueOutType =
      (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapperType, 3);

  final TypeInformation<KEYOUT> keyInfo = TypeExtractor.getForClass(keyOutType);
  final TypeInformation<VALUEOUT> valueInfo = TypeExtractor.getForClass(valueOutType);
  return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyInfo, valueInfo);
}
/**
 * Custom deserialization hook. Reads the mapper class from the stream and
 * instantiates it, then creates a fresh JobConf and populates it from the
 * remaining stream content.
 *
 * @param in stream to restore the state from
 * @throws IOException propagated from reading the stream
 * @throws ClassNotFoundException propagated from reading the mapper class object
 */
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
  // The mapper travels as its Class object and is re-created here.
  final Object serializedType = in.readObject();
  final Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> mapperType =
      (Class<Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>) serializedType;
  mapper = InstantiationUtil.instantiate(mapperType);

  jobConf = new JobConf();
  jobConf.readFields(in);
}
/**
 * Test fixture: creates two identity mapper/reducer pairs over {@code Text}
 * keys and values and registers them, in order, with a new pipeline driver.
 */
@Before
public void setUp() {
  mapper1 = new IdentityMapper<Text, Text>();
  reducer1 = new IdentityReducer<Text, Text>();
  mapper2 = new IdentityMapper<Text, Text>();
  reducer2 = new IdentityReducer<Text, Text>();

  final Pair<Mapper, Reducer> firstStage = new Pair<Mapper, Reducer>(mapper1, reducer1);
  final Pair<Mapper, Reducer> secondStage = new Pair<Mapper, Reducer>(mapper2, reducer2);

  driver = new PipelineMapReduceDriver<Text, Text, Text, Text>();
  driver.addMapReduce(firstStage);
  driver.addMapReduce(secondStage);
}
/**
 * Forwards the record to the delegate mapper, creating the delegate on first use.
 *
 * <p>While no delegate exists yet, the reporter's input split is treated as a
 * {@code TaggedInputSplit} and its mapper class is instantiated reflectively
 * with this object's configuration.
 *
 * @param key input key
 * @param value input value
 * @param outputCollector collector handed through to the delegate
 * @param reporter reporter handed through to the delegate; also supplies the input split
 * @throws IOException propagated from the delegate's {@code map} call
 */
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {
  if (mapper == null) {
    // The tagged split supplies the Mapper class to instantiate.
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(
        ((TaggedInputSplit) reporter.getInputSplit()).getMapperClass(), conf);
  }
  mapper.map(key, value, outputCollector, reporter);
}
/**
* Creates a new TaggedInputSplit.
*
* @param inputSplit The InputSplit to be tagged
* @param conf The configuration to use
* @param inputFormatClass The InputFormat class to use for this job
* @param mapperClass The Mapper class to use for this job
*/
public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
Class<? extends InputFormat> inputFormatClass,
Class<? extends Mapper> mapperClass) {
this.inputSplitClass = inputSplit.getClass();
this.inputSplit = inputSplit;
this.conf = conf;
this.inputFormatClass = inputFormatClass;
this.mapperClass = mapperClass;
}