下面列出了 org.apache.hadoop.mapred.FileOutputFormat 类 API 的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Configures and synchronously runs a text-in/text-out job whose map output
 * and final output are both (Text, IntWritable) pairs.
 *
 * @param inputDir    input path string, passed as-is to FileInputFormat.setInputPaths
 * @param outputDir   directory the job writes to
 * @param jobName     name assigned to the job
 * @param mapClass    mapper implementation
 * @param reduceClass reducer implementation; also installed as the combiner
 * @throws Exception propagated from job configuration or JobClient.runJob
 */
private static void runJobPv(String inputDir, String outputDir, String jobName,
    Class<? extends Mapper> mapClass, Class<? extends Reducer> reduceClass) throws Exception {
  final JobConf jobConf = new JobConf(PersonVersion.class);
  jobConf.setJobName(jobName);

  // Where the data comes from and where it goes.
  jobConf.setInputFormat(TextInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, inputDir);
  jobConf.setOutputFormat(TextOutputFormat.class);
  final Path outputPath = new Path(outputDir);
  FileOutputFormat.setOutputPath(jobConf, outputPath);

  // Processing stages; the reducer doubles as the combiner.
  jobConf.setMapperClass(mapClass);
  jobConf.setCombinerClass(reduceClass);
  jobConf.setReducerClass(reduceClass);

  // Intermediate and final records share the same key/value types.
  jobConf.setMapOutputKeyClass(Text.class);
  jobConf.setMapOutputValueClass(IntWritable.class);
  jobConf.setOutputKeyClass(Text.class);
  jobConf.setOutputValueClass(IntWritable.class);

  JobClient.runJob(jobConf);
}
/**
 * Initialises the column list, the struct object inspector derived from it,
 * the record writer and the ORC serde.
 *
 * <p>{@code conf} and {@code fileSystem} are not declared in this snippet;
 * presumably they are members of the enclosing class (to be confirmed).
 *
 * @param config   configuration supplying Key.COLUMN and Key.COMPRESS
 * @param fileName name handed to the output format when opening the writer
 * @throws IOException declared by this constructor; presumably raised when the
 *                     record writer cannot be opened (to be confirmed)
 */
OrcWriterProxy(Configuration config, String fileName) throws IOException {
  // Columns drive both the field names and the per-field inspectors.
  columns = config.getListConfiguration(Key.COLUMN);
  List<String> fieldNames = getColumnNames(columns);
  List<ObjectInspector> fieldInspectors = getColumnTypeInspectors(columns);
  inspector = (StructObjectInspector) ObjectInspectorFactory
      .getStandardStructObjectInspector(fieldNames, fieldInspectors);

  // A codec is installed only when compression is configured and is not "NONE".
  FileOutputFormat outFormat = new OrcOutputFormat();
  String compress = config.getString(Key.COMPRESS, null);
  boolean compressionRequested = compress != null && !"NONE".equalsIgnoreCase(compress);
  if (compressionRequested) {
    Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
    if (codecClass != null) {
      FileOutputFormat.setOutputCompressorClass(conf, codecClass);
    }
  }
  writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);

  orcSerde = new OrcSerde();
}
/**
 * Opens a text dump file called {@code name} under the job's output directory
 * and returns a writer that prints each record's value on its own line. Keys
 * are ignored. An existing file of the same name is deleted first.
 *
 * @param fs       file system on which the dump file is created
 * @param job      job configuration supplying the output directory
 * @param name     file name of the dump, relative to the output directory
 * @param progress not used by this implementation
 * @return a writer backed by a single print stream; both of its methods are synchronized
 * @throws IOException if the old file cannot be removed or the new one cannot be created
 */
public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
    final FileSystem fs, JobConf job,
    String name, final Progressable progress) throws IOException {
  final Path segmentDumpFile = new Path(FileOutputFormat.getOutputPath(job), name);
  // Get the old copy out of the way
  if (fs.exists(segmentDumpFile)) {
    fs.delete(segmentDumpFile, true);
  }
  // Pin the encoding to UTF-8: the single-argument PrintStream constructor
  // uses the platform default charset, which makes the dump depend on the
  // locale of whichever node happens to run the task.
  final PrintStream printStream =
      new PrintStream(fs.create(segmentDumpFile), false, "UTF-8");
  return new RecordWriter<WritableComparable<?>, Writable>() {
    public synchronized void write(WritableComparable<?> key, Writable value) throws IOException {
      printStream.println(value);
    }
    public synchronized void close(Reporter reporter) throws IOException {
      printStream.close();
    }
  };
}
/**
 * Runs a composite-join job of the given type over the sources returned by
 * writeSimpleSrc, with {@code c} acting as both mapper and reducer, and then
 * removes the working directory. The directory is deleted only when the job
 * returns normally.
 *
 * @param jointype join operator name; also used as the working directory name under "/"
 * @param c        checker class installed as mapper and reducer
 * @throws Exception propagated from source creation, the job run or the cleanup
 */
