下面列出了怎么用org.apache.hadoop.mapreduce.MapContext的API类实例代码及写法,或者点击链接到github查看源代码。
private static List<Text> readSplit(KeyValueTextInputFormat format,
InputSplit split, Job job) throws IOException, InterruptedException {
List<Text> result = new ArrayList<Text>();
Configuration conf = job.getConfiguration();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(conf);
RecordReader<Text, Text> reader = format.createRecordReader(split,
MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
MapContext<Text, Text, Text, Text> mcontext =
new MapContextImpl<Text, Text, Text, Text>(conf,
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(),
split);
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
result.add(new Text(reader.getCurrentValue()));
}
reader.close();
return result;
}
private static List<Text> readSplit(InputFormat<LongWritable,Text> format,
InputSplit split, Job job) throws IOException, InterruptedException {
List<Text> result = new ArrayList<Text>();
Configuration conf = job.getConfiguration();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(conf);
RecordReader<LongWritable, Text> reader = format.createRecordReader(split,
MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
MapContext<LongWritable,Text,LongWritable,Text> mcontext =
new MapContextImpl<LongWritable,Text,LongWritable,Text>(conf,
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(),
split);
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
result.add(new Text(reader.getCurrentValue()));
}
return result;
}
/**
* Add mapper(the first mapper) that reads input from the input
* context and writes to queue
*/
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
throws IOException, InterruptedException {
Configuration conf = getConf(index);
Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
Object.class);
RecordReader rr = new ChainRecordReader(inputContext);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
conf);
Mapper.Context mapperContext = createMapContext(rr, rw,
(MapContext) inputContext, getConf(index));
MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw);
threads.add(runner);
}
private static List<Text> readSplit(KeyValueTextInputFormat format,
InputSplit split, Job job) throws IOException, InterruptedException {
List<Text> result = new ArrayList<Text>();
Configuration conf = job.getConfiguration();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(conf);
RecordReader<Text, Text> reader = format.createRecordReader(split,
MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
MapContext<Text, Text, Text, Text> mcontext =
new MapContextImpl<Text, Text, Text, Text>(conf,
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(),
split);
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
result.add(new Text(reader.getCurrentValue()));
}
reader.close();
return result;
}
private static List<Text> readSplit(InputFormat<LongWritable,Text> format,
InputSplit split, Job job) throws IOException, InterruptedException {
List<Text> result = new ArrayList<Text>();
Configuration conf = job.getConfiguration();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(conf);
RecordReader<LongWritable, Text> reader = format.createRecordReader(split,
MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
MapContext<LongWritable,Text,LongWritable,Text> mcontext =
new MapContextImpl<LongWritable,Text,LongWritable,Text>(conf,
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(),
split);
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
result.add(new Text(reader.getCurrentValue()));
}
return result;
}
/**
* Add mapper(the first mapper) that reads input from the input
* context and writes to queue
*/
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
throws IOException, InterruptedException {
Configuration conf = getConf(index);
Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
Object.class);
RecordReader rr = new ChainRecordReader(inputContext);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
conf);
Mapper.Context mapperContext = createMapContext(rr, rw,
(MapContext) inputContext, getConf(index));
MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw);
threads.add(runner);
}
/**
* Set the progress of the current task.
* *Note: Works only when using a Virtual Input Format
*
* @param value value of the progress must lie within [0.0, 1.0]
*/
public static void setProgress(float value)
{
if (PhaseContext.isIntialized())
{
final MapContext mapContext = PhaseContext.getMapContext();
try
{
final FloatWritable progress = (FloatWritable) mapContext.getCurrentKey();
progress.set(value);
mapContext.nextKeyValue();
}
catch (Exception e)
{
System.err.println("Unable to report progress in Load Cyclic. Exception: " + e);
e.printStackTrace();
}
}
}
private void writeMetrics(QueryMetric updatedQueryMetric, List<QueryMetric> storedQueryMetrics, Date lastUpdated, boolean delete) throws Exception {
LiveContextWriter contextWriter = null;
MapContext<Text,RawRecordContainer,Text,Mutation> context = null;
try {
contextWriter = new LiveContextWriter();
contextWriter.setup(conf, false);
TaskAttemptID taskId = new TaskAttemptID(new TaskID(new JobID(JOB_ID, 1), TaskType.MAP, 1), 1);
context = new MapContextImpl<>(conf, taskId, null, recordWriter, null, reporter, null);
for (QueryMetric storedQueryMetric : storedQueryMetrics) {
AbstractColumnBasedHandler<Key> handler = new ContentQueryMetricsHandler<>();
handler.setup(context);
Multimap<BulkIngestKey,Value> r = getEntries(handler, updatedQueryMetric, storedQueryMetric, lastUpdated, delete);
try {
if (r != null) {
contextWriter.write(r, context);
}
if (handler.getMetadata() != null) {
contextWriter.write(handler.getMetadata().getBulkMetadata(), context);
}
} finally {
contextWriter.commit(context);
}
}
} finally {
if (contextWriter != null && context != null) {
contextWriter.cleanup(context);
}
}
}
private void ingestTestData(Configuration conf, TestFileLoader loader) throws IOException, InterruptedException {
log.debug("------------- ingestTestData -------------");
File tmpDir = new File(System.getProperty("java.io.tmpdir"));
Path tmpPath = new Path(tmpDir.toURI());
Path seqFile = new Path(tmpPath, UUID.randomUUID().toString());
TaskAttemptID id = new TaskAttemptID("testJob", 0, TaskType.MAP, 0, 0);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, id);
try (final RawLocalFileSystem rfs = createSequenceFile(conf, seqFile, loader)) {
InputSplit split = new FileSplit(seqFile, 0, rfs.pathToFile(seqFile).length(), null);
EventSequenceFileRecordReader<LongWritable> rr = new EventSequenceFileRecordReader<>();
rr.initialize(split, context);
Path ocPath = new Path(tmpPath, "oc");
OutputCommitter oc = new FileOutputCommitter(ocPath, context);
rfs.deleteOnExit(ocPath);
StandaloneStatusReporter sr = new StandaloneStatusReporter();
EventMapper<LongWritable,RawRecordContainer,Text,Mutation> mapper = new EventMapper<>();
MapContext<LongWritable,RawRecordContainer,Text,Mutation> mapContext = new MapContextImpl<>(conf, id, rr, this.recordWriter, oc, sr, split);
Mapper<LongWritable,RawRecordContainer,Text,Mutation>.Context con = new WrappedMapper<LongWritable,RawRecordContainer,Text,Mutation>()
.getMapContext(mapContext);
mapper.run(con);
mapper.cleanup(con);
} finally {
this.recordWriter.close(context);
}
}
private int countRecords(int numSplits)
throws IOException, InterruptedException {
InputFormat<Text, BytesWritable> format =
new SequenceFileInputFilter<Text, BytesWritable>();
if (numSplits == 0) {
numSplits =
random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
}
FileInputFormat.setMaxInputSplitSize(job,
fs.getFileStatus(inFile).getLen() / numSplits);
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
// check each split
int count = 0;
for (InputSplit split : format.getSplits(job)) {
RecordReader<Text, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<Text, BytesWritable, Text, BytesWritable> mcontext =
new MapContextImpl<Text, BytesWritable, Text, BytesWritable>(
job.getConfiguration(),
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
try {
while (reader.nextKeyValue()) {
LOG.info("Accept record " + reader.getCurrentKey().toString());
count++;
}
} finally {
reader.close();
}
}
return count;
}
/**
* Test with no record length set.
*/
@Test (timeout=5000)
public void testNoRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10);
// Create the job and do not set fixed record length
Job job = Job.getInstance(defaultConf);
FileInputFormat.setInputPaths(job, workDir);
FixedLengthInputFormat format = new FixedLengthInputFormat();
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for not setting record length:", exceptionThrown);
}
/**
* Test with record length set to 0
*/
@Test (timeout=5000)
public void testZeroRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10);
Job job = Job.getInstance(defaultConf);
// Set the fixed length record length config property
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), 0);
FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context =
MapReduceTestUtil.createDummyMapTaskAttemptContext(
job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for zero record length:", exceptionThrown);
}
/**
* Test with record length set to a negative value
*/
@Test (timeout=5000)
public void testNegativeRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10);
// Set the fixed length record length config property
Job job = Job.getInstance(defaultConf);
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), -10);
FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for negative record length:", exceptionThrown);
}
private static List<String> readSplit(FixedLengthInputFormat format,
InputSplit split,
Job job) throws Exception {
List<String> result = new ArrayList<String>();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
LongWritable key;
BytesWritable value;
try {
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
key = reader.getCurrentKey();
value = reader.getCurrentValue();
result.add(new String(value.getBytes(), 0, value.getLength()));
}
} finally {
reader.close();
}
return result;
}
MapRunner(Context context) throws IOException, InterruptedException {
mapper = ReflectionUtils.newInstance(mapClass,
context.getConfiguration());
MapContext<K1, V1, K2, V2> mapContext =
new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(),
outer.getTaskAttemptID(),
reader,
new SubMapRecordWriter(),
context.getOutputCommitter(),
new SubMapStatusReporter(),
outer.getInputSplit());
subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
reader.initialize(context.getInputSplit(), context);
}
@Override
public InputSplit getInputSplit() {
if (base instanceof MapContext) {
MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mc =
(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>) base;
return mc.getInputSplit();
} else {
return null;
}
}
/**
* Create a map context that is based on ChainMapContext and the given record
* reader and record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createMapContext(
RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext =
new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rr, rw, conf);
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext =
new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getMapContext(mapContext);
return mapperContext;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// context
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
FakeRecordLLReader reader = new FakeRecordLLReader();
LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
SleepSplit split = getSleepSplit();
MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
conf, taskId, reader, writer, committer, reporter, split);
Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
.getMapContext(mapcontext);
long start = System.currentTimeMillis();
LOG.info("start:" + start);
LongWritable key = new LongWritable(start + 2000);
LongWritable value = new LongWritable(start + 2000);
// should slip 2 sec
test.map(key, value, context);
LOG.info("finish:" + System.currentTimeMillis());
assertTrue(System.currentTimeMillis() >= (start + 2000));
test.cleanup(context);
assertEquals(1, writer.getData().size());
}
/**
* Validate setupGenerateDistCacheData by validating <li>permissions of the
* distributed cache directories and <li>content of the generated sequence
* file. This includes validation of dist cache file paths and their file
* sizes.
*/
private void validateSetupGenDC(Configuration jobConf, long[] sortedFileSizes)
throws IOException, InterruptedException {
// build things needed for validation
long sumOfFileSizes = 0;
for (int i = 0; i < sortedFileSizes.length; i++) {
sumOfFileSizes += sortedFileSizes[i];
}
FileSystem fs = FileSystem.get(jobConf);
assertEquals("Number of distributed cache files to be generated is wrong.",
sortedFileSizes.length,
jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1));
assertEquals("Total size of dist cache files to be generated is wrong.",
sumOfFileSizes,
jobConf.getLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
Path filesListFile = new Path(
jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST));
FileStatus stat = fs.getFileStatus(filesListFile);
assertEquals("Wrong permissions of dist Cache files list file "
+ filesListFile, new FsPermission((short) 0644), stat.getPermission());
InputSplit split = new FileSplit(filesListFile, 0, stat.getLen(),
(String[]) null);
TaskAttemptContext taskContext = MapReduceTestUtil
.createDummyMapTaskAttemptContext(jobConf);
RecordReader<LongWritable, BytesWritable> reader = new GenerateDistCacheData.GenDCDataFormat()
.createRecordReader(split, taskContext);
MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable> mapContext = new MapContextImpl<LongWritable, BytesWritable, NullWritable, BytesWritable>(
jobConf, taskContext.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mapContext);
// start validating setupGenerateDistCacheData
doValidateSetupGenDC(reader, fs, sortedFileSizes);
}
private int countRecords(int numSplits)
throws IOException, InterruptedException {
InputFormat<Text, BytesWritable> format =
new SequenceFileInputFilter<Text, BytesWritable>();
if (numSplits == 0) {
numSplits =
random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
}
FileInputFormat.setMaxInputSplitSize(job,
fs.getFileStatus(inFile).getLen() / numSplits);
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
// check each split
int count = 0;
for (InputSplit split : format.getSplits(job)) {
RecordReader<Text, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<Text, BytesWritable, Text, BytesWritable> mcontext =
new MapContextImpl<Text, BytesWritable, Text, BytesWritable>(
job.getConfiguration(),
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
try {
while (reader.nextKeyValue()) {
LOG.info("Accept record " + reader.getCurrentKey().toString());
count++;
}
} finally {
reader.close();
}
}
return count;
}
/**
* Test with no record length set.
*/
@Test (timeout=5000)
public void testNoRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10);
// Create the job and do not set fixed record length
Job job = Job.getInstance(defaultConf);
FileInputFormat.setInputPaths(job, workDir);
FixedLengthInputFormat format = new FixedLengthInputFormat();
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for not setting record length:", exceptionThrown);
}
/**
* Test with record length set to 0
*/
@Test (timeout=5000)
public void testZeroRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10);
Job job = Job.getInstance(defaultConf);
// Set the fixed length record length config property
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), 0);
FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context =
MapReduceTestUtil.createDummyMapTaskAttemptContext(
job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for zero record length:", exceptionThrown);
}
/**
* Test with record length set to a negative value
*/
@Test (timeout=5000)
public void testNegativeRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10);
// Set the fixed length record length config property
Job job = Job.getInstance(defaultConf);
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), -10);
FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for negative record length:", exceptionThrown);
}
private static List<String> readSplit(FixedLengthInputFormat format,
InputSplit split,
Job job) throws Exception {
List<String> result = new ArrayList<String>();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
LongWritable key;
BytesWritable value;
try {
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
key = reader.getCurrentKey();
value = reader.getCurrentValue();
result.add(new String(value.getBytes(), 0, value.getLength()));
}
} finally {
reader.close();
}
return result;
}
MapRunner(Context context) throws IOException, InterruptedException {
mapper = ReflectionUtils.newInstance(mapClass,
context.getConfiguration());
MapContext<K1, V1, K2, V2> mapContext =
new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(),
outer.getTaskAttemptID(),
reader,
new SubMapRecordWriter(),
context.getOutputCommitter(),
new SubMapStatusReporter(),
outer.getInputSplit());
subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
reader.initialize(context.getInputSplit(), context);
}
@Override
public InputSplit getInputSplit() {
if (base instanceof MapContext) {
MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mc =
(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>) base;
return mc.getInputSplit();
} else {
return null;
}
}
/**
* Create a map context that is based on ChainMapContext and the given record
* reader and record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createMapContext(
RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext =
new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rr, rw, conf);
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext =
new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getMapContext(mapContext);
return mapperContext;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// context
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
FakeRecordLLReader reader = new FakeRecordLLReader();
LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
SleepSplit split = getSleepSplit();
MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
conf, taskId, reader, writer, committer, reporter, split);
Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
.getMapContext(mapcontext);
long start = System.currentTimeMillis();
LOG.info("start:" + start);
LongWritable key = new LongWritable(start + 2000);
LongWritable value = new LongWritable(start + 2000);
// should slip 2 sec
test.map(key, value, context);
LOG.info("finish:" + System.currentTimeMillis());
assertTrue(System.currentTimeMillis() >= (start + 2000));
test.cleanup(context);
assertEquals(1, writer.getData().size());
}