下面列出了 org.apache.hadoop.mapred.Counters#Group() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Finds the first counter whose name equals one of the requested names and
 * returns its value as text.
 *
 * <p>The input is parsed with {@code Counters.fromEscapedCompactString}.
 * Groups and counters are visited in iteration order; for each counter the
 * requested names are tried in the order they were passed.
 *
 * @param counterFromHist counters in escaped compact string form
 * @param counterNames    counter names to look for
 * @return the value of the first matching counter as a decimal string, or
 *         {@code null} when no counter matches
 * @throws ParseException declared for the parsing step (presumably raised by
 *         {@code Counters.fromEscapedCompactString} on malformed input)
 */
public static String extractCounter(String counterFromHist,
    String... counterNames)
    throws ParseException {
  Counters parsed = Counters.fromEscapedCompactString(counterFromHist);
  for (Counters.Group grp : parsed) {
    for (Counters.Counter candidate : grp) {
      for (String wanted : counterNames) {
        boolean matches = wanted.equals(candidate.getName());
        if (matches) {
          return Long.toString(candidate.getCounter());
        }
      }
    }
  }
  // Nothing matched any of the requested names.
  return null;
}
/**
 * Returns the textual value of the first counter whose name is among
 * {@code counterNames}.
 *
 * <p>The counters are obtained by parsing {@code counterFromHist} with
 * {@code Counters.fromEscapedCompactString}; every group and every counter
 * inside it is scanned in iteration order.
 *
 * @param counterFromHist counters in escaped compact string form
 * @param counterNames    counter names to look for, tried in the given order
 *                        for each counter
 * @return the first matching counter's value as a decimal string, or an
 *         empty string when no counter matches
 * @throws ParseException declared for the parsing step (presumably raised by
 *         {@code Counters.fromEscapedCompactString} on malformed input)
 */
public static String extractCounter(String counterFromHist,
    String... counterNames)
    throws ParseException {
  final Counters allCounters = Counters.fromEscapedCompactString(counterFromHist);
  String found = "";
  search:
  for (Counters.Group eachGroup : allCounters) {
    for (Counters.Counter eachCounter : eachGroup) {
      for (String requested : counterNames) {
        if (requested.equals(eachCounter.getName())) {
          found = Long.toString(eachCounter.getCounter());
          break search;
        }
      }
    }
  }
  return found;
}
/**
 * Flattens all counters of {@code job} into a single list.
 *
 * <p>Each entry is built from the counter name, the name of the group the
 * counter belongs to, and the counter value, in group/counter iteration
 * order.
 *
 * @return a new list with one entry per counter; empty when there are no
 *         counters to iterate
 * @throws IOException declared by this method (presumably propagated from
 *         {@code job.getCounters()} - confirm against the job type)
 */
@Override
public List<HadoopCounterKeyValuePair> getAllCounters() throws IOException {
  final List<HadoopCounterKeyValuePair> pairs =
      new ArrayList<HadoopCounterKeyValuePair>();
  for (Counters.Group counterGroup : job.getCounters()) {
    for (Counter entry : counterGroup) {
      HadoopCounterKeyValuePair pair = new HadoopCounterKeyValuePair(
          entry.getName(), counterGroup.getName(), entry.getValue());
      pairs.add(pair);
    }
  }
  return pairs;
}
/**
 * Prints this job's counters to standard output (for debugging purposes).
 *
 * <p>The output is a {@code New Job:} header, then for every group name one
 * tab-indented line of the form {@code groupName[groupDisplayName]}, followed
 * by one doubly-indented {@code counterDisplayName: value} line per counter
 * in that group, and finally an empty line.
 */