private static void joinAs(String jointype,
    Class<? extends SimpleCheckerBase> c) throws Exception {
  final int sourceCount = 4;
  Configuration baseConf = new Configuration();
  JobConf joinJob = new JobConf(baseConf, c);
  Path workDir = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
  Path[] inputs = writeSimpleSrc(workDir, baseConf, sourceCount);

  // Input side: a join expression over the generated sequence files.
  String joinExpr = CompositeInputFormat.compose(jointype,
      SequenceFileInputFormat.class, inputs);
  joinJob.set("mapred.join.expr", joinExpr);
  joinJob.setInputFormat(CompositeInputFormat.class);
  joinJob.setInt("testdatamerge.sources", sourceCount);

  // Processing and output side.
  joinJob.setMapperClass(c);
  joinJob.setReducerClass(c);
  joinJob.setOutputKeyClass(IntWritable.class);
  joinJob.setOutputValueClass(IntWritable.class);
  FileOutputFormat.setOutputPath(joinJob, new Path(workDir, "out"));

  JobClient.runJob(joinJob);
  FileSystem workFs = workDir.getFileSystem(joinJob);
  workFs.delete(workDir, true);
}
/**
 * Runs one I/O test pass as a single-reducer job: the supplied mapper over
 * the sequence files in the control directory, aggregated by
 * AccumulatingReducer into {@code outputDir}.
 *
 * @param mapperClass mapper implementing the I/O operation under test
 * @param outputDir   directory receiving the job output
 * @throws IOException propagated from job setup or JobClient.runJob
 */
private void runIOTest(
    Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
    Path outputDir) throws IOException {
  final JobConf ioJob = new JobConf(config, TestDFSIO.class);

  // Input: control files.
  ioJob.setInputFormat(SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(ioJob, getControlDir(config));

  // Stages.
  ioJob.setMapperClass(mapperClass);
  ioJob.setReducerClass(AccumulatingReducer.class);
  ioJob.setNumReduceTasks(1);

  // Output: (Text, Text) records.
  ioJob.setOutputKeyClass(Text.class);
  ioJob.setOutputValueClass(Text.class);
  FileOutputFormat.setOutputPath(ioJob, outputDir);

  JobClient.runJob(ioJob);
}
/**
 * Builds, without submitting, a link-database merge job. No input path is
 * set here. The output goes to a relative path named
 * "linkdb-merge-" followed by a random non-negative integer.
 *
 * @param config    base configuration for the job
 * @param linkDb    link database path; used here only in the job name
 * @param normalize value stored under LinkDbFilter.URL_NORMALIZING
 * @param filter    value stored under LinkDbFilter.URL_FILTERING
 * @return the configured job
 */
public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
  final int suffix = new Random().nextInt(Integer.MAX_VALUE);
  final Path mergeOutput = new Path("linkdb-merge-" + suffix);

  final JobConf mergeJob = new NutchJob(config);
  mergeJob.setJobName("linkdb merge " + linkDb);

  // Map side: filter/normalise entries read from sequence files.
  mergeJob.setInputFormat(SequenceFileInputFormat.class);
  mergeJob.setMapperClass(LinkDbFilter.class);
  mergeJob.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
  mergeJob.setBoolean(LinkDbFilter.URL_FILTERING, filter);

  // Reduce side: merge into a compressed map file of (Text, Inlinks).
  mergeJob.setReducerClass(LinkDbMerger.class);
  mergeJob.setOutputFormat(MapFileOutputFormat.class);
  mergeJob.setOutputKeyClass(Text.class);
  mergeJob.setOutputValueClass(Inlinks.class);
  mergeJob.setBoolean("mapred.output.compress", true);
  FileOutputFormat.setOutputPath(mergeJob, mergeOutput);

  // https://issues.apache.org/jira/browse/NUTCH-1069
  mergeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

  return mergeJob;
}
/**
 * Runs a composite-join job of the given type over the sources returned by
 * writeSimpleSrc, with {@code c} acting as both mapper and reducer, and then
 * removes the working directory. The directory is deleted only when the job
 * returns normally.
 *
 * @param jointype join operator name; also used as the working directory name under "/"
 * @param c        checker class installed as mapper and reducer
 * @throws Exception propagated from source creation, the job run or the cleanup
 */
private static void joinAs(String jointype,
    Class<? extends SimpleCheckerBase> c) throws Exception {
  final int sourceCount = 4;
  Configuration baseConf = new Configuration();
  JobConf joinJob = new JobConf(baseConf, c);
  Path workDir = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
  Path[] inputs = writeSimpleSrc(workDir, baseConf, sourceCount);

  // Input side: a join expression over the generated sequence files.
  String joinExpr = CompositeInputFormat.compose(jointype,
      SequenceFileInputFormat.class, inputs);
  joinJob.set("mapreduce.join.expr", joinExpr);
  joinJob.setInputFormat(CompositeInputFormat.class);
  joinJob.setInt("testdatamerge.sources", sourceCount);

  // Processing and output side.
  joinJob.setMapperClass(c);
  joinJob.setReducerClass(c);
  joinJob.setOutputKeyClass(IntWritable.class);
  joinJob.setOutputValueClass(IntWritable.class);
  FileOutputFormat.setOutputPath(joinJob, new Path(workDir, "out"));

  JobClient.runJob(joinJob);
  FileSystem workFs = workDir.getFileSystem(joinJob);
  workFs.delete(workDir, true);
}
/**
 * Runs an "outer" composite join over three Fake_IF sources with identity
 * map and reduce stages, then deletes the "/empty" working directory.
 * Only the first source path is rooted at the working directory; "i1" and
 * "i2" are relative paths.
 *
 * @throws Exception propagated from the job run or the cleanup
 */
