下面列出了 org.apache.hadoop.mapred.FileOutputFormat#getOutputPath() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates a record writer that prints every record's value as one line of
 * text into the file {@code name} under the job's output path.
 *
 * <p>Whatever already exists at that location is deleted first. Record keys
 * are ignored; only the value is printed.
 *
 * @param fs       file system used to delete the old file and create the new one
 * @param job      job configuration the output path is read from
 * @param name     name of the dump file, relative to the job's output path
 * @param progress not used by this implementation
 * @return a writer whose {@code write} and {@code close} are synchronized
 * @throws IOException if the dump file cannot be replaced or created
 */
public RecordWriter<WritableComparable, Writable> getRecordWriter(
    final FileSystem fs, JobConf job,
    String name, final Progressable progress) throws IOException {
  final Path segmentDumpFile = new Path(FileOutputFormat.getOutputPath(job), name);
  // Get the old copy out of the way
  if (fs.exists(segmentDumpFile)) {
    fs.delete(segmentDumpFile, true);
  }
  // Pin the encoding: without it PrintStream falls back to the platform
  // default charset, so the dump would differ from one node to another.
  final PrintStream printStream =
      new PrintStream(fs.create(segmentDumpFile), false, "UTF-8");
  return new RecordWriter<WritableComparable, Writable>() {
    @Override
    public synchronized void write(WritableComparable key, Writable value) throws IOException {
      printStream.println(value);
    }

    @Override
    public synchronized void close(Reporter reporter) throws IOException {
      printStream.close();
      // PrintStream never throws on write failures; it only records them.
      // Surface a recorded failure so a truncated dump is not mistaken for success.
      if (printStream.checkError()) {
        throw new IOException("Error writing segment dump file " + segmentDumpFile);
      }
    }
  };
}
/**
 * Creates a record writer that prints every record's value as one line of
 * text into the file {@code name} under the job's output path.
 *
 * <p>Whatever already exists at that location is deleted first. Record keys
 * are ignored; only the value is printed.
 *
 * @param fs       file system used to delete the old file and create the new one
 * @param job      job configuration the output path is read from
 * @param name     name of the dump file, relative to the job's output path
 * @param progress not used by this implementation
 * @return a writer whose {@code write} and {@code close} are synchronized
 * @throws IOException if the dump file cannot be replaced or created
 */
public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
    final FileSystem fs, JobConf job,
    String name, final Progressable progress) throws IOException {
  final Path segmentDumpFile = new Path(FileOutputFormat.getOutputPath(job), name);
  // Get the old copy out of the way
  if (fs.exists(segmentDumpFile)) {
    fs.delete(segmentDumpFile, true);
  }
  // Pin the encoding: without it PrintStream falls back to the platform
  // default charset, so the dump would differ from one node to another.
  final PrintStream printStream =
      new PrintStream(fs.create(segmentDumpFile), false, "UTF-8");
  return new RecordWriter<WritableComparable<?>, Writable>() {
    @Override
    public synchronized void write(WritableComparable<?> key, Writable value) throws IOException {
      printStream.println(value);
    }

    @Override
    public synchronized void close(Reporter reporter) throws IOException {
      printStream.close();
      // PrintStream never throws on write failures; it only records them.
      // Surface a recorded failure so a truncated dump is not mistaken for success.
      if (printStream.checkError()) {
        throw new IOException("Error writing segment dump file " + segmentDumpFile);
      }
    }
  };
}
/**
 * Builds the temporary output location of a task attempt:
 * {@code <output path>/<TEMP_DIR_NAME>/_<task attempt id>}.
 *
 * @param conf          job configuration the output path is read from
 * @param taskAttemptID attempt whose string form names the leaf directory
 * @return the path qualified against its file system; the unqualified path
 *         if the file system cannot be obtained; {@code null} if the job has
 *         no output path
 */
