The following are example usages of org.apache.hadoop.mapreduce.lib.input.FileInputFormat, collected from open-source projects.
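// Typical run() driver: parse generic options, configure a word-count job, and register the input directory with FileInputFormat.addInputPath.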
@Override
public int run(String[] arg0) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, arg0).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: CommentWordCount <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "StackOverflow Comment Word Count");
  job.setJarByClass(CommentWordCount.class);
  job.setMapperClass(WordCountMapper.class);
  job.setCombinerClass(WordCountReducer.class);
  job.setReducerClass(WordCountReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  boolean success = job.waitForCompletion(true);
  return success ? 0 : 1;
}
/**
 * Creates a simple kill job.
 *
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Comma separated input directories.
 * @return Job initialized for a simple kill job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createKillJob(Configuration conf, Path outdir,
    Path... indirs) throws Exception {
  Job theJob = Job.getInstance(conf);
  theJob.setJobName("Kill-Job");
  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(KillMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
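// Synthetic job setup that uses a custom InputFormat; the "ignored" input path only satisfies FileInputFormat's requirement that at least one path be configured.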
public Job call() throws IOException, InterruptedException,
    ClassNotFoundException {
  job.setMapperClass(GridmixMapper.class);
  job.setReducerClass(GridmixReducer.class);
  job.setNumReduceTasks(jobdesc.getNumberReduces());
  job.setMapOutputKeyClass(GridmixKey.class);
  job.setMapOutputValueClass(GridmixRecord.class);
  job.setSortComparatorClass(GridmixKey.Comparator.class);
  job.setGroupingComparatorClass(SpecGroupingComparator.class);
  job.setInputFormatClass(GridmixInputFormat.class);
  job.setOutputFormatClass(RawBytesOutputFormat.class);
  job.setPartitionerClass(DraftPartitioner.class);
  job.setJarByClass(GridmixJob.class);
  job.getConfiguration().setInt("gridmix.job.seq", seq);
  job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID()
      ? "<unknown>" : jobdesc.getJobID().toString());
  job.getConfiguration().setBoolean("mapred.used.genericoptionsparser", true);
  FileInputFormat.addInputPath(job, new Path("ignored"));
  FileOutputFormat.setOutputPath(job, outdir);
  job.submit();
  return job;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  if (args.length != 2) {
    System.err.println("Usage: MaxTemperature <input path> <output path>");
    System.exit(-1);
  }
  Job job = Job.getInstance();
  job.setJarByClass(MaxTemperature.class);
  job.setJobName("MaxTemperature - find the maximum temperature in the weather dataset");
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setMapperClass(TemperatureMapper.class);
  // Optionally set a Combiner to reduce the amount of data shuffled and improve efficiency
  // job.setCombinerClass(MaxTemperatureReducer.class);
  job.setReducerClass(MaxTemperatureReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Override
public int run(String[] arg0) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, arg0).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: Average <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "StackOverflow Comment Average");
  job.setJarByClass(Average.class);
  job.setMapperClass(AverageMapper.class);
  job.setCombinerClass(AverageReducer.class);
  job.setReducerClass(AverageReducer.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(CountAverageTuple.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  boolean success = job.waitForCompletion(true);
  return success ? 0 : 1;
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(DT_ID3_Driver.class);
  job.setJobName("Decision_Tree_Algorithm_on_Hadoop");
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  // job.setNumReduceTasks(0);
  job.setMapperClass(DT_ID3_Map.class);
  job.setReducerClass(DT_ID3_Reduce.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  boolean success = job.waitForCompletion(true);
  System.exit(success ? 0 : 1);
}
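// Reads the job's configured input directories back via FileInputFormat.getInputPaths and scans them, newest file first, for a usable schema.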
public static Schema getNewestSchemaFromSource(Job job, FileSystem fs) throws IOException {
  Path[] sourceDirs = FileInputFormat.getInputPaths(job);
  List<FileStatus> files = new ArrayList<FileStatus>();
  for (Path sourceDir : sourceDirs) {
    files.addAll(Arrays.asList(fs.listStatus(sourceDir)));
  }
  Collections.sort(files, new LastModifiedDescComparator());
  for (FileStatus file : files) {
    Schema schema = getNewestSchemaFromSource(file.getPath(), fs);
    if (schema != null) {
      return schema;
    }
  }
  return null;
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  long t1 = System.currentTimeMillis();
  boolean re = job.waitForCompletion(true);
  long t2 = System.currentTimeMillis();
  System.out.println((float) (t2 - t1) / 1000);
  if (re) {
    System.exit(0);
  } else {
    System.exit(1);
  }
}
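// Sleep job built around a custom InputFormat; as above, a placeholder "ignored" path is added only because FileInputFormat requires an input path.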
public Job createJob(int numMapper, int numReducer,
    long mapSleepTime, int mapSleepCount,
    long reduceSleepTime, int reduceSleepCount)
    throws IOException {
  Configuration conf = getConf();
  conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
  conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
  conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
  conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
  conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
  Job job = Job.getInstance(conf, "sleep");
  job.setNumReduceTasks(numReducer);
  job.setJarByClass(SleepJob.class);
  job.setMapperClass(SleepMapper.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setReducerClass(SleepReducer.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setInputFormatClass(SleepInputFormat.class);
  job.setPartitionerClass(SleepJobPartitioner.class);
  job.setSpeculativeExecution(false);
  job.setJobName("Sleep job");
  FileInputFormat.addInputPath(job, new Path("ignored"));
  return job;
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  // System.out.println(otherArgs);
  if (otherArgs.length != 2) {
    System.out.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  // if (args.length != 2) {
  //   System.out.println("param error!");
  //   System.exit(-1);
  // }
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  // use the parsed remaining args so generic options (-D, -files, ...) are not mistaken for paths
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
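// Map-only test job that feeds documents to Vespa; the only input configuration needed is FileInputFormat.setInputPaths.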
@Test
public void requireThatMapOnlyJobSucceeds() throws Exception {
  Job job = Job.getInstance(conf);
  job.setJarByClass(MapReduceTest.class);
  job.setMapperClass(FeedMapper.class);
  job.setOutputFormatClass(VespaOutputFormat.class);
  job.setMapOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(job, metricsJsonPath);
  boolean success = job.waitForCompletion(true);
  assertTrue("Job Failed", success);
  VespaCounters counters = VespaCounters.get(job);
  assertEquals(10, counters.getDocumentsSent());
  assertEquals(0, counters.getDocumentsFailed());
  assertEquals(10, counters.getDocumentsOk());
}
public static boolean runCalcJob(Configuration conf, Path input, Path outputPath)
    throws Exception {
  Job job = new Job(conf);
  job.setJarByClass(Main.class);
  job.setMapperClass(CalcMapReduce.Map.class);
  job.setReducerClass(CalcMapReduce.Reduce.class);
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  job.setMapOutputKeyClass(CalcMapReduce.TextPair.class);
  job.setMapOutputValueClass(IntWritable.class);
  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, outputPath);
  return job.waitForCompletion(true);
}
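// Uses a FileInputFormat outside of a running MapReduce job: a serialized FileSplit is deserialized and a RecordReader is created and initialized directly.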
@Override
public Extractor<S, D> getExtractor(WorkUnitState workUnitState) throws IOException {
  if (!workUnitState.contains(FILE_SPLIT_BYTES_STRING_KEY)) {
    throw new IOException("No serialized FileSplit found in WorkUnitState " + workUnitState.getId());
  }
  Configuration configuration = new Configuration();
  FileInputFormat<K, V> fileInputFormat = getFileInputFormat(workUnitState, configuration);
  String fileSplitBytesStr = workUnitState.getProp(FILE_SPLIT_BYTES_STRING_KEY);
  FileSplit fileSplit = (FileSplit) HadoopUtils.deserializeFromString(FileSplit.class, fileSplitBytesStr);
  TaskAttemptContext taskAttemptContext =
      getTaskAttemptContext(configuration, DummyTaskAttemptIDFactory.newTaskAttemptID());
  try {
    RecordReader<K, V> recordReader = fileInputFormat.createRecordReader(fileSplit, taskAttemptContext);
    recordReader.initialize(fileSplit, taskAttemptContext);
    boolean readKeys = workUnitState.getPropAsBoolean(FILE_INPUT_READ_KEYS_KEY, DEFAULT_FILE_INPUT_READ_KEYS);
    return getExtractor(workUnitState, recordReader, fileSplit, readKeys);
  } catch (InterruptedException ie) {
    throw new IOException(ie);
  }
}
public static void main(String[] args) throws Exception {
  Configuration conf1 = new Configuration();
  Job job1 = new Job(conf1, "step1");
  job1.setOutputFormatClass(SequenceFileOutputFormat.class);
  job1.setNumReduceTasks(1);
  job1.setJarByClass(Step1.class);
  job1.setMapperClass(WikiMapper1.class);
  job1.setMapOutputKeyClass(VarLongWritable.class);
  job1.setMapOutputValueClass(LongAndFloat.class);
  job1.setReducerClass(WiKiReducer1.class);
  job1.setOutputKeyClass(VarLongWritable.class);
  job1.setOutputValueClass(VectorWritable.class);
  FileInputFormat.addInputPath(job1, new Path(INPUT_PATH));
  SequenceFileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH));
  if (!job1.waitForCompletion(true)) {
    System.exit(1);
  }
}
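// Test exercising FileInputFormat.LIST_STATUS_NUM_THREADS, which controls how many threads are used to list the input directories.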
@Test
public void testListStatusNestedRecursive() throws IOException {
  Configuration conf = new Configuration();
  conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
  List<Path> expectedPaths = org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
      .configureTestNestedRecursive(conf, localFs);
  JobConf jobConf = new JobConf(conf);
  TextInputFormat fif = new TextInputFormat();
  fif.configure(jobConf);
  FileStatus[] statuses = fif.listStatus(jobConf);
  org.apache.hadoop.mapreduce.lib.input.TestFileInputFormat
      .verifyFileStatuses(expectedPaths, Lists.newArrayList(statuses),
          localFs);
}
@Override
public void configureInputFormat(Job job, String tableName,
    String tableClassName, String splitByCol)
    throws ClassNotFoundException, IOException {
  // Write a line of text into a file so that we can get
  // a record to the map task.
  Path dir = new Path(this.options.getTempDir());
  Path p = new Path(dir, "sqoop-dummy-import-job-file.txt");
  FileSystem fs = FileSystem.getLocal(this.options.getConf());
  if (fs.exists(p)) {
    boolean result = fs.delete(p, false);
    assertTrue("Couldn't delete temp file!", result);
  }
  BufferedWriter w = new BufferedWriter(
      new OutputStreamWriter(fs.create(p)));
  w.append("This is a line!");
  w.close();
  FileInputFormat.addInputPath(job, p);
  // And set the InputFormat itself.
  super.configureInputFormat(job, tableName, tableClassName, splitByCol);
}
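// Wrapper around a Hadoop input format: statistics are gathered only when the wrapped format is a FileInputFormat, using the paths returned by getInputPaths.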
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
  // only gather base statistics for FileInputFormats
  if (!(mapreduceInputFormat instanceof FileInputFormat)) {
    return null;
  }
  JobContext jobContext = new JobContextImpl(configuration, null);
  final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
      (FileBaseStatistics) cachedStats : null;
  try {
    final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
    return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
  } catch (IOException ioex) {
    if (LOG.isWarnEnabled()) {
      LOG.warn("Could not determine statistics due to an io error: "
          + ioex.getMessage());
    }
  } catch (Throwable t) {
    if (LOG.isErrorEnabled()) {
      LOG.error("Unexpected problem while getting the file statistics: "
          + t.getMessage(), t);
    }
  }
  // no statistics available
  return null;
}
@Override
public void setInputPaths( org.pentaho.hadoop.shim.api.internal.fs.Path... paths ) {
  if ( paths == null ) {
    return;
  }
  Path[] actualPaths = new Path[ paths.length ];
  for ( int i = 0; i < paths.length; i++ ) {
    actualPaths[ i ] = ShimUtils.asPath( paths[ i ] );
  }
  try {
    FileInputFormat.setInputPaths( getJob(), actualPaths );
  } catch ( IOException e ) {
    e.printStackTrace();
  }
}
/**
 * Create a {@code HDFSFileReader} based on a single Hadoop input split.
 */
public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
    Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split) throws IOException {
  this.source = source;
  this.filepattern = filepattern;
  this.formatClass = formatClass;
  if (split != null) {
    this.splits = ImmutableList.of(split);
    this.splitsIterator = splits.listIterator();
  }
  this.job = ((ConfigurableHDFSFileSource<K, V>) getCurrentSource()).jobInstance(); // new instance
}
public static ChangedFile[] getSnapshot(String dataPath, String repoName, int index) throws Exception {
  TestBuildSnapshotFromSequenceFile.repoName = repoName;
  TestBuildSnapshotFromSequenceFile.index = index;
  TestBuildSnapshotFromSequenceFile.snapshot = null;
  File outDir = new File("dataset/temp_output");
  if (outDir.exists())
    new FileIO.DirectoryRemover(outDir.getAbsolutePath()).run();
  Configuration conf = new Configuration();
  Job job = new Job(conf, "read sequence file");
  job.setJarByClass(TestBuildSnapshotFromSequenceFile.class);
  job.setMapperClass(SequenceFileReaderMapper.class);
  job.setCombinerClass(SequenceFileReaderReducer.class);
  job.setReducerClass(SequenceFileReaderReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
  FileInputFormat.addInputPath(job, new Path(dataPath, "projects.seq"));
  FileOutputFormat.setOutputPath(job, new Path(outDir.getAbsolutePath()));
  boolean completed = job.waitForCompletion(false);
  assertEquals(true, completed); // expected value goes first in JUnit's assertEquals
  if (outDir.exists())
    new FileIO.DirectoryRemover(outDir.getAbsolutePath()).run();
  return snapshot;
}
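// Builds a comma-separated list of per-column directories and passes it to the String overload of FileInputFormat.setInputPaths.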
private void setInput(Job job, String[] dicColsArray, String inputBase) throws IOException {
  StringBuffer paths = new StringBuffer();
  // make each reducer output to respective dir
  for (String col : dicColsArray) {
    paths.append(inputBase).append("/dict_column=").append(col).append(",");
  }
  paths.delete(paths.length() - 1, paths.length());
  FileInputFormat.setInputPaths(job, paths.toString());
}
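// Lowering FileInputFormat.SPLIT_MAXSIZE forces the input BAM to be processed as several splits, so the output is written in multiple parts.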
@Test
public void testBAMWithSplittingBai() throws Exception {
  int numPairs = 20000;
  // create a large BAM with lots of index points
  String bam = BAMTestUtil.writeBamFile(20000,
      SAMFileHeader.SortOrder.coordinate).toURI().toString();
  conf.setInt(FileInputFormat.SPLIT_MAXSIZE, 800000); // force multiple parts
  conf.setBoolean(BAMOutputFormat.WRITE_SPLITTING_BAI, true);
  final Path outputPath = doMapReduce(bam);
  List<SAMRecord> recordsAtSplits = new ArrayList<>();
  File[] splittingIndexes = new File(outputPath.toUri()).listFiles(pathname -> {
    return pathname.getName().endsWith(SplittingBAMIndexer.OUTPUT_FILE_EXTENSION);
  });
  Arrays.sort(splittingIndexes); // ensure files are sorted by name
  for (File file : splittingIndexes) {
    File bamFile = new File(file.getParentFile(),
        file.getName().replace(SplittingBAMIndexer.OUTPUT_FILE_EXTENSION, ""));
    SplittingBAMIndex index = new SplittingBAMIndex(file);
    recordsAtSplits.addAll(getRecordsAtSplits(bamFile, index));
  }
  final File outFile = File.createTempFile("testBAMWriter", ".bam");
  //outFile.deleteOnExit();
  SAMFileMerger.mergeParts(outputPath.toUri().toString(), outFile.toURI().toString(),
      SAMFormat.BAM,
      new SAMRecordSetBuilder(true, SAMFileHeader.SortOrder.coordinate).getHeader());
  final int actualCount = getBAMRecordCount(outFile);
  assertEquals(numPairs * 2 + 2, actualCount); // 2 unmapped reads
  File splittingBai = new File(outFile.getParentFile(), outFile.getName() +
      SplittingBAMIndexer.OUTPUT_FILE_EXTENSION);
  SplittingBAMIndex splittingBAMIndex = new SplittingBAMIndex(splittingBai);
  assertEquals(recordsAtSplits, getRecordsAtSplits(outFile, splittingBAMIndex));
}
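// The job is configured and submitted inside a doAs block so the submission runs as the login user; the anonymous action also carries the private helper that adds the dummy input path.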
@Override
public Job call() throws IOException, InterruptedException,
    ClassNotFoundException {
  UserGroupInformation ugi = UserGroupInformation.getLoginUser();
  ugi.doAs(new PrivilegedExceptionAction<Job>() {
    public Job run() throws IOException, ClassNotFoundException,
        InterruptedException {
      // check if compression emulation is enabled
      if (CompressionEmulationUtil
          .isCompressionEmulationEnabled(job.getConfiguration())) {
        CompressionEmulationUtil.configure(job);
      } else {
        configureRandomBytesDataGenerator();
      }
      job.submit();
      return job;
    }

    private void configureRandomBytesDataGenerator() {
      job.setMapperClass(GenDataMapper.class);
      job.setNumReduceTasks(0);
      job.setMapOutputKeyClass(NullWritable.class);
      job.setMapOutputValueClass(BytesWritable.class);
      job.setInputFormatClass(GenDataFormat.class);
      job.setOutputFormatClass(RawBytesOutputFormat.class);
      job.setJarByClass(GenerateData.class);
      try {
        FileInputFormat.addInputPath(job, new Path("ignored"));
      } catch (IOException e) {
        LOG.error("Error while adding input path ", e);
      }
    }
  });
  return job;
}
@Override
public int run(String[] args) throws Exception {
  if (args.length != 2) {
    System.err.println("Usage: wordstddev <in> <out>");
    return 0;
  }
  Configuration conf = getConf();
  Job job = Job.getInstance(conf, "word stddev");
  job.setJarByClass(WordStandardDeviation.class);
  job.setMapperClass(WordStandardDeviationMapper.class);
  job.setCombinerClass(WordStandardDeviationReducer.class);
  job.setReducerClass(WordStandardDeviationReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  Path outputpath = new Path(args[1]);
  FileOutputFormat.setOutputPath(job, outputpath);
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  Boolean waitforCompletion = job.waitForCompletion(true);
  Date end_time = new Date();
  System.out.println("Job ended: " + end_time);
  System.out.println("The job took " +
      (end_time.getTime() - startTime.getTime()) / 1000 + " seconds.");
  // read output and calculate standard deviation
  stddev = readAndCalcStdDev(outputpath, conf);
  return (waitforCompletion ? 0 : 1);
}
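// FileInputFormat.addInputPaths (plural) takes a comma-separated String and adds each entry as an input path.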
@Override
public int run(String[] args) throws Exception {
  Job job = Job.getInstance(getConf());
  job.setJarByClass(Phase3Step2DistinctDataJob.class);
  job.setJobName(Phase3Step2DistinctDataJob.class.getName());
  // mapper
  job.setMapperClass(RemoveRedundantDataMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(NullWritable.class);
  // reducer
  job.setReducerClass(RemoveRedundantDataReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);
  // paths
  String commaSeparatedInputFiles = args[0];
  String outputPath = args[1];
  job.setInputFormatClass(TextInputFormat.class);
  LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
  // i/o paths
  FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
  FileOutputFormat.setOutputPath(job, new Path(outputPath));
  return job.waitForCompletion(true) ? 0 : 1;
}
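// Runs one map-only grep job per pattern, reusing the same input path and writing each pattern's results to a separate output directory.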
public static void main(String[] args) throws Exception {
  String in = args[0];
  String out = args[1];
  System.err.println("Using input=" + in);
  System.err.println("Using output=" + out);
  String[] patterns = new String[args.length - 2];
  System.arraycopy(args, 2, patterns, 0, args.length - 2);
  System.err.println("Using patterns: " + Arrays.toString(patterns));
  for (int i = 0; i < patterns.length; i++) {
    String pattern = patterns[i];
    Configuration conf = new Configuration();
    conf.set("pattern", pattern);
    Job job = Job.getInstance(conf, "Grep for " + pattern);
    job.setMapperClass(Grep.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setNumReduceTasks(0);
    job.setJarByClass(Grep.class);
    FileInputFormat.addInputPath(job, new Path(in));
    FileOutputFormat.setOutputPath(job, new Path(out + "_" + pattern));
    if (!job.waitForCompletion(true)) {
      throw new RuntimeException("Grep job " + i + " failed");
    }
  }
}
@Override
public void configureJob(Job job) {
  job.setInputFormatClass(SequenceFileInputFormat.class);
  String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
  IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
  String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc,
      JobBuilderSupport.getJobWorkingDir(conf, jobId));
  try {
    FileInputFormat.addInputPath(job, new Path(inputPath));
  } catch (IOException e) {
    throw new IllegalStateException(e);
  }
}
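// Sets the input directory directly through the FileInputFormat.INPUT_DIR configuration key on a JobConf instead of calling setInputPaths.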
@Before
public void setUp() {
  TsFileTestHelper.writeTsFile(filePath);
  inputFormat = new TSFHiveInputFormat();
  // on Windows, replace backslashes in the path with forward slashes
  String jobPath = filePath.replaceAll("\\\\", "/");
  job = new JobConf();
  job.set(FileInputFormat.INPUT_DIR, jobPath);
  Path path = new Path(jobPath);
  String[] hosts = {"127.0.0.1"};
  inputSplit = new TSFInputSplit(path, hosts, 0, 3727688L);
}
@Override
public Job call() throws IOException, InterruptedException,
    ClassNotFoundException {
  job.setMapperClass(GenDataMapper.class);
  job.setNumReduceTasks(0);
  job.setMapOutputKeyClass(NullWritable.class);
  job.setMapOutputValueClass(BytesWritable.class);
  job.setInputFormatClass(GenDataFormat.class);
  job.setOutputFormatClass(RawBytesOutputFormat.class);
  job.setJarByClass(GenerateData.class);
  FileInputFormat.addInputPath(job, new Path("ignored"));
  job.submit();
  return job;
}
public static void main(String[] args) throws Exception {
  Configuration con = new Configuration();
  Job bookJob = Job.getInstance(con, "Average Page Count");
  bookJob.setJarByClass(AveragePageCount.class);
  bookJob.setMapperClass(TextMapper.class);
  bookJob.setReducerClass(AverageReduce.class);
  bookJob.setOutputKeyClass(Text.class);
  bookJob.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(bookJob, new Path("C:/Hadoop/books.txt"));
  FileOutputFormat.setOutputPath(bookJob, new Path("C:/Hadoop/BookOutput"));
  if (bookJob.waitForCompletion(true)) {
    System.exit(0);
  }
}