The following examples show how to use the org.apache.hadoop.mapred.TextOutputFormat API class, or you can click through the links to view the source code on GitHub.
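Before the project examples, here is a minimal sketch, assuming the classic mapred API, of the two settings TextOutputFormat is most often paired with: the key/value separator (a tab by default) and output compression inherited from FileOutputFormat. The job wiring around them is illustrative, not taken from any example below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class TextOutputFormatSketch {
    public static void configureOutput(JobConf conf, String outputDir) {
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setOutputFormat(TextOutputFormat.class);
        // Each record is written as key<separator>value; the default separator is a tab.
        conf.set("mapred.textoutputformat.separator", ",");
        // The compression helpers are static methods inherited from FileOutputFormat.
        TextOutputFormat.setCompressOutput(conf, true);
        TextOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        FileOutputFormat.setOutputPath(conf, new Path(outputDir));
    }
}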
private static void runJobPv(String inputDir, String outputDir, String jobName, Class<? extends Mapper> mapClass,
Class<? extends Reducer> reduceClass) throws Exception {
JobConf conf = new JobConf(PersonVersion.class);
conf.setJobName(jobName);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(mapClass);
conf.setCombinerClass(reduceClass);
conf.setReducerClass(reduceClass);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, inputDir);
FileOutputFormat.setOutputPath(conf, new Path(outputDir));
JobClient.runJob(conf);
}
TextWriterProxy(Configuration config, String fileName) throws IOException{
fieldDelimiter = config.getChar(Key.FIELD_DELIMITER);
columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS,null);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
String attempt = "attempt_"+dateFormat.format(new Date())+"_0001_m_000000_0";
Path outputPath = new Path(fileName);
// TODO: the correct TASK_ATTEMPT_ID still needs to be confirmed
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
FileOutputFormat outFormat = new TextOutputFormat();
outFormat.setOutputPath(conf, outputPath);
outFormat.setWorkOutputPath(conf, outputPath);
if(null != compress) {
Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
if (null != codecClass) {
outFormat.setOutputCompressorClass(conf, codecClass);
}
}
writer = outFormat.getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL);
}
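The writer obtained above is an ordinary mapred RecordWriter. A hedged sketch of the calling side follows; the write/close calls are the real RecordWriter API, while the visibility of the writer field and the source of the lines are assumptions:

// Hypothetical caller: stream lines through the proxy's writer, then close it.
TextWriterProxy proxy = new TextWriterProxy(config, "/tmp/out/part-00000");
for (String line : lines) { // 'lines' is assumed to be supplied by the caller
    // A NullWritable key makes TextOutputFormat emit only the value.
    proxy.writer.write(NullWritable.get(), new Text(line));
}
proxy.writer.close(Reporter.NULL);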
/**
* Sets up a job conf for the given job using the given config object. Ensures
* that the correct input format, the mapper and reducer classes, and the
* input and output key and value classes are set, along with any other job
* configuration.
*
* @param config
* @return JobConf representing the job to be run
* @throws IOException
*/
private JobConf getJob(ConfigExtractor config) throws IOException {
JobConf job = new JobConf(config.getConfig(), SliveTest.class);
job.setInputFormat(DummyInputFormat.class);
FileOutputFormat.setOutputPath(job, config.getOutputPath());
job.setMapperClass(SliveMapper.class);
job.setPartitionerClass(SlivePartitioner.class);
job.setReducerClass(SliveReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setCompressOutput(job, false);
job.setNumReduceTasks(config.getReducerAmount());
job.setNumMapTasks(config.getMapAmount());
return job;
}
public void run(String[] args) throws Exception
{
JobConf conf = new JobConf(this.getClass());
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
public int run(final String[] args) throws Exception {
log.info("run starting");
final Configuration conf = getConf();
JobConf job = new JobConf(conf, WordCountInput.class);
job.setJobName("AerospikeWordCountInput");
job.setInputFormat(AerospikeInputFormat.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormat(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[0]));
JobClient.runJob(job);
log.info("finished");
return 0;
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: CartesianCommentComparison <in> <out>");
ToolRunner.printGenericCommandUsage(System.err);
System.exit(2);
}
// Configure the join type
JobConf conf = new JobConf("Cartesian Product");
conf.setJarByClass(CartesianCommentComparison.class);
conf.setMapperClass(CartesianMapper.class);
conf.setNumReduceTasks(0);
conf.setInputFormat(CartesianInputFormat.class);
// Configure the input format
CartesianInputFormat.setLeftInputInfo(conf, TextInputFormat.class, args[0]);
CartesianInputFormat.setRightInputInfo(conf, TextInputFormat.class, args[0]);
TextOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
Thread.sleep(1000);
}
return job.isSuccessful() ? 0 : 1;
}
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: WordCount <input path> <result path>");
return;
}
final String inputPath = args[0];
final String outputPath = args[1];
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Set up the Hadoop Input Format
HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));
// Create a Flink job with it
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
DataSet<Tuple2<Text, LongWritable>> words =
text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
.groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
// Set up Hadoop Output Format
HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
// Output & Execute
words.output(hadoopOutputFormat).setParallelism(1);
env.execute("Hadoop Compat WordCount");
}
public static Table createUnpartitionedTable(
HiveMetaStoreClient metaStoreClient,
String database,
String table,
URI location)
throws TException {
Table hiveTable = new Table();
hiveTable.setDbName(database);
hiveTable.setTableName(table);
hiveTable.setTableType(TableType.EXTERNAL_TABLE.name());
hiveTable.putToParameters("EXTERNAL", "TRUE");
StorageDescriptor sd = new StorageDescriptor();
sd.setCols(DATA_COLUMNS);
sd.setLocation(location.toString());
sd.setParameters(new HashMap<String, String>());
sd.setInputFormat(TextInputFormat.class.getName());
sd.setOutputFormat(TextOutputFormat.class.getName());
sd.setSerdeInfo(new SerDeInfo());
sd.getSerdeInfo().setSerializationLib("org.apache.hadoop.hive.serde2.OpenCSVSerde");
hiveTable.setSd(sd);
metaStoreClient.createTable(hiveTable);
ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, database, table);
ColumnStatisticsData statsData = new ColumnStatisticsData(_Fields.LONG_STATS, new LongColumnStatsData(1L, 2L));
ColumnStatisticsObj cso1 = new ColumnStatisticsObj("id", "bigint", statsData);
List<ColumnStatisticsObj> statsObj = Collections.singletonList(cso1);
metaStoreClient.updateTableColumnStatistics(new ColumnStatistics(statsDesc, statsObj));
return hiveTable;
}
public static Table createPartitionedTable(
HiveMetaStoreClient metaStoreClient,
String database,
String table,
URI location)
throws Exception {
return createPartitionedTable(metaStoreClient, database, table, location, DATA_COLUMNS, PARTITION_COLUMNS,
"org.apache.hadoop.hive.serde2.OpenCSVSerde", TextInputFormat.class.getName(),
TextOutputFormat.class.getName());
}
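A hedged call-site sketch for the two helpers above; the metastore connection and warehouse URI are assumptions:

// Connect to the metastore and register an external CSV-backed table.
HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
Table table = createUnpartitionedTable(
    client, "test_db", "test_table", URI.create("hdfs:///warehouse/test_table"));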
@Override
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
String name, Progressable arg3) throws IOException {
if (theTextOutputFormat == null) {
theTextOutputFormat = new TextOutputFormat<K, V>();
}
return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
}
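This lazily created TextOutputFormat delegate is the pattern behind MultipleTextOutputFormat, which routes every record to a file name derived from its key or value. A sketch of such a subclass follows; the class and overridden method are the real mapred API, while the naming scheme is only an example:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class OutputByKey<K, V> extends MultipleTextOutputFormat<K, V> {
    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        // Send each key's records to its own directory, e.g. "apple/part-00000".
        return key.toString() + "/" + name;
    }
}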
public int run(String[] args) throws Exception
{
// Create a configuration
Configuration conf = getConf();
// Create a job from the default configuration that will use the WordCount class
JobConf job = new JobConf(conf, LogCountsPerHour.class);
// Define our input path as the first command line argument and our output path as the second
Path in = new Path(args[0]);
Path out = new Path(args[1]);
// Create File Input/Output formats for these paths (in the job)
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
// Configure the job: name, mapper, reducer, and combiner
job.setJobName("LogAveragePerHour");
job.setMapperClass(LogMapClass.class);
job.setReducerClass(LogReduce.class);
job.setCombinerClass(LogReduce.class);
// Configure the output
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(DateWritable.class);
job.setOutputValueClass(IntWritable.class);
// Run the job
JobClient.runJob(job);
return 0;
}
public void processDumpJob(String crawlDb, String output, Configuration config, String format, String regex, String status) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb dump: starting");
LOG.info("CrawlDb db: " + crawlDb);
}
Path outFolder = new Path(output);
JobConf job = new NutchJob(config);
job.setJobName("dump " + crawlDb);
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
FileOutputFormat.setOutputPath(job, outFolder);
if (format.equals("csv")) {
job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
}
else if (format.equals("crawldb")) {
job.setOutputFormat(MapFileOutputFormat.class);
} else {
job.setOutputFormat(TextOutputFormat.class);
}
if (status != null) job.set("status", status);
if (regex != null) job.set("regex", regex);
job.setMapperClass(CrawlDbDumpMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
JobClient.runJob(job);
if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); }
}
/**
* Sets task classes with related info if needed into configuration object.
*
* @param jobConf Configuration to change.
* @param setMapper Option to set mapper and input format classes.
* @param setCombiner Option to set combiner class.
* @param setReducer Option to set reducer and output format classes.
*/
public static void setTasksClasses(JobConf jobConf, boolean setMapper, boolean setCombiner, boolean setReducer) {
if (setMapper) {
jobConf.setMapperClass(HadoopWordCount1Map.class);
jobConf.setInputFormat(TextInputFormat.class);
}
if (setCombiner)
jobConf.setCombinerClass(HadoopWordCount1Reduce.class);
if (setReducer) {
jobConf.setReducerClass(HadoopWordCount1Reduce.class);
jobConf.setOutputFormat(TextOutputFormat.class);
}
}
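A hedged usage sketch: enabling all three flags wires up the full word-count pipeline in one call.

JobConf jobConf = new JobConf();
// Set mapper + input format, combiner, and reducer + output format together.
setTasksClasses(jobConf, true, true, true);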
/**
* @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
*/
public static void addDependencyJars(JobConf job) throws IOException {
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(
job,
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
job.getOutputKeyClass(),
job.getOutputValueClass(),
job.getPartitionerClass(),
job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
job.getCombinerClass());
}
public void processDumpJob(String crawlDb, String output, Configuration config, String format, String regex, String status, Integer retry) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb dump: starting");
LOG.info("CrawlDb db: " + crawlDb);
}
Path outFolder = new Path(output);
JobConf job = new NutchJob(config);
job.setJobName("dump " + crawlDb);
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
FileOutputFormat.setOutputPath(job, outFolder);
if (format.equals("csv")) {
job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
}
else if (format.equals("crawldb")) {
job.setOutputFormat(MapFileOutputFormat.class);
} else {
job.setOutputFormat(TextOutputFormat.class);
}
if (status != null) job.set("status", status);
if (regex != null) job.set("regex", regex);
if (retry != null) job.setInt("retry", retry);
job.setMapperClass(CrawlDbDumpMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
JobClient.runJob(job);
if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); }
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 4) {
printUsage();
}
Path userPath = new Path(args[0]);
Path commentPath = new Path(args[1]);
Path outputDir = new Path(args[2]);
String joinType = args[3];
JobConf conf = new JobConf("CompositeJoin");
conf.setJarByClass(CompositeUserJoin.class);
conf.setMapperClass(CompositeMapper.class);
conf.setNumReduceTasks(0);
// Set the input format class to a CompositeInputFormat class.
// The CompositeInputFormat will parse all of our input files and output
// records to our mapper.
conf.setInputFormat(CompositeInputFormat.class);
// The composite input format join expression will set how the records
// are going to be read in, and in what input format.
conf.set("mapred.join.expr", CompositeInputFormat.compose(joinType,
KeyValueTextInputFormat.class, userPath, commentPath));
TextOutputFormat.setOutputPath(conf, outputDir);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
Thread.sleep(1000);
}
return job.isSuccessful() ? 0 : 1;
}
@Override
public Plan getPlan(String... args) {
// parse job parameters
int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
String dataInput = (args.length > 1 ? args[1] : "");
String output = (args.length > 2 ? args[2] : "");
HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>(
new TextInputFormat(), new JobConf(), "Input Lines");
TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
MapOperator mapper = MapOperator.builder(new TokenizeLine())
.input(source)
.name("Tokenize Lines")
.build();
ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
.input(mapper)
.name("Count Words")
.build();
HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(),new JobConf(), "Hadoop TextOutputFormat", reducer, Text.class, IntWritable.class);
TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
Plan plan = new Plan(out, "Hadoop OutputFormat Example");
plan.setDefaultParallelism(numSubTasks);
return plan;
}
/**
* This method constructs the JobConf to be used to run the map reduce job to
* download the files from S3. This is a potentially expensive method since it
* makes multiple calls to S3 to get a listing of all the input data. Clients
* are encouraged to cache the returned JobConf reference and not call this
* method multiple times unless necessary.
*
* @return the JobConf to be used to run the map reduce job to download the
* files from S3.
*/
public JobConf getJobConf() throws IOException, ParseException {
JobConf conf = new JobConf(CopyFromS3.class);
conf.setJobName("CopyFromS3");
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(S3CopyMapper.class);
// We configure a reducer, even though we don't use it right now.
// The idea is that, in the future we may.
conf.setReducerClass(HDFSWriterReducer.class);
conf.setNumReduceTasks(0);
FileInputFormat.setInputPaths(conf, new Path(tempFile));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
conf.setOutputFormat(TextOutputFormat.class);
conf.setCompressMapOutput(true);
JobClient jobClient = new JobClient(conf);
FileSystem inputFS = FileSystem.get(URI.create(inputPathPrefix), conf);
DatePathFilter datePathFilter = new DatePathFilter(startDate, endDate);
List<Path> filePaths = getFilePaths(inputFS, new Path(inputPathPrefix), datePathFilter, jobClient.getDefaultMaps());
// Write the file names to a temporary index file to be used
// as input to the map tasks.
FileSystem outputFS = FileSystem.get(URI.create(tempFile), conf);
FSDataOutputStream outputStream = outputFS.create(new Path(tempFile), true);
try {
for (Path path : filePaths) {
outputStream.writeBytes(path.toString() + "\n");
}
}
finally {
outputStream.close();
}
conf.setNumMapTasks(Math.min(filePaths.size(), jobClient.getDefaultMaps()));
return conf;
}
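Following the javadoc's advice, a caller would build the conf once and reuse it. A hedged sketch of the intended call pattern; CopyFromS3's constructor is not shown above, so its arguments are placeholders:

CopyFromS3 copy = new CopyFromS3(/* input prefix, date range, temp file, output path */);
JobConf conf = copy.getJobConf(); // expensive: lists S3, so cache the returned conf
JobClient.runJob(conf);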
/**
* Based on the parsed args, submits the LoadGenerator as a MapReduce job.
* The number of map tasks is numMapTasks.
* @return exit code for the job submission
*/
private int submitAsMapReduce() {
System.out.println("Running as a MapReduce job with " +
numMapTasks + " mapTasks; Output to file " + mrOutDir);
Configuration conf = new Configuration(getConf());
// First set all the args of LoadGenerator as Conf vars to pass to MR tasks
conf.set(LG_ROOT , root.toString());
conf.setInt(LG_MAXDELAYBETWEENOPS, maxDelayBetweenOps);
conf.setInt(LG_NUMOFTHREADS, numOfThreads);
conf.set(LG_READPR, readProbs[0]+""); //Pass Double as string
conf.set(LG_WRITEPR, writeProbs[0]+""); //Pass Double as string
conf.setLong(LG_SEED, seed); // Seed for the generator's random operations
conf.setInt(LG_NUMMAPTASKS, numMapTasks);
if (scriptFile == null && durations[0] <=0) {
System.err.println("When run as a MapReduce job, elapsed Time or ScriptFile must be specified");
System.exit(-1);
}
conf.setLong(LG_ELAPSEDTIME, durations[0]);
conf.setLong(LG_STARTTIME, startTime);
if (scriptFile != null) {
conf.set(LG_SCRIPTFILE , scriptFile);
}
conf.set(LG_FLAGFILE, flagFile.toString());
// Now set the necessary conf variables that apply to run MR itself.
JobConf jobConf = new JobConf(conf, LoadGenerator.class);
jobConf.setJobName("NNLoadGeneratorViaMR");
jobConf.setNumMapTasks(numMapTasks);
jobConf.setNumReduceTasks(1); // 1 reducer to collect the results
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);
jobConf.setMapperClass(MapperThatRunsNNLoadGenerator.class);
jobConf.setReducerClass(ReducerThatCollectsLGdata.class);
jobConf.setInputFormat(DummyInputFormat.class);
jobConf.setOutputFormat(TextOutputFormat.class);
// Explicitly set number of max map attempts to 1.
jobConf.setMaxMapAttempts(1);
// Explicitly turn off speculative execution
jobConf.setSpeculativeExecution(false);
// This mapReduce job has no input but has output
FileOutputFormat.setOutputPath(jobConf, new Path(mrOutDir));
try {
JobClient.runJob(jobConf);
} catch (IOException e) {
System.err.println("Failed to run job: " + e.getMessage());
return -1;
}
return 0;
}
public void configure(String keySpec, int expect) throws Exception {
Path testdir = new Path(TEST_DIR.getAbsolutePath());
Path inDir = new Path(testdir, "in");
Path outDir = new Path(testdir, "out");
FileSystem fs = getFileSystem();
fs.delete(testdir, true);
conf.setInputFormat(TextInputFormat.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
conf.setKeyFieldComparatorOptions(keySpec);
conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
conf.set(JobContext.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
conf.setMapperClass(InverseMapper.class);
conf.setReducerClass(IdentityReducer.class);
if (!fs.mkdirs(testdir)) {
throw new IOException("Mkdirs failed to create " + testdir.toString());
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
// set up input data in 2 files
Path inFile = new Path(inDir, "part0");
FileOutputStream fos = new FileOutputStream(inFile.toString());
fos.write((line1 + "\n").getBytes());
fos.write((line2 + "\n").getBytes());
fos.close();
JobClient jc = new JobClient(conf);
RunningJob r_job = jc.submitJob(conf);
while (!r_job.isComplete()) {
Thread.sleep(1000);
}
if (!r_job.isSuccessful()) {
fail("Oops! The job broke due to an unexpected error");
}
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
//make sure we get what we expect as the first line, and also
//that we have two lines
if (expect == 1) {
assertTrue(line.startsWith(line1));
} else if (expect == 2) {
assertTrue(line.startsWith(line2));
}
line = reader.readLine();
if (expect == 1) {
assertTrue(line.startsWith(line2));
} else if (expect == 2) {
assertTrue(line.startsWith(line1));
}
reader.close();
}
}
public static JobConf createDataJoinJob(String args[]) throws IOException {
String inputDir = args[0];
String outputDir = args[1];
Class inputFormat = SequenceFileInputFormat.class;
if (args[2].compareToIgnoreCase("text") != 0) {
System.out.println("Using SequenceFileInputFormat: " + args[2]);
} else {
System.out.println("Using TextInputFormat: " + args[2]);
inputFormat = TextInputFormat.class;
}
int numOfReducers = Integer.parseInt(args[3]);
Class mapper = getClassByName(args[4]);
Class reducer = getClassByName(args[5]);
Class mapoutputValueClass = getClassByName(args[6]);
Class outputFormat = TextOutputFormat.class;
Class outputValueClass = Text.class;
if (args[7].compareToIgnoreCase("text") != 0) {
System.out.println("Using SequenceFileOutputFormat: " + args[7]);
outputFormat = SequenceFileOutputFormat.class;
outputValueClass = getClassByName(args[7]);
} else {
System.out.println("Using TextOutputFormat: " + args[7]);
}
long maxNumOfValuesPerGroup = 100;
String jobName = "";
if (args.length > 8) {
maxNumOfValuesPerGroup = Long.parseLong(args[8]);
}
if (args.length > 9) {
jobName = args[9];
}
Configuration defaults = new Configuration();
JobConf job = new JobConf(defaults, DataJoinJob.class);
job.setJobName("DataJoinJob: " + jobName);
FileSystem fs = FileSystem.get(defaults);
fs.delete(new Path(outputDir), true);
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormat(inputFormat);
job.setMapperClass(mapper);
FileOutputFormat.setOutputPath(job, new Path(outputDir));
job.setOutputFormat(outputFormat);
SequenceFileOutputFormat.setOutputCompressionType(job,
SequenceFile.CompressionType.BLOCK);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(mapoutputValueClass);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(outputValueClass);
job.setReducerClass(reducer);
job.setNumMapTasks(1);
job.setNumReduceTasks(numOfReducers);
job.setLong("datajoin.maxNumOfValuesPerGroup", maxNumOfValuesPerGroup);
return job;
}
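A hedged sketch of driving createDataJoinJob; the mapper, reducer, and map-output value classes are hypothetical placeholders for DataJoinMapperBase/DataJoinReducerBase subclasses:

String[] jobArgs = {
    "/data/in", "/data/out",            // input and output directories
    "seq",                              // anything but "text" keeps SequenceFileInputFormat
    "4",                                // number of reducers
    "org.example.SampleTaggedMapper",   // hypothetical DataJoinMapperBase subclass
    "org.example.SampleTaggedReducer",  // hypothetical DataJoinReducerBase subclass
    "org.example.SampleTaggedValue",    // hypothetical map-output value class
    "text"                              // TextOutputFormat for the final output
};
JobConf job = createDataJoinJob(jobArgs);
JobClient.runJob(job); // submit and block until completion, as in the other examples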