下面列出了 org.apache.hadoop.mapred.lib.IdentityMapper 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Configures {@code conf} as an identity map/reduce job named
 * "test-job-succeed", starts it through {@code UtilsForTests.runJob} and polls
 * the returned handle until it reports completion.
 *
 * <p>The handle is polled every 100 ms; once more than 300 polls have passed
 * (about 30 seconds) an {@link IOException} is thrown. If the polling thread
 * is interrupted, polling stops, the interrupt status is restored and the
 * handle is returned even though the job may not be complete.
 *
 * @param conf job configuration; its job name, mapper and reducer are overwritten
 * @param inDir input directory passed to {@code UtilsForTests.runJob}
 * @param outDir output directory passed to {@code UtilsForTests.runJob}
 * @return the handle returned by {@code UtilsForTests.runJob}
 * @throws IOException if starting or polling the job fails, or the job has not
 *         completed within the polling budget
 */
public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
    throws IOException {
  conf.setJobName("test-job-succeed");
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);

  RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  long sleepCount = 0;
  while (!job.isComplete()) {
    try {
      if (sleepCount > 300) { // 30 seconds
        throw new IOException("Job didn't finish in 30 seconds");
      }
      Thread.sleep(100);
      sleepCount++;
    } catch (InterruptedException e) {
      // Re-assert the interrupt so callers can still observe it after we bail out.
      Thread.currentThread().interrupt();
      break;
    }
  }
  return job;
}
/**
 * Runs an "outer" composite join built from {@code Fake_IF} over three source
 * paths with identity mapper and reducer, then recursively deletes the
 * qualified {@code /empty} directory.
 *
 * <p>Only the first source path is created under the base directory; the
 * other two are relative paths.
 *
 * @throws Exception if building, running or cleaning up the job fails
 */
public void testEmptyJoin() throws Exception {
  final JobConf conf = new JobConf();
  final Path root = cluster.getFileSystem().makeQualified(new Path("/empty"));
  final Path[] sources = new Path[] {
      new Path(root, "i0"), new Path("i1"), new Path("i2")
  };

  // Input side: the join expression plus the format that interprets it.
  final String joinExpr =
      CompositeInputFormat.compose("outer", Fake_IF.class, sources);
  conf.set("mapreduce.join.expr", joinExpr);
  conf.setInputFormat(CompositeInputFormat.class);

  // Output side.
  final Path outputDir = new Path(root, "out");
  FileOutputFormat.setOutputPath(conf, outputDir);
  conf.setOutputKeyClass(IncomparableKey.class);
  conf.setOutputValueClass(NullWritable.class);

  // Pass-through processing.
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);

  JobClient.runJob(conf);
  root.getFileSystem(conf).delete(root, true);
}
/**
 * Configures {@code jobConf} as a map-only identity job over sequence files
 * and runs it through {@code JobClient.runJob}.
 *
 * @param jobConf configuration to populate and run
 * @param inDir input path
 * @param outputPath output path
 * @param numMaps value passed to {@code setNumMapTasks}
 * @param jobName name assigned to the job
 * @return the handle returned by {@code JobClient.runJob}
 * @throws IOException if running the job fails
 */
static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
    int numMaps, String jobName) throws IOException {
  // Identity and packaging.
  jobConf.setJobName(jobName);
  jobConf.setJar("build/test/mapred/testjar/testjob.jar");

  // Where the data comes from and where it goes.
  jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, inDir);
  jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  FileOutputFormat.setOutputPath(jobConf, outputPath);
  jobConf.setOutputKeyClass(BytesWritable.class);
  jobConf.setOutputValueClass(BytesWritable.class);

  // Processing: a reducer class is configured, but the reduce count is zero.
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setReducerClass(IdentityReducer.class);
  jobConf.setNumMapTasks(numMaps);
  jobConf.setNumReduceTasks(0);

  final RunningJob handle = JobClient.runJob(jobConf);
  return handle;
}
/**
 * Sets up {@code conf} as an identity map/reduce job called
 * "test-job-succeed", launches it via {@code UtilsForTests.runJob} and waits
 * for the returned handle to report completion.
 *
 * <p>Completion is checked every 100 ms. After more than 300 checks (about
 * 30 seconds) an {@link IOException} is raised. An interrupt ends the wait
 * early: the interrupt status is restored and the handle is returned as-is.
 *
 * @param conf job configuration; its job name, mapper and reducer are overwritten
 * @param inDir input directory passed to {@code UtilsForTests.runJob}
 * @param outDir output directory passed to {@code UtilsForTests.runJob}
 * @return the handle returned by {@code UtilsForTests.runJob}
 * @throws IOException if launching or polling fails, or the job does not
 *         complete within the polling budget
 */
