org.apache.hadoop.mapreduce.lib.input.FileInputFormat#setInputPaths ( )源码实例Demo

下面列出了org.apache.hadoop.mapreduce.lib.input.FileInputFormat#setInputPaths ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: RDFS   文件: MapReduceTestUtil.java
/**
 * Creates a simple fail job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Comma separated input directories.
 * @return Job initialized for a simple fail job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createFailJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {

  conf.setInt("mapred.map.max.attempts", 2);
  Job theJob = new Job(conf);
  theJob.setJobName("Fail-Job");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(FailMapper.class);
  theJob.setReducerClass(Reducer.class);
  theJob.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  return theJob;
}
 
源代码2 项目: vespa   文件: MapReduceTest.java
@Test
public void requireThatMapReduceJobSucceeds() throws Exception {
    Job job = Job.getInstance(conf);
    job.setJarByClass(MapReduceTest.class);
    job.setMapperClass(FeedMapper.class);
    job.setOutputFormatClass(VespaOutputFormat.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(FeedReducer.class);
    job.setNumReduceTasks(1);

    FileInputFormat.setInputPaths(job, metricsJsonPath);

    boolean success = job.waitForCompletion(true);
    assertTrue("Job Failed", success);

    VespaCounters counters = VespaCounters.get(job);
    assertEquals(10, counters.getDocumentsSent());
    assertEquals(0, counters.getDocumentsFailed());
    assertEquals(10, counters.getDocumentsOk());
}
 
源代码3 项目: RDFS   文件: MapReduceTestUtil.java
/**
 * Creates a simple copy job.
 * 
 * @param conf Configuration object
 * @param outdir Output directory.
 * @param indirs Comma separated input directories.
 * @return Job initialized for a data copy job.
 * @throws Exception If an error occurs creating job configuration.
 */
public static Job createCopyJob(Configuration conf, Path outdir, 
    Path... indirs) throws Exception {
  conf.setInt("mapred.map.tasks", 3);
  Job theJob = new Job(conf);
  theJob.setJobName("DataMoveJob");

  FileInputFormat.setInputPaths(theJob, indirs);
  theJob.setMapperClass(DataCopyMapper.class);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  theJob.setReducerClass(DataCopyReducer.class);
  theJob.setNumReduceTasks(1);
  return theJob;
}
 
@Test
public void readExcelInputFormatExcel2003SingleSheetEncryptedNegativeLowFootprint()
		throws IOException, InterruptedException {
	Configuration conf = new Configuration(defaultConf);
	ClassLoader classLoader = getClass().getClassLoader();
	String fileName = "excel2003encrypt.xls";
	String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
	Path file = new Path(fileNameSpreadSheet);
	// set locale to the one of the test data
	conf.set("hadoopoffice.read.locale.bcp47", "de");

	// low footprint
	conf.set("hadoopoffice.read.lowFootprint", "true");
	// for decryption simply set the password
	conf.set("hadoopoffice.read.security.crypt.password", "test2");
	Job job = Job.getInstance(conf);
	FileInputFormat.setInputPaths(job, file);
	TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
	ExcelFileInputFormat format = new ExcelFileInputFormat();
	List<InputSplit> splits = format.getSplits(job);
	assertEquals(1, splits.size(), "Only one split generated for Excel file");
	RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);

	InterruptedException ex = assertThrows(InterruptedException.class,
			() -> reader.initialize(splits.get(0), context), "Exception is thrown in case of wrong password");
}
 