public void testEmptyJoin() throws Exception {
  final JobConf emptyJob = new JobConf();
  final Path workDir = cluster.getFileSystem().makeQualified(new Path("/empty"));
  final Path[] inputs = { new Path(workDir,"i0"), new Path("i1"), new Path("i2") };

  // Input side.
  final String joinExpr = CompositeInputFormat.compose("outer", Fake_IF.class, inputs);
  emptyJob.set("mapreduce.join.expr", joinExpr);
  emptyJob.setInputFormat(CompositeInputFormat.class);

  // Pass-through stages and output types.
  emptyJob.setMapperClass(IdentityMapper.class);
  emptyJob.setReducerClass(IdentityReducer.class);
  emptyJob.setOutputKeyClass(IncomparableKey.class);
  emptyJob.setOutputValueClass(NullWritable.class);
  FileOutputFormat.setOutputPath(emptyJob, new Path(workDir, "out"));

  JobClient.runJob(emptyJob);
  final FileSystem workFs = workDir.getFileSystem(emptyJob);
  workFs.delete(workDir, true);
}
/**
 * Builds the configuration of a simple copy job named "DataMoveJob" that
 * uses DataCopy as both mapper and reducer, with 12 map tasks and 4 reduce
 * tasks requested. The job is not submitted.
 *
 * @param indirs List of input directories.
 * @param outdir Output directory.
 * @return JobConf initialised for a simple copy job.
 * @throws Exception If an error occurs creating job configuration.
 */
static JobConf createCopyJob(List<Path> indirs, Path outdir) throws Exception {
  final JobConf copyJob = new JobConf(new Configuration(), TestJobControl.class);
  copyJob.setJobName("DataMoveJob");

  // Source and destination.
  final Path[] inputs = indirs.toArray(new Path[0]);
  FileInputFormat.setInputPaths(copyJob, inputs);
  FileOutputFormat.setOutputPath(copyJob, outdir);

  // DataCopy serves both stages and emits (Text, Text).
  copyJob.setMapperClass(DataCopy.class);
  copyJob.setReducerClass(DataCopy.class);
  copyJob.setOutputKeyClass(Text.class);
  copyJob.setOutputValueClass(Text.class);

  // Requested parallelism.
  copyJob.setNumMapTasks(12);
  copyJob.setNumReduceTasks(4);

  return copyJob;
}
/**
 * Runs the seek test as a single-reducer job: SeekMapper over the sequence
 * files under CONTROL_DIR, summed by LongSumReducer into READ_DIR.
 *
 * @param fs        file system on which READ_DIR is deleted before the run
 * @param fastCheck value stored under "fs.test.fastCheck" in the job configuration
 * @throws Exception propagated from the delete or from the job run
 */
public static void seekTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  // Start from a clean output directory.
  fs.delete(READ_DIR, true);

  final JobConf seekJob = new JobConf(conf, TestFileSystem.class);
  seekJob.setBoolean("fs.test.fastCheck", fastCheck);

  // Input.
  seekJob.setInputFormat(SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(seekJob,CONTROL_DIR);

  // Stages.
  seekJob.setMapperClass(SeekMapper.class);
  seekJob.setReducerClass(LongSumReducer.class);
  seekJob.setNumReduceTasks(1);

  // Output.
  seekJob.setOutputKeyClass(UTF8.class);
  seekJob.setOutputValueClass(LongWritable.class);
  FileOutputFormat.setOutputPath(seekJob, READ_DIR);

  JobClient.runJob(seekJob);
}
/**
 * Writes the collected results to a file named "Results" in the task output
 * directory.
 *
 * <p>If the file context cannot be initialised, the problem is reported on
 * standard error and the method returns without writing anything.
 *
 * @throws IOException if the results file cannot be created or closed, or if
 *                     printResults raises one (its signature is not visible here)
 */
@Override
public void close() throws IOException {
  // Output the result to a file Results in the output dir
  FileContext fc;
  try {
    fc = FileContext.getFileContext(jobConf);
  } catch (IOException ioe) {
    System.err.println("Can not initialize the file system: " +
        ioe.getLocalizedMessage());
    return;
  }
  FSDataOutputStream o = fc.create(FileOutputFormat.getTaskOutputPath(jobConf, "Results"),
      EnumSet.of(CreateFlag.CREATE));
  try {
    PrintStream out = new PrintStream(o);
    printResults(out);
    // Flushes buffered text and closes the wrapped stream.
    out.close();
  } finally {
    // Release the underlying stream even when printResults fails part-way;
    // on the normal path this is a second close, as in the original code.
    o.close();
  }
}
/**
 * Executes a composite join of type {@code jointype} across four generated
 * sources, using {@code c} for both the map and the reduce stage, and
 * deletes the working directory once the job has returned normally.
 *
 * @param jointype join operator name; also names the working directory under "/"
 * @param c        checker class installed as mapper and reducer
 * @throws Exception propagated from source creation, the job run or the cleanup
 */
