org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#setOutputPath() Code Examples

Listed below are example usages of org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#setOutputPath() drawn from open-source projects; follow each project link to view the full source on GitHub.
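
Before the project examples, here is a minimal, hypothetical driver (the class name and job name are placeholders, not taken from any project below) sketching the pattern most of these snippets share: build a Job, point FileOutputFormat.setOutputPath() at the output directory, and, as several examples do, delete a stale output directory first so the job does not fail because the path already exists.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical map-only pass-through job illustrating FileOutputFormat.setOutputPath().
public class SetOutputPathSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path(args[0]);
    Path output = new Path(args[1]);

    // FileOutputFormat requires that the output directory not exist yet,
    // so remove a leftover one, as many of the examples below do.
    FileSystem fs = output.getFileSystem(conf);
    if (fs.exists(output)) {
      fs.delete(output, true);
    }

    Job job = Job.getInstance(conf, "setOutputPath-sketch");
    job.setJarByClass(SetOutputPathSketch.class);
    job.setMapperClass(Mapper.class);          // identity mapper: copies records through
    job.setNumReduceTasks(0);                  // map-only job
    job.setOutputKeyClass(LongWritable.class); // TextInputFormat keys: byte offsets
    job.setOutputValueClass(Text.class);       // TextInputFormat values: lines of text

    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output); // the call demonstrated on this page

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}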

Example 1  Project: big-c   File: MapReduceTestUtil.java
/**
 * Creates a simple fail job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Comma separated input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {
  FileSystem fs = outdir.getFileSystem(conf);
  if (fs.exists(outdir)) {
    fs.delete(outdir, true);
  }
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Fail-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(FailMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
 
Example 2  Project: hbase   File: IntegrationTestLoadAndVerify.java
protected void doVerify(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
  Path outputDir = getTestDir(TEST_NAME, "verify-output");
  LOG.info("Verify output dir: " + outputDir);

  Job job = Job.getInstance(conf);
  job.setJarByClass(this.getClass());
  job.setJobName(TEST_NAME + " Verification for " + tableDescriptor.getTableName());
  setJobScannerConf(job);

  Scan scan = new Scan();

  TableMapReduceUtil.initTableMapperJob(
      tableDescriptor.getTableName().getNameAsString(), scan, VerifyMapper.class,
      BytesWritable.class, BytesWritable.class, job);
  TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
  int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
  TableMapReduceUtil.setScannerCaching(job, scannerCaching);

  job.setReducerClass(VerifyReducer.class);
  job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
  FileOutputFormat.setOutputPath(job, outputDir);
  assertTrue(job.waitForCompletion(true));

  long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
  assertEquals(0, numOutputRecords);
}
 
Example 3  Project: RDFS   File: WordCount.java
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  long t1 = System.currentTimeMillis();
  boolean re = job.waitForCompletion(true); 
  long t2 = System.currentTimeMillis();
  System.out.println((float)(t2-t1)/1000);
  if (re)
    System.exit(0);
  else
    System.exit(1);
}
 
Example 4  Project: hadoop   File: TestMapReduceLazyOutput.java
private static void runTestLazyOutput(Configuration conf, Path output,
    int numReducers, boolean createLazily) 
throws Exception {
  Job job = Job.getInstance(conf, "Test-Lazy-Output");

  FileInputFormat.setInputPaths(job, INPUT);
  FileOutputFormat.setOutputPath(job, output);

  job.setJarByClass(TestMapReduceLazyOutput.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(numReducers);

  job.setMapperClass(TestMapper.class);
  job.setReducerClass(TestReducer.class);

  if (createLazily) {
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
  } else {
    job.setOutputFormatClass(TextOutputFormat.class);
  }
  assertTrue(job.waitForCompletion(true));
}
 
Example 5  Project: hiped2   File: SimpleMovingAverage.java
public static void runJob(String[] input, String output)
    throws Exception {
  Configuration conf = new Configuration();

  Job job = new Job(conf);
  job.setJarByClass(SimpleMovingAverage.class);
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  Path outputPath = new Path(output);

  FileInputFormat.setInputPaths(job, StringUtils.join(input, ","));
  FileOutputFormat.setOutputPath(job, outputPath);

  outputPath.getFileSystem(conf).delete(outputPath, true);

  job.waitForCompletion(true);
}
 
Example 6  File: ElementValueMatchTest.java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: ElementValueMatchTest configFile outputDir");
        System.exit(2);
    }

    Job job = Job.getInstance(conf);
    job.setJarByClass(ElementValueMatchTest.class);
    job.setInputFormatClass(ValueInputFormat.class);
    job.setMapperClass(ElementValueMatchMapper.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    conf = job.getConfiguration();
    conf.addResource(otherArgs[0]);
    conf.setClass(MarkLogicConstants.INPUT_VALUE_CLASS, Text.class, 
            Writable.class);
    conf.setClass(MarkLogicConstants.INPUT_LEXICON_FUNCTION_CLASS, 
        ElementValueMatchFunction.class, ElementValueMatch.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 7  Project: aerospike-hadoop   File: AggregateIntInput.java
public int run(final String[] args) throws Exception {
    final Configuration conf = getConf();

    @SuppressWarnings("deprecation")
        final Job job = new Job(conf, "AerospikeAggregateIntInput");

    log.info("run starting on bin " + binName);

    job.setJarByClass(AggregateIntInput.class);
    job.setInputFormatClass(AerospikeInputFormat.class);
    job.setMapperClass(Map.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(LongWritable.class);
    // job.setCombinerClass(Reduce.class); // no combiner
    job.setReducerClass(Reduce.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    FileOutputFormat.setOutputPath(job, new Path(args[0]));

    int status = job.waitForCompletion(true) ? 0 : 1;
    log.info("run finished, status=" + status);
    return status;
}
 
Example 8  Project: hiped2   File: Main.java
public static boolean runCalcJob(Configuration conf, Path input, Path outputPath)
    throws Exception {

  Job job = new Job(conf);
  job.setJarByClass(Main.class);
  job.setMapperClass(CalcMapReduce.Map.class);
  job.setReducerClass(CalcMapReduce.Reduce.class);

  job.setInputFormatClass(KeyValueTextInputFormat.class);

  job.setMapOutputKeyClass(CalcMapReduce.TextPair.class);
  job.setMapOutputValueClass(IntWritable.class);

  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, outputPath);

  return job.waitForCompletion(true);
}
 
Example 9  Project: hiped2   File: SequenceFileStockMapReduce.java
/**
 * Write the sequence file.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
  Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));

  Configuration conf = super.getConf();

  Job job = new Job(conf);
  job.setJarByClass(SequenceFileStockMapReduce.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(StockPriceWritable.class);
  job.setInputFormatClass(
      SequenceFileInputFormat.class); //<co id="ch03_comment_seqfile_mr1"/>
  job.setOutputFormatClass(SequenceFileOutputFormat.class);  //<co id="ch03_comment_seqfile_mr2"/>
  SequenceFileOutputFormat.setCompressOutput(job, true);  //<co id="ch03_comment_seqfile_mr3"/>
  SequenceFileOutputFormat.setOutputCompressionType(job,  //<co id="ch03_comment_seqfile_mr4"/>
      SequenceFile.CompressionType.BLOCK);
  SequenceFileOutputFormat.setOutputCompressorClass(job,  //<co id="ch03_comment_seqfile_mr5"/>
      DefaultCodec.class);

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  if (job.waitForCompletion(true)) {
    return 0;
  }
  return 1;
}
 
Example 10  Project: MapReduce-Demo   File: MergeJob.java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	// 1. HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	// 2. MapReduce job configuration
	String jobName = "MergeMultipleFiles"; // job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(MultiInOutput.class); // class to run the job
	job.setJar("export\\MergeMultipleFiles.jar"); // local jar to submit
	job.setMapOutputKeyClass(Text.class); // mapper output key type
	job.setMapOutputValueClass(BytesWritable.class); // mapper output value type
	job.setMapperClass(MergeMapper.class);
	// input data format
	job.setInputFormatClass(MyInputFormat.class);
	// write the output as a SequenceFile
	job.setOutputFormatClass(SequenceFileOutputFormat.class);

	// Set job input and output paths
	String inputDir = "/workspace/mergeFiles/data";
	String outputDir = "/workspace/mergeFiles/output"; // output directory
	Path outPath = new Path(hdfs + outputDir);
	Path inputPath = new Path(hdfs+inputDir);
	FileInputFormat.setInputPaths(job, inputPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if (fs.exists(outPath)) {
		fs.delete(outPath, true);
	}

	// Run the job
	System.out.println("Job: " + jobName + " is running...");
	if (job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 11  Project: MapReduce-Demo   File: MultInput.java
public static void main(String[] args) throws Exception {		
	//1. HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. MapReduce job configuration
	String jobName = "MultInput";					//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(MultInput.class);				//class to run the job
	job.setJar("export\\MultInput.jar");			//local jar to submit
	job.setMapperClass(MultInputMapper.class);		//mapper class
	job.setMapOutputKeyClass(Text.class);			//mapper output key type
	job.setMapOutputValueClass(IntWritable.class);	//mapper output value type
	job.setReducerClass(MultInputReducer.class);	//reducer class
	job.setOutputKeyClass(Text.class);				//reducer output key type
	job.setOutputValueClass(IntWritable.class); 	//reducer output value type
	
	//3. Job input and output paths
	//Option 1: FileInputFormat.addInputPath()
	FileInputFormat.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt1"));//input directory 1
	FileInputFormat.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt2"));//input directory 2
	
	//Option 2: FileInputFormat.addInputPaths()
	//FileInputFormat.addInputPaths(job, String.join(",", hdfs+"/expr/multinput/data/txt1", hdfs+"/expr/multinput/data/txt2"));
	
	//Option 3: FileInputFormat.setInputPaths()
	//FileInputFormat.setInputPaths(job, String.join(",", hdfs+"/expr/multinput/data/txt1", hdfs+"/expr/multinput/data/txt2") );
	
	Path outPath = new Path(hdfs + "/expr/multinput/output");		//output directory
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 12  Project: Halyard   File: HalyardBulkDelete.java
@Override
public int run(CommandLine cmd) throws Exception {
    String source = cmd.getOptionValue('t');
    TableMapReduceUtil.addDependencyJars(getConf(),
        HalyardExport.class,
        NTriplesUtil.class,
        Rio.class,
        AbstractRDFHandler.class,
        RDFFormat.class,
        RDFParser.class,
        HTable.class,
        HBaseConfiguration.class,
        AuthenticationProtos.class,
        Trace.class,
        Gauge.class);
    HBaseConfiguration.addHbaseResources(getConf());
    Job job = Job.getInstance(getConf(), "HalyardDelete " + source);
    if (cmd.hasOption('s')) {
        job.getConfiguration().set(SUBJECT, cmd.getOptionValue('s'));
    }
    if (cmd.hasOption('p')) {
        job.getConfiguration().set(PREDICATE, cmd.getOptionValue('p'));
    }
    if (cmd.hasOption('o')) {
        job.getConfiguration().set(OBJECT, cmd.getOptionValue('o'));
    }
    if (cmd.hasOption('g')) {
        job.getConfiguration().setStrings(CONTEXTS, cmd.getOptionValues('g'));
    }
    job.setJarByClass(HalyardBulkDelete.class);
    TableMapReduceUtil.initCredentials(job);

    Scan scan = HalyardTableUtils.scan(null, null);

    TableMapReduceUtil.initTableMapperJob(source,
        scan,
        DeleteMapper.class,
        ImmutableBytesWritable.class,
        LongWritable.class,
        job);

    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setSpeculativeExecution(false);
    job.setMapSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);
    try (HTable hTable = HalyardTableUtils.getTable(getConf(), source, false, 0)) {
        HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
        FileOutputFormat.setOutputPath(job, new Path(cmd.getOptionValue('f')));
        TableMapReduceUtil.addDependencyJars(job);
        if (job.waitForCompletion(true)) {
            new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(cmd.getOptionValue('f')), hTable);
            LOG.info("Bulk Delete Completed..");
            return 0;
        }
    }
    return -1;
}
 
Example 13  Project: 163-bigdate-note   File: ParseLogJob.java
public int run(String[] args) throws Exception {
        // Create the job
        Configuration config = getConf();
        // Add custom configuration
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);
        // Configure job parameters
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Use CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Add file to the distributed cache
        job.addCacheFile(new URI(config.get("ip.file.path")));

        // Set the OutputFormat
        job.setOutputFormatClass(LogOutputFormat.class);

        // Set input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Set the compression type
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileSystem fs = FileSystem.get(config);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }


        // Run the job
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
Example 14  Project: kylin-on-parquet-v2   File: ColumnToRowJob.java
@Override
public int run(String[] args) throws Exception {
    Options options = new Options();

    try {
        options.addOption(OPTION_JOB_NAME);
        options.addOption(OPTION_CUBE_NAME);
        options.addOption(OPTION_SEGMENT_NAME);
        options.addOption(OPTION_INPUT_PATH);
        options.addOption(OPTION_OUTPUT_PATH);

        parseOptions(options, args);
        Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
        Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
        String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(Locale.ROOT);
        String segmentName = getOptionValue(OPTION_SEGMENT_NAME);

        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
        CubeInstance cube = cubeMgr.getCube(cubeName);

        job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
        setJobClasspath(job, cube.getConfig());
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        job.setMapperClass(ColumnToRowMapper.class);
        job.setInputFormatClass(ColumnarSplitDataInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ColumnToRowReducer.class);
        job.setNumReduceTasks(calReducerNum(input));
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.getConfiguration().set("dfs.block.size", cube.getConfig().getStreamingBasicCuboidJobDFSBlockSize());
        job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
        job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);

        CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
        attachSegmentMetadataWithDict(segment, job.getConfiguration());
        this.deletePath(job.getConfiguration(), output);

        return waitForCompletion(job);
    } catch (Exception e) {
        logger.error("error in CuboidJob", e);
        printUsage(options);
        throw e;
    } finally {
        if (job != null)
            cleanupTempConfFile(job.getConfiguration());
    }
}
 
Example 15  Project: MapReduce-Demo   File: Missed.java
public static void main(String[] args) throws Exception {		
	//1. HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. MapReduce job configuration
	String jobName = "Missed";						//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(Missed.class);				//class to run the job
	job.setJar("export\\Missed.jar");				//local jar to submit
	job.setMapperClass(MissedMapper.class);			//mapper class
	job.setMapOutputKeyClass(Text.class);			//mapper output key type
	job.setMapOutputValueClass(NullWritable.class);	//mapper output value type
	job.setReducerClass(MissedReducer.class);		//reducer class
	//named output: name, output format, key type, value type
	MultipleOutputs.addNamedOutput(job, "missed", TextOutputFormat.class, Text.class, NullWritable.class);
	
	//3. Job input and output paths
	String dataDir = "/expr/weblog/data";			//input data directory
	String outputDir = "/expr/weblog/output2";		//output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 16  Project: MapReduce-Demo   File: MedianStdDevJob.java
public static void main(String[] args) throws Exception {
	//1. HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. MapReduce job configuration
	String jobName = "MedianStdDevJob";					//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(MedianStdDevJob.class);			//class to run the job
	job.setJar("export\\MedianStdDevJob.jar");			//local jar to submit
	job.setMapperClass(MedianStdDevMapper.class);		//mapper class
	job.setMapOutputKeyClass(IntWritable.class);		//mapper output key type
	job.setMapOutputValueClass(IntWritable.class);		//mapper output value type
	job.setReducerClass(MedianStdDevReducer.class);		//reducer class
	job.setOutputKeyClass(IntWritable.class);			//reducer output key type
	job.setOutputValueClass(MedianStdDevTuple.class); 	//reducer output value type
	
	//3. Job input and output paths
	String dataDir = "/expr/medianstddev/data";			//input data directory
	String outputDir = "/expr/medianstddev/output";		//output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 17  Project: kylin-on-parquet-v2   File: KafkaFlatTableJob.java
@Override
public int run(String[] args) throws Exception {
    Options options = new Options();

    try {
        options.addOption(OPTION_JOB_NAME);
        options.addOption(OPTION_CUBE_NAME);
        options.addOption(OPTION_OUTPUT_PATH);
        options.addOption(OPTION_SEGMENT_ID);
        parseOptions(options, args);

        job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
        String cubeName = getOptionValue(OPTION_CUBE_NAME);
        Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));

        String segmentId = getOptionValue(OPTION_SEGMENT_ID);

        // ----------------------------------------------------------------------------
        // add metadata to distributed cache
        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        CubeInstance cube = cubeMgr.getCube(cubeName);

        job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
        job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
        logger.info("Starting: " + job.getJobName());

        setJobClasspath(job, cube.getConfig());

        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
        KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getRootFactTable());
        String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
        String topic = kafkaConfig.getTopic();

        if (brokers == null || brokers.length() == 0 || topic == null) {
            throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
        }

        JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
        job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
        KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
        job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
        job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
        job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
        job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
        job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
        job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
        job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
        job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
        appendKafkaOverrideProperties(cube.getConfig(), job.getConfiguration());
        setupMapper(cube.getSegmentById(segmentId));
        job.setNumReduceTasks(0);
        FileOutputFormat.setOutputPath(job, output);
        FileOutputFormat.setCompressOutput(job, true);
        org.apache.log4j.Logger.getRootLogger().info("Output hdfs location: " + output);
        org.apache.log4j.Logger.getRootLogger().info("Output hdfs compression: " + true);
        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());

        attachCubeMetadata(cube, job.getConfiguration());
        deletePath(job.getConfiguration(), output);
        return waitForCompletion(job);

    } catch (Exception e) {
        logger.error("error in KafkaFlatTableJob", e);
        printUsage(options);
        throw e;
    } finally {
        if (job != null)
            cleanupTempConfFile(job.getConfiguration());
    }

}
 
Example 18  Project: hadoop   File: QuasiMonteCarlo.java
/**
 * Run a map/reduce job for estimating Pi.
 *
 * @return the estimated value of Pi
 */
public static BigDecimal estimatePi(int numMaps, long numPoints,
    Path tmpDir, Configuration conf
    ) throws IOException, ClassNotFoundException, InterruptedException {
  Job job = Job.getInstance(conf);
  //setup job conf
  job.setJobName(QuasiMonteCarlo.class.getSimpleName());
  job.setJarByClass(QuasiMonteCarlo.class);

  job.setInputFormatClass(SequenceFileInputFormat.class);

  job.setOutputKeyClass(BooleanWritable.class);
  job.setOutputValueClass(LongWritable.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  job.setMapperClass(QmcMapper.class);

  job.setReducerClass(QmcReducer.class);
  job.setNumReduceTasks(1);

  // turn off speculative execution, because DFS doesn't handle
  // multiple writers to the same file.
  job.setSpeculativeExecution(false);

  //setup input/output directories
  final Path inDir = new Path(tmpDir, "in");
  final Path outDir = new Path(tmpDir, "out");
  FileInputFormat.setInputPaths(job, inDir);
  FileOutputFormat.setOutputPath(job, outDir);

  final FileSystem fs = FileSystem.get(conf);
  if (fs.exists(tmpDir)) {
    throw new IOException("Tmp directory " + fs.makeQualified(tmpDir)
        + " already exists.  Please remove it first.");
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Cannot create input directory " + inDir);
  }

  try {
    //generate an input file for each map task
    for(int i=0; i < numMaps; ++i) {
      final Path file = new Path(inDir, "part"+i);
      final LongWritable offset = new LongWritable(i * numPoints);
      final LongWritable size = new LongWritable(numPoints);
      final SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, file,
          LongWritable.class, LongWritable.class, CompressionType.NONE);
      try {
        writer.append(offset, size);
      } finally {
        writer.close();
      }
      System.out.println("Wrote input for Map #"+i);
    }

    //start a map/reduce job
    System.out.println("Starting Job");
    final long startTime = System.currentTimeMillis();
    job.waitForCompletion(true);
    final double duration = (System.currentTimeMillis() - startTime)/1000.0;
    System.out.println("Job Finished in " + duration + " seconds");

    //read outputs
    Path inFile = new Path(outDir, "reduce-out");
    LongWritable numInside = new LongWritable();
    LongWritable numOutside = new LongWritable();
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, conf);
    try {
      reader.next(numInside, numOutside);
    } finally {
      reader.close();
    }

    //compute estimated value
    final BigDecimal numTotal
        = BigDecimal.valueOf(numMaps).multiply(BigDecimal.valueOf(numPoints));
    return BigDecimal.valueOf(4).setScale(20)
        .multiply(BigDecimal.valueOf(numInside.get()))
        .divide(numTotal, RoundingMode.HALF_UP);
  } finally {
    fs.delete(tmpDir, true);
  }
}
 
Example 19  Project: hiped2   File: CloneReduce.java
public static void runSortJob(String input, String output)
    throws Exception {
  Configuration conf = new Configuration();

  Job job = new Job(conf);
  job.setJarByClass(CloneReduce.class);

  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(KeyValueTextInputFormat.class);

  job.setMapOutputKeyClass(Person.class);
  job.setMapOutputValueClass(Text.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  job.setSortComparatorClass(PersonBinaryComparator.class);

  Path outputPath = new Path(output);

  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, outputPath);

  outputPath.getFileSystem(conf).delete(outputPath, true);

  job.waitForCompletion(true);
}
 
Example 20  Project: MLHadoop   File: total_records_driver.java
@SuppressWarnings("deprecation")
public static long run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = new Configuration();
  Job job = new Job(conf);

  job.setJarByClass(total_records_driver.class);

  job.setJobName("Just counting total rows of the HDFS input");

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.setMapperClass(total_records_mapper.class);

  job.setReducerClass(total_records_reducer.class);
  job.setCombinerClass(total_records_reducer.class);

  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(LongWritable.class);
  
  //job.setInputFormatClass(TextInputFormat.class);
     //job.setOutputFormatClass(TextOutputFormat.class);

  job.waitForCompletion(true);
  
  return readTotalRecords(args[1], conf);
 }