类org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat源码实例Demo

下面列出了 org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: big-c   文件: MutiWordcount.java
/**
 * Runs a word-count job that reads its input through {@link CombineTextInputFormat}.
 *
 * <p>Arguments remaining after the generic Hadoop options must be, in order:
 * {@code <in> [<in>...] <out> <minInputSplitSize> <maxInputSplitSize>}.
 * The process exits with status 2 when fewer than four arguments are supplied,
 * 0 when the job succeeds and 1 when it fails.
 *
 * @param args command-line arguments, including any generic Hadoop options
 * @throws Exception if option parsing, job setup or job execution fails
 */
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

  // Validate the argument count BEFORE indexing from the end of the array:
  // one input, one output and the two split sizes are the minimum.
  if (otherArgs.length < 4) {
    System.err.println(
        "Usage: wordcount <in> [<in>...] <out> <minInputSplitSize> <maxInputSplitSize>");
    System.exit(2);
  }

  // The last two arguments carry the split-size limits.
  long maxInputSplitSize = Long.parseLong(otherArgs[otherArgs.length - 1]);
  long minInputSplitSize = Long.parseLong(otherArgs[otherArgs.length - 2]);

  // These must be set before the Job is created, because the Job works on
  // its own copy of the configuration.
  conf.setLong(CombineTextInputFormat.SPLIT_MINSIZE_PERNODE, minInputSplitSize);
  conf.setLong(CombineTextInputFormat.SPLIT_MINSIZE_PERRACK, minInputSplitSize);

  // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
  Job job = Job.getInstance(conf, "mulword count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  // Combine input files into splits no larger than the requested maximum.
  job.setInputFormatClass(CombineTextInputFormat.class);
  CombineTextInputFormat.setMaxInputSplitSize(job, maxInputSplitSize);

  // Everything before the last three arguments is an input path;
  // the third-from-last argument is the output path.
  int outputIndex = otherArgs.length - 3;
  for (int i = 0; i < outputIndex; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  }
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[outputIndex]));

  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
源代码2 项目: 163-bigdate-note   文件: ParseLogJob_End.java
/**
 * Configures and runs the "parselog" job, adding every entry listed directly
 * under {@code args[0]} as an input and writing to {@code args[1]}.
 *
 * <p>An existing output path is deleted recursively before the job starts.
 *
 * @param args {@code args[0]} is the input root, {@code args[1]} the output path
 * @return 0 when the job completes successfully
 * @throws IllegalStateException if the {@code ip.file.path} property is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or filesystem access fails
 */
public int run(String[] args) throws Exception {
        // Build the job from the tool configuration plus the custom "mr.xml" resource.
        Configuration config = getConf();
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);

        // Basic job wiring.
        job.setJarByClass(ParseLogJob_End.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Job-level input format.
        // NOTE(review): the MultipleInputs.addInputPath calls below register
        // TextInputFormat per path; confirm which input format is meant to take effect.
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Ship the file named by "ip.file.path" through the distributed cache.
        // Fail with a clear message instead of the NullPointerException that
        // new URI(null) would raise when the property is missing.
        String ipFilePath = config.get("ip.file.path");
        if (ipFilePath == null) {
            throw new IllegalStateException(
                    "Missing required configuration property: ip.file.path");
        }
        job.addCacheFile(new URI(ipFilePath));

        // Custom output format and output compression are currently disabled:
//        job.setOutputFormatClass(LogOutputFormat.class);
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        // Register each entry under the input root and print its final path component.
        // The filesystem is resolved from the path itself so that a non-default
        // scheme in args[0] is honoured.
        Path inputRoot = new Path(args[0]);
        FileSystem inputFs = inputRoot.getFileSystem(config);
        for (FileStatus status : inputFs.listStatus(inputRoot)) {
            Path entry = status.getPath();
            MultipleInputs.addInputPath(job, entry, TextInputFormat.class, LogMapper.class);
            System.out.println(entry.getName());
        }

        // Set the output path and clear any previous output.
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);
        FileSystem outputFs = outputPath.getFileSystem(config);
        if (outputFs.exists(outputPath)) {
            outputFs.delete(outputPath, true);
        }

        // Run the job and wait for it to finish.
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }

        return 0;
    }
 
源代码3 项目: 163-bigdate-note   文件: ParseLogJob.java
/**
 * Configures and runs the "parselog" job, reading from {@code args[0]} and
 * writing to {@code args[1]} through {@code LogOutputFormat}.
 *
 * <p>An existing output path is deleted recursively before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 when the job completes successfully
 * @throws IllegalStateException if the {@code ip.file.path} property is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or filesystem access fails
 */