private static void joinAs(String jointype,
    Class<? extends SimpleCheckerBase> c) throws Exception {
  final int nSources = 4;
  Configuration defaults = new Configuration();
  JobConf checkerJob = new JobConf(defaults, c);
  Path root = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
  Path[] sources = writeSimpleSrc(root, defaults, nSources);

  // Describe the join over the generated sequence files.
  String expr = CompositeInputFormat.compose(jointype,
      SequenceFileInputFormat.class, sources);
  checkerJob.setInputFormat(CompositeInputFormat.class);
  checkerJob.set("mapreduce.join.expr", expr);
  checkerJob.setInt("testdatamerge.sources", nSources);

  // The checker class handles both stages and emits (IntWritable, IntWritable).
  checkerJob.setMapperClass(c);
  checkerJob.setReducerClass(c);
  checkerJob.setOutputKeyClass(IntWritable.class);
  checkerJob.setOutputValueClass(IntWritable.class);
  FileOutputFormat.setOutputPath(checkerJob, new Path(root, "out"));

  JobClient.runJob(checkerJob);
  FileSystem rootFs = root.getFileSystem(checkerJob);
  rootFs.delete(root, true);
}
/**
 * Computes the temporary output directory of a task attempt: the job output
 * path, then FileOutputCommitter.TEMP_DIR_NAME, then "_" plus the attempt id.
 *
 * @param conf          job configuration supplying the output path
 * @param taskAttemptID attempt whose temporary directory is wanted
 * @return the path qualified against its file system; the unqualified path
 *         if the file system lookup fails (the failure is logged as a
 *         warning); or {@code null} when the job has no output path
 */
public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
  final Path jobOutput = FileOutputFormat.getOutputPath(conf);
  if (jobOutput == null) {
    return null;
  }

  final String relative = FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
      "_" + taskAttemptID.toString();
  final Path attemptDir = new Path(jobOutput, relative);
  try {
    FileSystem attemptFs = attemptDir.getFileSystem(conf);
    return attemptDir.makeQualified(attemptFs);
  } catch (IOException ie) {
    // Fall back to the unqualified path rather than failing the caller.
    LOG.warn(StringUtils.stringifyException(ie));
    return attemptDir;
  }
}
/**
 * Prepares, without submitting, the "DataMoveJob" copy job: DataCopy is
 * installed as mapper and reducer, the records are (Text, Text), and 12 map
 * tasks plus 4 reduce tasks are requested.
 *
 * @param indirs List of input directories.
 * @param outdir Output directory.
 * @return JobConf initialised for a simple copy job.
 * @throws Exception If an error occurs creating job configuration.
 */
static JobConf createCopyJob(List<Path> indirs, Path outdir) throws Exception {
  final Configuration base = new Configuration();
  final JobConf moveJob = new JobConf(base, TestJobControl.class);
  moveJob.setJobName("DataMoveJob");

  // Stages and record types.
  moveJob.setMapperClass(DataCopy.class);
  moveJob.setReducerClass(DataCopy.class);
  moveJob.setOutputKeyClass(Text.class);
  moveJob.setOutputValueClass(Text.class);

  // Requested task counts.
  moveJob.setNumMapTasks(12);
  moveJob.setNumReduceTasks(4);

  // Source directories and destination.
  final Path[] sources = indirs.toArray(new Path[0]);
  FileInputFormat.setInputPaths(moveJob, sources);
  FileOutputFormat.setOutputPath(moveJob, outdir);

  return moveJob;
}
/**
 * Run the test: configures and synchronously executes the "NNBench-" job
 * for the current operation, reading control files from under baseDir and
 * writing results to the output directory under baseDir.
 *
 * @throws IOException on error
 */
public static void runTests() throws IOException {
  // Applied to the shared config before the job copies it.
  config.setLong("io.bytes.per.checksum", bytesPerChecksum);

  final JobConf benchJob = new JobConf(config, NNBench.class);
  benchJob.setJobName("NNBench-" + operation);

  // Input: sequence files in the control directory.
  final Path controlDir = new Path(baseDir, CONTROL_DIR_NAME);
  FileInputFormat.setInputPaths(benchJob, controlDir);
  benchJob.setInputFormat(SequenceFileInputFormat.class);

  // Explicitly set number of max map attempts to 1.
  benchJob.setMaxMapAttempts(1);
  // Explicitly turn off speculative execution
  benchJob.setSpeculativeExecution(false);

  // Stages.
  benchJob.setMapperClass(NNBenchMapper.class);
  benchJob.setReducerClass(NNBenchReducer.class);
  benchJob.setNumReduceTasks((int) numberOfReduces);

  // Output: (Text, Text) records.
  final Path resultDir = new Path(baseDir, OUTPUT_DIR_NAME);
  FileOutputFormat.setOutputPath(benchJob, resultDir);
  benchJob.setOutputKeyClass(Text.class);
  benchJob.setOutputValueClass(Text.class);

  JobClient.runJob(benchJob);
}
/**
 * Stores the job configuration and prepares the two index files,
 * "_masterindex" and "_index", in the task's work output directory.
 * Existing copies are deleted, both files are opened for writing, and the
 * version line is written to the master index.
 *
 * @param conf job configuration
 * @throws RuntimeException wrapping any IOException raised while preparing the files
 */
