下面列出了 org.apache.hadoop.mapred.Counters.Group 类的 API 用法示例代码;您也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that an iterator taken from the file system counter group can still be
 * advanced while more counters are registered through the owning {@code Counters}.
 */
@Test
public void testFileSystemGroupIteratorConcurrency() {
    final Counters jobCounters = new Counters();
    // Seed the group with BYTES_READ counters under two distinct keys.
    jobCounters.findCounter("fs1", FileSystemCounter.BYTES_READ).increment(1);
    jobCounters.findCounter("fs2", FileSystemCounter.BYTES_READ).increment(1);

    final Group fsGroup = jobCounters.getGroup(FileSystemCounter.class.getName());
    final Iterator<Counter> it = fsGroup.iterator();

    // Touch a third key after the iterator exists, then take the first element.
    jobCounters.findCounter("fs3", FileSystemCounter.BYTES_READ).increment(1);
    assertTrue(it.hasNext());
    it.next();

    // Update again between the two iteration steps, then take the second element.
    jobCounters.findCounter("fs3", FileSystemCounter.BYTES_READ).increment(1);
    assertTrue(it.hasNext());
    it.next();
}
/**
 * Looks up a known and an unknown counter name in the framework counter group created
 * for {@code JobCounter}: the known name must resolve to a counter, the unknown one
 * must resolve to {@code null}.
 */
@SuppressWarnings("rawtypes")
@Test
public void testFrameworkCounter() {
    GroupFactory factory = new GroupFactoryForTest();
    FrameworkGroupFactory jobCounterGroupFactory =
        factory.newFrameworkGroupFactory(JobCounter.class);
    Group wrapper = (Group) jobCounterGroupFactory.newGroup("JobCounter");
    FrameworkCounterGroup underlying =
        (FrameworkCounterGroup) wrapper.getUnderlyingGroup();

    String knownName = JobCounter.NUM_FAILED_MAPS.toString();
    org.apache.hadoop.mapreduce.Counter known = underlying.findCounter(knownName);
    Assert.assertNotNull(known);

    // An unrecognised name must come back as null instead of raising an exception.
    org.apache.hadoop.mapreduce.Counter unknown = underlying.findCounter("Unknown");
    Assert.assertNull(unknown);
}
/**
 * Checks that an iterator taken from the file system counter group can still be
 * advanced while more counters are registered through the owning {@code Counters}.
 */
@Test
public void testFileSystemGroupIteratorConcurrency() {
    final Counters jobCounters = new Counters();
    // Seed the group with BYTES_READ counters under two distinct keys.
    jobCounters.findCounter("fs1", FileSystemCounter.BYTES_READ).increment(1);
    jobCounters.findCounter("fs2", FileSystemCounter.BYTES_READ).increment(1);

    final Group fsGroup = jobCounters.getGroup(FileSystemCounter.class.getName());
    final Iterator<Counter> it = fsGroup.iterator();

    // Touch a third key after the iterator exists, then take the first element.
    jobCounters.findCounter("fs3", FileSystemCounter.BYTES_READ).increment(1);
    assertTrue(it.hasNext());
    it.next();

    // Update again between the two iteration steps, then take the second element.
    jobCounters.findCounter("fs3", FileSystemCounter.BYTES_READ).increment(1);
    assertTrue(it.hasNext());
    it.next();
}
/**
 * Looks up a known and an unknown counter name in the framework counter group created
 * for {@code JobCounter}: the known name must resolve to a counter, the unknown one
 * must resolve to {@code null}.
 */
