Listed below are examples of how org.apache.hadoop.mapreduce.Job#setJarByClass() is used in practice.
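Before the collected examples, here is a minimal, self-contained driver sketch (the class name SetJarByClassExample and the identity mapper/reducer setup are illustrative assumptions, not taken from any of the projects below). It shows the call all of the examples share: setJarByClass() tells Hadoop which class to use to locate the job jar, so that jar is shipped to the cluster and the job's mapper and reducer classes can be loaded by the tasks.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SetJarByClassExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "setJarByClass example");
        // Hadoop finds the jar containing this class and submits it with the job,
        // so any mapper/reducer classes packaged in the same jar are loadable on the cluster.
        job.setJarByClass(SetJarByClassExample.class);
        job.setMapperClass(Mapper.class);    // identity mapper
        job.setReducerClass(Reducer.class);  // identity reducer
        job.setInputFormatClass(KeyValueTextInputFormat.class); // tab-separated Text key/value records
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}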
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
if (args.length < 2) {
System.err.println("Usage: ContentLoader configFile inputDir");
System.exit(2);
}
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf, "content loader");
job.setJarByClass(ContentLoader.class);
job.setInputFormatClass(ContentInputFormat.class);
job.setMapperClass(ContentMapper.class);
job.setMapOutputKeyClass(DocumentURI.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(ContentOutputFormat.class);
ContentInputFormat.setInputPaths(job, new Path(otherArgs[1]));
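// Job.getInstance() took a copy of the Configuration, so the connector config file must be added to the job's own copy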
conf = job.getConfiguration();
conf.addResource(otherArgs[0]);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
* Sets up the actual job.
*
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
Path outputDir = new Path(args[1]);
String reportSeparatorString = (args.length > 2) ? args[2]: ":";
conf.set("ReportSeparator", reportSeparatorString);
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(CellCounter.class);
Scan scan = getConfiguredScanForJob(conf, args);
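// wires the table, scan, and mapper into the job and configures the HBase table input format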
TableMapReduceUtil.initTableMapperJob(tableName, scan,
CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(job, outputDir);
job.setReducerClass(IntSumReducer.class);
return job;
}
public void run() throws Exception{
long startTime = System.currentTimeMillis();
Configuration conf = new Configuration();
conf.set(TableOutputFormat.OUTPUT_TABLE, Constants.hbase_user_item_pref_table);
Job job = Job.getInstance(conf, "hbasewriter"+System.currentTimeMillis());
job.setJarByClass(UpdateCFJob.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(HBaseWriteReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(TableOutputFormat.class);
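// input files are read from HDFS and the reduced results are written to the HBase table configured above via TableOutputFormat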
FileInputFormat.addInputPath(job, new Path(input));
boolean isFinish = job.waitForCompletion(true);
long endTime = System.currentTimeMillis();
if (isFinish) {
logger.info("UpdateCFJob job [" + job.getJobName() + "] finished. It took " + (endTime - startTime) / 1000 + "s.");
} else {
logger.error("UpdateCFJob job ["+job.getJobName()+"] run failed.");
}
}
public static void printFinalRanks(Configuration conf, FileSystem fs, String inputPath, String outputPath) throws Exception {
Path outFile = new Path(outputPath);
if (fs.exists(outFile)) {
fs.delete(outFile, true);
}
Job job = Job.getInstance(conf);
job.setMapperClass(RankPrinter.class);
job.setReducerClass(Reducer.class);
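// zero reduce tasks makes this a map-only job, so the reducer class set above is never invoked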
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setJarByClass(RankPrinter.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, outFile);
job.waitForCompletion(true);
}
private PartitionedInputResult buildPartitionedInputData(String uuid, Path tmpPath, TableDescriptor descriptor,
List<Path> inprogressPathList, String snapshot, Path fileCachePath) throws IOException, ClassNotFoundException,
InterruptedException {
Job job = Job.getInstance(getConf(), "Partitioning data for table [" + descriptor.getName() + "]");
job.getConfiguration().set(BLUR_UPDATE_ID, uuid);
// Needed for the bloom filter path information.
BlurOutputFormat.setTableDescriptor(job, descriptor);
BlurInputFormat.setLocalCachePath(job, fileCachePath);
ExistingDataIndexLookupMapper.setSnapshot(job, snapshot);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
}
Path outputPath = new Path(tmpPath, UUID.randomUUID().toString());
job.setJarByClass(getClass());
job.setMapperClass(LookupBuilderMapper.class);
job.setReducerClass(LookupBuilderReducer.class);
int shardCount = descriptor.getShardCount();
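// one reduce task per table shard, so each reducer builds the lookup data for a single shard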
job.setNumReduceTasks(shardCount);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BooleanWritable.class);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return new PartitionedInputResult(outputPath, job.getCounters(), shardCount, job.getTaskReports(TaskType.REDUCE));
} else {
throw new IOException("Partitioning failed!");
}
}
@Test
public void test() throws Exception {
Connector connector = accumuloMiniClusterDriver.getConnector();
AccumuloEventStore store = new AccumuloEventStore(connector);
event = EventBuilder.create("", UUID.randomUUID().toString(), System.currentTimeMillis())
.attr(new Attribute("key1", "val1"))
.attr(new Attribute("key2", false)).build();
store.save(singleton(event));
store.flush();
Job job = new Job(new Configuration());
job.setJarByClass(getClass());
job.setMapperClass(TestMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(EventInputFormat.class);
EventInputFormat.setZooKeeperInstance(job, accumuloMiniClusterDriver.getClientConfiguration());
EventInputFormat.setInputInfo(job, "root", accumuloMiniClusterDriver.getRootPassword().getBytes(), new Authorizations());
EventInputFormat.setQueryInfo(job, new Date(System.currentTimeMillis() - 50000), new Date(), Collections.singleton(""),
QueryBuilder.create().eq("key1", "val1").build());
job.setOutputFormatClass(NullOutputFormat.class);
job.submit();
job.waitForCompletion(true);
assertNotNull(TestMapper.entry);
assertEquals(TestMapper.entry.getId(), event.getId());
assertTrue(TestMapper.entry.getTimestamp() - event.getTimestamp() < 50);
assertEquals(new HashSet<Attribute>(TestMapper.entry.getAttributes()), new HashSet<Attribute>(event.getAttributes()));
}
/**
* Run an Avro Hadoop job with the given job conf.
* @param conf the job configuration
* @throws Exception if the job fails
*/
public static void runAvroJob(JobConf conf) throws Exception
{
Path[] inputPaths = AvroInputFormat.getInputPaths(conf);
_log.info("Running hadoop job with input paths:");
for (Path inputPath : inputPaths)
{
_log.info(inputPath);
}
_log.info("Output path="+AvroOutputFormat.getOutputPath(conf));
Job job = new Job(conf);
job.setJarByClass(AvroUtils.class);
job.waitForCompletion(true);
}
/**
* Create Job object for submitting it, with all the configuration
*
* @return Reference to job object.
* @throws IOException - Exception if any
*/
private Job createJob() throws IOException {
String jobName = "distcp";
String userChosenName = getConf().get(JobContext.JOB_NAME);
if (userChosenName != null)
jobName += ": " + userChosenName;
Job job = Job.getInstance(getConf());
job.setJobName(jobName);
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setJarByClass(CopyMapper.class);
configureOutputFormat(job);
job.setMapperClass(CopyMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(CopyOutputFormat.class);
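// disable speculative execution so the same file is never copied by two map attempts at once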
job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
job.getConfiguration().set(JobContext.NUM_MAPS,
String.valueOf(inputOptions.getMaxMaps()));
if (inputOptions.getSslConfigurationFile() != null) {
setupSSLConfig(job);
}
inputOptions.appendToConf(job.getConfiguration());
return job;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println(
"Usage: ElemAttrValueCooccurrencesTest configFile outputDir");
System.exit(2);
}
Job job = Job.getInstance(conf);
job.setJarByClass(ElemAttrValueCooccurrencesTest.class);
job.setInputFormatClass(ValueInputFormat.class);
job.setMapperClass(ElemAttrCooccurrencesMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
conf = job.getConfiguration();
conf.addResource(otherArgs[0]);
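// tell the MarkLogic connector which key/value types to produce and which lexicon co-occurrence function to evaluate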
conf.setClass(MarkLogicConstants.INPUT_KEY_CLASS, Text.class,
Writable.class);
conf.setClass(MarkLogicConstants.INPUT_VALUE_CLASS, Text.class,
Writable.class);
conf.setClass(MarkLogicConstants.INPUT_LEXICON_FUNCTION_CLASS,
ElemAttrValueCooccurrencesFunction.class, ElemAttrValueCooccurrences.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
//set from the command line
job.setJarByClass(Phase2ExactMatchDeDuplication.class);
job.setJobName(Phase2ExactMatchDeDuplication.class.getName());
// mapper
job.setMapperClass(ExactMatchDetectionMapper.class);
// we will compress the mapper's output (use fast Snappy compressor)
job.getConfiguration().setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
job.getConfiguration()
.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, SnappyCodec.class, CompressionCodec.class);
// reducer
job.setReducerClass(UniqueWarcWriterReducer.class);
// no combiner, as the output classes in mapper and reducer are different!
// input-output is warc
job.setInputFormatClass(WARCInputFormat.class);
job.setOutputFormatClass(WARCOutputFormat.class);
// mapper output data
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WARCWritable.class);
// set output compression to GZip
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf=new Configuration();
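// pass N to the tasks through the job configuration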
conf.set("N", args[0]);
Job job = new Job(conf);
job.setJarByClass(Top_N_Driver.class);
job.setJobName("Top_N_Driver");
FileInputFormat.setInputPaths(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setMapperClass(Top_N_Mapper.class);
job.setReducerClass(Top_N_Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
for (Map.Entry<String, String> next : job.getConfiguration()) {
System.out.println(next.getKey() + ": " + next.getValue());
}
job.setJarByClass(PagesByURLExtractor.class);
job.setJobName(PagesByURLExtractor.class.getName());
// mapper
job.setMapperClass(MapperClass.class);
// input
job.setInputFormatClass(WARCInputFormat.class);
// output
job.setOutputFormatClass(WARCOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WARCWritable.class);
FileOutputFormat.setCompressOutput(job, true);
// paths
String commaSeparatedInputFiles = args[0];
String outputPath = args[1];
// load IDs to be searched for
job.getConfiguration().set(MAPREDUCE_MAPPER_URLS, loadURLs(args[2]));
FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
public int run(String[] args) throws Exception {
// create the job
Configuration config = getConf();
Job job = Job.getInstance(config);
// set job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
// to run map-only, set the number of reduce tasks to 0
// job.setNumReduceTasks(0);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(LogBeanWritable.class);
job.setOutputValueClass(Text.class);
// add input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
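// MapReduce refuses to run if the output directory already exists, so remove any previous output first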
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Set up the HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Set up the MapReduce job configuration
String jobName = "DateSortDesc"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(DateSortAsc.class); // main class of the job
job.setJar("export\\DateSortDesc.jar"); // explicitly set the local jar path (this later call overrides the jar derived by setJarByClass)
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// specify the comparator used to sort the keys
job.setSortComparatorClass(MyComparator.class);
// 3. Set the job input and output paths
String dataDir = "/workspace/dateSort/data"; // input data directory
String outputDir = "/workspace/dateSort/output"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if (job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
/**
* @param args
*/
public static void main(String[] args) throws IOException, InterruptedException , ClassNotFoundException{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
LOGGER.debug("Data Profiling job values respectively ["+otherArgs[0]+"], "+
otherArgs[1]);
StringBuilder sb = new StringBuilder();
int dynamicArgs = 0;
dynamicArgs = ((otherArgs.length)-1);
for (int i = dynamicArgs; i < otherArgs.length; i++) {
LOGGER.debug("other arguments" + otherArgs[i]);
sb.append(otherArgs[i]);
}
String outputPath = DataProfilingConstants.OUTPUT_DIR_PATH + new Date().getTime();
String inputPath = otherArgs[0];
String dpBeanString = sb.toString();
LOGGER.debug("Received dpBean value [" + dpBeanString+"]");
Gson gson = new Gson();
Type type = new TypeToken<DataProfilingBean>() {
}.getType();
DataProfilingBean dataProfilingBean = gson.fromJson(dpBeanString, type);
String recordSeparator = dataProfilingBean.getRecordSeparator();
conf.set(DataProfilingConstants.DATA_PROFILING_BEAN, dpBeanString);
conf.set(DataProfilingConstants.RECORD_SEPARATOR, recordSeparator);
conf.set(DataProfilingConstants.TEXTINPUTFORMAT_RECORD_DELIMITER, recordSeparator);
Job job = new Job(conf,DataProfilingConstants.JOB_NAME);
job.setJarByClass(DataProfilingJobExecutor.class);
job.setMapperClass(DataProfilingMapper.class);
job.setCombinerClass(DataProfilingReducer.class);
job.setReducerClass(DataProfilingReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Path[] inputPaths = FileUtil.getAllNestedFilePath(job, inputPath);
TextInputFormat.setInputPaths(job, inputPaths);
SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
LOGGER.debug("Job completed , now going to read the result from hdfs");
Set<CriteriaBasedDataProfiling> criteriaBasedDataProfilings = readJobOutputFromHdfs(conf,outputPath,dataProfilingBean);
final Gson dpReportGson = new GsonBuilder().disableHtmlEscaping().create();
final String jsonString = dpReportGson.toJson(criteriaBasedDataProfilings);
LOGGER.info(DataProfilingConstants.DATA_PROFILING_REPORT + jsonString);
}
public int run(String[] args) throws Exception {
/*Configuration conf = getConf();
JobClient client = new JobClient(conf);
ClusterStatus cluster = client.getClusterStatus();
int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
String join_reduces = conf.get(REDUCES_PER_HOST);
if (join_reduces != null) {
num_reduces = cluster.getTaskTrackers() *
Integer.parseInt(join_reduces);
}
// Set user-supplied (possibly default) job configs
job.setNumReduceTasks(num_reduces);*/
Configuration conf = new Configuration();
//conf.set("fs.defaultFS", "hdfs://node-01:9000");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
String commaSeparatedPaths = null;
String outputDir = null;
if (otherArgs.length == 2) {
commaSeparatedPaths = otherArgs[0];
outputDir = otherArgs[1];
} else {
System.err.println("Usage: <in>[,<in>...] <out>");
//System.exit(-1);
return -1;
}
Job job = Job.getInstance(conf);
job.setJobName("FlowSortJob");
job.setJarByClass(FlowSortJob.class);
job.setMapperClass(FlowSortMapper.class);
//job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(FlowSortReducer.class);
job.setOutputKeyClass(FlowBean.class);
job.setOutputValueClass(NullWritable.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(NullWritable.class);
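// FlowBean is the map output key, so shuffle ordering follows the bean's own comparator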
FileInputFormat.setInputPaths(job, commaSeparatedPaths);
FileOutputFormat.setOutputPath(job, new Path(outputDir));
return job.waitForCompletion(true) ? 0 : 1;
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Invalid no. of arguments provided");
System.err.println("Usage: terasort <input-dir> <output-dir>");
return -1;
}
LOG.info("starting");
Job job = Job.getInstance(getConf());
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);
boolean useSimplePartitioner = getUseSimplePartitioner(job);
TeraInputFormat.setInputPaths(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);
job.setJobName("TeraSort");
job.setJarByClass(TeraSort.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TeraInputFormat.class);
job.setOutputFormatClass(TeraOutputFormat.class);
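// either use the simple partitioner, or sample the input to write a partition file for TotalOrderPartitioner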
if (useSimplePartitioner) {
job.setPartitionerClass(SimplePartitioner.class);
} else {
long start = System.currentTimeMillis();
Path partitionFile = new Path(outputDir,
TeraInputFormat.PARTITION_FILENAME);
URI partitionUri = new URI(partitionFile.toString() +
"#" + TeraInputFormat.PARTITION_FILENAME);
try {
TeraInputFormat.writePartitionFile(job, partitionFile);
} catch (Throwable e) {
LOG.error(e.getMessage());
return -1;
}
job.addCacheFile(partitionUri);
long end = System.currentTimeMillis();
System.out.println("Spent " + (end - start) + "ms computing partitions.");
job.setPartitionerClass(TotalOrderPartitioner.class);
}
job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
TeraOutputFormat.setFinalSync(job, true);
int ret = job.waitForCompletion(true) ? 0 : 1;
LOG.info("done");
return ret;
}
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testRandomWriter().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.contains(jobId.substring(jobId.indexOf("_"))));
// Make sure there are three files in the output-dir
RemoteIterator<FileStatus> iterator =
FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
outputDir);
int count = 0;
while (iterator.hasNext()) {
FileStatus file = iterator.next();
if (!file.getPath().getName()
.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
count++;
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);
}
public int run(String[] args) throws Exception {
// create the job
Configuration config = getConf();
// add custom configuration
// config.set("ip.file.path", args[2]);
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// set job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
// reducer
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// add the file referenced by ip.file.path to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// add input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// output compression settings (optional)
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
/**
* @throws Exception If failed.
*/
@Test
public void testMapCombineRun() throws Exception {
int lineCnt = 10001;
String fileName = "/testFile";
prepareFile(fileName, lineCnt);
totalLineCnt.set(0);
taskWorkDirs.clear();
Configuration cfg = new Configuration();
cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
cfg.setBoolean(MAP_WRITE, true);
Job job = Job.getInstance(cfg);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TestMapper.class);
job.setCombinerClass(TestCombiner.class);
job.setReducerClass(TestReducer.class);
job.setNumReduceTasks(2);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path("igfs://" + igfsName + "@/"));
FileOutputFormat.setOutputPath(job, new Path("igfs://" + igfsName + "@/output/"));
job.setJarByClass(getClass());
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
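// submit through the Ignite Hadoop engine and wait on its future instead of calling job.waitForCompletion()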
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
fut.get();
assertEquals(lineCnt, totalLineCnt.get());
assertEquals(34, taskWorkDirs.size());
for (int g = 0; g < gridCount(); g++)
grid(g).hadoop().finishFuture(jobId).get();
}