The following examples show how to use the org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat API class; you can also follow the links through to GitHub to view the source code.
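Before the examples, a minimal self-contained sketch may help to frame them: KeyValueTextInputFormat treats each line of a plain-text file as one record and splits it into a Text key and a Text value at the first occurrence of a separator byte (a tab by default, configurable through mapreduce.input.keyvaluelinerecordreader.key.value.separator). The driver below is illustrative only; the class names KeyValueDemo and EchoMapper and the two path arguments are assumptions made for this sketch, not code from any of the projects quoted below.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KeyValueDemo {
  // With KeyValueTextInputFormat the mapper's input key is the part of the
  // line before the separator and the input value is the remainder.
  public static class EchoMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Split each line at the first ':' instead of the default tab (a hypothetical choice).
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":");
    Job job = Job.getInstance(conf, "kv-demo");
    job.setJarByClass(KeyValueDemo.class);
    job.setMapperClass(EchoMapper.class);
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));   // input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}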
@SuppressWarnings("unchecked")
public void testAddInputPathWithMapper() throws IOException {
final Job conf = Job.getInstance();
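// MultipleInputs binds a different InputFormat (and, optionally, Mapper) to each input path.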
MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
MapClass.class);
MultipleInputs.addInputPath(conf, new Path("/bar"),
KeyValueTextInputFormat.class, KeyValueMapClass.class);
final Map<Path, InputFormat> inputs = MultipleInputs
.getInputFormatMap(conf);
final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
.getMapperTypeMap(conf);
assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
.getClass());
assertEquals(MapClass.class, maps.get(new Path("/foo")));
assertEquals(KeyValueMapClass.class, maps.get(new Path("/bar")));
}
public static boolean runCalcJob(Configuration conf, Path input, Path outputPath)
throws Exception {
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(CalcMapReduce.Map.class);
job.setReducerClass(CalcMapReduce.Reduce.class);
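// KeyValueTextInputFormat hands the mapper each line split into a Text key and Text value at the first tab.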
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(CalcMapReduce.TextPair.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true);
}
public static void runSortJob(Configuration conf, Path input, Path outputPath)
throws Exception {
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(SortMapReduce.Map.class);
job.setReducerClass(SortMapReduce.Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Person.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
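// Secondary-sort wiring: a custom partitioner plus separate sort and grouping comparators.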
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
public static void runJob(Configuration conf,
Path inputPath,
Path outputPath)
throws Exception {
Job job = new Job(conf);
job.setJarByClass(UniqueHashedKeyJob.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
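// Clear any previous output so the job can write to a fresh directory.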
outputPath.getFileSystem(conf).delete(outputPath, true);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (!job.waitForCompletion(true)) {
throw new Exception("Job failed");
}
}
@SuppressWarnings("unchecked")
public void testAddInputPathWithFormat() throws IOException {
final Job conf = Job.getInstance();
MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class);
MultipleInputs.addInputPath(conf, new Path("/bar"),
KeyValueTextInputFormat.class);
final Map<Path, InputFormat> inputs = MultipleInputs
.getInputFormatMap(conf);
assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
.getClass());
}
@Test (expected = SparkException.class)
public void readInputFormatMismatchTranslator() throws Exception {
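// A translator whose declared types do not match KeyValueTextInputFormat's Text key/value should make read() fail with a SparkException.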
Map<String, Object> paramMap = new HashMap<>();
paramMap.put(FileSystemInput.FORMAT_CONFIG, "input-format");
paramMap.put(FileSystemInput.PATH_CONFIG, FileSystemInput.class.getResource(CSV_DATA).getPath());
paramMap.put(FileSystemInput.INPUT_FORMAT_TYPE_CONFIG, KeyValueTextInputFormat.class.getCanonicalName());
paramMap.put("translator.type", DummyInputFormatTranslator.class.getCanonicalName());
config = ConfigFactory.parseMap(paramMap);
FileSystemInput formatInput = new FileSystemInput();
assertNoValidationFailures(formatInput, config);
formatInput.configure(config);
formatInput.read().show();
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(XmlMapReduceWriter.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
public static double calcPageRank(Path inputPath, Path outputPath, int numNodes)
throws Exception {
Configuration conf = new Configuration();
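// Pass the graph size to the reducers through the configuration.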
conf.setInt(Reduce.CONF_NUM_NODES_GRAPH, numNodes);
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (!job.waitForCompletion(true)) {
throw new Exception("Job failed");
}
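// Derive the average per-node convergence delta from the reducer's counter.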
long summedConvergence = job.getCounters().findCounter(
Reduce.Counter.CONV_DELTAS).getValue();
double convergence =
((double) summedConvergence /
Reduce.CONVERGENCE_SCALING_FACTOR) /
(double) numNodes;
System.out.println("======================================");
System.out.println("= Num nodes: " + numNodes);
System.out.println("= Summed convergence: " + summedConvergence);
System.out.println("= Convergence: " + convergence);
System.out.println("======================================");
return convergence;
}
public static void runJob(Configuration conf,
Path userLogsPath,
Path usersPath,
Path outputPath)
throws Exception {
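// Replicated (map-side) join: ship the smaller users dataset to every task via the DistributedCache.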
FileSystem fs = usersPath.getFileSystem(conf);
FileStatus usersStatus = fs.getFileStatus(usersPath);
if (usersStatus.isDir()) {
for (FileStatus f : fs.listStatus(usersPath)) {
if (f.getPath().getName().startsWith("part")) {
DistributedCache.addCacheFile(f.getPath().toUri(), conf);
}
}
} else {
DistributedCache.addCacheFile(usersPath.toUri(), conf);
}
Job job = new Job(conf);
job.setJarByClass(FinalJoinJob.class);
job.setMapperClass(GenericReplicatedJoin.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(KeyValueTextInputFormat.class);
outputPath.getFileSystem(conf).delete(outputPath, true);
FileInputFormat.setInputPaths(job, userLogsPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (!job.waitForCompletion(true)) {
throw new Exception("Job failed");
}
}
public static void runJob(Configuration conf,
Path usersPath,
Path uniqueUsersPath,
Path outputPath)
throws Exception {
FileSystem fs = uniqueUsersPath.getFileSystem(conf);
FileStatus uniqueUserStatus = fs.getFileStatus(uniqueUsersPath);
if (uniqueUserStatus.isDir()) {
for (FileStatus f : fs.listStatus(uniqueUsersPath)) {
if (f.getPath().getName().startsWith("part")) {
DistributedCache.addCacheFile(f.getPath().toUri(), conf);
}
}
} else {
DistributedCache.addCacheFile(uniqueUsersPath.toUri(), conf);
}
Job job = new Job(conf);
job.setJarByClass(ReplicatedFilterJob.class);
job.setMapperClass(ReplicatedFilterJob.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(KeyValueTextInputFormat.class);
outputPath.getFileSystem(conf).delete(outputPath, true);
FileInputFormat.setInputPaths(job, usersPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (!job.waitForCompletion(true)) {
throw new Exception("Job failed");
}
}
public static void runJob(Path inputPath,
Path smallFilePath,
Path outputPath)
throws Exception {
Configuration conf = new Configuration();
FileSystem fs = smallFilePath.getFileSystem(conf);
FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath);
if (smallFilePathStatus.isDir()) {
for (FileStatus f : fs.listStatus(smallFilePath)) {
if (f.getPath().getName().startsWith("part")) {
DistributedCache.addCacheFile(f.getPath().toUri(), conf);
}
}
} else {
DistributedCache.addCacheFile(smallFilePath.toUri(), conf);
}
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(GenericReplicatedJoin.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setNumReduceTasks(0);
outputPath.getFileSystem(conf).delete(outputPath, true);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(CompositeJoin.class);
job.setMapperClass(JoinMap.class);
job.setInputFormatClass(CompositeInputFormat.class);
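// CompositeInputFormat joins the inputs at read time; both datasets must be sorted and identically partitioned.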
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
CompositeInputFormat.compose("inner",
KeyValueTextInputFormat.class, usersPath, userLogsPath)
);
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, userLogsPath);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true) ? 0 : 1;
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(SortMapReduce.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
public Job runJob(Configuration conf, Path inputPath, Path outputPath)
throws ClassNotFoundException, IOException, InterruptedException {
Job job = new Job(conf);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(false);
return job;
}
public static void main(String[] args) throws Exception {
// 1. HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":"); // key/value separator for input files
// 2. MapReduce job configuration
String jobName = "KeyValueInput"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(KeyValueInput.class); // the job class to run
job.setJar("export\\KeyValueInput.jar"); // local jar to submit
job.setMapperClass(KeyValueInputMapper.class); // Mapper class
job.setMapOutputKeyClass(Text.class); // Mapper output key type
job.setMapOutputValueClass(IntWritable.class); // Mapper output value type
job.setReducerClass(KeyValueInputReducer.class); // Reducer class
job.setOutputKeyClass(Text.class); // Reducer output key type
job.setOutputValueClass(IntWritable.class); // Reducer output value type
job.setInputFormatClass(KeyValueTextInputFormat.class); // input format class
// 3. Job input and output paths
String dataDir = "/expr/kvinput/data"; // input data directory
String outputDir = "/expr/kvinput/output"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
@Test
public void testDoMultipleInputs() throws IOException {
Path in1Dir = getDir(IN1_DIR);
Path in2Dir = getDir(IN2_DIR);
Path outDir = getDir(OUT_DIR);
Configuration conf = createJobConf();
FileSystem fs = FileSystem.get(conf);
fs.delete(outDir, true);
DataOutputStream file1 = fs.create(new Path(in1Dir, "part-0"));
file1.writeBytes("a\nb\nc\nd\ne");
file1.close();
// write tab delimited to second file because we're doing
// KeyValueInputFormat
DataOutputStream file2 = fs.create(new Path(in2Dir, "part-0"));
file2.writeBytes("a\tblah\nb\tblah\nc\tblah\nd\tblah\ne\tblah");
file2.close();
Job job = Job.getInstance(conf);
job.setJobName("mi");
MultipleInputs.addInputPath(job, in1Dir, TextInputFormat.class,
MapClass.class);
MultipleInputs.addInputPath(job, in2Dir, KeyValueTextInputFormat.class,
KeyValueMapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setReducerClass(ReducerClass.class);
FileOutputFormat.setOutputPath(job, outDir);
boolean success = false;
try {
success = job.waitForCompletion(true);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
} catch (ClassNotFoundException instante) {
throw new RuntimeException(instante);
}
if (!success)
throw new RuntimeException("Job failed!");
// copy bytes a bunch of times for the ease of readLine() - whatever
BufferedReader output = new BufferedReader(new InputStreamReader(fs
.open(new Path(outDir, "part-r-00000"))));
// reducer should have counted one key from each file
assertTrue(output.readLine().equals("a 2"));
assertTrue(output.readLine().equals("b 2"));
assertTrue(output.readLine().equals("c 2"));
assertTrue(output.readLine().equals("d 2"));
assertTrue(output.readLine().equals("e 2"));
}
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
String[] dicColsArr = null;
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT);
options.addOption(OPTION_GLOBAL_DIC_PART_REDUCE_STATS);
parseOptions(options, args);
KylinConfig config = KylinConfig.getInstanceFromEnv();
dicColsArr = config.getMrHiveDictColumnsExcludeRefColumns();
String cubeName = getOptionValue(OPTION_CUBE_NAME);
String segmentID = getOptionValue(OPTION_SEGMENT_ID);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
logger.info("Starting: " + job.getJobName());
// ----------------------------------------------------------------------------
// add metadata to distributed cache
CubeManager cubeMgr = CubeManager.getInstance(config);
CubeInstance cube = cubeMgr.getCube(cubeName);
CubeSegment segment = cube.getSegmentById(segmentID);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().set("partition.statistics.path", getOptionValue(OPTION_GLOBAL_DIC_PART_REDUCE_STATS));
job.getConfiguration().set("last.max.dic.value.path", getOptionValue(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT));
job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false);
job.setJarByClass(BuildGlobalHiveDictTotalBuildJob.class);
setJobClasspath(job, cube.getConfig());
// Mapper
job.setMapperClass(BuildGlobalHiveDictTotalBuildMapper.class);
// Input Output
setInput(job, getOptionValue(OPTION_INPUT_PATH));
setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH));
job.setNumReduceTasks(0); // map-only job, no reduce phase
job.setInputFormatClass(KeyValueTextInputFormat.class);
// prevent to create zero-sized default output
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
// delete output
Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
deletePath(job.getConfiguration(), baseOutputPath);
attachSegmentMetadataWithDict(segment, job.getConfiguration());
return waitForCompletion(job);
} finally {
if (job != null)
cleanupTempConfFile(job.getConfiguration());
}
}
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
String[] dicColsArr = null;
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_SEGMENT_ID);
parseOptions(options, args);
KylinConfig config = KylinConfig.getInstanceFromEnv();
dicColsArr = config.getMrHiveDictColumnsExcludeRefColumns();
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
// add metadata to distributed cache
String cubeName = getOptionValue(OPTION_CUBE_NAME);
String segmentID = getOptionValue(OPTION_SEGMENT_ID);
CubeManager cubeMgr = CubeManager.getInstance(config);
CubeInstance cube = cubeMgr.getCube(cubeName);
CubeSegment segment = cube.getSegmentById(segmentID);
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
logger.info("Starting: " + job.getJobName());
job.setJarByClass(BuildGlobalHiveDictPartBuildJob.class);
setJobClasspath(job, cube.getConfig());
//FileInputFormat.setInputPaths(job, input);
setInput(job, dicColsArr, getInputPath(config, segment));
// make each reducer output to respective dir
setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH));
job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false);
//set reduce num
setReduceNum(job, config);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(BuildGlobalHiveDictPartBuildMapper.class);
job.setPartitionerClass(BuildGlobalHiveDictPartPartitioner.class);
job.setReducerClass(BuildGlobalHiveDictPartBuildReducer.class);
// prevent to create zero-sized default output
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
// delete output
Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
deletePath(job.getConfiguration(), baseOutputPath);
attachSegmentMetadataWithDict(segment, job.getConfiguration());
return waitForCompletion(job);
} finally {
if (job != null)
cleanupTempConfFile(job.getConfiguration());
}
}
public static boolean findShortestPath(Configuration conf, Path inputPath,
Path outputPath, String startNode,
String targetNode)
throws Exception {
conf.set(TARGET_NODE, targetNode);
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (!job.waitForCompletion(true)) {
throw new Exception("Job failed");
}
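// A counter written by the reducer signals that the target node's distance was computed.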
Counter counter = job.getCounters()
.findCounter(Reduce.PathCounter.TARGET_NODE_DISTANCE_COMPUTED);
if (counter != null && counter.getValue() > 0) {
CounterGroup group = job.getCounters().getGroup(Reduce.PathCounter.PATH.toString());
Iterator<Counter> iter = group.iterator();
// the path counter group is expected to hold a single counter whose name encodes the path
String path = iter.next().getName();
System.out.println("==========================================");
System.out.println("= Shortest path found, details as follows.");
System.out.println("= ");
System.out.println("= Start node: " + startNode);
System.out.println("= End node: " + targetNode);
System.out.println("= Hops: " + counter.getValue());
System.out.println("= Path: " + path);
System.out.println("==========================================");
return true;
}
return false;
}
public static void runJob(String inputPath,
Path outputPath,
Path bloomFilterPath)
throws Exception {
Configuration conf = new Configuration();
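// Distribute the pre-built Bloom filter so every map task can filter records locally.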
DistributedCache.addCacheFile(bloomFilterPath.toUri(), conf);
Job job = new Job(conf);
job.setJarByClass(BloomJoin.class);
job.setMapperClass(Map.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setNumReduceTasks(0);
outputPath.getFileSystem(conf).delete(outputPath, true);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
int numReducers = 2;
Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));
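// Randomly sample about 10% of the keys (up to 10,000, from at most 10 splits) to derive partition boundaries.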
InputSampler.Sampler<Text, Text> sampler =
new InputSampler.RandomSampler<Text, Text>
(0.1,
10000,
10);
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(TotalSortMapReduce.class);
job.setNumReduceTasks(numReducers);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
InputSampler.writePartitionFile(job, sampler);
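// Expose the partition file to every task under the symlink _sortPartitioning.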
URI partitionUri = new URI(partitionFile.toString() +
"#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, conf);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
public static void runSortJob(String input, String output)
throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(PersonSortMapReduce.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
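// Order the Person keys with a custom binary comparator instead of their natural ordering.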
job.setSortComparatorClass(PersonBinaryComparator.class);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
}
public static void runSortJob(String input, String output)
throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(CloneReduce.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(PersonBinaryComparator.class);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
}
public static void runSortJob(String input, String output)
throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(SampleMapReduce.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
}