public void configure(JobConf conf) {
  this.conf = conf;
  tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
  masterIndex = new Path(tmpOutputDir, "_masterindex");
  index = new Path(tmpOutputDir, "_index");
  try {
    fs = masterIndex.getFileSystem(conf);

    // Clear out leftovers, master index first.
    for (Path stale : new Path[] { masterIndex, index }) {
      if (fs.exists(stale)) {
        fs.delete(stale, false);
      }
    }

    indexStream = fs.create(index);
    outStream = fs.create(masterIndex);

    // The master index starts with a version header line.
    final String header = VERSION + " \n";
    final byte[] headerBytes = header.getBytes(Charsets.UTF_8);
    outStream.write(headerBytes);
  } catch(IOException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Executes the seek test job. READ_DIR is removed first, then SeekMapper
 * runs over the sequence files in CONTROL_DIR and one LongSumReducer task
 * writes the totals to READ_DIR.
 *
 * @param fs        file system used to delete READ_DIR before the run
 * @param fastCheck stored in the job configuration as "fs.test.fastCheck"
 * @throws Exception propagated from the delete or from the job run
 */
public static void seekTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(READ_DIR, true);

  final JobConf jobConf = new JobConf(conf, TestFileSystem.class);
  jobConf.setBoolean("fs.test.fastCheck", fastCheck);

  // Stages: seek in the map phase, sum in a single reducer.
  jobConf.setMapperClass(SeekMapper.class);
  jobConf.setReducerClass(LongSumReducer.class);
  jobConf.setNumReduceTasks(1);

  // Input location and format.
  FileInputFormat.setInputPaths(jobConf,CONTROL_DIR);
  jobConf.setInputFormat(SequenceFileInputFormat.class);

  // Output location and record types.
  FileOutputFormat.setOutputPath(jobConf, READ_DIR);
  jobConf.setOutputKeyClass(UTF8.class);
  jobConf.setOutputValueClass(LongWritable.class);

  JobClient.runJob(jobConf);
}
/**
 * Runs an "outer" composite join over three Fake_IF sources with identity
 * map and reduce stages, then deletes the "/empty" working directory.
 * Only the first source path is rooted at the working directory; "i1" and
 * "i2" are relative paths.
 *
 * @throws Exception propagated from the job run or the cleanup
 */
public void testEmptyJoin() throws Exception {
  final JobConf emptyJob = new JobConf();
  final Path workDir = cluster.getFileSystem().makeQualified(new Path("/empty"));
  final Path[] inputs = { new Path(workDir,"i0"), new Path("i1"), new Path("i2") };

  // Input side.
  final String joinExpr = CompositeInputFormat.compose("outer", Fake_IF.class, inputs);
  emptyJob.set("mapred.join.expr", joinExpr);
  emptyJob.setInputFormat(CompositeInputFormat.class);

  // Pass-through stages and output types.
  emptyJob.setMapperClass(IdentityMapper.class);
  emptyJob.setReducerClass(IdentityReducer.class);
  emptyJob.setOutputKeyClass(IncomparableKey.class);
  emptyJob.setOutputValueClass(NullWritable.class);
  FileOutputFormat.setOutputPath(emptyJob, new Path(workDir, "out"));

  JobClient.runJob(emptyJob);
  final FileSystem workFs = workDir.getFileSystem(emptyJob);
  workFs.delete(workDir, true);
}
/**
 * Configures and runs the "TeraValidate" job with a single reducer and a
 * minimum split size of Long.MAX_VALUE.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 once the job has returned
 * @throws Exception propagated from job setup or JobClient.runJob
 */
public int run(String[] args) throws Exception {
  final JobConf validateJob = (JobConf) getConf();
  validateJob.setJobName("TeraValidate");
  validateJob.setJarByClass(TeraValidate.class);

  // Input and output locations.
  final Path input = new Path(args[0]);
  final Path output = new Path(args[1]);
  TeraInputFormat.setInputPaths(validateJob, input);
  FileOutputFormat.setOutputPath(validateJob, output);
  validateJob.setInputFormat(TeraInputFormat.class);

  // Stages and record types.
  validateJob.setMapperClass(ValidateMapper.class);
  validateJob.setReducerClass(ValidateReducer.class);
  validateJob.setOutputKeyClass(Text.class);
  validateJob.setOutputValueClass(Text.class);

  // force a single reducer
  validateJob.setNumReduceTasks(1);
  // force a single split
  validateJob.setLong("mapred.min.split.size", Long.MAX_VALUE);

  JobClient.runJob(validateJob);
  return 0;
}
/**
 * Entry point: runs the "InvertedIndex" job, reading from the relative path
 * "input" and writing (Text, Text) records to the relative path "output".
 *
 * <p>If the job fails, the stack trace is printed to standard error and the
 * JVM exits with status 1, so calling scripts can detect the failure.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
  // JobClient.runJob is static, so no JobClient instance is needed.
  JobConf conf = new JobConf(InvertedIndex.class);
  conf.setJobName("InvertedIndex");

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.addInputPath(conf, new Path("input"));
  FileOutputFormat.setOutputPath(conf, new Path("output"));

  conf.setMapperClass(InvertedIndexMapper.class);
  conf.setReducerClass(InvertedIndexReducer.class);

  try {
    JobClient.runJob(conf);
  } catch (Exception e) {
    // Report the failure on stderr and exit non-zero; previously the
    // process ended with status 0 even when the job had failed.
    e.printStackTrace(System.err);
    System.exit(1);
  }
}
/**
 * Runs the read test as a single-reducer job: ReadMapper over the sequence
 * files under CONTROL_DIR, summed by LongSumReducer into READ_DIR.
 *
 * @param fs        file system on which READ_DIR is deleted before the run
 * @param fastCheck value stored under "fs.test.fastCheck" in the job configuration
 * @throws Exception propagated from the delete or from the job run
 */
public static void readTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  // Start from a clean output directory.
  fs.delete(READ_DIR, true);

  final JobConf readJob = new JobConf(conf, TestFileSystem.class);
  readJob.setBoolean("fs.test.fastCheck", fastCheck);

  // Input.
  readJob.setInputFormat(SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(readJob, CONTROL_DIR);

  // Stages.
  readJob.setMapperClass(ReadMapper.class);
  readJob.setReducerClass(LongSumReducer.class);
  readJob.setNumReduceTasks(1);

  // Output.
  readJob.setOutputKeyClass(UTF8.class);
  readJob.setOutputValueClass(LongWritable.class);
  FileOutputFormat.setOutputPath(readJob, READ_DIR);

  JobClient.runJob(readJob);
}
/**
 * Configures and runs the "TeraSort" job. A partition file is written into
 * the qualified input directory and added to the distributed cache under
 * the fragment name TeraInputFormat.PARTITION_FILENAME. Records are routed
 * by TotalOrderPartitioner.
 *
 * @param args {@code args[0]} is the input directory, {@code args[1]} the output directory
 * @return 0 once the job has returned
 * @throws Exception propagated from path/URI handling, partition-file writing or the job run
 */
public int run(String[] args) throws Exception {
  LOG.info("starting");
  final JobConf sortJob = (JobConf) getConf();

  // Locate the partition file inside the fully qualified input directory.
  final Path rawInput = new Path(args[0]);
  final Path qualifiedInput = rawInput.makeQualified(rawInput.getFileSystem(sortJob));
  final Path partitionFile = new Path(qualifiedInput, TeraInputFormat.PARTITION_FILENAME);
  final URI partitionUri = new URI(partitionFile.toString() +
      "#" + TeraInputFormat.PARTITION_FILENAME);

  // Basic job wiring; these settings precede the partition-file step, as in the original order.
  TeraInputFormat.setInputPaths(sortJob, new Path(args[0]));
  FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
  sortJob.setJobName("TeraSort");
  sortJob.setJarByClass(TeraSort.class);
  sortJob.setInputFormat(TeraInputFormat.class);
  sortJob.setOutputFormat(TeraOutputFormat.class);
  sortJob.setOutputKeyClass(Text.class);
  sortJob.setOutputValueClass(Text.class);
  sortJob.setPartitionerClass(TotalOrderPartitioner.class);

  // Write the partition file and register it in the distributed cache.
  TeraInputFormat.writePartitionFile(sortJob, partitionFile);
  DistributedCache.addCacheFile(partitionUri, sortJob);
  DistributedCache.createSymlink(sortJob);

  sortJob.setInt("dfs.replication", 1);
  TeraOutputFormat.setFinalSync(sortJob, true);

  JobClient.runJob(sortJob);
  LOG.info("done");
  return 0;
}
/**
 * Configures and synchronously runs the "wordcount" job: text input,
 * text output of (Text, IntWritable) records, with Reduce used as both
 * combiner and reducer.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @throws Exception propagated from job setup or JobClient.runJob
 */
public void run(String[] args) throws Exception
{
  final JobConf wordCount = new JobConf(this.getClass());
  wordCount.setJobName("wordcount");

  // Input and output locations/formats.
  final Path input = new Path(args[0]);
  final Path output = new Path(args[1]);
  wordCount.setInputFormat(TextInputFormat.class);
  FileInputFormat.setInputPaths(wordCount, input);
  wordCount.setOutputFormat(TextOutputFormat.class);
  FileOutputFormat.setOutputPath(wordCount, output);

  // Stages; the reducer doubles as the combiner.
  wordCount.setMapperClass(Map.class);
  wordCount.setCombinerClass(Reduce.class);
  wordCount.setReducerClass(Reduce.class);

  // Result records.
  wordCount.setOutputKeyClass(Text.class);
  wordCount.setOutputValueClass(IntWritable.class);

  JobClient.runJob(wordCount);
}
/**
 * Assembles the configuration of a link-database merge job without
 * submitting it. This method sets no input path. The output path is the
 * relative name "linkdb-merge-" plus a random non-negative integer.
 *
 * @param config    base configuration for the job
 * @param linkDb    link database path; appears only in the job name here
 * @param normalize stored under LinkDbFilter.URL_NORMALIZING
 * @param filter    stored under LinkDbFilter.URL_FILTERING
 * @return the configured job
 */
public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
  final Random rng = new Random();
  final Path outputDir = new Path("linkdb-merge-" + rng.nextInt(Integer.MAX_VALUE));

  final JobConf conf = new NutchJob(config);
  conf.setJobName("linkdb merge " + linkDb);

  // Stages.
  conf.setMapperClass(LinkDbFilter.class);
  conf.setReducerClass(LinkDbMerger.class);

  // Mapper switches.
  conf.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
  conf.setBoolean(LinkDbFilter.URL_FILTERING, filter);

  // Sequence files in; compressed map file of (Text, Inlinks) out.
  conf.setInputFormat(SequenceFileInputFormat.class);
  conf.setOutputFormat(MapFileOutputFormat.class);
  FileOutputFormat.setOutputPath(conf, outputDir);
  conf.setBoolean("mapred.output.compress", true);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Inlinks.class);

  // https://issues.apache.org/jira/browse/NUTCH-1069
  conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

  return conf;
}
/**
 * Checks that an "outer" composite join over three Fake_IF sources can be
 * run with identity stages; afterwards the "/empty" working directory is
 * deleted. Sources "i1" and "i2" are relative paths, while "i0" sits under
 * the working directory.
 *
 * @throws Exception propagated from the job run or the cleanup
 */
