org.apache.hadoop.mapreduce.Job#setJar() Source Code Examples

Listed below are example usages of org.apache.hadoop.mapreduce.Job#setJar(); each snippet comes from the project named above it, and the full source can be viewed on GitHub via the project link.
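
All of the listings below share the same driver pattern, so it helps to see the two ways of telling Hadoop which jar to ship before reading them: setJarByClass(Class) lets the framework locate the jar that contains the given class on the classpath, while setJar(String) points at an explicit jar file, which is how the MapReduce-Demo examples submit a jar exported from an IDE on a Windows client. The sketch below is a minimal, self-contained illustration of that pattern rather than code from any project listed here; the HDFS address, jar path, and input/output directories are placeholders, and the identity Mapper/Reducer are used only to keep the sketch compilable.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SetJarSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:9000");             // placeholder HDFS address
        conf.set("mapreduce.app-submission.cross-platform", "true");  // needed when submitting from Windows

        Job job = Job.getInstance(conf, "setJarDemo");
        // Option 1: let Hadoop find the jar that contains this class.
        job.setJarByClass(SetJarSketch.class);
        // Option 2: point at an explicit jar file; this overrides the value derived above.
        job.setJar("export/setJarDemo.jar");                          // placeholder local jar path

        // Identity Mapper/Reducer keep the sketch self-contained; real jobs set their own classes.
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);                    // matches TextInputFormat's key type
        job.setOutputValueClass(Text.class);

        Path in = new Path("/demo/input");                            // placeholder input directory
        Path out = new Path("/demo/output");                          // placeholder output directory
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);                                     // remove stale output so the job can rerun
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}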

Example 1  Project: MapReduce-Demo  File: Step2.java
public static boolean run(Configuration config, Map<String, String> paths) 
		throws IOException, ClassNotFoundException, InterruptedException {
	String jobName = "step2";
	Job job = Job.getInstance(config, jobName);
	job.setJarByClass(Step2.class);
	job.setJar("export\\ItemCF.jar");
	job.setMapperClass(Step2_Mapper.class);
	job.setReducerClass(Step2_Reducer.class);
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);

	Path inPath = new Path(paths.get("Step2Input"));
	Path outpath = new Path(paths.get("Step2Output"));
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outpath);		
	FileSystem fs = FileSystem.get(config);
	if (fs.exists(outpath)) {
		fs.delete(outpath, true);
	}
	
	return job.waitForCompletion(true);	
}
 
Example 2  Project: titan1withtp3.1  File: DistCacheConfigurer.java
@Override
public void configure(Job job) throws IOException {

    Configuration conf = job.getConfiguration();
    FileSystem localFS = FileSystem.getLocal(conf);
    FileSystem jobFS = FileSystem.get(conf);

    for (Path p : getLocalPaths()) {
        Path stagedPath = uploadFileIfNecessary(localFS, p, jobFS);
        // Calling this method decompresses the archive and makes Hadoop
        // handle its classfiles individually.  This leads to crippling
        // overhead times (10+ seconds) even with the LocalJobRunner
        // courtesy of o.a.h.yarn.util.FSDownload.changePermissions
        // copying and chmodding each classfile copy individually.
        //job.addArchiveToClassPath(p);
        // Just add the compressed archive instead:
        job.addFileToClassPath(stagedPath);
    }

    // We don't really need to set a mapred job jar here,
    // but doing so suppresses a warning
    String mj = getMapredJar();
    if (null != mj)
        job.setJar(mj);
}
 
Example 3  Project: MapReduce-Demo  File: MergeJob.java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	// 1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	// 2. Set MapReduce job configuration
	String jobName = "MergeMultipleFiles"; // job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(MultiInOutput.class); // specify the job class at runtime
	job.setJar("export\\MergeMultipleFiles.jar"); // specify the local jar
	job.setMapOutputKeyClass(Text.class); // Mapper output key type
	job.setMapOutputValueClass(BytesWritable.class); // Mapper output value type
	job.setMapperClass(MergeMapper.class);
	// input data format
	job.setInputFormatClass(MyInputFormat.class);
	// output as sequence files, using the SequenceFile output format class
	job.setOutputFormatClass(SequenceFileOutputFormat.class);

	// set job input and output paths
	String inputDir = "/workspace/mergeFiles/data";
	String outputDir = "/workspace/mergeFiles/output"; // output directory
	Path outPath = new Path(hdfs + outputDir);
	Path inputPath = new Path(hdfs+inputDir);
	FileInputFormat.setInputPaths(job, inputPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if (fs.exists(outPath)) {
		fs.delete(outPath, true);
	}

	// run the job
	System.out.println("Job: " + jobName + " is running...");
	if (job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 4  Project: MapReduce-Demo  File: FixedLengthInput2.java
public static void main(String[] args) throws Exception {		
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");
	conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 13);
	
	//2. Set MapReduce job configuration
	String jobName = "FixedLengthInput2";					//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(FixedLengthInput2.class);				//specify the job class at runtime
	job.setJar("export\\FixedLengthInput2.jar");				//specify the local jar
	job.setMapperClass(FixedLengthInput2Mapper.class);		//specify the Mapper class
	job.setMapOutputKeyClass(Text.class);			//Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);		//Mapper output value type
	job.setReducerClass(FixedLengthInput2Reducer.class);		//specify the Reducer class
	job.setOutputKeyClass(Text.class);				//Reducer output key type
	job.setOutputValueClass(IntWritable.class); 			//Reducer output value type
	
	job.setInputFormatClass(FixedLengthInputFormat.class);	//set the input format class
	
	//3. Set job input and output paths
	String dataDir = "/expr/fixedinput/data";			//input data directory
	String outputDir = "/expr/fixedinput/output";		//output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 5  Project: MapReduce-Demo  File: FlowPartition.java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	// set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	// set job configuration
	String jobName = "FlowPartition";
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(FlowPartition.class);
	job.setJar("export\\FlowPartition.jar");
	// Map
	job.setMapperClass(FlowPartitionMapper.class);
	// Reduce
	job.setReducerClass(FlowPartitionReducer.class);
	// output key-value types
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(FlowWritable.class);
	// set the partitioner class and the number of reducers
	job.setPartitionerClass(PhoneNumberPartitioner.class);
	job.setNumReduceTasks(4);

	// set job input and output paths
	String dataDir = "/workspace/flowStatistics/data";
	String outputDir = "/workspace/flowStatistics/output_partitions";
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if (fs.exists(outPath)) {
		fs.delete(outPath, true);
	}

	// run the job
	System.out.println("Job: " + jobName + " is running...");
	if (job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 6  Project: MapReduce-Demo  File: DatePartition.java
public static void main(String[] args) throws Exception {		
		//1. Set HDFS configuration
		String namenode_ip = "192.168.17.10";
		String hdfs = "hdfs://" + namenode_ip + ":9000";			
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", hdfs);
		conf.set("mapreduce.app-submission.cross-platform", "true");

		//2. Set MapReduce job configuration
		String jobName = "DatePartition";					//define the job name
		Job job = Job.getInstance(conf, jobName);
		job.setJarByClass(DatePartition.class);				//specify the job class at runtime
		job.setJar("export\\DatePartition.jar");			//specify the local jar
//		Map
		job.setMapperClass(DatePartitionMapper.class);		//specify the Mapper class
		job.setMapOutputKeyClass(Text.class);				//Mapper output key type
		job.setMapOutputValueClass(IntWritable.class);		//Mapper output value type
//		Reduce
		job.setReducerClass(DatePartitionReducer.class);	//specify the Reducer class
//		Global
		job.setOutputKeyClass(Text.class);					//Reducer output key type
		job.setOutputValueClass(IntWritable.class);			//Reducer output value type
//		Partition
		job.setPartitionerClass(YearPartitioner.class);		//set the custom partitioner
		job.setNumReduceTasks(10); 	//set the number of reduce tasks; this value is passed to Partitioner.getPartition() as the numPartitions argument
		
		//3. Set job input and output paths
		String dataDir = "/expr/datecount/data";				//input data directory
		String outputDir = "/expr/datecount/output_partition";	//output directory
		Path inPath = new Path(hdfs + dataDir);
		Path outPath = new Path(hdfs + outputDir);
		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outPath)) {
			fs.delete(outPath, true);
		}
		
		//4. Run the job
		System.out.println("Job: " + jobName + " is running...");
		if(job.waitForCompletion(true)) {
			System.out.println("success!");
			System.exit(0);
		} else {
			System.out.println("failed!");
			System.exit(1);
		}
	}
 
Example 7  Project: MapReduce-Demo  File: FlowStatistics.java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// set HDFS configuration
		String namenode_ip = "192.168.17.10";
		String hdfs = "hdfs://"+namenode_ip+":9000";
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", hdfs);
		conf.set("mapreduce.app-submission.cross-platform", "true");
		
		// set job configuration
		String jobName = "FlowStatistics";
		Job job = Job.getInstance(conf, jobName);
		job.setJarByClass(FlowStatistics.class);
		job.setJar("export\\FlowStatistics.jar");
		// Map
		job.setMapperClass(FlowMapper.class);// variant 1
//		job.setMapperClass(FlowWritableMapper.class);
		// can be omitted here because the types match the Reducer output types
//		job.setMapOutputKeyClass(Text.class);
//		job.setMapOutputValueClass(FlowWritable.class);
		// Reduce
		job.setReducerClass(FlowReducer.class);// variant 1
//		job.setReducerClass(FlowWritableReducer.class);
		// output key-value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);// variant 1
//		job.setOutputValueClass(FlowWritable.class);
		
		// set job input and output paths
		String dataDir = "/workspace/flowStatistics/data";
		String outputDir = "/workspace/flowStatistics/output";
		Path inPath = new Path(hdfs+dataDir);
		Path outPath = new Path(hdfs+outputDir);
		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outPath)) {
			fs.delete(outPath, true);
		}
		
		// run the job
		System.out.println("Job: " + jobName + " is running...");
		if(job.waitForCompletion(true)) {
			System.out.println("success!");
			System.exit(0);
		} else {
			System.out.println("failed!");
			System.exit(1);
		}
	}
 
Example 8  Project: MapReduce-Demo  File: DateGroup2.java
public static void main(String[] args) throws Exception {		
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. Set MapReduce job configuration
	String jobName = "DateGroup2";					//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(DateGroup2.class);			//specify the job class at runtime
	job.setJar("export\\DateGroup2.jar");			//specify the local jar
	job.setMapperClass(DateGroup2Mapper.class);		//specify the Mapper class
	job.setMapOutputKeyClass(Text.class);			//Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);	//Mapper output value type
	job.setReducerClass(DateGroup2Reducer.class);	//specify the Reducer class
	job.setOutputKeyClass(Text.class);				//Reducer output key type
	job.setOutputValueClass(Text.class); 			//Reducer output value type
	job.setGroupingComparatorClass(MyGroup.class);	//set the custom grouping comparator class
	//3. Set job input and output paths
	String dataDir = "/expr/datecount/data";			//input data directory
	String outputDir = "/expr/datecount/output_group2";	//output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 9  Project: MapReduce-Demo  File: DateSortAsc.java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	// 1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	// 2. Set MapReduce job configuration
	String jobName = "DateSortAsc"; // define the job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(DateSortAsc.class); // specify the job class
	job.setJar("export\\DateSortAsc.jar"); // specify the local jar
	
	job.setMapperClass(SortMapper.class); // specify the Mapper class
	job.setMapOutputKeyClass(IntWritable.class); // Mapper output key type
	job.setMapOutputValueClass(Text.class); // Mapper output value type
	
	job.setReducerClass(SortReducer.class); // specify the Reducer class
	job.setOutputKeyClass(Text.class); // Reducer output key type
	job.setOutputValueClass(IntWritable.class); // Reducer output value type

	// 3. Set job input and output paths
	String dataDir = "/workspace/dateSort/data"; // input data directory
	String outputDir = "/workspace/dateSort/output"; // output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if (fs.exists(outPath)) {
		fs.delete(outPath, true);
	}

	// 4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if (job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 10  Project: MapReduce-Demo  File: KeyValueInput.java
public static void main(String[] args) throws Exception {		
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");
	conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":");	//设置输入文件kv分隔符
	
	//2. Set MapReduce job configuration
	String jobName = "KeyValueInput";					//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(KeyValueInput.class);				//specify the job class at runtime
	job.setJar("export\\KeyValueInput.jar");			//specify the local jar
	job.setMapperClass(KeyValueInputMapper.class);		//specify the Mapper class
	job.setMapOutputKeyClass(Text.class);				//Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);		//Mapper output value type
	job.setReducerClass(KeyValueInputReducer.class);	//specify the Reducer class
	job.setOutputKeyClass(Text.class);					//Reducer output key type
	job.setOutputValueClass(IntWritable.class); 		//Reducer output value type
	
	job.setInputFormatClass(KeyValueTextInputFormat.class);	//set the input format class
	
	//3. Set job input and output paths
	String dataDir = "/expr/kvinput/data";			//input data directory
	String outputDir = "/expr/kvinput/output";		//output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 11  Project: MapReduce-Demo  File: TempSort.java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		String hdfs = "hdfs://192.168.17.10:9000";
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", hdfs);
		conf.set("mapreduce.app-submission.cross-platform", "true");
		// set job configuration
		String jobName = "TempSort";
		Job job = Job.getInstance(conf, jobName);
		job.setJarByClass(TempSort.class);
		job.setJar("export\\TempSort.jar");
		// Map
		job.setMapperClass(TempSortMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// Reduce
		job.setReducerClass(TempSortReducer.class);
		// Global
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// Sort
//		job.setSortComparatorClass(MySort.class);
		// Partition
		job.setPartitionerClass(YearPartitioner.class);
		job.setNumReduceTasks(3);
		//3. Set job input and output paths
		String dataDir = "/expr/test/data";	//input data directory
		String outputDir = "/expr/test/output";				//output directory
		Path inPath = new Path(hdfs + dataDir);
		Path outPath = new Path(hdfs + outputDir);
		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outPath)) {
			fs.delete(outPath, true);
		}
		
		//4. Run the job
		System.out.println("Job: " + jobName + " is running...");
		if(job.waitForCompletion(true)) {
			System.out.println("success!");
			System.exit(0);
		} else {
			System.out.println("failed!");
			System.exit(1);
		}
	}
 
Example 12  Project: MapReduce-Demo  File: JobControlRun.java
public static void main(String[] args) throws IOException {
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();		
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");
	
	Job job1 = Job.getInstance(conf,"Decompose");
	job1.setJarByClass(JobControlRun.class);
	job1.setJar("export\\mutualFriend.jar");
	job1.setMapperClass(DecomposeFriendsMapper.class);
	job1.setReducerClass(DecomposeFriendsReducer.class);
	job1.setOutputKeyClass(Text.class);
	job1.setOutputValueClass(Text.class);
	
	Path input = new Path(hdfs+"/workspace/mutualFriends/data");
	Path output1 = new Path(hdfs+"/workspace/mutualFriends/output_Dec");
	FileInputFormat.addInputPath(job1, input);
	FileOutputFormat.setOutputPath(job1, output1);
	FileSystem fs = FileSystem.get(conf);
	if (fs.exists(output1)) {
		fs.delete(output1, true);
		System.out.println("我被删了");// 打印可见只被删了一次,有点怪
	}
	// ControlledJob is a job control container
	ControlledJob ctrJob1=new ControlledJob(conf);
	ctrJob1.setJob(job1);// add job1 to the control container
	
	Job job2 = Job.getInstance(conf, "Merge");
	job2.setJarByClass(JobControlRun.class);
	job2.setJar("export\\mutualFriend.jar");
	job2.setMapperClass(MergeFriendsMapper.class);
	job2.setReducerClass(MergeFriendsReducer.class);
	job2.setOutputKeyClass(Text.class);
	job2.setOutputValueClass(Text.class);
	
	Path input2 = new Path(hdfs+"/workspace/mutualFriends/output_Dec");
	Path output2 = new Path(hdfs+"/workspace/mutualFriends/output_Meg");
	FileInputFormat.addInputPath(job2, input2);
	FileOutputFormat.setOutputPath(job2, output2);
	if (fs.exists(output2)) {
		fs.delete(output2, true);
	}
	ControlledJob ctrJob2 = new ControlledJob(conf);
	ctrJob2.setJob(job2);// add job2 to the control container
	
	// add a job dependency: job2 depends on job1
	ctrJob2.addDependingJob(ctrJob1);
	
	// define the master job controller that monitors and schedules job1 and job2
	JobControl jobControl=new JobControl("JobControl");
	jobControl.addJob(ctrJob1);
	jobControl.addJob(ctrJob2);
	// start the job control thread
	Thread T=new Thread(jobControl);
	T.start();
	while(true){
		if(jobControl.allFinished()){// wait until all jobs have finished
			System.out.println(jobControl.getSuccessfulJobList());// print info for successful jobs
			jobControl.stop();
			break;
		}
	}
	/**
	 * The printed control info looks like this:
	 * [job name:	Decompose
		job id:	JobControl0
		job state:	SUCCESS
		job mapred id:	job_local445604445_0001
		job message:	just initialized
		job has no depending job:	
		, job name:	Merge
		job id:	JobControl1
		job state:	SUCCESS
		job mapred id:	job_local1897659504_0002
		job message:	just initialized
		job has 1 dependeng jobs:
			 depending job 0:	Decompose
		]
	 */
}
 
Example 13  Project: hadoop  File: TestMRJobs.java
private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
  LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar);

  if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
             + " not found. Not running test.");
    return;
  }

  Configuration sleepConf = new Configuration(mrCluster.getConfig());
  // set master address to local to test that local mode applied iff framework == local
  sleepConf.set(MRConfig.MASTER_ADDRESS, "local");	
  
  SleepJob sleepJob = new SleepJob();
  sleepJob.setConf(sleepConf);
 
  // job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
  Job job = sleepJob.createJob(3, numSleepReducers, 10000, 1, 5000, 1);

  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  if (useRemoteJar) {
    final Path localJar = new Path(
        ClassUtil.findContainingJar(SleepJob.class));
    ConfigUtil.addLink(job.getConfiguration(), "/jobjars",
        localFs.makeQualified(localJar.getParent()).toUri());
    job.setJar("viewfs:///jobjars/" + localJar.getName());
  } else {
    job.setJarByClass(SleepJob.class);
  }
  job.setMaxMapAttempts(1); // speed up failures
  job.submit();
  String trackingUrl = job.getTrackingURL();
  String jobId = job.getJobID().toString();
  boolean succeeded = job.waitForCompletion(true);
  Assert.assertTrue(succeeded);
  Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
  Assert.assertTrue("Tracking URL was " + trackingUrl +
                    " but didn't Match Job ID " + jobId ,
        trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
  verifySleepJobCounters(job);
  verifyTaskProgress(job);
  
  // TODO later:  add explicit "isUber()" checks of some sort (extend
  // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
}
 
Example 14  Project: MapReduce-Demo  File: FlowCount.java
public static void main(String[] args) throws Exception {		
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. Set MapReduce job configuration
	String jobName = "FlowCount";					//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(FlowCount.class);				//specify the job class at runtime
	job.setJar("export\\FlowCount.jar");			//specify the local jar
	job.setMapperClass(FlowCountMapper.class);		//specify the Mapper class
	job.setMapOutputKeyClass(Text.class);			//Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);	//Mapper output value type
	job.setReducerClass(FlowCountReducer.class);	//specify the Reducer class
	job.setOutputKeyClass(Text.class);				//Reducer output key type
	job.setOutputValueClass(IntWritable.class); 	//Reducer output value type
	
	//3. Set job input and output paths
	String dataDir = "/expr/weblog/data";			//input data directory
	String outputDir = "/expr/weblog/output1";		//output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 15  Project: MapReduce-Demo  File: DateGroup.java
public static void main(String[] args) throws Exception {		
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. Set MapReduce job configuration
	String jobName = "DateGroup";					//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(DateGroup.class);				//specify the job class at runtime
	job.setJar("export\\DateGroup.jar");			//specify the local jar
	job.setMapperClass(DateGroupMapper.class);		//specify the Mapper class
	job.setMapOutputKeyClass(Text.class);			//Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);	//Mapper output value type
	job.setReducerClass(DateGroupReducer.class);	//specify the Reducer class
	job.setOutputKeyClass(Text.class);				//Reducer output key type
	job.setOutputValueClass(Text.class); 			//Reducer output value type
	
	//3. Set job input and output paths
	String dataDir = "/expr/datecount/data";			//input data directory
	String outputDir = "/expr/datecount/output_group";	//output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 16  Project: MapReduce-Demo  File: Missed.java
public static void main(String[] args) throws Exception {		
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. Set MapReduce job configuration
	String jobName = "Missed";						//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(Missed.class);				//specify the job class at runtime
	job.setJar("export\\Missed.jar");				//specify the local jar
	job.setMapperClass(MissedMapper.class);			//specify the Mapper class
	job.setMapOutputKeyClass(Text.class);			//Mapper output key type
	job.setMapOutputValueClass(NullWritable.class);	//Mapper output value type
	job.setReducerClass(MissedReducer.class);		//specify the Reducer class
	//define the named output's name, output format, key type, and value type
	MultipleOutputs.addNamedOutput(job, "missed", TextOutputFormat.class, Text.class, NullWritable.class);
	
	//3. Set job input and output paths
	String dataDir = "/expr/weblog/data";			//input data directory
	String outputDir = "/expr/weblog/output2";		//output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 17  Project: MapReduce-Demo  File: MultInput.java
public static void main(String[] args) throws Exception {		
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. Set MapReduce job configuration
	String jobName = "MultInput";					//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(MultInput.class);				//specify the job class at runtime
	job.setJar("export\\MultInput.jar");			//specify the local jar
	job.setMapperClass(MultInputMapper.class);		//specify the Mapper class
	job.setMapOutputKeyClass(Text.class);			//Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);	//Mapper output value type
	job.setReducerClass(MultInputReducer.class);	//specify the Reducer class
	job.setOutputKeyClass(Text.class);				//Reducer output key type
	job.setOutputValueClass(IntWritable.class); 	//Reducer output value type
	
	//3. Set job input and output paths
	//Option 1: FileInputFormat.addInputPath()
	FileInputFormat.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt1"));//input directory 1
	FileInputFormat.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt2"));//input directory 2
	
	//Option 2: FileInputFormat.addInputPaths()
	//FileInputFormat.addInputPaths(job, String.join(",", hdfs+"/expr/multinput/data/txt1", hdfs+"/expr/multinput/data/txt2"));
	
	//Option 3: FileInputFormat.setInputPaths()
	//FileInputFormat.setInputPaths(job, String.join(",", hdfs+"/expr/multinput/data/txt1", hdfs+"/expr/multinput/data/txt2") );
	
	Path outPath = new Path(hdfs + "/expr/multinput/output");		//output directory
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 18  Project: hadoop  File: TestMRJobs.java
public void _testDistributedCache(String jobJarPath) throws Exception {
  if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
         + " not found. Not running test.");
    return;
  }

  // Create a temporary file of length 1.
  Path first = createTempFile("distributed.first", "x");
  // Create two jars with a single file inside them.
  Path second =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
  Path third =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
  Path fourth =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);

  Job job = Job.getInstance(mrCluster.getConfig());
  
  // Set the job jar to a new "dummy" jar so we can check that its extracted 
  // properly
  job.setJar(jobJarPath);
  // Because the job jar is a "dummy" jar, we need to include the jar with
  // DistributedCacheChecker or it won't be able to find it
  Path distributedCacheCheckerJar = new Path(
          JarFinder.getJar(DistributedCacheChecker.class));
  job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
          localFs.getUri(), distributedCacheCheckerJar.getParent()));
  
  job.setMapperClass(DistributedCacheChecker.class);
  job.setOutputFormatClass(NullOutputFormat.class);

  FileInputFormat.setInputPaths(job, first);
  // Creates the Job Configuration
  job.addCacheFile(
      new URI(first.toUri().toString() + "#distributed.first.symlink"));
  job.addFileToClassPath(second);
  // The AppMaster jar itself
  job.addFileToClassPath(
          APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent())); 
  job.addArchiveToClassPath(third);
  job.addCacheArchive(fourth.toUri());
  job.setMaxMapAttempts(1); // speed up failures

  job.submit();
  String trackingUrl = job.getTrackingURL();
  String jobId = job.getJobID().toString();
  Assert.assertTrue(job.waitForCompletion(false));
  Assert.assertTrue("Tracking URL was " + trackingUrl +
                    " but didn't Match Job ID " + jobId ,
        trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
 
Example 19  Project: MapReduce-Demo  File: MedianStdDevJob.java
public static void main(String[] args) throws Exception {
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. Set MapReduce job configuration
	String jobName = "MedianStdDevJob";					//job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(MedianStdDevJob.class);			//specify the job class at runtime
	job.setJar("export\\MedianStdDevJob.jar");			//specify the local jar
	job.setMapperClass(MedianStdDevMapper.class);		//specify the Mapper class
	job.setMapOutputKeyClass(IntWritable.class);		//Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);		//Mapper output value type
	job.setReducerClass(MedianStdDevReducer.class);		//specify the Reducer class
	job.setOutputKeyClass(IntWritable.class);			//Reducer output key type
	job.setOutputValueClass(MedianStdDevTuple.class); 	//Reducer output value type
	
	//3. Set job input and output paths
	String dataDir = "/expr/medianstddev/data";			//input data directory
	String outputDir = "/expr/medianstddev/output";		//output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 20  Project: BigDataArchitect  File: MyWordCount.java
public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration(true);

        GenericOptionsParser parser = new GenericOptionsParser(conf, args);  //this utility sets -D style options directly into conf and returns the remaining command-line args
        String[] othargs = parser.getRemainingArgs();

        //tell the framework the job is submitted from a Windows (cross-platform) client
        conf.set("mapreduce.app-submission.cross-platform","true");

//        conf.set("mapreduce.framework.name","local");
//        System.out.println(conf.get("mapreduce.framework.name"));

        Job job = Job.getInstance(conf);


//        FileInputFormat.setMinInputSplitSize(job,2222);
//        job.setInputFormatClass(ooxx.class);

        job.setJar("C:\\Users\\admin\\IdeaProjects\\msbhadoop\\target\\hadoop-hdfs-1.0-0.1.jar");
        //this must always be set
        job.setJarByClass(MyWordCount.class);

        job.setJobName("mashibing");

        Path infile = new Path(othargs[0]);
        TextInputFormat.addInputPath(job, infile);

        Path outfile = new Path(othargs[1]);
        if (outfile.getFileSystem(conf).exists(outfile)) outfile.getFileSystem(conf).delete(outfile, true);
        TextOutputFormat.setOutputPath(job, outfile);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);

//        job.setNumReduceTasks(2);
        // Submit the job, then poll for progress until the job is complete
        job.waitForCompletion(true);

    }