The following are example usages of org.apache.hadoop.mapreduce.lib.input.FileInputFormat#setInputPaths(), collected from several open-source projects.
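For orientation, here is a minimal sketch of the two overloads used throughout the examples below: one accepts Path varargs, the other a single comma-separated String of paths. The class name and paths are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SetInputPathsSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Overload 1: one or more Path arguments (varargs).
        FileInputFormat.setInputPaths(job, new Path("/data/in1"), new Path("/data/in2"));
        // Overload 2: a single comma-separated String of paths.
        FileInputFormat.setInputPaths(job, "/data/in1,/data/in2");
    }
}

Note that both overloads replace any previously configured input paths; FileInputFormat.addInputPath(job, path) appends to the existing list instead.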
/**
* Creates a simple fail job.
*
* @param conf Configuration object
* @param outdir Output directory.
* @param indirs Input directories.
* @return Job initialized for a simple fail job.
* @throws Exception If an error occurs creating job configuration.
*/
public static Job createFailJob(Configuration conf, Path outdir,
Path... indirs) throws Exception {
conf.setInt("mapred.map.max.attempts", 2);
Job theJob = new Job(conf);
theJob.setJobName("Fail-Job");
FileInputFormat.setInputPaths(theJob, indirs);
theJob.setMapperClass(FailMapper.class);
theJob.setReducerClass(Reducer.class);
theJob.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(theJob, outdir);
theJob.setOutputKeyClass(Text.class);
theJob.setOutputValueClass(Text.class);
return theJob;
}
@Test
public void requireThatMapReduceJobSucceeds() throws Exception {
Job job = Job.getInstance(conf);
job.setJarByClass(MapReduceTest.class);
job.setMapperClass(FeedMapper.class);
job.setOutputFormatClass(VespaOutputFormat.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(FeedReducer.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job, metricsJsonPath);
boolean success = job.waitForCompletion(true);
assertTrue("Job Failed", success);
VespaCounters counters = VespaCounters.get(job);
assertEquals(10, counters.getDocumentsSent());
assertEquals(0, counters.getDocumentsFailed());
assertEquals(10, counters.getDocumentsOk());
}
/**
* Creates a simple copy job.
*
* @param conf Configuration object
* @param outdir Output directory.
* @param indirs Input directories.
* @return Job initialized for a data copy job.
* @throws Exception If an error occurs creating job configuration.
*/
public static Job createCopyJob(Configuration conf, Path outdir,
Path... indirs) throws Exception {
conf.setInt("mapred.map.tasks", 3);
Job theJob = new Job(conf);
theJob.setJobName("DataMoveJob");
FileInputFormat.setInputPaths(theJob, indirs);
theJob.setMapperClass(DataCopyMapper.class);
FileOutputFormat.setOutputPath(theJob, outdir);
theJob.setOutputKeyClass(Text.class);
theJob.setOutputValueClass(Text.class);
theJob.setReducerClass(DataCopyReducer.class);
theJob.setNumReduceTasks(1);
return theJob;
}
@Test
public void readExcelInputFormatExcel2003SingleSheetEncryptedNegativeLowFootprint()
throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "excel2003encrypt.xls";
String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
Path file = new Path(fileNameSpreadSheet);
// set locale to the one of the test data
conf.set("hadoopoffice.read.locale.bcp47", "de");
// low footprint
conf.set("hadoopoffice.read.lowFootprint", "true");
// for decryption simply set the password
conf.set("hadoopoffice.read.security.crypt.password", "test2");
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
ExcelFileInputFormat format = new ExcelFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
assertEquals(1, splits.size(), "Only one split generated for Excel file");
RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);
InterruptedException ex = assertThrows(InterruptedException.class,
() -> reader.initialize(splits.get(0), context), "Exception is thrown in case of wrong password");
}
public static DataSet parseInputPath(String inputPath, FileSystem fs, ExecutionEnvironment env, Class keyClass,
Class valueClass) throws IOException {
List<String> inputFolders = Lists.newArrayList();
Path inputHDFSPath = new Path(inputPath);
FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
boolean hasDir = false;
for (FileStatus stat : fileStatuses) {
if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
hasDir = true;
inputFolders.add(stat.getPath().toString());
}
}
if (!hasDir) {
return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, inputHDFSPath.toString()));
}
Job job = Job.getInstance();
FileInputFormat.setInputPaths(job, StringUtil.join(inputFolders, ","));
return env.createInput(HadoopInputs.createHadoopInput(new SequenceFileInputFormat(), keyClass, valueClass, job));
}
@Test
public void readExcelInputFormatExcel2013SingleSheetEncryptedNegativeLowFootprint()
throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "excel2013encrypt.xlsx";
String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
Path file = new Path(fileNameSpreadSheet);
// set locale to the one of the test data
conf.set("hadoopoffice.read.locale.bcp47", "de");
// low footprint
conf.set("hadoopoffice.read.lowFootprint", "true");
conf.set("hadoopoffice.read.lowFootprint.parser", "sax");
// for decryption simply set the password
conf.set("hadoopoffice.read.security.crypt.password", "test2");
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
ExcelFileInputFormat format = new ExcelFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
assertEquals(1, splits.size(), "Only one split generated for Excel file");
RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);
InterruptedException ex = assertThrows(InterruptedException.class,
() -> reader.initialize(splits.get(0), context), "Exception is thrown in case of wrong password");
}
public static Job createJob(Configuration conf, Path inDir, Path outDir,
int numInputFiles, int numReds, String input) throws IOException {
Job job = new Job(conf);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
if (fs.exists(inDir)) {
fs.delete(inDir, true);
}
fs.mkdirs(inDir);
for (int i = 0; i < numInputFiles; ++i) {
DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
file.writeBytes(input);
file.close();
}
FileInputFormat.setInputPaths(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.setNumReduceTasks(numReds);
return job;
}
public static void runJob(Path inputPath,
Path smallFilePath,
Path outputPath)
throws Exception {
Configuration conf = new Configuration();
FileSystem fs = smallFilePath.getFileSystem(conf);
FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath);
if (smallFilePathStatus.isDir()) {
for (FileStatus f : fs.listStatus(smallFilePath)) {
if (f.getPath().getName().startsWith("part")) {
DistributedCache.addCacheFile(f.getPath().toUri(), conf);
}
}
} else {
DistributedCache.addCacheFile(smallFilePath.toUri(), conf);
}
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(GenericReplicatedJoin.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setNumReduceTasks(0);
outputPath.getFileSystem(conf).delete(outputPath, true);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
/**
* When injected into a class, this method modifies the job's input path, but only if the input resides in HDFS.
*
* @param job
* Job whose input path needs to be changed
* @param sampledDataPath
* Path to the sampled data that replaces the original input path
*/
public static void modifyInputPath(Job job, String sampledDataPath) {
if(sampledDataPath==null){
throw new IllegalArgumentException("Sampled data path is null; expected a non-null path value");
}
try {
LOGGER.debug("Modifying input path changed to: "+ sampledDataPath);
FileInputFormat.setInputPaths(job, new Path(sampledDataPath));
} catch (IOException e) {
LOGGER.error(JumbuneRuntimeException.throwUnresponsiveIOException(e.getStackTrace()));
}
}
public int run(String[] args) throws Exception {
Configuration mrConf = this.getConf();
for (java.util.Map.Entry<String, String> entry : dgaConfiguration.getSystemProperties().entrySet()) {
mrConf.set(entry.getKey(), entry.getValue());
}
Job job = Job.getInstance(mrConf);
job.setJarByClass(CommunityCompression.class);
Path in = new Path(inputPath);
Path out = new Path(outputPath);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("CommunityCompression");
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LouvainVertexWritable.class);
job.setMapperClass(CommunityCompression.Map.class);
job.setReducerClass(CommunityCompression.Reduce.class);
logger.debug("Running Mapreduce step with job configuration: {}", job);
return job.waitForCompletion(false) ? 0 : 1;
}
/**
* Write the sequence file.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(SequenceFileStockMapReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(StockPriceWritable.class);
job.setInputFormatClass(
SequenceFileInputFormat.class); //<co id="ch03_comment_seqfile_mr1"/>
job.setOutputFormatClass(SequenceFileOutputFormat.class); //<co id="ch03_comment_seqfile_mr2"/>
SequenceFileOutputFormat.setCompressOutput(job, true); //<co id="ch03_comment_seqfile_mr3"/>
SequenceFileOutputFormat.setOutputCompressionType(job, //<co id="ch03_comment_seqfile_mr4"/>
SequenceFile.CompressionType.BLOCK);
SequenceFileOutputFormat.setOutputCompressorClass(job, //<co id="ch03_comment_seqfile_mr5"/>
DefaultCodec.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
@Override
public void setLocation(String location, Job job) throws IOException {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
if (!UDFContext.getUDFContext().isFrontend()) {
typeInfo = (TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
} else if (typeInfo == null) {
typeInfo = getTypeInfo(location, job);
}
if (typeInfo != null && oi == null) {
oi = OrcStruct.createObjectInspector(typeInfo);
}
if (!UDFContext.getUDFContext().isFrontend()) {
if (p.getProperty(signature + RequiredColumnsSuffix) != null) {
mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p
.getProperty(signature + RequiredColumnsSuffix));
job.getConfiguration().setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
getReqiredColumnIdString(mRequiredColumns));
if (p.getProperty(signature + SearchArgsSuffix) != null) {
// Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
getReqiredColumnNamesString(getSchema(location, job), mRequiredColumns));
}
} else if (p.getProperty(signature + SearchArgsSuffix) != null) {
// Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
getReqiredColumnNamesString(getSchema(location, job)));
}
if (p.getProperty(signature + SearchArgsSuffix) != null) {
job.getConfiguration().set(SARG_PUSHDOWN, p.getProperty(signature + SearchArgsSuffix));
}
}
FileInputFormat.setInputPaths(job, location);
}
public static boolean findShortestPath(Configuration conf, Path inputPath,
Path outputPath, String startNode,
String targetNode)
throws Exception {
conf.set(TARGET_NODE, targetNode);
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (!job.waitForCompletion(true)) {
throw new Exception("Job failed");
}
Counter counter = job.getCounters()
.findCounter(Reduce.PathCounter.TARGET_NODE_DISTANCE_COMPUTED);
if (counter != null && counter.getValue() > 0) {
CounterGroup group = job.getCounters().getGroup(Reduce.PathCounter.PATH.toString());
Iterator<Counter> iter = group.iterator();
iter.hasNext();
String path = iter.next().getName();
System.out.println("==========================================");
System.out.println("= Shortest path found, details as follows.");
System.out.println("= ");
System.out.println("= Start node: " + startNode);
System.out.println("= End node: " + targetNode);
System.out.println("= Hops: " + counter.getValue());
System.out.println("= Path: " + path);
System.out.println("==========================================");
return true;
}
return false;
}
public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
StringBuilder sb = new StringBuilder();
for (int j = 2; j < otherArgs.length; j++) {
sb.append(otherArgs[j]);
}
LOGGER.debug("Arguments[ " + otherArgs.length+"]"+"and values respectively ["+otherArgs[0]+"], "+
otherArgs[1]+", ["+otherArgs[2]+"]"+", ["+otherArgs[3]+"],"+
otherArgs[4]);
String inputpath = otherArgs[0];
String outputpath = "/tmp/jumbune/dvjsonreport"+ new Date().getTime();
String json = otherArgs[1];
String nullCondition = otherArgs[2];
String regex = otherArgs[3];
String dvDir = otherArgs[4];
if(regex.isEmpty()){
conf.set(JsonDataVaildationConstants.REGEX_ARGUMENT, "");
}else{
conf.set(JsonDataVaildationConstants.REGEX_ARGUMENT, regex);
}
if(nullCondition.isEmpty()){
conf.set(JsonDataVaildationConstants.NULL_ARGUMENT, "");
}else{
conf.set(JsonDataVaildationConstants.NULL_ARGUMENT, nullCondition);
}
conf.set(JsonDataVaildationConstants.SLAVE_DIR, dvDir);
conf.set(JsonDataVaildationConstants.JSON_ARGUMENT, json);
FileSystem fs = FileSystem.get(conf);
@SuppressWarnings("deprecation")
Job job = new Job(conf, "JSONDataValidation");
job.setJarByClass(JsonDataValidationExecutor.class);
job.setInputFormatClass(JsonFileInputFormat.class);
job.setMapperClass(JsonDataValidationMapper.class);
job.setPartitionerClass(JsonDataValidationPartitioner.class);
job.setReducerClass(JsonDataValidationReducer.class);
job.setNumReduceTasks(5);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FileKeyViolationBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TotalReducerViolationBean.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
Path[] inputPaths = FileUtil.getAllJsonNestedFilePath(job, inputpath);
FileInputFormat.setInputPaths(job, inputPaths);
FileOutputFormat.setOutputPath(job, new Path(outputpath));
if(fs.exists(new Path(outputpath)))
{
fs.delete(new Path(outputpath), true);
}
job.waitForCompletion(true);
Map<String, JsonViolationReport> jsonMap = readDataFromHdfs(conf,outputpath);
final Gson gson= new Gson();
final String jsonReport = gson.toJson(jsonMap);
LOGGER.info("Completed DataValidation");
LOGGER.info(JsonDataVaildationConstants.JSON_DV_REPORT + jsonReport);
}
/**
* Set input location and obtain input schema.
*/
@SuppressWarnings("unchecked")
@Override
public void setLocation(String location, Job job) throws IOException {
if (inputAvroSchema != null) {
return;
}
if (!UDFContext.getUDFContext().isFrontend()) {
Properties udfProps = getUDFProperties();
String mergedSchema = udfProps.getProperty(AVRO_MERGED_SCHEMA_PROPERTY);
if (mergedSchema != null) {
HashMap<URI, Map<Integer, Integer>> mergedSchemaMap =
(HashMap<URI, Map<Integer, Integer>>) ObjectSerializer.deserialize(mergedSchema);
schemaToMergedSchemaMap = new HashMap<Path, Map<Integer, Integer>>();
for (Entry<URI, Map<Integer, Integer>> entry : mergedSchemaMap.entrySet()) {
schemaToMergedSchemaMap.put(new Path(entry.getKey()), entry.getValue());
}
}
String schema = udfProps.getProperty(AVRO_INPUT_SCHEMA_PROPERTY);
if (schema != null) {
try {
inputAvroSchema = new Schema.Parser().parse(schema);
return;
} catch (Exception e) {
// Cases like testMultipleSchemas2 cause exception while deserializing
// symbols. In that case, we get it again.
LOG.warn("Exception while trying to deserialize schema in backend. " +
"Will construct again. schema= " + schema, e);
}
}
}
Configuration conf = job.getConfiguration();
Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
if (!paths.isEmpty()) {
// Set top level directories in input format. Adding all files will
// bloat configuration size
FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
// Scan all directories including sub directories for schema
if (inputAvroSchema == null) {
setInputAvroSchema(paths, conf);
}
} else {
throw new IOException("Input path \'" + location + "\' is not found");
}
}
@Override
public void setLocation(String location, Job job) throws IOException {
loadLocation = location;
FileInputFormat.setInputPaths(job, location);
}
private Configuration getConf(Configuration jconf) throws IOException {
Job job = Job.getInstance(jconf);
FileInputFormat.setInputPaths(job, indir);
return job.getConfiguration();
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
//conf.set("fs.defaultFS", "hdfs://node-01:9000");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
String commaSeparatedPaths = null;
String outputDir = null;
if (otherArgs.length == 2) {
commaSeparatedPaths = otherArgs[0];
outputDir = otherArgs[1];
} else {
System.err.println("Usage: <in>[,<in>...] <out>");
System.exit(-1);
}
LOGGER.info("==========job start");
Job job = Job.getInstance(conf);
job.setJobName("WordCountJob");
job.setJarByClass(WordCountJob.class);
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, commaSeparatedPaths);
FileOutputFormat.setOutputPath(job, new Path(outputDir));
if (job.waitForCompletion(true)) {
LOGGER.info("==========job success");
} else {
LOGGER.info("==========job failed");
}
}
@Test
public void readExcelInputFormatExcel2013SingleSheetLowFootPrintStaxPartlyInMemoryCompressed() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "excel2013test.xlsx";
String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
Path file = new Path(fileNameSpreadSheet);
// set locale to the one of the test data
conf.set("hadoopoffice.read.locale.bcp47", "de");
// low footprint
conf.set("hadoopoffice.read.lowFootprint", "true");
// stax parser
conf.set("hadoopoffice.read.lowFootprint.parser", "stax");
// partly in memory compressed
conf.set("hadoopoffice.read.lowFootprint.stax.sst.cache", "1");
conf.set("hadoopoffice.read.lowFootprint.stax.sst.compress", "true");
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
ExcelFileInputFormat format = new ExcelFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
assertEquals(1, splits.size(), "Only one split generated for Excel file");
RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);
assertNotNull(reader, "Format returned null RecordReader");
reader.initialize(splits.get(0), context);
Text spreadSheetKey = new Text();
ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class);
assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 1");
spreadSheetKey = reader.getCurrentKey();
spreadSheetValue = reader.getCurrentValue();
assertEquals("[excel2013test.xlsx]Sheet1!A1", spreadSheetKey.toString(),
"Input Split for Excel file has keyname == \"[excel2013test.xlsx]Sheet1!A1\"");
assertEquals(4, spreadSheetValue.get().length, "Input Split for Excel file contains row 1 with 4 columns");
assertEquals("test1", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
"Input Split for Excel file contains row 1 with cell 1 == \"test1\"");
assertEquals("Sheet1", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getSheetName(),
"Input Split for Excel file contains row 1 with cell 1 sheetname == \"Sheet1\"");
assertEquals("A1", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getAddress(),
"Input Split for Excel file contains row 1 with cell 1 address == \"A1\"");
assertEquals("test2", ((SpreadSheetCellDAO) spreadSheetValue.get()[1]).getFormattedValue(),
"Input Split for Excel file contains row 1 with cell 2 == \"test2\"");
assertEquals("test3", ((SpreadSheetCellDAO) spreadSheetValue.get()[2]).getFormattedValue(),
"Input Split for Excel file contains row 1 with cell 3 == \"test3\"");
assertEquals("test4", ((SpreadSheetCellDAO) spreadSheetValue.get()[3]).getFormattedValue(),
"Input Split for Excel file contains row 1 with cell 4 == \"test4\"");
assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 2");
spreadSheetKey = reader.getCurrentKey();
spreadSheetValue = reader.getCurrentValue();
assertEquals(1, spreadSheetValue.get().length, "Input Split for Excel file contains row 2 with 1 column");
assertEquals("4", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
"Input Split for Excel file contains row 2 with cell 1 == \"4\"");
assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 3");
spreadSheetKey = reader.getCurrentKey();
spreadSheetValue = reader.getCurrentValue();
assertEquals(5, spreadSheetValue.get().length, "Input Split for Excel file contains row 3 with 5 columns");
assertEquals("31/12/99", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
"Input Split for Excel file contains row 3 with cell 1 == \"31/12/99\"");
assertEquals("5", ((SpreadSheetCellDAO) spreadSheetValue.get()[1]).getFormattedValue(),
"Input Split for Excel file contains row 3 with cell 2 == \"5\"");
assertNull(spreadSheetValue.get()[2], "Input Split for Excel file contains row 3 with cell 3 == null");
assertNull(spreadSheetValue.get()[3], "Input Split for Excel file contains row 3 with cell 4 == null");
assertEquals("null", ((SpreadSheetCellDAO) spreadSheetValue.get()[4]).getFormattedValue(),
"Input Split for Excel file contains row 3 with cell 5 == \"null\"");
assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 4");
spreadSheetKey = reader.getCurrentKey();
spreadSheetValue = reader.getCurrentValue();
assertEquals(1, spreadSheetValue.get().length, "Input Split for Excel file contains row 4 with 1 column");
assertEquals("1", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
"Input Split for Excel file contains row 4 with cell 1 == \"1\"");
assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 5");
spreadSheetKey = reader.getCurrentKey();
spreadSheetValue = reader.getCurrentValue();
assertEquals(3, spreadSheetValue.get().length, "Input Split for Excel file contains row 5 with 3 columns");
assertEquals("2", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
"Input Split for Excel file contains row 5 with cell 1 == \"2\"");
assertEquals("6", ((SpreadSheetCellDAO) spreadSheetValue.get()[1]).getFormattedValue(),
"Input Split for Excel file contains row 5 with cell 2== \"6\"");
assertEquals("10", ((SpreadSheetCellDAO) spreadSheetValue.get()[2]).getFormattedValue(),
"Input Split for Excel file contains row 5 with cell 3== \"10\"");
assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 6");
spreadSheetKey = reader.getCurrentKey();
spreadSheetValue = reader.getCurrentValue();
assertEquals(3, spreadSheetValue.get().length, "Input Split for Excel file contains row 6 with 3 columns");
assertEquals("3", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
"Input Split for Excel file contains row 6 with cell 1 == \"3\"");
assertEquals("4", ((SpreadSheetCellDAO) spreadSheetValue.get()[1]).getFormattedValue(),
"Input Split for Excel file contains row 6 with cell 2== \"4\"");
assertEquals("15", ((SpreadSheetCellDAO) spreadSheetValue.get()[2]).getFormattedValue(),
"Input Split for Excel file contains row 6 with cell 3== \"15\"");
}
public int run(String[] args) throws Exception {
/*Configuration conf = getConf();
JobClient client = new JobClient(conf);
ClusterStatus cluster = client.getClusterStatus();
int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
String join_reduces = conf.get(REDUCES_PER_HOST);
if (join_reduces != null) {
num_reduces = cluster.getTaskTrackers() *
Integer.parseInt(join_reduces);
}
// Set user-supplied (possibly default) job configs
job.setNumReduceTasks(num_reduces);*/
Configuration conf = new Configuration();
//conf.set("fs.defaultFS", "hdfs://node-01:9000");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
String commaSeparatedPaths = null;
String outputDir = null;
if (otherArgs.length == 2) {
commaSeparatedPaths = otherArgs[0];
outputDir = otherArgs[1];
} else {
System.err.println("Usage: <in>[,<in>...] <out>");
//System.exit(-1);
return -1;
}
Job job = Job.getInstance(conf);
job.setJobName("StepTwoJob");
job.setJarByClass(StepTwoJob.class);
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(StepTwoMapper.class);
// job.setCombinerClass(StepOneReducer.class);
job.setReducerClass(StepTwoReducer.class);
// job.setPartitionerClass(FlowPartition.class);
// job.setNumReduceTasks(5);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, commaSeparatedPaths);
FileOutputFormat.setOutputPath(job, new Path(outputDir));
return job.waitForCompletion(true) ? 0 : 1;
}