org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getOutputPath() Code Examples

The following examples show how org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getOutputPath() is used in real open-source projects. Each snippet is excerpted from the project and file named above it.
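
Before the project examples, a minimal driver-side sketch (class and path names are hypothetical) of how getOutputPath() pairs with setOutputPath(): the setter records the output directory in the job configuration, and the getter reads it back, returning null when no path has been set.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OutputPathDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // The setter stores the directory in the job configuration.
    FileOutputFormat.setOutputPath(job, new Path("/tmp/demo-output"));
    // The getter reads it back; it returns null if no path was set.
    Path outDir = FileOutputFormat.getOutputPath(job);
    System.out.println("Configured output path: " + outDir);
  }
}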

Example 1 Project: terrapin File: HFileOutputFormat.java
public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
        TaskAttemptContext context) throws IOException {
  // Get the path of the temporary output file
  final Path outputPath = FileOutputFormat.getOutputPath(context);
  final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
  final Configuration conf = context.getConfiguration();
  final FileSystem fs = outputDir.getFileSystem(conf);

  int blockSize = conf.getInt(Constants.HFILE_BLOCKSIZE, 16384);
  // Default to snappy.
  Compression.Algorithm compressionAlgorithm = getAlgorithm(
      conf.get(Constants.HFILE_COMPRESSION));
  final StoreFile.Writer writer =
      new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs, blockSize)
          .withFilePath(hfilePath(outputPath, context.getTaskAttemptID().getTaskID().getId()))
          .withCompression(compressionAlgorithm)
          .build();
  return new HFileRecordWriter(writer);
}
 
Example 2 Project: big-c File: TestJoinDatamerge.java
private static void checkOuterConsistency(Job job, Path[] src) 
    throws IOException {
  Path outf = FileOutputFormat.getOutputPath(job);
  FileStatus[] outlist = cluster.getFileSystem().listStatus(outf,
      new Utils.OutputFileUtils.OutputFilesFilter());
  assertEquals("number of part files is more than 1. It is" + outlist.length,
    1, outlist.length);
  assertTrue("output file with zero length" + outlist[0].getLen(),
    0 < outlist[0].getLen());
  SequenceFile.Reader r =
    new SequenceFile.Reader(cluster.getFileSystem(),
        outlist[0].getPath(), job.getConfiguration());
  IntWritable k = new IntWritable();
  IntWritable v = new IntWritable();
  while (r.next(k, v)) {
    assertEquals("counts does not match", v.get(),
      countProduct(k, src, job.getConfiguration()));
  }
  r.close();
}
 
Example 3 Project: incubator-pinot File: PinotRecordWriter.java
public PinotRecordWriter(TaskAttemptContext job, SegmentGeneratorConfig segmentGeneratorConfig,
    FieldExtractor<T> fieldExtractor)
    throws IOException {
  _segmentGeneratorConfig = segmentGeneratorConfig;
  _fieldExtractor = fieldExtractor;

  _tempSegmentDir = new File(PinotOutputFormat.getTempSegmentDir(job));
  if (_tempSegmentDir.exists()) {
    FileUtils.cleanDirectory(_tempSegmentDir);
  }
  _dataFileDir = new File(_tempSegmentDir, "dataFile");
  FileUtils.forceMkdir(_dataFileDir);
  _segmentTarDir = new File(_tempSegmentDir, "segmentTar");
  FileUtils.forceMkdir(_segmentTarDir);

  _handler = new FileHandler(_dataFileDir.getPath(), "data", ".json", MAX_FILE_SIZE);
  _handler.open(true);

  _fileSystem = FileSystem.get(job.getConfiguration());
  _outputDir = FileOutputFormat.getOutputPath(job);
}
 
Example 4 Project: hadoop File: LargeSorter.java
/** 
 * Generate the requested number of file splits, with the filename
 * set to the filename of the output file.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> result = new ArrayList<InputSplit>();
  Path outDir = FileOutputFormat.getOutputPath(job);
  int numSplits = 
        job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  for(int i=0; i < numSplits; ++i) {
    result.add(new FileSplit(
        new Path(outDir, "dummy-split-" + i), 0, 1, null));
  }
  return result;
}
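
The dummy splits above carry no real input data; each exists only to schedule one map task and hand it a distinct name. As a companion illustration, here is a minimal sketch, modeled on the single-record readers that typically back such generator jobs (the class name is hypothetical), of a RecordReader that emits exactly one record per split:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Emits a single (split name, empty value) record, so each dummy split
// triggers exactly one map() call.
class SingleRecordReader extends RecordReader<Text, Text> {
  private final Text key = new Text();
  private final Text value = new Text();
  private boolean done = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    key.set(((FileSplit) split).getPath().getName());
  }

  @Override
  public boolean nextKeyValue() {
    if (done) {
      return false;
    }
    done = true;
    return true;
  }

  @Override public Text getCurrentKey() { return key; }
  @Override public Text getCurrentValue() { return value; }
  @Override public float getProgress() { return done ? 1.0f : 0.0f; }
  @Override public void close() { }
}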
 
Example 5 Project: laser File: AdmmIterationOutputFormat.java
public Path getDefaultWorkFile(TaskAttemptContext context, String extension)
		throws IOException {
	FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
	String outputName = context.getConfiguration().get(
			"com.b5m.admm.iteration.output.name");
	if (null == outputName) {
		// No explicit name configured: write a file named "Z" under the
		// committer's temporary work path, so the commit protocol promotes
		// it to the final output directory on success.
		return new Path(committer.getWorkPath(), "Z");
	}
	// An explicit name bypasses the work path and targets the final
	// output directory directly.
	return new Path(FileOutputFormat.getOutputPath(context), outputName);
}
 
Example 6 Project: hadoop File: RandomTextWriterJob.java
/** 
 * Generate the requested number of file splits, with the filename
 * set to the filename of the output file.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> result = new ArrayList<InputSplit>();
  Path outDir = FileOutputFormat.getOutputPath(job);
  int numSplits = 
        job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  for(int i=0; i < numSplits; ++i) {
    result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
                              (String[])null));
  }
  return result;
}
 
Example 7 Project: tez File: RandomWriter.java
/** 
 * Generate the requested number of file splits, with the filename
 * set to the filename of the output file.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> result = new ArrayList<InputSplit>();
  Path outDir = FileOutputFormat.getOutputPath(job);
  int numSplits = 
        job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  for(int i=0; i < numSplits; ++i) {
    result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
                              (String[])null));
  }
  return result;
}
 
Example 8 Project: hadoop File: RandomWriter.java
/** 
 * Generate the requested number of file splits, with the filename
 * set to the filename of the output file.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> result = new ArrayList<InputSplit>();
  Path outDir = FileOutputFormat.getOutputPath(job);
  int numSplits = 
        job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  for(int i=0; i < numSplits; ++i) {
    result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
                              (String[])null));
  }
  return result;
}
 
Example 9 Project: pravega-samples File: RandomWriter.java
/**
 * Generate the requested number of file splits, with the filename
 * set to the filename of the output file.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
    List<InputSplit> result = new ArrayList<InputSplit>();
    Path outDir = FileOutputFormat.getOutputPath(job);
    int numSplits =
            job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
    for (int i = 0; i < numSplits; ++i) {
        result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
                (String[]) null));
    }
    return result;
}
 
Example 10 Project: big-c File: LargeSorter.java
/** 
 * Generate the requested number of file splits, with the filename
 * set to the filename of the output file.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> result = new ArrayList<InputSplit>();
  Path outDir = FileOutputFormat.getOutputPath(job);
  int numSplits = 
        job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  for(int i=0; i < numSplits; ++i) {
    result.add(new FileSplit(
        new Path(outDir, "dummy-split-" + i), 0, 1, null));
  }
  return result;
}
 
Example 11 Project: big-c File: RandomWriter.java
/** 
 * Generate the requested number of file splits, with the filename
 * set to the filename of the output file.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> result = new ArrayList<InputSplit>();
  Path outDir = FileOutputFormat.getOutputPath(job);
  int numSplits = 
        job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  for(int i=0; i < numSplits; ++i) {
    result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
                              (String[])null));
  }
  return result;
}
 
Example 12 Project: big-c File: RandomTextWriterJob.java
/** 
 * Generate the requested number of file splits, with the filename
 * set to the filename of the output file.
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> result = new ArrayList<InputSplit>();
  Path outDir = FileOutputFormat.getOutputPath(job);
  int numSplits = 
        job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  for(int i=0; i < numSplits; ++i) {
    result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
                              (String[])null));
  }
  return result;
}
 
Example 13 Project: jumbune File: JobUtil.java
/**
 * When injected into a class, this method modifies the job's output path,
 * but only if the output goes to HDFS.
 * 
 * @param job
 *            Job whose output path needs to be changed
 */
public static void modifyOutputPath(Job job) throws Exception {
	Path path = FileOutputFormat.getOutputPath(job);
	if (path == null) {
		throw new IllegalArgumentException("Job output path is null, expecting a non-null path value");
	}
	StringBuilder out = new StringBuilder(path.toString());
	out.append(SEPARATOR_UNDERSCORE).append(System.currentTimeMillis());
	FileOutputFormat.setOutputPath(job, new Path(out.toString()));
}
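
A quick usage sketch (paths are hypothetical; SEPARATOR_UNDERSCORE is assumed to be an underscore) showing the effect: the configured output directory gains a millisecond-timestamp suffix, which keeps reruns of the same job from colliding on one path.

// Suppose the driver configured the job like this:
FileOutputFormat.setOutputPath(job, new Path("hdfs://namenode/user/demo/out"));
JobUtil.modifyOutputPath(job);
// getOutputPath(job) now returns something like
// hdfs://namenode/user/demo/out_1700000000000
System.out.println(FileOutputFormat.getOutputPath(job));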
 
Example 14 Project: BigDataPlatform File: TransformerOutputFormat.java
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
}
 
Example 15 Project: BigDataArchitect File: TransformerOutputFormat.java
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
}
 
Example 16 Project: BigDataArchitect File: TransformerOutputFormat.java
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
}
 
Example 17
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException, InterruptedException {
  return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
                                 context);
}
 
Example 18 Project: hadoop File: GenericMRLoadGenerator.java
public int run(String [] argv) throws Exception {
  Job job = Job.getInstance(getConf());
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return -1;
  }

  Configuration conf = job.getConfiguration();
  if (null == FileOutputFormat.getOutputPath(job)) {
    // No output dir? No writes
    job.setOutputFormatClass(NullOutputFormat.class);
  }

  if (0 == FileInputFormat.getInputPaths(job).length) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (null != conf.getClass(INDIRECT_INPUT_FORMAT, null)) {
    // specified IndirectInputFormat? Build src list
    JobClient jClient = new JobClient(conf);  
    Path tmpDir = new Path("/tmp");
    Random r = new Random();
    Path indirInputFile = new Path(tmpDir,
        Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
    conf.set(INDIRECT_INPUT_FILE, indirInputFile.toString());
    SequenceFile.Writer writer = SequenceFile.createWriter(
        tmpDir.getFileSystem(conf), conf, indirInputFile,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path p : FileInputFormat.getInputPaths(job)) {
        FileSystem fs = p.getFileSystem(conf);
        Stack<Path> pathstack = new Stack<Path>();
        pathstack.push(p);
        while (!pathstack.empty()) {
          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDirectory()) {
              if (!stat.getPath().getName().startsWith("_")) {
                pathstack.push(stat.getPath());
              }
            } else {
              writer.sync();
              writer.append(new LongWritable(stat.getLen()),
                  new Text(stat.getPath().toUri().toString()));
            }
          }
        }
      }
    } finally {
      writer.close();
    }
  }

  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  int ret = job.waitForCompletion(true) ? 0 : 1;
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " +
                     (endTime.getTime() - startTime.getTime()) /1000 +
                     " seconds.");

  return ret;
}
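
For reference, a minimal sketch (reusing the variables from run() above) of reading back the indirect-input list that this method writes: each record pairs a file's length with its URI.

// Read the (length, URI) records written to indirInputFile above.
SequenceFile.Reader reader = new SequenceFile.Reader(
    tmpDir.getFileSystem(conf), indirInputFile, conf);
LongWritable len = new LongWritable();
Text uri = new Text();
while (reader.next(len, uri)) {
  System.out.println(uri + " (" + len.get() + " bytes)");
}
reader.close();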
 
Example 19 Project: gemfirexd-oss File: GFOutputFormat.java
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException, InterruptedException {
  return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
      context);
}
 
Example 20 Project: ignite File: HadoopClientProtocolSelfTest.java
/**
 * Constructor.
 *
 * @param ctx Task attempt context.
 * @param delegate Delegate.
 * @throws IOException If failed.
 */
private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException {
    super(FileOutputFormat.getOutputPath(ctx), ctx);

    this.delegate = delegate;
}