public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
    throws IOException {
  conf.setJobName("test-job-succeed");
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);

  RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  long sleepCount = 0;
  while (!job.isComplete()) {
    try {
      if (sleepCount > 300) { // 30 seconds
        throw new IOException("Job didn't finish in 30 seconds");
      }
      Thread.sleep(100);
      sleepCount++;
    } catch (InterruptedException e) {
      // Do not lose the interrupt: restore the flag before leaving the loop.
      Thread.currentThread().interrupt();
      break;
    }
  }
  return job;
}
/**
 * Builds an "outer" composite-join job from {@code Fake_IF} over three source
 * paths, runs it with identity mapper/reducer, and finally removes the
 * qualified {@code /empty} directory recursively.
 *
 * <p>The first source lives under the base directory; the remaining two are
 * given as relative paths.
 *
 * @throws Exception if any step of the job or the cleanup fails
 */
public void testEmptyJoin() throws Exception {
  final JobConf conf = new JobConf();
  final Path root = cluster.getFileSystem().makeQualified(new Path("/empty"));
  final Path[] sources = new Path[] {
      new Path(root, "i0"), new Path("i1"), new Path("i2")
  };

  // Join expression and the input format that evaluates it.
  final String joinExpr =
      CompositeInputFormat.compose("outer", Fake_IF.class, sources);
  conf.set("mapreduce.join.expr", joinExpr);
  conf.setInputFormat(CompositeInputFormat.class);

  // Output location and record types.
  final Path outputDir = new Path(root, "out");
  FileOutputFormat.setOutputPath(conf, outputDir);
  conf.setOutputKeyClass(IncomparableKey.class);
  conf.setOutputValueClass(NullWritable.class);

  // Identity processing on both sides.
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);

  JobClient.runJob(conf);
  root.getFileSystem(conf).delete(root, true);
}
/**
 * Populates {@code jobConf} for a map-only identity job that reads and writes
 * sequence files, then runs it with {@code JobClient.runJob}.
 *
 * @param jobConf configuration to populate and run
 * @param inDir input path
 * @param outputPath output path
 * @param numMaps value passed to {@code setNumMapTasks}
 * @param jobName name assigned to the job
 * @return the handle returned by {@code JobClient.runJob}
 * @throws IOException if running the job fails
 */
static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
    int numMaps, String jobName) throws IOException {
  // Naming and job jar.
  jobConf.setJobName(jobName);
  jobConf.setJar("build/test/mapred/testjar/testjob.jar");

  // Input and output wiring.
  jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, inDir);
  jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  FileOutputFormat.setOutputPath(jobConf, outputPath);
  jobConf.setOutputKeyClass(BytesWritable.class);
  jobConf.setOutputValueClass(BytesWritable.class);

  // Task setup: the reducer class is set even though zero reduces are requested.
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setReducerClass(IdentityReducer.class);
  jobConf.setNumMapTasks(numMaps);
  jobConf.setNumReduceTasks(0);

  final RunningJob handle = JobClient.runJob(jobConf);
  return handle;
}
/**
 * Runs the Solr duplicate-deletion job against the given server and logs the
 * start time, the server URL, the finish time and the elapsed time.
 *
 * @param solrUrl value stored under {@code SolrConstants.SERVER_URL}
 * @param noCommit value stored under the {@code "noCommit"} configuration key
 * @throws IOException if running the job fails
 */
public void dedup(String solrUrl, boolean noCommit) throws IOException {
  final SimpleDateFormat timestampFormat =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  final long startedAt = System.currentTimeMillis();
  final String startStamp = timestampFormat.format(startedAt);
  LOG.info("SolrDeleteDuplicates: starting at " + startStamp);
  LOG.info("SolrDeleteDuplicates: Solr url: " + solrUrl);

  final JobConf dedupJob = new NutchJob(getConf());

  // Job parameters read back by the tasks.
  dedupJob.set(SolrConstants.SERVER_URL, solrUrl);
  dedupJob.setBoolean("noCommit", noCommit);

  // Records come from Solr; nothing is written through an output format.
  dedupJob.setInputFormat(SolrInputFormat.class);
  dedupJob.setOutputFormat(NullOutputFormat.class);

  // Pass records straight through the map phase to the reducer.
  dedupJob.setMapperClass(IdentityMapper.class);
  dedupJob.setMapOutputKeyClass(Text.class);
  dedupJob.setMapOutputValueClass(SolrRecord.class);
  dedupJob.setReducerClass(SolrDeleteDuplicates.class);

  JobClient.runJob(dedupJob);

  final long finishedAt = System.currentTimeMillis();
  final String finishStamp = timestampFormat.format(finishedAt);
  final String elapsed = TimingUtil.elapsedTime(startedAt, finishedAt);
  LOG.info("SolrDeleteDuplicates: finished at " + finishStamp + ", elapsed: " + elapsed);
}
/**
 * Configures {@code conf} as an identity map/reduce job named
 * "test-job-succeed", starts it through {@code UtilsForTests.runJob} and polls
 * the returned handle every 100 ms until it reports completion.
 *
 * <p>There is no upper bound on the wait. If the polling thread is
 * interrupted, polling stops, the interrupt status is restored and the handle
 * is returned even though the job may not be complete.
 *
 * @param conf job configuration; its job name, mapper and reducer are overwritten
 * @param inDir input directory passed to {@code UtilsForTests.runJob}
 * @param outDir output directory passed to {@code UtilsForTests.runJob}
 * @return the handle returned by {@code UtilsForTests.runJob}
 * @throws IOException if starting or polling the job fails
 */