void printCounters() {
  // The header contains no format specifiers, so it is printed verbatim
  // instead of going through printf with an unused argument.
  System.out.print("New Job:\n");
  for (String groupName : counters.getGroupNames()) {
    Counters.Group group = counters.getGroup(groupName);
    System.out.printf("\t%s[%s]\n", groupName, group.getDisplayName());
    for (Counters.Counter counter : group) {
      System.out.printf("\t\t%s: %s\n", counter.getDisplayName(),
          counter.getCounter());
    }
  }
  // "\n" (not the platform line separator) keeps the output byte-for-byte stable.
  System.out.print("\n");
}
/**
 * Prints this job's counters to standard output (for debugging purposes).
 *
 * <p>Writes a {@code New Job:} header, then walks every group name reported
 * by {@code counters}: each group is shown as
 * {@code groupName[groupDisplayName]} on a tab-indented line, each of its
 * counters as {@code counterDisplayName: value} on a doubly-indented line.
 * A trailing empty line closes the dump.
 */
void printCounters() {
  // No format specifiers in the header, so no printf arguments are needed.
  System.out.print("New Job:\n");
  for (String groupName : counters.getGroupNames()) {
    Counters.Group group = counters.getGroup(groupName);
    System.out.printf("\t%s[%s]\n", groupName, group.getDisplayName());
    for (Counters.Counter counter : group) {
      System.out.printf("\t\t%s: %s\n", counter.getDisplayName(),
          counter.getCounter());
    }
  }
  // Explicit "\n" rather than println() so the line ending does not vary by platform.
  System.out.print("\n");
}
/**
 * Parses an escaped compact counter string and stores the recognized
 * job-level counters in {@code job}.
 *
 * <p>Every counter is identified by the key
 * {@code <group display name>.<counter display name>}. Recognized keys are
 * stored under the corresponding {@code JobKeys} constant with the counter
 * value rendered as a decimal string; any other key is reported on
 * {@code System.err} and skipped.
 *
 * @param job      table that receives the recognized counter values
 * @param counters counters in escaped compact string form
 * @throws ParseException declared for the parsing step (presumably raised by
 *         {@code Counters.fromEscapedCompactString} on malformed input)
 */
