类org.apache.hadoop.mapreduce.lib.input.NLineInputFormat源码实例Demo

下面列出了 org.apache.hadoop.mapreduce.lib.input.NLineInputFormat 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: ViraPipe   文件: InterleaveMulti.java
/**
 * Cuts one FASTQ file into chunks of {@code splitlen} lines and writes every chunk to its own
 * file under {@code splitDir}; the chunks are distributed through a Spark RDD.
 *
 * <p>Each output file is named {@code split_<start offset>.<token>}, where {@code <token>} is the
 * second dot-separated part of the input file name.
 *
 * @param fst status of the FASTQ file to split
 * @param fqPath path of the FASTQ file; only its file name is used, to derive the suffix
 * @param splitDir directory that receives the chunk files
 * @param splitlen number of lines per chunk
 * @param sc Spark context that supplies the Hadoop configuration and runs the work
 * @throws IOException declared by the split computation
 */
private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
  //TODO: Handle also compressed files
  String[] nameParts = new Path(fqPath).getName().split("\\.");
  List<FileSplit> chunks = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);

  sc.parallelize(chunks).foreach(chunk -> {
    FastqRecordReader reader = new FastqRecordReader(new Configuration(), chunk);
    // The suffix is looked up per chunk, exactly where the file name is assembled.
    String target = splitDir + "/split_" + chunk.getStart() + "." + nameParts[1];
    writeFastqFile(reader, new Configuration(), target);
  });
}
 
源代码2 项目: ViraPipe   文件: InterleaveMulti.java
/**
 * Splits two FASTQ files into chunks of {@code splitlen} lines, pairs the chunks position by
 * position and writes each pair through {@code writeInterleavedSplits}.
 *
 * <p>The output file of a pair is
 * {@code <splitDir>/<name of the first file's parent directory>_<start offset of the first chunk>.fq}.
 *
 * <p>NOTE(review): the two RDDs are combined with {@code zip}; presumably both files must
 * produce the same number of chunks - confirm against the callers.
 *
 * @param fst status of the first FASTQ file
 * @param fst2 status of the second FASTQ file
 * @param splitDir directory that receives the interleaved files
 * @param splitlen number of lines per chunk
 * @param sc Spark context that supplies the Hadoop configuration and runs the work
 * @throws IOException declared by the split computation
 */
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    List<FileSplit> firstChunks = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> secondChunks = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> firstRdd = sc.parallelize(firstChunks);
    JavaRDD<FileSplit> secondRdd = sc.parallelize(secondChunks);

    firstRdd.zip(secondRdd).foreach(pair -> {
        FileSplit left = pair._1;
        Path leftPath = left.getPath();
        FastqRecordReader leftReader = new FastqRecordReader(new Configuration(), left);
        FastqRecordReader rightReader = new FastqRecordReader(new Configuration(), pair._2);
        String target = splitDir + "/" + leftPath.getParent().getName() + "_" + left.getStart() + ".fq";
        writeInterleavedSplits(leftReader, rightReader, new Configuration(), target);
    });
}
 
源代码3 项目: ViraPipe   文件: Decompress.java
/**
 * Interleaves two FASTQ files chunk by chunk.
 *
 * <p>Each file is divided into chunks of {@code splitlen} lines; the n-th chunk of the first
 * file is zipped with the n-th chunk of the second, and each pair is passed to
 * {@code writeInterleavedSplits}. A pair is written to
 * {@code <splitDir>/<parent directory name of the first file>_<start offset>.fq}.
 *
 * @param fst status of the first FASTQ file
 * @param fst2 status of the second FASTQ file
 * @param splitDir output directory
 * @param splitlen lines per chunk
 * @param sc Spark context used for the Hadoop configuration and for parallel execution
 * @throws IOException declared by the split computation
 */
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    List<FileSplit> forward = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> reverse = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaPairRDD<FileSplit, FileSplit> chunkPairs = sc.parallelize(forward).zip(sc.parallelize(reverse));

    chunkPairs.foreach(chunkPair -> {
        Path source = chunkPair._1.getPath();
        FastqRecordReader forwardReader = new FastqRecordReader(new Configuration(), chunkPair._1);
        FastqRecordReader reverseReader = new FastqRecordReader(new Configuration(), chunkPair._2);

        // <parent dir>_<offset>.fq keeps chunk outputs from the same sample together.
        String outputName = source.getParent().getName() + "_" + chunkPair._1.getStart() + ".fq";
        writeInterleavedSplits(forwardReader, reverseReader, new Configuration(), splitDir + "/" + outputName);
    });
}
 