static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
    throws IOException {
  conf.setJobName("test-job-succeed");
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);

  RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  while (!job.isComplete()) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      // Re-assert the interrupt so callers can still observe it after we bail out.
      Thread.currentThread().interrupt();
      break;
    }
  }
  return job;
}
/**
 * Configures {@code jobConf} as a map-only identity job over sequence files
 * and runs it through {@code JobClient.runJob}.
 *
 * @param jobConf configuration to populate and run
 * @param inDir input path
 * @param outputPath output path
 * @param numMaps value passed to {@code setNumMapTasks}
 * @param jobName name assigned to the job
 * @return the handle returned by {@code JobClient.runJob}
 * @throws IOException if running the job fails
 */
static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
    int numMaps, String jobName) throws IOException {
  // Identity and packaging.
  jobConf.setJobName(jobName);
  jobConf.setJar("build/test/testjar/testjob.jar");

  // Where the data comes from and where it goes.
  jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, inDir);
  jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  FileOutputFormat.setOutputPath(jobConf, outputPath);
  jobConf.setOutputKeyClass(BytesWritable.class);
  jobConf.setOutputValueClass(BytesWritable.class);

  // Processing: a reducer class is configured, but the reduce count is zero.
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setReducerClass(IdentityReducer.class);
  jobConf.setNumMapTasks(numMaps);
  jobConf.setNumReduceTasks(0);

  final RunningJob handle = JobClient.runJob(jobConf);
  return handle;
}
/**
 * Runs an "outer" composite join built from {@code Fake_IF} over three source
 * paths with identity mapper and reducer, then recursively deletes the
 * qualified {@code /empty} directory.
 *
 * <p>Only the first source path is created under the base directory; the
 * other two are relative paths. The join expression is stored under the
 * {@code mapred.join.expr} key.
 *
 * @throws Exception if building, running or cleaning up the job fails
 */
public void testEmptyJoin() throws Exception {
  final JobConf conf = new JobConf();
  final Path root = cluster.getFileSystem().makeQualified(new Path("/empty"));
  final Path[] sources = new Path[] {
      new Path(root, "i0"), new Path("i1"), new Path("i2")
  };

  // Input side: the join expression plus the format that interprets it.
  final String joinExpr =
      CompositeInputFormat.compose("outer", Fake_IF.class, sources);
  conf.set("mapred.join.expr", joinExpr);
  conf.setInputFormat(CompositeInputFormat.class);

  // Output side.
  final Path outputDir = new Path(root, "out");
  FileOutputFormat.setOutputPath(conf, outputDir);
  conf.setOutputKeyClass(IncomparableKey.class);
  conf.setOutputValueClass(NullWritable.class);

  // Pass-through processing.
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);

  JobClient.runJob(conf);
  root.getFileSystem(conf).delete(root, true);
}
/**
 * Sets up {@code conf} as an identity map/reduce job called
 * "test-job-succeed", launches it via {@code UtilsForTests.runJob} and checks
 * the returned handle every 100 ms until it reports completion.
 *
 * <p>The wait is unbounded. An interrupt ends it early: the interrupt status
 * is restored and the handle is returned as-is.
 *
 * @param conf job configuration; its job name, mapper and reducer are overwritten
 * @param inDir input directory passed to {@code UtilsForTests.runJob}
 * @param outDir output directory passed to {@code UtilsForTests.runJob}
 * @return the handle returned by {@code UtilsForTests.runJob}
 * @throws IOException if launching or polling the job fails
 */
static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
    throws IOException {
  conf.setJobName("test-job-succeed");
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);

  RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  while (!job.isComplete()) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      // Do not lose the interrupt: restore the flag before leaving the loop.
      Thread.currentThread().interrupt();
      break;
    }
  }
  return job;
}
/**
 * Populates {@code jobConf} for a map-only identity job that reads and writes
 * sequence files, then runs it with {@code JobClient.runJob}.
 *
 * @param jobConf configuration to populate and run
 * @param inDir input path
 * @param outputPath output path
 * @param numMaps value passed to {@code setNumMapTasks}
 * @param jobName name assigned to the job
 * @return the handle returned by {@code JobClient.runJob}
 * @throws IOException if running the job fails
 */