源代码5 项目: kylin   文件: FlinkUtil.java
public static DataSet parseInputPath(String inputPath, FileSystem fs, ExecutionEnvironment env, Class keyClass,
        Class valueClass) throws IOException {
    List<String> inputFolders = Lists.newArrayList();
    Path inputHDFSPath = new Path(inputPath);
    FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
    boolean hasDir = false;
    for (FileStatus stat : fileStatuses) {
        if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
            hasDir = true;
            inputFolders.add(stat.getPath().toString());
        }
    }

    if (!hasDir) {
        return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, inputHDFSPath.toString()));
    }

    Job job = Job.getInstance();
    FileInputFormat.setInputPaths(job, StringUtil.join(inputFolders, ","));
    return env.createInput(HadoopInputs.createHadoopInput(new SequenceFileInputFormat(), keyClass, valueClass, job));
}
 
@Test
public void readExcelInputFormatExcel2013SingleSheetEncryptedNegativeLowFootprint()
		throws IOException, InterruptedException {
	Configuration conf = new Configuration(defaultConf);
	ClassLoader classLoader = getClass().getClassLoader();
	String fileName = "excel2013encrypt.xlsx";
	String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
	Path file = new Path(fileNameSpreadSheet);
	// set locale to the one of the test data
	conf.set("hadoopoffice.read.locale.bcp47", "de");

	// low footprint
	conf.set("hadoopoffice.read.lowFootprint", "true");
	conf.set("hadoopoffice.read.lowFootprint.parser", "sax");
	// for decryption simply set the password
	conf.set("hadoopoffice.read.security.crypt.password", "test2");
	Job job = Job.getInstance(conf);
	FileInputFormat.setInputPaths(job, file);
	TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
	ExcelFileInputFormat format = new ExcelFileInputFormat();
	List<InputSplit> splits = format.getSplits(job);
	assertEquals(1, splits.size(), "Only one split generated for Excel file");
	RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);
	InterruptedException ex = assertThrows(InterruptedException.class,
			() -> reader.initialize(splits.get(0), context), "Exception is thrown in case of wrong password");
}
 
源代码7 项目: RDFS   文件: MapReduceTestUtil.java
public static Job createJob(Configuration conf, Path inDir, Path outDir, 
    int numInputFiles, int numReds, String input) throws IOException {
  Job job = new Job(conf);
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (fs.exists(inDir)) {
    fs.delete(inDir, true);
  }
  fs.mkdirs(inDir);
  for (int i = 0; i < numInputFiles; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }    

  FileInputFormat.setInputPaths(job, inDir);
  FileOutputFormat.setOutputPath(job, outDir);
  job.setNumReduceTasks(numReds);
  return job;
}
 
源代码8 项目: hiped2   文件: Main.java
public static void runJob(Path inputPath,
                          Path smallFilePath,
                          Path outputPath)
    throws Exception {

  Configuration conf = new Configuration();

  FileSystem fs = smallFilePath.getFileSystem(conf);

  FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath);

  if (smallFilePathStatus.isDir()) {
    for (FileStatus f : fs.listStatus(smallFilePath)) {
      if (f.getPath().getName().startsWith("part")) {
        DistributedCache.addCacheFile(f.getPath().toUri(), conf);
      }
    }
  } else {
    DistributedCache.addCacheFile(smallFilePath.toUri(), conf);
  }

  Job job = new Job(conf);

  job.setJarByClass(Main.class);
  job.setMapperClass(GenericReplicatedJoin.class);

  job.setInputFormatClass(KeyValueTextInputFormat.class);

  job.setNumReduceTasks(0);

  outputPath.getFileSystem(conf).delete(outputPath, true);

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  job.waitForCompletion(true);
}
 
源代码9 项目: jumbune   文件: JobUtil.java
/***
 * This method call when injected into a class will modify the input path,
 * only if input is into HDFS
 * 
 * @param job
 *            Job whose input path need to be changed
 */
public static void modifyInputPath(Job job, String sampledDataPath) {
	if(sampledDataPath==null){
		throw new IllegalArgumentException("Sampled data path is null, expecting not null path value");
	}
		try {
			LOGGER.debug("Modifying input path changed to: "+ sampledDataPath);
			FileInputFormat.setInputPaths(job, new Path(sampledDataPath));
		} catch (IOException e) {
			LOGGER.error(JumbuneRuntimeException.throwUnresponsiveIOException(e.getStackTrace()));
		}
	}
 
