The following are example usages of org.apache.hadoop.mapreduce.Job#waitForCompletion(), collected from open-source projects on GitHub.
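Before the individual examples, here is a minimal, self-contained sketch of the driver pattern they all share: configure a Job, then call waitForCompletion(true) to submit it and block until it finishes. The PassThroughDriver class and its identity Mapper/Reducer wiring are illustrative assumptions, not taken from any of the projects below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PassThroughDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "pass-through-example");
    job.setJarByClass(PassThroughDriver.class);
    // Identity Mapper/Reducer from the Hadoop API; a real job plugs in its own classes.
    job.setMapperClass(Mapper.class);
    job.setReducerClass(Reducer.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Submit the job, print progress to the console, and block until it completes.
    boolean success = job.waitForCompletion(true);
    System.exit(success ? 0 : 1);
  }
}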
/**
* Tests one of the mappers throwing an exception.
*
* @throws Exception
*/
public void testChainFail() throws Exception {
Configuration conf = createJobConf();
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0, input);
job.setJobName("chain");
ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
LongWritable.class, Text.class, null);
ChainMapper.addMapper(job, FailMap.class, LongWritable.class, Text.class,
IntWritable.class, Text.class, null);
ChainMapper.addMapper(job, Mapper.class, IntWritable.class, Text.class,
LongWritable.class, Text.class, null);
job.waitForCompletion(true);
assertTrue("Job did not fail", !job.isSuccessful());
}
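/**
 * Driver for a Naive Bayes classifier job: the test input is passed through the
 * configuration, NBCMap/NBCReduce do the work, and the exit code reflects job success.
 */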
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf=new Configuration();
// The test input for which we want to predict the activity the person should be doing
conf.set("test_input", args[0]);
Job job = new Job(conf);
job.setJarByClass(NBCDriver.class);
job.setJobName("Naive_Bayes_calssifier using Hadoop");
FileInputFormat.setInputPaths(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setMapperClass(NBCMap.class);
job.setReducerClass(NBCReduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
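/**
 * Runs one map-only pass of the naive Gaussian elimination LU decomposition,
 * reading either the original matrix (n == 0) or the output of the previous run.
 */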
@SuppressWarnings("deprecation")
public static boolean runWithConf (String[] args, Configuration conf) throws IOException, InterruptedException, ClassNotFoundException {
Job job = new Job(conf);
job.setJarByClass(lud_driver.class);
job.setJobName("Split a matrix into it's LU decomposed components using the Naive Gaussian Elimination method");
long n = conf.getLong("n", 0);
FileInputFormat.setInputPaths(job, new Path((n==0)?args[0]:(args[1]+"-run-"+(n-1))));
FileOutputFormat.setOutputPath(job, new Path(args[1]+"-run-"+n));
job.setNumReduceTasks(0);
job.setMapperClass(lud_mapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
boolean success = job.waitForCompletion(true);
return success;
}
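/**
 * Final merge step: a TextPair key with a custom partitioner, grouping comparator
 * and sort comparator (secondary sort) assembles the separate L and U components.
 */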
public static boolean runWithJob(Job job, String out_path) throws IOException, InterruptedException, ClassNotFoundException {
job.setJarByClass(merge_results_driver.class);
job.setJobName("Final Step: Merging results and creating separate LU decomposed components of input matrix");
FileOutputFormat.setOutputPath(job, new Path(out_path));
job.setMapperClass(lu_decomposition.naive_gausssian.MergeResults.merge_results_mapper.class);
job.setReducerClass(lu_decomposition.naive_gausssian.MergeResults.merge_results_reducer.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(TextPair.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(TextPairPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
job.setSortComparatorClass(TextPairComparator.class);
boolean success = job.waitForCompletion(true);
return success;
}
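/**
 * Driver for the StackOverflow comment average job; the reducer is reused as a
 * combiner and emits CountAverageTuple values keyed by IntWritable.
 */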
@Override
public int run(String[] arg0) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, arg0).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Average <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "StackOverflow Comment Average");
job.setJarByClass(Average.class);
job.setMapperClass(AverageMapper.class);
job.setCombinerClass(AverageReducer.class);
job.setReducerClass(AverageReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(CountAverageTuple.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
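/**
 * Launches one map-only logistic regression iteration; optional regularization and
 * intercept settings are passed via the configuration, and a failed job raises an
 * IllegalStateException.
 */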
public static int run(String collection, Path input, Path output,
Float regularizationFactor, Boolean addIntercept,
Configuration baseConf) throws IOException, ClassNotFoundException,
InterruptedException {
Configuration conf = new Configuration(baseConf);
if (null != addIntercept) {
conf.setBoolean("lr.iteration.add.intercept", addIntercept);
}
if (null != regularizationFactor) {
conf.setDouble("lr.iteration.regulariztion.factor",
regularizationFactor);
}
conf.set("com.b5m.laser.msgpack.output.method", "update_online_model");
Job job = Job.getInstance(conf);
job.setJarByClass(LrIterationDriver.class);
job.setJobName("logistic regression");
FileInputFormat.setInputPaths(job, input);
job.setOutputFormatClass(MsgpackOutputFormat.class);
job.setOutputKeyClass(String.class);
job.setOutputValueClass(LaserOnlineModel.class);
LrIterationInputFormat.setNumMapTasks(job, 120);
job.setInputFormatClass(LrIterationInputFormat.class);
job.setMapperClass(LrIterationMapper.class);
job.setNumReduceTasks(0);
boolean succeeded = job.waitForCompletion(true);
if (!succeeded) {
throw new IllegalStateException("Job:logistic regression, Failed!");
}
return 0;
}
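/**
 * Splits WARC text into sentences: WARCInputFormat feeds the mapper and the
 * reducer writes plain-text output.
 */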
@Override
public int run(String[] args)
throws Exception
{
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf);
job.setJarByClass(TextToSentencesSplitter.class);
job.setJobName(TextToSentencesSplitter.class.getName());
// mapper
job.setMapperClass(TextToSentencesSplitter.MapperClass.class);
job.setInputFormatClass(WARCInputFormat.class);
// reducer
job.setReducerClass(ReducerClass.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
// paths
String commaSeparatedInputFiles = otherArgs[0];
String outputPath = otherArgs[1];
FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
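/**
 * Test harness that scans a table through ChunkInputFormat with the mapper class
 * named on the command line, discarding output via NullOutputFormat.
 */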
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException(
"Usage : " + CIFTester.class.getName() + " <table> <mapperClass>");
}
String table = args[0];
assertionErrors.put(table, new AssertionError("Dummy"));
assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception"));
getConf().set("MRTester_tableName", table);
Job job = Job.getInstance(getConf());
job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
job.setInputFormatClass(ChunkInputFormat.class);
ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
ChunkInputFormat.setInputTableName(job, table);
ChunkInputFormat.setScanAuthorizations(job, AUTHS);
@SuppressWarnings("unchecked")
Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class
.forName(args[1]);
job.setMapperClass(forName);
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
this.processArgs(conf, args);
Job job = Job.getInstance(conf, "analyser_logdata");
// Code needed to submit the job locally but run it on the cluster
// File jarFile = EJob.createTempJar("target/classes");
// ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
// End of the local-submission / cluster-run code
job.setJarByClass(AnalyserLogDataRunner.class);
job.setMapperClass(AnalyserLogDataMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);
// Reducer configuration
// 1. Run on the cluster from a packaged jar (requires addDependencyJars to be true, which is the default)
// TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job);
// 2. Run locally (requires addDependencyJars to be false)
TableMapReduceUtil.initTableReducerJob(EventLogConstants.HBASE_NAME_EVENT_LOGS, null, job, null, null, null, null, false);
job.setNumReduceTasks(0);
// Set the input paths
this.setJobInputPaths(job);
return job.waitForCompletion(true) ? 0 : -1;
}
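/**
 * Map-only job that parses log data and writes Put records into the HBase event
 * log table via TableMapReduceUtil.
 */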
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
Configuration conf = getConf();
Path outputPath = new Path(args[0]);
String hdfsHomeDir = args[1];
String tableName = args[2];
outputPath.getFileSystem(conf).delete(outputPath, true);
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
Job job = Job.getInstance(conf, "Busy Airport Count");
job.setInputFormatClass(RowInputFormat.class);
// configure mapper and reducer
job.setMapperClass(SampleMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// configure output
TextOutputFormat.setOutputPath(job, outputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
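/**
 * Busy Airport Count: reads table rows through RowInputFormat and aggregates
 * counts with IntSumReducer, deleting any previous output first.
 */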
public int run(String[] args) throws Exception
{
Job job = new Job(getConf(), "wordcountcounters");
job.setJarByClass(WordCountCounters.class);
job.setMapperClass(SumMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
job.setInputFormatClass(ColumnFamilyInputFormat.class);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.Murmur3Partitioner");
ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCountCounters.COUNTER_COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setSlice_range(
new SliceRange().
setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER).
setFinish(ByteBufferUtil.EMPTY_BYTE_BUFFER).
setCount(100));
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
job.waitForCompletion(true);
return 0;
}
public int run(String[] args) throws Exception {
// Create the job
Configuration config = getConf();
// Add custom configuration
// config.set("ip.file.path", args[2]);
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// Set job parameters through the Job object
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
// Set the reducer class
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// Add a file to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// Set the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// Output compression settings (optional)
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// Run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
int numReducers = 2;
Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));
InputSampler.Sampler<Text, Text> sampler =
new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(TotalSortMapReduce.class);
job.setNumReduceTasks(numReducers);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
InputSampler.writePartitionFile(job, sampler);
URI partitionUri = new URI(partitionFile.toString() +
"#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, conf);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
public static void main(String[] args) throws Exception {
//1. HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration(); // Hadoop configuration object
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true"); //集群交叉提交
/* conf.set("hadoop.job.user", "hadoop");
conf.set("mapreduce.framework.name", "yarn");
conf.set("mapreduce.jobtracker.address", namenode_ip + ":9001");
conf.set("yarn.resourcemanager.hostname", namenode_ip);
conf.set("yarn.resourcemanager.resource-tracker.address", namenode_ip + ":8031");
conf.set("yarn.resourcemtanager.address", namenode_ip + ":8032");
conf.set("yarn.resourcemanager.admin.address", namenode_ip + ":8033");
conf.set("yarn.resourcemanager.scheduler.address", namenode_ip + ":8034");
conf.set("mapreduce.jobhistory.address", namenode_ip + ":10020"); */
//2. MapReduce job configuration
String jobName = "WordCount"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(WordCount.class); // main job class
job.setJar("export\\WordCount.jar"); // local jar to submit
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // combiner class
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//3. Job input and output paths
String dataDir = "/expr/wordcount/data"; // input data directory
String outputDir = "/expr/wordcount/output"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
// Delete the output directory if it already exists
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
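/**
 * Data source comparison job: the validation and mapper metadata are serialized
 * into the configuration, the source and destination files are compared, and the
 * counters are turned into a report that is logged as JSON.
 */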
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
// LOGGER.debug("Data validation job received args length [ " +
// otherArgs.length + "]");
StringBuilder sb = new StringBuilder();
for (int j = 0; j < otherArgs.length; j++) {
sb.append(otherArgs[j]);
}
String validationInfoJson = sb.toString();
Gson gson = new Gson();
DataSourceCompValidationInfo validationInfo = gson.fromJson(validationInfoJson,
DataSourceCompValidationInfo.class);
DataSourceCompJobExecutor dscJobExecutor = new DataSourceCompJobExecutor();
dscJobExecutor.removeSlash(validationInfo);
dscJobExecutor.addTransformationNumber(validationInfo);
DataSourceCompMapperInfo mapperInfo = dscJobExecutor.createMapperInfo(validationInfo);
String outputPath = DataSourceCompConstants.OUTPUT_DIR_PATH + new Date().getTime();
// String outputPath = "/destination";
conf.set("validationInfoJson", gson.toJson(validationInfo));
conf.set("mapperInfoJson", gson.toJson(mapperInfo));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem fileSystem = FileSystem.get(conf);
List<Path> mapperFilesList = dscJobExecutor.getFiles(validationInfo.getSourcePath(), fileSystem);
mapperFilesList.addAll(dscJobExecutor.getFiles(validationInfo.getDestinationPath(), fileSystem));
Map<String, String> filesMap = dscJobExecutor.encodeFilesMap(mapperFilesList);
Map<String, String> reverseFilesMap = dscJobExecutor.invertMap(filesMap);
Path[] patharr = new Path[mapperFilesList.size()];
for (int i = 0; i < mapperFilesList.size(); i++) {
patharr[i] = mapperFilesList.get(i);
}
conf.set("filesMap", gson.toJson(filesMap));
String recordSeparator = validationInfo.getRecordSeparator();
if (recordSeparator == null || recordSeparator.trim().isEmpty()) {
recordSeparator = "\n";
}
conf.set("textinputformat.record.delimiter", recordSeparator);
Job job = Job.getInstance(conf, "jumbune_dsc_" + validationInfo.getJobName());
job.setJarByClass(DataSourceCompJobExecutor.class);
job.setMapperClass(org.jumbune.datavalidation.dsc.DataSourceCompMapper.class);
job.setReducerClass(DataSourceCompReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataSourceCompMapValueWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, patharr);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
MultipleOutputs.setCountersEnabled(job, true);
job.waitForCompletion(true);
String workerDirPath = validationInfo.getSlaveFileLoc();
dscJobExecutor.copyResult(conf, outputPath, workerDirPath);
dscJobExecutor.renameFiles(workerDirPath, reverseFilesMap);
DataSourceCompReportBean reportBean = dscJobExecutor.calculateCounters(job, outputPath, reverseFilesMap,
validationInfo.getValidationsList());
LOGGER.info(DataValidationConstants.DV_REPORT + gson.toJson(reportBean));
}
public static void main(String[] args) throws Exception {
//1. HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. MapReduce job configuration
String jobName = "DateSort2"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(DateSort2.class); // main job class
job.setJar("export\\DateSort2.jar"); // local jar to submit
// Map
job.setMapperClass(DateSort2Mapper.class); // mapper class
job.setMapOutputKeyClass(IntWritable.class); // mapper output key type
job.setMapOutputValueClass(Text.class); // mapper output value type
// Reduce
job.setReducerClass(DateSort2Reducer.class); // reducer class
job.setOutputKeyClass(Text.class); // reducer output key type
job.setOutputValueClass(IntWritable.class); // reducer output value type
// Custom sort
job.setSortComparatorClass(MySort.class); // custom sort comparator class
//3. Job input and output paths
String dataDir = "/expr/datecount/output/part-r-00000"; // input data (output of the previous job)
String outputDir = "/expr/datecount/output_sort2"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
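/**
 * Two-stage variant of the busy airport job: the first job writes per-airport
 * counts to an intermediate path and a second job selects the top airport.
 */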
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
Configuration conf = getConf();
Path outputPath = new Path(args[0]);
Path intermediateOutputPath = new Path(args[0] + "_int");
String hdfsHomeDir = args[1];
String tableName = args[2];
outputPath.getFileSystem(conf).delete(outputPath, true);
intermediateOutputPath.getFileSystem(conf).delete(intermediateOutputPath, true);
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
Job job = Job.getInstance(conf, "Busy Airport Count");
job.setInputFormatClass(RowInputFormat.class);
// configure mapper and reducer
job.setMapperClass(SampleMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// Only have one reduce task so that all of the results from mapping are
// processed in one place.
job.setNumReduceTasks(1);
// configure output
TextOutputFormat.setOutputPath(job, intermediateOutputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
int rc = job.waitForCompletion(true) ? 0 : 1;
if (rc == 0) {
Job topJob = Job.getInstance(getConf(), "Top Busy Airport");
// We want the task to run on a single VM
topJob.setNumReduceTasks(1);
// Set the inputs
topJob.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(topJob, intermediateOutputPath);
// Set the mapper and reducer
topJob.setMapperClass(TopBusyAirportMapper.class);
topJob.setReducerClass(TopBusyAirportReducer.class);
// Set the outputs
TextOutputFormat.setOutputPath(topJob, outputPath);
topJob.setOutputFormatClass(TextOutputFormat.class);
topJob.setOutputKeyClass(Text.class);
topJob.setOutputValueClass(IntWritable.class);
topJob.setMapOutputKeyClass(Text.class);
topJob.setMapOutputValueClass(StringIntPair.class);
rc = topJob.waitForCompletion(true) ? 0 : 1;
}
return rc;
}
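/**
 * Integration test: writes 100 ids to a local input file, runs a map-only job
 * through EventOutputFormat against a mini Accumulo cluster, and then queries the
 * event store to verify that every record arrived.
 */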
public void runJob(Job job, EventStore eventStore) throws IOException, AccumuloSecurityException, ClassNotFoundException, InterruptedException, TableExistsException, AccumuloException, TableNotFoundException {
File dir = temporaryFolder.newFolder("input");
FileOutputStream fileOutputStream = new FileOutputStream(new File(dir,"uuids.txt"));
PrintWriter printWriter = new PrintWriter(fileOutputStream);
int countTotalResults = 100;
try {
for (int i = 0; i < countTotalResults; i++) {
printWriter.println(""+i);
}
} finally {
printWriter.flush();
fileOutputStream.close();
}
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
fs.setWorkingDirectory(new Path(dir.getAbsolutePath()));
Path inputPath = fs.makeQualified(new Path(dir.getAbsolutePath())); // local path
EventOutputFormat.setZooKeeperInstance(job, accumuloMiniClusterDriver.getClientConfiguration());
EventOutputFormat.setConnectorInfo(job, PRINCIPAL, new PasswordToken(accumuloMiniClusterDriver.getRootPassword()));
job.setJarByClass(getClass());
job.setMapperClass(TestMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(EventWritable.class);
job.setOutputFormatClass(EventOutputFormat.class);
FileInputFormat.setInputPaths(job, inputPath);
job.submit();
job.waitForCompletion(true);
Iterable<Event> itr = eventStore.query(new Date(currentTimeMillis() - 25000),
new Date(), Collections.singleton(TYPE), QueryBuilder.create().and().eq(KEY_1, VAL_1).end().build(), null, DEFAULT_AUTHS);
List<Event> queryResults = Lists.newArrayList(itr);
assertEquals(countTotalResults,queryResults.size());
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// Use GenericOptionsParser, which supports -D, -conf, etc.
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 1) {
System.err.println("Usage: wordcount <out>");
System.exit(2);
}
String output = otherArgs[0];
System.out.println("Running framework: " + conf.get("mapreduce.framework.name"));
System.out.println("File system: " + conf.get("fs.default.name"));
final FileSystem fs = FileSystem.get(conf);
if (conf.getBoolean("cleanup-output", true)) {
fs.delete(new Path(output), true);
}
conf.set("mapreduce.task.profile.reduces", "1"); // no reduces
Job job = new Job(conf, "CodeLab-TalosMessageCount");
job.setJarByClass(TalosMessageCount.class);
// setInputFormat related;
job.setInputFormatClass(TalosTopicInputFormat.class);
// set mapper related;
job.setMapperClass(TalosMessageCountMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
// set reducer related;
job.setReducerClass(TalosMessageCountReducer.class);
// set outputFormat related;
FileOutputFormat.setOutputPath(job, new Path(output));
// job.setOutputFormatClass(FileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
try {
job.waitForCompletion(true);
} catch (NullPointerException e) {
e.printStackTrace(System.out);
e.printStackTrace();
}
System.out.println("job finished");
}
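/**
 * Map-only test against a mock Accumulo graph instance: VertexInputFormat feeds
 * TestVertexMapper and the results are written back through ElementOutputFormat.
 */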
@Override
public int run(String[] args) throws Exception {
setConf(new Configuration());
getConf().set("fs.default.name", "local");
Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
AccumuloGraphConfiguration cfg = new AccumuloGraphConfiguration().setInstanceName("_mapreduce_instance2").setUser("root").setPassword("".getBytes())
.setGraphName("_mapreduce_table_2").setInstanceType(InstanceType.Mock).setCreate(true);
job.setInputFormatClass(VertexInputFormat.class);
VertexInputFormat.setAccumuloGraphConfiguration(job, cfg);
ElementOutputFormat.setAccumuloGraphConfiguration(job, cfg);
job.setMapperClass(TestVertexMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Element.class);
job.setOutputFormatClass(ElementOutputFormat.class);
job.setNumReduceTasks(0);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}