@SuppressWarnings("rawtypes")
@Test
public void testFrameworkCounter() {
    GroupFactory factory = new GroupFactoryForTest();
    FrameworkGroupFactory jobCounterGroupFactory =
        factory.newFrameworkGroupFactory(JobCounter.class);
    Group wrapper = (Group) jobCounterGroupFactory.newGroup("JobCounter");
    FrameworkCounterGroup underlying =
        (FrameworkCounterGroup) wrapper.getUnderlyingGroup();

    String knownName = JobCounter.NUM_FAILED_MAPS.toString();
    org.apache.hadoop.mapreduce.Counter known = underlying.findCounter(knownName);
    Assert.assertNotNull(known);

    // An unrecognised name must come back as null instead of raising an exception.
    org.apache.hadoop.mapreduce.Counter unknown = underlying.findCounter("Unknown");
    Assert.assertNull(unknown);
}
/**
 * Returns the counter found under {@code group}/{@code name}. When the lookup yields
 * {@code null}, a group named {@code group} is added and a counter named {@code name}
 * is added to it (with 10 passed as the third argument) and returned instead.
 *
 * @param group group name, also used as the group's display name when it is added
 * @param name counter name, also used as the counter's display name when it is added
 * @return the counter, or {@code null} when the {@code counters} field is {@code null}
 */
public Counters.Counter getCounter(String group, String name) {
    if (counters == null) {
        return null;
    }
    Counters.Counter existing = counters.findCounter(group, name);
    if (existing != null) {
        return existing;
    }
    Group addedGroup = counters.addGroup(group, group);
    return addedGroup.addCounter(name, name, 10);
}
/**
 * Obtains a group iterator from a {@code Counters} object, increments a counter in a
 * second group, and then advances the iterator. The test has no assertions; it passes
 * when {@code next()} completes without throwing.
 */
@SuppressWarnings("deprecation")
@Test
public void testCounterIteratorConcurrency() {
    final Counters jobCounters = new Counters();
    jobCounters.incrCounter("group1", "counter1", 1);
    final Iterator<Group> groupIt = jobCounters.iterator();
    // Modify the counters after the iterator was handed out.
    jobCounters.incrCounter("group2", "counter2", 1);
    groupIt.next();
}
/**
 * Obtains a counter iterator from one group, increments a second counter in that same
 * group, and then advances the iterator. The test has no assertions; it passes when
 * {@code next()} completes without throwing.
 */
@SuppressWarnings("deprecation")
@Test
public void testGroupIteratorConcurrency() {
    final Counters jobCounters = new Counters();
    jobCounters.incrCounter("group1", "counter1", 1);
    final Group firstGroup = jobCounters.getGroup("group1");
    final Iterator<Counter> counterIt = firstGroup.iterator();
    // Modify the group after the iterator was handed out.
    jobCounters.incrCounter("group1", "counter2", 1);
    counterIt.next();
}
/**
 * Looks up a known and an unknown counter name in the file system group produced by
 * the test group factory: {@code "ANY_BYTES_READ"} must resolve to a counter,
 * {@code "Unknown"} must resolve to {@code null}.
 */
@Test
public void testFilesystemCounter() {
    GroupFactory factory = new GroupFactoryForTest();
    Group fileSystemGroup = factory.newFileSystemGroup();

    org.apache.hadoop.mapreduce.Counter known =
        fileSystemGroup.findCounter("ANY_BYTES_READ");
    Assert.assertNotNull(known);

    // An unrecognised name must come back as null instead of raising an exception.
    org.apache.hadoop.mapreduce.Counter unknown =
        fileSystemGroup.findCounter("Unknown");
    Assert.assertNull(unknown);
}
/**
 * Asserts that the running job exposes a {@code "UserCounters"} group whose
 * {@code "InputLines"} counter equals 3.
 *
 * @throws IOException declared by the method; raised by the counter retrieval if it fails
 */
private void validateCounters() throws IOException {
    final Counters jobCounters = job.running_.getCounters();
    assertNotNull("Counters", jobCounters);

    final Group userGroup = jobCounters.getGroup("UserCounters");
    assertNotNull("Group", userGroup);

    final Counter inputLines = userGroup.getCounterForName("InputLines");
    assertNotNull("Counter", inputLines);
    assertEquals(3, inputLines.getCounter());
}
/**
 * Returns the counter found under {@code group}/{@code name}. When the lookup yields
 * {@code null}, a group named {@code group} is added and a counter named {@code name}
 * is added to it (with 10 passed as the third argument) and returned instead.
 *
 * @param group group name, also used as the group's display name when it is added
 * @param name counter name, also used as the counter's display name when it is added
 * @return the counter, or {@code null} when the {@code counters} field is {@code null}
 */
