The following are example usages of org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#setOutputPath(), collected from open-source projects.
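Before the project examples, here is a minimal, self-contained sketch of the call pattern they all share (the class name and the use of command-line arguments are illustrative assumptions, not taken from any of the projects below). FileOutputFormat.setOutputPath() binds the job to the directory where its part files will be written; that directory must not already exist when the job is submitted, otherwise output-spec checking fails.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Illustrative sketch only; class and argument names are assumptions.
public class OutputPathExample {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "output-path-example");
job.setJarByClass(OutputPathExample.class);
// Input: one or more existing paths (files or directories).
FileInputFormat.setInputPaths(job, new Path(args[0]));
// Output: a single directory that must not yet exist; each task writes
// its own part-* file underneath it via the configured OutputCommitter.
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// With no mapper or reducer set, the identity classes are used, so this job
// passes the input records straight through to the output directory.
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}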
/**
* Creates a simple fail job.
*
* @param conf Configuration object
* @param outdir Output directory.
* @param indirs Comma separated input directories.
* @return Job initialized for a simple fail job.
* @throws Exception If an error occurs creating job configuration.
*/
public static Job createFailJob(Configuration conf, Path outdir,
Path... indirs) throws Exception {
FileSystem fs = outdir.getFileSystem(conf);
if (fs.exists(outdir)) {
fs.delete(outdir, true);
}
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
Job theJob = Job.getInstance(conf);
theJob.setJobName("Fail-Job");
FileInputFormat.setInputPaths(theJob, indirs);
theJob.setMapperClass(FailMapper.class);
theJob.setReducerClass(Reducer.class);
theJob.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(theJob, outdir);
theJob.setOutputKeyClass(Text.class);
theJob.setOutputValueClass(Text.class);
return theJob;
}
protected void doVerify(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
Path outputDir = getTestDir(TEST_NAME, "verify-output");
LOG.info("Verify output dir: " + outputDir);
Job job = Job.getInstance(conf);
job.setJarByClass(this.getClass());
job.setJobName(TEST_NAME + " Verification for " + tableDescriptor.getTableName());
setJobScannerConf(job);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(
tableDescriptor.getTableName().getNameAsString(), scan, VerifyMapper.class,
BytesWritable.class, BytesWritable.class, job);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
TableMapReduceUtil.setScannerCaching(job, scannerCaching);
job.setReducerClass(VerifyReducer.class);
job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
FileOutputFormat.setOutputPath(job, outputDir);
assertTrue(job.waitForCompletion(true));
long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
assertEquals(0, numOutputRecords);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
long t1 = System.currentTimeMillis();
boolean re = job.waitForCompletion(true);
long t2 = System.currentTimeMillis();
System.out.println((float)(t2-t1)/1000);
if (re)
System.exit(0);
else
System.exit(1);
}
private static void runTestLazyOutput(Configuration conf, Path output,
int numReducers, boolean createLazily)
throws Exception {
Job job = Job.getInstance(conf, "Test-Lazy-Output");
FileInputFormat.setInputPaths(job, INPUT);
FileOutputFormat.setOutputPath(job, output);
job.setJarByClass(TestMapReduceLazyOutput.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(numReducers);
job.setMapperClass(TestMapper.class);
job.setReducerClass(TestReducer.class);
if (createLazily) {
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
} else {
job.setOutputFormatClass(TextOutputFormat.class);
}
assertTrue(job.waitForCompletion(true));
}
public static void runJob(String[] input, String output)
throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(SimpleMovingAverage.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, StringUtils.join(input, ","));
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: ElementValueMatchTest configFile outputDir");
System.exit(2);
}
Job job = Job.getInstance(conf);
job.setJarByClass(ElementValueMatchTest.class);
job.setInputFormatClass(ValueInputFormat.class);
job.setMapperClass(ElementValueMatchMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
conf = job.getConfiguration();
conf.addResource(otherArgs[0]);
conf.setClass(MarkLogicConstants.INPUT_VALUE_CLASS, Text.class,
Writable.class);
conf.setClass(MarkLogicConstants.INPUT_LEXICON_FUNCTION_CLASS,
ElementValueMatchFunction.class, ElementValueMatch.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public int run(final String[] args) throws Exception {
final Configuration conf = getConf();
@SuppressWarnings("deprecation")
final Job job = new Job(conf, "AerospikeAggregateIntInput");
log.info("run starting on bin " + binName);
job.setJarByClass(AggregateIntInput.class);
job.setInputFormatClass(AerospikeInputFormat.class);
job.setMapperClass(Map.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(LongWritable.class);
// job.setCombinerClass(Reduce.class); // no combiner
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[0]));
int status = job.waitForCompletion(true) ? 0 : 1;
log.info("run finished, status=" + status);
return status;
}
public static boolean runCalcJob(Configuration conf, Path input, Path outputPath)
throws Exception {
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(CalcMapReduce.Map.class);
job.setReducerClass(CalcMapReduce.Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(CalcMapReduce.TextPair.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true);
}
/**
* Write the sequence file.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(SequenceFileStockMapReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(StockPriceWritable.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job,
SequenceFile.CompressionType.BLOCK);
SequenceFileOutputFormat.setOutputCompressorClass(job,
DefaultCodec.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Set up the HDFS connection configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Configure the MapReduce job
String jobName = "MergeMultipleFiles"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(MultiInOutput.class); // main job class used at runtime
job.setJar("export\\MergeMultipleFiles.jar"); // local jar to submit
job.setMapOutputKeyClass(Text.class); // mapper output key type
job.setMapOutputValueClass(BytesWritable.class); // mapper output value type
job.setMapperClass(MergeMapper.class);
// input data format
job.setInputFormatClass(MyInputFormat.class);
// write the output as sequence files
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// Set the job input and output paths
String inputDir = "/workspace/mergeFiles/data";
String outputDir = "/workspace/mergeFiles/output"; // output directory
Path outPath = new Path(hdfs + outputDir);
Path inputPath = new Path(hdfs+inputDir);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
// Run the job
System.out.println("Job: " + jobName + " is running...");
if (job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
//1. Set up the HDFS connection configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Configure the MapReduce job
String jobName = "MultInput"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(MultInput.class); // main job class used at runtime
job.setJar("export\\MultInput.jar"); // local jar to submit
job.setMapperClass(MultInputMapper.class); // mapper class
job.setMapOutputKeyClass(Text.class); // mapper output key type
job.setMapOutputValueClass(IntWritable.class); // mapper output value type
job.setReducerClass(MultInputReducer.class); // reducer class
job.setOutputKeyClass(Text.class); // reducer output key type
job.setOutputValueClass(IntWritable.class); // reducer output value type
//3. Set the job input and output paths
// Option 1: FileInputFormat.addInputPath()
FileInputFormat.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt1")); // input directory 1
FileInputFormat.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt2")); // input directory 2
// Option 2: FileInputFormat.addInputPaths()
//FileInputFormat.addInputPaths(job, String.join(",", hdfs+"/expr/multinput/data/txt1", hdfs+"/expr/multinput/data/txt2"));
// Option 3: FileInputFormat.setInputPaths()
//FileInputFormat.setInputPaths(job, String.join(",", hdfs+"/expr/multinput/data/txt1", hdfs+"/expr/multinput/data/txt2") );
Path outPath = new Path(hdfs + "/expr/multinput/output"); // output directory
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
@Override
public int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('t');
TableMapReduceUtil.addDependencyJars(getConf(),
HalyardExport.class,
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class,
HTable.class,
HBaseConfiguration.class,
AuthenticationProtos.class,
Trace.class,
Gauge.class);
HBaseConfiguration.addHbaseResources(getConf());
Job job = Job.getInstance(getConf(), "HalyardDelete " + source);
if (cmd.hasOption('s')) {
job.getConfiguration().set(SUBJECT, cmd.getOptionValue('s'));
}
if (cmd.hasOption('p')) {
job.getConfiguration().set(PREDICATE, cmd.getOptionValue('p'));
}
if (cmd.hasOption('o')) {
job.getConfiguration().set(OBJECT, cmd.getOptionValue('o'));
}
if (cmd.hasOption('g')) {
job.getConfiguration().setStrings(CONTEXTS, cmd.getOptionValues('g'));
}
job.setJarByClass(HalyardBulkDelete.class);
TableMapReduceUtil.initCredentials(job);
Scan scan = HalyardTableUtils.scan(null, null);
TableMapReduceUtil.initTableMapperJob(source,
scan,
DeleteMapper.class,
ImmutableBytesWritable.class,
LongWritable.class,
job);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setSpeculativeExecution(false);
job.setMapSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
try (HTable hTable = HalyardTableUtils.getTable(getConf(), source, false, 0)) {
HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
FileOutputFormat.setOutputPath(job, new Path(cmd.getOptionValue('f')));
TableMapReduceUtil.addDependencyJars(job);
if (job.waitForCompletion(true)) {
new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(cmd.getOptionValue('f')), hTable);
LOG.info("Bulk Delete Completed..");
return 0;
}
}
return -1;
}
public int run(String[] args) throws Exception {
// Create the job
Configuration config = getConf();
// Add custom configuration
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// Set job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(TextLongWritable.class);
job.setGroupingComparatorClass(TextLongGroupComparator.class);
job.setPartitionerClass(TextLongPartition.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// Use CombineTextInputFormat as the input format
job.setInputFormatClass(CombineTextInputFormat.class);
// Add a file to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// Set the OutputFormat
job.setOutputFormatClass(LogOutputFormat.class);
// Set the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// Set the compression type (optional)
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// Run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_SEGMENT_NAME);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
parseOptions(options, args);
Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(Locale.ROOT);
String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
CubeInstance cube = cubeMgr.getCube(cubeName);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
setJobClasspath(job, cube.getConfig());
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
job.setMapperClass(ColumnToRowMapper.class);
job.setInputFormatClass(ColumnarSplitDataInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(ColumnToRowReducer.class);
job.setNumReduceTasks(calReducerNum(input));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.getConfiguration().set("dfs.block.size", cube.getConfig().getStreamingBasicCuboidJobDFSBlockSize());
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
attachSegmentMetadataWithDict(segment, job.getConfiguration());
this.deletePath(job.getConfiguration(), output);
return waitForCompletion(job);
} catch (Exception e) {
logger.error("error in CuboidJob", e);
printUsage(options);
throw e;
} finally {
if (job != null)
cleanupTempConfFile(job.getConfiguration());
}
}
public static void main(String[] args) throws Exception {
//1. Set up the HDFS connection configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Configure the MapReduce job
String jobName = "Missed"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(Missed.class); // main job class used at runtime
job.setJar("export\\Missed.jar"); // local jar to submit
job.setMapperClass(MissedMapper.class); // mapper class
job.setMapOutputKeyClass(Text.class); // mapper output key type
job.setMapOutputValueClass(NullWritable.class); // mapper output value type
job.setReducerClass(MissedReducer.class); // reducer class
// Define the named output: name, output format, key type and value type
MultipleOutputs.addNamedOutput(job, "missed", TextOutputFormat.class, Text.class, NullWritable.class);
//3. Set the job input and output paths
String dataDir = "/expr/weblog/data"; // input data directory
String outputDir = "/expr/weblog/output2"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
//1. Set up the HDFS connection configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Configure the MapReduce job
String jobName = "MedianStdDevJob"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(MedianStdDevJob.class); // main job class used at runtime
job.setJar("export\\MedianStdDevJob.jar"); // local jar to submit
job.setMapperClass(MedianStdDevMapper.class); // mapper class
job.setMapOutputKeyClass(IntWritable.class); // mapper output key type
job.setMapOutputValueClass(IntWritable.class); // mapper output value type
job.setReducerClass(MedianStdDevReducer.class); // reducer class
job.setOutputKeyClass(IntWritable.class); // reducer output key type
job.setOutputValueClass(MedianStdDevTuple.class); // reducer output value type
//3. Set the job input and output paths
String dataDir = "/expr/medianstddev/data"; // input data directory
String outputDir = "/expr/medianstddev/output"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_SEGMENT_ID);
parseOptions(options, args);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String segmentId = getOptionValue(OPTION_SEGMENT_ID);
// ----------------------------------------------------------------------------
// add metadata to distributed cache
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getRootFactTable());
String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
String topic = kafkaConfig.getTopic();
if (brokers == null || brokers.length() == 0 || topic == null) {
throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
}
JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
job.getConfiguration().addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
KafkaConsumerProperties kafkaConsumerProperties = KafkaConsumerProperties.getInstanceFromEnv();
job.getConfiguration().addResource(new Path(kafkaConsumerProperties.getKafkaConsumerHadoopJobConf()));
job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
appendKafkaOverrideProperties(cube.getConfig(), job.getConfiguration());
setupMapper(cube.getSegmentById(segmentId));
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, output);
FileOutputFormat.setCompressOutput(job, true);
org.apache.log4j.Logger.getRootLogger().info("Output hdfs location: " + output);
org.apache.log4j.Logger.getRootLogger().info("Output hdfs compression: " + true);
job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
attachCubeMetadata(cube, job.getConfiguration());
deletePath(job.getConfiguration(), output);
return waitForCompletion(job);
} catch (Exception e) {
logger.error("error in KafkaFlatTableJob", e);
printUsage(options);
throw e;
} finally {
if (job != null)
cleanupTempConfFile(job.getConfiguration());
}
}
/**
* Run a map/reduce job for estimating Pi.
*
* @return the estimated value of Pi
*/
public static BigDecimal estimatePi(int numMaps, long numPoints,
Path tmpDir, Configuration conf
) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(conf);
//setup job conf
job.setJobName(QuasiMonteCarlo.class.getSimpleName());
job.setJarByClass(QuasiMonteCarlo.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(BooleanWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(QmcMapper.class);
job.setReducerClass(QmcReducer.class);
job.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
job.setSpeculativeExecution(false);
//setup input/output directories
final Path inDir = new Path(tmpDir, "in");
final Path outDir = new Path(tmpDir, "out");
FileInputFormat.setInputPaths(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
final FileSystem fs = FileSystem.get(conf);
if (fs.exists(tmpDir)) {
throw new IOException("Tmp directory " + fs.makeQualified(tmpDir)
+ " already exists. Please remove it first.");
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Cannot create input directory " + inDir);
}
try {
//generate an input file for each map task
for(int i=0; i < numMaps; ++i) {
final Path file = new Path(inDir, "part"+i);
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
fs, conf, file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+i);
}
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
job.waitForCompletion(true);
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
//read outputs
Path inFile = new Path(outDir, "reduce-out");
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, conf);
try {
reader.next(numInside, numOutside);
} finally {
reader.close();
}
//compute estimated value
final BigDecimal numTotal
= BigDecimal.valueOf(numMaps).multiply(BigDecimal.valueOf(numPoints));
return BigDecimal.valueOf(4).setScale(20)
.multiply(BigDecimal.valueOf(numInside.get()))
.divide(numTotal, RoundingMode.HALF_UP);
} finally {
fs.delete(tmpDir, true);
}
}
public static void runSortJob(String input, String output)
throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(CloneReduce.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(PersonBinaryComparator.class);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
}
@SuppressWarnings("deprecation")
public static long run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(total_records_driver.class);
job.setJobName("Just counting total rows of the HDFS input");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(total_records_mapper.class);
job.setReducerClass(total_records_reducer.class);
job.setCombinerClass(total_records_reducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
//job.setInputFormatClass(TextInputFormat.class);
//job.setOutputFormatClass(TextOutputFormat.class);
job.waitForCompletion(true);
return readTotalRecords(args[1], conf);
}