static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath,
    int numMaps, String jobName) throws IOException {
  // Naming and job jar.
  jobConf.setJobName(jobName);
  jobConf.setJar("build/test/testjar/testjob.jar");

  // Input and output wiring.
  jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, inDir);
  jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  FileOutputFormat.setOutputPath(jobConf, outputPath);
  jobConf.setOutputKeyClass(BytesWritable.class);
  jobConf.setOutputValueClass(BytesWritable.class);

  // Task setup: the reducer class is set even though zero reduces are requested.
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setReducerClass(IdentityReducer.class);
  jobConf.setNumMapTasks(numMaps);
  jobConf.setNumReduceTasks(0);

  final RunningJob handle = JobClient.runJob(jobConf);
  return handle;
}
/**
 * Builds an "outer" composite-join job from {@code Fake_IF} over three source
 * paths, runs it with identity mapper/reducer, and finally removes the
 * qualified {@code /empty} directory recursively.
 *
 * <p>The first source lives under the base directory; the remaining two are
 * given as relative paths. The join expression is stored under the
 * {@code mapred.join.expr} key.
 *
 * @throws Exception if any step of the job or the cleanup fails
 */
public void testEmptyJoin() throws Exception {
  final JobConf conf = new JobConf();
  final Path root = cluster.getFileSystem().makeQualified(new Path("/empty"));
  final Path[] sources = new Path[] {
      new Path(root, "i0"), new Path("i1"), new Path("i2")
  };

  // Join expression and the input format that evaluates it.
  final String joinExpr =
      CompositeInputFormat.compose("outer", Fake_IF.class, sources);
  conf.set("mapred.join.expr", joinExpr);
  conf.setInputFormat(CompositeInputFormat.class);

  // Output location and record types.
  final Path outputDir = new Path(root, "out");
  FileOutputFormat.setOutputPath(conf, outputDir);
  conf.setOutputKeyClass(IncomparableKey.class);
  conf.setOutputValueClass(NullWritable.class);

  // Identity processing on both sides.
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);

  JobClient.runJob(conf);
  root.getFileSystem(conf).delete(root, true);
}
/**
 * Builds a {@code JobConf} from the given configuration, wires in the identity
 * mapper and reducer plus the {@code inputDir}/{@code outputDir} fields, and
 * runs the job through {@code JobClient.runJob}.
 *
 * @param conf base configuration copied into the new {@code JobConf}
 * @throws IOException if running the job fails
 * @throws InterruptedException declared in the signature; no statement
 *         visible here obviously raises it (TODO confirm)
 * @throws ClassNotFoundException declared in the signature; no statement
 *         visible here obviously raises it (TODO confirm)
 */
public void createAndRunJob(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  final JobConf jobConf = new JobConf(conf);

  // Data locations come from this object's fields.
  FileInputFormat.addInputPath(jobConf, inputDir);
  FileOutputFormat.setOutputPath(jobConf, outputDir);

  // Pass-through processing, packaged with this test's jar.
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setReducerClass(IdentityReducer.class);
  jobConf.setJarByClass(TestLineRecordReaderJobs.class);

  JobClient.runJob(jobConf);
}
/**
 * Fills in {@code jc} for a text-input identity map/reduce job that reads
 * from the {@code inDir} field and writes to {@code outDir}.
 *
 * @param jc configuration to populate
 * @param jobName name assigned to the job
 * @param maps value passed to {@code setNumMapTasks}
 * @param reds value passed to {@code setNumReduceTasks}
 * @param outDir output path
 */
private void configureJob(JobConf jc, String jobName, int maps, int reds,
    Path outDir) {
  jc.setJobName(jobName);

  // Input and output locations.
  FileInputFormat.setInputPaths(jc, inDir);
  FileOutputFormat.setOutputPath(jc, outDir);

  // Record format and types.
  jc.setInputFormat(TextInputFormat.class);
  jc.setOutputKeyClass(LongWritable.class);
  jc.setOutputValueClass(Text.class);

  // Identity processing with the requested task counts.
  jc.setMapperClass(IdentityMapper.class);
  jc.setNumMapTasks(maps);
  jc.setReducerClass(IdentityReducer.class);
  jc.setNumReduceTasks(reds);
}
/**
 * Writes a one-line input file, runs a text-input job whose name contains
 * square brackets, and checks that exactly one output file is produced and
 * that it holds the single line {@code "0\tb a"}.
 *
 * <p>Only the mapper ({@code IdentityMapper}) is set explicitly. The input
 * writer and the output reader are closed in {@code finally} blocks so a
 * failed write or assertion does not leak the underlying streams.
 *
 * @throws Exception if file access or the job fails
 */