源代码4 项目: ViraPipe   文件: DecompressInterleave.java
/**
 * Splits two FASTQ files into chunks of {@code splitlen} lines, zips the chunks position by
 * position and writes each pair through {@code writeInterleavedSplits}.
 *
 * <p>Each pair is written into {@code splitDir} under the file name
 * {@code <parent directory name of the first file>_<start offset of the first chunk>.fq}.
 *
 * @param fst status of the first FASTQ file
 * @param fst2 status of the second FASTQ file
 * @param splitDir directory that receives the interleaved files
 * @param splitlen number of lines per chunk
 * @param sc Spark context that supplies the Hadoop configuration and runs the work
 * @throws IOException declared by the split computation
 */
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    //TODO: Handle also compressed files
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    zips.foreach( splits ->  {
      Path path = splits._1.getPath();
      FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
      FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
      // Directory and file name are passed separately to this overload of the writer.
      writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir, path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
  }
 
源代码5 项目: ViraPipe   文件: Interleave.java
/**
 * Writes interleaved chunk files for a pair of FASTQ files.
 *
 * <p>Both inputs are split every {@code splitlen} lines. Chunks with the same position in the
 * two lists are zipped together and handed to {@code writeInterleavedSplits}, which receives
 * the target {@code <splitDir>/<parent directory name of the first file>_<start offset>.fq}.
 *
 * @param fst status of the first FASTQ file
 * @param fst2 status of the second FASTQ file
 * @param splitDir output directory
 * @param splitlen lines per chunk
 * @param sc Spark context used for the Hadoop configuration and for parallel execution
 * @throws IOException declared by the split computation
 */
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    JavaRDD<FileSplit> mates1 = sc.parallelize(NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen));
    JavaRDD<FileSplit> mates2 = sc.parallelize(NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen));

    mates1.zip(mates2).foreach(mates -> {
        FileSplit first = mates._1;
        FileSplit second = mates._2;
        Path firstPath = first.getPath();

        FastqRecordReader firstReader = new FastqRecordReader(new Configuration(), first);
        FastqRecordReader secondReader = new FastqRecordReader(new Configuration(), second);

        String destination = splitDir + "/" + firstPath.getParent().getName() + "_" + first.getStart() + ".fq";
        writeInterleavedSplits(firstReader, secondReader, new Configuration(), destination);
    });
}
 
源代码6 项目: mrgeo   文件: DelimitedVectorInputFormat.java
/**
 * Switches the job to N-line input splitting when a minimum split size is requested.
 *
 * <p>Nothing is changed unless {@code minFeaturesPerSplit} is positive and the job configuration
 * defines a positive {@code mapred.tasktracker.map.tasks.maximum}. In that case the lines per
 * split are set to {@code featureCount / maxMapTasks}, but never fewer than
 * {@code minFeaturesPerSplit}, and the {@code USE_NLINE_FORMAT} flag is turned on.
 *
 * @param job job whose configuration is updated
 * @param minFeaturesPerSplit lower bound for the number of features (lines) per split; values
 *        {@code <= 0} disable N-line splitting
 * @param featureCount total number of features in the input
 * @throws IllegalArgumentException if {@code minFeaturesPerSplit} is positive and
 *         {@code featureCount} is negative
 */