private void parseAndAddJobCounters(Hashtable<Enum, String> job, String counters) throws ParseException {
  Counters cnt = Counters.fromEscapedCompactString(counters);
  for (Counters.Group grp : cnt) {
    for (Counters.Counter counter : grp) {
      String counterKey = grp.getDisplayName() + "." + counter.getDisplayName();
      String value = Long.toString(counter.getValue());
      if (counterKey.equals("FileSystemCounters.FILE_BYTES_READ")) {
        job.put(JobKeys.FILE_BYTES_READ, value);
      } else if (counterKey.equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
        job.put(JobKeys.FILE_BYTES_WRITTEN, value);
      } else if (counterKey.equals("FileSystemCounters.HDFS_BYTES_READ")) {
        job.put(JobKeys.HDFS_BYTES_READ, value);
      } else if (counterKey.equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
        job.put(JobKeys.HDFS_BYTES_WRITTEN, value);
      // The "Job Counters " keys below match a group display name that ends
      // with a space, hence the blank before the dot.
      } else if (counterKey.equals("Job Counters .Launched map tasks")) {
        job.put(JobKeys.LAUNCHED_MAPS, value);
      } else if (counterKey.equals("Job Counters .Launched reduce tasks")) {
        job.put(JobKeys.LAUNCHED_REDUCES, value);
      } else if (counterKey.equals("Job Counters .Data-local map tasks")) {
        job.put(JobKeys.DATALOCAL_MAPS, value);
      } else if (counterKey.equals("Job Counters .Rack-local map tasks")) {
        job.put(JobKeys.RACKLOCAL_MAPS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Map input records")) {
        job.put(JobKeys.MAP_INPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Map output records")) {
        job.put(JobKeys.MAP_OUTPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Map input bytes")) {
        job.put(JobKeys.MAP_INPUT_BYTES, value);
      } else if (counterKey.equals("Map-Reduce Framework.Map output bytes")) {
        job.put(JobKeys.MAP_OUTPUT_BYTES, value);
      } else if (counterKey.equals("Map-Reduce Framework.Combine input records")) {
        job.put(JobKeys.COMBINE_INPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Combine output records")) {
        job.put(JobKeys.COMBINE_OUTPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Reduce input groups")) {
        job.put(JobKeys.REDUCE_INPUT_GROUPS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Reduce input records")) {
        job.put(JobKeys.REDUCE_INPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Reduce output records")) {
        job.put(JobKeys.REDUCE_OUTPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Spilled Records")) {
        job.put(JobKeys.SPILLED_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Reduce shuffle bytes")) {
        job.put(JobKeys.SHUFFLE_BYTES, value);
      } else {
        // Unknown counters are reported, not treated as an error.
        System.err.println("JobCounterKey:<"+counterKey+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
      }
    }
  }
}
/**
 * Parses an escaped compact counter string and records the recognized
 * counters on a map task.
 *
 * <p>Every counter is identified by the key
 * {@code <group display name>.<counter display name>}. Recognized keys are
 * passed to {@code mapTask.setValue} under the corresponding
 * {@code MapTaskKeys} constant with the counter value rendered as a decimal
 * string; any other key is reported on {@code System.err} and skipped.
 *
 * @param mapTask  map task statistics that receive the recognized values
 * @param counters counters in escaped compact string form
 * @throws ParseException declared for the parsing step (presumably raised by
 *         {@code Counters.fromEscapedCompactString} on malformed input)
 */
private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) throws ParseException {
  Counters cnt = Counters.fromEscapedCompactString(counters);
  for (Counters.Group grp : cnt) {
    for (Counters.Counter counter : grp) {
      String counterKey = grp.getDisplayName() + "." + counter.getDisplayName();
      String value = Long.toString(counter.getValue());
      if (counterKey.equals("FileSystemCounters.FILE_BYTES_READ")) {
        mapTask.setValue(MapTaskKeys.FILE_BYTES_READ, value);
      } else if (counterKey.equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
        mapTask.setValue(MapTaskKeys.FILE_BYTES_WRITTEN, value);
      } else if (counterKey.equals("FileSystemCounters.HDFS_BYTES_READ")) {
        mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, value);
      } else if (counterKey.equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
        mapTask.setValue(MapTaskKeys.HDFS_BYTES_WRITTEN, value);
      } else if (counterKey.equals("Map-Reduce Framework.Map input records")) {
        mapTask.setValue(MapTaskKeys.INPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Map output records")) {
        mapTask.setValue(MapTaskKeys.OUTPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Map input bytes")) {
        mapTask.setValue(MapTaskKeys.INPUT_BYTES, value);
      } else if (counterKey.equals("Map-Reduce Framework.Map output bytes")) {
        mapTask.setValue(MapTaskKeys.OUTPUT_BYTES, value);
      } else if (counterKey.equals("Map-Reduce Framework.Combine input records")) {
        mapTask.setValue(MapTaskKeys.COMBINE_INPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Combine output records")) {
        mapTask.setValue(MapTaskKeys.COMBINE_OUTPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Spilled Records")) {
        mapTask.setValue(MapTaskKeys.SPILLED_RECORDS, value);
      } else {
        // Unknown counters are reported, not treated as an error.
        System.err.println("MapCounterKey:<"+counterKey+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
      }
    }
  }
}
/**
 * Parses an escaped compact counter string and records the recognized
 * counters on a reduce task.
 *
 * <p>Every counter is identified by the key
 * {@code <group display name>.<counter display name>}. Recognized keys are
 * passed to {@code reduceTask.setValue} under the corresponding
 * {@code ReduceTaskKeys} constant with the counter value rendered as a
 * decimal string; any other key is reported on {@code System.err} and
 * skipped.
 *
 * @param reduceTask reduce task statistics that receive the recognized values
 * @param counters   counters in escaped compact string form
 * @throws ParseException declared for the parsing step (presumably raised by
 *         {@code Counters.fromEscapedCompactString} on malformed input)
 */
private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) throws ParseException {
  Counters cnt = Counters.fromEscapedCompactString(counters);
  for (Counters.Group grp : cnt) {
    for (Counters.Counter counter : grp) {
      String counterKey = grp.getDisplayName() + "." + counter.getDisplayName();
      String value = Long.toString(counter.getValue());
      if (counterKey.equals("FileSystemCounters.FILE_BYTES_READ")) {
        reduceTask.setValue(ReduceTaskKeys.FILE_BYTES_READ, value);
      } else if (counterKey.equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
        reduceTask.setValue(ReduceTaskKeys.FILE_BYTES_WRITTEN, value);
      } else if (counterKey.equals("FileSystemCounters.HDFS_BYTES_READ")) {
        reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, value);
      } else if (counterKey.equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
        reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_WRITTEN, value);
      } else if (counterKey.equals("Map-Reduce Framework.Reduce input records")) {
        reduceTask.setValue(ReduceTaskKeys.INPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Reduce output records")) {
        reduceTask.setValue(ReduceTaskKeys.OUTPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Combine input records")) {
        reduceTask.setValue(ReduceTaskKeys.COMBINE_INPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Combine output records")) {
        reduceTask.setValue(ReduceTaskKeys.COMBINE_OUTPUT_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Reduce input groups")) {
        reduceTask.setValue(ReduceTaskKeys.INPUT_GROUPS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Spilled Records")) {
        reduceTask.setValue(ReduceTaskKeys.SPILLED_RECORDS, value);
      } else if (counterKey.equals("Map-Reduce Framework.Reduce shuffle bytes")) {
        reduceTask.setValue(ReduceTaskKeys.SHUFFLE_BYTES, value);
      } else {
        // Unknown counters are reported, not treated as an error.
        System.err.println("ReduceCounterKey:<"+counterKey+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE TASK");
      }
    }
  }
}
/**
 * Configures and runs the "CountMers" job, then reports the running time and
 * the value of the {@code REDUCE_INPUT_GROUPS} task counter on
 * {@code System.err}.
 *
 * <p>The local {@code data} switch selects where the settings come from:
 * {@code 0} reads six command-line arguments, {@code 1} uses built-in
 * values. It is currently fixed to {@code 1}, so {@code args} is not read.
 * Any existing output path is deleted before the job is submitted.
 *
 * @param args filename, outpath, kmerlen, showpos, numMappers, numReducers
 *             (only consulted when {@code data} is 0)
 * @throws IOException declared by this method (presumably from the file
 *         system and job client calls - confirm)
 */
public static void main(String[] args) throws IOException
{
  // 0 = take settings from the command line, 1 = use the built-in values.
  int data = 1;

  String inputLocation = null;
  String outputLocation = null;
  int merLength = 0;
  int positionFlag = 0;
  int mapTasks = 1;
  int reduceTasks = 1;

  switch (data)
  {
    case 0:
      if (args.length != 6)
      {
        System.err.println("Usage: CountKmers filename outpath kmerlen showpos numMappers numReducers");
        return;
      }
      inputLocation = args[0];
      outputLocation = args[1];
      merLength = Integer.parseInt(args[2]);
      positionFlag = Integer.parseInt(args[3]);
      mapTasks = Integer.parseInt(args[4]);
      reduceTasks = Integer.parseInt(args[5]);
      break;
    case 1:
      inputLocation = "/user/guest/cloudburst/s_suis.br";
      outputLocation = "/user/mschatz/kmers";
      merLength = 12;
      positionFlag = 0;
      mapTasks = 1;
      reduceTasks = 1;
      break;
    default:
      break;
  }

  // Echo the effective settings.
  System.out.println("inpath: " + inputLocation);
  System.out.println("outpath: " + outputLocation);
  System.out.println("kmerlen: " + merLength);
  System.out.println("showpos: " + positionFlag);
  System.out.println("nummappers: " + mapTasks);
  System.out.println("numreducers: " + reduceTasks);

  JobConf conf = new JobConf(MerReduce.class);
  conf.setNumMapTasks(mapTasks);
  conf.setNumReduceTasks(reduceTasks);

  // Input side.
  conf.addInputPath(new Path(inputLocation));
  conf.set("KMER_LEN", Integer.toString(merLength));
  conf.set("SHOW_POS", Integer.toString(positionFlag));
  conf.setInputFormat(SequenceFileInputFormat.class);

  // Intermediate (map output) types.
  conf.setMapOutputKeyClass(BytesWritable.class);
  conf.setMapOutputValueClass(IntWritable.class);

  // Final output types and format.
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);
  conf.setOutputFormat(TextOutputFormat.class);

  conf.setMapperClass(MerMapClass.class);
  conf.setReducerClass(MerReduceClass.class);

  Path resultDir = new Path(outputLocation);
  conf.setOutputPath(resultDir);

  System.err.println(" Removing old results");
  FileSystem.get(conf).delete(resultDir);

  conf.setJobName("CountMers");

  // The timer is created right before submission so it covers the job run.
  Timer stopwatch = new Timer();
  RunningJob finishedJob = JobClient.runJob(conf);

  System.err.println("CountMers Finished");
  System.err.println("Total Running time was " + stopwatch.get());

  Counters jobCounters = finishedJob.getCounters( );
  Counters.Group taskGroup = jobCounters.getGroup("org.apache.hadoop.mapred.Task$Counter");
  long distinctMers = taskGroup.getCounter("REDUCE_INPUT_GROUPS");
  System.err.println("Num Distinct Mers: " + distinctMers);
}
/**
 * Runs a pipes program on the mini cluster and verifies its counters and
 * output.
 *
 * <p>The program is copied to {@code /testing/bin/application} on the DFS and
 * registered as the job executable. With zero reduces the job is submitted
 * and polled once per second until complete; otherwise it is run
 * synchronously. The method then asserts that the job succeeded, that the
 * {@code WORDCOUNT} counter group contains at least one counter, and that
 * the output files (filtered by {@code OutputLogFilter}) match
 * {@code expectedResults} in number and, index by index, in content.
 *
 * @param mr              mini MR cluster that supplies the job configuration
 * @param dfs             mini DFS cluster that hosts the executable and output
 * @param program         local path of the program to upload
 * @param inputPath       job input path
 * @param outputPath      job output path
 * @param numMaps         number of map tasks to configure
 * @param numReduces      number of reduce tasks to configure; 0 selects the
 *                        submit-and-poll path
 * @param expectedResults expected content of each output file
 * @throws IOException declared by this method (presumably from file system
 *         and job calls - confirm)
 */
private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
    Path program, Path inputPath, Path outputPath,
    int numMaps, int numReduces, String[] expectedResults
    ) throws IOException {
  Path wordExec = new Path("/testing/bin/application");
  JobConf job = mr.createJobConf();
  job.setNumMapTasks(numMaps);
  job.setNumReduceTasks(numReduces);

  // Upload the executable, replacing whatever a previous run left behind.
  FileSystem fs = dfs.getFileSystem();
  fs.delete(wordExec.getParent(), true);
  fs.copyFromLocalFile(program, wordExec);
  Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
  Submitter.setIsJavaRecordReader(job, true);
  Submitter.setIsJavaRecordWriter(job, true);
  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  RunningJob rJob;
  if (numReduces == 0) {
    rJob = Submitter.jobSubmit(job);
    while (!rJob.isComplete()) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ie) {
        // Restore the interrupt flag so code further up the stack can
        // still observe the interruption after the unchecked rethrow.
        Thread.currentThread().interrupt();
        throw new RuntimeException(ie);
      }
    }
  } else {
    rJob = Submitter.runJob(job);
  }
  assertTrue("pipes job failed", rJob.isSuccessful());

  Counters counters = rJob.getCounters();
  Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
  int numCounters = 0;
  for (Counter c : wordCountCounters) {
    System.out.println(c);
    ++numCounters;
  }
  assertTrue("No counters found!", (numCounters > 0));

  // Read back every output file and compare it with the expectation.
  List<String> results = new ArrayList<String>();
  for (Path p : FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
      new OutputLogFilter()))) {
    results.add(TestMiniMRWithDFS.readOutput(p, job));
  }
  assertEquals("number of reduces is wrong",
      expectedResults.length, results.size());
  for (int i = 0; i < results.size(); i++) {
    assertEquals("pipes program " + program + " output " + i + " wrong",
        expectedResults[i], results.get(i));
  }
}