下面列出了 org.apache.hadoop.mapreduce.Counter#getValue() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Converts a set of counters into a {@code JhCounters} record.
 *
 * <p>The record carries the given name and one {@code JhCounterGroup} per
 * counter group; each group carries one {@code JhCounter} (name, display
 * name, value) per counter.
 *
 * @param counters the counters to convert; when {@code null} the returned
 *                 record has an empty group list
 * @param name     the name stored on the returned record
 * @return the populated record
 */
static JhCounters toAvro(Counters counters, String name) {
  JhCounters avroCounters = new JhCounters();
  avroCounters.name = new Utf8(name);
  avroCounters.groups = new ArrayList<JhCounterGroup>(0);
  if (counters != null) {
    for (CounterGroup sourceGroup : counters) {
      JhCounterGroup avroGroup = new JhCounterGroup();
      avroGroup.name = new Utf8(sourceGroup.getName());
      avroGroup.displayName = new Utf8(sourceGroup.getDisplayName());
      avroGroup.counts = new ArrayList<JhCounter>(sourceGroup.size());
      for (Counter sourceCounter : sourceGroup) {
        JhCounter avroCounter = new JhCounter();
        avroCounter.name = new Utf8(sourceCounter.getName());
        avroCounter.displayName = new Utf8(sourceCounter.getDisplayName());
        avroCounter.value = sourceCounter.getValue();
        avroGroup.counts.add(avroCounter);
      }
      avroCounters.groups.add(avroGroup);
    }
  }
  return avroCounters;
}
/**
 * Stores the partitioned input location and the job counters, and collects
 * three row-id counts per shard from the supplied task reports.
 *
 * @param partitionedInputData path stored as-is on this result
 * @param counters             counters stored as-is on this result
 * @param shards               length of each per-shard array
 * @param taskReports          reports whose task id selects the array slot
 *                             that receives the task's counter values
 */
PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) {
  _partitionedInputData = partitionedInputData;
  _counters = counters;
  _rowIdsFromNewData = new long[shards];
  _rowIdsToUpdateFromNewData = new long[shards];
  _rowIdsFromIndex = new long[shards];
  for (TaskReport report : taskReports) {
    // The task id is used directly as the index into the per-shard arrays.
    // NOTE(review): an id >= shards would throw ArrayIndexOutOfBoundsException — confirm callers guarantee the range.
    final int slot = report.getTaskID().getId();
    final Counters perTask = report.getTaskCounters();
    _rowIdsFromNewData[slot] =
        perTask.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA).getValue();
    _rowIdsToUpdateFromNewData[slot] =
        perTask.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA).getValue();
    _rowIdsFromIndex[slot] =
        perTask.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX).getValue();
  }
}
/**
 * Builds the {@code JhCounters} representation of the given counters.
 *
 * @param counters source counters; {@code null} produces a record whose
 *                 group list is empty
 * @param name     value assigned to the record's name
 * @return a record with one group entry per counter group and one count
 *         entry per counter in that group
 */
static JhCounters toAvro(Counters counters, String name) {
  final JhCounters record = new JhCounters();
  record.name = new Utf8(name);
  record.groups = new ArrayList<JhCounterGroup>(0);
  if (counters != null) {
    for (CounterGroup eachGroup : counters) {
      final JhCounterGroup groupRecord = new JhCounterGroup();
      groupRecord.name = new Utf8(eachGroup.getName());
      groupRecord.displayName = new Utf8(eachGroup.getDisplayName());
      // Pre-size the list with the number of counters in the group.
      groupRecord.counts = new ArrayList<JhCounter>(eachGroup.size());
      for (Counter eachCounter : eachGroup) {
        final JhCounter counterRecord = new JhCounter();
        counterRecord.name = new Utf8(eachCounter.getName());
        counterRecord.displayName = new Utf8(eachCounter.getDisplayName());
        counterRecord.value = eachCounter.getValue();
        groupRecord.counts.add(counterRecord);
      }
      record.groups.add(groupRecord);
    }
  }
  return record;
}
/**
 * Checks the job counters against the expected number of referenced entries.
 *
 * <p>Fails when the REFERENCED counter differs from the expected value or
 * when the UNREFERENCED counter is greater than zero; each failure is logged
 * at error level.
 *
 * @param expectedReferenced
 *          Expected number of referenced entries
 * @param counters
 *          The Job's Counters object
 * @return True if both checks pass, false otherwise
 */
protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
  final Counter referencedCounter = counters.findCounter(Counts.REFERENCED);
  final Counter unreferencedCounter = counters.findCounter(Counts.UNREFERENCED);

  final boolean referencedMatches = expectedReferenced == referencedCounter.getValue();
  if (!referencedMatches) {
    LOG.error("Expected referenced count does not match with actual referenced count. " +
        "expected referenced=" + expectedReferenced + " ,actual=" + referencedCounter.getValue());
  }

  final boolean hasUnreferenced = unreferencedCounter.getValue() > 0;
  if (hasUnreferenced) {
    final Counter extraReferences = counters.findCounter(Counts.EXTRAREFERENCES);
    // An equal number of extra references is reported as a possible explanation.
    final boolean couldBeMultiRef = extraReferences.getValue() == unreferencedCounter.getValue();
    LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferencedCounter.getValue()
        + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
  }

  return referencedMatches && !hasUnreferenced;
}
/**
 * Checks that the counters which signal an outright failure are all zero.
 *
 * <p>Both the UNDEFINED and LOST_FAMILIES counters are inspected; each one
 * that is greater than zero is logged at error level.
 *
 * @param counters
 *          The Job's counters
 * @return True if neither counter is greater than zero, false otherwise
 */
protected boolean verifyUnexpectedValues(Counters counters) {
  final Counter undefinedCounter = counters.findCounter(Counts.UNDEFINED);
  final Counter lostFamiliesCounter = counters.findCounter(Counts.LOST_FAMILIES);

  final boolean hasUndefined = undefinedCounter.getValue() > 0;
  if (hasUndefined) {
    LOG.error("Found an undefined node. Undefined count=" + undefinedCounter.getValue());
  }

  final boolean hasLostFamilies = lostFamiliesCounter.getValue() > 0;
  if (hasLostFamilies) {
    LOG.error("Found nodes which lost big or tiny families, count=" + lostFamiliesCounter.getValue());
  }

  return !hasUndefined && !hasLostFamilies;
}
/**
 * Writes one key/value pair per counter in the group and returns the sum of
 * the counter values.
 *
 * @param context the context the pairs are written to
 * @param group   the counters to write
 * @param row     passed to {@code makeKey} together with {@code cf} and the counter name
 * @param cf      passed to {@code makeKey} together with {@code row} and the counter name
 * @return the sum of all counter values in the group
 * @throws IOException          if writing to the context fails
 * @throws InterruptedException if writing to the context is interrupted
 */
private long writeCounterGroup(Context context, CounterGroup group, String row, String cf) throws IOException, InterruptedException {
  long sum = 0;
  for (Counter each : group) {
    final String name = each.getName();
    // The value is emitted in its decimal string form.
    final String valueText = Long.toString(each.getValue());
    context.write(makeKey(row, cf, name), makeValue(valueText));
    sum += each.getValue();
  }
  return sum;
}
/**
 * Persists the counters received for one flag file.
 *
 * <p>First writes a single mutation holding the full serialized counter set
 * under the row {@code "flagFile\0" + flagFile}. Then, for every counter in
 * the {@code InputFile} group, writes a mutation whose row is the counter
 * name and whose value is a fresh serialized counter set containing that
 * counter, a flag-file counter incremented by one, and the FlagMaker start
 * and end times.
 *
 * @param flagFile the flag file the counters belong to
 * @param counters the counters to persist
 * @param context  the context the mutations are written to
 * @throws IOException          if serializing or writing the whole-job mutation fails
 * @throws InterruptedException if a write is interrupted
 */