public static void setupJob(Job job, int minFeaturesPerSplit, long featureCount)
{
  if (minFeaturesPerSplit <= 0)
  {
    return;
  }
  if (featureCount < 0)
  {
    throw new IllegalArgumentException("Expected a feature count");
  }
  int maxMapTasks = job.getConfiguration().getInt("mapred.tasktracker.map.tasks.maximum", -1);
  if (maxMapTasks <= 0)
  {
    return;
  }
  // Clamp in long arithmetic before narrowing: a plain (int) cast of a quotient larger than
  // Integer.MAX_VALUE would wrap around to an unrelated (possibly negative) value.
  long evenShare = Math.min(featureCount / maxMapTasks, (long) Integer.MAX_VALUE);
  int featuresPerSplit = (int) Math.max(evenShare, (long) minFeaturesPerSplit);
  job.getConfiguration().setBoolean(USE_NLINE_FORMAT, true);
  NLineInputFormat.setNumLinesPerSplit(job, featuresPerSplit);
}
 
源代码7 项目: ViraPipe   文件: Decompress.java
/**
 * Divides a FASTQ file into chunks of {@code splitlen} lines and writes each chunk as a
 * separate file in {@code splitDir}, one Spark task element per chunk.
 *
 * <p>Chunk files are called {@code split_<start offset>.<token>}; {@code <token>} is the second
 * dot-separated part of the file name taken from {@code fqPath}.
 *
 * @param fst status of the FASTQ file to split
 * @param fqPath path of the FASTQ file, used only for its file name
 * @param splitDir output directory
 * @param splitlen lines per chunk
 * @param sc Spark context used for the Hadoop configuration and for parallel execution
 * @throws IOException declared by the split computation
 */
private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
  String sourceName = new Path(fqPath).getName();
  String[] tokens = sourceName.split("\\.");

  JavaRDD<FileSplit> pieces = sc.parallelize(NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen));

  pieces.foreach(piece -> {
    FastqRecordReader pieceReader = new FastqRecordReader(new Configuration(), piece);
    String pieceFile = splitDir + "/split_" + piece.getStart() + "." + tokens[1];
    writeFastqFile(pieceReader, new Configuration(), pieceFile);
  });
}
 
源代码8 项目: ViraPipe   文件: Interleave.java
/**
 * Writes one file per {@code splitlen}-line chunk of the given FASTQ file.
 *
 * <p>The chunk list comes from {@code NLineInputFormat.getSplitsForFile}; each chunk is read
 * with a {@code FastqRecordReader} and written by {@code writeFastqFile} to
 * {@code <splitDir>/split_<start offset>.<second dot-separated token of the file name>}.
 *
 * @param fst status of the FASTQ file to split
 * @param fqPath path of the FASTQ file, used only for its file name
 * @param splitDir output directory
 * @param splitlen lines per chunk
 * @param sc Spark context used for the Hadoop configuration and for parallel execution
 * @throws IOException declared by the split computation
 */
private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
  Path inputFile = new Path(fqPath);
  String[] dotParts = inputFile.getName().split("\\.");
  List<FileSplit> segments = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
  JavaRDD<FileSplit> segmentRdd = sc.parallelize(segments);

  segmentRdd.foreach(segment -> {
    FastqRecordReader segmentReader = new FastqRecordReader(new Configuration(), segment);
    String outputFile = splitDir + "/split_" + segment.getStart() + "." + dotParts[1];
    writeFastqFile(segmentReader, new Configuration(), outputFile);
  });
}
 
/**
 * Configures and runs the local de-duplication job.
 *
 * @param args {@code args[0]} is the input path; {@code args[1]} is the output path (text files
 *        of ids to be deleted)
 * @return {@code 0} if the job completed successfully, {@code 1} otherwise
 * @throws Exception propagated from job setup or execution
 */
