The following examples show how to use the org.apache.hadoop.mapreduce.lib.input.TextInputFormat class. Each snippet is taken from a real project, and the full source of each can be viewed on GitHub.
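All of the snippets share the same basic wiring, so before the project code, here is a minimal, self-contained sketch of it. This is an illustration only: the class name TextInputFormatSketch, the LineCountMapper, and the argument-based paths are placeholders and do not come from any of the projects below. TextInputFormat hands the mapper each line's byte offset as a LongWritable key and the line itself as a Text value.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TextInputFormatSketch {

  // TextInputFormat gives the mapper the byte offset of each line (key)
  // and the line contents (value).
  public static class LineCountMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      context.write(new Text("lines"), ONE);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "text-input-format-sketch");
    job.setJarByClass(TextInputFormatSketch.class);
    job.setMapperClass(LineCountMapper.class);
    job.setNumReduceTasks(0);                        // map-only, like several examples below
    job.setInputFormatClass(TextInputFormat.class);  // line-oriented text input
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The project examples that follow are all variations on this pattern. The first is a map-only job that writes each vertex's initial rank to a SequenceFile: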
public static void assignInitialRanks(Configuration conf, FileSystem fs, String adjacencyPath, String initialPath, int numVertices) throws Exception {
Path seqFile = new Path(initialPath);
if (fs.exists(seqFile)) {
fs.delete(seqFile, true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(InitialRankAssigner.class);
job.setMapperClass(InitialRankAssigner.class);
job.setReducerClass(Reducer.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Message.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Message.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(adjacencyPath));
FileOutputFormat.setOutputPath(job, seqFile);
job.waitForCompletion(true);
}
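A Tool-style run() method for MRWordCount21, wiring Map21 as the mapper and Reduce21 as both combiner and reducer over plain text input: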
@Override
public int run(String[] args) throws Exception {
System.out.println("Running MR: MRWordCount21");
Job job = new Job(getConf());
job.setJarByClass(MRWordCount21.class);
job.setJobName("MRWordCount21");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map21.class);
job.setCombinerClass(Reduce21.class);
job.setReducerClass(Reduce21.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
System.out.println("Input path: " + args[0]);
System.out.println("Output path: " + args[1]);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
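An execute() helper for a "bigdiff" job that adds two input paths to the same TextInputFormat-backed job and emits Text/Text pairs: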
public void execute(String inputPath1, String inputPath2, String outputPath) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "bigdiff");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(inputPath1));
FileInputFormat.addInputPath(job, new Path(inputPath2));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
}
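A test helper that either registers TextOutputFormat directly or wraps it in LazyOutputFormat, so output files are only created when records are actually written: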
private static void runTestLazyOutput(Configuration conf, Path output,
int numReducers, boolean createLazily)
throws Exception {
Job job = Job.getInstance(conf, "Test-Lazy-Output");
FileInputFormat.setInputPaths(job, INPUT);
FileOutputFormat.setOutputPath(job, output);
job.setJarByClass(TestMapReduceLazyOutput.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(numReducers);
job.setMapperClass(TestMapper.class);
job.setReducerClass(TestReducer.class);
if (createLazily) {
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
} else {
job.setOutputFormatClass(TextOutputFormat.class);
}
assertTrue(job.waitForCompletion(true));
}
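A main() driver for P1Q2 producing Text/DoubleWritable output, with the reducer reused as combiner: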
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
final Job job = new Job(conf, "P1Q2");
job.setJarByClass(P1Q2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setMapperClass(P1Q2Map.class);
job.setCombinerClass(P1Q2Reduce.class);
job.setReducerClass(P1Q2Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
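A user-name check job that adds its input path via TextInputFormat.addInputPath and runs with a single reducer: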
public static void main(String [] args) throws Exception
{
Path outDir = new Path("output");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "user name check");
job.setJarByClass(UserNamePermission.class);
job.setMapperClass(UserNamePermission.UserNameMapper.class);
job.setCombinerClass(UserNamePermission.UserNameReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(UserNamePermission.UserNameReducer.class);
job.setNumReduceTasks(1);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("input"));
FileOutputFormat.setOutputPath(job, outDir);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
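A mapper setup() method that reads field-selection settings and ignores the input key whenever the job's input format is TextInputFormat: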
public void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
this.fieldSeparator =
conf.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
this.mapOutputKeyValueSpec =
conf.get(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
try {
this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
context.getInputFormatClass().getCanonicalName());
} catch (ClassNotFoundException e) {
throw new IOException("Input format class not found", e);
}
allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
mapOutputKeyValueSpec, allMapValueFieldsFrom, mapOutputKeyFieldList,
mapOutputValueFieldList) + "\nignoreInputKey:" + ignoreInputKey);
}
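A createJob() factory for a failure-injection job: flags in the configuration decide whether mappers or reducers fail, and all output is discarded via NullOutputFormat: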
public Job createJob(boolean failMappers, boolean failReducers, Path inputFile)
throws IOException {
Configuration conf = getConf();
conf.setBoolean(FAIL_MAP, failMappers);
conf.setBoolean(FAIL_REDUCE, failReducers);
Job job = Job.getInstance(conf, "fail");
job.setJarByClass(FailJob.class);
job.setMapperClass(FailMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(FailReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(TextInputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("Fail job");
FileInputFormat.addInputPath(job, inputFile);
return job;
}
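A main() driver for P1Q3 with distinct map-output and final-output key classes and the combiner left commented out: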
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
final Job job = new Job(conf, "P1Q3");
job.setJarByClass(P1Q3.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(DoubleWritable.class);
job.setMapperClass(P1Q3Map.class);
//job.setCombinerClass(P1Q3Reduce.class);
job.setReducerClass(P1Q3Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
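A main() driver for P2Q1 following the same text-in, text-out pattern with Text/IntWritable output: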
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
final Job job = new Job(conf, "P2Q1");
job.setJarByClass(P2Q1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(P2Q1Map.class);
job.setCombinerClass(P2Q1Reduce.class);
job.setReducerClass(P2Q1Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
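A unit test that configures a FileSystemInput to read a CSV resource through TextInputFormat and verifies the resulting Dataset: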
@Test
public void readInputFormat() throws Exception {
Map<String, Object> paramMap = new HashMap<>();
paramMap.put(FileSystemInput.FORMAT_CONFIG, "input-format");
paramMap.put(FileSystemInput.PATH_CONFIG, FileSystemInput.class.getResource(CSV_DATA).getPath());
paramMap.put(FileSystemInput.INPUT_FORMAT_TYPE_CONFIG, TextInputFormat.class.getCanonicalName());
paramMap.put("translator" + "." + ComponentFactory.TYPE_CONFIG_NAME,
DummyInputFormatTranslator.class.getCanonicalName());
config = ConfigFactory.parseMap(paramMap);
FileSystemInput formatInput = new FileSystemInput();
assertNoValidationFailures(formatInput, config);
formatInput.configure(config);
Dataset<Row> results = formatInput.read();
assertEquals("Invalid number of rows", 4, results.count());
assertEquals("Invalid first row result", 0L, results.first().getLong(0));
assertEquals("Invalid first row result", "One,Two,Three,Four", results.first().getString(1));
}
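A write() helper for a map-only job whose text input is only a placeholder; the real work is writing GZIP-compressed Thrift records through ParquetThriftOutputFormat: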
private void write(final Configuration conf, final Path inputPath,
final Path parquetPath, Class<? extends Mapper> mapperClass, Class<? extends TBase<?, ?>> outputClass) throws IOException, Exception {
final Job job = new Job(conf, "write");
// input not really used
TextInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(mapperClass);
job.setNumReduceTasks(0);
job.setOutputFormatClass(ParquetThriftOutputFormat.class);
ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.GZIP);
ParquetThriftOutputFormat.setOutputPath(job, parquetPath);
ParquetThriftOutputFormat.setThriftClass(job, outputClass);
waitForJob(job);
}
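A @Before fixture that clears previous output and generates a Parquet file from an Avro schema (Car.SCHEMA$) with a map-only job: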
@Before
public void createParquetFile() throws Exception {
final FileSystem fileSystem = parquetPath.getFileSystem(conf);
fileSystem.delete(parquetPath, true);
fileSystem.delete(outputPath, true);
{
final Job job = new Job(conf, "write");
// input not really used
TextInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(TestSpecificInputOutputFormat.MyMapper.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AvroParquetOutputFormat.class);
AvroParquetOutputFormat.setOutputPath(job, parquetPath);
AvroParquetOutputFormat.setSchema(job, Car.SCHEMA$);
waitForJob(job);
}
}
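A main() driver for P2Q3 with the same combiner/reducer wiring: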
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
final Job job = new Job(conf, "P2Q3");
job.setJarByClass(P2Q3.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(P2Q3Map.class);
job.setCombinerClass(P2Q3Reduce.class);
job.setReducerClass(P2Q3Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
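A main() driver for P2Q2, identical in structure: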
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
final Job job = new Job(conf, "P2Q2");
job.setJarByClass(P2Q2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(P2Q2Map.class);
job.setCombinerClass(P2Q2Reduce.class);
job.setReducerClass(P2Q2Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
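A Spark Streaming example that monitors a local directory, printing lines from textFileStream and also reading LongWritable/Text pairs via fileStream with TextInputFormat: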
public static void main(String[] args) {
//Windows-specific property if Hadoop is not installed or HADOOP_HOME is not set
System.setProperty("hadoop.home.dir", "E:\\hadoop");
//Logger rootLogger = LogManager.getRootLogger();
//rootLogger.setLevel(Level.WARN);
SparkConf conf = new SparkConf().setAppName("KafkaExample").setMaster("local[*]");
String inputDirectory="E:\\hadoop\\streamFolder\\";
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(1));
// streamingContext.checkpoint("E:\\hadoop\\checkpoint");
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.WARN);
JavaDStream<String> streamfile = streamingContext.textFileStream(inputDirectory);
streamfile.print();
streamfile.foreachRDD(rdd-> rdd.foreach(x -> System.out.println(x)));
JavaPairDStream<LongWritable, Text> streamedFile = streamingContext.fileStream(inputDirectory, LongWritable.class, Text.class, TextInputFormat.class);
streamedFile.print();
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
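A Twister2 worker that builds a Hadoop-backed source over TextInputFormat and maps each offset/line tuple into a single string before logging it in the sink: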
@Override
public void execute(Config config, int workerID, IWorkerController workerController,
IPersistentVolume persistentVolume, IVolatileVolume volatileVolume) {
WorkerEnvironment workerEnv = WorkerEnvironment.init(config, workerID, workerController,
persistentVolume, volatileVolume);
BatchTSetEnvironment tSetEnv = TSetEnvironment.initBatch(workerEnv);
Configuration configuration = new Configuration();
configuration.addResource(
new Path(HdfsDataContext.getHdfsConfigDirectory(config)));
configuration.set(TextInputFormat.INPUT_DIR, "/input4");
SourceTSet<String> source =
tSetEnv.createHadoopSource(configuration, TextInputFormat.class, 4,
new MapFunc<String, Tuple<LongWritable, Text>>() {
@Override
public String map(Tuple<LongWritable, Text> input) {
return input.getKey().toString() + " : " + input.getValue().toString();
}
});
SinkTSet<Iterator<String>> sink = source.direct().sink((SinkFunc<Iterator<String>>) value -> {
while (value.hasNext()) {
String next = value.next();
LOG.info("Received value: " + next);
}
return true;
});
tSetEnv.run(sink);
}
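A Tool run() method that streams text input into Elasticsearch with a map-only job and EsOutputFormat: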
@Override
public int run(String[] args) throws Exception {
if (getConf().get(CONF_FIELD_NAMES, null) == null) {
throw new IllegalArgumentException("Must include configuration '" + CONF_FIELD_NAMES + "'");
}
Job job = Job.getInstance(getConf(), "LoadToES");
// DO NOT SET JAR BY CLASS HERE
//
// job.setJarByClass(getClass());
EsMapReduceUtil.initCredentials(job);
TextInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapperClass(MapperImpl.class);
// Secure Hadoop CANNOT perform shuffle phases without native libraries
job.setNumReduceTasks(0);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(LinkedMapWritable.class);
if (!job.waitForCompletion(true)) {
return 1;
}
return 0;
}
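A test that configures a small mark/reset buffer for reduce-side value iteration and validates the job output: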
public void testValueIterReset() {
try {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "TestValueIterReset") ;
job.setJarByClass(TestValueIterReset.class);
job.setMapperClass(TestMapper.class);
job.setReducerClass(TestReducer.class);
job.setNumReduceTasks(NUM_TESTS);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.getConfiguration().
setInt(MRJobConfig.REDUCE_MARKRESET_BUFFER_SIZE,128);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job,
new Path(TEST_ROOT_DIR + "/in"));
Path output = new Path(TEST_ROOT_DIR + "/out");
localFs.delete(output, true);
FileOutputFormat.setOutputPath(job, output);
createInput();
assertTrue(job.waitForCompletion(true));
validateOutput();
} catch (Exception e) {
e.printStackTrace();
assertTrue(false);
}
}
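A constructor that falls back to a plain new TextInputFormat() when no delegate input format is supplied: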
public UnshardedExportToCloudStorage(
Configuration configuration,
String gcsPath,
ExportFileFormat fileFormat,
BigQueryHelper bigQueryHelper,
String projectId,
Table tableToExport,
@Nullable InputFormat<LongWritable, Text> delegateInputFormat) {
super(configuration, gcsPath, fileFormat, bigQueryHelper, projectId, tableToExport);
if (delegateInputFormat == null) {
this.delegateInputFormat = new TextInputFormat();
} else {
this.delegateInputFormat = delegateInputFormat;
}
}
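A job-context accessor that resolves the configured input format class, falling back to TextInputFormat when none is set: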
/**
* Get the {@link InputFormat} class for the job.
*
* @return the {@link InputFormat} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
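Generating input splits for the old mapper API: the JobConf selects org.apache.hadoop.mapred.TextInputFormat and sets the input directory through the INPUT_DIR key: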
private InputSplitInfo generateOldSplits(Path inputSplitsDir)
throws Exception {
JobConf jobConf = new JobConf();
jobConf.setUseNewMapper(false);
jobConf.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
jobConf.set(TextInputFormat.INPUT_DIR, testFilePath.toString());
return MRHelpers.generateInputSplits(jobConf, inputSplitsDir);
}
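A reduce-side join driver that uses MultipleInputs to attach a different mapper to each TextInputFormat-backed input path and passes the join type through the configuration: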
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length != 4) {
printUsage();
}
Job job = new Job(conf, "ReduceSideJoin");
job.setJarByClass(ReduceSideJoin.class);
// Use MultipleInputs to set which input uses what mapper
// This will keep parsing of each data set separate from a logical
// standpoint
// The first two elements of the args array are the two inputs
MultipleInputs.addInputPath(job, new Path(args[0]),
TextInputFormat.class, UserJoinMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
TextInputFormat.class, CommentJoinMapper.class);
job.getConfiguration().set("join.type", args[2]);
job.setReducerClass(UserJoinReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[3]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 2;
}
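The Halvade combine step: depending on featureCount it reads either text counts through TextInputFormat or VCF records through VCFInputFormat, merging everything in a single reducer: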
protected int runCombineJob(String halvadeOutDir, String mergeOutDir, boolean featureCount) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
Configuration combineConf = getConf();
if(!halvadeOpts.out.endsWith("/")) halvadeOpts.out += "/";
HalvadeConf.setInputDir(combineConf, halvadeOutDir);
HalvadeConf.setOutDir(combineConf, mergeOutDir);
FileSystem outFs = FileSystem.get(new URI(mergeOutDir), combineConf);
if (outFs.exists(new Path(mergeOutDir))) {
Logger.INFO("The output directory \'" + mergeOutDir + "\' already exists.");
Logger.INFO("ERROR: Please remove this directory before trying again.");
System.exit(-2);
}
HalvadeConf.setReportAllVariant(combineConf, halvadeOpts.reportAll);
HalvadeResourceManager.setJobResources(halvadeOpts, combineConf, HalvadeResourceManager.COMBINE, false, halvadeOpts.useBamInput);
// halvadeOpts.splitChromosomes(combineConf, 0);
Job combineJob = Job.getInstance(combineConf, "HalvadeCombineVCF");
combineJob.setJarByClass(VCFCombineMapper.class);
addInputFiles(halvadeOutDir, combineConf, combineJob, featureCount ? ".count" : ".vcf");
FileOutputFormat.setOutputPath(combineJob, new Path(mergeOutDir));
combineJob.setMapperClass(featureCount ? HTSeqCombineMapper.class : VCFCombineMapper.class);
combineJob.setMapOutputKeyClass(featureCount ? Text.class : LongWritable.class);
combineJob.setMapOutputValueClass(featureCount ? LongWritable.class : VariantContextWritable.class);
combineJob.setInputFormatClass(featureCount ? TextInputFormat.class : VCFInputFormat.class);
combineJob.setNumReduceTasks(1);
combineJob.setReducerClass(featureCount ?
be.ugent.intec.halvade.hadoop.mapreduce.HTSeqCombineReducer.class :
be.ugent.intec.halvade.hadoop.mapreduce.VCFCombineReducer.class);
combineJob.setOutputKeyClass(Text.class);
combineJob.setOutputValueClass(featureCount ? LongWritable.class : VariantContextWritable.class);
return runTimedJob(combineJob, (featureCount ? "featureCounts" : "VCF") + " Combine Job");
}
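Building a data source descriptor with the new mapper API, registering TextInputFormat under MRJobConfig.INPUT_FORMAT_CLASS_ATTR: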
private DataSourceDescriptor generateDataSourceDescriptorMapReduce(Path inputSplitsDir)
throws Exception {
JobConf jobConf = new JobConf(dfsCluster.getFileSystem().getConf());
jobConf.setUseNewMapper(true);
jobConf.setClass(org.apache.hadoop.mapreduce.MRJobConfig.INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class,
InputFormat.class);
jobConf.set(TextInputFormat.INPUT_DIR, testFilePath.toString());
return MRInputHelpers.configureMRInputWithLegacySplitGeneration(jobConf, inputSplitsDir, true);
}
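A deduplication job (Phase3Step2DistinctDataJob) that reads a comma-separated list of input paths and emits Text keys through a lazily-created TextOutputFormat: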
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
job.setJarByClass(Phase3Step2DistinctDataJob.class);
job.setJobName(Phase3Step2DistinctDataJob.class.getName());
//mapper
job.setMapperClass(RemoveRedundantDataMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//reducer
job.setReducerClass(RemoveRedundantDataReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//paths
String commaSeparatedInputFiles = args[0];
String outputPath = args[1];
job.setInputFormatClass(TextInputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
//i/o paths
FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
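Generating input splits with the new mapper API enabled, again pointing the INPUT_DIR key at the test file: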
private InputSplitInfo generateNewSplits(Path inputSplitsDir)
throws Exception {
JobConf jobConf = new JobConf();
jobConf.setUseNewMapper(true);
jobConf.setClass(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class,
InputFormat.class);
jobConf.set(TextInputFormat.INPUT_DIR, testFilePath.toString());
return MRHelpers.generateInputSplits(jobConf, inputSplitsDir);
}
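An Accumulo word count: a SummingCombiner iterator is attached to the target table, then a map-only job reads text input and writes Mutation objects through AccumuloOutputFormat: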
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(WordCount.class.getName(), args);
// Create Accumulo table and attach Summing iterator
try (AccumuloClient client = opts.createAccumuloClient()) {
client.tableOperations().create(opts.tableName);
IteratorSetting is = new IteratorSetting(10, SummingCombiner.class);
SummingCombiner.setColumns(is,
Collections.singletonList(new IteratorSetting.Column("count")));
SummingCombiner.setEncodingType(is, SummingCombiner.Type.STRING);
client.tableOperations().attachIterator(opts.tableName, is);
} catch (TableExistsException e) {
// ignore
}
// Create M/R job
Job job = Job.getInstance(opts.getHadoopConfig());
job.setJobName(WordCount.class.getName());
job.setJarByClass(WordCount.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, new Path(opts.inputDirectory));
job.setMapperClass(MapClass.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AccumuloOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
if (opts.hdfsPath != null) {
AccumuloOutputFormat.configure().clientPropertiesPath(opts.hdfsPath)
.defaultTable(opts.tableName).store(job);
} else {
AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties())
.defaultTable(opts.tableName).store(job);
}
System.exit(job.waitForCompletion(true) ? 0 : 1);
}