The following examples show how to use the org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat API class; you can also follow the links to view the full source code on GitHub.
public Job createSubmittableJob(String[] args) throws IOException {
  Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
  generatePartitions(partitionsPath);

  Job job = Job.getInstance(getConf(),
      getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
  Configuration jobConf = job.getConfiguration();
  jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
  jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
  job.setJarByClass(HashTable.class);
  TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
      HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

  // use a TotalOrderPartitioner and reducers to group region output into hash files
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
  job.setReducerClass(Reducer.class);  // identity reducer
  job.setNumReduceTasks(tableHash.numHashFiles);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(ImmutableBytesWritable.class);
  job.setOutputFormatClass(MapFileOutputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

  return job;
}
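The job above writes one MapFile per reducer under HASH_DATA_DIR. Reading such output back is not part of the example, but MapFileOutputFormat also provides the static helpers getReaders and getEntry for point lookups. The sketch below is only an illustration: the lookup method, output path, Text key/value types and HashPartitioner are placeholders, not taken from the HashTable code, and a real lookup must use the same key/value types and partitioner as the job that produced the output.

// Sketch only: point lookup against a directory of MapFiles written by MapFileOutputFormat.
// Uses org.apache.hadoop.io.MapFile, org.apache.hadoop.io.Text and
// org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.
private static void lookup(Path outputDir, String k, Configuration conf) throws IOException {
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(outputDir, conf);
  try {
    Partitioner<Text, Text> partitioner = new HashPartitioner<>();
    Text key = new Text(k);
    Text value = new Text();
    // getEntry routes the key to the right reader via the partitioner and fills 'value' if found
    if (MapFileOutputFormat.getEntry(readers, partitioner, key, value) != null) {
      System.out.println(key + " -> " + value);
    } else {
      System.out.println(key + " not found");
    }
  } finally {
    for (MapFile.Reader reader : readers) {
      reader.close();
    }
  }
}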
private void testMapFileOutputCommitterInternal(int version) throws Exception {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
  JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  FileOutputCommitter committer = new SafeFileOutputCommitter(outDir, tContext);

  // setup
  committer.setupJob(jContext);
  committer.setupTask(tContext);

  // write output
  MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
  writeMapFileOutput(theRecordWriter, tContext);

  // do commit
  committer.commitTask(tContext);
  committer.commitJob(jContext);

  // validate output
  validateMapFileOutputContent(FileSystem.get(job.getConfiguration()), outDir);
  FileUtil.fullyDelete(new File(outDir.toString()));
}
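The writeMapFileOutput helper used above is not shown in this snippet. The version below is only a plausible stand-in, assuming the job's default LongWritable/Text output key and value classes (LongWritable is a WritableComparable, which MapFileOutputFormat requires). The one hard constraint is that keys must be written in ascending order, since a MapFile is a sorted, indexed SequenceFile.

// Hypothetical stand-in for the helper: write a handful of records with keys
// already in ascending order, as MapFile.Writer rejects out-of-order keys.
private void writeMapFileOutput(RecordWriter theRecordWriter, TaskAttemptContext context)
    throws IOException, InterruptedException {
  try {
    for (int i = 0; i < 10; i++) {
      theRecordWriter.write(new LongWritable(i), new Text("value-" + i));
    }
  } finally {
    theRecordWriter.close(context);
  }
}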
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  // 1. Set up the HDFS configuration
  String namenode_ip = "192.168.17.10";
  String hdfs = "hdfs://" + namenode_ip + ":9000";
  Configuration conf = new Configuration();
  conf.set("fs.defaultFS", hdfs);
  conf.set("mapreduce.app-submission.cross-platform", "true");

  // 2. Configure the MapReduce job
  String jobName = "MultInputOutput";                  // job name
  Job job = Job.getInstance(conf, jobName);
  job.setJarByClass(MultiInOutput.class);              // class used to run the job
  job.setJar("export\\MultiInOutput.jar");             // local jar file
  job.setMapOutputKeyClass(Text.class);                // mapper output key type
  job.setMapOutputValueClass(IntWritable.class);       // mapper output value type
  job.setReducerClass(MultOutputReducer.class);        // reducer class
  // job.setOutputKeyClass(Text.class);                // reducer output key type
  // job.setOutputValueClass(IntWritable.class);       // reducer output value type

  // 3. Register the job's multiple input paths and the mapper class used for each
  MultipleInputs.addInputPath(job, new Path(hdfs + "/expr/multiinoutput/data/txt"), TextInputFormat.class, TxtFileMapper.class);
  MultipleInputs.addInputPath(job, new Path(hdfs + "/expr/multiinoutput/data/csv"), TextInputFormat.class, CsvFileMapper.class);

  // Declare the named outputs: file name, output format, reducer output key type and value type
  MultipleOutputs.addNamedOutput(job, "f2015", TextOutputFormat.class, Text.class, IntWritable.class);
  MultipleOutputs.addNamedOutput(job, "f2016", SequenceFileOutputFormat.class, Text.class, IntWritable.class);
  MultipleOutputs.addNamedOutput(job, "f2017", MapFileOutputFormat.class, Text.class, IntWritable.class);

  // Set the job output path
  String outputDir = "/expr/multiinoutput/output";     // output directory for this experiment
  Path outPath = new Path(hdfs + outputDir);
  FileOutputFormat.setOutputPath(job, outPath);
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outPath)) {
    fs.delete(outPath, true);
  }

  // 4. Run the job
  System.out.println("Job: " + jobName + " is running...");
  if (job.waitForCompletion(true)) {
    System.out.println("success!");
    System.exit(0);
  } else {
    System.out.println("failed!");
    System.exit(1);
  }
}
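The TxtFileMapper, CsvFileMapper and MultOutputReducer classes referenced above are not shown. The reducer below is only a sketch of how records might be routed to the three registered named outputs; the year-prefix routing is an assumption, while the MultipleOutputs calls themselves (the constructor, write(namedOutput, key, value) and close()) are the actual API.

// Hypothetical reducer: sums the counts for each key and writes the result to one of
// the named outputs registered with MultipleOutputs.addNamedOutput.
public static class MultOutputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private MultipleOutputs<Text, IntWritable> mos;

  @Override
  protected void setup(Context context) {
    mos = new MultipleOutputs<>(context);
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    String k = key.toString();                    // assumed: keys start with a 4-digit year
    if (k.startsWith("2015")) {
      mos.write("f2015", key, new IntWritable(sum));   // plain text output
    } else if (k.startsWith("2016")) {
      mos.write("f2016", key, new IntWritable(sum));   // SequenceFile output
    } else {
      mos.write("f2017", key, new IntWritable(sum));   // MapFile output
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    mos.close();
  }
}

Because reduce() is invoked in ascending key order within each reducer, the writes to the MapFileOutputFormat-backed named output also happen in sorted order, which MapFile requires.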
public static void main(String[] args) throws Exception {
  // 1. Set up the HDFS configuration
  String namenode_ip = "192.168.17.10";
  String hdfs = "hdfs://" + namenode_ip + ":9000";
  Configuration conf = new Configuration();
  conf.set("fs.defaultFS", hdfs);
  conf.set("mapreduce.app-submission.cross-platform", "true");

  // 2. Configure the MapReduce job
  String jobName = "MultOutput";                       // job name
  Job job = Job.getInstance(conf, jobName);
  job.setJarByClass(MultOutput.class);                 // class used to run the job
  job.setJar("export\\MultOutput.jar");                // local jar file
  job.setMapperClass(MultOutputMapper.class);          // mapper class
  job.setMapOutputKeyClass(Text.class);                // mapper output key type
  job.setMapOutputValueClass(IntWritable.class);       // mapper output value type
  job.setReducerClass(MultOutputReducer.class);        // reducer class
  // job.setOutputKeyClass(Text.class);                // reducer output key type
  // job.setOutputValueClass(IntWritable.class);       // reducer output value type

  // Declare the named outputs: file name, output format, key type and value type
  MultipleOutputs.addNamedOutput(job, "f2015", TextOutputFormat.class, Text.class, IntWritable.class);
  MultipleOutputs.addNamedOutput(job, "f2016", SequenceFileOutputFormat.class, Text.class, IntWritable.class);
  MultipleOutputs.addNamedOutput(job, "f2017", MapFileOutputFormat.class, Text.class, IntWritable.class);

  // 3. Set the job input and output paths
  String dataDir = "/expr/multoutput/data";            // input data directory for this experiment
  String outputDir = "/expr/multoutput/output";        // output directory for this experiment
  Path inPath = new Path(hdfs + dataDir);
  Path outPath = new Path(hdfs + outputDir);
  FileInputFormat.addInputPath(job, inPath);
  FileOutputFormat.setOutputPath(job, outPath);
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outPath)) {
    fs.delete(outPath, true);
  }

  // 4. Run the job
  System.out.println("Job: " + jobName + " is running...");
  if (job.waitForCompletion(true)) {
    System.out.println("success!");
    System.exit(0);
  } else {
    System.out.println("failed!");
    System.exit(1);
  }
}
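In both of the jobs above every record is emitted through a named output, so the default part-r-nnnnn files produced by the base output format remain empty. A common refinement, not part of the original examples, is to wrap the base output format in LazyOutputFormat so those empty files are never created:

// Optional: only materialise the base output if something is actually written to it
// (org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat). Named outputs such as
// f2015/f2016/f2017 are unaffected by this setting.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);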
/**
* Save a {@code JavaRDD<List<Writable>>} to a Hadoop {@link org.apache.hadoop.io.MapFile}. Each record is
* given a <i>unique and contiguous</i> {@link LongWritable} key, and values are stored as
* {@link RecordWritable} instances.<br>
* <b>Note</b>: If contiguous keys are not required, using a sequence file instead is preferable from a performance
* point of view. Contiguous keys are often only required for non-Spark use cases, such as with
* {@link org.datavec.hadoop.records.reader.mapfile.MapFileRecordReader}
* <p>
* Use {@link #restoreMapFile(String, JavaSparkContext)} to restore values saved with this method.
*
* @param path Path to save the MapFile
* @param rdd RDD to save
* @param c Configuration object, used to customise options for the map file
* @param maxOutputFiles Nullable. If non-null: first coalesce the RDD to the specified size (number of partitions)
* to limit the maximum number of output map files
* @see #saveMapFileSequences(String, JavaRDD)
* @see #saveSequenceFile(String, JavaRDD)
*/
public static void saveMapFile(String path, JavaRDD<List<Writable>> rdd, Configuration c,
        Integer maxOutputFiles) {
    path = FilenameUtils.normalize(path, true);
    if (maxOutputFiles != null) {
        rdd = rdd.coalesce(maxOutputFiles);
    }
    JavaPairRDD<List<Writable>, Long> dataIndexPairs = rdd.zipWithIndex();   // Note: Long values are unique + contiguous, but requires a count
    JavaPairRDD<LongWritable, RecordWritable> keyedByIndex =
            dataIndexPairs.mapToPair(new RecordSavePrepPairFunction());

    keyedByIndex.saveAsNewAPIHadoopFile(path, LongWritable.class, RecordWritable.class, MapFileOutputFormat.class,
            c);
}
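A minimal usage sketch for saveMapFile, assuming the call site is in (or statically imports from) the declaring class and that Writable, Text and IntWritable are DataVec's org.datavec.api.writable classes; the local-mode Spark context, path and record contents are illustrative only.

// Hypothetical caller: save two records to a MapFile on the local file system.
// zipWithIndex assigns them the contiguous LongWritable keys 0 and 1.
SparkConf sparkConf = new SparkConf().setAppName("saveMapFileExample").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

JavaRDD<List<Writable>> rdd = sc.parallelize(Arrays.asList(
        Arrays.<Writable>asList(new Text("alice"), new IntWritable(30)),
        Arrays.<Writable>asList(new Text("bob"), new IntWritable(25))));

saveMapFile("/tmp/datavec/mapfile", rdd, new Configuration(), 1);   // coalesce to a single map file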
/**
* Save a {@code JavaRDD<List<List<Writable>>>} to a Hadoop {@link org.apache.hadoop.io.MapFile}. Each record is
* given a <i>unique and contiguous</i> {@link LongWritable} key, and values are stored as
* {@link SequenceRecordWritable} instances.<br>
* <b>Note</b>: If contiguous keys are not required, using a sequence file instead is preferable from a performance
* point of view. Contiguous keys are often only required for non-Spark use cases, such as with
* {@link org.datavec.hadoop.records.reader.mapfile.MapFileSequenceRecordReader}<br>
* <p>
* Use {@link #restoreMapFileSequences(String, JavaSparkContext)} to restore values saved with this method.
*
* @param path Path to save the MapFile
* @param rdd RDD to save
* @param c Configuration object, used to customise options for the map file
* @param maxOutputFiles Nullable. If non-null: first coalesce the RDD to the specified size (number of partitions)
* to limit the maximum number of output map files
* @see #saveMapFileSequences(String, JavaRDD)
* @see #saveSequenceFile(String, JavaRDD)
*/
public static void saveMapFileSequences(String path, JavaRDD<List<List<Writable>>> rdd, Configuration c,
        Integer maxOutputFiles) {
    path = FilenameUtils.normalize(path, true);
    if (maxOutputFiles != null) {
        rdd = rdd.coalesce(maxOutputFiles);
    }
    JavaPairRDD<List<List<Writable>>, Long> dataIndexPairs = rdd.zipWithIndex();
    JavaPairRDD<LongWritable, SequenceRecordWritable> keyedByIndex =
            dataIndexPairs.mapToPair(new SequenceRecordSavePrepPairFunction());

    keyedByIndex.saveAsNewAPIHadoopFile(path, LongWritable.class, SequenceRecordWritable.class,
            MapFileOutputFormat.class, c);
}
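And a matching sketch for saveMapFileSequences, reusing the JavaSparkContext from the previous sketch and the same assumptions about the DataVec writable classes; each RDD element is one sequence, i.e. a list of time steps where each step is a list of Writables. Per the javadoc, the saved data can be read back with restoreMapFileSequences(String, JavaSparkContext).

// Hypothetical caller: one sequence with two time steps, stored as a single
// SequenceRecordWritable under the contiguous LongWritable key 0.
JavaRDD<List<List<Writable>>> sequences = sc.parallelize(Collections.singletonList(
        Arrays.asList(
                Arrays.<Writable>asList(new Text("step0"), new IntWritable(0)),
                Arrays.<Writable>asList(new Text("step1"), new IntWritable(1)))));

saveMapFileSequences("/tmp/datavec/sequences", sequences, new Configuration(), null);   // null: no coalesce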