@Override
public int run(String[] args)
        throws Exception
{
    Job dedupJob = Job.getInstance(getConf());
    dedupJob.setJarByClass(Phase3Step4LocalDeDuplication.class);
    dedupJob.setJobName(Phase3Step4LocalDeDuplication.class.getName());

    final String source = args[0];
    final String destination = args[1];

    // Input: every mapper receives at most LINES lines.
    dedupJob.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.addInputPath(dedupJob, new Path(source));
    dedupJob.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", LINES);

    // Processing stages.
    dedupJob.setMapperClass(LocalGreedyDeDuplicationMapper.class);
    LazyOutputFormat.setOutputFormatClass(dedupJob, TextOutputFormat.class);
    dedupJob.setReducerClass(IDCollectorReducer.class);

    // Output: one id per record, no value.
    dedupJob.setOutputKeyClass(Text.class);
    dedupJob.setOutputValueClass(NullWritable.class);
    FileOutputFormat.setOutputPath(dedupJob, new Path(destination));

    if (dedupJob.waitForCompletion(true))
    {
        return 0;
    }
    return 1;
}
 
源代码10 项目: MapReduce-Demo   文件: NLineInput.java
/**
 * Demo driver: runs the NLineInput MapReduce job against a fixed HDFS cluster, reading from
 * {@code /expr/nlineinput/data} and writing to {@code /expr/nlineinput/output}. An existing
 * output directory is deleted first. The JVM exits with 0 on success and 1 on failure.
 *
 * @param args unused
 * @throws Exception propagated from job setup or execution
 */
public static void main(String[] args) throws Exception {
	// 1. HDFS / cluster configuration
	final String nameNodeHost = "192.168.17.10";
	final String hdfsUri = "hdfs://" + nameNodeHost + ":9000";
	Configuration configuration = new Configuration();
	configuration.set("fs.defaultFS", hdfsUri);
	configuration.set("mapreduce.app-submission.cross-platform", "true");
	// number of input lines handled by each map task
	configuration.setInt("mapreduce.input.lineinputformat.linespermap", 1000);

	// 2. MapReduce job configuration
	final String jobTitle = "NLineInput";
	Job nlineJob = Job.getInstance(configuration, jobTitle);
	nlineJob.setJarByClass(NLineInput.class);
	// locally exported jar that is shipped with the job
	nlineJob.setJar("export\\NLineInput.jar");
	nlineJob.setMapperClass(NLineInputMapper.class);
	nlineJob.setMapOutputKeyClass(Text.class);
	nlineJob.setMapOutputValueClass(IntWritable.class);
	nlineJob.setReducerClass(NLineInputReducer.class);
	nlineJob.setOutputKeyClass(Text.class);
	nlineJob.setOutputValueClass(IntWritable.class);

	// split the input by line count
	nlineJob.setInputFormatClass(NLineInputFormat.class);

	// 3. Input and output locations
	Path inputLocation = new Path(hdfsUri + "/expr/nlineinput/data");
	Path outputLocation = new Path(hdfsUri + "/expr/nlineinput/output");
	FileInputFormat.addInputPath(nlineJob, inputLocation);
	FileOutputFormat.setOutputPath(nlineJob, outputLocation);
	FileSystem fileSystem = FileSystem.get(configuration);
	// remove output left over from an earlier run
	if (fileSystem.exists(outputLocation)) {
		fileSystem.delete(outputLocation, true);
	}

	// 4. Run the job and report the outcome through the exit code
	System.out.println("Job: " + jobTitle + " is running...");
	boolean succeeded = nlineJob.waitForCompletion(true);
	System.out.println(succeeded ? "success!" : "failed!");
	System.exit(succeeded ? 0 : 1);
}
 
源代码11 项目: ViraPipe   文件: HDFSWriter.java
/**
 * Reads two FASTQ files in parallel and returns their records interleaved as
 * (read name, fragment) pairs: first-file record, second-file record, first-file record, ...
 *
 * <p>Both files are split every {@code splitlen} lines and the split lists are zipped position
 * by position. Within a split pair, records are emitted only while both readers still have a
 * record; leftover records of the longer side are dropped. Read names are truncated at the
 * first space. Only the sequence and quality of each fragment are copied.
 *
 * @param fastq path of the first FASTQ file
 * @param fastq2 path of the second FASTQ file
 * @param splitlen number of lines per split
 * @param sc Spark context that supplies the Hadoop configuration and runs the work
 * @return RDD of interleaved (read name, fragment) pairs
 * @throws IOException if a file status cannot be obtained or the splits cannot be computed
 */