public void testComplexName() throws Exception {
  OutputStream os = getFileSystem().create(new Path(getInputDir(),
      "text.txt"));
  Writer wr = new OutputStreamWriter(os);
  try {
    wr.write("b a\n");
  } finally {
    wr.close();
  }

  JobConf conf = createJobConf();
  conf.setJobName("[name][some other value that gets truncated internally that this test attempts to aggravate]");
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setMapperClass(IdentityMapper.class);
  FileInputFormat.setInputPaths(conf, getInputDir());
  FileOutputFormat.setOutputPath(conf, getOutputDir());
  JobClient.runJob(conf);

  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(getOutputDir(),
          new Utils.OutputFileUtils.OutputFilesFilter()));
  assertEquals(1, outputFiles.length);

  InputStream is = getFileSystem().open(outputFiles[0]);
  BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  try {
    assertEquals("0\tb a", reader.readLine());
    assertNull(reader.readLine());
  } finally {
    // Close even when an assertion above fails.
    reader.close();
  }
}
/**
 * Writes a one-line input file, runs a text-input job whose name contains a
 * backslash-E sequence and a closing bracket, and checks that exactly one
 * output file is produced and that it holds the single line
 * {@code "0\tb a"}.
 *
 * <p>Only the mapper ({@code IdentityMapper}) is set explicitly. The input
 * writer and the output reader are closed in {@code finally} blocks so a
 * failed write or assertion does not leak the underlying streams.
 *
 * @throws Exception if file access or the job fails
 */
public void testComplexNameWithRegex() throws Exception {
  OutputStream os = getFileSystem().create(new Path(getInputDir(),
      "text.txt"));
  Writer wr = new OutputStreamWriter(os);
  try {
    wr.write("b a\n");
  } finally {
    wr.close();
  }

  JobConf conf = createJobConf();
  conf.setJobName("name \\Evalue]");
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setMapperClass(IdentityMapper.class);
  FileInputFormat.setInputPaths(conf, getInputDir());
  FileOutputFormat.setOutputPath(conf, getOutputDir());
  JobClient.runJob(conf);

  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(getOutputDir(),
          new Utils.OutputFileUtils.OutputFilesFilter()));
  assertEquals(1, outputFiles.length);

  InputStream is = getFileSystem().open(outputFiles[0]);
  BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  try {
    assertEquals("0\tb a", reader.readLine());
    assertNull(reader.readLine());
  } finally {
    // Close even when an assertion above fails.
    reader.close();
  }
}
/**
 * Configures {@code conf} as an identity map/reduce job named
 * "test-job-succeed", runs it via {@code runJob} with the trailing arguments
 * {@code 1, 1}, and asserts that the run reported success.
 *
 * @param conf job configuration; its job name, mapper and reducer are overwritten
 * @param inDir input directory passed to {@code runJob}
 * @param outDir output directory passed to {@code runJob}
 * @throws IOException if running the job fails
 * @throws InterruptedException if {@code runJob} is interrupted
 */
public static void runJobSucceed(JobConf conf, Path inDir, Path outDir)
    throws IOException, InterruptedException {
  conf.setJobName("test-job-succeed");
  conf.setReducerClass(IdentityReducer.class);
  conf.setMapperClass(IdentityMapper.class);
  //conf.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());

  Assert.assertTrue("Job expected to succeed failed",
      runJob(conf, inDir, outDir, 1, 1));
}
/**
 * Runs an identity map/reduce job with {@code MyCombinerToCheckReporter} as
 * the combiner and {@code CustomOutputCommitter} as the output committer,
 * using 5 map tasks and 2 reduce tasks over freshly prepared in/out folders
 * under the cluster's test work directory.
 *
 * @throws Exception if preparing the folders or running the job fails
 */
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  final int mapCount = 5;
  final int reduceCount = 2;
  final JobConf jobConf = new JobConf(mrCluster.getConfig());

  final Path inputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  final Path outputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(inputDir, outputDir, mapCount);

  // Job identity and classpath.
  jobConf.setJobName("test-job-with-combiner");
  //conf.setJarByClass(MyCombinerToCheckReporter.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, jobConf);

  // Processing pipeline: identity map -> combiner under test -> identity reduce.
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setCombinerClass(MyCombinerToCheckReporter.class);
  jobConf.setReducerClass(IdentityReducer.class);
  jobConf.setNumMapTasks(mapCount);
  jobConf.setNumReduceTasks(reduceCount);

  // Input/output formats, types and locations.
  jobConf.setInputFormat(TextInputFormat.class);
  jobConf.setOutputKeyClass(LongWritable.class);
  jobConf.setOutputValueClass(Text.class);
  jobConf.setOutputCommitter(CustomOutputCommitter.class);
  FileInputFormat.setInputPaths(jobConf, inputDir);
  FileOutputFormat.setOutputPath(jobConf, outputDir);

  runJob(jobConf);
}
/**
 * Copies the given configuration into a {@code JobConf}, sets the identity
 * mapper and reducer together with the {@code inputDir}/{@code outputDir}
 * fields, and hands the job to {@code JobClient.runJob}.
 *
 * @param conf base configuration copied into the new {@code JobConf}
 * @throws IOException if running the job fails
 * @throws InterruptedException declared in the signature; no statement
 *         visible here obviously raises it (TODO confirm)
 * @throws ClassNotFoundException declared in the signature; no statement
 *         visible here obviously raises it (TODO confirm)
 */
