The following lists example code for org.apache.hadoop.fs.FileUtil#stat2Paths(), drawn from real test and utility classes.
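FileUtil.stat2Paths converts the FileStatus[] array returned by FileSystem.listStatus into a plain Path[] array, which is the pattern every example below repeats. As a quick orientation, here is a minimal sketch of that pattern; the Stat2PathsSketch class name and the /user/output directory are placeholders, not taken from any of the examples.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class Stat2PathsSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path dir = new Path("/user/output");          // placeholder directory
    FileSystem fs = dir.getFileSystem(conf);
    FileStatus[] statuses = fs.listStatus(dir);   // one FileStatus per child entry
    Path[] paths = FileUtil.stat2Paths(statuses); // keep only the Paths, drop the metadata
    for (Path p : paths) {
      System.out.println(p);
    }
  }
}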
public static String readOutput(Path outDir,
JobConf conf) throws IOException {
FileSystem fs = outDir.getFileSystem(conf);
StringBuffer result = new StringBuffer();
{
Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
new OutputLogFilter()));
for(int i=0; i < fileList.length; ++i) {
LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
BufferedReader file =
new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
String line = file.readLine();
while (line != null) {
result.append(line);
result.append("\n");
line = file.readLine();
}
file.close();
}
}
return result.toString();
}
/**
* HADOOP-4466:
* This test verifies that the JavaSerialization implementation can write to
* SequenceFiles, since SequenceFileOutputFormat is not coupled to Writable
* types; if it were, the job would fail.
*/
public void testWriteToSequencefile() throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(),
"text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("b a\n");
wr.close();
JobConf conf = createJobConf();
conf.setJobName("JavaSerialization");
conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization," +
"org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class); // test we can write to sequence files
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));
assertEquals(1, outputFiles.length);
}
public void testComplexNameWithRegex() throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(),
"text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("b a\n");
wr.close();
JobConf conf = createJobConf();
conf.setJobName("name \\Evalue]");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(IdentityMapper.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));
assertEquals(1, outputFiles.length);
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
assertEquals("0\tb a", reader.readLine());
assertNull(reader.readLine());
reader.close();
}
private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int numMappers, int numLines)
throws Exception {
FSDataInputStream dis = null;
long numValidRecords = 0;
long numInvalidRecords = 0;
String prevKeyValue = "000000000";
Path[] fileList =
FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
new Utils.OutputFileUtils.OutputFilesFilter()));
for (Path outFile : fileList) {
try {
dis = fileSystem.open(outFile);
String record;
while((record = dis.readLine()) != null) {
// Split the line into key and value.
int blankPos = record.indexOf(" ");
String keyString = record.substring(0, blankPos);
String valueString = record.substring(blankPos+1);
// Check for sorted output and correctness of record.
if (keyString.compareTo(prevKeyValue) >= 0
&& keyString.equals(valueString)) {
prevKeyValue = keyString;
numValidRecords++;
} else {
numInvalidRecords++;
}
}
} finally {
if (dis != null) {
dis.close();
dis = null;
}
}
}
// Make sure we got all input records in the output in sorted order.
assertEquals((long)(numMappers * numLines), numValidRecords);
// Make sure there is no extraneous invalid record.
assertEquals(0, numInvalidRecords);
}
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(Path dir,
Configuration conf) throws IOException {
FileSystem fs = dir.getFileSystem(conf);
Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
// sort names, so that hash partitioning works
Arrays.sort(names);
MapFile.Reader[] parts = new MapFile.Reader[names.length];
for (int i = 0; i < names.length; i++) {
parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
}
return parts;
}
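The "sort names" comment above matters because callers typically index the returned reader array with the same partitioner the job used, so readers[i] must line up with partition i. Below is a minimal sketch of that lookup, assuming a HashPartitioner with Text keys and IntWritable values; the MapFileLookupSketch class and its lookup method are illustrative, not part of the original code.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.HashPartitioner;

public class MapFileLookupSketch {
  // Look up a key in the output of a job that used HashPartitioner:
  // the partition number doubles as the index into the sorted readers array.
  public static IntWritable lookup(MapFile.Reader[] readers, Text key)
      throws IOException {
    HashPartitioner<Text, IntWritable> partitioner =
        new HashPartitioner<Text, IntWritable>();
    IntWritable value = new IntWritable();
    int part = partitioner.getPartition(key, value, readers.length);
    return (IntWritable) readers[part].get(key, value);
  }
}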
/**
* HADOOP-4466:
* This test verifies that the JavaSerialization implementation can write to
* SequenceFiles, since SequenceFileOutputFormat is not coupled to Writable
* types; if it were, the job would fail.
*/
public void testWriteToSequencefile() throws Exception {
JobConf conf = new JobConf(TestJavaSerialization.class);
conf.setJobName("JavaSerialization");
FileSystem fs = FileSystem.get(conf);
cleanAndCreateInput(fs);
conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization," +
"org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
// test we can write to sequence files
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
FileInputFormat.setInputPaths(conf, INPUT_DIR);
FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
fs.listStatus(OUTPUT_DIR,
new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(1, outputFiles.length);
}
public void testComplexName() throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(),
"text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("b a\n");
wr.close();
JobConf conf = createJobConf();
conf.setJobName("[name][some other value that gets truncated internally that this test attempts to aggravate]");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(IdentityMapper.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));
assertEquals(1, outputFiles.length);
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
assertEquals("0\tb a", reader.readLine());
assertNull(reader.readLine());
reader.close();
}
static String launchWordCount(URI fileSys, JobConf conf, String input,
int numMaps, int numReduces)
throws IOException {
final Path inDir = new Path("/testing/wc/input");
final Path outDir = new Path("/testing/wc/output");
FileSystem fs = FileSystem.get(fileSys, conf);
configureWordCount(fs, conf, input, numMaps, numReduces, inDir, outDir);
JobClient.runJob(conf);
StringBuffer result = new StringBuffer();
{
Path[] parents = FileUtil.stat2Paths(fs.listStatus(outDir.getParent()));
Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
for(int i=0; i < fileList.length; ++i) {
BufferedReader file =
new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
String line = file.readLine();
while (line != null) {
result.append(line);
result.append("\n");
line = file.readLine();
}
file.close();
}
}
return result.toString();
}
/** Open the output generated by this format. */
public static SequenceFile.Reader[] getReaders(Configuration conf, Path dir)
throws IOException {
FileSystem fs = dir.getFileSystem(conf);
Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
// sort names, so that hash partitioning works
Arrays.sort(names);
SequenceFile.Reader[] parts = new SequenceFile.Reader[names.length];
for (int i = 0; i < names.length; i++) {
parts[i] = new SequenceFile.Reader(fs, names[i], conf);
}
return parts;
}
/** Open the output generated by this format. */
private MapFile.Reader[] getReaders(String subDir) throws IOException {
Path dir = new Path(segmentDir, subDir);
FileSystem fs = dir.getFileSystem(conf);
Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, SegmentPathFilter.INSTANCE));
// sort names, so that hash partitioning works
Arrays.sort(names);
MapFile.Reader[] parts = new MapFile.Reader[names.length];
for (int i = 0; i < names.length; i++) {
parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
}
return parts;
}
public void testMapReduceJob() throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(),
"text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("b a\n");
wr.close();
JobConf conf = createJobConf();
conf.setJobName("JavaSerialization");
conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization," +
"org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));
assertEquals(1, outputFiles.length);
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
assertEquals("a\t1", reader.readLine());
assertEquals("b\t1", reader.readLine());
assertNull(reader.readLine());
reader.close();
}
public void testLazyOutput() throws Exception {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
try {
Configuration conf = new Configuration();
// Start the mini-MR and mini-DFS clusters
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_HADOOP_SLAVES)
.build();
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(), 1);
int numReducers = 2;
int numMappers = NUM_HADOOP_SLAVES * NUM_MAPS_PER_NODE;
createInput(fileSys, numMappers);
Path output1 = new Path("/testlazy/output1");
// Test 1.
runTestLazyOutput(mr.createJobConf(), output1,
numReducers, true);
Path[] fileList =
FileUtil.stat2Paths(fileSys.listStatus(output1,
new Utils.OutputFileUtils.OutputFilesFilter()));
for(int i=0; i < fileList.length; ++i) {
System.out.println("Test1 File list[" + i + "]" + ": "+ fileList[i]);
}
assertTrue(fileList.length == (numReducers - 1));
// Test 2. 0 Reducers, maps directly write to the output files
Path output2 = new Path("/testlazy/output2");
runTestLazyOutput(mr.createJobConf(), output2, 0, true);
fileList =
FileUtil.stat2Paths(fileSys.listStatus(output2,
new Utils.OutputFileUtils.OutputFilesFilter()));
for(int i=0; i < fileList.length; ++i) {
System.out.println("Test2 File list[" + i + "]" + ": "+ fileList[i]);
}
assertTrue(fileList.length == numMappers - 1);
// Test 3. 0 Reducers, but flag is turned off
Path output3 = new Path("/testlazy/output3");
runTestLazyOutput(mr.createJobConf(), output3, 0, false);
fileList =
FileUtil.stat2Paths(fileSys.listStatus(output3,
new Utils.OutputFileUtils.OutputFilesFilter()));
for(int i=0; i < fileList.length; ++i) {
System.out.println("Test3 File list[" + i + "]" + ": "+ fileList[i]);
}
assertTrue(fileList.length == numMappers);
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown(); }
}
}
public void testMapReduceJob() throws Exception {
JobConf conf = new JobConf(TestUserDefinedCounters.class);
conf.setJobName("UserDefinedCounters");
FileSystem fs = FileSystem.get(conf);
cleanAndCreateInput(fs);
conf.setInputFormat(TextInputFormat.class);
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(CountingMapper.class);
conf.setReducerClass(IdentityReducer.class);
FileInputFormat.setInputPaths(conf, INPUT_DIR);
FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
RunningJob runningJob = JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
fs.listStatus(OUTPUT_DIR,
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = fs.open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
assertTrue(line.contains("hello"));
line = reader.readLine();
}
reader.close();
assertEquals(4, counter);
}
verifyCounters(runningJob, 4);
}
private void validateOutput(RunningJob runningJob, boolean validateCount)
throws Exception {
LOG.info(runningJob.getCounters().toString());
assertTrue(runningJob.isSuccessful());
if(validateCount) {
//validate counters
String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
Counters counters = runningJob.getCounters();
assertEquals(counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").
getCounter(),MAPPER_BAD_RECORDS.size());
int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").
getCounter(),mapRecs);
assertEquals(counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").
getCounter(),mapRecs);
int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").
getCounter(),redRecs);
}
List<String> badRecs = new ArrayList<String>();
badRecs.addAll(MAPPER_BAD_RECORDS);
badRecs.addAll(REDUCER_BAD_RECORDS);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
StringTokenizer tokeniz = new StringTokenizer(line, "\t");
String value = tokeniz.nextToken();
int index = value.indexOf("hey");
assertTrue(index>-1);
if(index>-1) {
String heyStr = value.substring(index);
assertTrue(!badRecs.contains(heyStr));
}
line = reader.readLine();
}
reader.close();
if(validateCount) {
assertEquals(INPUTSIZE-badRecs.size(), counter);
}
}
}
public void _testMapReduce(boolean restart) throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("hello1\n");
wr.write("hello2\n");
wr.write("hello3\n");
wr.write("hello4\n");
wr.close();
if (restart) {
stopCluster();
startCluster(false, null);
}
JobConf conf = createJobConf();
conf.setJobName("mr");
conf.setInputFormat(TextInputFormat.class);
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
assertTrue(line.contains("hello"));
line = reader.readLine();
}
reader.close();
assertEquals(4, counter);
}
}
public void testMapReduceJob() throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("hello1\n");
wr.write("hello2\n");
wr.write("hello3\n");
wr.write("hello4\n");
wr.close();
JobConf conf = createJobConf();
conf.setJobName("counters");
conf.setInputFormat(TextInputFormat.class);
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(CountingMapper.class);
conf.setReducerClass(IdentityReducer.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
RunningJob runningJob = JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
assertTrue(line.contains("hello"));
line = reader.readLine();
}
reader.close();
assertEquals(4, counter);
}
assertEquals(4,
runningJob.getCounters().getCounter(EnumCounter.MAP_RECORDS));
assertEquals(4,
runningJob.getCounters().getGroup("StringCounter")
.getCounter("MapRecords"));
}
public void configure(String keySpec, int expect) throws Exception {
Path testdir = new Path(TEST_DIR.getAbsolutePath());
Path inDir = new Path(testdir, "in");
Path outDir = new Path(testdir, "out");
FileSystem fs = getFileSystem();
fs.delete(testdir, true);
conf.setInputFormat(TextInputFormat.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
conf.setKeyFieldComparatorOptions(keySpec);
conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
conf.set(JobContext.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
conf.setMapperClass(InverseMapper.class);
conf.setReducerClass(IdentityReducer.class);
if (!fs.mkdirs(testdir)) {
throw new IOException("Mkdirs failed to create " + testdir.toString());
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
// set up the input data file
Path inFile = new Path(inDir, "part0");
FileOutputStream fos = new FileOutputStream(inFile.toString());
fos.write((line1 + "\n").getBytes());
fos.write((line2 + "\n").getBytes());
fos.close();
JobClient jc = new JobClient(conf);
RunningJob r_job = jc.submitJob(conf);
while (!r_job.isComplete()) {
Thread.sleep(1000);
}
if (!r_job.isSuccessful()) {
fail("Oops! The job broke due to an unexpected error");
}
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
//make sure we get what we expect as the first line, and also
//that we have two lines
if (expect == 1) {
assertTrue(line.startsWith(line1));
} else if (expect == 2) {
assertTrue(line.startsWith(line2));
}
line = reader.readLine();
if (expect == 1) {
assertTrue(line.startsWith(line2));
} else if (expect == 2) {
assertTrue(line.startsWith(line1));
}
reader.close();
}
}
public void testMapReduceJob() throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("hello1\n");
wr.write("hello2\n");
wr.write("hello3\n");
wr.write("hello4\n");
wr.close();
JobConf conf = createJobConf();
conf.setJobName("counters");
conf.setInputFormat(TextInputFormat.class);
conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(CountingMapper.class);
conf.setReducerClass(IdentityReducer.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
RunningJob runningJob = JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
assertTrue(line.contains("hello"));
line = reader.readLine();
}
reader.close();
assertEquals(4, counter);
}
assertEquals(4,
runningJob.getCounters().getCounter(EnumCounter.MAP_RECORDS));
assertEquals(4,
runningJob.getCounters().getGroup("StringCounter")
.getCounter("MapRecords"));
}
public void doTestTextBatchAppend(boolean useRawLocalFileSystem)
throws Exception {
LOG.debug("Starting...");
final long rollCount = 10;
final long batchSize = 2;
final String fileName = "FlumeData";
String newPath = testPath + "/singleTextBucket";
int totalEvents = 0;
int i = 1, j = 1;
// clear the test directory
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(newPath);
fs.delete(dirPath, true);
fs.mkdirs(dirPath);
Context context = new Context();
// context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.rollInterval", "0");
context.put("hdfs.rollSize", "0");
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.writeFormat", "Text");
context.put("hdfs.useRawLocalFileSystem",
Boolean.toString(useRawLocalFileSystem));
context.put("hdfs.fileType", "DataStream");
Configurables.configure(sink, context);
Channel channel = new MemoryChannel();
Configurables.configure(channel, context);
sink.setChannel(channel);
sink.start();
Calendar eventDate = Calendar.getInstance();
List<String> bodies = Lists.newArrayList();
// push the event batches into channel to roll twice
for (i = 1; i <= (rollCount*10)/batchSize; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // year, month, day, hour, minute
String body = "Test." + i + "." + j;
event.setBody(body.getBytes());
bodies.add(body);
channel.put(event);
totalEvents++;
}
txn.commit();
txn.close();
// execute sink to process the events
sink.process();
}
sink.stop();
// loop through all the files generated and check their contents
FileStatus[] dirStat = fs.listStatus(dirPath);
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
long expectedFiles = totalEvents / rollCount;
if (totalEvents % rollCount > 0) expectedFiles++;
Assert.assertEquals("num files wrong, found: " +
Lists.newArrayList(fList), expectedFiles, fList.length);
// check the contents of the all files
verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}