private static JavaPairRDD<Text, SequencedFragment> interleaveReads(String fastq, String fastq2, int splitlen, JavaSparkContext sc) throws IOException {

        FileSystem fs = FileSystem.get(new Configuration());

        FileStatus fst = fs.getFileStatus(new Path(fastq));
        FileStatus fst2 = fs.getFileStatus(new Path(fastq2));

        List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
        List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

        JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
        JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
        JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

        return zips.flatMapToPair( splits ->  {

            ArrayList<Tuple2<Text, SequencedFragment>> reads = new ArrayList<Tuple2<Text, SequencedFragment>>();

            // Both readers hold open input streams; close them once the split pair has been
            // fully materialized into the list, even if reading fails midway.
            try (FastqInputFormat.FastqRecordReader fqreader = new FastqInputFormat.FastqRecordReader(new Configuration(), splits._1);
                 FastqInputFormat.FastqRecordReader fqreader2 = new FastqInputFormat.FastqRecordReader(new Configuration(), splits._2)) {

                while (fqreader.nextKeyValue()) {
                    // Keep only the part of the read name before the first space.
                    String key = fqreader.getCurrentKey().toString().split(" ")[0];

                    SequencedFragment sf = new SequencedFragment();
                    sf.setQuality(new Text(fqreader.getCurrentValue().getQuality().toString()));
                    sf.setSequence(new Text(fqreader.getCurrentValue().getSequence().toString()));

                    if (!fqreader2.nextKeyValue()) {
                        // No mate left: a record is only emitted together with its mate, so
                        // nothing further can be added for this split pair.
                        break;
                    }

                    String key2 = fqreader2.getCurrentKey().toString().split(" ")[0];

                    SequencedFragment sf2 = new SequencedFragment();
                    sf2.setQuality(new Text(fqreader2.getCurrentValue().getQuality().toString()));
                    sf2.setSequence(new Text(fqreader2.getCurrentValue().getSequence().toString()));

                    reads.add(new Tuple2<Text, SequencedFragment>(new Text(key), sf));
                    reads.add(new Tuple2<Text, SequencedFragment>(new Text(key2), sf2));
                }
            }

            return reads.iterator();

        });
    }
 
源代码12 项目: rdf2x   文件: ElephasQuadParser.java
/**
 * Parses the RDF input at {@code path} into an RDD of quads.
 *
 * <p>Line-based input is read with {@code NQuadsInputFormat}, using the configured batch size as
 * the number of lines per map; other input is read with {@code TriplesOrQuadsInputFormat}. The
 * result is optionally repartitioned and, when accepted languages are configured, literals in
 * other languages are filtered out.
 *
 * @param path location of the RDF input
 * @return RDD of parsed quads
 */