public void testEmptyJoin() throws Exception {
  final JobConf conf = new JobConf();
  final Path root = cluster.getFileSystem().makeQualified(new Path("/empty"));
  final Path[] sources = { new Path(root,"i0"), new Path("i1"), new Path("i2") };

  // Identity stages with (IncomparableKey, NullWritable) output.
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  conf.setOutputKeyClass(IncomparableKey.class);
  conf.setOutputValueClass(NullWritable.class);

  // Join expression and locations.
  final String expr = CompositeInputFormat.compose("outer", Fake_IF.class, sources);
  conf.setInputFormat(CompositeInputFormat.class);
  conf.set("mapred.join.expr", expr);
  FileOutputFormat.setOutputPath(conf, new Path(root, "out"));

  JobClient.runJob(conf);
  final FileSystem rootFs = root.getFileSystem(conf);
  rootFs.delete(root, true);
}
/**
 * Runs a multi-table snapshot job with one reduce task and asserts that it
 * succeeded. The output path is a relative path named after the job.
 *
 * @param jobName name assigned to the job
 * @param c       not used by this implementation
 * @param scans   scans converted into the snapshot-to-scan mapping
 * @throws IOException            propagated from job setup or execution
 * @throws InterruptedException   declared by the overridden signature
 * @throws ClassNotFoundException declared by the overridden signature
 */
