Listed below are example usages of the org.apache.hadoop.mapred.Utils API class, collected from real code; follow the links through to GitHub to view the original source. Every snippet uses Utils.OutputFileUtils.OutputFilesFilter together with FileSystem.listStatus to enumerate a job's output (or input) data files while skipping bookkeeping entries such as _SUCCESS and _logs.
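Before the examples, here is a minimal, self-contained sketch of that shared pattern; the ListJobOutputFiles class and listOutputFiles method are illustrative names, not taken from the examples below.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Utils;

public class ListJobOutputFiles {
  /** @return the data files under outputDir, skipping _SUCCESS, _logs and other hidden entries. */
  public static List<Path> listOutputFiles(Path outputDir, Configuration conf)
      throws IOException {
    FileSystem fs = outputDir.getFileSystem(conf);
    // OutputFilesFilter rejects underscore- and dot-prefixed paths, so only the
    // part-* data files written by the job are returned.
    FileStatus[] stats =
        fs.listStatus(outputDir, new Utils.OutputFileUtils.OutputFilesFilter());
    List<Path> paths = new ArrayList<Path>();
    for (FileStatus stat : stats) {
      paths.add(stat.getPath());
    }
    return paths;
  }
}

The first example below does essentially this against the directory returned by getTablePath(), after forcing the default file system to the local one: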
/** @return a list of Path objects for each data file */
protected List<Path> getDataFilePaths() throws IOException {
List<Path> paths = new ArrayList<Path>();
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
FileSystem fs = FileSystem.get(conf);
FileStatus[] stats = fs.listStatus(getTablePath(),
new Utils.OutputFileUtils.OutputFilesFilter());
for (FileStatus stat : stats) {
paths.add(stat.getPath());
}
return paths;
}
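A variant of the same method from a Sqoop test case only falls back to the local file system when the test is not running against a physical cluster: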
/** @return a list of Path objects for each data file */
protected List<Path> getDataFilePaths() throws IOException {
List<Path> paths = new ArrayList<Path>();
Configuration conf = new Configuration();
if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
}
FileSystem fs = FileSystem.get(conf);
FileStatus [] stats = fs.listStatus(getTablePath(),
new Utils.OutputFileUtils.OutputFilesFilter());
for (FileStatus stat : stats) {
paths.add(stat.getPath());
}
return paths;
}
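The next helper reads the first file under TEST_ROOT_DIR/out, splits each line on a tab into a key and a value, logs the pair, and asserts that the value (an error count) is zero: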
private void validateOutput() throws IOException {
Path[] outputFiles = FileUtil.stat2Paths(
localFs.listStatus(new Path(TEST_ROOT_DIR + "/out"),
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = localFs.open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
while (line != null) {
StringTokenizer tokeniz = new StringTokenizer(line, "\t");
String key = tokeniz.nextToken();
String value = tokeniz.nextToken();
LOG.info("Output: key: "+ key + " value: "+ value);
int errors = Integer.parseInt(value);
assertTrue(errors == 0);
line = reader.readLine();
}
reader.close();
}
}
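readOutput concatenates every output file under outDir into a single newline-separated string: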
public static String readOutput(Path outDir, Configuration conf)
throws IOException {
FileSystem fs = outDir.getFileSystem(conf);
StringBuffer result = new StringBuffer();
Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
for (Path outputFile : fileList) {
LOG.info("Path" + ": "+ outputFile);
BufferedReader file =
new BufferedReader(new InputStreamReader(fs.open(outputFile)));
String line = file.readLine();
while (line != null) {
result.append(line);
result.append("\n");
line = file.readLine();
}
file.close();
}
return result.toString();
}
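checkOuterConsistency asserts that the job produced exactly one non-empty part file, then reads it back as a SequenceFile of IntWritable pairs and checks each value against countProduct():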
private static void checkOuterConsistency(Job job, Path[] src)
throws IOException {
Path outf = FileOutputFormat.getOutputPath(job);
FileStatus[] outlist = cluster.getFileSystem().listStatus(outf, new
Utils.OutputFileUtils.OutputFilesFilter());
assertEquals("number of part files is more than 1. It is" + outlist.length,
1, outlist.length);
assertTrue("output file with zero length" + outlist[0].getLen(),
0 < outlist[0].getLen());
SequenceFile.Reader r =
new SequenceFile.Reader(cluster.getFileSystem(),
outlist[0].getPath(), job.getConfiguration());
IntWritable k = new IntWritable();
IntWritable v = new IntWritable();
while (r.next(k, v)) {
assertEquals("counts does not match", v.get(),
countProduct(k, src, job.getConfiguration()));
}
r.close();
}
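publishPlainDataStatistics walks the input directory recursively, counts and sizes only the files the filter accepts, logs the totals, and returns them as a DataStatistics object: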
static DataStatistics publishPlainDataStatistics(Configuration conf,
Path inputDir)
throws IOException {
FileSystem fs = inputDir.getFileSystem(conf);
// obtain input data file statuses
long dataSize = 0;
long fileCount = 0;
RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inputDir, true);
PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter();
while (iter.hasNext()) {
LocatedFileStatus lStatus = iter.next();
if (filter.accept(lStatus.getPath())) {
dataSize += lStatus.getLen();
++fileCount;
}
}
// publish the plain data statistics
LOG.info("Total size of input data : "
+ StringUtils.humanReadableInt(dataSize));
LOG.info("Total number of input data files : " + fileCount);
return new DataStatistics(dataSize, fileCount, false);
}
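The following test helper builds and submits an old-API job (JobClient/RunningJob) that sorts with KeyFieldBasedComparator, then checks which of the two input lines comes out first, depending on the expected ordering: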
public void configure(String keySpec, int expect) throws Exception {
Path testdir = new Path(TEST_DIR.getAbsolutePath());
Path inDir = new Path(testdir, "in");
Path outDir = new Path(testdir, "out");
FileSystem fs = getFileSystem();
fs.delete(testdir, true);
conf.setInputFormat(TextInputFormat.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
conf.setKeyFieldComparatorOptions(keySpec);
conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
conf.set(JobContext.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
conf.setMapperClass(InverseMapper.class);
conf.setReducerClass(IdentityReducer.class);
if (!fs.mkdirs(testdir)) {
throw new IOException("Mkdirs failed to create " + testdir.toString());
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
// set up two lines of input data in a single file
Path inFile = new Path(inDir, "part0");
FileOutputStream fos = new FileOutputStream(inFile.toString());
fos.write((line1 + "\n").getBytes());
fos.write((line2 + "\n").getBytes());
fos.close();
JobClient jc = new JobClient(conf);
RunningJob r_job = jc.submitJob(conf);
while (!r_job.isComplete()) {
Thread.sleep(1000);
}
if (!r_job.isSuccessful()) {
fail("Oops! The job broke due to an unexpected error");
}
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
//make sure we get what we expect as the first line, and also
//that we have two lines
if (expect == 1) {
assertTrue(line.startsWith(line1));
} else if (expect == 2) {
assertTrue(line.startsWith(line2));
}
line = reader.readLine();
if (expect == 1) {
assertTrue(line.startsWith(line2));
} else if (expect == 2) {
assertTrue(line.startsWith(line1));
}
reader.close();
}
}
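Its new-API counterpart drives the same check through Job and MapReduceTestUtil, with KeyFieldBasedPartitioner ensuring both lines land in the same reducer: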
private void testComparator(String keySpec, int expect)
throws Exception {
String root = System.getProperty("test.build.data", "/tmp");
Path inDir = new Path(root, "test_cmp/in");
Path outDir = new Path(root, "test_cmp/out");
conf.set("mapreduce.partition.keycomparator.options", keySpec);
conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1,
line1 +"\n" + line2 + "\n");
job.setMapperClass(InverseMapper.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setSortComparatorClass(KeyFieldBasedComparator.class);
job.setPartitionerClass(KeyFieldBasedPartitioner.class);
job.waitForCompletion(true);
assertTrue(job.isSuccessful());
// validate output
Path[] outputFiles = FileUtil.stat2Paths(getFileSystem().listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
//make sure we get what we expect as the first line, and also
//that we have two lines (both the lines must end up in the same
//reducer since the partitioner takes the same key spec for all
//lines
if (expect == 1) {
assertTrue(line.startsWith(line1));
} else if (expect == 2) {
assertTrue(line.startsWith(line2));
}
line = reader.readLine();
if (expect == 1) {
assertTrue(line.startsWith(line2));
} else if (expect == 2) {
assertTrue(line.startsWith(line1));
}
reader.close();
}
}
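The remaining snippets come from Gridmix's compression emulation support and its tests: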
/** Publishes compression-related data statistics. The following statistics
* are published:
* <ul>
* <li>Total compressed input data size</li>
* <li>Number of compressed input data files</li>
* <li>Compression Ratio</li>
* <li>Text data dictionary size</li>
* <li>Random text word size</li>
* </ul>
*/
static DataStatistics publishCompressedDataStatistics(Path inputDir,
Configuration conf, long uncompressedDataSize)
throws IOException {
FileSystem fs = inputDir.getFileSystem(conf);
CompressionCodecFactory compressionCodecs =
new CompressionCodecFactory(conf);
// iterate over compressed files and sum up the compressed file sizes
long compressedDataSize = 0;
int numCompressedFiles = 0;
// obtain input data file statuses
FileStatus[] outFileStatuses =
fs.listStatus(inputDir, new Utils.OutputFileUtils.OutputFilesFilter());
for (FileStatus status : outFileStatuses) {
// check if the input file is compressed
if (compressionCodecs != null) {
CompressionCodec codec = compressionCodecs.getCodec(status.getPath());
if (codec != null) {
++numCompressedFiles;
compressedDataSize += status.getLen();
}
}
}
LOG.info("Gridmix is configured to use compressed input data.");
// publish the input data size
LOG.info("Total size of compressed input data : "
+ StringUtils.humanReadableInt(compressedDataSize));
LOG.info("Total number of compressed input data files : "
+ numCompressedFiles);
if (numCompressedFiles == 0) {
throw new RuntimeException("No compressed file found in the input"
+ " directory : " + inputDir.toString() + ". To enable compression"
+ " emulation, run Gridmix either with "
+ " an input directory containing compressed input file(s) or"
+ " use the -generate option to (re)generate it. If compression"
+ " emulation is not desired, disable it by setting '"
+ COMPRESSION_EMULATION_ENABLE + "' to 'false'.");
}
// publish the compression ratio only if the data was generated in this gridmix run
if (uncompressedDataSize > 0) {
// compute the compression ratio
double ratio = ((double)compressedDataSize) / uncompressedDataSize;
// publish the compression ratio
LOG.info("Input Data Compression Ratio : " + ratio);
}
return new DataStatistics(compressedDataSize, numCompressedFiles, true);
}
/**
* Test {@link RandomTextDataMapper} via {@link CompressionEmulationUtil}.
*/
@Test
public void testRandomCompressedTextDataGenerator() throws Exception {
int wordSize = 10;
int listSize = 20;
long dataSize = 10*1024*1024;
Configuration conf = new Configuration();
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
// configure the RandomTextDataGenerator to generate desired sized data
conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE,
listSize);
conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE,
wordSize);
conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
conf.set("mapreduce.job.hdfs-servers", "");
FileSystem lfs = FileSystem.getLocal(conf);
// define the test's root temp directory
Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
Path tempDir = new Path(rootTempDir, "TestRandomCompressedTextDataGenr");
lfs.delete(tempDir, true);
runDataGenJob(conf, tempDir);
// validate the output data
FileStatus[] files =
lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
long size = 0;
long maxLineSize = 0;
for (FileStatus status : files) {
InputStream in =
CompressionEmulationUtil
.getPossiblyDecompressedInputStream(status.getPath(), conf, 0);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = reader.readLine();
if (line != null) {
long lineSize = line.getBytes().length;
if (lineSize > maxLineSize) {
maxLineSize = lineSize;
}
while (line != null) {
for (String word : line.split("\\s")) {
size += word.getBytes().length;
}
line = reader.readLine();
}
}
reader.close();
}
assertTrue(size >= dataSize);
assertTrue(size <= dataSize + maxLineSize);
}
/**
* Test if {@link RandomTextDataGenerator} can generate random text data
* with the desired compression ratio. This involves
* - using {@link CompressionEmulationUtil} to configure the MR job for
* generating the random text data with the desired compression ratio
* - running the MR job
* - test {@link RandomTextDataGenerator}'s output and match the output size
* (compressed) with the expected compression ratio.
*/
private void testCompressionRatioConfigure(float ratio)
throws Exception {
long dataSize = 10*1024*1024;
Configuration conf = new Configuration();
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
conf.set("mapreduce.job.hdfs-servers", "");
float expectedRatio = CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
if (ratio > 0) {
// set the compression ratio in the conf
CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, ratio);
expectedRatio =
CompressionEmulationUtil.standardizeCompressionRatio(ratio);
}
// invoke the utility to map from ratio to word-size
CompressionEmulationUtil.setupDataGeneratorConfig(conf);
FileSystem lfs = FileSystem.getLocal(conf);
// define the test's root temp directory
Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
Path tempDir =
new Path(rootTempDir, "TestCustomRandomCompressedTextDataGenr");
lfs.delete(tempDir, true);
runDataGenJob(conf, tempDir);
// validate the output data
FileStatus[] files =
lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
long size = 0;
for (FileStatus status : files) {
size += status.getLen();
}
float compressionRatio = ((float)size)/dataSize;
float stdRatio =
CompressionEmulationUtil.standardizeCompressionRatio(compressionRatio);
assertEquals(expectedRatio, stdRatio, 0.0D);
}
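The next helper validates a skip-mode job: it cross-checks the map and reduce skip counters against the known bad records, then scans the first output file to make sure none of those bad records appear in it: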
private void validateOutput(RunningJob runningJob, boolean validateCount)
throws Exception {
LOG.info(runningJob.getCounters().toString());
assertTrue(runningJob.isSuccessful());
if(validateCount) {
//validate counters
String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
Counters counters = runningJob.getCounters();
assertEquals(counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").
getCounter(),MAPPER_BAD_RECORDS.size());
int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").
getCounter(),mapRecs);
assertEquals(counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").
getCounter(),mapRecs);
int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").
getCounter(),redRecs);
}
List<String> badRecs = new ArrayList<String>();
badRecs.addAll(MAPPER_BAD_RECORDS);
badRecs.addAll(REDUCER_BAD_RECORDS);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
StringTokenizer tokeniz = new StringTokenizer(line, "\t");
String value = tokeniz.nextToken();
int index = value.indexOf("hey");
assertTrue(index>-1);
if(index>-1) {
String heyStr = value.substring(index);
assertTrue(!badRecs.contains(heyStr));
}
line = reader.readLine();
}
reader.close();
if(validateCount) {
assertEquals(INPUTSIZE-badRecs.size(), counter);
}
}
}
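Finally, an older-API variant of the configure(...) helper shown earlier; it writes its input under build/test/test.mapred.spill, uses two reducers, and sets the key-field separator through the legacy "map.output.key.field.separator" key: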
public void configure(String keySpec, int expect) throws Exception {
Path testdir = new Path("build/test/test.mapred.spill");
Path inDir = new Path(testdir, "in");
Path outDir = new Path(testdir, "out");
FileSystem fs = getFileSystem();
fs.delete(testdir, true);
conf.setInputFormat(TextInputFormat.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setNumMapTasks(1);
conf.setNumReduceTasks(2);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
conf.setKeyFieldComparatorOptions(keySpec);
conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
conf.set("map.output.key.field.separator", " ");
conf.setMapperClass(InverseMapper.class);
conf.setReducerClass(IdentityReducer.class);
if (!fs.mkdirs(testdir)) {
throw new IOException("Mkdirs failed to create " + testdir.toString());
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
// set up two lines of input data in a single file
Path inFile = new Path(inDir, "part0");
FileOutputStream fos = new FileOutputStream(inFile.toString());
fos.write((line1 + "\n").getBytes());
fos.write((line2 + "\n").getBytes());
fos.close();
JobClient jc = new JobClient(conf);
RunningJob r_job = jc.submitJob(conf);
while (!r_job.isComplete()) {
Thread.sleep(1000);
}
if (!r_job.isSuccessful()) {
fail("Oops! The job broke due to an unexpected error");
}
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
//make sure we get what we expect as the first line, and also
//that we have two lines (both the lines must end up in the same
//reducer since the partitioner takes the same key spec for all
//lines
if (expect == 1) {
assertTrue(line.startsWith(line1));
} else if (expect == 2) {
assertTrue(line.startsWith(line2));
}
line = reader.readLine();
if (expect == 1) {
assertTrue(line.startsWith(line2));
} else if (expect == 2) {
assertTrue(line.startsWith(line1));
}
reader.close();
}
}