Listed below are example usages of org.apache.hadoop.mapreduce.Job#getJobName(), taken from open-source projects on GitHub.
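For orientation before the longer examples, here is a minimal, self-contained driver sketch showing where getJobName() typically appears: the name set with setJobName() is read back for logging once the job has run. This sketch is not taken from any of the projects below; the class name GetJobNameExample, the job name, and the identity Mapper/Reducer wiring are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative sketch only; class and job names are hypothetical.
public class GetJobNameExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(GetJobNameExample.class);
        // The name set here is what getJobName() returns later.
        job.setJobName("example-job");
        // Identity mapper/reducer just to keep the sketch runnable.
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean ok = job.waitForCompletion(true);
        // getJobName() is convenient for log and error messages after submission.
        System.out.println("Job " + job.getJobName() + (ok ? " succeeded" : " failed"));
        System.exit(ok ? 0 : 1);
    }
}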
public int run(String[] args) throws Exception {
    // Create the job
    Configuration config = getConf();
    Job job = Job.getInstance(config);
    // Configure the job
    job.setJarByClass(GetLinesUsersJob.class);
    job.setJobName("getLinesAndUser");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // Optionally use CombineTextInputFormat
    // job.setInputFormatClass(CombineTextInputFormat.class);
    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    // Run the job
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
public int run(String[] args) throws Exception {
    // Create the job
    Configuration config = getConf();
    Job job = Job.getInstance(config);
    // Configure the job
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);
    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    // Compress the output with LZO
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    // Run the job
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
private void waitForJob(Job job) throws InterruptedException, IOException {
    while (!job.isComplete()) {
        System.out.println("waiting for job " + job.getJobName());
        sleep(100);
    }
    System.out.println("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
    if (!job.isSuccessful()) {
        throw new RuntimeException("job failed " + job.getJobName());
    }
}
private void waitForJob(Job job) throws Exception {
    job.submit();
    while (!job.isComplete()) {
        LOG.debug("waiting for job {}", job.getJobName());
        sleep(100);
    }
    LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
    if (!job.isSuccessful()) {
        throw new RuntimeException("job failed " + job.getJobName());
    }
}
static void waitForJob(Job job) throws Exception {
    job.submit();
    while (!job.isComplete()) {
        LOG.debug("waiting for job {}", job.getJobName());
        sleep(50);
    }
    LOG.debug("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
    if (!job.isSuccessful()) {
        throw new RuntimeException("job failed " + job.getJobName());
    }
}
public int run(String[] args) throws Exception {
    // Create the job
    Configuration config = getConf();
    Job job = Job.getInstance(config);
    // Configure the job
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    // Optionally make this a map-only job by setting the number of reducers to 0
    // job.setNumReduceTasks(0);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(LogBeanWritable.class);
    job.setOutputValueClass(Text.class);
    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    // Run the job
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
public int run(String[] args) throws Exception {
    // Create the job
    Configuration config = getConf();
    Job job = Job.getInstance(config);
    // Configure the job
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);
    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    // Optionally compress the output with LZO
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    // Run the job
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
private void waitForJob(Job job) throws InterruptedException, IOException {
    while (!job.isComplete()) {
        LOG.debug("waiting for job {}", job.getJobName());
        sleep(100);
    }
    LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
    if (!job.isSuccessful()) {
        throw new RuntimeException("job failed " + job.getJobName());
    }
}
private void waitForJob(Job job) throws Exception {
    job.submit();
    while (!job.isComplete()) {
        sleep(100);
    }
    if (!job.isSuccessful()) {
        throw new RuntimeException("job failed " + job.getJobName());
    }
}
public static void waitForJob(Job job) throws Exception {
    job.submit();
    while (!job.isComplete()) {
        LOG.debug("waiting for job {}", job.getJobName());
        sleep(100);
    }
    LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
    if (!job.isSuccessful()) {
        throw new RuntimeException("job failed " + job.getJobName());
    }
}
protected boolean writeStats(Logger log, Job job, JobID jobId, Counters counters, long start, long stop, boolean outputMutations, FileSystem fs,
        Path statsDir, String metricsLabelOverride) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    // We are going to serialize the counters into a file in HDFS.
    // The context was set in the processKeyValues method below, and should not be null. We'll guard against NPE anyway.
    try (RawLocalFileSystem rawFS = new RawLocalFileSystem()) {
        rawFS.setConf(conf);
        CompressionCodec cc = new GzipCodec();
        CompressionType ct = CompressionType.BLOCK;
        // Add additional counters
        if (!outputMutations) {
            counters.findCounter(IngestProcess.OUTPUT_DIRECTORY.name(), job.getWorkingDirectory().getName()).increment(1);
        } else {
            counters.findCounter(IngestProcess.LIVE_INGEST).increment(1);
        }
        counters.findCounter(IngestProcess.START_TIME).increment(start);
        counters.findCounter(IngestProcess.END_TIME).increment(stop);
        if (metricsLabelOverride != null) {
            counters.getGroup(IngestProcess.METRICS_LABEL_OVERRIDE.name()).findCounter(metricsLabelOverride).increment(1);
        }
        // Serialize the counters to a file in HDFS.
        Path src = new Path("/tmp" + File.separator + job.getJobName() + ".metrics");
        src = rawFS.makeQualified(src);
        createFileWithRetries(rawFS, src);
        try (Writer writer = SequenceFile.createWriter(conf, Writer.file(src), Writer.keyClass(Text.class), Writer.valueClass(Counters.class),
                Writer.compression(ct, cc))) {
            writer.append(new Text(jobId.toString()), counters);
        }
        // Now we will try to move the file to HDFS.
        // Copy the file to the temp dir
        try {
            if (!fs.exists(statsDir))
                fs.mkdirs(statsDir);
            Path dst = new Path(statsDir, src.getName());
            log.info("Copying file " + src + " to " + dst);
            fs.copyFromLocalFile(false, true, src, dst);
            // If this worked, then remove the local file
            rawFS.delete(src, false);
            // also remove the residual crc file
            rawFS.delete(getCrcFile(src), false);
        } catch (IOException e) {
            // If an error occurs in the copy, the file will remain in the local metrics directory.
            log.error("Error copying metrics file into HDFS, will remain in metrics directory.", e);
            return false;
        }
    }
    // Now, if configured, write the stats out to statsd
    CounterToStatsDConfiguration statsDConfig = new CounterToStatsDConfiguration(conf);
    if (statsDConfig.isConfigured()) {
        log.info("Sending final counters via statsd: " + statsDConfig);
        CounterStatsDClient statsd = statsDConfig.getClient();
        try {
            statsd.sendFinalStats(counters);
        } finally {
            statsd.close();
        }
    }
    return true;
}
public int run(String[] args) throws Exception {
    // Create the job
    Configuration config = getConf();
    // Load custom configuration
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Configure the job
    job.setJarByClass(ParseLogJob_End.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);
    // Use CombineTextInputFormat
    job.setInputFormatClass(CombineTextInputFormat.class);
    // Add file to the distributed cache
    job.addCacheFile(new URI(config.get("ip.file.path")));
    // Optionally set a custom OutputFormat
    // job.setOutputFormatClass(LogOutputFormat.class);
    // Optionally compress the output with LZO
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    FileSystem fs = FileSystem.get(config);
    // Set the input and output paths
    FileStatus[] fileStatuses = fs.listStatus(new Path(args[0]));
    for (int i = 0; i < fileStatuses.length; i++) {
        MultipleInputs.addInputPath(job, fileStatuses[i].getPath(), TextInputFormat.class, LogMapper.class);
        String inputPath = fileStatuses[i].getPath().toString();
        String dir_name = inputPath.substring(inputPath.lastIndexOf('/') + 1);
        System.out.println(dir_name);
    }
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    // Run the job
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
public int run(String[] args) throws Exception {
    // Create the job
    Configuration config = getConf();
    // Load custom configuration
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Configure the job
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);
    // Use CombineTextInputFormat
    job.setInputFormatClass(CombineTextInputFormat.class);
    // Add file to the distributed cache
    job.addCacheFile(new URI(config.get("ip.file.path")));
    // Set a custom OutputFormat
    job.setOutputFormatClass(LogOutputFormat.class);
    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    // Optionally compress the output with LZO
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    // Run the job
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
public int run(String[] args) throws Exception {
    // Create the job
    Configuration config = getConf();
    // Load custom configuration
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Configure the job
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);
    // Use CombineTextInputFormat
    job.setInputFormatClass(CombineTextInputFormat.class);
    // Add file to the distributed cache
    job.addCacheFile(new URI(config.get("ip.file.path")));
    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    // Optionally compress the output with LZO
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    // Run the job
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
private static String getJobInfo(Job job) {
    return "jobName: " + job.getJobName() + ", jobId: " + job.getJobID();
}
public int run(String[] args) throws Exception {
    // Create the job
    Configuration config = getConf();
    // Load custom configuration
    // config.set("ip.file.path", args[2]);
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Configure the job
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);
    // Add file to the distributed cache
    job.addCacheFile(new URI(config.get("ip.file.path")));
    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    // Optionally compress the output with LZO
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    // Run the job
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
public int run(String[] args) throws Exception {
    // Create the job
    Configuration config = getConf();
    // Load custom configuration
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Configure the job
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);
    // Use CombineTextInputFormat
    job.setInputFormatClass(CombineTextInputFormat.class);
    // Add file to the distributed cache
    job.addCacheFile(new URI(config.get("ip.file.path")));
    // Set a custom OutputFormat
    job.setOutputFormatClass(LogOutputFormat.class);
    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    // Optionally compress the output with LZO
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    // Run the job
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
private String getJobInfo(Job job) {
    return "jobName: " + job.getJobName() + ", jobId: " + job.getJobID();
}