The following examples show how to use the org.apache.hadoop.mapreduce.lib.input.NLineInputFormat API class; follow the GitHub links to view the full source code.
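Before the project-specific snippets, here is a minimal, self-contained sketch of the typical NLineInputFormat setup. It is only an illustration: the WordLineDriver/WordLineMapper class names and the command-line paths are placeholders, not taken from any of the projects below.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordLineDriver {
    // Placeholder mapper: echoes each input line keyed by its byte offset.
    public static class WordLineMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "nline-example");
        job.setJarByClass(WordLineDriver.class);
        job.setMapperClass(WordLineMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // Each split covers at most 10 input lines, so every map task processes at most 10 lines.
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(job, 10);
        NLineInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}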
private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    Path fqpath = new Path(fqPath);
    String fqname = fqpath.getName();
    String[] ns = fqname.split("\\.");
    //TODO: Handle also compressed files
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    splitRDD.foreach( split -> {
        FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
        writeFastqFile(fqreader, new Configuration(), splitDir + "/split_" + split.getStart() + "." + ns[1]);
    });
}
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);
    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);
    zips.foreach( splits -> {
        Path path = splits._1.getPath();
        FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
        FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
        writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir+"/"+path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
}
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    String[] ns = fst.getPath().getName().split("\\.");
    //TODO: Handle also compressed files
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);
    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);
    zips.foreach( splits -> {
        Path path = splits._1.getPath();
        FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
        FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
        writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir, path.getParent().getName()+"_"+splits._1.getStart()+".fq");
    });
}
public static void setupJob(Job job, int minFeaturesPerSplit, long featureCount)
{
    if (minFeaturesPerSplit > 0)
    {
        if (featureCount < 0)
        {
            throw new IllegalArgumentException("Expected a feature count");
        }
        int maxMapTasks = job.getConfiguration().getInt("mapred.tasktracker.map.tasks.maximum", -1);
        if (maxMapTasks > 0)
        {
            int featuresPerSplit = (int) (featureCount / maxMapTasks);
            if (featuresPerSplit < minFeaturesPerSplit)
            {
                featuresPerSplit = minFeaturesPerSplit;
            }
            job.getConfiguration().setBoolean(USE_NLINE_FORMAT, true);
            NLineInputFormat.setNumLinesPerSplit(job, featuresPerSplit);
        }
    }
}
@Override
public int run(String[] args)
        throws Exception
{
    Job job = Job.getInstance(getConf());
    job.setJarByClass(Phase3Step4LocalDeDuplication.class);
    job.setJobName(Phase3Step4LocalDeDuplication.class.getName());

    // paths
    String inputPath = args[0];
    // text files of ids to be deleted
    String outputPath = args[1];

    // input: reading max N lines for each mapper
    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.addInputPath(job, new Path(inputPath));
    job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", LINES);

    // mapper
    job.setMapperClass(LocalGreedyDeDuplicationMapper.class);
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

    // reducer
    job.setReducerClass(IDCollectorReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
    // 1. Set up the HDFS configuration
    String namenode_ip = "192.168.17.10";
    String hdfs = "hdfs://" + namenode_ip + ":9000";
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", hdfs);
    conf.set("mapreduce.app-submission.cross-platform", "true");
    conf.setInt("mapreduce.input.lineinputformat.linespermap", 1000); // number of lines handled by each map task

    // 2. Configure the MapReduce job
    String jobName = "NLineInput";                    // job name
    Job job = Job.getInstance(conf, jobName);
    job.setJarByClass(NLineInput.class);              // driver class to run
    job.setJar("export\\NLineInput.jar");             // local jar to submit
    job.setMapperClass(NLineInputMapper.class);       // Mapper class
    job.setMapOutputKeyClass(Text.class);             // Mapper output key type
    job.setMapOutputValueClass(IntWritable.class);    // Mapper output value type
    job.setReducerClass(NLineInputReducer.class);     // Reducer class
    job.setOutputKeyClass(Text.class);                // Reducer output key type
    job.setOutputValueClass(IntWritable.class);       // Reducer output value type
    job.setInputFormatClass(NLineInputFormat.class);  // input format class

    // 3. Set the job input and output paths
    String dataDir = "/expr/nlineinput/data";         // input data directory
    String outputDir = "/expr/nlineinput/output";     // output directory
    Path inPath = new Path(hdfs + dataDir);
    Path outPath = new Path(hdfs + outputDir);
    FileInputFormat.addInputPath(job, inPath);
    FileOutputFormat.setOutputPath(job, outPath);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(outPath)) {
        fs.delete(outPath, true);
    }

    // 4. Run the job
    System.out.println("Job: " + jobName + " is running...");
    if (job.waitForCompletion(true)) {
        System.out.println("success!");
        System.exit(0);
    } else {
        System.out.println("failed!");
        System.exit(1);
    }
}
private static JavaPairRDD<Text, SequencedFragment> interleaveReads(String fastq, String fastq2, int splitlen, JavaSparkContext sc) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus fst = fs.getFileStatus(new Path(fastq));
    FileStatus fst2 = fs.getFileStatus(new Path(fastq2));

    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
    List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);

    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
    JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);

    return zips.flatMapToPair( splits -> {
        FastqInputFormat.FastqRecordReader fqreader = new FastqInputFormat.FastqRecordReader(new Configuration(), splits._1);
        FastqInputFormat.FastqRecordReader fqreader2 = new FastqInputFormat.FastqRecordReader(new Configuration(), splits._2);

        ArrayList<Tuple2<Text, SequencedFragment>> reads = new ArrayList<Tuple2<Text, SequencedFragment>>();

        while (fqreader.nextKeyValue()) {
            String key = fqreader.getCurrentKey().toString();
            String[] keysplit = key.split(" ");
            key = keysplit[0];

            SequencedFragment sf = new SequencedFragment();
            sf.setQuality(new Text(fqreader.getCurrentValue().getQuality().toString()));
            sf.setSequence(new Text(fqreader.getCurrentValue().getSequence().toString()));

            if (fqreader2.nextKeyValue()) {
                String key2 = fqreader2.getCurrentKey().toString();
                String[] keysplit2 = key2.split(" ");
                key2 = keysplit2[0];
                //key2 = key2.replace(" 2:N:0:1","/2");

                SequencedFragment sf2 = new SequencedFragment();
                sf2.setQuality(new Text(fqreader2.getCurrentValue().getQuality().toString()));
                sf2.setSequence(new Text(fqreader2.getCurrentValue().getSequence().toString()));

                reads.add(new Tuple2<Text, SequencedFragment>(new Text(key), sf));
                reads.add(new Tuple2<Text, SequencedFragment>(new Text(key2), sf2));
            }
        }
        return reads.iterator();
    });
}
@Override
public JavaRDD<Quad> parseQuads(String path) {
    Configuration conf = new Configuration();
    Integer batchSize = config.getBatchSize();
    conf.set(NLineInputFormat.LINES_PER_MAP, batchSize.toString());

    if (config.getErrorHandling() == ParseErrorHandling.Throw) {
        conf.set(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, "false");
    } else {
        conf.set(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, "true");
    }

    Boolean isLineBased = config.getLineBasedFormat();
    if (isLineBased == null) {
        isLineBased = guessIsLineBasedFormat(path);
    }

    JavaRDD<Quad> quads;
    Integer partitions = config.getRepartition();

    if (isLineBased) {
        log.info("Parsing RDF in parallel with batch size: {}", batchSize);
        quads = sc.newAPIHadoopFile(path,
                NQuadsInputFormat.class,
                LongWritable.class, // position
                QuadWritable.class, // value
                conf).values().map(QuadWritable::get);
    } else {
        // let Jena guess the format, load whole files
        log.info("Input format is not line based, parsing RDF by Master node only.");
        quads = sc.newAPIHadoopFile(path,
                TriplesOrQuadsInputFormat.class,
                LongWritable.class, // position
                QuadWritable.class, // value
                conf).values().map(QuadWritable::get);
        if (partitions == null) {
            log.warn("Reading non-line based formats by master node only, consider setting --parsing.repartition to redistribute work to other nodes.");
        }
    }

    if (partitions != null) {
        log.info("Distributing workload, repartitioning into {} partitions", partitions);
        quads = quads.repartition(partitions);
    }

    final List<String> acceptedLanguages = config.getAcceptedLanguages();
    // if only some languages are accepted
    if (!acceptedLanguages.isEmpty()) {
        // filter out literals of unsupported languages
        quads = quads.filter(quad ->
                !quad.getObject().isLiteral() ||
                quad.getObject().getLiteralLanguage() == null ||
                quad.getObject().getLiteralLanguage().isEmpty() ||
                acceptedLanguages.contains(quad.getObject().getLiteralLanguage())
        );
    }

    return quads;
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
{
    boolean useNLineFormat = context.getConfiguration().getBoolean(USE_NLINE_FORMAT, false);
    if (useNLineFormat)
    {
        List<InputSplit> splits = new NLineInputFormat().getSplits(context);
        // This is a workaround to what appears to be a bug in how NLineInputFormat
        // computes its splits. When there are multiple splits in a file, it seems
        // the start position in the last split is off by one. Note that this corrective
        // code needs to check the last split for each different file that appears
        // in the list of splits.
        for (int index = 2; index < splits.size(); index++)
        {
            FileSplit previousSplit = (FileSplit) splits.get(index - 1);
            FileSplit currSplit = (FileSplit) splits.get(index);
            // If this index is the last split, or we've moved on to splits from a different
            // file, then we need to adjust the last split for that file.
            int lastFileIndex = -1;
            if (index == splits.size() - 1)
            {
                lastFileIndex = index;
            }
            else if (!currSplit.getPath().equals(previousSplit.getPath()))
            {
                lastFileIndex = index - 1;
            }
            if (lastFileIndex >= 2)
            {
                FileSplit lastFileSplit = (FileSplit) splits.get(lastFileIndex);
                FileSplit priorSplit = (FileSplit) splits.get(lastFileIndex - 1);
                if (lastFileSplit.getPath().equals(priorSplit.getPath()))
                {
                    if (priorSplit.getPath().equals(lastFileSplit.getPath()) &&
                        priorSplit.getStart() + priorSplit.getLength() < lastFileSplit.getStart())
                    {
                        // Adjust the start of the last split so it begins right after the prior split
                        FileSplit replacement = new FileSplit(lastFileSplit.getPath(),
                                priorSplit.getStart() + priorSplit.getLength(),
                                lastFileSplit.getLength() + 1,
                                lastFileSplit.getLocations());
                        log.info("Replacing split: " + lastFileSplit);
                        log.info(" With split: " + replacement);
                        splits.set(lastFileIndex, replacement);
                    }
                }
            }
        }
        return splits;
    }
    else
    {
        return new TextInputFormat().getSplits(context);
    }
}
private Job prepareJob() throws IOException {
    // Basic configuration
    configuration.setInt("mapreduce.input.lineinputformat.linespermap", 1);
    configuration.set("reportingHost", this.reportingHost);
    configuration.setBoolean("mapreduce.map.output.compress", true);
    configuration.setBoolean("mapred.compress.map.output", true);
    configuration.setBoolean("mapred.output.compress", true);
    configuration.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    configuration.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    /* configuration.setBoolean("mapreduce.output.fileoutputformat.compress", true);
       configuration.setClass("mapreduce.output.fileoutputformat.compress.codec", GzipCodec.class, CompressionCodec.class);
       configuration.setCompressMapOutput(true);
    */
    // configuration.set("mapreduce.output.fileoutputformat.compress", "true");
    // configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
    // configuration.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");

    // Job ret = new Job(configuration);
    Job ret = org.apache.hadoop.mapreduce.Job.getInstance(configuration);
    ret.setJarByClass(HDFSDistributor.class);
    ret.setJobName("PATH Test Data Generation");

    // Mapper
    ret.setMapperClass(DataGeneratorMapper.class);

    // Reducer (none)
    ret.setNumReduceTasks(0);

    // Input
    ret.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.addInputPath(ret, mapperInputFilePath);

    // Output
    // [BTR] Saw this used in an example w/NLineInputFormat; LazyOutputFormat defers creating
    // output files until the first record is written, so mappers that emit nothing produce
    // no empty part files. Left disabled here.
    // LazyOutputFormat.setOutputFormatClass(ret, TextOutputFormat.class);
    FileOutputFormat.setOutputPath(ret, mapperOutputFilePath);
    //ret.getConfiguration().setBoolean("mapred.output.compress", false);

    return ret;
}
private static void splitFastq(FileStatus fst, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
    //TODO: Handle also compressed files
    List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, new Configuration(), splitlen);
    JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
    splitRDD.foreach( split -> {
        FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
        writeFastqFile(fqreader, new Configuration(), splitDir + "/" + split.getPath().getName()+"_"+split.getStart() + ".fq");
    });
}
@Override
public void setupJob(Job job) {
    NLineInputFormat.setNumLinesPerSplit(job, 1);
    job.setMapperClass(ImputationMapper.class);
    job.setInputFormatClass(NLineInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setNumReduceTasks(0);
}