@Override
protected void runJob(String jobName, Configuration c, List<Scan> scans)
    throws IOException, InterruptedException, ClassNotFoundException {
  final JobConf snapshotJob = new JobConf(TEST_UTIL.getConfiguration());
  snapshotJob.setJobName(jobName);

  // NOTE(review): the mapper is passed again to the init helper below and the
  // reducer is set a second time further down; these first two calls look
  // redundant but are kept because the helper's effect is not visible here.
  snapshotJob.setMapperClass(Mapper.class);
  snapshotJob.setReducerClass(Reducer.class);

  TableMapReduceUtil.initMultiTableSnapshotMapperJob(
      getSnapshotScanMapping(scans), Mapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class,
      snapshotJob, true, restoreDir);
  TableMapReduceUtil.addDependencyJars(snapshotJob);

  snapshotJob.setReducerClass(Reducer.class);
  snapshotJob.setNumReduceTasks(1); // one to get final "first" and "last" key

  final Path outputDir = new Path(snapshotJob.getJobName());
  FileOutputFormat.setOutputPath(snapshotJob, outputDir);

  LOG.info("Started " + snapshotJob.getJobName());
  final RunningJob handle = JobClient.runJob(snapshotJob);
  handle.waitForCompletion();
  assertTrue(handle.isSuccessful());
  LOG.info("After map/reduce completion - job " + jobName);
}
/**
 * Applies job-level settings to an Avro data file writer: the compression
 * codec (only when output compression is enabled for the job), the sync
 * interval, and every metadata entry found in the job configuration under
 * AvroJob.TEXT_PREFIX or AvroJob.BINARY_PREFIX.
 *
 * @param writer writer to configure
 * @param job    job configuration to read the settings from
 * @throws UnsupportedEncodingException declared because binary metadata is
 *         decoded and re-encoded by charset name ("ISO-8859-1")
 */