@Override
public void map(Text flagFile, Counters counters, Context context) throws IOException, InterruptedException {
  System.out.println("Received counters for job " + flagFile);
  log.info(counters);

  final long endTime = counters.findCounter(InputFile.FLAGMAKER_END_TIME).getValue();
  final long startTime = counters.findCounter(InputFile.FLAGMAKER_START_TIME).getValue();

  // One entry with the complete counter set for the flag file.
  final Mutation wholeJobStats = new Mutation("flagFile\u0000" + flagFile);
  wholeJobStats.put("", "", new Value(serializeCounters(counters)));
  context.write(null, wholeJobStats);

  // One entry per input file; the flag-level counters alone cannot be split
  // back into per-file stats later.
  final String inputFileGroup = InputFile.class.getSimpleName();
  for (Counter fileCounter : counters.getGroup(inputFileGroup)) {
    final Text inputFileName = new Text(fileCounter.getName());
    final Mutation perFileStats = new Mutation(inputFileName);
    final long fileTime = fileCounter.getValue();
    try {
      final Counters perFileCounters = new Counters();
      perFileCounters.findCounter(inputFileGroup, inputFileName.toString()).setValue(fileCounter.getValue());
      perFileCounters.findCounter(FlagFile.class.getSimpleName(), flagFile.toString()).increment(1);
      perFileCounters.findCounter(InputFile.FLAGMAKER_END_TIME).setValue(endTime);
      perFileCounters.findCounter(InputFile.FLAGMAKER_START_TIME).setValue(startTime);
      perFileStats.put(WritableUtil.getLong(endTime - fileTime), WritableUtil.getLong(endTime),
          new Value(serializeCounters(perFileCounters)));
      context.write(null, perFileStats);
    } catch (IOException e) {
      // A failure for one input file is logged and the remaining files are still processed.
      log.error("Could not add counters to mutation!!!", e);
    }
  }
}
/**
 * Increments the named counter by {@code value} only when its current value
 * is zero.
 *
 * @param cName the counter to look up
 * @param value the amount to add when the counter is currently zero
 * @return the counter's value as read before any increment
 */
public long putIfAbsent(final Enum<?> cName, final long value) {
  final Counter target = getCounter(cName);
  final long previous = target.getValue();
  // A zero value is treated as "absent".
  if (previous == 0) {
    target.increment(value);
  }
  return previous;
}
/**
 * Records the current wall-clock time, CPU time and memory usage against the
 * reported progress in the progress-splits block.
 *
 * <p>The reported progress is clamped to the range [0, 1]. Nothing is
 * recorded when the reported status has no counters or when no splits block
 * is available. Every sample is stored as an {@code int}; a sample whose
 * value does not fit (greater than {@link Integer#MAX_VALUE}) is skipped
 * rather than narrowed, so that an overflowed, wrapped value is never
 * recorded. Memory counters are divided by {@code MEMORY_SPLITS_RESOLUTION}
 * before being recorded.
 */
private void updateProgressSplits() {
  double newProgress = reportedStatus.progress;
  newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
  Counters counters = reportedStatus.counters;
  if (counters == null) {
    return;
  }
  WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
  if (splitsBlock == null) {
    return;
  }

  long now = clock.getTime();
  long start = getLaunchTime(); // TODO Ensure not 0
  if (start != 0 && now - start <= Integer.MAX_VALUE) {
    splitsBlock.getProgressWallclockTime().extend(newProgress,
        (int) (now - start));
  }

  Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
  if (cpuCounter != null) {
    // Read once so the range check and the recorded sample use the same value.
    long cpuMillis = cpuCounter.getValue();
    if (cpuMillis <= Integer.MAX_VALUE) {
      splitsBlock.getProgressCPUTime().extend(newProgress, (int) cpuMillis);
    }
  }

  Counter virtualBytes = counters
      .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
  if (virtualBytes != null) {
    long scaledVirtual = virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION);
    // Same guard as for wall-clock and CPU: never narrow a value that does not fit in an int.
    if (scaledVirtual <= Integer.MAX_VALUE) {
      splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
          (int) scaledVirtual);
    }
  }

  Counter physicalBytes = counters
      .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
  if (physicalBytes != null) {
    long scaledPhysical = physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION);
    if (scaledPhysical <= Integer.MAX_VALUE) {
      splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
          (int) scaledPhysical);
    }
  }
}
/**
 * Compares this counter with another object.
 *
 * <p>Two counters are equal when their names, display names and values all
 * match. Any object that is not a {@code Counter} is unequal.
 *
 * @param genericRight the object to compare with
 * @return {@code true} if {@code genericRight} is a counter with the same
 *         name, display name and value
 */
@Override
public synchronized boolean equals(Object genericRight) {
  if (!(genericRight instanceof Counter)) {
    return false;
  }
  // The other counter's monitor is taken while this one's is already held.
  // NOTE(review): a.equals(b) and b.equals(a) on two threads acquire the monitors in opposite order — confirm this cannot deadlock in practice.
  synchronized (genericRight) {
    final Counter other = (Counter) genericRight;
    if (!getName().equals(other.getName())) {
      return false;
    }
    if (!getDisplayName().equals(other.getDisplayName())) {
      return false;
    }
    return getValue() == other.getValue();
  }
}
/**
 * Builds the pre 0.21 counter string (e.g. for old job history files) in the
 * form {@code [(actual-name)(display-name)(value)]}.
 *
 * <p>The name, display name and value are read while holding the counter's
 * monitor; the name and display name are passed through {@code escape}.
 *
 * @param counter the counter to stringify
 * @return the stringified result
 */