public void createAndRunJob(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException {
  final JobConf jobConf = new JobConf(conf);

  // Input and output come from this object's fields.
  FileInputFormat.addInputPath(jobConf, inputDir);
  FileOutputFormat.setOutputPath(jobConf, outputDir);

  // Identity processing, with the jar located from this test class.
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setReducerClass(IdentityReducer.class);
  jobConf.setJarByClass(TestLineRecordReaderJobs.class);

  JobClient.runJob(jobConf);
}
/**
 * Populates {@code jc} for a text-input identity map/reduce job reading from
 * the {@code inDir} field and writing to {@code outDir}.
 *
 * @param jc configuration to populate
 * @param jobName name assigned to the job
 * @param maps value passed to {@code setNumMapTasks}
 * @param reds value passed to {@code setNumReduceTasks}
 * @param outDir output path
 */
private void configureJob(JobConf jc, String jobName, int maps, int reds,
    Path outDir) {
  jc.setJobName(jobName);

  // Locations.
  FileInputFormat.setInputPaths(jc, inDir);
  FileOutputFormat.setOutputPath(jc, outDir);

  // Formats and record types.
  jc.setInputFormat(TextInputFormat.class);
  jc.setOutputKeyClass(LongWritable.class);
  jc.setOutputValueClass(Text.class);

  // Map and reduce sides, each with its task count.
  jc.setMapperClass(IdentityMapper.class);
  jc.setNumMapTasks(maps);
  jc.setReducerClass(IdentityReducer.class);
  jc.setNumReduceTasks(reds);
}
/**
 * Creates a single-line input file, runs a text-input job with a
 * bracket-laden job name, and verifies that one output file exists whose only
 * line is {@code "0\tb a"}.
 *
 * <p>Only the mapper ({@code IdentityMapper}) is set explicitly. Both the
 * writer and the reader are closed in {@code finally} blocks, so a failing
 * write or assertion no longer leaves a stream open.
 *
 * @throws Exception if file access or the job fails
 */
public void testComplexName() throws Exception {
  OutputStream os = getFileSystem().create(new Path(getInputDir(),
      "text.txt"));
  Writer wr = new OutputStreamWriter(os);
  try {
    wr.write("b a\n");
  } finally {
    wr.close();
  }

  JobConf conf = createJobConf();
  conf.setJobName("[name][some other value that gets truncated internally that this test attempts to aggravate]");
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setMapperClass(IdentityMapper.class);
  FileInputFormat.setInputPaths(conf, getInputDir());
  FileOutputFormat.setOutputPath(conf, getOutputDir());
  JobClient.runJob(conf);

  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(getOutputDir(),
          new Utils.OutputFileUtils.OutputFilesFilter()));
  assertEquals(1, outputFiles.length);

  InputStream is = getFileSystem().open(outputFiles[0]);
  BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  try {
    assertEquals("0\tb a", reader.readLine());
    assertNull(reader.readLine());
  } finally {
    // Release the stream regardless of the assertion outcome.
    reader.close();
  }
}
/**
 * Creates a single-line input file, runs a text-input job whose name includes
 * a backslash-E sequence and a closing bracket, and verifies that one output
 * file exists whose only line is {@code "0\tb a"}.
 *
 * <p>Only the mapper ({@code IdentityMapper}) is set explicitly. Both the
 * writer and the reader are closed in {@code finally} blocks, so a failing
 * write or assertion no longer leaves a stream open.
 *
 * @throws Exception if file access or the job fails
 */
public void testComplexNameWithRegex() throws Exception {
  OutputStream os = getFileSystem().create(new Path(getInputDir(),
      "text.txt"));
  Writer wr = new OutputStreamWriter(os);
  try {
    wr.write("b a\n");
  } finally {
    wr.close();
  }

  JobConf conf = createJobConf();
  conf.setJobName("name \\Evalue]");
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setMapperClass(IdentityMapper.class);
  FileInputFormat.setInputPaths(conf, getInputDir());
  FileOutputFormat.setOutputPath(conf, getOutputDir());
  JobClient.runJob(conf);

  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(getOutputDir(),
          new Utils.OutputFileUtils.OutputFilesFilter()));
  assertEquals(1, outputFiles.length);

  InputStream is = getFileSystem().open(outputFiles[0]);
  BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  try {
    assertEquals("0\tb a", reader.readLine());
    assertNull(reader.readLine());
  } finally {
    // Release the stream regardless of the assertion outcome.
    reader.close();
  }
}
/**
 * Sets up {@code conf} as an identity map/reduce job called
 * "test-job-succeed", executes it through {@code runJob} with the trailing
 * arguments {@code 1, 1}, and asserts that the returned flag is true.
 *
 * @param conf job configuration; its job name, mapper and reducer are overwritten
 * @param inDir input directory passed to {@code runJob}
 * @param outDir output directory passed to {@code runJob}
 * @throws IOException if running the job fails
 * @throws InterruptedException if {@code runJob} is interrupted
 */
