Listed below are example uses of org.apache.hadoop.mapred.TaskUmbilicalProtocol#org.apache.hadoop.mapred.Counters.Counter. You can also follow the links to view the source code on GitHub, or leave a comment in the panel on the right.
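Before the examples, here is a minimal, self-contained sketch of the basic Counters.Counter pattern the snippets below exercise: look a counter up with findCounter, increment it, set it, and read its value. It is not taken from any of the examples; the group and counter names ("MyGroup", "MY_RECORDS") are made up for illustration.

// Minimal usage sketch of the old-API Counters.Counter (hypothetical
// group/counter names; not from the examples below).
import org.apache.hadoop.mapred.Counters;

public class CounterUsageSketch {
  public static void main(String[] args) {
    Counters counters = new Counters();
    // findCounter creates the counter on first access
    Counters.Counter counter = counters.findCounter("MyGroup", "MY_RECORDS");
    counter.increment(5);   // add 5 to the current value
    counter.setValue(42);   // overwrite the accumulated value
    System.out.println(counter.getName() + " = " + counter.getValue());
  }
}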
@SuppressWarnings("deprecation")
private void checkLegacyNames(Counters counters) {
assertEquals("New name", 1, counters.findCounter(
TaskCounter.class.getName(), "MAP_INPUT_RECORDS").getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"org.apache.hadoop.mapred.Task$Counter",
"MAP_INPUT_RECORDS").getValue());
assertEquals("Legacy enum", 1,
counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).getValue());
assertEquals("New name", 1, counters.findCounter(
JobCounter.class.getName(), "DATA_LOCAL_MAPS").getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"org.apache.hadoop.mapred.JobInProgress$Counter",
"DATA_LOCAL_MAPS").getValue());
assertEquals("Legacy enum", 1,
counters.findCounter(JobInProgress.Counter.DATA_LOCAL_MAPS).getValue());
assertEquals("New name", 1, counters.findCounter(
FileSystemCounter.class.getName(), "FILE_BYTES_READ").getValue());
assertEquals("New name and method", 1, counters.findCounter("file",
FileSystemCounter.BYTES_READ).getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"FileSystemCounters",
"FILE_BYTES_READ").getValue());
}
@Test
public void testFileSystemGroupIteratorConcurrency() {
  Counters counters = new Counters();
  // create 2 filesystem counter groups
  counters.findCounter("fs1", FileSystemCounter.BYTES_READ).increment(1);
  counters.findCounter("fs2", FileSystemCounter.BYTES_READ).increment(1);
  // Iterate over the counters in this group while updating counters in
  // the group
  Group group = counters.getGroup(FileSystemCounter.class.getName());
  Iterator<Counter> iterator = group.iterator();
  counters.findCounter("fs3", FileSystemCounter.BYTES_READ).increment(1);
  assertTrue(iterator.hasNext());
  iterator.next();
  counters.findCounter("fs3", FileSystemCounter.BYTES_READ).increment(1);
  assertTrue(iterator.hasNext());
  iterator.next();
}
@SuppressWarnings("rawtypes")
@Test
public void testFrameworkCounter() {
GroupFactory groupFactory = new GroupFactoryForTest();
FrameworkGroupFactory frameworkGroupFactory =
groupFactory.newFrameworkGroupFactory(JobCounter.class);
Group group = (Group) frameworkGroupFactory.newGroup("JobCounter");
FrameworkCounterGroup counterGroup =
(FrameworkCounterGroup) group.getUnderlyingGroup();
org.apache.hadoop.mapreduce.Counter count1 =
counterGroup.findCounter(JobCounter.NUM_FAILED_MAPS.toString());
Assert.assertNotNull(count1);
// Verify no exception get thrown when finding an unknown counter
org.apache.hadoop.mapreduce.Counter count2 =
counterGroup.findCounter("Unknown");
Assert.assertNull(count2);
}
@SuppressWarnings("deprecation")
private void checkLegacyNames(Counters counters) {
assertEquals("New name", 1, counters.findCounter(
TaskCounter.class.getName(), "MAP_INPUT_RECORDS").getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"org.apache.hadoop.mapred.Task$Counter",
"MAP_INPUT_RECORDS").getValue());
assertEquals("Legacy enum", 1,
counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).getValue());
assertEquals("New name", 1, counters.findCounter(
JobCounter.class.getName(), "DATA_LOCAL_MAPS").getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"org.apache.hadoop.mapred.JobInProgress$Counter",
"DATA_LOCAL_MAPS").getValue());
assertEquals("Legacy enum", 1,
counters.findCounter(JobInProgress.Counter.DATA_LOCAL_MAPS).getValue());
assertEquals("New name", 1, counters.findCounter(
FileSystemCounter.class.getName(), "FILE_BYTES_READ").getValue());
assertEquals("New name and method", 1, counters.findCounter("file",
FileSystemCounter.BYTES_READ).getValue());
assertEquals("Legacy name", 1, counters.findCounter(
"FileSystemCounters",
"FILE_BYTES_READ").getValue());
}
public Task(String jobFile, TaskAttemptID taskId, int partition) {
  this.jobFile = jobFile;
  this.taskId = taskId;
  this.partition = partition;
  this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId,
      0.0f,
      TaskStatus.State.UNASSIGNED,
      "", "", "",
      isMapTask() ? TaskStatus.Phase.MAP : TaskStatus.Phase.SHUFFLE,
      counters);
  this.mapOutputFile.setJobId(taskId.getJobID());
  spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
@SuppressWarnings("rawtypes")
@Test
public void testFrameworkCounter() {
GroupFactory groupFactory = new GroupFactoryForTest();
FrameworkGroupFactory frameworkGroupFactory =
groupFactory.newFrameworkGroupFactory(JobCounter.class);
Group group = (Group) frameworkGroupFactory.newGroup("JobCounter");
FrameworkCounterGroup counterGroup =
(FrameworkCounterGroup) group.getUnderlyingGroup();
org.apache.hadoop.mapreduce.Counter count1 =
counterGroup.findCounter(JobCounter.NUM_FAILED_MAPS.toString());
Assert.assertNotNull(count1);
// Verify no exception get thrown when finding an unknown counter
org.apache.hadoop.mapreduce.Counter count2 =
counterGroup.findCounter("Unknown");
Assert.assertNull(count2);
}
public Counters.Counter getCounter(String group, String name) {
  Counters.Counter counter = null;
  if (counters != null) {
    counter = counters.findCounter(group, name);
    if (counter == null) {
      Group grp = counters.addGroup(group, group);
      counter = grp.addCounter(name, name, 10);
    }
  }
  return counter;
}
/**
 * Verify counter value works
 */
@SuppressWarnings("deprecation")
@Test
public void testCounterValue() {
  Counters counters = new Counters();
  final int NUMBER_TESTS = 100;
  final int NUMBER_INC = 10;
  final Random rand = new Random();
  for (int i = 0; i < NUMBER_TESTS; i++) {
    long initValue = rand.nextInt();
    long expectedValue = initValue;
    Counter counter = counters.findCounter("foo", "bar");
    counter.setValue(initValue);
    assertEquals("Counter value is not initialized correctly",
        expectedValue, counter.getValue());
    for (int j = 0; j < NUMBER_INC; j++) {
      int incValue = rand.nextInt();
      counter.increment(incValue);
      expectedValue += incValue;
      assertEquals("Counter value is not incremented correctly",
          expectedValue, counter.getValue());
    }
    expectedValue = rand.nextInt();
    counter.setValue(expectedValue);
    assertEquals("Counter value is not set correctly",
        expectedValue, counter.getValue());
  }
}
@SuppressWarnings("deprecation")
@Test
public void testWriteWithLegacyNames() {
Counters counters = new Counters();
counters.incrCounter(Task.Counter.MAP_INPUT_RECORDS, 1);
counters.incrCounter(JobInProgress.Counter.DATA_LOCAL_MAPS, 1);
counters.findCounter("FileSystemCounters", "FILE_BYTES_READ").increment(1);
checkLegacyNames(counters);
}
@SuppressWarnings("deprecation")
@Test
public void testGroupIteratorConcurrency() {
Counters counters = new Counters();
counters.incrCounter("group1", "counter1", 1);
Group group = counters.getGroup("group1");
Iterator<Counter> iterator = group.iterator();
counters.incrCounter("group1", "counter2", 1);
iterator.next();
}
@Test
public void testFilesystemCounter() {
  GroupFactory groupFactory = new GroupFactoryForTest();
  Group fsGroup = groupFactory.newFileSystemGroup();
  org.apache.hadoop.mapreduce.Counter count1 =
      fsGroup.findCounter("ANY_BYTES_READ");
  Assert.assertNotNull(count1);
  // Verify that no exception gets thrown when finding an unknown counter
  org.apache.hadoop.mapreduce.Counter count2 =
      fsGroup.findCounter("Unknown");
  Assert.assertNull(count2);
}
private void validateCounters() throws IOException {
  Counters counters = job.running_.getCounters();
  assertNotNull("Counters", counters);
  Group group = counters.getGroup("UserCounters");
  assertNotNull("Group", group);
  Counter counter = group.getCounterForName("InputLines");
  assertNotNull("Counter", counter);
  assertEquals(3, counter.getCounter());
}
@SuppressWarnings("deprecation")
@Test
public void testWriteWithLegacyNames() {
Counters counters = new Counters();
counters.incrCounter(Task.Counter.MAP_INPUT_RECORDS, 1);
counters.incrCounter(JobInProgress.Counter.DATA_LOCAL_MAPS, 1);
counters.findCounter("FileSystemCounters", "FILE_BYTES_READ").increment(1);
checkLegacyNames(counters);
}
@SuppressWarnings("deprecation")
@Test
public void testGroupIteratorConcurrency() {
Counters counters = new Counters();
counters.incrCounter("group1", "counter1", 1);
Group group = counters.getGroup("group1");
Iterator<Counter> iterator = group.iterator();
counters.incrCounter("group1", "counter2", 1);
iterator.next();
}
protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls,
                            JobConf conf,
                            Counters.Counter inputCounter,
                            TaskReporter reporter) {
  super(inputCounter, conf, reporter);
  combinerClass = cls;
  keyClass = (Class<K>) job.getMapOutputKeyClass();
  valueClass = (Class<V>) job.getMapOutputValueClass();
  comparator = (RawComparator<K>) job.getOutputKeyComparator();
}
protected static void setCounterValue(@Nullable Counter counter, long value) {
  if (counter != null) {
    synchronized (counter) {
      counter.setValue(value);
    }
  }
}
protected static void incrCounter(@Nullable Counter counter, long incr) {
  if (counter != null) {
    synchronized (counter) {
      counter.increment(incr);
    }
  }
}
NewCombinerRunner(Class reducerClass,
                  JobConf job,
                  org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                  org.apache.hadoop.mapreduce.TaskAttemptContext context,
                  Counters.Counter inputCounter,
                  TaskReporter reporter,
                  org.apache.hadoop.mapreduce.OutputCommitter committer) {
  super(inputCounter, job, reporter);
  this.reducerClass = reducerClass;
  this.taskId = taskId;
  keyClass = (Class<K>) context.getMapOutputKeyClass();
  valueClass = (Class<V>) context.getMapOutputValueClass();
  comparator = (RawComparator<K>) context.getSortComparator();
  this.committer = committer;
}
@Override
public Counter getCounter(String group, String name) {
  Counters.Counter counter = null;
  if (counters != null) {
    counter = counters.findCounter(group, name);
  }
  return counter;
}
@Override
public Counter getCounter(Enum<?> key) {
  Counters.Counter counter = null;
  if (counters != null) {
    counter = counters.findCounter(key);
  }
  return counter;
}
@Override
public List<HadoopCounterKeyValuePair> getAllCounters() throws IOException {
  List<HadoopCounterKeyValuePair> result = new ArrayList<HadoopCounterKeyValuePair>();
  for (Counters.Group group : job.getCounters()) {
    for (Counter counter : group) {
      result.add(new HadoopCounterKeyValuePair(counter.getName(),
          group.getName(), counter.getValue()));
    }
  }
  return result;
}
@SuppressWarnings("deprecation")
private static void parseAndAddJobCounters(Map<String, String> job, String counters) {
try {
Counters counterGroups = Counters.fromEscapedCompactString(counters);
for (Group otherGroup : counterGroups) {
Group group = counterGroups.getGroup(otherGroup.getName());
for (Counter otherCounter : otherGroup) {
Counter counter = group.getCounterForName(otherCounter.getName());
job.put(otherCounter.getName(), String.valueOf(counter.getValue()));
}
}
} catch (ParseException e) {
LOG.warn("Failed to parse job counters", e);
}
}
public ReduceValuesIterator(KeyValuesReader in,
                            Progressable reporter,
                            Counter reduceInputValueCounter)
    throws IOException {
  this.reduceInputValueCounter = reduceInputValueCounter;
  this.in = in;
  this.reporter = reporter;
}
CombinerRunner(Counters.Counter inputCounter,
               JobConf job,
               TaskReporter reporter) {
  this.inputCounter = inputCounter;
  this.job = job;
  this.reporter = reporter;
}
public IndexRecord spill(JobConf job, FSDataOutputStream out,
    Class<K> keyClass, Class<V> valClass, CompressionCodec codec,
    Counter spillCounter) throws IOException {
  IFile.Writer<K, V> writer = null;
  IndexRecord rec = new IndexRecord();
  long segmentStart = out.getPos();
  try {
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
        spillCounter);
    // spill directly
    KeyValueSpillIterator kvSortedArray = this.getKeyValueSpillIterator();
    MemoryBlockIndex memBlkIdx = kvSortedArray.next();
    while (memBlkIdx != null) {
      int pos = memBlkIdx.getIndex();
      MemoryBlock memBlk = memBlkIdx.getMemoryBlock();
      writer.append(kvbuffer, memBlk.offsets[pos], memBlk.keyLenArray[pos],
          memBlk.valueLenArray[pos]);
      memBlkIdx = kvSortedArray.next();
    }
  } finally {
    // close the writer
    if (null != writer) {
      writer.close();
    }
  }
  rec.startOffset = segmentStart;
  rec.rawLength = writer.getRawLength();
  rec.partLength = writer.getCompressedLength();
  writer = null;
  return rec;
}
public void testCommandLine() throws IOException {
  try {
    try {
      OUTPUT_DIR.getAbsoluteFile().delete();
    } catch (Exception e) {
    }
    createInput();
    boolean mayExit = false;
    // During tests, the default Configuration will use a local mapred
    // So don't specify -config or -cluster
    StreamJob job = new StreamJob(genArgs(), mayExit);
    job.go();
    File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
    String output = StreamUtil.slurp(outFile);
    outFile.delete();
    assertEquals(outputExpect, output);
    Counters counters = job.running_.getCounters();
    assertNotNull("Counters", counters);
    Group group = counters.getGroup("UserCounters");
    assertNotNull("Group", group);
    Counter counter = group.getCounterForName("InputLines");
    assertNotNull("Counter", counter);
    assertEquals(3, counter.getCounter());
  } finally {
    File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
    INPUT_FILE.delete();
    outFileCRC.delete();
    OUTPUT_DIR.getAbsoluteFile().delete();
  }
}