public int run(String[] args) throws Exception {
    Configuration mrConf = this.getConf();
    for (java.util.Map.Entry<String, String> entry : dgaConfiguration.getSystemProperties().entrySet()) {
        mrConf.set(entry.getKey(), entry.getValue());
    }

    Job job = Job.getInstance(mrConf);
    job.setJarByClass(CommunityCompression.class);
    Path in = new Path(inputPath);
    Path out = new Path(outputPath);

    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);
    job.setJobName("CommunityCompression");

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LouvainVertexWritable.class);

    job.setMapperClass(CommunityCompression.Map.class);
    job.setReducerClass(CommunityCompression.Reduce.class);

    logger.debug("Running Mapreduce step with job configuration: {}", job);

    return job.waitForCompletion(false) ? 0 : 1;
}
 
源代码11 项目: hiped2   文件: SequenceFileStockMapReduce.java
/**
 * Write the sequence file.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
  Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));

  Configuration conf = super.getConf();

  Job job = new Job(conf);
  job.setJarByClass(SequenceFileStockMapReduce.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(StockPriceWritable.class);
  job.setInputFormatClass(
      SequenceFileInputFormat.class); //<co id="ch03_comment_seqfile_mr1"/>
  job.setOutputFormatClass(SequenceFileOutputFormat.class);  //<co id="ch03_comment_seqfile_mr2"/>
  SequenceFileOutputFormat.setCompressOutput(job, true);  //<co id="ch03_comment_seqfile_mr3"/>
  SequenceFileOutputFormat.setOutputCompressionType(job,  //<co id="ch03_comment_seqfile_mr4"/>
      SequenceFile.CompressionType.BLOCK);
  SequenceFileOutputFormat.setOutputCompressorClass(job,  //<co id="ch03_comment_seqfile_mr5"/>
      DefaultCodec.class);

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  if (job.waitForCompletion(true)) {
    return 0;
  }
  return 1;
}
 
源代码12 项目: spork   文件: OrcStorage.java
@Override
public void setLocation(String location, Job job) throws IOException {
    Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
    if (!UDFContext.getUDFContext().isFrontend()) {
        typeInfo = (TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
    } else if (typeInfo == null) {
        typeInfo = getTypeInfo(location, job);
    }
    if (typeInfo != null && oi == null) {
        oi = OrcStruct.createObjectInspector(typeInfo);
    }
    if (!UDFContext.getUDFContext().isFrontend()) {
        if (p.getProperty(signature + RequiredColumnsSuffix) != null) {
            mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p
                    .getProperty(signature + RequiredColumnsSuffix));
            job.getConfiguration().setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
            job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
                    getReqiredColumnIdString(mRequiredColumns));
            if (p.getProperty(signature + SearchArgsSuffix) != null) {
                // Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
                job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
                        getReqiredColumnNamesString(getSchema(location, job), mRequiredColumns));
            }
        } else if (p.getProperty(signature + SearchArgsSuffix) != null) {
            // Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
            job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
                    getReqiredColumnNamesString(getSchema(location, job)));
        }
        if (p.getProperty(signature + SearchArgsSuffix) != null) {
            job.getConfiguration().set(SARG_PUSHDOWN, p.getProperty(signature + SearchArgsSuffix));
        }

    }
    FileInputFormat.setInputPaths(job, location);
}
 
源代码13 项目: hiped2   文件: Main.java
public static boolean findShortestPath(Configuration conf, Path inputPath,
                                       Path outputPath, String startNode,
                                       String targetNode)
    throws Exception {
  conf.set(TARGET_NODE, targetNode);

  Job job = new Job(conf);
  job.setJarByClass(Main.class);
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(KeyValueTextInputFormat.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  if (!job.waitForCompletion(true)) {
    throw new Exception("Job failed");
  }

  Counter counter = job.getCounters()
      .findCounter(Reduce.PathCounter.TARGET_NODE_DISTANCE_COMPUTED);

  if (counter != null && counter.getValue() > 0) {
    CounterGroup group = job.getCounters().getGroup(Reduce.PathCounter.PATH.toString());
    Iterator<Counter> iter = group.iterator();
    iter.hasNext();
    String path = iter.next().getName();
    System.out.println("==========================================");
    System.out.println("= Shortest path found, details as follows.");
    System.out.println("= ");
    System.out.println("= Start node:  " + startNode);
    System.out.println("= End node:    " + targetNode);
    System.out.println("= Hops:        " + counter.getValue());
    System.out.println("= Path:        " + path);
    System.out.println("==========================================");
    return true;
  }
  return false;
}
 
源代码14 项目: jumbune   文件: JsonDataValidationExecutor.java
public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException
  {
  	Configuration conf = new Configuration();	
  	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
StringBuilder sb = new StringBuilder();
  	for (int j = 2; j < otherArgs.length; j++) {
	
  		sb.append(otherArgs[j]);
}
  	
  	LOGGER.debug("Arguments[ " + otherArgs.length+"]"+"and values respectively ["+otherArgs[0]+"], "+
		otherArgs[1]+", ["+otherArgs[2]+"]"+", ["+otherArgs[3]+"],"+
		otherArgs[4]);

String inputpath = otherArgs[0];
String outputpath = "/tmp/jumbune/dvjsonreport"+  new Date().getTime();

String json = otherArgs[1];
String nullCondition = otherArgs[2];
String regex = otherArgs[3];
String dvDir = otherArgs[4];



if(regex.isEmpty()){
	conf.set(JsonDataVaildationConstants.REGEX_ARGUMENT, "");
}else{
	conf.set(JsonDataVaildationConstants.REGEX_ARGUMENT, regex);
}

if(nullCondition.isEmpty()){
	conf.set(JsonDataVaildationConstants.NULL_ARGUMENT, "");
}else{
	conf.set(JsonDataVaildationConstants.NULL_ARGUMENT, nullCondition);
}


conf.set(JsonDataVaildationConstants.SLAVE_DIR, dvDir);
conf.set(JsonDataVaildationConstants.JSON_ARGUMENT, json);
FileSystem fs = FileSystem.get(conf);

@SuppressWarnings("deprecation")
Job job = new Job(conf, "JSONDataValidation");
job.setJarByClass(JsonDataValidationExecutor.class);

job.setInputFormatClass(JsonFileInputFormat.class);

job.setMapperClass(JsonDataValidationMapper.class);
job.setPartitionerClass(JsonDataValidationPartitioner.class);
job.setReducerClass(JsonDataValidationReducer.class);
job.setNumReduceTasks(5);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FileKeyViolationBean.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TotalReducerViolationBean.class);
	
job.setOutputFormatClass(SequenceFileOutputFormat.class);

  	Path[] inputPaths = FileUtil.getAllJsonNestedFilePath(job, inputpath);

FileInputFormat.setInputPaths(job, inputPaths);
FileOutputFormat.setOutputPath(job, new Path(outputpath));
		
if(fs.exists(new Path(outputpath)))
{
	fs.delete(new Path(outputpath), true);
}

job.waitForCompletion(true);	

 Map<String, JsonViolationReport> jsonMap = readDataFromHdfs(conf,outputpath);
 final Gson gson= new Gson();
 final String jsonReport = gson.toJson(jsonMap);

 LOGGER.info("Completed DataValidation");
 LOGGER.info(JsonDataVaildationConstants.JSON_DV_REPORT + jsonReport);
  }
 
源代码15 项目: spork   文件: AvroStorage.java
/**
 * Set input location and obtain input schema.
 */