public Counters.Counter getCounter(String group, String name) {
    if (counters == null) {
        return null;
    }
    Counters.Counter existing = counters.findCounter(group, name);
    if (existing != null) {
        return existing;
    }
    Group addedGroup = counters.addGroup(group, group);
    return addedGroup.addCounter(name, name, 10);
}
/**
 * Obtains a group iterator from a {@code Counters} object, increments a counter in a
 * second group, and then advances the iterator. The test has no assertions; it passes
 * when {@code next()} completes without throwing.
 */
@SuppressWarnings("deprecation")
@Test
public void testCounterIteratorConcurrency() {
    final Counters jobCounters = new Counters();
    jobCounters.incrCounter("group1", "counter1", 1);
    final Iterator<Group> groupIt = jobCounters.iterator();
    // Modify the counters after the iterator was handed out.
    jobCounters.incrCounter("group2", "counter2", 1);
    groupIt.next();
}
/**
 * Obtains a counter iterator from one group, increments a second counter in that same
 * group, and then advances the iterator. The test has no assertions; it passes when
 * {@code next()} completes without throwing.
 */
@SuppressWarnings("deprecation")
@Test
public void testGroupIteratorConcurrency() {
    final Counters jobCounters = new Counters();
    jobCounters.incrCounter("group1", "counter1", 1);
    final Group firstGroup = jobCounters.getGroup("group1");
    final Iterator<Counter> counterIt = firstGroup.iterator();
    // Modify the group after the iterator was handed out.
    jobCounters.incrCounter("group1", "counter2", 1);
    counterIt.next();
}
/**
 * Looks up a known and an unknown counter name in the file system group produced by
 * the test group factory: {@code "ANY_BYTES_READ"} must resolve to a counter,
 * {@code "Unknown"} must resolve to {@code null}.
 */
@Test
public void testFilesystemCounter() {
    GroupFactory factory = new GroupFactoryForTest();
    Group fileSystemGroup = factory.newFileSystemGroup();

    org.apache.hadoop.mapreduce.Counter known =
        fileSystemGroup.findCounter("ANY_BYTES_READ");
    Assert.assertNotNull(known);

    // An unrecognised name must come back as null instead of raising an exception.
    org.apache.hadoop.mapreduce.Counter unknown =
        fileSystemGroup.findCounter("Unknown");
    Assert.assertNull(unknown);
}
/**
 * Asserts that the running job exposes a {@code "UserCounters"} group whose
 * {@code "InputLines"} counter equals 3.
 *
 * @throws IOException declared by the method; raised by the counter retrieval if it fails
 */
private void validateCounters() throws IOException {
    final Counters jobCounters = job.running_.getCounters();
    assertNotNull("Counters", jobCounters);

    final Group userGroup = jobCounters.getGroup("UserCounters");
    assertNotNull("Group", userGroup);

    final Counter inputLines = userGroup.getCounterForName("InputLines");
    assertNotNull("Counter", inputLines);
    assertEquals(3, inputLines.getCounter());
}
/**
 * Copies every group and counter of {@code tezCounters} into a new {@code Counters}
 * object, carrying over names, display names and values.
 *
 * @param tezCounters source counters to copy from
 * @return a newly created {@code Counters} holding the copied groups and counters
 */
private Counters covertToHadoopCounters(TezCounters tezCounters) {
    final Counters converted = new Counters();
    for (CounterGroup sourceGroup : tezCounters) {
        final Group targetGroup =
            converted.addGroup(sourceGroup.getName(), sourceGroup.getDisplayName());
        for (TezCounter source : sourceGroup) {
            targetGroup.addCounter(
                source.getName(), source.getDisplayName(), source.getValue());
        }
    }
    return converted;
}
/**
 * Parses an escaped compact counter string and stores each counter in {@code job},
 * keyed by counter name with the value rendered as a string. A parse failure is
 * logged as a warning and leaves {@code job} without the counters.
 *
 * @param job map that receives one entry per counter
 * @param counters counter string passed to {@code Counters.fromEscapedCompactString}
 */