@Override
public JavaRDD<Quad> parseQuads(String path) {

    Configuration hadoopConf = new Configuration();

    Integer batchSize = config.getBatchSize();
    hadoopConf.set(NLineInputFormat.LINES_PER_MAP, batchSize.toString());

    // Bad tuples are ignored unless the error handling mode is Throw.
    boolean throwOnError = config.getErrorHandling() == ParseErrorHandling.Throw;
    hadoopConf.set(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, throwOnError ? "false" : "true");

    Boolean lineBased = config.getLineBasedFormat();
    if (lineBased == null) {
        // not configured explicitly: guess from the path
        lineBased = guessIsLineBasedFormat(path);
    }

    Integer partitions = config.getRepartition();
    JavaRDD<Quad> quads;
    if (!lineBased) {
        // let Jena guess the format, load whole files
        log.info("Input format is not line based, parsing RDF by Master node only.");
        quads = sc.newAPIHadoopFile(path,
                TriplesOrQuadsInputFormat.class,
                LongWritable.class, // position
                QuadWritable.class, // value
                hadoopConf).values().map(QuadWritable::get);

        if (partitions == null) {
            log.warn("Reading non-line based formats by master node only, consider setting --parsing.repartition to redistribute work to other nodes.");
        }
    } else {
        log.info("Parsing RDF in parallel with batch size: {}", batchSize);
        quads = sc.newAPIHadoopFile(path,
                NQuadsInputFormat.class,
                LongWritable.class, // position
                QuadWritable.class, // value
                hadoopConf).values().map(QuadWritable::get);
    }

    if (partitions != null) {
        log.info("Distributing workload, repartitioning into {} partitions", partitions);
        quads = quads.repartition(partitions);
    }

    final List<String> acceptedLanguages = config.getAcceptedLanguages();
    if (acceptedLanguages.isEmpty()) {
        // every language is accepted
        return quads;
    }

    // Keep non-literals, literals without a language tag, and literals in an accepted language.
    return quads.filter(quad -> {
        if (!quad.getObject().isLiteral()) {
            return true;
        }
        String language = quad.getObject().getLiteralLanguage();
        return language == null || language.isEmpty() || acceptedLanguages.contains(language);
    });
}
 
源代码13 项目: mrgeo   文件: DelimitedVectorInputFormat.java
/**
 * Computes the input splits for the job.
 *
 * <p>When the {@code USE_NLINE_FORMAT} flag is set in the job configuration, the splits come
 * from {@code NLineInputFormat} and are then patched: if the last split of a file starts after
 * the end of the split before it, it is replaced by a split that starts exactly at that end and
 * is one byte longer. Otherwise the splits of {@code TextInputFormat} are returned unchanged.
 *
 * @param context job context that provides the configuration and input paths
 * @return the list of input splits, possibly with adjusted last splits
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
{
  boolean useNLineFormat = context.getConfiguration().getBoolean(USE_NLINE_FORMAT, false);
  if (useNLineFormat)
  {
    List<InputSplit> splits = new NLineInputFormat().getSplits(context);
    // This is a workaround to what appears to be a bug in how NLineInputFormat
    // computes its splits. When there are multiple splits in a file, it seems
    // the start position in the last split is off by one. Note that this corrective
    // code needs to check the last split for each different file that appears
    // in the list of splits.
    // The scan starts at index 2, so a list with fewer than three splits is returned as is.
    for (int index = 2; index < splits.size(); index++)
    {
      FileSplit previousSplit = (FileSplit) splits.get(index - 1);
      FileSplit currSplit = (FileSplit) splits.get(index);
      // If this index is the last split, or we've moved on to splits from a different
      // file, then we need to adjust the last split for that file.
      // -1 means "no file ends at this position".
      // NOTE(review): when the very last split is also the first split of a new file, only
      // the first branch runs, so the last split of the preceding file does not seem to be
      // examined - confirm whether that case can occur.
      int lastFileIndex = -1;
      if (index == splits.size() - 1)
      {
        lastFileIndex = index;
      }
      else if (!currSplit.getPath().equals(previousSplit.getPath()))
      {
        lastFileIndex = index - 1;
      }
      if (lastFileIndex >= 2)
      {
        FileSplit lastFileSplit = (FileSplit) splits.get(lastFileIndex);
        FileSplit priorSplit = (FileSplit) splits.get(lastFileIndex - 1);
        // Only files with at least two splits are candidates for the fix.
        if (lastFileSplit.getPath().equals(priorSplit.getPath()))
        {
          // The path comparison below repeats the enclosing check; the effective condition
          // is the gap between the end of the prior split and the start of the last one.
          if (priorSplit.getPath().equals(lastFileSplit.getPath()) &&
              priorSplit.getStart() + priorSplit.getLength() < lastFileSplit.getStart())
          {
            // Replace the last split with one that starts where the prior split ends
            // and is one byte longer.
            FileSplit replacement = new FileSplit(lastFileSplit.getPath(),
                priorSplit.getStart() + priorSplit.getLength(),
                lastFileSplit.getLength() + 1,
                lastFileSplit.getLocations());
            log.info("Replacing split: " + lastFileSplit);
            log.info("  With split: " + replacement);
            splits.set(lastFileIndex, replacement);
          }
        }
      }
    }
    return splits;
  }
  else
  {
    // N-line splitting not requested: use the regular text splits.
    return new TextInputFormat().getSplits(context);
  }
}
 
源代码14 项目: DataGenerator   文件: HDFSDistributor.java
/**
 * Builds the map-only data generation job.
 *
 * <p>The shared {@code configuration} is updated first (one input line per map, reporting host,
 * gzip compression settings); the job is then created from it with
 * {@code DataGeneratorMapper}, no reducers, {@code NLineInputFormat} input from
 * {@code mapperInputFilePath} and output to {@code mapperOutputFilePath}.
 *
 * @return the configured, not yet submitted job
 * @throws IOException declared by job creation and input path registration
 */