@SuppressWarnings("unchecked")
@Override
public void setLocation(String location, Job job) throws IOException {
    if (inputAvroSchema != null) {
        return;
    }

    if (!UDFContext.getUDFContext().isFrontend()) {
        Properties udfProps = getUDFProperties();
        String mergedSchema = udfProps.getProperty(AVRO_MERGED_SCHEMA_PROPERTY);
        if (mergedSchema != null) {
            HashMap<URI, Map<Integer, Integer>> mergedSchemaMap =
                    (HashMap<URI, Map<Integer, Integer>>) ObjectSerializer.deserialize(mergedSchema);
            schemaToMergedSchemaMap = new HashMap<Path, Map<Integer, Integer>>();
            for (Entry<URI, Map<Integer, Integer>> entry : mergedSchemaMap.entrySet()) {
                schemaToMergedSchemaMap.put(new Path(entry.getKey()), entry.getValue());
            }
        }
        String schema = udfProps.getProperty(AVRO_INPUT_SCHEMA_PROPERTY);
        if (schema != null) {
            try {
                inputAvroSchema = new Schema.Parser().parse(schema);
                return;
            } catch (Exception e) {
                // Cases like testMultipleSchemas2 cause exception while deserializing
                // symbols. In that case, we get it again.
                LOG.warn("Exception while trying to deserialize schema in backend. " +
                        "Will construct again. schema= " + schema, e);
            }
        }
    }

    Configuration conf = job.getConfiguration();
    Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
    if (!paths.isEmpty()) {
        // Set top level directories in input format. Adding all files will
        // bloat configuration size
        FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
        // Scan all directories including sub directories for schema
        if (inputAvroSchema == null) {
            setInputAvroSchema(paths, conf);
        }
    } else {
        throw new IOException("Input path \'" + location + "\' is not found");
    }

}
 
