下面列出了怎么用com.mongodb.hadoop.MongoInputFormat的API类实例代码及写法,或者点击链接到github查看源代码。
private void setupJob() {
_job.setInputFormatClass(MongoInputFormat.class);
_job.setMapperClass(BulkImportMapper.class);
_job.setMapOutputKeyClass(ImmutableBytesWritable.class);
_job.setMapOutputValueClass(Put.class);
MongoConfigUtil.setInputURI(getConfiguration(), _mongoURI);
MongoConfigUtil.setReadSplitsFromSecondary(getConfiguration(), true);
}
/**
* Instantiates a new Mongo extractor.
*/
public MongoExtractor() {
super();
this.inputFormat = new MongoInputFormat();
this.outputFormat = new MongoOutputFormat();
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
if(args.length < 3) {
System.err.println("Usage: MapReduceExercise " +
"[mongodb input uri] " +
"[mongodb output uri] " +
"update=[true or false]");
System.err.println("Example: MapReduceExercise " +
"mongodb://127.0.0.1:27017/movielens.ratings " +
"mongodb://127.0.0.1:27017/movielens.ratings.stats update=false");
System.err.println("Example: MapReduceExercise " +
"mongodb://127.0.0.1:27017/movielens.ratings " +
"mongodb://127.0.0.1:27017/movielens.movies update=true");
System.exit(-1);
}
Class outputValueClass = BSONWritable.class;
Class reducerClass = Reduce.class;
if(args[2].equals("update=true")) {
outputValueClass = MongoUpdateWritable.class;
reducerClass = ReduceUpdater.class;
}
Configuration conf = new Configuration();
// Set MongoDB-specific configuration items
conf.setClass("mongo.job.mapper", Map.class, Mapper.class);
conf.setClass("mongo.job.reducer", reducerClass, Reducer.class);
conf.setClass("mongo.job.mapper.output.key", IntWritable.class, Object.class);
conf.setClass("mongo.job.mapper.output.value", DoubleWritable.class, Object.class);
conf.setClass("mongo.job.output.key", NullWritable.class, Object.class);
conf.setClass("mongo.job.output.value", outputValueClass, Object.class);
conf.set("mongo.input.uri", args[0]);
conf.set("mongo.output.uri", args[1]);
Job job = Job.getInstance(conf);
// Set Hadoop-specific job parameters
job.setInputFormatClass(MongoInputFormat.class);
job.setOutputFormatClass(MongoOutputFormat.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(outputValueClass);
job.setMapperClass(Map.class);
job.setReducerClass(reducerClass);
job.setJarByClass(MapReduceExercise.class);
job.submit();
}