The following are example usages of org.apache.hadoop.mapreduce.Job#setJar().
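For orientation, here is a minimal sketch (class and jar names are placeholders, not taken from the examples below) of the two ways these examples specify the job jar: setJarByClass() lets Hadoop locate the jar that contains the given class, while setJar() names a local jar file explicitly. Both set the same underlying job-jar property, so when both are called the later call wins.

// Minimal sketch with placeholder names: two ways to tell Hadoop which jar to ship.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "example");
job.setJarByClass(MyDriver.class); // locate the jar containing MyDriver
job.setJar("export\\MyJob.jar"); // or point at a local jar file explicitly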
public static boolean run(Configuration config, Map<String, String> paths)
throws IOException, ClassNotFoundException, InterruptedException {
String jobName = "step2";
Job job = Job.getInstance(config, jobName);
job.setJarByClass(Step2.class);
job.setJar("export\\ItemCF.jar");
job.setMapperClass(Step2_Mapper.class);
job.setReducerClass(Step2_Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
Path inPath = new Path(paths.get("Step2Input"));
Path outpath = new Path(paths.get("Step2Output"));
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outpath);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
return job.waitForCompletion(true);
}
@Override
public void configure(Job job) throws IOException {
Configuration conf = job.getConfiguration();
FileSystem localFS = FileSystem.getLocal(conf);
FileSystem jobFS = FileSystem.get(conf);
for (Path p : getLocalPaths()) {
Path stagedPath = uploadFileIfNecessary(localFS, p, jobFS);
// Calling this method decompresses the archive and makes Hadoop
// handle its classfiles individually. This leads to crippling
// overhead times (10+ seconds) even with the LocalJobRunner
// courtesy of o.a.h.yarn.util.FSDownload.changePermissions
// copying and chmodding each copied classfile individually.
//job.addArchiveToClassPath(p);
// Just add the compressed archive instead:
job.addFileToClassPath(stagedPath);
}
// We don't really need to set a mapred job jar here,
// but doing so suppresses a warning
String mj = getMapredJar();
if (null != mj)
job.setJar(mj);
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Set MapReduce job configuration
String jobName = "MergeMultipleFiles"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(MultiInOutput.class); // class used to locate the job at runtime
job.setJar("export\\MergeMultipleFiles.jar"); // local jar file
job.setMapOutputKeyClass(Text.class); // Mapper output key type
job.setMapOutputValueClass(BytesWritable.class); // Mapper output value type
job.setMapperClass(MergeMapper.class);
// input data format
job.setInputFormatClass(MyInputFormat.class);
// write output as sequence files
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// Set job input and output paths
String inputDir = "/workspace/mergeFiles/data";
String outputDir = "/workspace/mergeFiles/output"; // output directory
Path outPath = new Path(hdfs + outputDir);
Path inputPath = new Path(hdfs+inputDir);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
// Run the job
System.out.println("Job: " + jobName + " is running...");
if (job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 13);
//2. Set MapReduce job configuration
String jobName = "FixedLengthInput2"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(FixedLengthInput2.class); //class used to locate the job at runtime
job.setJar("export\\FixedLengthInput2.jar"); //local jar file
job.setMapperClass(FixedLengthInput2Mapper.class); //Mapper class (an illustrative sketch follows this example)
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(FixedLengthInput2Reducer.class); //Reducer class
job.setOutputKeyClass(Text.class); //Reducer output key type
job.setOutputValueClass(IntWritable.class); //Reducer output value type
job.setInputFormatClass(FixedLengthInputFormat.class); //input format class
//3. Set job input and output paths
String dataDir = "/expr/fixedinput/data"; //input data directory
String outputDir = "/expr/fixedinput/output"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
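FixedLengthInput2Mapper is referenced above but not included in this listing. Purely as an illustration of how a mapper consumes FixedLengthInputFormat records, the sketch below assumes 13-byte records (matching FIXED_RECORD_LENGTH above) and an arbitrary 9-byte key / 4-byte value split; the real mapper may differ.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: FixedLengthInputFormat delivers each fixed-width record as a
// BytesWritable keyed by its byte offset in the file.
public class FixedLengthInput2Mapper extends Mapper<LongWritable, BytesWritable, Text, IntWritable> {
    @Override
    protected void map(LongWritable offset, BytesWritable record, Context context)
            throws IOException, InterruptedException {
        String line = new String(record.copyBytes(), StandardCharsets.UTF_8);
        // Illustrative field split: first 9 bytes as the key, remaining 4 bytes as an integer value.
        context.write(new Text(line.substring(0, 9)),
                new IntWritable(Integer.parseInt(line.substring(9).trim())));
    }
}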
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// Set job configuration
String jobName = "FlowPartition";
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(FlowPartition.class);
job.setJar("export\\FlowPartition.jar");
// Map
job.setMapperClass(FlowPartitionMapper.class);
// Reduce
job.setReducerClass(FlowPartitionReducer.class);
// output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowWritable.class);
// set the partitioner class and the number of reducers
job.setPartitionerClass(PhoneNumberPartitioner.class);
job.setNumReduceTasks(4);
// set the job input and output paths
String dataDir = "/workspace/flowStatistics/data";
String outputDir = "/workspace/flowStatistics/output_partitions";
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
// Run the job
System.out.println("Job: " + jobName + " is running...");
if (job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "DatePartition"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(DatePartition.class); //class used to locate the job at runtime
job.setJar("export\\DatePartition.jar"); //local jar file
// Map
job.setMapperClass(DatePartitionMapper.class); //Mapper class
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
// Reduce
job.setReducerClass(DatePartitionReducer.class); //Reducer class
// Job-level output types
job.setOutputKeyClass(Text.class); //Reducer output key type
job.setOutputValueClass(IntWritable.class); //Reducer output value type
// Partition
job.setPartitionerClass(YearPartitioner.class); //custom partitioner (a sketch follows this example)
job.setNumReduceTasks(10); //number of reduce tasks; this value is passed to Partitioner.getPartition() as the numPartitions argument
//3. Set job input and output paths
String dataDir = "/expr/datecount/data"; //input data directory
String outputDir = "/expr/datecount/output_partition"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
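The YearPartitioner used above (and again in the TempSort example further down) is not part of this listing. As a purely illustrative sketch, assuming the map output key is a Text value beginning with a four-digit year such as "2015-01-05", it could look like this; numPartitions receives the value passed to job.setNumReduceTasks().

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: route records to reducers by the year prefix of the key.
public class YearPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        int year = Integer.parseInt(key.toString().substring(0, 4));
        return year % numPartitions; // map each year onto one of the configured reduce tasks
    }
}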
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://"+namenode_ip+":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// Set job configuration
String jobName = "FlowStatistics";
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(FlowStatistics.class);
job.setJar("export\\FlowStatistics.jar");
// Map
job.setMapperClass(FlowMapper.class);// variant 1
// job.setMapperClass(FlowWritableMapper.class);
// These can be omitted because the map output types match the reducer output types
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(FlowWritable.class);
// Reduce
job.setReducerClass(FlowReducer.class);// variant 1
// job.setReducerClass(FlowWritableReducer.class);
// output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);// variant 1
// job.setOutputValueClass(FlowWritable.class);
// set the job input and output paths
String dataDir = "/workspace/flowStatistics/data";
String outputDir = "/workspace/flowStatistics/output";
Path inPath = new Path(hdfs+dataDir);
Path outPath = new Path(hdfs+outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
// Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "DateGroup2"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(DateGroup2.class); //class used to locate the job at runtime
job.setJar("export\\DateGroup2.jar"); //local jar file
job.setMapperClass(DateGroup2Mapper.class); //Mapper class
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(DateGroup2Reducer.class); //Reducer class
job.setOutputKeyClass(Text.class); //Reducer output key type
job.setOutputValueClass(Text.class); //Reducer output value type
job.setGroupingComparatorClass(MyGroup.class); //custom grouping comparator (a sketch follows this example)
//3. Set job input and output paths
String dataDir = "/expr/datecount/data"; //input data directory
String outputDir = "/expr/datecount/output_group2"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
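The MyGroup grouping comparator set above is not shown in the listing either. A hypothetical sketch, assuming Text keys whose first four characters are a year and that all records from the same year should arrive in a single reduce() call:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical grouping comparator: keys with the same year prefix fall into one reduce group.
public class MyGroup extends WritableComparator {
    protected MyGroup() {
        super(Text.class, true); // true: create Text instances for comparison
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String yearA = a.toString().substring(0, 4); // year portion of the first key
        String yearB = b.toString().substring(0, 4); // year portion of the second key
        return yearA.compareTo(yearB); // equal years compare as equal, so they are grouped together
    }
}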
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Set MapReduce job configuration
String jobName = "DateSortAsc"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(DateSortAsc.class); // job class
job.setJar("export\\DateSortAsc.jar"); // local jar file
job.setMapperClass(SortMapper.class); // Mapper class
job.setMapOutputKeyClass(IntWritable.class); // Mapper output key type
job.setMapOutputValueClass(Text.class); // Mapper output value type
job.setReducerClass(SortReducer.class); // Reducer class
job.setOutputKeyClass(Text.class); // Reducer output key type
job.setOutputValueClass(IntWritable.class); // Reducer output value type
// 3. Set job input and output paths
String dataDir = "/workspace/dateSort/data"; // input data directory
String outputDir = "/workspace/dateSort/output"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if (job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":"); //设置输入文件kv分隔符
//2.设置MapReduce作业配置信息
String jobName = "KeyValueInput"; //作业名称
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(KeyValueInput.class); //指定运行时作业类
job.setJar("export\\KeyValueInput.jar"); //指定本地jar包
job.setMapperClass(KeyValueInputMapper.class); //指定Mapper类
job.setMapOutputKeyClass(Text.class); //设置Mapper输出Key类型
job.setMapOutputValueClass(IntWritable.class); //设置Mapper输出Value类型
job.setReducerClass(KeyValueInputReducer.class); //指定Reducer类
job.setOutputKeyClass(Text.class); //设置Reduce输出Key类型
job.setOutputValueClass(IntWritable.class); //设置Reduce输出Value类型
job.setInputFormatClass(KeyValueTextInputFormat.class); //设置输入格式化类
//3.设置作业输入和输出路径
String dataDir = "/expr/kvinput/data"; //实验数据目录
String outputDir = "/expr/kvinput/output"; //实验输出目录
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
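With KeyValueTextInputFormat and the ":" separator configured above, each input line is split into key and value before it reaches the mapper. The KeyValueInputMapper itself is not part of this listing; a hypothetical minimal version, assuming lines such as "apple:3", could be:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: the framework delivers key = text before ":" and value = text after it.
public class KeyValueInputMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, new IntWritable(Integer.parseInt(value.toString().trim())));
    }
}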
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String hdfs = "hdfs://192.168.17.10:9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// Set job configuration
String jobName = "TempSort";
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(TempSort.class);
job.setJar("export\\TempSort.jar");
// Map
job.setMapperClass(TempSortMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Reduce
job.setReducerClass(TempSortReducer.class);
// Job-level output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Sort
// job.setSortComparatorClass(MySort.class);
// Partition
job.setPartitionerClass(YearPartitioner.class);
job.setNumReduceTasks(3);
//3. Set job input and output paths
String dataDir = "/expr/test/data"; //input data directory
String outputDir = "/expr/test/output"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws IOException {
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
Job job1 = Job.getInstance(conf,"Decompose");
job1.setJarByClass(JobControlRun.class);
job1.setJar("export\\mutualFriend.jar");
job1.setMapperClass(DecomposeFriendsMapper.class);
job1.setReducerClass(DecomposeFriendsReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
Path input = new Path(hdfs+"/workspace/mutualFriends/data");
Path output1 = new Path(hdfs+"/workspace/mutualFriends/output_Dec");
FileInputFormat.addInputPath(job1, input);
FileOutputFormat.setOutputPath(job1, output1);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output1)) {
fs.delete(output1, true);
System.out.println("我被删了");// 打印可见只被删了一次,有点怪
}
// ControlledJob wraps a Job so it can participate in dependency control
ControlledJob ctrJob1=new ControlledJob(conf);
ctrJob1.setJob(job1);// add job1 to its control wrapper
Job job2 = Job.getInstance(conf, "Merge");
job2.setJarByClass(JobControlRun.class);
job2.setJar("export\\mutualFriend.jar");
job2.setMapperClass(MergeFriendsMapper.class);
job2.setReducerClass(MergeFriendsReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
Path input2 = new Path(hdfs+"/workspace/mutualFriends/output_Dec");
Path output2 = new Path(hdfs+"/workspace/mutualFriends/output_Meg");
FileInputFormat.addInputPath(job2, input2);
FileOutputFormat.setOutputPath(job2, output2);
if (fs.exists(output2)) {
fs.delete(output2, true);
}
ControlledJob ctrJob2 = new ControlledJob(conf);
ctrJob2.setJob(job2);// add job2 to its control wrapper
// declare the dependency: job2 runs only after job1 has completed
ctrJob2.addDependingJob(ctrJob1);
// master controller that monitors and schedules job1 and job2
JobControl jobControl=new JobControl("JobControl");
jobControl.addJob(ctrJob1);
jobControl.addJob(ctrJob2);
// start the controller on its own thread
Thread T=new Thread(jobControl);
T.start();
while(true){
if(jobControl.allFinished()){// wait until all jobs have finished (a polling variant that sleeps between checks follows this example)
System.out.println(jobControl.getSuccessfulJobList());// print the jobs that completed successfully
jobControl.stop();
break;
}
}
/**
* The printed control information looks like this:
* [job name: Decompose
job id: JobControl0
job state: SUCCESS
job mapred id: job_local445604445_0001
job message: just initialized
job has no depending job:
, job name: Merge
job id: JobControl1
job state: SUCCESS
job mapred id: job_local1897659504_0002
job message: just initialized
job has 1 dependeng jobs:
depending job 0: Decompose
]
*/
}
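The while(true) loop above re-checks allFinished() without pausing, so it busy-waits on a CPU core until both jobs finish. A common variant (a sketch, not taken from this project) sleeps briefly between polls:

// Sketch: poll JobControl at a fixed interval instead of spinning.
Thread controller = new Thread(jobControl);
controller.start();
while (!jobControl.allFinished()) {
    try {
        Thread.sleep(500); // poll twice a second; the interval is arbitrary
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
    }
}
System.out.println(jobControl.getSuccessfulJobList());
jobControl.stop();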
private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar);
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(sleepConf);
// job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
Job job = sleepJob.createJob(3, numSleepReducers, 10000, 1, 5000, 1);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
if (useRemoteJar) {
final Path localJar = new Path(
ClassUtil.findContainingJar(SleepJob.class));
ConfigUtil.addLink(job.getConfiguration(), "/jobjars",
localFs.makeQualified(localJar.getParent()).toUri());
job.setJar("viewfs:///jobjars/" + localJar.getName());
} else {
job.setJarByClass(SleepJob.class);
}
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
verifySleepJobCounters(job);
verifyTaskProgress(job);
// TODO later: add explicit "isUber()" checks of some sort (extend
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "FlowCount"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(FlowCount.class); //class used to locate the job at runtime
job.setJar("export\\FlowCount.jar"); //local jar file
job.setMapperClass(FlowCountMapper.class); //Mapper class
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(FlowCountReducer.class); //Reducer class
job.setOutputKeyClass(Text.class); //Reducer output key type
job.setOutputValueClass(IntWritable.class); //Reducer output value type
//3. Set job input and output paths
String dataDir = "/expr/weblog/data"; //input data directory
String outputDir = "/expr/weblog/output1"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "DateGroup"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(DateGroup.class); //class used to locate the job at runtime
job.setJar("export\\DateGroup.jar"); //local jar file
job.setMapperClass(DateGroupMapper.class); //Mapper class
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(DateGroupReducer.class); //Reducer class
job.setOutputKeyClass(Text.class); //Reducer output key type
job.setOutputValueClass(Text.class); //Reducer output value type
//3. Set job input and output paths
String dataDir = "/expr/datecount/data"; //input data directory
String outputDir = "/expr/datecount/output_group"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "Missed"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(Missed.class); //class used to locate the job at runtime
job.setJar("export\\Missed.jar"); //local jar file
job.setMapperClass(MissedMapper.class); //Mapper class
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(NullWritable.class); //Mapper output value type
job.setReducerClass(MissedReducer.class); //Reducer class
//declare a named output (name, output format, key type, value type); a reducer sketch follows this example
MultipleOutputs.addNamedOutput(job, "missed", TextOutputFormat.class, Text.class, NullWritable.class);
//3. Set job input and output paths
String dataDir = "/expr/weblog/data"; //input data directory
String outputDir = "/expr/weblog/output2"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
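The MissedReducer that actually writes to the "missed" named output declared above is not shown in this listing. A hypothetical sketch of a reducer using MultipleOutputs with that declaration:

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Hypothetical reducer: routes its records to the named output "missed".
public class MissedReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    private MultipleOutputs<Text, NullWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        mos.write("missed", key, NullWritable.get()); // written to files named missed-r-xxxxx
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close(); // flush and close all named outputs
    }
}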
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "MultInput"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(MultInput.class); //class used to locate the job at runtime
job.setJar("export\\MultInput.jar"); //local jar file
job.setMapperClass(MultInputMapper.class); //Mapper class
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(MultInputReducer.class); //Reducer class
job.setOutputKeyClass(Text.class); //Reducer output key type
job.setOutputValueClass(IntWritable.class); //Reducer output value type
//3. Set job input and output paths
//Option 1: FileInputFormat.addInputPath()
FileInputFormat.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt1"));//input directory 1
FileInputFormat.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt2"));//input directory 2
//Option 2: FileInputFormat.addInputPaths()
//FileInputFormat.addInputPaths(job, String.join(",", hdfs+"/expr/multinput/data/txt1", hdfs+"/expr/multinput/data/txt2"));
//Option 3: FileInputFormat.setInputPaths()
//FileInputFormat.setInputPaths(job, String.join(",", hdfs+"/expr/multinput/data/txt1", hdfs+"/expr/multinput/data/txt2") );
Path outPath = new Path(hdfs + "/expr/multinput/output"); //output directory
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public void _testDistributedCache(String jobJarPath) throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
// Create a temporary file of length 1.
Path first = createTempFile("distributed.first", "x");
// Create two jars with a single file inside them.
Path second =
makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
Path third =
makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
Path fourth =
makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
Job job = Job.getInstance(mrCluster.getConfig());
// Set the job jar to a new "dummy" jar so we can check that it's extracted
// properly
job.setJar(jobJarPath);
// Because the job jar is a "dummy" jar, we need to include the jar with
// DistributedCacheChecker or it won't be able to find it
Path distributedCacheCheckerJar = new Path(
JarFinder.getJar(DistributedCacheChecker.class));
job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
localFs.getUri(), distributedCacheCheckerJar.getParent()));
job.setMapperClass(DistributedCacheChecker.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.setInputPaths(job, first);
// Creates the Job Configuration
job.addCacheFile(
new URI(first.toUri().toString() + "#distributed.first.symlink"));
job.addFileToClassPath(second);
// The AppMaster jar itself
job.addFileToClassPath(
APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent()));
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
Assert.assertTrue(job.waitForCompletion(false));
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "MedianStdDevJob"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(MedianStdDevJob.class); //class used to locate the job at runtime
job.setJar("export\\MedianStdDevJob.jar"); //local jar file
job.setMapperClass(MedianStdDevMapper.class); //Mapper class
job.setMapOutputKeyClass(IntWritable.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(MedianStdDevReducer.class); //Reducer class
job.setOutputKeyClass(IntWritable.class); //Reducer output key type
job.setOutputValueClass(MedianStdDevTuple.class); //Reducer output value type
//3. Set job input and output paths
String dataDir = "/expr/medianstddev/data"; //input data directory
String outputDir = "/expr/medianstddev/output"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(true);
GenericOptionsParser parser = new GenericOptionsParser(conf, args); //this utility applies -D and other generic options directly to conf and returns the remaining command-line arguments
String[] othargs = parser.getRemainingArgs();
//tell the framework the job is submitted from a Windows client (cross-platform submission)
conf.set("mapreduce.app-submission.cross-platform","true");
// conf.set("mapreduce.framework.name","local");
// System.out.println(conf.get("mapreduce.framework.name"));
Job job = Job.getInstance(conf);
// FileInputFormat.setMinInputSplitSize(job,2222);
// job.setInputFormatClass(ooxx.class);
job.setJar("C:\\Users\\admin\\IdeaProjects\\msbhadoop\\target\\hadoop-hdfs-1.0-0.1.jar");
//this must always be set
job.setJarByClass(MyWordCount.class);
job.setJobName("mashibing");
Path infile = new Path(othargs[0]);
TextInputFormat.addInputPath(job, infile);
Path outfile = new Path(othargs[1]);
if (outfile.getFileSystem(conf).exists(outfile)) outfile.getFileSystem(conf).delete(outfile, true);
TextOutputFormat.setOutputPath(job, outfile);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(MyReducer.class);
// job.setNumReduceTasks(2);
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
}