源代码16 项目: spork   文件: CSVLoader.java
@Override
public void setLocation(String location, Job job) throws IOException {
    loadLocation = location;
    FileInputFormat.setInputPaths(job, location);        
}
 
源代码17 项目: big-c   文件: Parser.java
private Configuration getConf(Configuration jconf) throws IOException {
  Job job = Job.getInstance(jconf);
  FileInputFormat.setInputPaths(job, indir);
  return job.getConfiguration();
}
 
源代码18 项目: xxhadoop   文件: WordCountJob.java
public static void main(String[] args) throws IOException,
		ClassNotFoundException, InterruptedException {
	
	Configuration conf = new Configuration();
	//conf.set("fs.defaultFS", "hdfs://node-01:9000");
	String[] otherArgs = new GenericOptionsParser(conf, args)
			.getRemainingArgs();

	String commaSeparatedPaths = null;
	String outputDir = null;
	if (otherArgs.length == 2) {
		commaSeparatedPaths = otherArgs[0];
		outputDir = otherArgs[1];
	} else {
		System.err.println("Usage: <in>[,<in>...] <out>");
		System.exit(-1);
	}

	LOGGER.info("==========job start");
	Job job = Job.getInstance(conf);
	job.setJobName("WordCountJob");
	job.setJarByClass(WordCountJob.class);
	
	job.setMapperClass(WordCountMapper.class);
	job.setCombinerClass(WordCountReducer.class);
	job.setReducerClass(WordCountReducer.class);

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(LongWritable.class);
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(LongWritable.class);

	FileInputFormat.setInputPaths(job, commaSeparatedPaths);
	FileOutputFormat.setOutputPath(job, new Path(outputDir));

	if (job.waitForCompletion(true)) {
		LOGGER.info("==========job success");
	} else {
		LOGGER.info("==========job failed");
	}
}
 