public static String toEscapedCompactString(Counter counter) {
  final String name;
  final String displayName;
  final long value;
  // Take one consistent snapshot of the counter before formatting.
  synchronized (counter) {
    name = escape(counter.getName());
    displayName = escape(counter.getDisplayName());
    value = counter.getValue();
  }

  // Initial capacity: both escaped strings, 8 for the eight delimiter
  // appends below, plus 4 more (presumably headroom for the value digits).
  final int capacity = name.length() + displayName.length() + 4 + 8;

  return new StringBuilder(capacity)
      .append(COUNTER_OPEN)
      // (actual-name)
      .append(UNIT_OPEN).append(name).append(UNIT_CLOSE)
      // (display-name)
      .append(UNIT_OPEN).append(displayName).append(UNIT_CLOSE)
      // (value)
      .append(UNIT_OPEN).append(value).append(UNIT_CLOSE)
      .append(COUNTER_CLOSE)
      .toString();
}
/**
 * Creates the info object for one counter group.
 *
 * @param name  stored as the counter group name
 * @param group the group whose counters are copied, one
 *              {@code TaskCounterInfo} (name and value) per counter
 */
public TaskCounterGroupInfo(String name, CounterGroup group) {
  this.counterGroupName = name;
  this.counter = new ArrayList<TaskCounterInfo>();
  for (Counter each : group) {
    this.counter.add(new TaskCounterInfo(each.getName(), each.getValue()));
  }
}
/**
 * Adds the latest wall-clock, CPU and memory samples to the progress-splits
 * block, keyed by the reported progress.
 *
 * <p>The reported progress is first clamped to [0, 1]. The method returns
 * without recording anything when the reported status carries no counters or
 * no splits block is available. Samples are stored as {@code int}; any
 * sample larger than {@link Integer#MAX_VALUE} is skipped instead of being
 * narrowed, which would silently wrap it. Memory counter values are divided
 * by {@code MEMORY_SPLITS_RESOLUTION} before the range check.
 */
private void updateProgressSplits() {
  double newProgress = reportedStatus.progress;
  newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
  Counters counters = reportedStatus.counters;
  if (counters == null) {
    return;
  }
  WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
  if (splitsBlock == null) {
    return;
  }

  long now = clock.getTime();
  long start = getLaunchTime(); // TODO Ensure not 0
  if (start != 0 && now - start <= Integer.MAX_VALUE) {
    splitsBlock.getProgressWallclockTime().extend(newProgress,
        (int) (now - start));
  }

  Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
  if (cpuCounter != null) {
    // Single read: the checked value is exactly the value that gets recorded.
    long cpuMillis = cpuCounter.getValue();
    if (cpuMillis <= Integer.MAX_VALUE) {
      splitsBlock.getProgressCPUTime().extend(newProgress, (int) cpuMillis);
    }
  }

  Counter virtualBytes = counters
      .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
  if (virtualBytes != null) {
    long scaledVirtual = virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION);
    // Guard the long-to-int narrowing the same way the wall-clock and CPU samples are guarded.
    if (scaledVirtual <= Integer.MAX_VALUE) {
      splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
          (int) scaledVirtual);
    }
  }

  Counter physicalBytes = counters
      .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
  if (physicalBytes != null) {
    long scaledPhysical = physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION);
    if (scaledPhysical <= Integer.MAX_VALUE) {
      splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
          (int) scaledPhysical);
    }
  }
}
/**
 * Tests whether the given object is a counter with the same name, display
 * name and value as this one.
 *
 * @param genericRight the object to compare with
 * @return {@code true} only when {@code genericRight} is a {@code Counter}
 *         and all three properties match
 */
@Override
public synchronized boolean equals(Object genericRight) {
  if (!(genericRight instanceof Counter)) {
    return false;
  }
  // Both monitors are held while the three properties are compared.
  // NOTE(review): the nested acquisition order depends on which side equals() is called on — confirm opposite-order calls cannot deadlock.
  synchronized (genericRight) {
    final Counter that = (Counter) genericRight;
    final boolean sameName = getName().equals(that.getName());
    return sameName
        && getDisplayName().equals(that.getDisplayName())
        && getValue() == that.getValue();
  }
}
/**
 * Runs the "group each record's feature by identifier" job and returns the
 * number of records the reducers wrote.
 *
 * <p>The output path is deleted before the job is started.
 *
 * @param collection not used by this method
 * @param input      sequence-file input path of the job
 * @param output     sequence-file output path of the job
 * @param baseConf   configuration that is copied for the job
 * @return the value of the {@code REDUCE_OUTPUT_RECORDS} counter
 * @throws IllegalStateException if the job does not complete successfully
 */