public static void runJobSucceed(JobConf conf, Path inDir, Path outDir)
    throws IOException, InterruptedException {
  conf.setJobName("test-job-succeed");
  conf.setReducerClass(IdentityReducer.class);
  conf.setMapperClass(IdentityMapper.class);
  //conf.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());

  Assert.assertTrue("Job expected to succeed failed",
      runJob(conf, inDir, outDir, 1, 1));
}
/**
 * Executes an identity map/reduce job that plugs in
 * {@code MyCombinerToCheckReporter} as combiner and
 * {@code CustomOutputCommitter} as output committer, with 5 map tasks and
 * 2 reduce tasks, over in/out folders prepared under the cluster's test work
 * directory.
 *
 * @throws Exception if preparing the folders or running the job fails
 */
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  final int mapCount = 5;
  final int reduceCount = 2;
  final JobConf jobConf = new JobConf(mrCluster.getConfig());

  final Path inputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  final Path outputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(inputDir, outputDir, mapCount);

  // Name and classpath.
  jobConf.setJobName("test-job-with-combiner");
  //conf.setJarByClass(MyCombinerToCheckReporter.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, jobConf);

  // Identity map -> combiner under test -> identity reduce.
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setCombinerClass(MyCombinerToCheckReporter.class);
  jobConf.setReducerClass(IdentityReducer.class);
  jobConf.setNumMapTasks(mapCount);
  jobConf.setNumReduceTasks(reduceCount);

  // Formats, record types, committer and locations.
  jobConf.setInputFormat(TextInputFormat.class);
  jobConf.setOutputKeyClass(LongWritable.class);
  jobConf.setOutputValueClass(Text.class);
  jobConf.setOutputCommitter(CustomOutputCommitter.class);
  FileInputFormat.setInputPaths(jobConf, inputDir);
  FileOutputFormat.setOutputPath(jobConf, outputDir);

  runJob(jobConf);
}
/**
 * Creates a fresh pipeline driver and registers two identity
 * mapper/reducer stages on it, in order.
 */
@Before
public void setUp() {
  driver = new PipelineMapReduceDriver<Text, Text, Text, Text>();

  // Stage 1.
  mapper1 = new IdentityMapper<Text, Text>();
  reducer1 = new IdentityReducer<Text, Text>();
  driver.addMapReduce(new Pair<Mapper, Reducer>(mapper1, reducer1));

  // Stage 2.
  mapper2 = new IdentityMapper<Text, Text>();
  reducer2 = new IdentityReducer<Text, Text>();
  driver.addMapReduce(new Pair<Mapper, Reducer>(mapper2, reducer2));
}
/**
 * Builds an identity map/reduce "GridmixJavaSorter" job for the given size
 * and adds it to {@code gridmix}.
 *
 * <p>The output directory is cleared before the job is built. Any exception
 * raised while building or adding the job is caught and its stack trace is
 * printed; nothing is rethrown.
 *
 * @param numReducers value passed to {@code setNumReduceTasks}
 * @param mapoutputCompressed value passed to {@code setCompressMapOutput}
 * @param outputCompressed value stored under {@code "mapred.output.compress"}
 * @param size size selector used in the property name, input path, output
 *        path and job name
 * @param gridmix job control that receives the new job
 */
public void addJob(int numReducers, boolean mapoutputCompressed,
    boolean outputCompressed, Size size, JobControl gridmix) {
  final String inputProperty = String.format("javaSort.%sJobs.inputFiles", size);
  final String inputDirs =
      getInputDirsFor(inputProperty, size.defaultPath(VARINFLTEXT));
  final String outputDir = addTSSuffix("perf-out/sort-out-dir-" + size);
  clearDir(outputDir);

  try {
    JobConf sortConf = new JobConf();
    sortConf.setJarByClass(Sort.class);
    sortConf.setJobName("GridmixJavaSorter." + size);

    // Pass-through map and reduce.
    sortConf.setMapperClass(IdentityMapper.class);
    sortConf.setReducerClass(IdentityReducer.class);
    sortConf.setNumReduceTasks(numReducers);

    // Text in, text out.
    sortConf.setInputFormat(org.apache.hadoop.mapred.KeyValueTextInputFormat.class);
    sortConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
    sortConf.setOutputKeyClass(org.apache.hadoop.io.Text.class);
    sortConf.setOutputValueClass(org.apache.hadoop.io.Text.class);

    // Compression switches.
    sortConf.setCompressMapOutput(mapoutputCompressed);
    sortConf.setBoolean("mapred.output.compress", outputCompressed);

    FileInputFormat.addInputPaths(sortConf, inputDirs);
    FileOutputFormat.setOutputPath(sortConf, new Path(outputDir));

    gridmix.addJob(new Job(sortConf));
  } catch (Exception ex) {
    ex.printStackTrace();
  }
}
/**
 * Prepares the input and output directories, writes the {@code input} field
 * into {@code part-0} under {@code inDir}, configures {@code conf} as an
 * identity map/reduce job and submits it.
 *
 * <p>The output directory is deleted recursively first. The input file stream
 * is closed in a {@code finally} block so a failed write does not leak it.
 *
 * @param conf job configuration to populate and submit
 * @return the handle returned by {@code JobClient.submitJob}
 * @throws IOException if the input directory cannot be created, or if file
 *         access or job submission fails
 */