public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
  final Path jobOutputDir = FileOutputFormat.getOutputPath(conf);
  if (jobOutputDir == null) {
    return null;
  }

  final String relativeName = FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR
      + "_" + taskAttemptID.toString();
  final Path attemptDir = new Path(jobOutputDir, relativeName);
  try {
    return attemptDir.makeQualified(attemptDir.getFileSystem(conf));
  } catch (IOException lookupFailure) {
    // Qualification is best effort: log the failure and hand back the raw path.
    LOG.warn(StringUtils.stringifyException(lookupFailure));
    return attemptDir;
  }
}
/**
 * Copies this task attempt's identity into the job configuration and, when
 * the job has an output path, sets the work output path.
 *
 * <p>The work output path is the committer's task attempt path when the
 * committer is a {@code FileOutputCommitter}; otherwise it is the job's
 * output path itself.
 *
 * @param jobConf configuration to update in place
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
public void localizeConfiguration(JobConf jobConf)
    throws IOException, InterruptedException {
  // Task identity.
  jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
  jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
  jobConf.setInt(JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId());
  jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
  jobConf.setBoolean(MRJobConfig.TASK_ISMAP, isMap);

  // Work output path; nothing to set when the job has no output path.
  final Path jobOutputDir = FileOutputFormat.getOutputPath(jobConf);
  if (jobOutputDir == null) {
    return;
  }
  final Path workDir = (committer instanceof FileOutputCommitter)
      ? ((FileOutputCommitter) committer).getTaskAttemptPath(taskAttemptContext)
      : jobOutputDir;
  FileOutputFormat.setWorkOutputPath(jobConf, workDir);
}
/**
 * Creates the writer for the export manifest file.
 *
 * <p>The manifest is created at the task output path for
 * {@code MANIFEST_FILENAME}. The extension handed to the writer is empty
 * unless output compression is enabled, in which case it is the default
 * extension of the configured codec (Gzip when none is configured).
 *
 * @param ignored  unused; the file system is derived from the manifest path
 * @param job      job configuration
 * @param name     unused by this implementation
 * @param progress progress callback passed to the file creation
 * @return a writer backed by the newly created manifest stream
 * @throws IOException if the manifest file cannot be created
 */
@Override
public RecordWriter<K, Text> getRecordWriter(FileSystem ignored, JobConf job, String name,
    Progressable progress) throws IOException {
  // Resolve everything that can fail BEFORE opening the stream, so a failing
  // codec lookup cannot leave an open, never-closed manifest stream behind.
  String extension = "";
  if (getCompressOutput(job)) {
    Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
    extension = codec.getDefaultExtension();
  }
  Path outputPath = FileOutputFormat.getOutputPath(job);

  Path file = FileOutputFormat.getTaskOutputPath(job, MANIFEST_FILENAME);
  FileSystem fs = file.getFileSystem(job);
  FSDataOutputStream fileOut = fs.create(file, progress);
  return new ExportManifestRecordWriter<>(fileOut, outputPath, extension);
}
/**
 * Validates the job's output location before the job runs.
 *
 * @param fs  file system to check against; when {@code null} it is derived
 *            from the output path
 * @param job job configuration the output path is read from
 * @throws InvalidJobConfException if no output path is set and the job has
 *                                 reduce tasks
 * @throws IOException             if the output path already contains the
 *                                 {@code CrawlDatum.FETCH_DIR_NAME} directory
 */
