The following examples show how to use the org.apache.hadoop.mapreduce.TaskCounter API class, or follow the link to GitHub to view the full source code.
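Before the individual examples, here is a minimal, self-contained sketch of the pattern most of them share: run a job, fetch its Counters, and look up TaskCounter values. The job name and the omitted job setup are illustrative placeholders, not taken from any example below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class TaskCounterDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "task-counter-demo"); // hypothetical job name
    // ... configure mapper, reducer, input and output paths here ...
    if (job.waitForCompletion(true)) {
      Counters counters = job.getCounters();
      // Framework-maintained per-task statistics are keyed by the TaskCounter enum.
      long mapIn = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
      long mapOut = counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
      long spilled = counters.findCounter(TaskCounter.SPILLED_RECORDS).getValue();
      System.out.println("map input=" + mapIn + ", map output=" + mapOut
          + ", spilled=" + spilled);
    }
  }
}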
/**
 * Verify that all segments are read from disk
 * @throws Exception might be thrown
 */
public void testReduceFromDisk() throws Exception {
  final int MAP_TASKS = 8;
  JobConf job = mrCluster.createJobConf();
  job.set(JobContext.REDUCE_INPUT_BUFFER_PERCENT, "0.0");
  job.setNumMapTasks(MAP_TASKS);
  job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
  job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
  job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.05");
  job.setInt(JobContext.IO_SORT_FACTOR, 2);
  job.setInt(JobContext.REDUCE_MERGE_INMEM_THRESHOLD, 4);
  Counters c = runJob(job);
  final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
  final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
  assertTrue("Expected all records spilled during reduce (" + spill + ")",
      spill >= 2 * out); // all records spill at map, reduce
  assertTrue("Expected intermediate merges (" + spill + ")",
      spill >= 2 * out + (out / MAP_TASKS)); // some records hit twice
}
private void validateFileCounters(Counters counter, long fileBytesRead,
    long fileBytesWritten, long mapOutputBytes,
    long mapOutputMaterializedBytes) {
  assertTrue(counter.findCounter(FileInputFormatCounter.BYTES_READ)
      .getValue() != 0);
  assertEquals(fileBytesRead,
      counter.findCounter(FileInputFormatCounter.BYTES_READ).getValue());
  assertTrue(counter.findCounter(FileOutputFormatCounter.BYTES_WRITTEN)
      .getValue() != 0);
  if (mapOutputBytes >= 0) {
    assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue() != 0);
  }
  if (mapOutputMaterializedBytes >= 0) {
    assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES)
        .getValue() != 0);
  }
}
@SuppressWarnings("deprecation")
private long getTaskCounterUsage(JobClient client, JobID id, int numReports,
    int taskId, TaskType type) throws Exception {
  TaskReport[] reports = null;
  if (TaskType.MAP.equals(type)) {
    reports = client.getMapTaskReports(id);
  } else if (TaskType.REDUCE.equals(type)) {
    reports = client.getReduceTaskReports(id);
  }
  assertNotNull("No reports found for task type '" + type.name()
      + "' in job " + id, reports);
  // make sure that the total number of reports matches the expected count
  assertEquals("Mismatch in task id", numReports, reports.length);
  Counters counters = reports[taskId].getCounters();
  return counters.getCounter(TaskCounter.COMMITTED_HEAP_BYTES);
}
/** Verify that at least one segment does not hit disk */
public void testReduceFromPartialMem() throws Exception {
  final int MAP_TASKS = 7;
  JobConf job = mrCluster.createJobConf();
  job.setNumMapTasks(MAP_TASKS);
  job.setInt(JobContext.REDUCE_MERGE_INMEM_THRESHOLD, 0);
  job.set(JobContext.REDUCE_INPUT_BUFFER_PERCENT, "1.0");
  job.setInt(JobContext.SHUFFLE_PARALLEL_COPIES, 1);
  job.setInt(JobContext.IO_SORT_MB, 10);
  job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
  job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
  job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14");
  job.set(JobContext.SHUFFLE_MERGE_PERCENT, "1.0");
  Counters c = runJob(job);
  final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
  final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
  assertTrue("Expected some records not spilled during reduce (" + spill + ")",
      spill < 2 * out); // spilled map records, some records at the reduce
}
public Task(String jobFile, TaskAttemptID taskId, int partition,
    int numSlotsRequired) {
  this.jobFile = jobFile;
  this.taskId = taskId;
  this.partition = partition;
  this.numSlotsRequired = numSlotsRequired;
  this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId,
      0.0f, numSlotsRequired,
      TaskStatus.State.UNASSIGNED,
      "", "", "",
      isMapTask() ?
          TaskStatus.Phase.MAP :
          TaskStatus.Phase.SHUFFLE,
      counters);
  spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
  failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
  mergedMapOutputsCounter =
      counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
  gcUpdater = new GcTimeUpdater();
}
TrackedRecordReader(TaskReporter reporter, JobConf job)
    throws IOException {
  inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
  fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
  this.reporter = reporter;
  List<Statistics> matchedStats = null;
  if (this.reporter.getInputSplit() instanceof FileSplit) {
    matchedStats = getFsStatistics(((FileSplit) this.reporter
        .getInputSplit()).getPath(), job);
  }
  fsStats = matchedStats;
  bytesInPrev = getInputBytes(fsStats);
  rawIn = job.getInputFormat().getRecordReader(reporter.getInputSplit(),
      job, reporter);
  bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.reporter = reporter;
  this.inputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter = reporter
      .getCounter(FileInputFormatCounter.BYTES_READ);
  List<Statistics> matchedStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
        .getPath(), taskContext.getConfiguration());
  }
  fsStats = matchedStats;
  long bytesInPrev = getInputBytes(fsStats);
  this.real = inputFormat.createRecordReader(split, taskContext);
  long bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
@SuppressWarnings("unchecked")
NewDirectOutputCollector(MRJobConfig jobContext,
    JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.reporter = reporter;
  mapOutputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
  List<Statistics> matchedStats = null;
  if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
    matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
        .getOutputPath(taskContext), taskContext.getConfiguration());
  }
  fsStats = matchedStats;
  long bytesOutPrev = getOutputBytes(fsStats);
  out = outputFormat.getRecordWriter(taskContext);
  long bytesOutCurr = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context)
    throws IOException, ClassNotFoundException {
  this.reporter = context.getReporter();
  JobConf job = context.getJobConf();
  String finalName = getOutputName(getPartition());
  FileSystem fs = FileSystem.get(job);
  OutputFormat<K, V> outputFormat = job.getOutputFormat();
  mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
  List<Statistics> matchedStats = null;
  if (outputFormat instanceof FileOutputFormat) {
    matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
  }
  fsStats = matchedStats;
  long bytesOutPrev = getOutputBytes(fsStats);
  out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
  long bytesOutCurr = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
public SkippingReduceValuesIterator(RawKeyValueIterator in,
    RawComparator<KEY> comparator, Class<KEY> keyClass,
    Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
    TaskUmbilicalProtocol umbilical) throws IOException {
  super(in, comparator, keyClass, valClass, conf, reporter);
  this.umbilical = umbilical;
  this.skipGroupCounter =
      reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
  this.skipRecCounter =
      reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
  this.toWriteSkipRecs = toWriteSkipRecs() &&
      SkipBadRecords.getSkipOutputPath(conf) != null;
  this.keyClass = keyClass;
  this.valClass = valClass;
  this.reporter = reporter;
  skipIt = getSkipRanges().skipRangeIterator();
  mayBeSkip();
}
@Test
public void testCounters() throws IOException {
  Enum[] keysWithResource = {TaskCounter.MAP_INPUT_RECORDS,
      TaskCounter.MAP_OUTPUT_BYTES};
  Enum[] keysWithoutResource = {myCounters.TEST1, myCounters.TEST2};
  String[] groups = {"group1", "group2", "group{}()[]"};
  String[] counters = {"counter1", "counter2", "counter{}()[]"};
  try {
    // I. Check enum counters that have a resource bundle
    testCounter(getEnumCounters(keysWithResource));
    // II. Check enum counters that don't have a resource bundle
    testCounter(getEnumCounters(keysWithoutResource));
    // III. Check string counters
    testCounter(getEnumCounters(groups, counters));
  } catch (ParseException pe) {
    throw new IOException(pe);
  }
}
@SuppressWarnings("deprecation")
private void checkLegacyNames(Counters counters) {
  assertEquals("New name", 1, counters.findCounter(
      TaskCounter.class.getName(), "MAP_INPUT_RECORDS").getValue());
  assertEquals("Legacy name", 1, counters.findCounter(
      "org.apache.hadoop.mapred.Task$Counter",
      "MAP_INPUT_RECORDS").getValue());
  assertEquals("Legacy enum", 1,
      counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).getValue());
  assertEquals("New name", 1, counters.findCounter(
      JobCounter.class.getName(), "DATA_LOCAL_MAPS").getValue());
  assertEquals("Legacy name", 1, counters.findCounter(
      "org.apache.hadoop.mapred.JobInProgress$Counter",
      "DATA_LOCAL_MAPS").getValue());
  assertEquals("Legacy enum", 1,
      counters.findCounter(JobInProgress.Counter.DATA_LOCAL_MAPS).getValue());
  assertEquals("New name", 1, counters.findCounter(
      FileSystemCounter.class.getName(), "FILE_BYTES_READ").getValue());
  assertEquals("New name and method", 1, counters.findCounter("file",
      FileSystemCounter.BYTES_READ).getValue());
  assertEquals("Legacy name", 1, counters.findCounter(
      "FileSystemCounters",
      "FILE_BYTES_READ").getValue());
}
/**
 * Run an MR job to check that the number of mappers equals expectedNumOfSplits
 */
protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits)
    throws IOException, InterruptedException, ClassNotFoundException {
  String jobName = "TestJobForNumOfSplits-MR";
  LOG.info("Before map/reduce startup - job " + jobName);
  JobConf c = new JobConf(TEST_UTIL.getConfiguration());
  Scan scan = new Scan();
  scan.addFamily(INPUT_FAMILYS[0]);
  scan.addFamily(INPUT_FAMILYS[1]);
  c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
  c.set(KEY_STARTROW, "");
  c.set(KEY_LASTROW, "");
  Job job = Job.getInstance(c, jobName);
  TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
  job.setReducerClass(ScanReducer.class);
  job.setNumReduceTasks(1);
  job.setOutputFormatClass(NullOutputFormat.class);
  assertTrue("job failed!", job.waitForCompletion(true));
  // for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
  // so we use TaskCounter.SHUFFLED_MAPS to get the total number of launched maps
  assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
      job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
}
public void updateJobCounter() {
  try {
    Counters counters = job.getCounters();
    if (counters == null) {
      String errorMsg = "no counters for job " + getMrJobId();
      log.warn(errorMsg);
      output.append(errorMsg);
      return;
    }
    this.output.append(counters.toString()).append("\n");
    log.debug(counters.toString());
    mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
    hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
    hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
  } catch (Exception e) {
    log.error(e.getLocalizedMessage(), e);
    output.append(e.getLocalizedMessage());
  }
}
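The updateJobCounter example above reads the HDFS byte counters through the legacy string group name "FileSystemCounters". A small alternative sketch, assuming the job's default file system is registered under the "hdfs" scheme, uses the FileSystemCounter enum instead, the same enum-based lookup that checkLegacyNames demonstrates for the local "file" scheme:

  Counters counters = job.getCounters();
  // Enum-based lookup; should match the legacy "FileSystemCounters"/"HDFS_BYTES_READ"
  // and "HDFS_BYTES_WRITTEN" values when the job ran against HDFS.
  long hdfsRead = counters.findCounter("hdfs", FileSystemCounter.BYTES_READ).getValue();
  long hdfsWritten = counters.findCounter("hdfs", FileSystemCounter.BYTES_WRITTEN).getValue();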
@Test
public void testRunOldCombiner() throws IOException, InterruptedException {
  TezConfiguration conf = new TezConfiguration();
  setKeyAndValueClassTypes(conf);
  conf.setClass("mapred.combiner.class", OldReducer.class, Object.class);
  TaskContext taskContext = getTaskContext(conf);
  MRCombiner combiner = new MRCombiner(taskContext);
  Writer writer = Mockito.mock(Writer.class);
  combiner.combine(new TezRawKeyValueIteratorTest(), writer);
  long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
  long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
  assertEquals(6, inputRecords);
  assertEquals(3, outputRecords);
  // verify combiner output keys and values
  verifyKeyAndValues(writer);
}
@Test
public void testRunNewCombiner() throws IOException, InterruptedException {
  TezConfiguration conf = new TezConfiguration();
  setKeyAndValueClassTypes(conf);
  conf.setBoolean("mapred.mapper.new-api", true);
  conf.setClass(MRJobConfig.COMBINE_CLASS_ATTR, NewReducer.class,
      Object.class);
  TaskContext taskContext = getTaskContext(conf);
  MRCombiner combiner = new MRCombiner(taskContext);
  Writer writer = Mockito.mock(Writer.class);
  combiner.combine(new TezRawKeyValueIteratorTest(), writer);
  long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
  long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
  assertEquals(6, inputRecords);
  assertEquals(3, outputRecords);
  // verify combiner output keys and values
  verifyKeyAndValues(writer);
}
@Test
public void testTop2RunNewCombiner() throws IOException, InterruptedException {
  TezConfiguration conf = new TezConfiguration();
  setKeyAndValueClassTypes(conf);
  conf.setBoolean("mapred.mapper.new-api", true);
  conf.setClass(MRJobConfig.COMBINE_CLASS_ATTR, Top2NewReducer.class,
      Object.class);
  TaskContext taskContext = getTaskContext(conf);
  MRCombiner combiner = new MRCombiner(taskContext);
  Writer writer = Mockito.mock(Writer.class);
  combiner.combine(new TezRawKeyValueIteratorTest(), writer);
  long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
  long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
  assertEquals(6, inputRecords);
  assertEquals(5, outputRecords);
}
private void updateProgressSplits() {
  double newProgress = reportedStatus.progress;
  newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
  Counters counters = reportedStatus.counters;
  if (counters == null)
    return;
  WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
  if (splitsBlock != null) {
    long now = clock.getTime();
    long start = getLaunchTime(); // TODO Ensure not 0
    if (start != 0 && now - start <= Integer.MAX_VALUE) {
      splitsBlock.getProgressWallclockTime().extend(newProgress,
          (int) (now - start));
    }
    Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
    if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
      splitsBlock.getProgressCPUTime().extend(newProgress,
          (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
    }
    Counter virtualBytes = counters
        .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
    if (virtualBytes != null) {
      splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
          (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
    }
    Counter physicalBytes = counters
        .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
    if (physicalBytes != null) {
      splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
          (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
    }
  }
}