@SuppressWarnings("deprecation")
private static void parseAndAddJobCounters(Map<String, String> job, String counters) {
    try {
        final Counters parsed = Counters.fromEscapedCompactString(counters);
        for (Group iterated : parsed) {
            // The group is looked up again by name on the same parsed object.
            final Group resolved = parsed.getGroup(iterated.getName());
            for (Counter entry : iterated) {
                final String counterName = entry.getName();
                final Counter lookedUp = resolved.getCounterForName(counterName);
                job.put(counterName, String.valueOf(lookedUp.getValue()));
            }
        }
    } catch (ParseException parseFailure) {
        LOG.warn("Failed to parse job counters", parseFailure);
    }
}
/**
 * Builds a detailed report for one job: its counters, grouped by counter group, plus
 * the map, reduce, cleanup and setup task reports.
 *
 * <p>The returned map uses the keys {@code "counters"}, {@code "mapReport"},
 * {@code "reduceReport"}, {@code "cleanupReport"} and {@code "setupReport"}. The
 * {@code "counters"} entry is a list with one map per group, each holding the group's
 * display name under {@code "name"} and a list of {@code name}/{@code value} maps
 * under {@code "subCounters"}.
 *
 * @param jobId identifier of the job to report on
 * @return the assembled report
 * @throws IOException if the job client or the running job fails while being queried
 */
private Map<String, Object> getDetailedJobReport(org.apache.hadoop.mapred.JobID jobId) throws IOException {
    Map<String, Object> jobDetailedReport = new HashMap<String, Object>();

    // NOTE(review): getJob may return null for an unknown id, which would raise a
    // NullPointerException on the next line — confirm whether callers can pass such ids.
    RunningJob job = jobClient.getJob(jobId);
    Counters counters = job.getCounters();

    List<Map<String, Object>> counterList = new ArrayList<Map<String, Object>>();
    for (Group group : counters) {
        Map<String, Object> counterMap = new HashMap<String, Object>();
        counterMap.put("name", group.getDisplayName());

        List<Map<String, Object>> subCounters = new ArrayList<Map<String, Object>>();
        for (Counter counter : group) {
            Map<String, Object> subCounter = new HashMap<String, Object>();
            subCounter.put("name", counter.getDisplayName());
            subCounter.put("value", counter.getCounter());
            subCounters.add(subCounter);
        }
        counterMap.put("subCounters", subCounters);
        counterList.add(counterMap);
    }
    jobDetailedReport.put("counters", counterList);

    jobDetailedReport.put("mapReport",
        getTaskReport(jobClient.getMapTaskReports(jobId)));
    jobDetailedReport.put("reduceReport",
        getTaskReport(jobClient.getReduceTaskReports(jobId)));
    jobDetailedReport.put("cleanupReport",
        getTaskReport(jobClient.getCleanupTaskReports(jobId)));
    jobDetailedReport.put("setupReport",
        getTaskReport(jobClient.getSetupTaskReports(jobId)));
    return jobDetailedReport;
}
/** Sets the value of every counter in every group of {@code countersToMetrics} to 0. */
private void clearCounters() {
    for (Group group : countersToMetrics) {
        for (Counter counter : group) {
            counter.setValue(0);
        }
    }
}
/**
 * Runs a streaming job built from {@code genArgs()}, compares the content of
 * {@code part-00000} with {@code outputExpect}, and checks that the
 * {@code "UserCounters"}/{@code "InputLines"} counter equals 3. Input and output
 * files are deleted afterwards whether or not the checks pass.
 *
 * @throws IOException declared by the method; raised if job execution or file access fails
 */