@Test
public void readExcelInputFormatExcel2013SingleSheetLowFootPrintStaxPartlyInMemoryCompressed() throws IOException, InterruptedException {
	Configuration conf = new Configuration(defaultConf);
	ClassLoader classLoader = getClass().getClassLoader();
	String fileName = "excel2013test.xlsx";
	String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
	Path file = new Path(fileNameSpreadSheet);
	// set locale to the one of the test data
	conf.set("hadoopoffice.read.locale.bcp47", "de");
	// low footprint
	conf.set("hadoopoffice.read.lowFootprint", "true");
	// stax parser
	conf.set("hadoopoffice.read.lowFootprint.parser", "stax");
	// partly in memory compressed
	conf.set("hadoopoffice.read.lowFootprint.stax.sst.cache", "1");
	conf.set("hadoopoffice.read.lowFootprint.stax.sst.compress", "true");
	Job job = Job.getInstance(conf);
	FileInputFormat.setInputPaths(job, file);
	TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
	ExcelFileInputFormat format = new ExcelFileInputFormat();
	List<InputSplit> splits = format.getSplits(job);
	assertEquals(1, splits.size(), "Only one split generated for Excel file");
	RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);
	assertNotNull(reader, "Format returned  null RecordReader");
	reader.initialize(splits.get(0), context);
	Text spreadSheetKey = new Text();
	ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class);
	assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 1");
	spreadSheetKey = reader.getCurrentKey();
	spreadSheetValue = reader.getCurrentValue();
	assertEquals("[excel2013test.xlsx]Sheet1!A1", spreadSheetKey.toString(),
			"Input Split for Excel file has keyname == \"[excel2013test.xlsx]Sheet1!A1\"");
	assertEquals(4, spreadSheetValue.get().length, "Input Split for Excel file contains row 1 with 4 columns");
	assertEquals("test1", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
			"Input Split for Excel file contains row 1 with cell 1 == \"test1\"");
	assertEquals("Sheet1", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getSheetName(),
			"Input Split for Excel file contains row 1 with cell 1 sheetname == \"Sheet1\"");
	assertEquals("A1", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getAddress(),
			"Input Split for Excel file contains row 1 with cell 1 address == \"A1\"");
	assertEquals("test2", ((SpreadSheetCellDAO) spreadSheetValue.get()[1]).getFormattedValue(),
			"Input Split for Excel file contains row 1 with cell 2 == \"test2\"");
	assertEquals("test3", ((SpreadSheetCellDAO) spreadSheetValue.get()[2]).getFormattedValue(),
			"Input Split for Excel file contains row 1 with cell 3 == \"test3\"");
	assertEquals("test4", ((SpreadSheetCellDAO) spreadSheetValue.get()[3]).getFormattedValue(),
			"Input Split for Excel file contains row 1 with cell 4 == \"test4\"");
	assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 2");
	spreadSheetKey = reader.getCurrentKey();
	spreadSheetValue = reader.getCurrentValue();
	assertEquals(1, spreadSheetValue.get().length, "Input Split for Excel file contains row 2 with 1 column");
	assertEquals("4", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
			"Input Split for Excel file contains row 2 with cell 1 == \"4\"");
	assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 3");
	spreadSheetKey = reader.getCurrentKey();
	spreadSheetValue = reader.getCurrentValue();
	assertEquals(5, spreadSheetValue.get().length, "Input Split for Excel file contains row 3 with 5 columns");
	assertEquals("31/12/99", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
			"Input Split for Excel file contains row 3 with cell 1 == \"31/12/99\"");
	assertEquals("5", ((SpreadSheetCellDAO) spreadSheetValue.get()[1]).getFormattedValue(),
			"Input Split for Excel file contains row 3 with cell 2 == \"5\"");
	assertNull(spreadSheetValue.get()[2], "Input Split for Excel file contains row 3 with cell 3 == null");
	assertNull(spreadSheetValue.get()[3], "Input Split for Excel file contains row 3 with cell 4 == null");
	assertEquals("null", ((SpreadSheetCellDAO) spreadSheetValue.get()[4]).getFormattedValue(),
			"Input Split for Excel file contains row 3 with cell 5 == \"null\"");
	assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 4");
	spreadSheetKey = reader.getCurrentKey();
	spreadSheetValue = reader.getCurrentValue();
	assertEquals(1, spreadSheetValue.get().length, "Input Split for Excel file contains row 4 with 1 column");
	assertEquals("1", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
			"Input Split for Excel file contains row 4 with cell 1 == \"1\"");
	assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 5");
	spreadSheetKey = reader.getCurrentKey();
	spreadSheetValue = reader.getCurrentValue();
	assertEquals(3, spreadSheetValue.get().length, "Input Split for Excel file contains row 5 with 3 columns");
	assertEquals("2", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
			"Input Split for Excel file contains row 5 with cell 1 == \"2\"");
	assertEquals("6", ((SpreadSheetCellDAO) spreadSheetValue.get()[1]).getFormattedValue(),
			"Input Split for Excel file contains row 5 with cell 2== \"6\"");
	assertEquals("10", ((SpreadSheetCellDAO) spreadSheetValue.get()[2]).getFormattedValue(),
			"Input Split for Excel file contains row 5 with cell 3== \"10\"");
	assertTrue(reader.nextKeyValue(), "Input Split for Excel file contains row 6");
	spreadSheetKey = reader.getCurrentKey();
	spreadSheetValue = reader.getCurrentValue();
	assertEquals(3, spreadSheetValue.get().length, "Input Split for Excel file contains row 6 with 3 columns");
	assertEquals("3", ((SpreadSheetCellDAO) spreadSheetValue.get()[0]).getFormattedValue(),
			"Input Split for Excel file contains row 6 with cell 1 == \"3\"");
	assertEquals("4", ((SpreadSheetCellDAO) spreadSheetValue.get()[1]).getFormattedValue(),
			"Input Split for Excel file contains row 6 with cell 2== \"4\"");
	assertEquals("15", ((SpreadSheetCellDAO) spreadSheetValue.get()[2]).getFormattedValue(),
			"Input Split for Excel file contains row 6 with cell 3== \"15\"");
}
 
