下面列出了怎么用org.apache.hadoop.mapred.Task.TaskReporter的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testCustomCollect() throws Throwable {
//mock creation
TaskReporter mockTaskReporter = mock(TaskReporter.class);
@SuppressWarnings("unchecked")
Writer<String, Integer> mockWriter = mock(Writer.class);
Configuration conf = new Configuration();
conf.set(MRJobConfig.COMBINE_RECORDS_BEFORE_PROGRESS, "2");
coc = new CombineOutputCollector<String, Integer>(outCounter, mockTaskReporter, conf);
coc.setWriter(mockWriter);
verify(mockTaskReporter, never()).progress();
coc.collect("dummy", 1);
verify(mockTaskReporter, never()).progress();
coc.collect("dummy", 2);
verify(mockTaskReporter, times(1)).progress();
}
@Test
public void testDefaultCollect() throws Throwable {
//mock creation
TaskReporter mockTaskReporter = mock(TaskReporter.class);
@SuppressWarnings("unchecked")
Writer<String, Integer> mockWriter = mock(Writer.class);
Configuration conf = new Configuration();
coc = new CombineOutputCollector<String, Integer>(outCounter, mockTaskReporter, conf);
coc.setWriter(mockWriter);
verify(mockTaskReporter, never()).progress();
for(int i = 0; i < Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
coc.collect("dummy", i);
}
verify(mockTaskReporter, times(1)).progress();
for(int i = 0; i < Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
coc.collect("dummy", i);
}
verify(mockTaskReporter, times(2)).progress();
}
@Test
public void testCustomCollect() throws Throwable {
//mock creation
TaskReporter mockTaskReporter = mock(TaskReporter.class);
@SuppressWarnings("unchecked")
Writer<String, Integer> mockWriter = mock(Writer.class);
Configuration conf = new Configuration();
conf.set(MRJobConfig.COMBINE_RECORDS_BEFORE_PROGRESS, "2");
coc = new CombineOutputCollector<String, Integer>(outCounter, mockTaskReporter, conf);
coc.setWriter(mockWriter);
verify(mockTaskReporter, never()).progress();
coc.collect("dummy", 1);
verify(mockTaskReporter, never()).progress();
coc.collect("dummy", 2);
verify(mockTaskReporter, times(1)).progress();
}
@Test
public void testDefaultCollect() throws Throwable {
//mock creation
TaskReporter mockTaskReporter = mock(TaskReporter.class);
@SuppressWarnings("unchecked")
Writer<String, Integer> mockWriter = mock(Writer.class);
Configuration conf = new Configuration();
coc = new CombineOutputCollector<String, Integer>(outCounter, mockTaskReporter, conf);
coc.setWriter(mockWriter);
verify(mockTaskReporter, never()).progress();
for(int i = 0; i < Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
coc.collect("dummy", i);
}
verify(mockTaskReporter, times(1)).progress();
for(int i = 0; i < Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
coc.collect("dummy", i);
}
verify(mockTaskReporter, times(2)).progress();
}
@SuppressWarnings("unchecked")
@Test
public void testProgressIsReportedIfInputASeriesOfEmptyFiles() throws IOException, InterruptedException {
JobConf conf = new JobConf();
Path[] paths = new Path[3];
File[] files = new File[3];
long[] fileLength = new long[3];
try {
for(int i=0;i<3;i++){
File dir = new File(outDir.toString());
dir.mkdir();
files[i] = new File(dir,"testfile"+i);
FileWriter fileWriter = new FileWriter(files[i]);
fileWriter.flush();
fileWriter.close();
fileLength[i] = i;
paths[i] = new Path(outDir+"/testfile"+i);
}
CombineFileSplit combineFileSplit = new CombineFileSplit(paths, fileLength);
TaskAttemptID taskAttemptID = Mockito.mock(TaskAttemptID.class);
TaskReporter reporter = Mockito.mock(TaskReporter.class);
TaskAttemptContextImpl taskAttemptContext =
new TaskAttemptContextImpl(conf, taskAttemptID,reporter);
CombineFileRecordReader cfrr = new CombineFileRecordReader(combineFileSplit,
taskAttemptContext, TextRecordReaderWrapper.class);
cfrr.initialize(combineFileSplit,taskAttemptContext);
verify(reporter).progress();
Assert.assertFalse(cfrr.nextKeyValue());
verify(reporter, times(3)).progress();
} finally {
FileUtil.fullyDelete(new File(outDir.toString()));
}
}
@SuppressWarnings("unchecked")
@Test
public void testProgressIsReportedIfInputASeriesOfEmptyFiles() throws IOException, InterruptedException {
JobConf conf = new JobConf();
Path[] paths = new Path[3];
File[] files = new File[3];
long[] fileLength = new long[3];
try {
for(int i=0;i<3;i++){
File dir = new File(outDir.toString());
dir.mkdir();
files[i] = new File(dir,"testfile"+i);
FileWriter fileWriter = new FileWriter(files[i]);
fileWriter.flush();
fileWriter.close();
fileLength[i] = i;
paths[i] = new Path(outDir+"/testfile"+i);
}
CombineFileSplit combineFileSplit = new CombineFileSplit(paths, fileLength);
TaskAttemptID taskAttemptID = Mockito.mock(TaskAttemptID.class);
TaskReporter reporter = Mockito.mock(TaskReporter.class);
TaskAttemptContextImpl taskAttemptContext =
new TaskAttemptContextImpl(conf, taskAttemptID,reporter);
CombineFileRecordReader cfrr = new CombineFileRecordReader(combineFileSplit,
taskAttemptContext, TextRecordReaderWrapper.class);
cfrr.initialize(combineFileSplit,taskAttemptContext);
verify(reporter).progress();
Assert.assertFalse(cfrr.nextKeyValue());
verify(reporter, times(3)).progress();
} finally {
FileUtil.fullyDelete(new File(outDir.toString()));
}
}
public MapSpillSortCounters(TaskReporter taskReporter) {
this.reporter = taskReporter;
numSpillsVal = 0;
mapSpillCPUVal = 0;
mapSpillWallClockVal = 0;
mapSpillBytesVal = 0;
mapMemSortCPUVal = 0;
mapMemSortWallClockVal = 0;
mapMergeCPUVal = 0;
mapMergeWallClockVal = 0;
mapSpillSingleRecordNum = 0;
}
public BasicReducePartition(int reduceNum,
MemoryBlockAllocator memoryBlockAllocator, byte[] kvBuffer,
BlockMapOutputCollector<K, V> collector, TaskReporter reporter)
throws IOException {
this.partition = reduceNum;
this.collectedBytesSize = 0;
this.collectedRecordsNum = 0;
this.memoryBlockAllocator = memoryBlockAllocator;
this.kvbuffer = kvBuffer;
this.collector = collector;
this.reporter = reporter;
this.memoryBlockAllocator.registerMemoryBlockHolder(this);
initMemoryBlocks();
}
public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
this.mapTask = mapTask;
this.jobConf = jobConf;
this.reporter = reporter;
}
public TaskReporter getReporter() {
return reporter;
}
public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
this.mapTask = mapTask;
this.jobConf = jobConf;
this.reporter = reporter;
}
public TaskReporter getReporter() {
return reporter;
}
@SuppressWarnings( { "unchecked", "deprecation" })
public BlockMapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
TaskReporter reporter, MapTask task) throws IOException,
ClassNotFoundException {
this.task = task;
this.job = job;
this.reporter = reporter;
localFs = FileSystem.getLocal(job);
partitions = job.getNumReduceTasks();
indexCacheList = new ArrayList<SpillRecord>();
if (partitions > 0) {
partitioner = (Partitioner<K, V>) ReflectionUtils.newInstance(job
.getPartitionerClass(), job);
} else {
partitioner = new Partitioner() {
@Override
public int getPartition(Object key, Object value, int numPartitions) {
return -1;
}
@Override
public void configure(JobConf job) {
}
};
}
rfs = ((LocalFileSystem) localFs).getRaw();
float spillper = job.getFloat("io.sort.spill.percent", (float) 0.9);
if (spillper > (float) 1.0 || spillper < (float) 0.0) {
LOG.error("Invalid \"io.sort.spill.percent\": " + spillper);
spillper = 0.8f;
}
lastSpillInMem = job.getBoolean("mapred.map.lastspill.memory", true);
numBigRecordsWarnThreshold =
job.getInt("mapred.map.bigrecord.spill.warn.threshold", 500);
int sortmb = job.getInt("io.sort.mb", 100);
boolean localMode = job.get("mapred.job.tracker", "local").equals("local");
if (localMode) {
sortmb = job.getInt("io.sort.mb.localmode", 100);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
}
LOG.info("io.sort.mb = " + sortmb);
// buffers and accounting
kvBufferSize = sortmb << 20;
kvbuffer = new byte[kvBufferSize];
softBufferLimit = (int) (kvbuffer.length * spillper);
// k/v serialization
keyClass = (Class<K>) job.getMapOutputKeyClass();
valClass = (Class<V>) job.getMapOutputValueClass();
if (!BytesWritable.class.isAssignableFrom(keyClass)
|| !BytesWritable.class.isAssignableFrom(valClass)) {
throw new IOException(this.getClass().getName()
+ " only support " + BytesWritable.class.getName()
+ " as key and value classes, MapOutputKeyClass is "
+ keyClass.getName() + ", MapOutputValueClass is "
+ valClass.getName());
}
int numMappers = job.getNumMapTasks();
memoryBlockAllocator =
new MemoryBlockAllocator(kvBufferSize, softBufferLimit, numMappers,
partitions, this);
// counters
mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
mapSpillSortCounter = new MapSpillSortCounters(reporter);
reducePartitions = new ReducePartition[partitions];
inMemorySegments = new Segment[partitions];
for (int i = 0; i < partitions; i++) {
reducePartitions[i] = new ReducePartition(i, this.memoryBlockAllocator,
this.kvbuffer, this, this.reporter);
}
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass = job
.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
}
}
public ReducePartition(int reduceNum,
MemoryBlockAllocator memoryBlockAllocator, byte[] kvBuffer,
BlockMapOutputCollector<K, V> collector, TaskReporter reporter)
throws IOException {
super(reduceNum, memoryBlockAllocator, kvBuffer, collector, reporter);
}
public TestReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
TaskReporter reporter
)throws ClassNotFoundException, IOException {
super(umbilical, conf, reporter);
}
@SuppressWarnings("deprecation")
@Test
public void testcheckAndInformJobTracker() throws Exception {
//mock creation
TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
TaskReporter mockTaskReporter = mock(TaskReporter.class);
JobConf conf = new JobConf();
conf.setUser("testuser");
conf.setJobName("testJob");
conf.setSessionId("testSession");
TaskAttemptID tid = new TaskAttemptID();
TestReduceTask rTask = new TestReduceTask();
rTask.setConf(conf);
ReduceTask.ReduceCopier reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
reduceCopier.checkAndInformJobTracker(1, tid, false);
verify(mockTaskReporter, never()).progress();
reduceCopier.checkAndInformJobTracker(10, tid, false);
verify(mockTaskReporter, times(1)).progress();
// Test the config setting
conf.setInt("mapreduce.reduce.shuffle.maxfetchfailures", 3);
rTask.setConf(conf);
reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
reduceCopier.checkAndInformJobTracker(1, tid, false);
verify(mockTaskReporter, times(1)).progress();
reduceCopier.checkAndInformJobTracker(3, tid, false);
verify(mockTaskReporter, times(2)).progress();
reduceCopier.checkAndInformJobTracker(5, tid, false);
verify(mockTaskReporter, times(2)).progress();
reduceCopier.checkAndInformJobTracker(6, tid, false);
verify(mockTaskReporter, times(3)).progress();
// test readError and its config
reduceCopier.checkAndInformJobTracker(7, tid, true);
verify(mockTaskReporter, times(4)).progress();
conf.setBoolean("mapreduce.reduce.shuffle.notify.readerror", false);
rTask.setConf(conf);
reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
reduceCopier.checkAndInformJobTracker(7, tid, true);
verify(mockTaskReporter, times(4)).progress();
}