The following examples show how to use the org.apache.hadoop.mapreduce.task.MapContextImpl API class, or follow the links to view the full source code on GitHub.
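Before the examples, a note on the shape of the API: MapContextImpl's constructor takes a Configuration, a TaskAttemptID, a RecordReader, a RecordWriter, an OutputCommitter, a StatusReporter, and an InputSplit, and the resulting MapContext is usually wrapped with WrappedMapper to obtain the Mapper.Context that Mapper.run() expects. The sketch below is an illustrative helper, not part of Hadoop, showing that pattern in one place:
// Illustrative helper (not part of Hadoop): build a MapContextImpl and wrap
// it so a Mapper can consume it. As in the examples below, tests commonly
// pass null for any argument the code under test never touches.
private static <KI, VI, KO, VO> Mapper<KI, VI, KO, VO>.Context createMapperContext(
    Configuration conf,
    RecordReader<KI, VI> reader, RecordWriter<KO, VO> writer,
    OutputCommitter committer, StatusReporter reporter, InputSplit split) {
  MapContext<KI, VI, KO, VO> mapContext =
      new MapContextImpl<KI, VI, KO, VO>(conf, new TaskAttemptID(),
          reader, writer, committer, reporter, split);
  // WrappedMapper adapts a MapContext to the Mapper.Context subtype that
  // Mapper.run()/map()/cleanup() expect.
  return new WrappedMapper<KI, VI, KO, VO>().getMapContext(mapContext);
}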
private static List<Text> readSplit(KeyValueTextInputFormat format,
InputSplit split, Job job) throws IOException, InterruptedException {
List<Text> result = new ArrayList<Text>();
Configuration conf = job.getConfiguration();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(conf);
RecordReader<Text, Text> reader = format.createRecordReader(split,
MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
MapContext<Text, Text, Text, Text> mcontext =
new MapContextImpl<Text, Text, Text, Text>(conf,
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(),
split);
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
result.add(new Text(reader.getCurrentValue()));
}
reader.close();
return result;
}
private static List<Text> readSplit(InputFormat<LongWritable,Text> format,
InputSplit split, Job job) throws IOException, InterruptedException {
List<Text> result = new ArrayList<Text>();
Configuration conf = job.getConfiguration();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(conf);
RecordReader<LongWritable, Text> reader = format.createRecordReader(split,
MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
MapContext<LongWritable,Text,LongWritable,Text> mcontext =
new MapContextImpl<LongWritable,Text,LongWritable,Text>(conf,
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(),
split);
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
result.add(new Text(reader.getCurrentValue()));
}
reader.close();
return result;
}
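For context, a hypothetical caller of the readSplit helpers above might look like the following sketch (the input path is illustrative):
// Hypothetical usage of the readSplit helper above (input path is illustrative).
Job job = Job.getInstance(new Configuration());
FileInputFormat.setInputPaths(job, new Path("/tmp/kv-input"));
KeyValueTextInputFormat format = new KeyValueTextInputFormat();
for (InputSplit split : format.getSplits(job)) {
  List<Text> values = readSplit(format, split, job);
  // assert on the values read back from this split...
}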
@Test(expected = RuntimeException.class)
public void testProblemGettingLocalCacheFiles() throws IOException, URISyntaxException {
final URL url = createUrl("trimmed_splits.txt");
MultiTableRangePartitioner.setContext(new MapContextImpl<Key,Value,Text,Mutation>(configuration, new TaskAttemptID(), null, null, null, null, null) {
@Override
public org.apache.hadoop.fs.Path[] getLocalCacheFiles() throws IOException {
throw new IOException("Local cache files failure");
}
});
getPartition();
}
private void mockContextForLocalCacheFile(final URL url) {
MultiTableRangePartitioner.setContext(new MapContextImpl<Key,Value,Text,Mutation>(configuration, new TaskAttemptID(), null, null, null, null, null) {
@Override
public Path[] getLocalCacheFiles() throws IOException {
return new Path[] {new Path(url.getPath())};
}
});
}
@Test(expected = RuntimeException.class)
public void testProblemGettingLocalCacheFiles() throws IOException, URISyntaxException {
final URL url = createUrl("full_splits.txt");
MultiTableRangePartitioner.setContext(new MapContextImpl<Key,Value,Text,Mutation>(configuration, new TaskAttemptID(), null, null, null, null, null) {
@Override
public org.apache.hadoop.fs.Path[] getLocalCacheFiles() throws IOException {
throw new IOException("Local cache files failure");
}
});
getPartition("23432");
}
private void writeMetrics(QueryMetric updatedQueryMetric, List<QueryMetric> storedQueryMetrics, Date lastUpdated, boolean delete) throws Exception {
LiveContextWriter contextWriter = null;
MapContext<Text,RawRecordContainer,Text,Mutation> context = null;
try {
contextWriter = new LiveContextWriter();
contextWriter.setup(conf, false);
TaskAttemptID taskId = new TaskAttemptID(new TaskID(new JobID(JOB_ID, 1), TaskType.MAP, 1), 1);
context = new MapContextImpl<>(conf, taskId, null, recordWriter, null, reporter, null);
for (QueryMetric storedQueryMetric : storedQueryMetrics) {
AbstractColumnBasedHandler<Key> handler = new ContentQueryMetricsHandler<>();
handler.setup(context);
Multimap<BulkIngestKey,Value> r = getEntries(handler, updatedQueryMetric, storedQueryMetric, lastUpdated, delete);
try {
if (r != null) {
contextWriter.write(r, context);
}
if (handler.getMetadata() != null) {
contextWriter.write(handler.getMetadata().getBulkMetadata(), context);
}
} finally {
contextWriter.commit(context);
}
}
} finally {
if (contextWriter != null && context != null) {
contextWriter.cleanup(context);
}
}
}
private void ingestTestData(Configuration conf, TestFileLoader loader) throws IOException, InterruptedException {
log.debug("------------- ingestTestData -------------");
File tmpDir = new File(System.getProperty("java.io.tmpdir"));
Path tmpPath = new Path(tmpDir.toURI());
Path seqFile = new Path(tmpPath, UUID.randomUUID().toString());
TaskAttemptID id = new TaskAttemptID("testJob", 0, TaskType.MAP, 0, 0);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, id);
try (final RawLocalFileSystem rfs = createSequenceFile(conf, seqFile, loader)) {
InputSplit split = new FileSplit(seqFile, 0, rfs.pathToFile(seqFile).length(), null);
EventSequenceFileRecordReader<LongWritable> rr = new EventSequenceFileRecordReader<>();
rr.initialize(split, context);
Path ocPath = new Path(tmpPath, "oc");
OutputCommitter oc = new FileOutputCommitter(ocPath, context);
rfs.deleteOnExit(ocPath);
StandaloneStatusReporter sr = new StandaloneStatusReporter();
EventMapper<LongWritable,RawRecordContainer,Text,Mutation> mapper = new EventMapper<>();
MapContext<LongWritable,RawRecordContainer,Text,Mutation> mapContext = new MapContextImpl<>(conf, id, rr, this.recordWriter, oc, sr, split);
Mapper<LongWritable,RawRecordContainer,Text,Mutation>.Context con = new WrappedMapper<LongWritable,RawRecordContainer,Text,Mutation>()
.getMapContext(mapContext);
mapper.run(con);
mapper.cleanup(con);
} finally {
this.recordWriter.close(context);
}
}
public StubContext(Configuration conf, RecordReader<Text, CopyListingFileStatus> reader, int taskId)
throws IOException, InterruptedException {
WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper = new WrappedMapper<>();
MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl = new MapContextImpl<>(conf,
getTaskAttemptID(taskId), reader, writer, null, reporter, null);
this.reader = reader;
mapperContext = wrappedMapper.getMapContext(contextImpl);
}
void checkFormat(Job job, int expectedN, int lastN)
throws IOException, InterruptedException {
NLineInputFormat format = new NLineInputFormat();
List<InputSplit> splits = format.getSplits(job);
int count = 0;
for (int i = 0; i < splits.size(); i++) {
assertEquals("There are no split locations", 0,
splits.get(i).getLocations().length);
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, Text> reader = format.createRecordReader(
splits.get(i), context);
Class<?> clazz = reader.getClass();
assertEquals("reader class is LineRecordReader.",
LineRecordReader.class, clazz);
MapContext<LongWritable, Text, LongWritable, Text> mcontext =
new MapContextImpl<LongWritable, Text, LongWritable, Text>(
job.getConfiguration(), context.getTaskAttemptID(), reader, null,
null, MapReduceTestUtil.createDummyReporter(), splits.get(i));
reader.initialize(splits.get(i), mcontext);
try {
count = 0;
while (reader.nextKeyValue()) {
count++;
}
} finally {
reader.close();
}
if (i == splits.size() - 1) {
assertEquals("number of lines in split(" + i + ") is wrong",
lastN, count);
} else {
assertEquals("number of lines in split(" + i + ") is wrong",
expectedN, count);
}
}
}
private int countRecords(int numSplits)
throws IOException, InterruptedException {
InputFormat<Text, BytesWritable> format =
new SequenceFileInputFilter<Text, BytesWritable>();
if (numSplits == 0) {
numSplits =
random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
}
FileInputFormat.setMaxInputSplitSize(job,
fs.getFileStatus(inFile).getLen() / numSplits);
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
// check each split
int count = 0;
for (InputSplit split : format.getSplits(job)) {
RecordReader<Text, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<Text, BytesWritable, Text, BytesWritable> mcontext =
new MapContextImpl<Text, BytesWritable, Text, BytesWritable>(
job.getConfiguration(),
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
try {
while (reader.nextKeyValue()) {
LOG.info("Accept record " + reader.getCurrentKey().toString());
count++;
}
} finally {
reader.close();
}
}
return count;
}
/**
* Test with no record length set.
*/
@Test (timeout=5000)
public void testNoRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, "testFormat.txt");
createFile(file, null, 10, 10);
// Create the job and do not set fixed record length
Job job = Job.getInstance(defaultConf);
FileInputFormat.setInputPaths(job, workDir);
FixedLengthInputFormat format = new FixedLengthInputFormat();
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for not setting record length:", exceptionThrown);
}
/**
* Test with record length set to 0.
*/
@Test (timeout=5000)
public void testZeroRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, "testFormat.txt");
createFile(file, null, 10, 10);
Job job = Job.getInstance(defaultConf);
// Set the fixed length record length config property
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), 0);
FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context =
MapReduceTestUtil.createDummyMapTaskAttemptContext(
job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for zero record length:", exceptionThrown);
}
/**
* Test with record length set to a negative value.
*/
@Test (timeout=5000)
public void testNegativeRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, "testFormat.txt");
createFile(file, null, 10, 10);
// Set the fixed length record length config property
Job job = Job.getInstance(defaultConf);
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), -10);
FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for negative record length:", exceptionThrown);
}
private static List<String> readSplit(FixedLengthInputFormat format,
InputSplit split,
Job job) throws Exception {
List<String> result = new ArrayList<String>();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
LongWritable key;
BytesWritable value;
try {
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
key = reader.getCurrentKey();
value = reader.getCurrentValue();
result.add(new String(value.getBytes(), 0, value.getLength()));
}
} finally {
reader.close();
}
return result;
}
MapRunner(Context context) throws IOException, InterruptedException {
mapper = ReflectionUtils.newInstance(mapClass,
context.getConfiguration());
MapContext<K1, V1, K2, V2> mapContext =
new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(),
outer.getTaskAttemptID(),
reader,
new SubMapRecordWriter(),
context.getOutputCommitter(),
new SubMapStatusReporter(),
outer.getInputSplit());
subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
reader.initialize(context.getInputSplit(), context);
}
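The MapRunner above appears to come from Hadoop's MultithreadedMapper, where each worker thread builds its own MapContextImpl around a shared reader and a per-thread sub-writer/sub-reporter. For reference, a job opts into that mapper roughly as follows (MyMapper and the thread count are illustrative):
// Illustrative MultithreadedMapper setup (MyMapper is hypothetical).
Job job = Job.getInstance(new Configuration());
job.setMapperClass(MultithreadedMapper.class);
MultithreadedMapper.setMapperClass(job, MyMapper.class);
MultithreadedMapper.setNumberOfThreads(job, 8);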
@Test
public void testCloneMapContext() throws Exception {
TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
TaskAttemptID taskAttemptid = new TaskAttemptID(taskId, 0);
MapContext<IntWritable, IntWritable, IntWritable, IntWritable> mapContext =
new MapContextImpl<IntWritable, IntWritable, IntWritable, IntWritable>(
conf, taskAttemptid, null, null, null, null, null);
Mapper<IntWritable, IntWritable, IntWritable, IntWritable>.Context mapperContext =
new WrappedMapper<IntWritable, IntWritable, IntWritable, IntWritable>().getMapContext(
mapContext);
ContextFactory.cloneMapContext(mapperContext, conf, null, null);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// wrap the MapContext into the Mapper.Context that LoadMapper.run() expects
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
FakeRecordLLReader reader = new FakeRecordLLReader();
LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
SleepSplit split = getSleepSplit();
MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
conf, taskId, reader, writer, committer, reporter, split);
Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
.getMapContext(mapcontext);
long start = System.currentTimeMillis();
LOG.info("start:" + start);
LongWritable key = new LongWritable(start + 2000);
LongWritable value = new LongWritable(start + 2000);
// should sleep 2 sec
test.map(key, value, context);
LOG.info("finish:" + System.currentTimeMillis());
assertTrue(System.currentTimeMillis() >= (start + 2000));
test.cleanup(context);
assertEquals(1, writer.getData().size());
}
/**
* Validate setupGenerateDistCacheData by validating (1) permissions of the
* distributed cache directories and (2) content of the generated sequence
* file. This includes validation of dist cache file paths and their file
* sizes.
*/
private void validateSetupGenDC(Configuration jobConf, long[] sortedFileSizes)
throws IOException, InterruptedException {
// build things needed for validation
long sumOfFileSizes = 0;
for (int i = 0; i < sortedFileSizes.length; i++) {
sumOfFileSizes += sortedFileSizes[i];
}
FileSystem fs = FileSystem.get(jobConf);
assertEquals("Number of distributed cache files to be generated is wrong.",
sortedFileSizes.length,
jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1));
assertEquals("Total size of dist cache files to be generated is wrong.",
sumOfFileSizes,
jobConf.getLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
Path filesListFile = new Path(
jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST));
FileStatus stat = fs.getFileStatus(filesListFile);
assertEquals("Wrong permissions of dist Cache files list file "
+ filesListFile, new FsPermission((short) 0644), stat.getPermission());
InputSplit split = new FileSplit(filesListFile, 0, stat.getLen(),
(String[]) null);
TaskAttemptContext taskContext = MapReduceTestUtil
.createDummyMapTaskAttemptContext(jobConf);
RecordReader<LongWritable, BytesWritable> reader = new GenerateDistCacheData.GenDCDataFormat()
.createRecordReader(split, taskContext);
MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable> mapContext = new MapContextImpl<LongWritable, BytesWritable, NullWritable, BytesWritable>(
jobConf, taskContext.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mapContext);
// start validating setupGenerateDistCacheData
doValidateSetupGenDC(reader, fs, sortedFileSizes);
}