下面列出了org.apache.hadoop.mapreduce.Counters#getGroup ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public String toString() {
StringBuilder builder = new StringBuilder();
while (source.hasNext()) {
Entry<String,Counters> nextCntr = source.next();
builder.append("\n").append(nextCntr.getKey()).append("\n----------------------\n");
Counters counters = nextCntr.getValue();
for (String groupName : counters.getGroupNames()) {
CounterGroup group = counters.getGroup(groupName);
Iterator<Counter> cntrItr = group.iterator();
while (cntrItr.hasNext()) {
Counter counter = cntrItr.next();
builder.append(groupName).append("\t").append(counter.getDisplayName()).append("=").append(counter.getValue()).append("\n");
}
}
}
return builder.toString();
}
private static String checkForIngestLabelOverride(Counters ingestJobCounters) {
CounterGroup jobQueueName = ingestJobCounters.getGroup(IngestProcess.METRICS_LABEL_OVERRIDE.name());
if (jobQueueName.size() > 0) {
Counter myCounter = jobQueueName.iterator().next();
return myCounter.getName();
}
return null;
}
@Override
public void map(Text flagFile, Counters counters, Context context) throws IOException, InterruptedException {
System.out.println("Received counters for job " + flagFile);
log.info(counters);
long endTime = counters.findCounter(InputFile.FLAGMAKER_END_TIME).getValue();
long startTime = counters.findCounter(InputFile.FLAGMAKER_START_TIME).getValue();
Mutation statsPersist = new Mutation("flagFile\u0000" + flagFile);
statsPersist.put("", "", new Value(serializeCounters(counters)));
context.write(null, statsPersist);
// Breaking it down into individual counters... Can't get individual stats when batch-processing in the FlagMaker
for (Counter c : counters.getGroup(InputFile.class.getSimpleName())) {
Text outFile = new Text(c.getName());
Mutation m = new Mutation(outFile);
long fileTime = c.getValue();
try {
Counters cs = new Counters();
cs.findCounter(InputFile.class.getSimpleName(), outFile.toString()).setValue(c.getValue());
cs.findCounter(FlagFile.class.getSimpleName(), flagFile.toString()).increment(1);
cs.findCounter(InputFile.FLAGMAKER_END_TIME).setValue(endTime);
cs.findCounter(InputFile.FLAGMAKER_START_TIME).setValue(startTime);
m.put(WritableUtil.getLong(endTime - fileTime), WritableUtil.getLong(endTime), new Value(serializeCounters(cs)));
context.write(null, m);
} catch (IOException e) {
log.error("Could not add counters to mutation!!!", e);
}
}
}
/**
* Adds detail for a Map phase.
* @param task2
*
* @param task2 the tasks
* @param referencedZeroTime
* @param referencedZeroTime the start time
* @return the phase details
*/
private TaskOutputDetails addMapPhaseDetails(Entry<TaskAttemptID, TaskAttemptInfo> task, long referencedZeroTime) {
TaskAttemptInfo taskAttemptInfo = (TaskAttemptInfo) (task.getValue());
TaskOutputDetails taskOutputDetails = new TaskOutputDetails();
if(taskAttemptInfo.getTaskStatus().equalsIgnoreCase("SUCCEEDED")){
taskOutputDetails.setTaskStatus(taskAttemptInfo.getTaskStatus());
taskOutputDetails.setTaskType(taskAttemptInfo.getTaskType().toString());
taskOutputDetails.setTaskID(taskAttemptInfo.getAttemptId().getTaskID().toString());
long startPoint = (taskAttemptInfo.getStartTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS;
taskOutputDetails.setStartPoint(startPoint);
long endPoint = (taskAttemptInfo.getFinishTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS;
taskOutputDetails.setEndPoint(endPoint);
taskOutputDetails.setTimeTaken(endPoint - startPoint);
taskOutputDetails.setLocation(taskAttemptInfo.getHostname());
Counters counters = taskAttemptInfo.getCounters();
CounterGroup fileSystemCounters = counters.getGroup("org.apache.hadoop.mapreduce.FileSystemCounter");
Counter inputBytes = fileSystemCounters.findCounter("HDFS_BYTES_READ");
long dataFlowRate = inputBytes.getValue() / (endPoint - startPoint);
taskOutputDetails.setDataFlowRate(dataFlowRate);
CounterGroup mapReduceTaskCounters = counters.getGroup("org.apache.hadoop.mapreduce.TaskCounter");
Counter mapOutputRecords = mapReduceTaskCounters.findCounter("MAP_OUTPUT_RECORDS");
Counter physicalMemoryBytes = mapReduceTaskCounters.findCounter("PHYSICAL_MEMORY_BYTES");
ResourceUsageMetrics rum = new ResourceUsageMetrics();
rum.setPhysicalMemoryUsage(physicalMemoryBytes.getValue());
taskOutputDetails.setResourceUsageMetrics(rum);
taskOutputDetails.setOutputRecords(mapOutputRecords.getValue());
Counter mapOutputBytes = mapReduceTaskCounters.findCounter("MAP_OUTPUT_BYTES");
taskOutputDetails.setOutputBytes(mapOutputBytes.getValue());}
return taskOutputDetails;
}
private void printCounters(StringBuffer buff, Counters totalCounters,
Counters mapCounters, Counters reduceCounters) {
// Killed jobs might not have counters
if (totalCounters == null) {
return;
}
buff.append("\nCounters: \n\n");
buff.append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|",
"Group Name",
"Counter name",
"Map Value",
"Reduce Value",
"Total Value"));
buff.append("\n------------------------------------------"+
"---------------------------------------------");
for (String groupName : totalCounters.getGroupNames()) {
CounterGroup totalGroup = totalCounters.getGroup(groupName);
CounterGroup mapGroup = mapCounters.getGroup(groupName);
CounterGroup reduceGroup = reduceCounters.getGroup(groupName);
Format decimal = new DecimalFormat();
Iterator<org.apache.hadoop.mapreduce.Counter> ctrItr =
totalGroup.iterator();
while(ctrItr.hasNext()) {
org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
String name = counter.getName();
String mapValue =
decimal.format(mapGroup.findCounter(name).getValue());
String reduceValue =
decimal.format(reduceGroup.findCounter(name).getValue());
String totalValue =
decimal.format(counter.getValue());
buff.append(
String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
totalGroup.getDisplayName(),
counter.getDisplayName(),
mapValue, reduceValue, totalValue));
}
}
}
private void printCounters(StringBuffer buff, Counters totalCounters,
Counters mapCounters, Counters reduceCounters) {
// Killed jobs might not have counters
if (totalCounters == null) {
return;
}
buff.append("\nCounters: \n\n");
buff.append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|",
"Group Name",
"Counter name",
"Map Value",
"Reduce Value",
"Total Value"));
buff.append("\n------------------------------------------"+
"---------------------------------------------");
for (String groupName : totalCounters.getGroupNames()) {
CounterGroup totalGroup = totalCounters.getGroup(groupName);
CounterGroup mapGroup = mapCounters.getGroup(groupName);
CounterGroup reduceGroup = reduceCounters.getGroup(groupName);
Format decimal = new DecimalFormat();
Iterator<org.apache.hadoop.mapreduce.Counter> ctrItr =
totalGroup.iterator();
while(ctrItr.hasNext()) {
org.apache.hadoop.mapreduce.Counter counter = ctrItr.next();
String name = counter.getName();
String mapValue =
decimal.format(mapGroup.findCounter(name).getValue());
String reduceValue =
decimal.format(reduceGroup.findCounter(name).getValue());
String totalValue =
decimal.format(counter.getValue());
buff.append(
String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
totalGroup.getDisplayName(),
counter.getDisplayName(),
mapValue, reduceValue, totalValue));
}
}
}
/**
* This method is responsible for populating the reduce phase details.
* @return TaskOutputDetails contains the details of the reduce phase.
*/
private TaskOutputDetails addReducePhaseDetails(
Entry<TaskAttemptID, TaskAttemptInfo> task, long referencedZeroTime) {
TaskAttemptInfo taskAttemptInfo = (TaskAttemptInfo) (task.getValue());
TaskOutputDetails taskOutputDetails = new TaskOutputDetails();
if(taskAttemptInfo.getTaskStatus().equalsIgnoreCase("SUCCEEDED")){
taskOutputDetails.setTaskStatus(taskAttemptInfo.getTaskStatus());
taskOutputDetails.setTaskType(taskAttemptInfo.getTaskType().toString());
taskOutputDetails.setTaskID(taskAttemptInfo.getAttemptId().getTaskID().toString());
taskOutputDetails.setLocation(taskAttemptInfo.getHostname());
Counters counters = taskAttemptInfo.getCounters();
CounterGroup mapReduceTaskCounters = counters.getGroup("org.apache.hadoop.mapreduce.TaskCounter");
Counter reduceOutputRecords = mapReduceTaskCounters.findCounter("REDUCE_OUTPUT_RECORDS");
taskOutputDetails.setOutputRecords(reduceOutputRecords.getValue());
Counter reduceOutputBytes = mapReduceTaskCounters.findCounter("SPILLED_RECORDS");
taskOutputDetails.setOutputBytes(reduceOutputBytes.getValue());
long shuffleStartTime = (taskAttemptInfo.getStartTime()- referencedZeroTime)/CONVERSION_FACTOR_MILLISECS_TO_SECS;
taskOutputDetails.setStartPoint(shuffleStartTime);
taskOutputDetails.setShuffleStart(shuffleStartTime);
LOGGER.debug("shuffle start time" + taskOutputDetails.getShuffleStart());
long shuffleEnd = ((taskAttemptInfo.getShuffleFinishTime()-referencedZeroTime)/CONVERSION_FACTOR_MILLISECS_TO_SECS);
taskOutputDetails.setShuffleEnd(shuffleEnd);
LOGGER.debug("shuffle end time" + taskOutputDetails.getShuffleEnd());
taskOutputDetails.setSortStart(shuffleEnd);
long sortEnd = (taskAttemptInfo.getSortFinishTime()-referencedZeroTime)/CONVERSION_FACTOR_MILLISECS_TO_SECS;
taskOutputDetails.setSortEnd(sortEnd);
LOGGER.debug("sort end time" + taskOutputDetails.getSortEnd());
taskOutputDetails.setReduceStart(sortEnd);
taskOutputDetails.setReduceEnd((taskAttemptInfo.getFinishTime()-referencedZeroTime)/CONVERSION_FACTOR_MILLISECS_TO_SECS);
taskOutputDetails.setEndPoint(taskOutputDetails.getReduceEnd());
LOGGER.debug("Reduce end time" + taskOutputDetails.getReduceEnd());
long dataFlowRate = reduceOutputBytes.getValue() / (taskOutputDetails.getReduceEnd()-shuffleStartTime);
taskOutputDetails.setDataFlowRate(dataFlowRate);
Counter physicalMemoryBytes = mapReduceTaskCounters.findCounter("PHYSICAL_MEMORY_BYTES");
ResourceUsageMetrics rum = new ResourceUsageMetrics();
rum.setPhysicalMemoryUsage(physicalMemoryBytes.getValue());
taskOutputDetails.setResourceUsageMetrics(rum);
}
return taskOutputDetails;
}