public RunningJob launchJob(JobConf conf)
    throws IOException {
  // set up the input file system and write input text.
  FileSystem inFs = inDir.getFileSystem(conf);
  FileSystem outFs = outDir.getFileSystem(conf);
  outFs.delete(outDir, true);
  if (!inFs.mkdirs(inDir)) {
    throw new IOException("Mkdirs failed to create " + inDir.toString());
  }

  // write input into input file
  DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
  try {
    file.writeBytes(input);
  } finally {
    file.close();
  }

  // configure the mapred Job
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);

  // Propagate the test data root into the job, with spaces replaced by '+'.
  String testRootDir = new Path(System.getProperty("test.build.data",
      "/tmp")).toString().replace(' ', '+');
  conf.set("test.build.data", testRootDir);

  // return the RunningJob handle.
  return new JobClient(conf).submitJob(conf);
}
/**
 * Starts a one-tracker mini MR cluster on the local file system, runs a
 * single-map/single-reduce identity job to completion, and asserts that
 * listing the job's local directory under task tracker 0 yields {@code null}.
 *
 * <p>The test root directory is deleted recursively before the cluster is
 * started, and the cluster is shut down in a {@code finally} block. The input
 * file stream is closed in its own {@code finally} block so a failed write
 * does not leak it.
 *
 * @throws Exception if cluster setup, file access or the job fails
 */
public void testJobDirctoryCleanup() throws Exception {
  try {
    conf = new JobConf();
    FileSystem fileSys = FileSystem.get(conf);
    fileSys.delete(new Path(TEST_ROOT_DIR), true);
    cluster = new MiniMRCluster(1, "file:///", 1, null, new String[] {"host1"}, conf);
    JobConf jc = cluster.createJobConf();
    jc.setJobName("TestJob");
    Path inDir = new Path(TEST_ROOT_DIR, "test-input");
    Path outDir = new Path(TEST_ROOT_DIR, "test-output");
    String input = "Test\n";
    DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
    try {
      file.writeBytes(input);
    } finally {
      file.close();
    }
    FileInputFormat.setInputPaths(jc, inDir);
    FileOutputFormat.setOutputPath(jc, outDir);
    jc.setInputFormat(TextInputFormat.class);
    jc.setOutputKeyClass(LongWritable.class);
    jc.setOutputValueClass(Text.class);
    jc.setMapperClass(IdentityMapper.class);
    jc.setReducerClass(IdentityReducer.class);
    jc.setNumMapTasks(1);
    jc.setNumReduceTasks(1);
    JobClient jobClient = new JobClient(jc);
    RunningJob job = jobClient.submitJob(jc);
    JobID jobId = job.getID();
    job.waitForCompletion();
    // NOTE(review): the result is unused; the call is kept as in the original
    // in case it is relied on as a check that tracker 0 exists (TODO confirm).
    cluster.getTaskTrackerRunner(0).getTaskTracker();
    String subdir = TaskTracker.getLocalJobDir(jobId.toString());
    File dir = new File(cluster.getTaskTrackerLocalDir(0) + "/" + subdir);
    // File.list() returns null when the path is not a directory.
    assertEquals(null, dir.list());
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Writes one line of input, runs a text-input job named with a long
 * bracketed string, and asserts that a single output file is produced
 * containing exactly the line {@code "0\tb a"}.
 *
 * <p>Only the mapper ({@code IdentityMapper}) is set explicitly. The writer
 * and reader are each closed in a {@code finally} block, so neither is left
 * open when a write or an assertion fails.
 *
 * @throws Exception if file access or the job fails
 */
public void testComplexName() throws Exception {
  OutputStream os = getFileSystem().create(new Path(getInputDir(),
      "text.txt"));
  Writer wr = new OutputStreamWriter(os);
  try {
    wr.write("b a\n");
  } finally {
    wr.close();
  }

  JobConf conf = createJobConf();
  conf.setJobName("[name][some other value that gets truncated internally that this test attempts to aggravate]");
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setMapperClass(IdentityMapper.class);
  FileInputFormat.setInputPaths(conf, getInputDir());
  FileOutputFormat.setOutputPath(conf, getOutputDir());
  JobClient.runJob(conf);

  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(getOutputDir(),
          new Utils.OutputFileUtils.OutputFilesFilter()));
  assertEquals(1, outputFiles.length);

  InputStream is = getFileSystem().open(outputFiles[0]);
  BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  try {
    assertEquals("0\tb a", reader.readLine());
    assertNull(reader.readLine());
  } finally {
    // Always close the reader, even on assertion failure.
    reader.close();
  }
}