The following examples show how to use the org.apache.hadoop.mapred.TextInputFormat API class, or follow the links to view the full source code on GitHub.
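Before the examples, here is a minimal, self-contained sketch of the canonical read pattern most of them share: configure a TextInputFormat against a JobConf, compute the splits, and iterate each split's records as (byte offset, line) pairs. The input path and split hint below are placeholders.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class TextInputFormatReadLoop {
    public static void main(String[] args) throws IOException {
        JobConf job = new JobConf();
        FileInputFormat.addInputPath(job, new Path("/tmp/input")); // placeholder path
        TextInputFormat informat = new TextInputFormat();
        informat.configure(job); // required: initializes compression codec handling
        InputSplit[] splits = informat.getSplits(job, 1); // 1 = desired split count (a hint)
        LongWritable key = new LongWritable(); // byte offset of each line
        Text value = new Text();               // the line content, terminator stripped
        for (InputSplit split : splits) {
            RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
            try {
                while (reader.next(key, value))
                    System.out.println(key.get() + "\t" + value);
            }
            finally {
                reader.close();
            }
        }
    }
}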
private static boolean isInputFormatSupported(Properties schema)
{
String inputFormat = getInputFormatName(schema);
if (TextInputFormat.class.getName().equals(inputFormat)) {
if (!Objects.equals(schema.getProperty(SKIP_HEADER_COUNT_KEY, "0"), "0")) {
// S3 Select supports skipping one line of headers, but it was returning incorrect results for presto-hive-hadoop2/conf/files/test_table_with_header.csv.gz
// TODO https://github.com/prestosql/presto/issues/2349
return false;
}
if (!Objects.equals(schema.getProperty(SKIP_FOOTER_COUNT_KEY, "0"), "0")) {
// S3 Select does not support skipping footers
return false;
}
return true;
}
return false;
}
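The getInputFormatName helper is elided in the snippet above. A plausible minimal version, assuming the input format class name travels in the table schema under the standard Hive metastore key "file.inputformat", would be:

// Sketch only: assumes the Hive schema carries the input format class name
// under the metastore property "file.inputformat".
private static String getInputFormatName(Properties schema)
{
    return schema.getProperty("file.inputformat");
}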
private static void runJobPv(String inputDir, String outputDir, String jobName, Class<? extends Mapper> mapClass,
Class<? extends Reducer> reduceClass) throws Exception {
JobConf conf = new JobConf(PersonVersion.class);
conf.setJobName(jobName);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(mapClass);
conf.setCombinerClass(reduceClass);
conf.setReducerClass(reduceClass);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, inputDir);
FileOutputFormat.setOutputPath(conf, new Path(outputDir));
JobClient.runJob(conf);
}
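A hypothetical invocation of runJobPv, with placeholder paths and mapper/reducer classes (MyMapper and MyReducer are assumptions, not part of the original):

// Hypothetical usage; paths and classes are placeholders.
runJobPv("/data/pv/input", "/data/pv/output", "person-version-pv",
        MyMapper.class, MyReducer.class);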
public CSVReadTask(InputSplit split, SplitOffsetInfos offsets,
TextInputFormat informat, JobConf job, MatrixBlock dest,
long rlen, long clen, boolean hasHeader, String delim,
boolean fill, double fillValue, int splitCount)
{
_split = split;
_splitoffsets = offsets; // new SplitOffsetInfos(offsets);
_sparse = dest.isInSparseFormat();
_informat = informat;
_job = job;
_dest = dest;
_rlen = rlen;
_clen = clen;
_isFirstSplit = (splitCount == 0);
_hasHeader = hasHeader;
_fill = fill;
_fillValue = fillValue;
_delim = delim;
_rc = true;
_splitCount = splitCount;
}
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest,
ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
if( fs.isDirectory(path) ) {
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
for(InputSplit split: splits)
readTextCellFrameFromInputSplit(split, informat, job, dest);
}
else {
readRawTextCellFrameFromHDFS(path, job, fs, dest, schema, names, rlen, clen);
}
}
public void testAddInputPathWithMapper() {
final JobConf conf = new JobConf();
MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
MapClass.class);
MultipleInputs.addInputPath(conf, new Path("/bar"),
KeyValueTextInputFormat.class, MapClass2.class);
final Map<Path, InputFormat> inputs = MultipleInputs
.getInputFormatMap(conf);
final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
.getMapperTypeMap(conf);
assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
.getClass());
assertEquals(MapClass.class, maps.get(new Path("/foo")));
assertEquals(MapClass2.class, maps.get(new Path("/bar")));
}
public void run(String[] args) throws Exception
{
JobConf conf = new JobConf(this.getClass());
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
public void configure(JobConf job) {
super.configure(job);
// Disable auto-increment of the processed-record counter: for streaming,
// the number of processed records can differ from (equal to or fewer than)
// the number of input records.
SkipBadRecords.setAutoIncrMapperProcCount(job, false);
skipping = job.getBoolean("mapred.skip.on", false);
String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
try {
mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
}
}
public void configure(JobConf job) {
// Set the mapper (map-only job; no reducer is configured)
job.setMapperClass(ReadDataJob.TestMapper.class);
// Make sure this jar is included
job.setJarByClass(ReadDataJob.TestMapper.class);
// Specify the input and output data formats
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(NullOutputFormat.class);
// Turn off speculative execution
job.setMapSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
// Add the job input path
FileInputFormat.addInputPath(job, new Path(this.input_path));
}
public static boolean isCompressionCodecSupported(InputFormat<?, ?> inputFormat, Path path)
{
if (inputFormat instanceof TextInputFormat) {
return getCompressionCodec((TextInputFormat) inputFormat, path)
.map(codec -> (codec instanceof GzipCodec) || (codec instanceof BZip2Codec))
.orElse(false); // TODO (https://github.com/prestosql/presto/issues/2475) fix S3 Select when file not compressed
}
return false;
}
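The getCompressionCodec helper is not shown above. A minimal stand-in, assuming a fresh Hadoop Configuration is acceptable, can resolve the codec from the file extension via Hadoop's CompressionCodecFactory:

// Sketch only, not Presto's exact helper: maps the file extension to a codec
// (e.g. .gz -> GzipCodec, .bz2 -> BZip2Codec), empty if uncompressed.
private static Optional<CompressionCodec> getCompressionCodec(TextInputFormat inputFormat, Path path)
{
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
    return Optional.ofNullable(factory.getCodec(path));
}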
public ReadTask( InputSplit split, TextInputFormat informat, JobConf job, MatrixBlock dest, long rlen, long clen, boolean mm, FileFormatPropertiesMM mmProps ) {
_split = split;
_sparse = dest.isInSparseFormat();
_informat = informat;
_job = job;
_dest = dest;
_rlen = rlen;
_clen = clen;
_matrixMarket = mm;
_mmProps = mmProps;
}
public void configure(JobConf job) {
this.fieldSeparator = job.get("mapred.data.field.separator", "\t");
this.mapOutputKeyValueSpec = job.get("map.output.key.value.fields.spec",
"0-:");
this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
job.getInputFormat().getClass().getCanonicalName());
this.reduceOutputKeyValueSpec = job.get(
"reduce.output.key.value.fields.spec", "0-:");
parseOutputKeyValueSpec();
LOG.info(specToString());
}
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: WordCount <input path> <result path>");
return;
}
final String inputPath = args[0];
final String outputPath = args[1];
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Set up the Hadoop Input Format
HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));
// Create a Flink job with it
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
DataSet<Tuple2<Text, LongWritable>> words =
text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
.groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
// Set up Hadoop Output Format
HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
// Output & Execute
words.output(hadoopOutputFormat).setParallelism(1);
env.execute("Hadoop Compat WordCount");
}
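The Tokenizer and Counter used above are not shown; a minimal sketch against the classic mapred interfaces (equivalent to what Flink's hadoop-compatibility WordCount example ships) could look like this:

// Sketch of the referenced Tokenizer/Counter; assumes imports from
// org.apache.hadoop.io, org.apache.hadoop.mapred, and java.util.Iterator.
public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> out, Reporter reporter)
            throws IOException {
        // lower-case the line and emit (word, 1) for every token
        for (String token : value.toString().toLowerCase().split("\\W+")) {
            if (!token.isEmpty())
                out.collect(new Text(token), new LongWritable(1L));
        }
    }
    @Override public void configure(JobConf job) { }
    @Override public void close() { }
}

public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> out, Reporter reporter)
            throws IOException {
        long sum = 0; // sum the partial counts for this word
        while (values.hasNext())
            sum += values.next().get();
        out.collect(key, new LongWritable(sum));
    }
    @Override public void configure(JobConf job) { }
    @Override public void close() { }
}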
private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job,
MatrixBlock dest, long rlen, long clen, int blen,
boolean hasHeader, String delim, boolean fill, double fillValue)
throws IOException
{
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
ExecutorService pool = CommonThreadPool.get(_numThreads);
try
{
// create read tasks for all splits
ArrayList<CSVReadTask> tasks = new ArrayList<>();
int splitCount = 0;
for (InputSplit split : splits) {
tasks.add( new CSVReadTask(split, _offsets, informat, job, dest,
rlen, clen, hasHeader, delim, fill, fillValue, splitCount++) );
}
pool.invokeAll(tasks);
pool.shutdown();
// check return codes and aggregate nnz
long lnnz = 0;
for (CSVReadTask rt : tasks) {
lnnz += rt.getPartialNnz();
if (!rt.getReturnCode()) {
Exception err = rt.getException();
throw new IOException("Read task for csv input failed: "+ err.toString(), err);
}
}
dest.setNonZeros(lnnz);
}
catch (Exception e) {
throw new IOException("Threadpool issue, while parallel read.", e);
}
}
public void testNodeProcessingSchema(MapOperator<LongWritable, Text, Text, IntWritable> oper) throws IOException
{
CollectorTestSink sortSink = new CollectorTestSink();
oper.output.setSink(sortSink);
oper.setMapClass(WordCount.Map.class);
oper.setCombineClass(WordCount.Reduce.class);
oper.setDirName(testMeta.testDir);
oper.setConfigFile(null);
oper.setInputFormatClass(TextInputFormat.class);
Configuration conf = new Configuration();
JobConf jobConf = new JobConf(conf);
FileInputFormat.setInputPaths(jobConf, new Path(testMeta.testDir));
TextInputFormat inputFormat = new TextInputFormat();
inputFormat.configure(jobConf);
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
SerializationFactory serializationFactory = new SerializationFactory(conf);
Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass());
keySerializer.open(oper.getOutstream());
keySerializer.serialize(splits[0]);
oper.setInputSplitClass(splits[0].getClass());
keySerializer.close();
oper.setup(null);
oper.beginWindow(0);
oper.emitTuples();
oper.emitTuples();
oper.endWindow();
oper.beginWindow(1);
oper.emitTuples();
oper.endWindow();
Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size());
for (Object o : sortSink.collectedTuples) {
LOG.debug(o.toString());
}
LOG.debug("Done testing round\n");
oper.teardown();
}
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims, ValueType[] schema) throws IOException {
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
LongWritable key = new LongWritable();
Text value = new Text();
int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
TensorBlock ret;
if (schema.length == 1)
ret = new TensorBlock(schema[0], idims).allocateBlock();
else
ret = new TensorBlock(schema, idims).allocateBlock();
try {
int[] ix = new int[dims.length];
for (InputSplit split : splits) {
RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
try {
while (reader.next(key, value)) {
String[] parts = Arrays.stream(IOUtilFunctions.splitCSV(value.toString(), " "))
.filter(s -> !s.isEmpty()).toArray(String[]::new);
for (int i = 0; i < ix.length; i++) {
ix[i] = Integer.parseInt(parts[i]) - 1;
}
ret.set(ix, parts[ix.length]);
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
}
catch (Exception ex) {
throw new IOException("Unable to read tensor in text cell format.", ex);
}
return ret;
}
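For reference, the loop above parses the text-cell layout: whitespace-separated 1-based indices followed by the cell value. A tiny standalone illustration of one line:

// One text-cell line for a 2-D tensor: 1-based indices first, value last.
String line = "3 5 7.0";
String[] parts = line.trim().split("\\s+");
int row = Integer.parseInt(parts[0]) - 1; // -> 2 (0-based)
int col = Integer.parseInt(parts[1]) - 1; // -> 4 (0-based)
String cell = parts[2];                   // -> "7.0", stored via ret.set(ix, ...)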
public ReadRowsTask(InputSplit split, TextInputFormat inputFormat, JobConf jobConf,
FrameBlock dest, Map<String, Integer> schemaMap, int offset)
{
_split = split;
_inputFormat = inputFormat;
_jobConf = jobConf;
_dest = dest;
_schemaMap = schemaMap;
_offset = offset;
}
@Override
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs)
throws IOException
{
int numThreads = OptimizerUtils.getParallelTextReadParallelism();
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, numThreads);
//compute number of columns
int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
//compute number of rows
int nrow = 0;
ExecutorService pool = CommonThreadPool.get(numThreads);
try {
ArrayList<CountRowsTask> tasks = new ArrayList<>();
for( int i=0; i<splits.length; i++ )
tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader(), i==0));
List<Future<Long>> cret = pool.invokeAll(tasks);
for( Future<Long> count : cret )
nrow += count.get().intValue();
}
catch (Exception e) {
throw new IOException("Failed parallel read of text csv input.", e);
}
finally {
pool.shutdown();
}
return new Pair<>(nrow, ncol);
}
public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader, boolean first) {
_split = split;
_informat = informat;
_job = job;
_hasHeader = hasHeader;
_firstSplit = first;
}
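CountRowsTask's call() body is elided; given the fields above, a plausible implementation (a sketch, not the verbatim SystemDS source) counts records per split and skips the header line on the first split:

// Sketch under the assumption that CountRowsTask implements Callable<Long>,
// consistent with the Future<Long> handling in computeCSVSize above.
@Override
public Long call() throws Exception {
    RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
    LongWritable key = new LongWritable();
    Text value = new Text();
    long nrow = 0;
    try {
        if (_firstSplit && _hasHeader)
            reader.next(key, value); // header appears only in the first split
        while (reader.next(key, value))
            nrow++;
    }
    finally {
        IOUtilFunctions.closeSilently(reader);
    }
    return nrow;
}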
public ReadRowsTask(InputSplit split, TextInputFormat informat, JobConf job,
FrameBlock dest, int offset, boolean first)
{
_split = split;
_informat = informat;
_job = job;
_dest = dest;
_offset = offset;
_isFirstSplit = first;
}
protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
splits = IOUtilFunctions.sortInputSplits(splits);
for( int i=0, rpos=0; i<splits.length; i++ )
rpos = readCSVFrameFromInputSplit(splits[i], informat,
job, dest, schema, names, rlen, clen, rpos, i==0);
}
public static String getDataFormat(StorageDescriptor descriptor) {
Preconditions.checkNotNull(descriptor);
String serde = descriptor.getSerdeInfo().getSerializationLib();
String inputFormat = descriptor.getInputFormat();
if (LazySimpleSerDe.class.getName().equals(serde)) {
if (TextInputFormat.class.getName().equals(inputFormat)) {
return BuiltinStorages.TEXT;
} else if (SequenceFileInputFormat.class.getName().equals(inputFormat)) {
return BuiltinStorages.SEQUENCE_FILE;
} else {
throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
}
} else if (LazyBinarySerDe.class.getName().equals(serde)) {
if (SequenceFileInputFormat.class.getName().equals(inputFormat)) {
return BuiltinStorages.SEQUENCE_FILE;
} else {
throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
}
} else if (LazyBinaryColumnarSerDe.class.getName().equals(serde) || ColumnarSerDe.class.getName().equals(serde)) {
if (RCFileInputFormat.class.getName().equals(inputFormat)) {
return BuiltinStorages.RCFILE;
} else {
throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
}
} else if (ParquetHiveSerDe.class.getName().equals(serde)) {
return BuiltinStorages.PARQUET;
} else if (AvroSerDe.class.getName().equals(serde)) {
return BuiltinStorages.AVRO;
} else if (OrcSerde.class.getName().equals(serde)) {
return BuiltinStorages.ORC;
} else if (RegexSerDe.class.getName().equals(serde)) {
return BuiltinStorages.REGEX;
} else {
throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
}
}
@Override
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims,
Types.ValueType[] schema) throws IOException {
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
TensorBlock ret;
if( schema.length == 1 )
ret = new TensorBlock(schema[0], idims).allocateBlock();
else
ret = new TensorBlock(schema, idims).allocateBlock();
try {
ExecutorService pool = CommonThreadPool.get(_numThreads);
InputSplit[] splits = informat.getSplits(job, _numThreads);
//create and execute read tasks for all splits
List<TensorReaderTextCellParallel.ReadTask> tasks = Arrays.stream(splits)
.map(s -> new TensorReaderTextCellParallel.ReadTask(s, informat, job, ret))
.collect(Collectors.toList());
List<Future<Object>> rt = pool.invokeAll(tasks);
//check for exceptions
for (Future<Object> task : rt)
task.get();
pool.shutdown();
}
catch (Exception e) {
throw new IOException("Threadpool issue, while parallel read.", e);
}
return ret;
}
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
int blen, long estnnz)
throws IOException, DMLRuntimeException
{
// prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(fname);
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, _numThreads);
splits = IOUtilFunctions.sortInputSplits(splits);
// check existence and non-empty file
checkValidInputFile(fs, path);
// allocate output matrix block
// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
MatrixBlock ret = computeLIBSVMSizeAndCreateOutputMatrixBlock(splits, path, job, rlen, clen, estnnz);
rlen = ret.getNumRows();
clen = ret.getNumColumns();
// Second Read Pass (read, parse strings, append to matrix block)
readLIBSVMMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen);
//post-processing (representation-specific, change of sparse/dense block representation)
// - nnz explicitly maintained in parallel for the individual splits
ret.examSparsity();
// sanity check for parallel row count (since determined internally)
if (rlen >= 0 && rlen != ret.getNumRows())
throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());
return ret;
}
private void readLIBSVMMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job,
MatrixBlock dest, long rlen, long clen, int blen)
throws IOException
{
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
ExecutorService pool = CommonThreadPool.get(_numThreads);
try
{
// create read tasks for all splits
ArrayList<LIBSVMReadTask> tasks = new ArrayList<>();
int splitCount = 0;
for (InputSplit split : splits) {
tasks.add( new LIBSVMReadTask(split, _offsets, informat, job, dest, rlen, clen, splitCount++) );
}
pool.invokeAll(tasks);
pool.shutdown();
// check return codes and aggregate nnz
long lnnz = 0;
for (LIBSVMReadTask rt : tasks) {
lnnz += rt.getPartialNnz();
if (!rt.getReturnCode()) {
Exception err = rt.getException();
throw new IOException("Read task for libsvm input failed: "+ err.toString(), err);
}
}
dest.setNonZeros(lnnz);
}
catch (Exception e) {
throw new IOException("Threadpool issue, while parallel read.", e);
}
}
@Override
public Plan getPlan(String... args) {
// parse job parameters
int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
String dataInput = (args.length > 1 ? args[1] : "");
String output = (args.length > 2 ? args[2] : "");
HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>(
new TextInputFormat(), new JobConf(), "Input Lines");
TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
MapOperator mapper = MapOperator.builder(new TokenizeLine())
.input(source)
.name("Tokenize Lines")
.build();
ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
.input(mapper)
.name("Count Words")
.build();
HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(),new JobConf(), "Hadoop TextOutputFormat", reducer, Text.class, IntWritable.class);
TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
Plan plan = new Plan(out, "Hadoop OutputFormat Example");
plan.setDefaultParallelism(numSubTasks);
return plan;
}
// Wires up the log-count job: mapper, reducer, and text input format.
public void LogsCountApplication()
{
    setMapClass(LogCountsPerHour.LogMapClass.class);
    // setCombineClass(LogCountsPerHour.LogReduce.class);
    setReduceClass(LogCountsPerHour.LogReduce.class);
    setInputFormat(TextInputFormat.class);
}