public static long run(String collection, Path input, Path output,
    Configuration baseConf) throws IOException, ClassNotFoundException,
    InterruptedException {
  final Configuration jobConf = new Configuration(baseConf);
  final Job groupJob = Job.getInstance(jobConf);
  groupJob.setJarByClass(OnlineFeatureDriver.class);
  groupJob.setJobName("GROUP each record's feature BY identifier");

  // Input and output locations and formats.
  FileInputFormat.setInputPaths(groupJob, input);
  FileOutputFormat.setOutputPath(groupJob, output);
  groupJob.setInputFormatClass(SequenceFileInputFormat.class);
  groupJob.setOutputFormatClass(SequenceFileOutputFormat.class);

  // Key/value types of the map output and of the final output.
  groupJob.setMapOutputKeyClass(Text.class);
  groupJob.setMapOutputValueClass(OnlineVectorWritable.class);
  groupJob.setOutputKeyClass(Text.class);
  groupJob.setOutputValueClass(ListWritable.class);

  groupJob.setMapperClass(OnlineFeatureMapper.class);
  groupJob.setReducerClass(OnlineFeatureReducer.class);

  // Clear any previous output before running.
  HadoopUtil.delete(jobConf, output);

  if (!groupJob.waitForCompletion(true)) {
    throw new IllegalStateException("Job:Group feature, Failed!");
  }

  final Counter reduceOutput = groupJob.getCounters().findCounter(
      "org.apache.hadoop.mapred.Task$Counter",
      "REDUCE_OUTPUT_RECORDS");
  final long reduceOutputRecords = reduceOutput.getValue();
  LOG.info(
      "Job: GROUP each record's feature BY identifier, output recordes = {}",
      reduceOutputRecords);
  return reduceOutputRecords;
}
/**
 * Reads the record count of a job from its counters.
 *
 * <p>The reducer's RECORD_COUNT counter is consulted first and the mapper's
 * second; the first one that is non-zero wins.
 *
 * @param job the job to inspect, if any
 * @return the first non-zero record count, or {@code -1} when no job is
 *         present, the counters cannot be fetched, or both counters are zero
 */
private static long getRecordCount(Optional<Job> job) {
  if (!job.isPresent()) {
    return -1L;
  }

  final Counters jobCounters;
  try {
    jobCounters = job.get().getCounters();
  } catch (IOException e) {
    LOG.debug("Failed to get job counters. Record count will not be set. ", e);
    return -1L;
  }

  // Reducer counter takes precedence over the mapper counter.
  final Enum<?>[] candidates = {
      RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT,
      RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT
  };
  for (Enum<?> candidate : candidates) {
    final Counter recordCounter = jobCounters.findCounter(candidate);
    if (recordCounter != null && recordCounter.getValue() != 0) {
      return recordCounter.getValue();
    }
  }

  LOG.debug("Non zero record count not found in both mapper and reducer counters");
  return -1L;
}
/**
 * Configures, submits and waits for the job described by the command line.
 *
 * @param args command-line arguments handed to {@code getOptions}
 * @return {@code 1} when the options cannot be parsed ({@code getOptions}
 *         returns {@code null}), when the job fails, or when the number of
 *         skipped corrupt input files exceeds the configured maximum;
 *         {@code 0} otherwise
 * @throws Exception if configuring, submitting or running the job fails
 */
