The following examples show how to use the org.apache.hadoop.io.serializer.JavaSerializationComparator API class; follow the link to view the full source code on GitHub.
/**
 * HADOOP-4466:
 * Verifies that the JavaSerialization implementation can write to
 * SequenceFiles: SequenceFileOutputFormat must not be coupled to Writable
 * types; if it were, this job (String/Long outputs) would fail.
 */
public void testWriteToSequencefile() throws Exception {
JobConf conf = new JobConf(TestJavaSerialization.class);
conf.setJobName("JavaSerialization");
FileSystem fs = FileSystem.get(conf);
cleanAndCreateInput(fs);
// Register JavaSerialization ahead of WritableSerialization so plain
// String/Long keys and values can be serialized.
conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization," +
"org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
// test we can write to sequence files
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
// String is not WritableComparable, so an explicit comparator is needed.
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
// Run with the local job runner.
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
FileInputFormat.setInputPaths(conf, INPUT_DIR);
FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
JobClient.runJob(conf);
// Expect exactly one output part file (hidden/log files filtered out).
Path[] outputFiles = FileUtil.stat2Paths(
fs.listStatus(OUTPUT_DIR,
new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(1, outputFiles.length);
}
/**
 * HADOOP-4466:
 * Checks that JavaSerialization-based (String/Long) outputs can be written
 * through SequenceFileOutputFormat, proving the output format is not tied
 * to Writable types.
 */
public void testWriteToSequencefile() throws Exception {
JobConf conf = new JobConf(TestJavaSerialization.class);
conf.setJobName("JavaSerialization");
FileSystem fileSystem = FileSystem.get(conf);
cleanAndCreateInput(fileSystem);
// Plain Java types require JavaSerialization to be registered.
conf.set("io.serializations",
    "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization");
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class); // the point of this test
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
FileInputFormat.setInputPaths(conf, INPUT_DIR);
FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
JobClient.runJob(conf);
// A single reducer should leave exactly one visible part file.
Path[] parts = FileUtil.stat2Paths(
    fileSystem.listStatus(OUTPUT_DIR,
        new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(1, parts.length);
}
/**
 * Runs a word-count job whose output key/value are plain String/Long
 * (Java serialization, not Writables) and verifies the sorted reduce
 * output line by line.
 */
public void testMapReduceJob() throws Exception {
// Write a one-line input file containing the two words out of order.
OutputStream out = getFileSystem().create(new Path(getInputDir(),
    "text.txt"));
Writer writer = new OutputStreamWriter(out);
writer.write("b a\n");
writer.close();
JobConf conf = createJobConf();
conf.setJobName("JavaSerialization");
conf.set("io.serializations",
    "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
JobClient.runJob(conf);
Path[] parts = FileUtil.stat2Paths(
    getFileSystem().listStatus(getOutputDir(),
        new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(1, parts.length);
// Keys come back sorted by the JavaSerializationComparator: a before b.
InputStream in = getFileSystem().open(parts[0]);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
assertEquals("a\t1", br.readLine());
assertEquals("b\t1", br.readLine());
assertNull(br.readLine());
br.close();
}
/**
 * HADOOP-4466:
 * Verifies that the JavaSerialization implementation can write to
 * SequenceFiles: SequenceFileOutputFormat must not be coupled to Writable
 * types; if it were, this job (String/Long outputs) would fail.
 */
public void testWriteToSequencefile() throws Exception {
// Create a one-line input file for the word-count job.
OutputStream os = getFileSystem().create(new Path(getInputDir(),
"text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("b a\n");
wr.close();
JobConf conf = createJobConf();
conf.setJobName("JavaSerialization");
// Register JavaSerialization so plain String/Long can be serialized.
conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization," +
"org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class); // test we can write to sequence files
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
// String is not WritableComparable; an explicit comparator is required.
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
JobClient.runJob(conf);
// Expect exactly one visible output part file.
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(1, outputFiles.length);
}
/**
 * Word-count job using plain String/Long (Java serialization) for the
 * output types; asserts the exact, sorted reduce output.
 */
public void testMapReduceJob() throws Exception {
// Seed the input directory with a single two-word line.
OutputStream out = getFileSystem().create(new Path(getInputDir(),
    "text.txt"));
Writer writer = new OutputStreamWriter(out);
writer.write("b a\n");
writer.close();
JobConf conf = createJobConf();
conf.setJobName("JavaSerialization");
conf.set("io.serializations",
    "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
JobClient.runJob(conf);
// Only one reducer ran, so a single (non-log) part file is expected.
Path[] parts = FileUtil.stat2Paths(
    getFileSystem().listStatus(getOutputDir(),
        new OutputLogFilter()));
assertEquals(1, parts.length);
InputStream in = getFileSystem().open(parts[0]);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
assertEquals("a\t1", br.readLine());
assertEquals("b\t1", br.readLine());
assertNull(br.readLine());
br.close();
}
/**
 * HADOOP-4466:
 * Verifies that the JavaSerialization implementation can write to
 * SequenceFiles: SequenceFileOutputFormat must not be coupled to Writable
 * types; if it were, this job (String/Long outputs) would fail.
 */
public void testWriteToSequencefile() throws Exception {
// Create a one-line input file for the word-count job.
OutputStream os = getFileSystem().create(new Path(getInputDir(),
"text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("b a\n");
wr.close();
JobConf conf = createJobConf();
conf.setJobName("JavaSerialization");
// Register JavaSerialization so plain String/Long can be serialized.
conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization," +
"org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class); // test we can write to sequence files
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
// String is not WritableComparable; an explicit comparator is required.
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
FileInputFormat.setInputPaths(conf, getInputDir());
FileOutputFormat.setOutputPath(conf, getOutputDir());
JobClient.runJob(conf);
// Expect exactly one (non-log) output part file.
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));
assertEquals(1, outputFiles.length);
}
/**
 * Runs a local word-count job whose output key/value are plain
 * String/Long (Java serialization rather than Writables), then verifies
 * the reduce output content and line count.
 */
public void testMapReduceJob() throws Exception {
JobConf conf = new JobConf(TestJavaSerialization.class);
conf.setJobName("JavaSerialization");
FileSystem fs = FileSystem.get(conf);
cleanAndCreateInput(fs);
// Register JavaSerialization so plain String/Long can be serialized.
conf.set("io.serializations",
    "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
// String is not WritableComparable; an explicit comparator is required.
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
FileInputFormat.setInputPaths(conf, INPUT_DIR);
FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
// Sanity-check the fixture written by cleanAndCreateInput().
String inputFileContents =
    FileUtils.readFileToString(new File(INPUT_FILE.toUri().getPath()));
assertTrue("Input file contents not as expected; contents are '"
    + inputFileContents + "', expected \"b a\n\" ",
    inputFileContents.equals("b a\n"));
JobClient.runJob(conf);
Path[] outputFiles =
    FileUtil.stat2Paths(fs.listStatus(OUTPUT_DIR,
        new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(1, outputFiles.length);
InputStream is = fs.open(outputFiles[0]);
String reduceOutput = org.apache.commons.io.IOUtils.toString(is);
String[] lines = reduceOutput.split(System.getProperty("line.separator"));
// Assert the line count BEFORE indexing lines[0]/lines[1]; otherwise a
// short output would throw ArrayIndexOutOfBoundsException instead of
// producing a readable assertion failure.
assertEquals("Reduce output has wrong number of lines; output is '"
    + reduceOutput + "'", 2, lines.length);
assertEquals("Unexpected output; received output '" + reduceOutput + "'",
    "a\t1", lines[0]);
assertEquals("Unexpected output; received output '" + reduceOutput + "'",
    "b\t1", lines[1]);
is.close();
}
/**
 * Runs a MultipleOutputs job (old mapred API) whose map output key/value
 * types are plain Long/String via Java serialization, then verifies the
 * named "text" part files and, when enabled, the MultipleOutputs counters.
 *
 * @param withCounters whether MultipleOutputs counters are enabled
 * @throws Exception if the job or a filesystem operation fails
 */
protected void _testMOWithJavaSerialization(boolean withCounters) throws Exception {
Path inDir = getDir(IN_DIR);
Path outDir = getDir(OUT_DIR);
JobConf conf = createJobConf();
FileSystem fs = FileSystem.get(conf);
// Clean leftovers from earlier runs BEFORE writing any input. (The
// original wrote a "part-0" input file first and then recursively
// deleted inDir, silently discarding that file — only "part-1" ever
// reached the job. Deleting first removes the wasted write while
// leaving the effective job input unchanged.)
fs.delete(inDir, true);
fs.delete(outDir, true);
DataOutputStream file = fs.create(new Path(inDir, "part-1"));
file.writeBytes("a\nb\n\nc\nd\ne");
file.close();
conf.setJobName("mo");
// Java serialization lets plain Long/String types cross the shuffle.
conf.set("io.serializations",
    "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setMapOutputKeyClass(Long.class);
conf.setMapOutputValueClass(String.class);
// Long is not WritableComparable; sorting needs this comparator.
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setOutputKeyClass(Long.class);
conf.setOutputValueClass(String.class);
conf.setOutputFormat(TextOutputFormat.class);
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
    Long.class, String.class);
MultipleOutputs.setCountersEnabled(conf, withCounters);
conf.setMapperClass(MOJavaSerDeMap.class);
conf.setReducerClass(MOJavaSerDeReduce.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
JobClient jc = new JobClient(conf);
RunningJob job = jc.submitJob(conf);
// submitJob() is asynchronous; poll until the job finishes.
while (!job.isComplete()) {
  Thread.sleep(100);
}
// assert number of named output part files
int namedOutputCount = 0;
FileStatus[] statuses = fs.listStatus(outDir);
for (FileStatus status : statuses) {
  if (status.getPath().getName().equals("text-m-00000") ||
      status.getPath().getName().equals("text-r-00000")) {
    namedOutputCount++;
  }
}
assertEquals(2, namedOutputCount);
// assert TextOutputFormat files correctness
BufferedReader reader = new BufferedReader(
    new InputStreamReader(fs.open(
        new Path(FileOutputFormat.getOutputPath(conf), "text-r-00000"))));
int count = 0;
String line = reader.readLine();
while (line != null) {
  assertTrue(line.endsWith("text"));
  line = reader.readLine();
  count++;
}
reader.close();
assertFalse(count == 0);
Counters.Group counters =
    job.getCounters().getGroup(MultipleOutputs.class.getName());
if (!withCounters) {
  // Counters disabled: the MultipleOutputs group must be empty.
  assertEquals(0, counters.size());
} else {
  assertEquals(1, counters.size());
  assertEquals(2, counters.getCounter("text"));
}
}
/**
 * Runs a MultipleOutputs job (new mapreduce API) with Java-serialized map
 * output types (Long key, String value), then verifies the named "text"
 * outputs, the value-based outputs, and optionally the counters.
 *
 * @param withCounters whether MultipleOutputs counters are enabled
 * @throws Exception if the job or a filesystem operation fails
 */
protected void _testMOWithJavaSerialization(boolean withCounters) throws Exception {
String input = "a\nb\nc\nd\ne\nc\nd\ne";
Configuration conf = createJobConf();
// Register JavaSerialization so plain Long/String types can cross the
// map -> reduce boundary without being Writable.
conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization," +
"org.apache.hadoop.io.serializer.WritableSerialization");
// 2 map tasks, 1 reduce task over the literal input above.
Job job = MapReduceTestUtil.createJob(conf, IN_DIR, OUT_DIR, 2, 1, input);
job.setJobName("mo");
MultipleOutputs.addNamedOutput(job, TEXT, TextOutputFormat.class,
Long.class, String.class);
MultipleOutputs.setCountersEnabled(job, withCounters);
// Long is not WritableComparable; sorting needs this comparator.
job.setSortComparatorClass(JavaSerializationComparator.class);
job.setMapOutputKeyClass(Long.class);
job.setMapOutputValueClass(String.class);
job.setOutputKeyClass(Long.class);
job.setOutputValueClass(String.class);
job.setMapperClass(MOJavaSerDeMap.class);
job.setReducerClass(MOJavaSerDeReduce.class);
job.waitForCompletion(true);
// assert number of named output part files
int namedOutputCount = 0;
int valueBasedOutputCount = 0;
FileSystem fs = OUT_DIR.getFileSystem(conf);
FileStatus[] statuses = fs.listStatus(OUT_DIR);
for (FileStatus status : statuses) {
String fileName = status.getPath().getName();
if (fileName.equals("text-m-00000") ||
fileName.equals("text-m-00001") ||
fileName.equals("text-r-00000")) {
namedOutputCount++;
} else if (fileName.equals("a-r-00000") ||
fileName.equals("b-r-00000") ||
fileName.equals("c-r-00000") ||
fileName.equals("d-r-00000") ||
fileName.equals("e-r-00000")) {
valueBasedOutputCount++;
}
}
// Two map tasks plus one reduce task each contribute one "text" file.
assertEquals(3, namedOutputCount);
assertEquals(5, valueBasedOutputCount);
// assert TextOutputFormat files correctness
BufferedReader reader = new BufferedReader(
new InputStreamReader(fs.open(
new Path(FileOutputFormat.getOutputPath(job), "text-r-00000"))));
int count = 0;
String line = reader.readLine();
while (line != null) {
assertTrue(line.endsWith(TEXT));
line = reader.readLine();
count++;
}
reader.close();
// The reduce-side named output must not be empty.
assertFalse(count == 0);
if (withCounters) {
CounterGroup counters =
job.getCounters().getGroup(MultipleOutputs.class.getName());
assertEquals(6, counters.size());
assertEquals(4, counters.findCounter(TEXT).getValue());
assertEquals(2, counters.findCounter("a").getValue());
assertEquals(2, counters.findCounter("b").getValue());
assertEquals(4, counters.findCounter("c").getValue());
assertEquals(4, counters.findCounter("d").getValue());
assertEquals(4, counters.findCounter("e").getValue());
}
}
/**
 * Runs a local word-count job whose output key/value are plain
 * String/Long (Java serialization rather than Writables), then verifies
 * the reduce output content and line count.
 */
public void testMapReduceJob() throws Exception {
JobConf conf = new JobConf(TestJavaSerialization.class);
conf.setJobName("JavaSerialization");
FileSystem fs = FileSystem.get(conf);
cleanAndCreateInput(fs);
// Register JavaSerialization so plain String/Long can be serialized.
conf.set("io.serializations",
    "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(String.class);
conf.setOutputValueClass(Long.class);
// String is not WritableComparable; an explicit comparator is required.
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(SumReducer.class);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
FileInputFormat.setInputPaths(conf, INPUT_DIR);
FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
// Sanity-check the fixture written by cleanAndCreateInput().
String inputFileContents =
    FileUtils.readFileToString(new File(INPUT_FILE.toUri().getPath()));
assertTrue("Input file contents not as expected; contents are '"
    + inputFileContents + "', expected \"b a\n\" ",
    inputFileContents.equals("b a\n"));
JobClient.runJob(conf);
Path[] outputFiles =
    FileUtil.stat2Paths(fs.listStatus(OUTPUT_DIR,
        new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(1, outputFiles.length);
InputStream is = fs.open(outputFiles[0]);
String reduceOutput = org.apache.commons.io.IOUtils.toString(is);
String[] lines = reduceOutput.split(System.getProperty("line.separator"));
// Assert the line count BEFORE indexing lines[0]/lines[1]; otherwise a
// short output would throw ArrayIndexOutOfBoundsException instead of
// producing a readable assertion failure.
assertEquals("Reduce output has wrong number of lines; output is '"
    + reduceOutput + "'", 2, lines.length);
assertEquals("Unexpected output; received output '" + reduceOutput + "'",
    "a\t1", lines[0]);
assertEquals("Unexpected output; received output '" + reduceOutput + "'",
    "b\t1", lines[1]);
is.close();
}
/**
 * Runs a MultipleOutputs job (old mapred API) whose map output key/value
 * types are plain Long/String via Java serialization, then verifies the
 * named "text" part files and, when enabled, the MultipleOutputs counters.
 *
 * @param withCounters whether MultipleOutputs counters are enabled
 * @throws Exception if the job or a filesystem operation fails
 */
protected void _testMOWithJavaSerialization(boolean withCounters) throws Exception {
Path inDir = getDir(IN_DIR);
Path outDir = getDir(OUT_DIR);
JobConf conf = createJobConf();
FileSystem fs = FileSystem.get(conf);
// Clean leftovers from earlier runs BEFORE writing any input. (The
// original wrote a "part-0" input file first and then recursively
// deleted inDir, silently discarding that file — only "part-1" ever
// reached the job. Deleting first removes the wasted write while
// leaving the effective job input unchanged.)
fs.delete(inDir, true);
fs.delete(outDir, true);
DataOutputStream file = fs.create(new Path(inDir, "part-1"));
file.writeBytes("a\nb\n\nc\nd\ne");
file.close();
conf.setJobName("mo");
// Java serialization lets plain Long/String types cross the shuffle.
conf.set("io.serializations",
    "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization");
conf.setInputFormat(TextInputFormat.class);
conf.setMapOutputKeyClass(Long.class);
conf.setMapOutputValueClass(String.class);
// Long is not WritableComparable; sorting needs this comparator.
conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
conf.setOutputKeyClass(Long.class);
conf.setOutputValueClass(String.class);
conf.setOutputFormat(TextOutputFormat.class);
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
    Long.class, String.class);
MultipleOutputs.setCountersEnabled(conf, withCounters);
conf.setMapperClass(MOJavaSerDeMap.class);
conf.setReducerClass(MOJavaSerDeReduce.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
JobClient jc = new JobClient(conf);
RunningJob job = jc.submitJob(conf);
// submitJob() is asynchronous; poll until the job finishes.
while (!job.isComplete()) {
  Thread.sleep(100);
}
// assert number of named output part files
int namedOutputCount = 0;
FileStatus[] statuses = fs.listStatus(outDir);
for (FileStatus status : statuses) {
  if (status.getPath().getName().equals("text-m-00000") ||
      status.getPath().getName().equals("text-r-00000")) {
    namedOutputCount++;
  }
}
assertEquals(2, namedOutputCount);
// assert TextOutputFormat files correctness
BufferedReader reader = new BufferedReader(
    new InputStreamReader(fs.open(
        new Path(FileOutputFormat.getOutputPath(conf), "text-r-00000"))));
int count = 0;
String line = reader.readLine();
while (line != null) {
  assertTrue(line.endsWith("text"));
  line = reader.readLine();
  count++;
}
reader.close();
assertFalse(count == 0);
Counters.Group counters =
    job.getCounters().getGroup(MultipleOutputs.class.getName());
if (!withCounters) {
  // Counters disabled: the MultipleOutputs group must be empty.
  assertEquals(0, counters.size());
} else {
  assertEquals(1, counters.size());
  assertEquals(2, counters.getCounter("text"));
}
}
/**
 * Exercises MultipleOutputs (new mapreduce API) with Java serialization
 * for the intermediate Long/String types and checks the produced part
 * files and, optionally, the counters.
 *
 * @param withCounters true to enable and verify MultipleOutputs counters
 */
protected void _testMOWithJavaSerialization(boolean withCounters) throws Exception {
String testInput = "a\nb\nc\nd\ne\nc\nd\ne";
Configuration conf = createJobConf();
conf.set("io.serializations",
    "org.apache.hadoop.io.serializer.JavaSerialization," +
    "org.apache.hadoop.io.serializer.WritableSerialization");
Job job = MapReduceTestUtil.createJob(conf, IN_DIR, OUT_DIR, 2, 1, testInput);
job.setJobName("mo");
MultipleOutputs.addNamedOutput(job, TEXT, TextOutputFormat.class,
    Long.class, String.class);
MultipleOutputs.setCountersEnabled(job, withCounters);
job.setSortComparatorClass(JavaSerializationComparator.class);
job.setMapOutputKeyClass(Long.class);
job.setMapOutputValueClass(String.class);
job.setOutputKeyClass(Long.class);
job.setOutputValueClass(String.class);
job.setMapperClass(MOJavaSerDeMap.class);
job.setReducerClass(MOJavaSerDeReduce.class);
job.waitForCompletion(true);
// Tally the named-output part files and the value-based part files.
int named = 0;
int valueBased = 0;
FileSystem fs = OUT_DIR.getFileSystem(conf);
for (FileStatus stat : fs.listStatus(OUT_DIR)) {
  String name = stat.getPath().getName();
  if (name.equals("text-m-00000")
      || name.equals("text-m-00001")
      || name.equals("text-r-00000")) {
    named++;
  } else if (name.equals("a-r-00000")
      || name.equals("b-r-00000")
      || name.equals("c-r-00000")
      || name.equals("d-r-00000")
      || name.equals("e-r-00000")) {
    valueBased++;
  }
}
assertEquals(3, named);
assertEquals(5, valueBased);
// Every line of the reduce-side named output must end with TEXT.
BufferedReader br = new BufferedReader(
    new InputStreamReader(fs.open(
        new Path(FileOutputFormat.getOutputPath(job), "text-r-00000"))));
int lineCount = 0;
for (String ln = br.readLine(); ln != null; ln = br.readLine()) {
  assertTrue(ln.endsWith(TEXT));
  lineCount++;
}
br.close();
assertFalse(lineCount == 0);
if (withCounters) {
  CounterGroup group =
      job.getCounters().getGroup(MultipleOutputs.class.getName());
  assertEquals(6, group.size());
  assertEquals(4, group.findCounter(TEXT).getValue());
  assertEquals(2, group.findCounter("a").getValue());
  assertEquals(2, group.findCounter("b").getValue());
  assertEquals(4, group.findCounter("c").getValue());
  assertEquals(4, group.findCounter("d").getValue());
  assertEquals(4, group.findCounter("e").getValue());
}
}
/**
 * End-to-end sort test: first generates UUID input with a preliminary
 * map-only job, then runs a map-reduce job that sorts the UUIDs via
 * JavaSerializationComparator, and finally verifies that every output
 * file is in strictly ascending order.
 *
 * @throws Exception If failed.
 */
@Test
public void testSortSimple() throws Exception {
// Generate test data.
Job job = Job.getInstance();
job.setInputFormatClass(InFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(Mapper.class);
// Map-only: zero reducers, so InFormat's records are written directly.
job.setNumReduceTasks(0);
setupFileSystems(job.getConfiguration());
FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT));
X.printerrln("Data generation started.");
// Submit to the grid and block up to 180s for completion.
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
createJobInfo(job.getConfiguration(), null)).get(180000);
X.printerrln("Data generation complete.");
// Run main map-reduce job.
job = Job.getInstance();
setupFileSystems(job.getConfiguration());
// UUID is not Writable; register Java serialization so it can be used
// as the map output key type.
job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() +
"," + WritableSerialization.class.getName());
FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT));
FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
// Sort keys through their Serializable/Comparable implementation.
job.setSortComparatorClass(JavaSerializationComparator.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setNumReduceTasks(2);
job.setMapOutputKeyClass(UUID.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
X.printerrln("Job started.");
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
createJobInfo(job.getConfiguration(), null)).get(180000);
X.printerrln("Job complete.");
// Check result.
Path outDir = new Path(igfsScheme() + PATH_OUTPUT);
AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration());
for (FileStatus file : fs.listStatus(outDir)) {
X.printerrln("__ file: " + file);
// Skip empty part files (a reducer may receive no keys).
if (file.getLen() == 0)
continue;
FSDataInputStream in = fs.open(file.getPath());
Scanner sc = new Scanner(in);
UUID prev = null;
while (sc.hasNextLine()) {
UUID next = UUID.fromString(sc.nextLine());
// X.printerrln("___ check: " + next);
// Each line must be strictly greater than the previous one.
if (prev != null)
assertTrue(prev.compareTo(next) < 0);
prev = next;
}
}
}