public int run(String[] args) throws Exception {
        // Build the job from the tool configuration plus the custom "mr.xml" resource.
        Configuration config = getConf();
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);

        // Basic job wiring.
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Read input through CombineTextInputFormat.
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Ship the file named by "ip.file.path" through the distributed cache.
        // Fail with a clear message instead of the NullPointerException that
        // new URI(null) would raise when the property is missing.
        String ipFilePath = config.get("ip.file.path");
        if (ipFilePath == null) {
            throw new IllegalStateException(
                    "Missing required configuration property: ip.file.path");
        }
        job.addCacheFile(new URI(ipFilePath));

        // Write results through the custom output format.
        job.setOutputFormatClass(LogOutputFormat.class);

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Output compression is currently disabled:
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        // Clear any previous output. The filesystem is resolved from the output
        // path itself so that a non-default scheme in args[1] is honoured.
        FileSystem outputFs = outputPath.getFileSystem(config);
        if (outputFs.exists(outputPath)) {
            outputFs.delete(outputPath, true);
        }

        // Run the job and wait for it to finish.
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
源代码4 项目: 163-bigdate-note   文件: ParseLogJobAzkaban.java
/**
 * Builds the "parselog" job configuration, clears any existing output path and
 * then delegates to {@code super.run()}.
 *
 * <p>{@code inputPath} and {@code outputPath} are not declared in this method;
 * presumably they are fields of the enclosing class (verify against the class).
 *
 * <p>NOTE(review): the {@code Job} built here is a local variable that is not
 * passed anywhere visible; confirm how {@code super.run()} obtains and submits it.
 *
 * @throws IllegalStateException if the {@code ip.file.path} property is not configured
 * @throws Exception if job setup, filesystem access or {@code super.run()} fails
 */
@Override
public void run() throws Exception {
    // Build the job from the configuration plus the custom "mr.xml" resource.
    Configuration config = getConf();
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);

    // Basic job wiring.
    job.setJarByClass(ParseLogJobAzkaban.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);

    // Read input through CombineTextInputFormat.
    job.setInputFormatClass(CombineTextInputFormat.class);

    // Ship the file named by "ip.file.path" through the distributed cache.
    // Fail with a clear message instead of the NullPointerException that
    // new URI(null) would raise when the property is missing.
    String ipFilePath = config.get("ip.file.path");
    if (ipFilePath == null) {
        throw new IllegalStateException(
                "Missing required configuration property: ip.file.path");
    }
    job.addCacheFile(new URI(ipFilePath));

    // Write results through the custom output format.
    job.setOutputFormatClass(LogOutputFormat.class);

    // Input and output locations.
    FileInputFormat.addInputPath(job, new Path(inputPath));
    Path outputPath_tmp = new Path(outputPath);
    FileOutputFormat.setOutputPath(job, outputPath_tmp);

    // Clear any previous output. The filesystem is resolved from the output
    // path itself so that a non-default scheme is honoured.
    FileSystem outputFs = outputPath_tmp.getFileSystem(config);
    if (outputFs.exists(outputPath_tmp)) {
        outputFs.delete(outputPath_tmp, true);
    }

    super.run();
}
 
源代码5 项目: 163-bigdate-note   文件: ParseLogJob.java
/**
 * Configures and runs the "parselog" job, reading from {@code args[0]} and
 * writing to {@code args[1]}.
 *
 * <p>An existing output path is deleted recursively before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 when the job completes successfully
 * @throws IllegalStateException if the {@code ip.file.path} property is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or filesystem access fails
 */
public int run(String[] args) throws Exception {
        // Build the job from the tool configuration plus the custom "mr.xml" resource.
        Configuration config = getConf();
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);

        // Basic job wiring.
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Read input through CombineTextInputFormat.
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Ship the file named by "ip.file.path" through the distributed cache.
        // Fail with a clear message instead of the NullPointerException that
        // new URI(null) would raise when the property is missing.
        String ipFilePath = config.get("ip.file.path");
        if (ipFilePath == null) {
            throw new IllegalStateException(
                    "Missing required configuration property: ip.file.path");
        }
        job.addCacheFile(new URI(ipFilePath));

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Output compression is currently disabled:
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        // Clear any previous output. The filesystem is resolved from the output
        // path itself so that a non-default scheme in args[1] is honoured.
        FileSystem outputFs = outputPath.getFileSystem(config);
        if (outputFs.exists(outputPath)) {
            outputFs.delete(outputPath, true);
        }

        // Run the job and wait for it to finish.
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
源代码6 项目: 163-bigdate-note   文件: ParseLogJob.java
/**
 * Configures and runs the "parselog" job, reading from {@code args[0]} and
 * writing to {@code args[1]} through {@code LogOutputFormat}.
 *
 * <p>An existing output path is deleted recursively before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 when the job completes successfully
 * @throws IllegalStateException if the {@code ip.file.path} property is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or filesystem access fails
 */