源代码20 项目: xxhadoop   文件: StepTwoJob.java
public int run(String[] args) throws Exception {
		
	    /*Configuration conf = getConf();
	    JobClient client = new JobClient(conf);
	    ClusterStatus cluster = client.getClusterStatus();
	    int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
	    String join_reduces = conf.get(REDUCES_PER_HOST);
	    if (join_reduces != null) {
	       num_reduces = cluster.getTaskTrackers() *
	                       Integer.parseInt(join_reduces);
	    }
	    // Set user-supplied (possibly default) job configs
	    job.setNumReduceTasks(num_reduces);*/
	    
	    
		Configuration conf = new Configuration();
		//conf.set("fs.defaultFS", "hdfs://node-01:9000");
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();

		String commaSeparatedPaths = null;
		String outputDir = null;
		if (otherArgs.length == 2) {
			commaSeparatedPaths = otherArgs[0];
			outputDir = otherArgs[1];
		} else {
			System.err.println("Usage: <in>[,<in>...] <out>");
			//System.exit(-1);
			return -1;
		}
		

		Job job = Job.getInstance(conf);
		job.setJobName("StepTwoJob");
		job.setJarByClass(StepTwoJob.class);
		
//		job.setInputFormatClass(TextInputFormat.class);
//		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.setMapperClass(StepTwoMapper.class);
//		job.setCombinerClass(StepOneReducer.class);
		job.setReducerClass(StepTwoReducer.class);
		
//		job.setPartitionerClass(FlowPartition.class);
//		job.setNumReduceTasks(5);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		FileInputFormat.setInputPaths(job, commaSeparatedPaths);
		FileOutputFormat.setOutputPath(job, new Path(outputDir));

		return job.waitForCompletion(true) ? 0 : 1;
	}