public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
  Path out = FileOutputFormat.getOutputPath(job);
  if (out == null) {
    if (job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException(
          "Output directory not set in JobConf.");
    }
    // No output path and no reducers: there is no segment directory to
    // inspect. Falling through would dereference the null path.
    return;
  }
  if (fs == null) {
    fs = out.getFileSystem(job);
  }
  if (fs.exists(new Path(out, CrawlDatum.FETCH_DIR_NAME))) {
    throw new IOException("Segment already fetched!");
  }
}
/**
 * Selects the output committer, sets the work output path, and runs the
 * committer's task setup.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>take the committer from {@code newOutputFormat} (new API) or from the
 *       job configuration (old API);</li>
 *   <li>if the job has an output path, set the work output path to the
 *       committer's task attempt path when it is a {@code FileOutputCommitter},
 *       otherwise to the output path itself;</li>
 *   <li>call {@code setupTask} with the context matching the chosen API.</li>
 * </ol>
 *
 * @param job       job configuration
 * @param useNewApi whether the new-API output format and context are used
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
public void initCommitter(JobConf job, boolean useNewApi)
    throws IOException, InterruptedException {
  // 1. Pick the committer.
  if (!useNewApi) {
    this.committer = job.getOutputCommitter();
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("using new api for output committer");
    }
    this.committer = newOutputFormat.getOutputCommitter(newApiTaskAttemptContext);
  }

  // 2. Set the work output path.
  final Path jobOutputDir = FileOutputFormat.getOutputPath(job);
  if (jobOutputDir != null) {
    final Path workDir = (this.committer instanceof FileOutputCommitter)
        ? ((FileOutputCommitter) this.committer).getTaskAttemptPath(oldApiTaskAttemptContext)
        : jobOutputDir;
    FileOutputFormat.setWorkOutputPath(job, workDir);
  }

  // 3. Task setup with the context of the chosen API.
  if (!useNewApi) {
    this.committer.setupTask(oldApiTaskAttemptContext);
  } else {
    this.committer.setupTask(newApiTaskAttemptContext);
  }
}
/**
 * Validates the job's output location before the job runs.
 *
 * @param fs  file system to check against; when {@code null} it is derived
 *            from the output path
 * @param job job configuration the output path is read from
 * @throws InvalidJobConfException if no output path is set and the job has
 *                                 reduce tasks
 * @throws IOException             if the output path already contains the
 *                                 {@code CrawlDatum.FETCH_DIR_NAME} directory
 */
public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
  Path out = FileOutputFormat.getOutputPath(job);
  if (out == null) {
    if (job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException(
          "Output directory not set in JobConf.");
    }
    // No output path and no reducers: there is no segment directory to
    // inspect. Falling through would dereference the null path.
    return;
  }
  if (fs == null) {
    fs = out.getFileSystem(job);
  }
  if (fs.exists(new Path(out, CrawlDatum.FETCH_DIR_NAME))) {
    throw new IOException("Segment already fetched!");
  }
}
/**
 * Produces {@code numSplits} placeholder splits. Split {@code i} names the
 * file {@code dummy-split-<i>} under the job's output path, starts at
 * offset 0, has length 1 and carries no host list.
 */
public InputSplit[] getSplits(JobConf job,
    int numSplits) throws IOException {
  final InputSplit[] splits = new InputSplit[numSplits];
  final Path outputDir = FileOutputFormat.getOutputPath(job);
  int index = 0;
  while (index < splits.length) {
    final Path splitFile = new Path(outputDir, "dummy-split-" + index);
    splits[index] = new FileSplit(splitFile, 0, 1, (String[]) null);
    ++index;
  }
  return splits;
}
/**
 * Selects the output committer, sets the work output path, and runs the
 * committer's task setup.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>take the committer from {@code newOutputFormat} (new API) or from the
 *       job configuration (old API);</li>
 *   <li>if the job has an output path, set the work output path to the
 *       committer's task attempt path when it is a {@code FileOutputCommitter},
 *       otherwise to the output path itself;</li>
 *   <li>call {@code setupTask} with the context matching the chosen API.</li>
 * </ol>
 *
 * @param job       job configuration
 * @param useNewApi whether the new-API output format and context are used
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
public void initCommitter(JobConf job, boolean useNewApi)
    throws IOException, InterruptedException {
  // 1. Pick the committer.
  if (!useNewApi) {
    this.committer = job.getOutputCommitter();
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("using new api for output committer");
    }
    this.committer = newOutputFormat.getOutputCommitter(newApiTaskAttemptContext);
  }

  // 2. Set the work output path.
  final Path jobOutputDir = FileOutputFormat.getOutputPath(job);
  if (jobOutputDir != null) {
    final Path workDir = (this.committer instanceof FileOutputCommitter)
        ? ((FileOutputCommitter) this.committer).getTaskAttemptPath(oldApiTaskAttemptContext)
        : jobOutputDir;
    FileOutputFormat.setWorkOutputPath(job, workDir);
  }

  // 3. Task setup with the context of the chosen API.
  if (!useNewApi) {
    this.committer.setupTask(oldApiTaskAttemptContext);
  } else {
    this.committer.setupTask(newApiTaskAttemptContext);
  }
}
/**
 * Builds an array of {@code numSplits} placeholder file splits. The split
 * at position {@code i} points at {@code dummy-split-<i>} under the job's
 * output path, with start offset 0, length 1 and no host list.
 */