public int run(String[] args) throws Exception {
        // Build the job from the tool configuration plus the custom "mr.xml" resource.
        Configuration config = getConf();
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);

        // Basic job wiring.
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Read input through CombineTextInputFormat.
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Ship the file named by "ip.file.path" through the distributed cache.
        // Fail with a clear message instead of the NullPointerException that
        // new URI(null) would raise when the property is missing.
        String ipFilePath = config.get("ip.file.path");
        if (ipFilePath == null) {
            throw new IllegalStateException(
                    "Missing required configuration property: ip.file.path");
        }
        job.addCacheFile(new URI(ipFilePath));

        // Write results through the custom output format.
        job.setOutputFormatClass(LogOutputFormat.class);

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Output compression is currently disabled:
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        // Clear any previous output. The filesystem is resolved from the output
        // path itself so that a non-default scheme in args[1] is honoured.
        FileSystem outputFs = outputPath.getFileSystem(config);
        if (outputFs.exists(outputPath)) {
            outputFs.delete(outputPath, true);
        }

        // Run the job and wait for it to finish.
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
源代码7 项目: 163-bigdate-note   文件: ParseLogJob.java
/**
 * Configures and runs the "parselog" job, reading from {@code args[0]} and
 * writing to {@code args[1]} through {@code LogOutputFormat}.
 *
 * <p>An existing output path is deleted recursively before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 when the job completes successfully
 * @throws IllegalStateException if the {@code ip.file.path} property is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or filesystem access fails
 */
public int run(String[] args) throws Exception {
        // Build the job from the tool configuration plus the custom "mr.xml" resource.
        Configuration config = getConf();
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);

        // Basic job wiring.
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Read input through CombineTextInputFormat.
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Ship the file named by "ip.file.path" through the distributed cache.
        // Fail with a clear message instead of the NullPointerException that
        // new URI(null) would raise when the property is missing.
        String ipFilePath = config.get("ip.file.path");
        if (ipFilePath == null) {
            throw new IllegalStateException(
                    "Missing required configuration property: ip.file.path");
        }
        job.addCacheFile(new URI(ipFilePath));

        // Write results through the custom output format.
        job.setOutputFormatClass(LogOutputFormat.class);

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Output compression is currently disabled:
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        // Clear any previous output. The filesystem is resolved from the output
        // path itself so that a non-default scheme in args[1] is honoured.
        FileSystem outputFs = outputPath.getFileSystem(config);
        if (outputFs.exists(outputPath)) {
            outputFs.delete(outputPath, true);
        }

        // Run the job and wait for it to finish.
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
源代码8 项目: 163-bigdate-note   文件: ParseLogJob.java
/**
 * Configures and runs the "parselog" job, reading from {@code args[0]} and
 * writing to {@code args[1]} through {@code LogOutputFormat}.
 *
 * <p>An existing output path is deleted recursively before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 when the job completes successfully
 * @throws IllegalStateException if the {@code ip.file.path} property is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or filesystem access fails
 */
public int run(String[] args) throws Exception {
        // Build the job from the tool configuration plus the custom "mr.xml" resource.
        Configuration config = getConf();
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);

        // Basic job wiring.
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Read input through CombineTextInputFormat.
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Ship the file named by "ip.file.path" through the distributed cache.
        // Fail with a clear message instead of the NullPointerException that
        // new URI(null) would raise when the property is missing.
        String ipFilePath = config.get("ip.file.path");
        if (ipFilePath == null) {
            throw new IllegalStateException(
                    "Missing required configuration property: ip.file.path");
        }
        job.addCacheFile(new URI(ipFilePath));

        // Write results through the custom output format.
        job.setOutputFormatClass(LogOutputFormat.class);

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Output compression is currently disabled:
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        // Clear any previous output. The filesystem is resolved from the output
        // path itself so that a non-default scheme in args[1] is honoured.
        FileSystem outputFs = outputPath.getFileSystem(config);
        if (outputFs.exists(outputPath)) {
            outputFs.delete(outputPath, true);
        }

        // Run the job and wait for it to finish.
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
 类方法
 同包方法