@Override
public int run(String[] args) throws Exception {
  final Job job = Job.getInstance(getConf());
  final Configuration conf = job.getConfiguration();
  job.setJarByClass(Aegisthus.class);

  final CommandLine commandLine = getOptions(args);
  if (commandLine == null) {
    return 1;
  }

  // Collect the input paths from explicit files and/or an input directory.
  final List<Path> inputPaths = Lists.newArrayList();
  if (commandLine.hasOption(Feature.CMD_ARG_INPUT_FILE)) {
    for (String file : commandLine.getOptionValues(Feature.CMD_ARG_INPUT_FILE)) {
      inputPaths.add(new Path(file));
    }
  }
  if (commandLine.hasOption(Feature.CMD_ARG_INPUT_DIR)) {
    inputPaths.addAll(getDataFiles(conf, commandLine.getOptionValue(Feature.CMD_ARG_INPUT_DIR)));
  }
  LOG.info("Processing paths: {}", inputPaths);

  // The sstable output version defaults to the current one unless overridden.
  final Descriptor.Version sstableVersion = commandLine.hasOption(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION)
      ? new Descriptor.Version(commandLine.getOptionValue(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION))
      : Descriptor.Version.CURRENT;
  conf.set(Feature.CONF_SSTABLE_VERSION, sstableVersion.toString());

  if (conf.get(Feature.CONF_CQL_SCHEMA) != null) {
    setConfigurationFromCql(conf);
  }

  if (commandLine.hasOption(Feature.CMD_ARG_COMBINE_SPLITS)) {
    job.setInputFormatClass(AegisthusCombinedInputFormat.class);
  } else {
    job.setInputFormatClass(AegisthusInputFormat.class);
  }

  // Key/value types and the classes that drive the map/shuffle/reduce phases.
  job.setMapOutputKeyClass(AegisthusKey.class);
  job.setMapOutputValueClass(AtomWritable.class);
  job.setOutputKeyClass(AegisthusKey.class);
  job.setOutputValueClass(RowWritable.class);
  job.setMapperClass(AegisthusKeyMapper.class);
  job.setReducerClass(CassSSTableReducer.class);
  job.setGroupingComparatorClass(AegisthusKeyGroupingComparator.class);
  job.setPartitionerClass(AegisthusKeyPartitioner.class);
  job.setSortComparatorClass(AegisthusKeySortingComparator.class);
  TextInputFormat.setInputPaths(job, inputPaths.toArray(new Path[inputPaths.size()]));

  if (commandLine.hasOption(Feature.CMD_ARG_PRODUCE_SSTABLE)) {
    job.setOutputFormatClass(SSTableOutputFormat.class);
  } else {
    job.setOutputFormatClass(JsonOutputFormat.class);
  }
  CustomFileNameFileOutputFormat.setOutputPath(job, new Path(commandLine.getOptionValue(Feature.CMD_ARG_OUTPUT_DIR)));

  job.submit();
  // The hook is registered only after submission and can be disabled by configuration.
  if (conf.getBoolean(Feature.CONF_SHUTDOWN_HOOK, true)) {
    Runtime.getRuntime().addShutdownHook(new JobKiller(job));
  }

  System.out.println(job.getJobID());
  System.out.println(job.getTrackingURL());

  if (!job.waitForCompletion(true)) {
    return 1;
  }

  // A successful job is still reported as failed when too many inputs were skipped.
  final Counter skippedInputs = job.getCounters().findCounter("aegisthus", "error_skipped_input");
  final long errorCount = skippedInputs != null ? skippedInputs.getValue() : 0L;
  final int maxAllowed = conf.getInt(Feature.CONF_MAX_CORRUPT_FILES_TO_SKIP, 0);
  if (skippedInputs != null && skippedInputs.getValue() > maxAllowed) {
    LOG.error("Found {} corrupt files which is greater than the max allowed {}", errorCount, maxAllowed);
    return 1;
  }
  if (errorCount > 0) {
    LOG.warn("Found {} corrupt files but not failing the job because the max allowed is {}",
        errorCount, maxAllowed);
  }
  return 0;
}
/**
 * Creates the info object for one counter.
 *
 * @param c  the counter providing the name and the total value
 * @param mc the counter providing the map value; {@code null} yields 0
 * @param rc the counter providing the reduce value; {@code null} yields 0
 */
public CounterInfo(Counter c, Counter mc, Counter rc) {
  this.name = c.getName();
  this.totalCounterValue = c.getValue();
  if (mc == null) {
    this.mapCounterValue = 0L;
  } else {
    this.mapCounterValue = mc.getValue();
  }
  if (rc == null) {
    this.reduceCounterValue = 0L;
  } else {
    this.reduceCounterValue = rc.getValue();
  }
}
/**
 * Populates the reduce phase details of one task attempt.
 *
 * <p>Details are filled in only when the attempt's status is "SUCCEEDED"
 * (case-insensitive); otherwise an empty {@code TaskOutputDetails} is
 * returned. Every timestamp is made relative to {@code referencedZeroTime}
 * and divided by {@code CONVERSION_FACTOR_MILLISECS_TO_SECS}.
 *
 * @param task entry whose value is the task attempt to describe
 * @param referencedZeroTime the time subtracted from every timestamp of the
 *          attempt before conversion
 * @return TaskOutputDetails contains the details of the reduce phase.
 */