public InputSplit[] getSplits(JobConf job,
    int numSplits) throws IOException {
  final InputSplit[] placeholders = new InputSplit[numSplits];
  final Path parentDir = FileOutputFormat.getOutputPath(job);
  for (int splitNo = 0; splitNo != placeholders.length; splitNo++) {
    final String fileName = "dummy-split-" + splitNo;
    placeholders[splitNo] =
        new FileSplit(new Path(parentDir, fileName), 0, 1, (String[]) null);
  }
  return placeholders;
}
/**
 * Creates the {@code FileOutputCommitter.TEMP_DIR_NAME} directory under the
 * job's output path. Does nothing when the job has no output path; a failed
 * {@code mkdirs} is logged as an error rather than thrown.
 *
 * @param conf job configuration the output path is read from
 * @throws IOException if the file system cannot be obtained or accessed
 */
public void setupJob(JobConf conf) throws IOException {
  final Path jobOutputDir = FileOutputFormat.getOutputPath(conf);
  if (jobOutputDir == null) {
    return;
  }

  final Path tempDir = new Path(jobOutputDir, FileOutputCommitter.TEMP_DIR_NAME);
  final FileSystem tempFs = tempDir.getFileSystem(conf);
  final boolean created = tempFs.mkdirs(tempDir);
  if (!created) {
    LOG.error("Mkdirs failed to create " + tempDir.toString());
  }
}
/**
 * Drops an empty {@code SUCCEEDED_FILE_NAME} marker file into the job's
 * output path. Nothing happens when the job has no output path or the
 * output path does not exist.
 *
 * @param conf job configuration the output path is read from
 * @throws IOException if the file system cannot be accessed or the marker
 *                     cannot be created
 */
private void markSuccessfulOutputDir(JobConf conf)
    throws IOException {
  final Path jobOutputDir = FileOutputFormat.getOutputPath(conf);
  if (jobOutputDir == null) {
    return;
  }

  final FileSystem outputFs = jobOutputDir.getFileSystem(conf);
  if (!outputFs.exists(jobOutputDir)) {
    return;
  }

  // An empty file is enough: its presence is the marker.
  final Path marker = new Path(jobOutputDir, SUCCEEDED_FILE_NAME);
  outputFs.create(marker).close();
}
/**
 * Removes the {@code FileOutputCommitter.TEMP_DIR_NAME} directory under the
 * job's output path, if it exists. When the job has no output path only a
 * warning is logged.
 *
 * @param conf job configuration the output path is read from
 * @throws IOException if the file system cannot be obtained or accessed
 */
public void cleanupJob(JobConf conf) throws IOException {
  final Path jobOutputDir = FileOutputFormat.getOutputPath(conf);
  if (jobOutputDir == null) {
    LOG.warn("Output path is null in cleanup");
    return;
  }

  // Recursively drop the temporary directory.
  final Path tempDir = new Path(jobOutputDir, FileOutputCommitter.TEMP_DIR_NAME);
  final FileSystem tempFs = tempDir.getFileSystem(conf);
  if (tempFs.exists(tempDir)) {
    tempFs.delete(tempDir, true);
  }
}
/**
 * Creates an empty file called {@code filename} in the job's output path,
 * announcing it on standard output first.
 *
 * @param conf     job configuration the output path is read from
 * @param filename name of the file to create, relative to the output path
 * @throws IOException if the file cannot be created
 */
