Listed below are example usages of org.apache.hadoop.mapreduce.Job#setMapOutputKeyClass(), collected from open-source projects.
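Before the project examples, here is a minimal, self-contained sketch of the call itself; MyDriver, MyMapper and MyReducer are placeholder names rather than classes from any project below. The map output key/value classes default to the job's output key/value classes, so setMapOutputKeyClass()/setMapOutputValueClass() only need to be called when the mapper emits different types than the reducer finally writes.
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "setMapOutputKeyClass demo");
  job.setJarByClass(MyDriver.class);      // MyDriver, MyMapper, MyReducer are placeholders
  job.setMapperClass(MyMapper.class);
  job.setReducerClass(MyReducer.class);
  // The mapper emits (Text, IntWritable) while the reducer writes (Text, LongWritable),
  // so the intermediate types must be declared explicitly.
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}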
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: ElementAttributeValuesTest configFile outputDir");
System.exit(2);
}
Job job = Job.getInstance(conf);
job.setJarByClass(ElementAttributeValuesTest.class);
job.setInputFormatClass(ValueInputFormat.class);
job.setMapperClass(ElementAttrValueMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
conf = job.getConfiguration();
conf.addResource(otherArgs[0]);
conf.setClass(MarkLogicConstants.INPUT_VALUE_CLASS, Text.class,
Writable.class);
conf.setClass(MarkLogicConstants.INPUT_LEXICON_FUNCTION_CLASS,
ElementAttributeValuesFunction.class, ElementAttributeValues.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@SuppressWarnings("deprecation")
public static boolean runJob(Configuration conf,
Class<? extends InputFormat<?,?>> inputFormatClass,
Class<? extends Mapper<?,?,?,?>> mapperClass,
Class<? extends OutputFormat<?,?>> outputFormatClass) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = new Job(conf);
job.setInputFormatClass(inputFormatClass);
job.setMapperClass(mapperClass);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(outputFormatClass);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
boolean ret = job.waitForCompletion(true);
// Hadoop 1.0 (and 0.20) has a nasty bug where the job committer is not called by
// LocalJobRunner
if (isHadoop1()) {
callOutputCommitter(job, outputFormatClass);
}
return ret;
}
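The isHadoop1() helper used above is not shown in this snippet; a plausible implementation (an assumption for illustration, not the project's actual code) checks the runtime version reported by Hadoop:
// Hypothetical version check gating the manual output-committer call above:
// Hadoop 1.x and the 0.20 line report versions starting with "1." or "0.20".
private static boolean isHadoop1() {
  String version = org.apache.hadoop.util.VersionInfo.getVersion();
  return version.startsWith("1.") || version.startsWith("0.20");
}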
/**
* Sets up the actual job.
*
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
Path outputDir = new Path(args[1]);
String reportSeparatorString = (args.length > 2) ? args[2]: ":";
conf.set("ReportSeparator", reportSeparatorString);
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(CellCounter.class);
Scan scan = getConfiguredScanForJob(conf, args);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(job, outputDir);
job.setReducerClass(IntSumReducer.class);
return job;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
String jobName = "PeopleRank";
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(PeopleRank.class);
job.setJar("export\\PeopleRank.jar");
job.setMapperClass(PeopleRankMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(PeopleRankReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
String dataDir = "/expr/peoplerank/data";
String outputDir = "/expr/peoplerank/output/adjacent";
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
System.out.println( "Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
/**
* Create Job object for submitting it, with all the configuration
*
* @return Reference to job object.
* @throws IOException - Exception if any
*/
private Job createJob() throws IOException {
String jobName = "distcp";
String userChosenName = getConf().get(JobContext.JOB_NAME);
if (userChosenName != null)
jobName += ": " + userChosenName;
Job job = Job.getInstance(getConf());
job.setJobName(jobName);
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setJarByClass(CopyMapper.class);
configureOutputFormat(job);
job.setMapperClass(CopyMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(CopyOutputFormat.class);
job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
job.getConfiguration().set(JobContext.NUM_MAPS,
String.valueOf(inputOptions.getMaxMaps()));
if (inputOptions.getSslConfigurationFile() != null) {
setupSSLConfig(job);
}
inputOptions.appendToConf(job.getConfiguration());
return job;
}
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
job.setMapperClass(getMapperClass());
// Concurrent writes of the same records would be problematic.
ConfigurationHelper.setJobMapSpeculativeExecution(job, false);
job.setMapOutputKeyClass(SqoopRecord.class);
job.setMapOutputValueClass(NullWritable.class);
}
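ConfigurationHelper.setJobMapSpeculativeExecution() is Sqoop's own wrapper. For reference, a roughly equivalent call through the plain MapReduce API (a sketch, not Sqoop's implementation) would be:
// Disable map-side speculative execution so two attempts of the same map task
// never write the same records concurrently.
job.getConfiguration().setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
// Alternatively, job.setSpeculativeExecution(false) disables speculation for both
// map and reduce tasks.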
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String outpath = conf.get(OUTPUTPATH);
Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
MultipleInputs.addInputPath(job, new Path(PROSPECTSOUT.getAbsolutePath()),
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
MultipleInputs.addInputPath(job, new Path(SPOOUT.getAbsolutePath()),
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
job.setMapOutputKeyClass(CompositeType.class);
job.setMapOutputValueClass(TripleCard.class);
tempDir = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "");
SequenceFileOutputFormat.setOutputPath(job, new Path(tempDir.getAbsolutePath()));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(TripleEntry.class);
job.setOutputValueClass(CardList.class);
job.setSortComparatorClass(JoinSelectSortComparator.class);
job.setGroupingComparatorClass(JoinSelectGroupComparator.class);
job.setPartitionerClass(JoinSelectPartitioner.class);
job.setReducerClass(JoinReducer.class);
job.setNumReduceTasks(32);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* Tests an MR Scan initialized from properties set in the Configuration.
*/
protected void testScanFromConfiguration(String start, String stop, String last)
throws IOException, InterruptedException, ClassNotFoundException {
String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
"To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
c.set(KEY_STARTROW, start != null ? start : "");
c.set(KEY_LASTROW, last != null ? last : "");
if (start != null) {
c.set(TableInputFormat.SCAN_ROW_START, start);
}
if (stop != null) {
c.set(TableInputFormat.SCAN_ROW_STOP, stop);
}
Job job = Job.getInstance(c, jobName);
job.setMapperClass(ScanMapper.class);
job.setReducerClass(ScanReducer.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(ImmutableBytesWritable.class);
job.setInputFormatClass(TableInputFormat.class);
job.setNumReduceTasks(1);
FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
TableMapReduceUtil.addDependencyJars(job);
assertTrue(job.waitForCompletion(true));
}
public static void main(String[] args) throws Exception {
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
String jobName = "TopTenJob";
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(TopTenJob.class);
job.setJar("export\\TopTen.jar");
job.setMapperClass(TopTenMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(TopTenReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1); //the final top N must be computed by a single reduce task
String dataDir = "/expr/topten/data";
String outputDir = "/expr/topten/output";
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
System.out.println( "Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
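The TopTenMapper and TopTenReducer classes are not shown. As a hypothetical sketch of the reducer side (the record format, a tab-separated line ending in a numeric score, is an assumption for illustration): with NullWritable keys and a single reduce task, every record reaches one reducer, which only has to keep the ten largest.
public static class TopTenReducerSketch
    extends Reducer<NullWritable, Text, NullWritable, Text> {
  @Override
  protected void reduce(NullWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    // Keep at most ten records, ordered by score; ties on score overwrite each other
    // in this simplified sketch.
    TreeMap<Long, String> top = new TreeMap<Long, String>();
    for (Text value : values) {
      String line = value.toString();
      String[] fields = line.split("\t");                      // assumed record layout
      long score = Long.parseLong(fields[fields.length - 1]);  // assumed score field
      top.put(score, line);
      if (top.size() > 10) {
        top.remove(top.firstKey());                            // drop the smallest
      }
    }
    for (String line : top.descendingMap().values()) {
      context.write(NullWritable.get(), new Text(line));
    }
  }
}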
public static void initTableMRJob(Job job, String intable, String outtable, String auths) throws AccumuloSecurityException {
Configuration conf = job.getConfiguration();
String username = conf.get(USERNAME);
String password = conf.get(PASSWORD);
String instance = conf.get(INSTANCE);
String zookeepers = conf.get(ZOOKEEPERS);
System.out.println("Zookeepers are " + auths);
if (zookeepers != null) {
AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers);
AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers);
} else {
throw new IllegalArgumentException("Must specify either mock or zookeepers");
}
AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
AccumuloInputFormat.setScanAuthorizations(job, new Authorizations(auths));
AccumuloInputFormat.setInputTableName(job, intable);
job.setInputFormatClass(AccumuloInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// OUTPUT
AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password));
AccumuloOutputFormat.setDefaultTableName(job, outtable);
job.setOutputFormatClass(AccumuloOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: secondarysrot <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "secondary sort");
job.setJarByClass(SecondarySort.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// group and partition by the first int in the pair
job.setPartitionerClass(FirstPartitioner.class);
job.setGroupingComparatorClass(FirstGroupingComparator.class);
// the map output is IntPair, IntWritable
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);
// the reduce output is Text, IntWritable
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
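FirstPartitioner and FirstGroupingComparator are the pieces that make the secondary sort work: partitioning and grouping use only the first int of the IntPair, while the IntPair's own ordering still sorts by the second int within each group. A sketch, assuming IntPair exposes getFirst() as in the classic Hadoop example:
// Partition on the first int only, so all pairs with the same first value meet in one reducer.
public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
  @Override
  public int getPartition(IntPair key, IntWritable value, int numPartitions) {
    return (key.getFirst() & Integer.MAX_VALUE) % numPartitions;
  }
}
// Group reducer input on the first int only; values then arrive sorted by the second int.
public static class FirstGroupingComparator extends WritableComparator {
  protected FirstGroupingComparator() {
    super(IntPair.class, true);
  }
  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    return Integer.compare(((IntPair) a).getFirst(), ((IntPair) b).getFirst());
  }
}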
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(SortMapReduce.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "PVMinMax2"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(PVMinMax2.class); //class to run the job
job.setJar("export\\PVMinMax2.jar"); //local jar file
job.setMapperClass(PVMinMax2Mapper.class); //Mapper class
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(Text.class); //Mapper output value type
job.setReducerClass(PVMinMax2Reducer.class); //Reducer class
job.setOutputKeyClass(Text.class); //Reducer output key type
job.setOutputValueClass(Text.class); //Reducer output value type
//3. Set job input and output paths
String dataDir = "/expr/weblog/output5_1"; //input data directory
String outputDir = "/expr/weblog/output5_2"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public int run(String[] args) throws Exception
{
String outputReducerType = "filesystem";
if (args != null && args.length > 0 && args[0].startsWith(OUTPUT_REDUCER_VAR))
{
String[] s = args[0].split("=");
if (s != null && s.length == 2)
outputReducerType = s[1];
}
logger.info("output reducer type: " + outputReducerType);
// use a smaller page size that doesn't divide the row count evenly to exercise the paging logic better
ConfigHelper.setRangeBatchSize(getConf(), 99);
for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
{
String columnName = "text" + i;
Job job = new Job(getConf(), "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
if (outputReducerType.equalsIgnoreCase("filesystem"))
{
job.setCombinerClass(ReducerToFilesystem.class);
job.setReducerClass(ReducerToFilesystem.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i));
}
else
{
job.setReducerClass(ReducerToCassandra.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(ByteBuffer.class);
job.setOutputValueClass(List.class);
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
job.getConfiguration().set(CONF_COLUMN_NAME, "sum");
}
job.setInputFormatClass(ColumnFamilyInputFormat.class);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
if (i == 4)
{
IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("int4"), IndexOperator.EQ, ByteBufferUtil.bytes(0));
ConfigHelper.setInputRange(job.getConfiguration(), Arrays.asList(expr));
}
if (i == 5)
{
// this will cause the predicate to be ignored in favor of scanning everything as a wide row
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, true);
}
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
job.waitForCompletion(true);
}
return 0;
}
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
Configuration conf = getConf();
String hdfsHomeDir = args[0];
String url = args[1];
String tableName = args[2];
System.out.println("TradeBuyOrdersHdfsDataVerifier.run() invoked with "
+ " hdfsHomeDir = " + hdfsHomeDir
+ " url = " + url
+ " tableName = " + tableName);
// Job-specific params
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
conf.set(RowOutputFormat.OUTPUT_TABLE,tableName + "_HDFS");
conf.set(RowOutputFormat.OUTPUT_URL, url);
Job job = Job.getInstance(conf, "TradeBuyOrdersHdfsDataVerifierV2");
job.setJobName("TradeBuyOrdersHdfsDataVerifierV2");
job.setInputFormatClass(RowInputFormat.class);
job.setOutputFormatClass(RowOutputFormat.class);
job.setMapperClass(HdfsDataMapper.class);
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(TradeBuyOrdersRow.class);
job.setReducerClass(HdfsDataReducer.class);
job.setOutputKeyClass(Key.class);
job.setOutputValueClass(TradeBuyOrdersOutputObject.class);
StringBuffer aStr = new StringBuffer();
aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
return job.waitForCompletion(false) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: MRSessionize <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "MapReduce Sessionization");
job.setJarByClass(MRSessionize.class);
job.setMapperClass(SessionizeMapper.class);
job.setReducerClass(SessionizeReducer.class);
// WARNING: do NOT set a Combiner class here. The reducer needs to see all records
// from the same IP in one place before we can do sessionization.
// Also, our reducer doesn't return the same key/value types it takes,
// so it can't be used on the result of a previous reducer.
job.setMapOutputKeyClass(IpTimestampKey.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// We need these for secondary sorting.
// We need to shuffle the records (between Map and Reduce phases) by using IP address as key, since that is
// the field we are using for determining uniqueness of users. However, when the records arrive to the reducers,
// we would like them to be sorted in ascending order of their timestamps. This concept is known as secondary
// sorting since we are "secondarily" sorting the records by another key (timestamp, in our case) in addition
// to the shuffle key (also called the "partition" key).
// So, to get some terminology straight.
// Natural key (aka Shuffle key or Partition key) is the key we use to shuffle. IP address in our case
// Secondary Sorting Key is the key we use to sort within each partition that gets sent to the user. Timestamp
// in our case.
// Together, the natural key and secondary sorting key form what we call the composite key. This key is called
// IpTimestampKey in our example.
// For secondary sorting, even though we are partitioning and shuffling by only the natural key, the map output
// key and the reduce input key is the composite key. We, however, use a custom partitioner and custom grouping
// comparator that only uses the natural key part of the composite key to partition and group respectively (both
// happen during the shuffle phase).
// However, we have a different sort comparator which also gets used in the shuffle phase but determines how
// the records are sorted when they enter the reduce phase. This custom sort comparator in our case will make use
// of the entire composite key.
// We found http://vangjee.wordpress.com/2012/03/20/secondary-sorting-aka-sorting-values-in-hadoops-mapreduce-programming-paradigm/
// to be very helpful, if you'd like to read more on the subject.
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
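To make the comment above concrete, here are hypothetical sketches of the three classes it refers to; getIp() and getUnixTimestamp() are assumed accessors on IpTimestampKey, not necessarily its real API.
// Shuffle by the natural key (IP address) only.
public static class NaturalKeyPartitioner extends Partitioner<IpTimestampKey, Text> {
  @Override
  public int getPartition(IpTimestampKey key, Text value, int numPartitions) {
    return (key.getIp().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}
// Group reducer input by IP alone, ignoring the timestamp part of the composite key.
public static class NaturalKeyComparator extends WritableComparator {
  protected NaturalKeyComparator() {
    super(IpTimestampKey.class, true);
  }
  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    return ((IpTimestampKey) a).getIp().compareTo(((IpTimestampKey) b).getIp());
  }
}
// Sort by IP first, then timestamp, so each reducer group arrives in time order.
public static class CompositeKeyComparator extends WritableComparator {
  protected CompositeKeyComparator() {
    super(IpTimestampKey.class, true);
  }
  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    IpTimestampKey k1 = (IpTimestampKey) a;
    IpTimestampKey k2 = (IpTimestampKey) b;
    int cmp = k1.getIp().compareTo(k2.getIp());
    if (cmp != 0) {
      return cmp;
    }
    return Long.compare(k1.getUnixTimestamp(), k2.getUnixTimestamp());
  }
}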
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 13);
//2. Set MapReduce job configuration
String jobName = "FixedLengthInput2"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(FixedLengthInput2.class); //class to run the job
job.setJar("export\\FixedLengthInput2.jar"); //local jar file
job.setMapperClass(FixedLengthInput2Mapper.class); //Mapper class
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(FixedLengthInput2Reducer.class); //Reducer class
job.setOutputKeyClass(Text.class); //Reducer output key type
job.setOutputValueClass(IntWritable.class); //Reducer output value type
job.setInputFormatClass(FixedLengthInputFormat.class); //input format class
//3. Set job input and output paths
String dataDir = "/expr/fixedinput/data"; //input data directory
String outputDir = "/expr/fixedinput/output"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
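FixedLengthInputFormat (org.apache.hadoop.mapreduce.lib.input) delivers each fixed-width record to the mapper as a (LongWritable offset, BytesWritable record) pair, with the record length taken from FIXED_RECORD_LENGTH (13 bytes here). The mapper below is a hypothetical sketch; the field layout inside the 13-byte record is an assumption for illustration.
public static class FixedLengthInput2MapperSketch
    extends Mapper<LongWritable, BytesWritable, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  @Override
  protected void map(LongWritable offset, BytesWritable record, Context context)
      throws IOException, InterruptedException {
    // Decode exactly the record bytes; getLength() avoids trailing buffer padding.
    String line = new String(record.getBytes(), 0, record.getLength(), StandardCharsets.UTF_8);
    String id = line.substring(0, 10).trim();   // assumed: first 10 bytes form the key field
    context.write(new Text(id), ONE);
  }
}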
@VisibleForTesting
public Job createJob(int numMapper, int numReducer, int iReduceStagesCount,
int numIReducer, long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount,
long iReduceSleepTime, int iReduceSleepCount)
throws IOException {
Configuration conf = getConf();
conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
conf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
conf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
conf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
conf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
// Configure intermediate reduces
conf.setInt(
org.apache.tez.mapreduce.hadoop.MRJobConfig.MRR_INTERMEDIATE_STAGES,
iReduceStagesCount);
LOG.info("Running MRR with " + iReduceStagesCount + " IR stages");
for (int i = 1; i <= iReduceStagesCount; ++i) {
// Set reducer class for intermediate reduce
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
// Set reducer output key class
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.map.output.key.class"), IntWritable.class, Object.class);
// Set reducer output value class
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.map.output.value.class"), IntWritable.class, Object.class);
conf.setInt(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.job.reduces"), numIReducer);
}
Job job = Job.getInstance(conf, "sleep");
job.setNumReduceTasks(numReducer);
job.setJarByClass(MRRSleepJob.class);
job.setMapperClass(SleepMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(SleepReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(SleepInputFormat.class);
job.setPartitionerClass(MRRSleepJobPartitioner.class);
job.setSpeculativeExecution(false);
job.setJobName("Sleep job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// Set job configuration
Job job = Job.getInstance(conf, "FlowSort");
job.setJarByClass(FlowSort.class);
job.setJar("export\\FlowSort.jar");
// Mapper
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(MySortKey.class);
job.setMapOutputValueClass(Text.class);
// Reducer
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MySortKey.class);
// Job input and output paths
String dataDir = "/workspace/flowStatistics/output/part-r-00000"; // input data path
String outputDir = "/workspace/flowStatistics/output_sort"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
// Run the job
System.out.println("Job: FlowSort is running...");
if (job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
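MySortKey is the custom map output key whose ordering drives the sort. A hypothetical sketch of such a key follows; the field names and the descending-by-total-flow order are assumptions typical of this exercise, not the snippet's actual class.
public static class MySortKeySketch implements WritableComparable<MySortKeySketch> {
  private long upFlow;
  private long downFlow;
  private long totalFlow;
  public MySortKeySketch() { }                      // no-arg constructor for Hadoop serialization
  public void set(long up, long down) {
    this.upFlow = up;
    this.downFlow = down;
    this.totalFlow = up + down;
  }
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(totalFlow);
  }
  @Override
  public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    totalFlow = in.readLong();
  }
  @Override
  public int compareTo(MySortKeySketch other) {
    return Long.compare(other.totalFlow, this.totalFlow);   // descending by total flow
  }
  @Override
  public String toString() {
    return upFlow + "\t" + downFlow + "\t" + totalFlow;
  }
}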
@Override
//Usage DBCountPageView [driverClass dburl]
public int run(String[] args) throws Exception {
String driverClassName = DRIVER_CLASS;
String url = DB_URL;
if(args.length > 1) {
driverClassName = args[0];
url = args[1];
}
initialize(driverClassName, url);
Configuration conf = getConf();
DBConfiguration.configureDB(conf, driverClassName, url);
Job job = new Job(conf);
job.setJobName("Count Pageviews of URLs");
job.setJarByClass(DBCountPageView.class);
job.setMapperClass(PageviewMapper.class);
job.setCombinerClass(LongSumReducer.class);
job.setReducerClass(PageviewReducer.class);
DBInputFormat.setInput(job, AccessRecord.class, "Access"
, null, "url", AccessFieldNames);
DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(PageviewRecord.class);
job.setOutputValueClass(NullWritable.class);
int ret;
try {
ret = job.waitForCompletion(true) ? 0 : 1;
boolean correct = verify();
if(!correct) {
throw new RuntimeException("Evaluation was not correct!");
}
} finally {
shutdown();
}
return ret;
}