static <K> void configureDataFileWriter(DataFileWriter<K> writer,
    JobConf job) throws UnsupportedEncodingException {
  if (FileOutputFormat.getCompressOutput(job)) {
    // The deflate level is read up front, whichever codec ends up selected.
    final int deflateLevel = job.getInt(
        org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
        org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
    final String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
    final CodecFactory codec;
    if (codecName.equals(DEFLATE_CODEC)) {
      codec = CodecFactory.deflateCodec(deflateLevel);
    } else {
      codec = CodecFactory.fromString(codecName);
    }
    writer.setCodec(codec);
  }

  final int syncInterval = job.getInt(
      org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
      DEFAULT_SYNC_INTERVAL);
  writer.setSyncInterval(syncInterval);

  // copy metadata from job
  for (Map.Entry<String,String> entry : job) {
    final String property = entry.getKey();
    if (property.startsWith(AvroJob.TEXT_PREFIX)) {
      final String metaKey = property.substring(AvroJob.TEXT_PREFIX.length());
      writer.setMeta(metaKey, entry.getValue());
    }
    if (property.startsWith(AvroJob.BINARY_PREFIX)) {
      final String metaKey = property.substring(AvroJob.BINARY_PREFIX.length());
      final byte[] metaValue = URLDecoder.decode(entry.getValue(), "ISO-8859-1")
          .getBytes("ISO-8859-1");
      writer.setMeta(metaKey, metaValue);
    }
  }
}
/**
 * Drops an empty marker file named SUCCEEDED_FILE_NAME into the job output
 * directory. Nothing happens when the job has no output path or the
 * directory does not exist.
 *
 * @param conf job configuration supplying the output path
 * @throws IOException if the file system cannot be reached or the marker cannot be created
 */
private void markSuccessfulOutputDir(JobConf conf)
    throws IOException {
  final Path jobOutput = FileOutputFormat.getOutputPath(conf);
  if (jobOutput == null) {
    return;
  }

  final FileSystem outputFs = jobOutput.getFileSystem(conf);
  if (!outputFs.exists(jobOutput)) {
    return;
  }

  // create a file in the folder to mark it
  final Path marker = new Path(jobOutput, SUCCEEDED_FILE_NAME);
  outputFs.create(marker).close();
}
/**
 * Validates the output configuration before the job starts.
 *
 * <p>A job with reducers must have an output directory. A map-only job
 * without one has nothing to validate and passes. When an output directory
 * is set, it must not already contain CrawlDatum.FETCH_DIR_NAME.
 *
 * @param fs  file system to check against; when {@code null} it is resolved
 *            from the output path
 * @param job job configuration
 * @throws InvalidJobConfException if reducers are configured but no output directory is set
 * @throws IOException if the fetch directory already exists ("Segment already fetched!")
 *                     or the file system cannot be queried
 */
public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
  Path out = FileOutputFormat.getOutputPath(job);
  if (out == null) {
    if (job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException(
          "Output directory not set in JobConf.");
    }
    // Map-only job with no output directory: the checks below would
    // dereference the null path, so there is nothing further to verify.
    return;
  }
  if (fs == null) {
    fs = out.getFileSystem(job);
  }
  if (fs.exists(new Path(out, CrawlDatum.FETCH_DIR_NAME))) {
    throw new IOException("Segment already fetched!");
  }
}
/**
 * Configures and runs the "MultiFileWordCount" job.
 *
 * @param args {@code args[0]} is the input path string handed to
 *             FileInputFormat.addInputPaths, {@code args[1]} the output directory
 * @return 1 after printing usage when fewer than two arguments are given;
 *         0 once the job has returned
 * @throws Exception propagated from job setup or JobClient.runJob
 */
public int run(String[] args) throws Exception {
  final boolean enoughArgs = args.length >= 2;
  if (!enoughArgs) {
    printUsage();
    return 1;
  }

  final JobConf countJob = new JobConf(getConf(), MultiFileWordCount.class);
  countJob.setJobName("MultiFileWordCount");

  // Input: the custom input format over the given path(s).
  countJob.setInputFormat(MyInputFormat.class);
  FileInputFormat.addInputPaths(countJob, args[0]);

  // Map stage, then the summing reducer used as combiner and reducer.
  // NOTE(review): the output value class below is IntWritable while the
  // reducer is LongSumReducer - confirm the value types are compatible.
  countJob.setMapperClass(MapClass.class);
  countJob.setCombinerClass(LongSumReducer.class);
  countJob.setReducerClass(LongSumReducer.class);

  // Output: keys are words (strings), values are counts (ints).
  countJob.setOutputKeyClass(Text.class);
  countJob.setOutputValueClass(IntWritable.class);
  final Path outputDir = new Path(args[1]);
  FileOutputFormat.setOutputPath(countJob, outputDir);

  JobClient.runJob(countJob);
  return 0;
}