private void writeFile(JobConf conf , String filename) throws IOException {
  System.out.println("writing file ----" + filename);

  final Path outputDir = FileOutputFormat.getOutputPath(conf);
  final FileSystem outputFs = outputDir.getFileSystem(conf);
  final Path emptyFile = new Path(outputDir, filename);
  outputFs.create(emptyFile).close();
}
/**
 * Announces {@code filename} on standard output, then creates an empty file
 * with that name inside the job's output path.
 *
 * @param conf     job configuration the output path is read from
 * @param filename name of the file to create, relative to the output path
 * @throws IOException if the file cannot be created
 */
private void writeFile(JobConf conf , String filename) throws IOException {
  System.out.println("writing file ----" + filename);

  final Path parentDir = FileOutputFormat.getOutputPath(conf);
  final FileSystem parentFs = parentDir.getFileSystem(conf);
  final Path target = new Path(parentDir, filename);
  parentFs.create(target).close();
}
/**
 * Creates a writer that routes each wrapped value to the output matching
 * its type.
 *
 * <p>{@code CrawlDatum} values always go to a map file under
 * {@code CrawlDatum.FETCH_DIR_NAME}. {@code Content} values go to a map file
 * under {@code Content.DIR_NAME}, which is only opened when
 * {@code Fetcher.isStoringContent(job)} is true. {@code Parse} values go to
 * a {@code ParseOutputFormat} writer, which is only opened when
 * {@code Fetcher.isParsing(job)} is true. Values whose target output was not
 * opened, and values of any other type, are dropped.
 *
 * @param fs       file system the outputs are created on
 * @param job      job configuration
 * @param name     name of this task's output inside each output directory
 * @param progress progress callback passed to the underlying writers
 * @return the dispatching record writer
 * @throws IOException if an underlying writer cannot be created
 */
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
    final JobConf job,
    final String name,
    final Progressable progress) throws IOException {
  Path out = FileOutputFormat.getOutputPath(job);
  final Path fetch =
      new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
  final Path content =
      new Path(new Path(out, Content.DIR_NAME), name);
  final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
  final MapFile.Writer fetchOut =
      new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
          compType, progress);
  return new RecordWriter<Text, NutchWritable>() {
    // Both stay null when the corresponding feature is switched off.
    private MapFile.Writer contentOut;
    private RecordWriter<Text, Parse> parseOut;
    {
      if (Fetcher.isStoringContent(job)) {
        contentOut = new MapFile.Writer(job, fs, content.toString(),
            Text.class, Content.class,
            compType, progress);
      }
      if (Fetcher.isParsing(job)) {
        parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
      }
    }

    @Override
    public void write(Text key, NutchWritable value)
        throws IOException {
      Writable w = value.get();
      if (w instanceof CrawlDatum) {
        fetchOut.append(key, w);
      } else if (w instanceof Content && contentOut != null) {
        // Guarded: the content writer does not exist when storing is disabled.
        contentOut.append(key, w);
      } else if (w instanceof Parse && parseOut != null) {
        // Guarded: the parse writer does not exist when parsing is disabled.
        parseOut.write(key, (Parse) w);
      }
    }

    @Override
    public void close(Reporter reporter) throws IOException {
      // Nested finally blocks: a failure closing one writer must not leave
      // the remaining writers open.
      try {
        fetchOut.close();
      } finally {
        try {
          if (contentOut != null) {
            contentOut.close();
          }
        } finally {
          if (parseOut != null) {
            parseOut.close(reporter);
          }
        }
      }
    }
  };
}
/**
 * Opens the file {@code name} under the job's output path and wraps it in a
 * {@code LineRecordWriter}.
 *
 * @param fs       file system the file is created on
 * @param job      job configuration the output path is read from
 * @param name     file name relative to the output path
 * @param progress progress callback passed to the file creation
 * @return a line record writer on the new file
 * @throws IOException if the file cannot be created
 */