private Job prepareJob() throws IOException {
    // Basic configuration: one input line per map task, plus the host that mappers report to.
    configuration.setInt("mapreduce.input.lineinputformat.linespermap", 1);
    configuration.set("reportingHost", this.reportingHost);

    // Compression: gzip for map output and for job output. The
    // mapreduce.output.fileoutputformat.compress* keys are not set here.
    configuration.setBoolean("mapreduce.map.output.compress", true);
    configuration.setBoolean("mapred.compress.map.output", true);
    configuration.setBoolean("mapred.output.compress", true);
    configuration.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    configuration.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);

    Job generationJob = org.apache.hadoop.mapreduce.Job.getInstance(configuration);
    generationJob.setJobName("PATH Test Data Generation");
    generationJob.setJarByClass(HDFSDistributor.class);

    // Map-only: a mapper and zero reduce tasks.
    generationJob.setMapperClass(DataGeneratorMapper.class);
    generationJob.setNumReduceTasks(0);

    // Input is split by line count.
    generationJob.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.addInputPath(generationJob, mapperInputFilePath);

    // Output uses the job's default output format; LazyOutputFormat is not applied.
    FileOutputFormat.setOutputPath(generationJob, mapperOutputFilePath);

    return generationJob;
}
 
源代码15 项目: ViraPipe   文件: DecompressInterleave.java
/**
 * Splits one FASTQ file into chunks of {@code splitlen} lines and writes each chunk to
 * {@code <splitDir>/<input file name>_<start offset>.fq}, distributing the chunks through Spark.
 *
 * <p>The splits are computed with the Spark context's Hadoop configuration, matching how the
 * other split helpers in this code obtain their splits.
 *
 * @param fst status of the FASTQ file to split
 * @param splitDir directory that receives the chunk files
 * @param splitlen number of lines per chunk
 * @param sc Spark context that supplies the Hadoop configuration and runs the work
 * @throws IOException declared by the split computation
 */
private static void splitFastq(FileStatus fst, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {

    //TODO: Handle also compressed files
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);

    splitRDD.foreach( split ->  {

      FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
      writeFastqFile(fqreader, new Configuration(), splitDir + "/" + split.getPath().getName()+"_"+split.getStart() + ".fq");

     });
  }
 
源代码16 项目: imputationserver   文件: ImputationJob.java
/**
 * Configures the imputation job: {@code NLineInputFormat} input with one line per split,
 * {@code ImputationMapper} as mapper, {@code Text} map output key/value and job output key, and
 * no reduce tasks.
 *
 * @param job job to configure
 */
@Override
public void setupJob(Job job) {

	// input: one line per split
	job.setInputFormatClass(NLineInputFormat.class);
	NLineInputFormat.setNumLinesPerSplit(job, 1);

	// map-only processing
	job.setMapperClass(ImputationMapper.class);
	job.setNumReduceTasks(0);

	// key/value types
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);
	job.setOutputKeyClass(Text.class);

}
 
 同包方法