The following are example usages of org.apache.hadoop.mapreduce.Job#setMapOutputValueClass(), collected from open-source projects.
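Before the collected examples, a minimal sketch may help clarify when the call is actually needed: setMapOutputValueClass() declares the value type emitted by the Mapper, and only has to be set when that type differs from the job's final output value class (otherwise the framework assumes the map output types equal the job output types). The class names WordCountSketch, TokenMapper and SumReducer below are placeholders, not code from any of the projects on this page; the imports are the usual org.apache.hadoop.io, org.apache.hadoop.mapreduce and lib.input/lib.output classes seen in the examples that follow.
public static Job createSketchJob(Configuration conf, Path in, Path out) throws IOException {
  Job job = Job.getInstance(conf, "setMapOutputValueClass sketch");
  job.setJarByClass(WordCountSketch.class);
  job.setMapperClass(TokenMapper.class);   // hypothetical mapper emitting <Text, IntWritable>
  job.setReducerClass(SumReducer.class);   // hypothetical reducer emitting <Text, LongWritable>
  // The map output value type (IntWritable) differs from the final output value type
  // (LongWritable), so it must be declared explicitly.
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);
  FileInputFormat.addInputPath(job, in);
  FileOutputFormat.setOutputPath(job, out);
  return job;
}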
public Job createJob()
throws IOException {
Configuration conf = getConf();
conf.setInt(MRJobConfig.NUM_MAPS, 1);
Job job = Job.getInstance(conf, "test");
job.setNumReduceTasks(1);
job.setJarByClass(CredentialsTestJob.class);
job.setNumReduceTasks(1);
job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
job.setInputFormatClass(SleepJob.SleepInputFormat.class);
job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("test job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
public static boolean runCalcJob(Configuration conf, Path input, Path outputPath)
throws Exception {
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(CalcMapReduce.Map.class);
job.setReducerClass(CalcMapReduce.Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(CalcMapReduce.TextPair.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true);
}
@SuppressWarnings("deprecation")
public static boolean runJob(Configuration conf,
Class<? extends InputFormat<?,?>> inputFormatClass,
Class<? extends Mapper<?,?,?,?>> mapperClass,
Class<? extends OutputFormat<?,?>> outputFormatClass) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = new Job(conf);
job.setInputFormatClass(inputFormatClass);
job.setMapperClass(mapperClass);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(outputFormatClass);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
boolean ret = job.waitForCompletion(true);
// Hadoop 1.0 (and 0.20) has a nasty bug where the job committer is not called in
// LocalJobRunner
if (isHadoop1()) {
callOutputCommitter(job, outputFormatClass);
}
return ret;
}
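The helper callOutputCommitter referenced above is not shown on this page. Purely as a hypothetical sketch of what such a workaround can look like against the Hadoop 2 mapreduce API (the project's actual helper, and its Hadoop 1 equivalent, may well differ), one can instantiate the OutputFormat and invoke its committer manually; the extra imports assumed here are org.apache.hadoop.mapreduce.task.JobContextImpl, org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl and org.apache.hadoop.util.ReflectionUtils.
// Hypothetical sketch, not the project's actual callOutputCommitter.
private static void commitJobManually(Job job,
    Class<? extends OutputFormat<?, ?>> outputFormatClass)
    throws IOException, InterruptedException {
  Configuration conf = job.getConfiguration();
  // Build the OutputFormat the same way the framework would.
  OutputFormat<?, ?> outputFormat = ReflectionUtils.newInstance(outputFormatClass, conf);
  TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, new TaskAttemptID());
  JobContext jobContext = new JobContextImpl(conf, job.getJobID());
  // Ask the format for its committer and commit the job output explicitly.
  OutputCommitter committer = outputFormat.getOutputCommitter(taskContext);
  committer.commitJob(jobContext);
}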
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf, "hello world");
job.setJarByClass(HelloWorld.class);
// Map related configuration
job.setInputFormatClass(DocumentInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
// Reduce related configuration
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(ContentOutputFormat.class);
job.setOutputKeyClass(DocumentURI.class);
job.setOutputValueClass(Text.class);
conf = job.getConfiguration();
conf.addResource("marklogic-hello-world.xml");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
private Path doMapReduce(final String inputFile) throws Exception {
final FileSystem fileSystem = FileSystem.get(conf);
final Path inputPath = new Path(inputFile);
final Path outputPath = fileSystem.makeQualified(new Path("target/out"));
fileSystem.delete(outputPath, true);
final Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, inputPath);
conf.set(BAMTestNoHeaderOutputFormat.READ_HEADER_FROM_FILE, inputFile);
job.setInputFormatClass(BAMInputFormat.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(SAMRecordWritable.class);
job.setOutputFormatClass(BAMTestNoHeaderOutputFormat.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(SAMRecordWritable.class);
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, outputPath);
final boolean success = job.waitForCompletion(true);
assertTrue(success);
return outputPath;
}
/**
* Sets up the actual job.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public Job createSubmittableJob(String[] args) throws IOException {
Configuration conf = getConf();
String inputDirs = args[0];
String tabName = args[1];
conf.setStrings(TABLES_KEY, tabName);
conf.set(FileInputFormat.INPUT_DIR, inputDirs);
Job job =
Job.getInstance(conf,
conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
job.setJarByClass(MapReduceHFileSplitterJob.class);
job.setInputFormatClass(HFileInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
TableName tableName = TableName.valueOf(tabName);
job.setMapperClass(HFileCellMapper.class);
job.setReducerClass(CellSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(MapReduceExtendedCell.class);
try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
}
LOG.debug("success configuring load incremental job");
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
} else {
throw new IOException("No bulk output directory specified");
}
return job;
}
public static ScanMetrics runJob(org.apache.hadoop.conf.Configuration hadoopConf,
Class<? extends InputFormat> inputFormat, String jobName,
Class<? extends Mapper> mapperClass)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(hadoopConf);
//job.setJarByClass(HadoopScanMapper.class);
job.setJarByClass(mapperClass);
//job.setJobName(HadoopScanMapper.class.getSimpleName() + "[" + scanJob + "]");
job.setJobName(jobName);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
//job.setMapperClass(HadoopScanMapper.class);
job.setMapperClass(mapperClass);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(inputFormat);
boolean success = job.waitForCompletion(true);
if (!success) {
String f;
try {
// Just in case one of Job's methods throws an exception
f = String.format("MapReduce JobID %s terminated abnormally: %s",
job.getJobID().toString(), HadoopCompatLoader.DEFAULT_COMPAT.getJobFailureString(job));
} catch (RuntimeException e) {
f = "Job failed (unable to read job status programmatically -- see MapReduce logs for information)";
}
throw new IOException(f);
} else {
return DEFAULT_COMPAT.getMetrics(job.getCounters());
}
}
public int run(String[] args) throws Exception {
String jobName = "wla_baidu";
String inputPath = args[0];
String outputPath = args[1];
Path path = new Path(outputPath);
// delete the output directory
path.getFileSystem(getConf()).delete(path, true);
// 1. Organize all the code into a Topology-like class
Job job = Job.getInstance(getConf(), jobName);
// 2. Required when running from a packaged jar
job.setJarByClass(MR_WLA.class);
// 3. Specify the HDFS input path
FileInputFormat.setInputPaths(job, inputPath);
// 4. Specify the mapper class
job.setMapperClass(WLA_Mapper.class);
// 5. Specify the map output <key, value> types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 6. Specify the reducer class
job.setReducerClass(WLA_Reducer.class);
// 7. Specify the reduce output <key, value> types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 8. Specify the HDFS output path
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void run() throws IOException, ClassNotFoundException,
InterruptedException {
String inputPath = ItemBasedCFDriver.path.get("step5InputPath");
String outputPath = ItemBasedCFDriver.path.get("step5OutputPath");
Configuration conf = new Configuration();
conf.set("mapred.textoutputformat.separator", ":");
Job job = Job.getInstance(conf);
HDFS hdfs = new HDFS(conf);
hdfs.rmr(outputPath);
job.setMapperClass(Step5_Mapper.class);
job.setJarByClass(CalculateSimilarityStep5.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: MedianAndStandardDeviationCommentLengthByHour <in> <out>");
ToolRunner.printGenericCommandUsage(System.err);
System.exit(2);
}
Job job = new Job(conf,
"StackOverflow Median and Standard Deviation Comment Length By Hour");
job.setJarByClass(MedianAndStandardDeviationCommentLengthByHour.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MedianStdDevMapper.class);
job.setCombinerClass(MedianStdDevCombiner.class);
job.setReducerClass(MedianStdDevReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(SortedMapWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(MedianStdDevTuple.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount-hbase <in> [<in>...] <table-name>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
TableName tableName = TableName.valueOf(otherArgs[otherArgs.length - 1]);
try {
CreateTable.createTable(tableName, conf,
Collections.singletonList(Bytes.toString(COLUMN_FAMILY)));
} catch (Exception e) {
LOG.error("Could not create the table.", e);
}
job.setJarByClass(WordCountHBase.class);
job.setMapperClass(TokenizerMapper.class);
job.setMapOutputValueClass(IntWritable.class);
TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), MyTableReducer.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public int run(String[] args) throws Exception {
// create the job
Configuration config = getConf();
Job job = Job.getInstance(config);
// configure job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
// set the reducer class
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// set the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// set the output compression type
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(CommonFriendStep2.class);
// set the job's mapper and reducer classes
job.setMapperClass(CommonFansStep2Mapper.class);
job.setReducerClass(CommonFansStep2Reducer.class);
// set the key/value types of the map output
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// set the key/value types of the reduce output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// if the output directory already exists, delete it so it does not have to be removed manually between test runs
FileSystem fs = FileSystem.get(conf);
Path out = new Path(args[1]);
if(fs.exists(out)) {
fs.delete(out, true);
}
// set the input and output directories
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,out);
// submit the job to YARN or the local runner
job.waitForCompletion(true);
}
public int run(String[] args) throws Exception {
// create the job
Configuration config = getConf();
Job job = Job.getInstance(config);
// configure job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
// set the number of reduce tasks to 0
// job.setNumReduceTasks(0);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(LogBeanWritable.class);
job.setOutputValueClass(Text.class);
// set the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
public void runJob(Job job, EntityStore entityStore) throws IOException, AccumuloSecurityException, ClassNotFoundException, InterruptedException, TableExistsException, AccumuloException, TableNotFoundException {
File dir = temporaryFolder.newFolder("input");
FileOutputStream fileOutputStream = new FileOutputStream(new File(dir,"uuids.txt"));
PrintWriter printWriter = new PrintWriter(fileOutputStream);
int countTotalResults = 1000;
try {
for (int i = 0; i < countTotalResults; i++) {
printWriter.println(i+"");
}
} finally {
printWriter.flush();
fileOutputStream.close();
}
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
fs.setWorkingDirectory(new Path(dir.getAbsolutePath()));
Path inputPath = fs.makeQualified(new Path(dir.getAbsolutePath())); // local path
EntityOutputFomat.setZooKeeperInstance(job, accumuloMiniClusterDriver.getClientConfiguration());
EntityOutputFomat.setConnectorInfo(job, PRINCIPAL, new PasswordToken(accumuloMiniClusterDriver.getRootPassword()));
job.setJarByClass(getClass());
job.setMapperClass(TestMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(EntityWritable.class);
job.setOutputFormatClass(EntityOutputFomat.class);
FileInputFormat.setInputPaths(job, inputPath);
job.submit();
job.waitForCompletion(true);
Node query = QueryBuilder.create().and().eq(KEY_1, VAL_1).end().build();
Iterable<Entity> itr = entityStore.query(Collections.singleton(TYPE), query, null, new Auths("A"));
List<Entity> queryResults = Lists.newArrayList(itr);
assertEquals(countTotalResults,queryResults.size());
}
protected int runHalvadeJob(Configuration halvadeConf, String tmpOutDir, int jobType) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
String pipeline = "";
if(jobType == HalvadeResourceManager.RNA_SHMEM_PASS2) {
HalvadeConf.setIsPass2(halvadeConf, true);
HalvadeResourceManager.setJobResources(halvadeOpts, halvadeConf, jobType, false, halvadeOpts.useBamInput);
pipeline = RNA_PASS2;
} else if(jobType == HalvadeResourceManager.DNA) {
HalvadeResourceManager.setJobResources(halvadeOpts, halvadeConf, jobType, false, halvadeOpts.useBamInput);
pipeline = DNA;
}
halvadeOpts.splitChromosomes(halvadeConf, 0);
HalvadeConf.setOutDir(halvadeConf, tmpOutDir);
FileSystem outFs = FileSystem.get(new URI(tmpOutDir), halvadeConf);
if (outFs.exists(new Path(tmpOutDir))) {
Logger.INFO("The output directory \'" + tmpOutDir + "\' already exists.");
Logger.INFO("ERROR: Please remove this directory before trying again.");
System.exit(-2);
}
if(halvadeOpts.useBamInput)
setHeaderFile(halvadeOpts.in, halvadeConf);
if(halvadeOpts.rnaPipeline)
HalvadeConf.setPass2Suffix(halvadeConf, pass2suffix);
Job halvadeJob = Job.getInstance(halvadeConf, "Halvade" + pipeline);
halvadeJob.addCacheArchive(new URI(halvadeOpts.halvadeBinaries));
halvadeJob.setJarByClass(be.ugent.intec.halvade.hadoop.mapreduce.HalvadeMapper.class);
addInputFiles(halvadeOpts.in, halvadeConf, halvadeJob);
FileOutputFormat.setOutputPath(halvadeJob, new Path(tmpOutDir));
if(jobType == HalvadeResourceManager.RNA_SHMEM_PASS2) {
halvadeJob.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.StarAlignPassXMapper.class);
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.RnaGATKReducer.class);
} else if(jobType == HalvadeResourceManager.DNA){
halvadeJob.setMapperClass(halvadeOpts.alignmentTools[halvadeOpts.aln]);
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.DnaGATKReducer.class);
}
halvadeJob.setMapOutputKeyClass(ChromosomeRegion.class);
halvadeJob.setMapOutputValueClass(SAMRecordWritable.class);
halvadeJob.setInputFormatClass(HalvadeTextInputFormat.class);
halvadeJob.setOutputKeyClass(Text.class);
if(halvadeOpts.mergeBam) {
halvadeJob.setSortComparatorClass(SimpleChrRegionComparator.class);
halvadeJob.setOutputValueClass(SAMRecordWritable.class);
}else {
halvadeJob.setPartitionerClass(ChrRgPartitioner.class);
halvadeJob.setSortComparatorClass(ChrRgSortComparator.class);
halvadeJob.setGroupingComparatorClass(ChrRgGroupingComparator.class);
halvadeJob.setOutputValueClass(VariantContextWritable.class);
}
if(halvadeOpts.justAlign && !halvadeOpts.mergeBam)
halvadeJob.setNumReduceTasks(0);
else if (halvadeOpts.mergeBam) {
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.BamMergeReducer.class);
halvadeJob.setNumReduceTasks(1);
} else {
halvadeJob.setNumReduceTasks(halvadeOpts.reduces);
if(halvadeOpts.countOnly) {
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.CountReadsReducer.class);
halvadeJob.setOutputValueClass(LongWritable.class);
}
}
if(halvadeOpts.useBamInput) {
halvadeJob.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.AlignedBamMapper.class);
halvadeJob.setInputFormatClass(BAMInputFormat.class);
}
return runTimedJob(halvadeJob, "Halvade Job");
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 5) {
throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table> <instanceName> <edge?>");
}
String user = args[0];
String pass = args[1];
String table = args[2];
String instanceName = args[3];
setConf(new Configuration());
// getConf().set("mapred.job.tracker", "local");
getConf().set("fs.default.name", "local");
Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
AccumuloGraphConfiguration cfg = new AccumuloGraphConfiguration().setInstanceName(instanceName).setUser(user).setPassword(pass.getBytes())
.setGraphName(table).setInstanceType(InstanceType.Mock).setCreate(true);
if (Boolean.parseBoolean(args[4])) {
job.setInputFormatClass(EdgeInputFormat.class);
EdgeInputFormat.setAccumuloGraphConfiguration(job, cfg);
job.setMapperClass(TestEdgeMapper.class);
} else {
job.setInputFormatClass(VertexInputFormat.class);
VertexInputFormat.setAccumuloGraphConfiguration(job, cfg);
job.setMapperClass(TestVertexMapper.class);
}
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: MRSessionize <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "MapReduce Sessionization");
job.setJarByClass(MRSessionize.class);
job.setMapperClass(SessionizeMapper.class);
job.setReducerClass(SessionizeReducer.class);
// WARNING: do NOT set the Combiner class here.
// Our reducer needs all records from the same IP in one place before we can do sessionization.
// Also, our reducer doesn't return the same key/value types it takes,
// so it can't be used on the result of a previous reducer.
job.setMapOutputKeyClass(IpTimestampKey.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// We need these for secondary sorting.
// We need to shuffle the records (between Map and Reduce phases) by using IP address as key, since that is
// the field we are using for determining uniqueness of users. However, when the records arrive to the reducers,
// we would like them to be sorted in ascending order of their timestamps. This concept is known as secondary
// sorting since we are "secondarily" sorting the records by another key (timestamp, in our case) in addition
// to the shuffle key (also called the "partition" key).
// So, to get some terminology straight.
// Natural key (aka Shuffle key or Partition key) is the key we use to shuffle. IP address in our case
// Secondary Sorting Key is the key we use to sort within each partition that gets sent to the reducer. Timestamp
// in our case.
// Together, the natural key and secondary sorting key form what we call the composite key. This key is called
// IpTimestampKey in our example.
// For secondary sorting, even though we are partitioning and shuffling by only the natural key, the map output
// key and the reduce input key is the composite key. We, however, use a custom partitioner and custom grouping
// comparator that only uses the natural key part of the composite key to partition and group respectively (both
// happen during the shuffle phase).
// However, we have a different sort comparator which also gets used in the shuffle phase but determines how
// the records are sorted when they enter the reduce phase. This custom sort comparator in our case will make use
// of the entire composite key.
// We found http://vangjee.wordpress.com/2012/03/20/secondary-sorting-aka-sorting-values-in-hadoops-mapreduce-programming-paradigm/
// to be very helpful, if you'd like to read more on the subject.
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
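The NaturalKeyPartitioner, NaturalKeyComparator and CompositeKeyComparator classes referenced above are not shown on this page. Purely as an illustration of the secondary-sort pattern described in the comments (not the project's actual implementations), they could look roughly like the sketch below, assuming IpTimestampKey is a WritableComparable with hypothetical getIp() (returning String) and getUnixTimestamp() (returning long) accessors; the required imports are org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.WritableComparator and org.apache.hadoop.mapreduce.Partitioner.
// Illustrative sketch only; the real classes in the project may differ.
public static class NaturalKeyPartitioner extends Partitioner<IpTimestampKey, Text> {
  @Override
  public int getPartition(IpTimestampKey key, Text value, int numPartitions) {
    // Partition by the natural key (IP) only, so all records of one IP reach the same reducer.
    return (key.getIp().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

public static class NaturalKeyComparator extends WritableComparator {
  public NaturalKeyComparator() {
    super(IpTimestampKey.class, true);
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    // Group reducer input by IP only, ignoring the timestamp part of the composite key.
    return ((IpTimestampKey) a).getIp().compareTo(((IpTimestampKey) b).getIp());
  }
}

public static class CompositeKeyComparator extends WritableComparator {
  public CompositeKeyComparator() {
    super(IpTimestampKey.class, true);
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    IpTimestampKey k1 = (IpTimestampKey) a;
    IpTimestampKey k2 = (IpTimestampKey) b;
    int byIp = k1.getIp().compareTo(k2.getIp());
    // Within one IP, sort records by timestamp in ascending order.
    return byIp != 0 ? byIp : Long.compare(k1.getUnixTimestamp(), k2.getUnixTimestamp());
  }
}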
@Override
protected int run(CommandLine cmd) throws Exception {
if (!cmd.getArgList().isEmpty()) throw new HalyardExport.ExportException("Unknown arguments: " + cmd.getArgList().toString());
String source = cmd.getOptionValue('s');
String queryFiles = cmd.getOptionValue('q');
String target = cmd.getOptionValue('t');
if (!target.contains("{0}")) {
throw new HalyardExport.ExportException("Bulk export target must contain '{0}' to be replaced by stripped filename of the actual SPARQL query.");
}
getConf().set(SOURCE, source);
getConf().set(TARGET, target);
String driver = cmd.getOptionValue('c');
if (driver != null) {
getConf().set(JDBC_DRIVER, driver);
}
String props[] = cmd.getOptionValues('p');
if (props != null) {
for (int i=0; i<props.length; i++) {
props[i] = Base64.encodeBase64String(props[i].getBytes(StandardCharsets.UTF_8));
}
getConf().setStrings(JDBC_PROPERTIES, props);
}
if (cmd.hasOption('i')) getConf().set(HalyardBulkUpdate.ELASTIC_INDEX_URL, cmd.getOptionValue('i'));
TableMapReduceUtil.addDependencyJars(getConf(),
HalyardExport.class,
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class,
HTable.class,
HBaseConfiguration.class,
AuthenticationProtos.class,
Trace.class,
Gauge.class);
HBaseConfiguration.addHbaseResources(getConf());
String cp = cmd.getOptionValue('l');
if (cp != null) {
String jars[] = cp.split(":");
StringBuilder newCp = new StringBuilder();
for (int i=0; i<jars.length; i++) {
if (i > 0) newCp.append(':');
newCp.append(addTmpFile(jars[i])); //append classpath entries to tmpfiles and trim paths from the classpath
}
getConf().set(JDBC_CLASSPATH, newCp.toString());
}
Job job = Job.getInstance(getConf(), "HalyardBulkExport " + source + " -> " + target);
job.setJarByClass(HalyardBulkExport.class);
job.setMaxMapAttempts(1);
job.setMapperClass(BulkExportMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Void.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(QueryInputFormat.class);
QueryInputFormat.setQueriesFromDirRecursive(job.getConfiguration(), queryFiles, false, 0);
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initCredentials(job);
if (job.waitForCompletion(true)) {
LOG.info("Bulk Export Completed..");
return 0;
}
return -1;
}
public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
StringBuilder sb = new StringBuilder();
for (int j = 2; j < otherArgs.length; j++) {
sb.append(otherArgs[j]);
}
LOGGER.debug("Arguments[ " + otherArgs.length+"]"+"and values respectively ["+otherArgs[0]+"], "+
otherArgs[1]+", ["+otherArgs[2]+"]"+", ["+otherArgs[3]+"],"+
otherArgs[4]);
String inputpath = otherArgs[0];
String outputpath = "/tmp/jumbune/dvjsonreport"+ new Date().getTime();
String json = otherArgs[1];
String nullCondition = otherArgs[2];
String regex = otherArgs[3];
String dvDir = otherArgs[4];
if(regex.isEmpty()){
conf.set(JsonDataVaildationConstants.REGEX_ARGUMENT, "");
}else{
conf.set(JsonDataVaildationConstants.REGEX_ARGUMENT, regex);
}
if(nullCondition.isEmpty()){
conf.set(JsonDataVaildationConstants.NULL_ARGUMENT, "");
}else{
conf.set(JsonDataVaildationConstants.NULL_ARGUMENT, nullCondition);
}
conf.set(JsonDataVaildationConstants.SLAVE_DIR, dvDir);
conf.set(JsonDataVaildationConstants.JSON_ARGUMENT, json);
FileSystem fs = FileSystem.get(conf);
@SuppressWarnings("deprecation")
Job job = new Job(conf, "JSONDataValidation");
job.setJarByClass(JsonDataValidationExecutor.class);
job.setInputFormatClass(JsonFileInputFormat.class);
job.setMapperClass(JsonDataValidationMapper.class);
job.setPartitionerClass(JsonDataValidationPartitioner.class);
job.setReducerClass(JsonDataValidationReducer.class);
job.setNumReduceTasks(5);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FileKeyViolationBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TotalReducerViolationBean.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
Path[] inputPaths = FileUtil.getAllJsonNestedFilePath(job, inputpath);
FileInputFormat.setInputPaths(job, inputPaths);
FileOutputFormat.setOutputPath(job, new Path(outputpath));
if(fs.exists(new Path(outputpath)))
{
fs.delete(new Path(outputpath), true);
}
job.waitForCompletion(true);
Map<String, JsonViolationReport> jsonMap = readDataFromHdfs(conf,outputpath);
final Gson gson= new Gson();
final String jsonReport = gson.toJson(jsonMap);
LOGGER.info("Completed DataValidation");
LOGGER.info(JsonDataVaildationConstants.JSON_DV_REPORT + jsonReport);
}