public RecordWriter<Text,CrawlDatum> getRecordWriter(FileSystem fs, JobConf job, String name,
    Progressable progress) throws IOException {
  final Path target = new Path(FileOutputFormat.getOutputPath(job), name);
  final DataOutputStream stream = fs.create(target, progress);
  return new LineRecordWriter(stream);
}
/**
 * Creates a writer that routes each wrapped value to the output matching
 * its type.
 *
 * <p>{@code CrawlDatum} values always go to a map file under
 * {@code CrawlDatum.FETCH_DIR_NAME}. {@code Content} values go to a map file
 * under {@code Content.DIR_NAME}, which is only opened when
 * {@code Fetcher.isStoringContent(job)} is true. {@code Parse} values go to
 * a {@code ParseOutputFormat} writer, which is only opened when
 * {@code Fetcher.isParsing(job)} is true. Values whose target output was not
 * opened, and values of any other type, are dropped.
 *
 * @param fs       file system the outputs are created on
 * @param job      job configuration
 * @param name     name of this task's output inside each output directory
 * @param progress progress callback passed to the underlying writers
 * @return the dispatching record writer
 * @throws IOException if an underlying writer cannot be created
 */
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
    final JobConf job,
    final String name,
    final Progressable progress) throws IOException {
  Path out = FileOutputFormat.getOutputPath(job);
  final Path fetch =
      new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
  final Path content =
      new Path(new Path(out, Content.DIR_NAME), name);
  final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
  final MapFile.Writer fetchOut =
      new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
          compType, progress);
  return new RecordWriter<Text, NutchWritable>() {
    // Both stay null when the corresponding feature is switched off.
    private MapFile.Writer contentOut;
    private RecordWriter<Text, Parse> parseOut;
    {
      if (Fetcher.isStoringContent(job)) {
        contentOut = new MapFile.Writer(job, fs, content.toString(),
            Text.class, Content.class,
            compType, progress);
      }
      if (Fetcher.isParsing(job)) {
        parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
      }
    }

    @Override
    public void write(Text key, NutchWritable value)
        throws IOException {
      Writable w = value.get();
      if (w instanceof CrawlDatum) {
        fetchOut.append(key, w);
      } else if (w instanceof Content && contentOut != null) {
        contentOut.append(key, w);
      } else if (w instanceof Parse && parseOut != null) {
        parseOut.write(key, (Parse) w);
      }
    }

    @Override
    public void close(Reporter reporter) throws IOException {
      // Nested finally blocks: a failure closing one writer must not leave
      // the remaining writers open.
      try {
        fetchOut.close();
      } finally {
        try {
          if (contentOut != null) {
            contentOut.close();
          }
        } finally {
          if (parseOut != null) {
            parseOut.close(reporter);
          }
        }
      }
    }
  };
}
/**
 * Creates the file {@code name} inside the job's output path and returns a
 * {@code LineRecordWriter} that writes to it.
 *
 * @param fs       file system the file is created on
 * @param job      job configuration the output path is read from
 * @param name     file name relative to the output path
 * @param progress progress callback passed to the file creation
 * @return a line record writer on the new file
 * @throws IOException if the file cannot be created
 */
public RecordWriter<Text,CrawlDatum> getRecordWriter(FileSystem fs, JobConf job, String name,
    Progressable progress) throws IOException {
  final Path outputFile = new Path(FileOutputFormat.getOutputPath(job), name);
  final DataOutputStream rawOut = fs.create(outputFile, progress);
  return new LineRecordWriter(rawOut);
}