下面列出了org.apache.hadoop.mapreduce.Counters#findCounter ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Verify the values in the Counters against the expected number of entries written.
*
* @param expectedReferenced
* Expected number of referenced entrires
* @param counters
* The Job's Counters object
* @return True if the values match what's expected, false otherwise
*/
protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
final Counter referenced = counters.findCounter(Counts.REFERENCED);
final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
boolean success = true;
if (expectedReferenced != referenced.getValue()) {
LOG.error("Expected referenced count does not match with actual referenced count. " +
"expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
success = false;
}
if (unreferenced.getValue() > 0) {
final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
+ (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
success = false;
}
return success;
}
/**
* Verify that the Counters don't contain values which indicate an outright failure from the Reducers.
*
* @param counters
* The Job's counters
* @return True if the "bad" counter objects are 0, false otherwise
*/
protected boolean verifyUnexpectedValues(Counters counters) {
final Counter undefined = counters.findCounter(Counts.UNDEFINED);
final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
boolean success = true;
if (undefined.getValue() > 0) {
LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
success = false;
}
if (lostfamilies.getValue() > 0) {
LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
success = false;
}
return success;
}
PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) {
_partitionedInputData = partitionedInputData;
_counters = counters;
_rowIdsFromNewData = new long[shards];
_rowIdsToUpdateFromNewData = new long[shards];
_rowIdsFromIndex = new long[shards];
for (TaskReport tr : taskReports) {
int id = tr.getTaskID().getId();
Counters taskCounters = tr.getTaskCounters();
Counter total = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
_rowIdsFromNewData[id] = total.getValue();
Counter update = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
_rowIdsToUpdateFromNewData[id] = update.getValue();
Counter index = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
_rowIdsFromIndex[id] = index.getValue();
}
}
public static Counters fromTez(TezCounters tezCounters) {
if (tezCounters == null) {
return null;
}
Counters counters = new Counters();
for (CounterGroup xGrp : tezCounters) {
counters.addGroup(xGrp.getName(), xGrp.getDisplayName());
for (TezCounter xCounter : xGrp) {
Counter counter =
counters.findCounter(xGrp.getName(), xCounter.getName());
counter.setValue(xCounter.getValue());
}
}
return counters;
}
public static Counters fromTez(TezCounters tezCounters) {
if (tezCounters == null) {
return null;
}
Counters counters = new Counters();
for (CounterGroup xGrp : tezCounters) {
counters.addGroup(xGrp.getName(), xGrp.getDisplayName());
for (TezCounter xCounter : xGrp) {
Counter counter =
counters.findCounter(xGrp.getName(), xCounter.getName());
counter.setValue(xCounter.getValue());
}
}
return counters;
}
private void updateProgressSplits() {
double newProgress = reportedStatus.progress;
newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
Counters counters = reportedStatus.counters;
if (counters == null)
return;
WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
if (splitsBlock != null) {
long now = clock.getTime();
long start = getLaunchTime(); // TODO Ensure not 0
if (start != 0 && now - start <= Integer.MAX_VALUE) {
splitsBlock.getProgressWallclockTime().extend(newProgress,
(int) (now - start));
}
Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
splitsBlock.getProgressCPUTime().extend(newProgress,
(int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
}
Counter virtualBytes = counters
.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
if (virtualBytes != null) {
splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
(int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
Counter physicalBytes = counters
.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
if (physicalBytes != null) {
splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
(int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
}
}
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Counter slotMillisMapCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_MAPS);
if (slotMillisMapCounter != null) {
summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000);
}
Counter slotMillisReduceCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
if (slotMillisReduceCounter != null) {
summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000);
}
}
private void updateProgressSplits() {
double newProgress = reportedStatus.progress;
newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
Counters counters = reportedStatus.counters;
if (counters == null)
return;
WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
if (splitsBlock != null) {
long now = clock.getTime();
long start = getLaunchTime(); // TODO Ensure not 0
if (start != 0 && now - start <= Integer.MAX_VALUE) {
splitsBlock.getProgressWallclockTime().extend(newProgress,
(int) (now - start));
}
Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
splitsBlock.getProgressCPUTime().extend(newProgress,
(int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
}
Counter virtualBytes = counters
.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
if (virtualBytes != null) {
splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
(int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
Counter physicalBytes = counters
.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
if (physicalBytes != null) {
splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
(int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
}
}
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Counter slotMillisMapCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_MAPS);
if (slotMillisMapCounter != null) {
summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000);
}
Counter slotMillisReduceCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
if (slotMillisReduceCounter != null) {
summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000);
}
}
private VespaCounters(Job job) throws IOException {
Counters counters = job.getCounters();
documentsSent = counters.findCounter(GROUP, DOCS_SENT);
documentsOk = counters.findCounter(GROUP, DOCS_OK);
documentsFailed = counters.findCounter(GROUP, DOCS_FAILED);
documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED);
}
/**
* Verify scan counters are emitted from the job
* @param job
* @throws IOException
*/
private void verifyJobCountersAreEmitted(Job job) throws IOException {
Counters counters = job.getCounters();
Counter counter
= counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
}
private static long getRecordCount(Optional<Job> job) {
if (!job.isPresent()) {
return -1l;
}
Counters counters = null;
try {
counters = job.get().getCounters();
} catch (IOException e) {
LOG.debug("Failed to get job counters. Record count will not be set. ", e);
return -1l;
}
Counter recordCounter = counters.findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT);
if (recordCounter != null && recordCounter.getValue() != 0) {
return recordCounter.getValue();
}
recordCounter = counters.findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
if (recordCounter != null && recordCounter.getValue() != 0) {
return recordCounter.getValue();
}
LOG.debug("Non zero record count not found in both mapper and reducer counters");
return -1l;
}
@Test
public void testCountersWithSpeculation() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 1;
}
};
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
updateLastAttemptState(TaskAttemptState.RUNNING);
MockTaskAttemptImpl baseAttempt = getLastAttempt();
// add a speculative attempt
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ADD_SPEC_ATTEMPT));
launchTaskAttempt(getLastAttempt().getAttemptId());
updateLastAttemptState(TaskAttemptState.RUNNING);
MockTaskAttemptImpl specAttempt = getLastAttempt();
assertEquals(2, taskAttempts.size());
Counters specAttemptCounters = new Counters();
Counter cpuCounter = specAttemptCounters.findCounter(
TaskCounter.CPU_MILLISECONDS);
cpuCounter.setValue(1000);
specAttempt.setCounters(specAttemptCounters);
// have the spec attempt succeed but second attempt at 1.0 progress as well
commitTaskAttempt(specAttempt.getAttemptId());
specAttempt.setProgress(1.0f);
specAttempt.setState(TaskAttemptState.SUCCEEDED);
mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_SUCCEEDED));
assertEquals(TaskState.SUCCEEDED, mockTask.getState());
baseAttempt.setProgress(1.0f);
Counters taskCounters = mockTask.getCounters();
assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
}
@Test
public void testCountersWithSpeculation() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 1;
}
};
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
updateLastAttemptState(TaskAttemptState.RUNNING);
MockTaskAttemptImpl baseAttempt = getLastAttempt();
// add a speculative attempt
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ADD_SPEC_ATTEMPT));
launchTaskAttempt(getLastAttempt().getAttemptId());
updateLastAttemptState(TaskAttemptState.RUNNING);
MockTaskAttemptImpl specAttempt = getLastAttempt();
assertEquals(2, taskAttempts.size());
Counters specAttemptCounters = new Counters();
Counter cpuCounter = specAttemptCounters.findCounter(
TaskCounter.CPU_MILLISECONDS);
cpuCounter.setValue(1000);
specAttempt.setCounters(specAttemptCounters);
// have the spec attempt succeed but second attempt at 1.0 progress as well
commitTaskAttempt(specAttempt.getAttemptId());
specAttempt.setProgress(1.0f);
specAttempt.setState(TaskAttemptState.SUCCEEDED);
mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_SUCCEEDED));
assertEquals(TaskState.SUCCEEDED, mockTask.getState());
baseAttempt.setProgress(1.0f);
Counters taskCounters = mockTask.getCounters();
assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
}
@Override
public int runJob() throws Exception {
final boolean job1Success = (super.runJob() == 0);
Assert.assertTrue(job1Success);
// after the first job there should be a sequence file with the
// filtered results which should match the expected results
// resources
final Job job = Job.getInstance(super.getConf());
final Configuration conf = job.getConfiguration();
MapReduceTestUtils.filterConfiguration(conf);
final ByteBuffer buf = ByteBuffer.allocate((8 * expectedResults.hashedCentroids.size()) + 4);
buf.putInt(expectedResults.hashedCentroids.size());
for (final Long hashedCentroid : expectedResults.hashedCentroids) {
buf.putLong(hashedCentroid);
}
conf.set(
MapReduceTestUtils.EXPECTED_RESULTS_KEY,
ByteArrayUtils.byteArrayToString(buf.array()));
GeoWaveInputFormat.setStoreOptions(conf, dataStoreOptions);
job.setJarByClass(this.getClass());
job.setJobName("GeoWave Test (" + dataStoreOptions.getGeoWaveNamespace() + ")");
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(VerifyExpectedResultsMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
job.setSpeculativeExecution(false);
FileInputFormat.setInputPaths(job, getHdfsOutputPath());
final boolean job2success = job.waitForCompletion(true);
final Counters jobCounters = job.getCounters();
final Counter expectedCnt = jobCounters.findCounter(ResultCounterType.EXPECTED);
Assert.assertNotNull(expectedCnt);
Assert.assertEquals(expectedResults.count, expectedCnt.getValue());
final Counter errorCnt = jobCounters.findCounter(ResultCounterType.ERROR);
if (errorCnt != null) {
Assert.assertEquals(0L, errorCnt.getValue());
}
final Counter unexpectedCnt = jobCounters.findCounter(ResultCounterType.UNEXPECTED);
if (unexpectedCnt != null) {
Assert.assertEquals(0L, unexpectedCnt.getValue());
}
return job2success ? 0 : 1;
}