public void testCommandLine() throws IOException {
    try {
        try {
            OUTPUT_DIR.getAbsoluteFile().delete();
        } catch (Exception ignored) {
            // Best-effort removal of output left by an earlier run; failures are ignored.
        }
        createInput();

        // During tests, the default Configuration will use a local mapred,
        // so neither -config nor -cluster is specified. The job must not exit the JVM,
        // hence false for the second (mayExit) argument.
        StreamJob job = new StreamJob(genArgs(), false);
        job.go();

        File partFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
        String actual = StreamUtil.slurp(partFile);
        partFile.delete();
        assertEquals(outputExpect, actual);

        Counters jobCounters = job.running_.getCounters();
        assertNotNull("Counters", jobCounters);
        Group userGroup = jobCounters.getGroup("UserCounters");
        assertNotNull("Group", userGroup);
        Counter inputLines = userGroup.getCounterForName("InputLines");
        assertNotNull("Counter", inputLines);
        assertEquals(3, inputLines.getCounter());
    } finally {
        File crcFile = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
        INPUT_FILE.delete();
        crcFile.delete();
        OUTPUT_DIR.getAbsoluteFile().delete();
    }
}
/**
 * Runs a streaming job built from {@code genArgs()}, compares the content of
 * {@code part-00000} with {@code outputExpect}, and checks that the
 * {@code "UserCounters"}/{@code "InputLines"} counter equals 3. Input and output
 * files are deleted afterwards whether or not the checks pass.
 *
 * @throws IOException declared by the method; raised if job execution or file access fails
 */
public void testCommandLine() throws IOException {
    try {
        try {
            OUTPUT_DIR.getAbsoluteFile().delete();
        } catch (Exception ignored) {
            // Best-effort removal of output left by an earlier run; failures are ignored.
        }
        createInput();

        // During tests, the default Configuration will use a local mapred,
        // so neither -config nor -cluster is specified. The job must not exit the JVM,
        // hence false for the second (mayExit) argument.
        StreamJob job = new StreamJob(genArgs(), false);
        job.go();

        File partFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
        String actual = StreamUtil.slurp(partFile);
        partFile.delete();
        assertEquals(outputExpect, actual);

        Counters jobCounters = job.running_.getCounters();
        assertNotNull("Counters", jobCounters);
        Group userGroup = jobCounters.getGroup("UserCounters");
        assertNotNull("Group", userGroup);
        Counter inputLines = userGroup.getCounterForName("InputLines");
        assertNotNull("Counter", inputLines);
        assertEquals(3, inputLines.getCounter());
    } finally {
        File crcFile = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
        INPUT_FILE.delete();
        crcFile.delete();
        OUTPUT_DIR.getAbsoluteFile().delete();
    }
}
/**
 * Publicly accessible entry point that forwards to the superclass implementation and
 * returns its result unchanged.
 *
 * @param cls enum class handed to the superclass method
 * @return the factory produced by the superclass
 */
public <T extends Enum<T>> FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
    final FrameworkGroupFactory<Group> fromSuper = super.newFrameworkGroupFactory(cls);
    return fromSuper;
}
/**
 * Publicly accessible entry point that forwards to the superclass implementation and
 * returns its result unchanged.
 *
 * @return the file system group produced by the superclass
 */
public Group newFileSystemGroup() {
    final Group fromSuper = super.newFileSystemGroup();
    return fromSuper;
}
/**
 * Publicly accessible entry point that forwards to the superclass implementation and
 * returns its result unchanged.
 *
 * @param cls enum class handed to the superclass method
 * @return the factory produced by the superclass
 */
public <T extends Enum<T>> FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
    final FrameworkGroupFactory<Group> fromSuper = super.newFrameworkGroupFactory(cls);
    return fromSuper;
}
/**
 * Publicly accessible entry point that forwards to the superclass implementation and
 * returns its result unchanged.
 *
 * @return the file system group produced by the superclass
 */
public Group newFileSystemGroup() {
    final Group fromSuper = super.newFileSystemGroup();
    return fromSuper;
}
/**
 * Reads the RANK counter group of a job, converts its counters into cumulative sums and
 * stores the result in {@code globalCounters} under {@code operationID}.
 *
 * <p>Counter display names are parsed as integer indexes. For index {@code i} the stored
 * value is the sum of the counter values at indexes {@code 0 .. i-1}, so index 0 gets 0.
 * When no group name is found and the task group's map-output-records counter is 0, an
 * empty list is stored instead.
 *
 * @param job job whose counters are obtained through {@code HadoopShims.getCounters}
 * @param operationID key used in {@code globalCounters}; also embedded in the name of
 *        every generated pair
 * @throws RuntimeException wrapping any exception raised while processing the counters,
 *         including the one raised when the group is missing although records were output
 */