private TaskOutputDetails addReducePhaseDetails(
    Entry<TaskAttemptID, TaskAttemptInfo> task, long referencedZeroTime) {
  TaskAttemptInfo taskAttemptInfo = task.getValue();
  TaskOutputDetails taskOutputDetails = new TaskOutputDetails();
  if (!taskAttemptInfo.getTaskStatus().equalsIgnoreCase("SUCCEEDED")) {
    return taskOutputDetails;
  }

  taskOutputDetails.setTaskStatus(taskAttemptInfo.getTaskStatus());
  taskOutputDetails.setTaskType(taskAttemptInfo.getTaskType().toString());
  taskOutputDetails.setTaskID(taskAttemptInfo.getAttemptId().getTaskID().toString());
  taskOutputDetails.setLocation(taskAttemptInfo.getHostname());

  Counters counters = taskAttemptInfo.getCounters();
  CounterGroup mapReduceTaskCounters = counters.getGroup("org.apache.hadoop.mapreduce.TaskCounter");
  Counter reduceOutputRecords = mapReduceTaskCounters.findCounter("REDUCE_OUTPUT_RECORDS");
  taskOutputDetails.setOutputRecords(reduceOutputRecords.getValue());
  // NOTE(review): the "output bytes" value is read from the SPILLED_RECORDS counter — confirm this is intended.
  Counter reduceOutputBytes = mapReduceTaskCounters.findCounter("SPILLED_RECORDS");
  taskOutputDetails.setOutputBytes(reduceOutputBytes.getValue());

  // Shuffle: starts when the attempt starts.
  long shuffleStartTime = (taskAttemptInfo.getStartTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS;
  taskOutputDetails.setStartPoint(shuffleStartTime);
  taskOutputDetails.setShuffleStart(shuffleStartTime);
  LOGGER.debug("shuffle start time" + taskOutputDetails.getShuffleStart());
  long shuffleEnd = ((taskAttemptInfo.getShuffleFinishTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS);
  taskOutputDetails.setShuffleEnd(shuffleEnd);
  LOGGER.debug("shuffle end time" + taskOutputDetails.getShuffleEnd());

  // Sort: starts where the shuffle ends.
  taskOutputDetails.setSortStart(shuffleEnd);
  long sortEnd = (taskAttemptInfo.getSortFinishTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS;
  taskOutputDetails.setSortEnd(sortEnd);
  LOGGER.debug("sort end time" + taskOutputDetails.getSortEnd());

  // Reduce: starts where the sort ends and finishes with the attempt.
  taskOutputDetails.setReduceStart(sortEnd);
  taskOutputDetails.setReduceEnd((taskAttemptInfo.getFinishTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS);
  taskOutputDetails.setEndPoint(taskOutputDetails.getReduceEnd());
  LOGGER.debug("Reduce end time" + taskOutputDetails.getReduceEnd());

  // The integer conversion above can make start and end coincide for a very
  // short attempt. Dividing by that zero duration would throw
  // ArithmeticException, so a zero duration is treated as one time unit.
  long elapsed = taskOutputDetails.getReduceEnd() - shuffleStartTime;
  long dataFlowRate = (elapsed == 0)
      ? reduceOutputBytes.getValue()
      : reduceOutputBytes.getValue() / elapsed;
  taskOutputDetails.setDataFlowRate(dataFlowRate);

  Counter physicalMemoryBytes = mapReduceTaskCounters.findCounter("PHYSICAL_MEMORY_BYTES");
  ResourceUsageMetrics rum = new ResourceUsageMetrics();
  rum.setPhysicalMemoryUsage(physicalMemoryBytes.getValue());
  taskOutputDetails.setResourceUsageMetrics(rum);
  return taskOutputDetails;
}
/**
 * Configures and runs the aggregation phase job, then logs its counters.
 *
 * <p>An existing output path is deleted before the job runs. The Avro schema
 * is derived from the input path and is used both as job configuration and
 * as the output key schema.
 *
 * <p>NOTE(review): the result of {@code waitForCompletion} is not inspected,
 * so the counters are read regardless of whether the job succeeded.
 *
 * @return the job after it has finished
 * @throws IllegalStateException if the NUMBER_OF_RECORDS counter is zero
 * @throws Exception if configuring or running the job fails
 */
public Job run() throws Exception {
  final Job job = Job.getInstance(getConf());
  job.setJobName(name);
  job.setJarByClass(AggregationPhaseJob.class);

  final FileSystem fileSystem = FileSystem.get(getConf());
  final Configuration jobConf = job.getConfiguration();

  LOGGER.info("Properties {}", props);

  // Inputs: the configured value may list several paths.
  final String inputPathDir = getAndSetConfiguration(jobConf, AGG_PHASE_INPUT_PATH);
  LOGGER.info("Input path dir: " + inputPathDir);
  for (String singleInput : inputPathDir.split(ThirdEyeConstants.FIELD_SEPARATOR)) {
    LOGGER.info("Adding input:" + singleInput);
    FileInputFormat.addInputPath(job, new Path(singleInput));
  }

  // Output: start from a clean directory.
  final Path outputPath = new Path(getAndSetConfiguration(jobConf, AGG_PHASE_OUTPUT_PATH));
  LOGGER.info("Output path dir: " + outputPath.toString());
  if (fileSystem.exists(outputPath)) {
    fileSystem.delete(outputPath, true);
  }
  FileOutputFormat.setOutputPath(job, outputPath);

  // Avro schema of the input, handed to the tasks through the configuration.
  final Schema avroSchema = ThirdeyeAvroUtils.getSchema(inputPathDir);
  LOGGER.info("Schema : {}", avroSchema.toString(true));
  job.getConfiguration().set(AGG_PHASE_AVRO_SCHEMA.toString(), avroSchema.toString());

  // Dimension and metric types are resolved against the schema and written
  // back into the properties before the config object is built from them.
  final String dimensionTypes = ThirdeyeAvroUtils.getDimensionTypesProperty(
      props.getProperty(ThirdEyeConfigProperties.THIRDEYE_DIMENSION_NAMES.toString()), avroSchema);
  props.setProperty(ThirdEyeConfigProperties.THIRDEYE_DIMENSION_TYPES.toString(), dimensionTypes);
  final String metricTypes = ThirdeyeAvroUtils.getMetricTypesProperty(
      props.getProperty(ThirdEyeConfigProperties.THIRDEYE_METRIC_NAMES.toString()),
      props.getProperty(ThirdEyeConfigProperties.THIRDEYE_METRIC_TYPES.toString()), avroSchema);
  props.setProperty(ThirdEyeConfigProperties.THIRDEYE_METRIC_TYPES.toString(), metricTypes);
  final ThirdEyeConfig thirdeyeConfig = ThirdEyeConfig.fromProperties(props);
  LOGGER.info("Thirdeye Config {}", thirdeyeConfig.encode());
  job.getConfiguration().set(AGG_PHASE_THIRDEYE_CONFIG.toString(), OBJECT_MAPPER.writeValueAsString(thirdeyeConfig));

  // Map side.
  job.setMapperClass(AggregationMapper.class);
  job.setInputFormatClass(AvroKeyInputFormat.class);
  job.setMapOutputKeyClass(BytesWritable.class);
  job.setMapOutputValueClass(BytesWritable.class);

  // Reduce side.
  job.setReducerClass(AggregationReducer.class);
  job.setOutputKeyClass(AvroKey.class);
  job.setOutputValueClass(NullWritable.class);
  AvroJob.setOutputKeySchema(job, avroSchema);
  job.setOutputFormatClass(AvroKeyOutputFormat.class);

  // The reducer count is only overridden when the property is set.
  final String numReducers = props.getProperty(ThirdEyeJobProperties.THIRDEYE_NUM_REDUCERS.getName());
  LOGGER.info("Num Reducers : {}", numReducers);
  if (StringUtils.isNotBlank(numReducers)) {
    job.setNumReduceTasks(Integer.parseInt(numReducers));
    LOGGER.info("Setting num reducers {}", job.getNumReduceTasks());
  }

  job.waitForCompletion(true);

  // A job that saw no input records is treated as an error.
  final Counter records = job.getCounters().findCounter(AggregationCounter.NUMBER_OF_RECORDS);
  LOGGER.info(records.getDisplayName() + " : " + records.getValue());
  if (records.getValue() == 0) {
    throw new IllegalStateException("No input records in " + inputPathDir);
  }

  final Counter flattened = job.getCounters().findCounter(AggregationCounter.NUMBER_OF_RECORDS_FLATTENED);
  LOGGER.info(flattened.getDisplayName() + " : " + flattened.getValue());

  // One counter per metric, grouped under the collection name.
  for (String metric : thirdeyeConfig.getMetricNames()) {
    final Counter metricCounter = job.getCounters().findCounter(thirdeyeConfig.getCollection(), metric);
    LOGGER.info(metricCounter.getDisplayName() + " : " + metricCounter.getValue());
  }
  return job;
}