private void saveCounters(Job job, String operationID) {
    // Kept as boxed Long on purpose: a missing index leaves null here until it is next used.
    Long lastValue = 0L;
    Long runningSum = 0L;
    try {
        Counters jobCounters = HadoopShims.getCounters(job);
        String rankGroupName = getGroupName(jobCounters.getGroupNames());

        // A missing group is only acceptable when the relation was empty,
        // i.e. no map output records were produced.
        if (rankGroupName == null) {
            Group taskGroup = jobCounters.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP);
            Counter mapOutputRecords =
                taskGroup.getCounterForName(MRPigStatsUtil.MAP_OUTPUT_RECORDS);
            if (mapOutputRecords.getCounter() != 0) {
                throw new RuntimeException("Did not found RANK counter group for operationId: " + operationID);
            }
            globalCounters.put(operationID, new ArrayList<Pair<String, Long>>());
            return;
        }

        Group rankGroup = jobCounters.getGroup(rankGroupName);
        Iterator<Counter> counterIt = rankGroup.iterator();
        HashMap<Integer, Long> valueByIndex = new HashMap<Integer, Long>();
        while (counterIt.hasNext()) {
            // next() stays inside the try so that a failure on one counter is
            // reported and skipped rather than aborting the whole scan.
            try {
                Counter current = counterIt.next();
                valueByIndex.put(Integer.valueOf(current.getDisplayName()), current.getValue());
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

        counterSize = valueByIndex.size();
        ArrayList<Pair<String, Long>> cumulative = new ArrayList<Pair<String, Long>>();
        for (int index = 0; index < counterSize; index++) {
            runningSum += lastValue;
            lastValue = valueByIndex.get(Integer.valueOf(index));
            String pairName = JobControlCompiler.PIG_MAP_COUNTER + operationID
                + JobControlCompiler.PIG_MAP_SEPARATOR + index;
            cumulative.add(new Pair<String, Long>(pairName, runningSum));
        }
        globalCounters.put(operationID, cumulative);
    } catch (Exception e) {
        throw new RuntimeException(
            "Error to read counters into Rank operation counterSize " + counterSize, e);
    }
}
/**
 * Appends a table of the job's counters to {@code buff}, one row per counter of the
 * total counters, showing the map, reduce and total values side by side. Nothing is
 * appended when the total counters parse to {@code null}.
 *
 * @param buff buffer that receives the formatted table
 * @param job job record supplying the escaped counter strings for map, reduce and total
 * @throws ParseException if one of the counter strings cannot be parsed
 */
private void printCounters(StringBuffer buff, JobInfo job)
    throws ParseException {
    final Counters mapSide =
        Counters.fromEscapedCompactString(job.get(Keys.MAP_COUNTERS));
    final Counters reduceSide =
        Counters.fromEscapedCompactString(job.get(Keys.REDUCE_COUNTERS));
    final Counters overall =
        Counters.fromEscapedCompactString(job.get(Keys.COUNTERS));

    // Killed jobs might not have counters
    if (overall == null) {
        return;
    }

    buff.append("\nCounters: \n\n")
        .append(String.format("|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s|",
            "Group Name",
            "Counter name",
            "Map Value",
            "Reduce Value",
            "Total Value"))
        .append("\n------------------------------------------"+
            "---------------------------------------------");

    for (String groupName : overall.getGroupNames()) {
        final Group overallGroup = overall.getGroup(groupName);
        final Group mapGroup = mapSide.getGroup(groupName);
        final Group reduceGroup = reduceSide.getGroup(groupName);
        final Format numberFormat = new DecimalFormat();

        for (Counter counter : overallGroup) {
            // Map and reduce values are looked up by the counter's display name.
            final String displayName = counter.getDisplayName();
            final String mapText = numberFormat.format(mapGroup.getCounter(displayName));
            final String reduceText = numberFormat.format(reduceGroup.getCounter(displayName));
            final String totalText = numberFormat.format(counter.getValue());
            final String row = String.format("\n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
                overallGroup.getDisplayName(),
                counter.getDisplayName(),
                mapText, reduceText, totalText);
            buff.append(row);
        }
    }
}