org.apache.hadoop.mapreduce.Counter#getValue ( )源码实例Demo

下面列出了org.apache.hadoop.mapreduce.Counter#getValue ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hadoop   文件: EventWriter.java
/**
 * Converts a set of counters into its Avro {@code JhCounters} representation.
 *
 * @param counters the counters to convert; when {@code null} the returned
 *                 record carries an empty group list
 * @param name     the name stored on the returned record
 * @return a {@code JhCounters} holding one {@code JhCounterGroup} per group
 *         and one {@code JhCounter} per counter
 */
static JhCounters toAvro(Counters counters, String name) {
  JhCounters avroCounters = new JhCounters();
  avroCounters.name = new Utf8(name);
  avroCounters.groups = new ArrayList<JhCounterGroup>(0);

  if (counters != null) {
    for (CounterGroup sourceGroup : counters) {
      JhCounterGroup avroGroup = new JhCounterGroup();
      avroGroup.name = new Utf8(sourceGroup.getName());
      avroGroup.displayName = new Utf8(sourceGroup.getDisplayName());
      // Pre-size the list to the number of counters in the group.
      avroGroup.counts = new ArrayList<JhCounter>(sourceGroup.size());

      for (Counter sourceCounter : sourceGroup) {
        JhCounter avroCounter = new JhCounter();
        avroCounter.name = new Utf8(sourceCounter.getName());
        avroCounter.displayName = new Utf8(sourceCounter.getDisplayName());
        avroCounter.value = sourceCounter.getValue();
        avroGroup.counts.add(avroCounter);
      }

      avroCounters.groups.add(avroGroup);
    }
  }

  return avroCounters;
}
 
源代码2 项目: incubator-retired-blur   文件: IndexerJobDriver.java
/**
 * Captures the output location and counters of the partitioned-input job and
 * collects three row-id counters from every task report into per-shard arrays.
 *
 * @param partitionedInputData path stored as the partitioned input data
 * @param counters             job-level counters, stored as-is
 * @param shards               length of each per-shard array
 * @param taskReports          task reports whose counters are read; each report's
 *                             task id is used as the array index
 */
PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) {
  _partitionedInputData = partitionedInputData;
  _counters = counters;
  _rowIdsFromNewData = new long[shards];
  _rowIdsToUpdateFromNewData = new long[shards];
  _rowIdsFromIndex = new long[shards];

  for (TaskReport report : taskReports) {
    final int taskIndex = report.getTaskID().getId();
    final Counters perTask = report.getTaskCounters();

    _rowIdsFromNewData[taskIndex] =
        perTask.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA).getValue();
    _rowIdsToUpdateFromNewData[taskIndex] =
        perTask.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA).getValue();
    _rowIdsFromIndex[taskIndex] =
        perTask.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX).getValue();
  }
}
 
源代码3 项目: big-c   文件: EventWriter.java
/**
 * Builds the Avro {@code JhCounters} record for the given counters.
 *
 * @param counters counters to copy; {@code null} yields a record with no groups
 * @param name     name assigned to the record
 * @return the populated {@code JhCounters}
 */
static JhCounters toAvro(Counters counters, String name) {
  JhCounters converted = new JhCounters();
  converted.name = new Utf8(name);
  converted.groups = new ArrayList<JhCounterGroup>(0);
  if (counters == null) {
    // Nothing to copy: hand back the named, empty record.
    return converted;
  }

  for (CounterGroup eachGroup : counters) {
    JhCounterGroup groupRecord = new JhCounterGroup();
    groupRecord.name = new Utf8(eachGroup.getName());
    groupRecord.displayName = new Utf8(eachGroup.getDisplayName());
    groupRecord.counts = new ArrayList<JhCounter>(eachGroup.size());
    for (Counter eachCounter : eachGroup) {
      JhCounter counterRecord = new JhCounter();
      counterRecord.name = new Utf8(eachCounter.getName());
      counterRecord.displayName = new Utf8(eachCounter.getDisplayName());
      counterRecord.value = eachCounter.getValue();
      groupRecord.counts.add(counterRecord);
    }
    converted.groups.add(groupRecord);
  }
  return converted;
}
 
源代码4 项目: hbase   文件: IntegrationTestBigLinkedList.java
/**
 * Checks the job counters against the expected number of referenced entries
 * and verifies that no unreferenced entries were counted.
 *
 * @param expectedReferenced
 *          Expected number of referenced entries
 * @param counters
 *          The Job's Counters object
 * @return True if the referenced count matches and the unreferenced count is
 *         not positive, false otherwise
 */
protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
  final Counter referenced = counters.findCounter(Counts.REFERENCED);
  final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);

  boolean referencedMatches = true;
  if (expectedReferenced != referenced.getValue()) {
    LOG.error("Expected referenced count does not match with actual referenced count. " +
        "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
    referencedMatches = false;
  }

  boolean noneUnreferenced = true;
  if (unreferenced.getValue() > 0) {
    // When the extra-reference count equals the unreferenced count, the log
    // message carries a hint about duplicate random numbers.
    final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
    final boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
    LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
        + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
    noneUnreferenced = false;
  }

  return referencedMatches && noneUnreferenced;
}
 
源代码5 项目: hbase   文件: IntegrationTestBigLinkedList.java
/**
 * Checks that the counters which signal an outright failure are not positive.
 *
 * @param counters
 *          The Job's counters
 * @return True if neither the undefined nor the lost-families counter is
 *         greater than zero, false otherwise
 */
protected boolean verifyUnexpectedValues(Counters counters) {
  final Counter undefined = counters.findCounter(Counts.UNDEFINED);
  final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);

  boolean noUndefined = true;
  if (undefined.getValue() > 0) {
    LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
    noUndefined = false;
  }

  boolean noLostFamilies = true;
  if (lostfamilies.getValue() > 0) {
    LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
    noLostFamilies = false;
  }

  return noUndefined && noLostFamilies;
}
 
源代码6 项目: datawave   文件: IngestMetricsSummaryLoader.java
/**
 * Writes one key/value pair per counter in the group and returns the sum of
 * the counter values.
 *
 * @param context the context each pair is written to
 * @param group   the counter group to emit
 * @param row     row passed to {@code makeKey}
 * @param cf      column family passed to {@code makeKey}
 * @return the total of all counter values in the group
 * @throws IOException          if writing to the context fails
 * @throws InterruptedException if writing to the context is interrupted
 */
private long writeCounterGroup(Context context, CounterGroup group, String row, String cf) throws IOException, InterruptedException {
    long sum = 0;
    for (Counter entry : group) {
        final String qualifier = entry.getName();
        final String renderedCount = String.valueOf(entry.getValue());
        context.write(makeKey(row, cf, qualifier), makeValue(renderedCount));
        sum = sum + entry.getValue();
    }
    return sum;
}
 
源代码7 项目: datawave   文件: FlagMakerMetricsMapper.java
/**
 * Persists the counters received for one flag file: first as a single mutation
 * holding the whole serialized counter set, then as one mutation per counter in
 * the {@code InputFile} group.
 *
 * @param flagFile the flag file the counters belong to
 * @param counters the counters reported for that flag file
 * @param context  the context the mutations are written to
 * @throws IOException          if serializing or writing the whole-job mutation fails
 * @throws InterruptedException if a write is interrupted
 */
@Override
public void map(Text flagFile, Counters counters, Context context) throws IOException, InterruptedException {
    System.out.println("Received counters for job " + flagFile);
    
    log.info(counters);
    
    final long flagMakerEnd = counters.findCounter(InputFile.FLAGMAKER_END_TIME).getValue();
    final long flagMakerStart = counters.findCounter(InputFile.FLAGMAKER_START_TIME).getValue();
    
    // Whole-job record, keyed by the flag file name.
    final Mutation wholeJob = new Mutation("flagFile\u0000" + flagFile);
    wholeJob.put("", "", new Value(serializeCounters(counters)));
    context.write(null, wholeJob);
    
    // Breaking it down into individual counters... Can't get individual stats when batch-processing in the FlagMaker
    final String inputFileGroup = InputFile.class.getSimpleName();
    for (Counter fileCounter : counters.getGroup(inputFileGroup)) {
        final Text inputFileName = new Text(fileCounter.getName());
        final Mutation perFile = new Mutation(inputFileName);
        final long fileTime = fileCounter.getValue();
        try {
            // Build a small counter set describing just this input file.
            final Counters single = new Counters();
            single.findCounter(inputFileGroup, inputFileName.toString()).setValue(fileCounter.getValue());
            single.findCounter(FlagFile.class.getSimpleName(), flagFile.toString()).increment(1);
            single.findCounter(InputFile.FLAGMAKER_END_TIME).setValue(flagMakerEnd);
            single.findCounter(InputFile.FLAGMAKER_START_TIME).setValue(flagMakerStart);
            perFile.put(WritableUtil.getLong(flagMakerEnd - fileTime), WritableUtil.getLong(flagMakerEnd),
                            new Value(serializeCounters(single)));
            context.write(null, perFile);
        } catch (IOException e) {
            // NOTE(review): this also catches an IOException thrown by context.write; the file is then skipped.
            log.error("Could not add counters to mutation!!!", e);
        }
    }
}
 
源代码8 项目: datawave   文件: StandaloneTaskAttemptContext.java
/**
 * Increments the named counter by {@code value} only when its current value
 * is zero.
 *
 * @param cName the counter to update
 * @param value the amount added when the counter is currently zero
 * @return the counter value observed before any increment
 */
public long putIfAbsent(final Enum<?> cName, final long value) {
    final Counter target = getCounter(cName);
    final long previous = target.getValue();
    
    final boolean unset = (previous == 0);
    if (unset) {
        target.increment(value);
    }
    return previous;
}
 
源代码9 项目: hadoop   文件: TaskAttemptImpl.java
/**
 * Feeds the most recently reported status into the progress-split block:
 * wallclock time, CPU time, and virtual/physical memory, each recorded against
 * the reported progress clamped to the range [0, 1].
 *
 * Does nothing when the reported status has no counters or when no
 * progress-split block is available.
 */
private void updateProgressSplits() {
  final double progress =
      Math.max(Math.min(reportedStatus.progress, 1.0D), 0.0D);
  final Counters counters = reportedStatus.counters;
  if (counters == null) {
    return;
  }

  final WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
  if (splitsBlock == null) {
    return;
  }

  final long now = clock.getTime();
  final long start = getLaunchTime(); // TODO Ensure not 0

  // Wallclock time is only recorded when the launch time is set and the
  // elapsed time does not exceed Integer.MAX_VALUE.
  final boolean wallclockUsable = start != 0 && now - start <= Integer.MAX_VALUE;
  if (wallclockUsable) {
    splitsBlock.getProgressWallclockTime().extend(progress,
        (int) (now - start));
  }

  final Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
  if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
    splitsBlock.getProgressCPUTime().extend(progress,
        (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
  }

  // Memory values are divided by MEMORY_SPLITS_RESOLUTION and narrowed to int
  // without a range check.
  final Counter virtualBytes =
      counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
  if (virtualBytes != null) {
    splitsBlock.getProgressVirtualMemoryKbytes().extend(progress,
        (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
  }

  final Counter physicalBytes =
      counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
  if (physicalBytes != null) {
    splitsBlock.getProgressPhysicalMemoryKbytes().extend(progress,
        (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
  }
}
 
源代码10 项目: hadoop   文件: AbstractCounter.java
/**
 * Compares this counter with another object. Two counters are equal when
 * their names, display names and values all match.
 *
 * The comparison holds this object's monitor and then the other object's
 * monitor while reading the fields.
 * NOTE(review): two threads comparing the same pair in opposite directions
 * acquire the monitors in opposite order - confirm this cannot happen.
 *
 * @param genericRight the object to compare with
 * @return {@code true} if {@code genericRight} is a {@code Counter} with the
 *         same name, display name and value
 */
@Override
public synchronized boolean equals(Object genericRight) {
  if (!(genericRight instanceof Counter)) {
    return false;
  }
  synchronized (genericRight) {
    final Counter other = (Counter) genericRight;
    if (!getName().equals(other.getName())) {
      return false;
    }
    if (!getDisplayName().equals(other.getDisplayName())) {
      return false;
    }
    return getValue() == other.getValue();
  }
}
 
源代码11 项目: hadoop   文件: CountersStrings.java
/**
 * Renders a counter in the pre 0.21 compact string form (e.g. for old job
 * history files):
 * [(actual-name)(display-name)(value)]
 * @param counter to stringify
 * @return the stringified result
 */
public static String toEscapedCompactString(Counter counter) {

  // Snapshot the three fields under the counter's monitor so they are read
  // together; the escaped strings also determine the buffer size up front.
  final String escapedName;
  final String escapedDispName;
  final long currentValue;
  synchronized(counter) {
    escapedName = escape(counter.getName());
    escapedDispName = escape(counter.getDisplayName());
    currentValue = counter.getValue();
  }

  // Both escaped strings, plus 4, plus 8 for the delimiting characters.
  final int capacity = escapedName.length() + escapedDispName.length() + 4 + 8;

  return new StringBuilder(capacity)
      .append(COUNTER_OPEN)
      // counter name
      .append(UNIT_OPEN).append(escapedName).append(UNIT_CLOSE)
      // display name
      .append(UNIT_OPEN).append(escapedDispName).append(UNIT_CLOSE)
      // value
      .append(UNIT_OPEN).append(currentValue).append(UNIT_CLOSE)
      .append(COUNTER_CLOSE)
      .toString();
}
 
源代码12 项目: big-c   文件: TaskCounterGroupInfo.java
/**
 * Builds the info object for one counter group, copying the name and value
 * of every counter in the group.
 *
 * @param name  name recorded for the counter group
 * @param group the counter group whose counters are copied
 */
public TaskCounterGroupInfo(String name, CounterGroup group) {
  this.counterGroupName = name;
  this.counter = new ArrayList<TaskCounterInfo>();

  for (Counter entry : group) {
    this.counter.add(new TaskCounterInfo(entry.getName(), entry.getValue()));
  }
}
 
源代码13 项目: big-c   文件: TaskAttemptImpl.java
/**
 * Extends the progress-split series (wallclock time, CPU time, virtual and
 * physical memory) with the values from the latest reported status.
 *
 * Returns without doing anything if the status carries no counters or if
 * there is no progress-split block.
 */
private void updateProgressSplits() {
  // Clamp the reported progress into [0, 1].
  final double boundedProgress =
      Math.max(Math.min(reportedStatus.progress, 1.0D), 0.0D);

  final Counters counters = reportedStatus.counters;
  if (counters == null) {
    return;
  }

  final WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
  if (splitsBlock == null) {
    return;
  }

  final long now = clock.getTime();
  final long start = getLaunchTime(); // TODO Ensure not 0

  if (start != 0 && now - start <= Integer.MAX_VALUE) {
    splitsBlock.getProgressWallclockTime().extend(boundedProgress,
        (int) (now - start));
  }

  // CPU time is skipped when the counter value does not fit in an int.
  final Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
  final boolean cpuUsable =
      cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE;
  if (cpuUsable) {
    splitsBlock.getProgressCPUTime().extend(boundedProgress,
        (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
  }

  final Counter virtualBytes =
      counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
  if (virtualBytes != null) {
    splitsBlock.getProgressVirtualMemoryKbytes().extend(boundedProgress,
        (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
  }

  final Counter physicalBytes =
      counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
  if (physicalBytes != null) {
    splitsBlock.getProgressPhysicalMemoryKbytes().extend(boundedProgress,
        (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
  }
}
 
源代码14 项目: big-c   文件: AbstractCounter.java
/**
 * Tests whether the given object is a counter with the same name, display
 * name and value as this one. Both this object's monitor and the argument's
 * monitor are held while the fields are compared.
 *
 * @param genericRight the object to compare against
 * @return {@code true} only when {@code genericRight} is a {@code Counter}
 *         whose name, display name and value all equal this counter's
 */
@Override
public synchronized boolean equals(Object genericRight) {
  boolean same = false;
  if (genericRight instanceof Counter) {
    synchronized (genericRight) {
      final Counter that = (Counter) genericRight;
      same = getName().equals(that.getName())
          && getDisplayName().equals(that.getDisplayName())
          && getValue() == that.getValue();
    }
  }
  return same;
}
 
源代码15 项目: laser   文件: OnlineFeatureDriver.java
/**
 * Configures and runs the job that groups each record's feature by
 * identifier, then reports how many records the reducers emitted.
 *
 * The output path is deleted before the job starts.
 *
 * @param collection not used by this method
 * @param input      input path, read as sequence files
 * @param output     output path, written as sequence files
 * @param baseConf   configuration copied for the job
 * @return the value of the REDUCE_OUTPUT_RECORDS counter
 * @throws IllegalStateException if the job does not complete successfully
 */
public static long run(String collection, Path input, Path output,
		Configuration baseConf) throws IOException, ClassNotFoundException,
		InterruptedException {
	final Configuration jobConf = new Configuration(baseConf);
	final Job job = Job.getInstance(jobConf);

	job.setJarByClass(OnlineFeatureDriver.class);
	job.setJobName("GROUP each record's feature BY identifier");

	// Input and output locations and formats.
	FileInputFormat.setInputPaths(job, input);
	FileOutputFormat.setOutputPath(job, output);

	job.setInputFormatClass(SequenceFileInputFormat.class);
	job.setOutputFormatClass(SequenceFileOutputFormat.class);

	// Key/value types for the map and reduce stages.
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(OnlineVectorWritable.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(ListWritable.class);

	job.setMapperClass(OnlineFeatureMapper.class);
	job.setReducerClass(OnlineFeatureReducer.class);

	// Clear any previous output before running.
	HadoopUtil.delete(jobConf, output);
	if (!job.waitForCompletion(true)) {
		throw new IllegalStateException("Job:Group feature,  Failed!");
	}

	final Counter reduceOutput = job.getCounters().findCounter(
			"org.apache.hadoop.mapred.Task$Counter",
			"REDUCE_OUTPUT_RECORDS");
	final long reduceOutputRecords = reduceOutput.getValue();

	LOG.info(
			"Job: GROUP each record's feature BY identifier, output recordes = {}",
			reduceOutputRecords);

	return reduceOutputRecords;
}
 
/**
 * Looks up the record count reported by a job, preferring the dedup reducer's
 * counter and falling back to the mapper's counter.
 *
 * This is a best-effort lookup: every failure path returns {@code -1} rather
 * than throwing.
 *
 * @param job the job to inspect, if any
 * @return the first non-zero record count found, or {@code -1} when the job is
 *         absent, its counters cannot be fetched or are unavailable, or neither
 *         counter holds a non-zero value
 */
private static long getRecordCount(Optional<Job> job) {
  if (!job.isPresent()) {
    return -1L;
  }

  Counters counters;
  try {
    counters = job.get().getCounters();
  } catch (IOException e) {
    LOG.debug("Failed to get job counters. Record count will not be set. ", e);
    return -1L;
  }

  // Job.getCounters() is documented as possibly returning null (for example
  // for a retired job); treat that like any other "count unknown" case
  // instead of failing with a NullPointerException below.
  if (counters == null) {
    LOG.debug("Job counters are not available. Record count will not be set. ");
    return -1L;
  }

  // Prefer the reducer-side count.
  Counter recordCounter = counters.findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT);
  if (recordCounter != null) {
    long reducerCount = recordCounter.getValue();
    if (reducerCount != 0) {
      return reducerCount;
    }
  }

  // Fall back to the mapper-side count.
  recordCounter = counters.findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
  if (recordCounter != null) {
    long mapperCount = recordCounter.getValue();
    if (mapperCount != 0) {
      return mapperCount;
    }
  }

  LOG.debug("Non zero record count not found in both mapper and reducer counters");

  return -1L;
}
 
源代码17 项目: aegisthus   文件: Aegisthus.java
/**
 * Configures, submits and waits for the Aegisthus MapReduce job described by
 * the command-line arguments.
 *
 * After a successful run, the "aegisthus"/"error_skipped_input" counter is
 * compared with the configured maximum number of corrupt files to skip; a
 * count above that maximum turns the run into a failure.
 *
 * @param args command-line arguments, parsed by {@code getOptions}
 * @return 0 if the job succeeded and the skipped-input count is within the
 *         allowed maximum; 1 if the options could not be parsed, the job
 *         failed, or too many inputs were skipped
 * @throws Exception propagated from job setup, submission or completion
 */
@Override
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    Configuration configuration = job.getConfiguration();

    job.setJarByClass(Aegisthus.class);
    CommandLine cl = getOptions(args);
    if (cl == null) {
        // getOptions returned no command line; report failure.
        return 1;
    }

    // Check all of the paths and load the sstable version from the input filenames
    List<Path> paths = Lists.newArrayList();
    if (cl.hasOption(Feature.CMD_ARG_INPUT_FILE)) {
        for (String input : cl.getOptionValues(Feature.CMD_ARG_INPUT_FILE)) {
            paths.add(new Path(input));
        }
    }
    if (cl.hasOption(Feature.CMD_ARG_INPUT_DIR)) {
        paths.addAll(getDataFiles(configuration, cl.getOptionValue(Feature.CMD_ARG_INPUT_DIR)));
    }
    LOG.info("Processing paths: {}", paths);

    // At this point we have the version of sstable that we can use for this run
    Descriptor.Version version = Descriptor.Version.CURRENT;
    if (cl.hasOption(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION)) {
        version = new Descriptor.Version(cl.getOptionValue(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION));
    }
    configuration.set(Feature.CONF_SSTABLE_VERSION, version.toString());

    // Only derive configuration from CQL when a CQL schema has been supplied.
    if (configuration.get(Feature.CONF_CQL_SCHEMA) != null) {
        setConfigurationFromCql(configuration);
    }

    // Choose the input format depending on whether split combining was requested.
    if(cl.hasOption(Feature.CMD_ARG_COMBINE_SPLITS)) {
        job.setInputFormatClass(AegisthusCombinedInputFormat.class);
    } else {
        job.setInputFormatClass(AegisthusInputFormat.class);
    }
    job.setMapOutputKeyClass(AegisthusKey.class);
    job.setMapOutputValueClass(AtomWritable.class);
    job.setOutputKeyClass(AegisthusKey.class);
    job.setOutputValueClass(RowWritable.class);
    job.setMapperClass(AegisthusKeyMapper.class);
    job.setReducerClass(CassSSTableReducer.class);
    job.setGroupingComparatorClass(AegisthusKeyGroupingComparator.class);
    job.setPartitionerClass(AegisthusKeyPartitioner.class);
    job.setSortComparatorClass(AegisthusKeySortingComparator.class);

    TextInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));

    // Output is written as SSTables when requested, otherwise as JSON.
    if (cl.hasOption(Feature.CMD_ARG_PRODUCE_SSTABLE)) {
        job.setOutputFormatClass(SSTableOutputFormat.class);
    } else {
        job.setOutputFormatClass(JsonOutputFormat.class);
    }
    CustomFileNameFileOutputFormat.setOutputPath(job, new Path(cl.getOptionValue(Feature.CMD_ARG_OUTPUT_DIR)));

    job.submit();
    // The shutdown hook is registered after submission and is enabled unless
    // CONF_SHUTDOWN_HOOK is set to false.
    if (configuration.getBoolean(Feature.CONF_SHUTDOWN_HOOK, true)) {
        Runtime.getRuntime().addShutdownHook(new JobKiller(job));
    }

    System.out.println(job.getJobID());
    System.out.println(job.getTrackingURL());
    boolean success = job.waitForCompletion(true);

    if (success) {
        // NOTE(review): job.getCounters() is dereferenced without a null check - confirm it cannot be null here.
        Counter errorCounter = job.getCounters().findCounter("aegisthus", "error_skipped_input");
        long errorCount = errorCounter != null ? errorCounter.getValue() : 0L;
        // Maximum number of skipped inputs tolerated; defaults to 0 when not configured.
        int maxAllowed = configuration.getInt(Feature.CONF_MAX_CORRUPT_FILES_TO_SKIP, 0);
        if (errorCounter != null && errorCounter.getValue() > maxAllowed) {
            LOG.error("Found {} corrupt files which is greater than the max allowed {}", errorCount, maxAllowed);
            success = false;
        } else if (errorCount > 0) {
            LOG.warn("Found {} corrupt files but not failing the job because the max allowed is {}",
                    errorCount, maxAllowed);
        }
    }

    return success ? 0 : 1;
}
 
源代码18 项目: hadoop   文件: CounterInfo.java
/**
 * Builds the info object from a total counter and its optional map and
 * reduce counterparts.
 *
 * @param c  the total counter; supplies the name and total value
 * @param mc the map-side counter, or {@code null} to record 0
 * @param rc the reduce-side counter, or {@code null} to record 0
 */
public CounterInfo(Counter c, Counter mc, Counter rc) {
  this.name = c.getName();
  this.totalCounterValue = c.getValue();

  if (mc == null) {
    this.mapCounterValue = 0;
  } else {
    this.mapCounterValue = mc.getValue();
  }

  if (rc == null) {
    this.reduceCounterValue = 0;
  } else {
    this.reduceCounterValue = rc.getValue();
  }
}
 
源代码19 项目: jumbune   文件: YarnJobStatsUtility.java
/**
 * Populates the reduce phase details for one task attempt.
 *
 * Details are only filled in when the attempt's status is "SUCCEEDED"
 * (case-insensitive); otherwise an empty {@code TaskOutputDetails} is returned.
 * All time points are expressed relative to {@code referencedZeroTime} and
 * divided by {@code CONVERSION_FACTOR_MILLISECS_TO_SECS}.
 *
 * @param task the task attempt entry whose value holds the attempt info
 * @param referencedZeroTime the time subtracted from every timestamp
 * @return TaskOutputDetails contains the details of the reduce phase.
 */
private TaskOutputDetails addReducePhaseDetails(
		Entry<TaskAttemptID, TaskAttemptInfo> task, long referencedZeroTime) {

	TaskAttemptInfo taskAttemptInfo = task.getValue();
	TaskOutputDetails taskOutputDetails = new TaskOutputDetails();
	if (taskAttemptInfo.getTaskStatus().equalsIgnoreCase("SUCCEEDED")) {
		taskOutputDetails.setTaskStatus(taskAttemptInfo.getTaskStatus());
		taskOutputDetails.setTaskType(taskAttemptInfo.getTaskType().toString());
		taskOutputDetails.setTaskID(taskAttemptInfo.getAttemptId().getTaskID().toString());
		taskOutputDetails.setLocation(taskAttemptInfo.getHostname());

		Counters counters = taskAttemptInfo.getCounters();
		CounterGroup mapReduceTaskCounters = counters.getGroup("org.apache.hadoop.mapreduce.TaskCounter");
		Counter reduceOutputRecords = mapReduceTaskCounters.findCounter("REDUCE_OUTPUT_RECORDS");
		taskOutputDetails.setOutputRecords(reduceOutputRecords.getValue());
		// NOTE(review): the "output bytes" figure is read from SPILLED_RECORDS - confirm this is intended.
		Counter reduceOutputBytes = mapReduceTaskCounters.findCounter("SPILLED_RECORDS");
		taskOutputDetails.setOutputBytes(reduceOutputBytes.getValue());

		// Shuffle phase: starts when the attempt starts.
		long shuffleStartTime = (taskAttemptInfo.getStartTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS;
		taskOutputDetails.setStartPoint(shuffleStartTime);
		taskOutputDetails.setShuffleStart(shuffleStartTime);
		LOGGER.debug("shuffle start time" + taskOutputDetails.getShuffleStart());
		long shuffleEnd = ((taskAttemptInfo.getShuffleFinishTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS);
		taskOutputDetails.setShuffleEnd(shuffleEnd);
		LOGGER.debug("shuffle end time" + taskOutputDetails.getShuffleEnd());

		// Sort phase: starts where the shuffle ends.
		taskOutputDetails.setSortStart(shuffleEnd);
		long sortEnd = (taskAttemptInfo.getSortFinishTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS;
		taskOutputDetails.setSortEnd(sortEnd);
		LOGGER.debug("sort end time" + taskOutputDetails.getSortEnd());

		// Reduce phase: starts where the sort ends and runs until the attempt finishes.
		taskOutputDetails.setReduceStart(sortEnd);
		taskOutputDetails.setReduceEnd((taskAttemptInfo.getFinishTime() - referencedZeroTime) / CONVERSION_FACTOR_MILLISECS_TO_SECS);
		taskOutputDetails.setEndPoint(taskOutputDetails.getReduceEnd());
		LOGGER.debug("Reduce end time" + taskOutputDetails.getReduceEnd());

		// The times above are truncated by integer division, so an attempt can
		// start and finish at the same converted time. Report a rate of 0 in
		// that case instead of failing with a divide-by-zero ArithmeticException.
		long elapsed = taskOutputDetails.getReduceEnd() - shuffleStartTime;
		long dataFlowRate = (elapsed == 0) ? 0 : reduceOutputBytes.getValue() / elapsed;
		taskOutputDetails.setDataFlowRate(dataFlowRate);

		Counter physicalMemoryBytes = mapReduceTaskCounters.findCounter("PHYSICAL_MEMORY_BYTES");
		ResourceUsageMetrics rum = new ResourceUsageMetrics();
		rum.setPhysicalMemoryUsage(physicalMemoryBytes.getValue());
		taskOutputDetails.setResourceUsageMetrics(rum);
	}
	return taskOutputDetails;
}
 
源代码20 项目: incubator-pinot   文件: AggregationPhaseJob.java
/**
 * Configures and runs the aggregation phase MapReduce job, then logs the
 * record counters it produced.
 *
 * Input paths, the output path, the Avro schema and the ThirdEye config are
 * read from properties and stored in the job configuration. An existing
 * output path is deleted recursively before the job runs.
 *
 * @return the job after it has finished running
 * @throws IllegalStateException if the NUMBER_OF_RECORDS counter is 0
 * @throws Exception propagated from job setup or execution
 */
public Job run() throws Exception {
  Job job = Job.getInstance(getConf());
  job.setJobName(name);
  job.setJarByClass(AggregationPhaseJob.class);

  FileSystem fs = FileSystem.get(getConf());
  Configuration configuration = job.getConfiguration();

  // Properties
  LOGGER.info("Properties {}", props);

   // Input Path
  // The input property may hold several paths joined by ThirdEyeConstants.FIELD_SEPARATOR.
  String inputPathDir = getAndSetConfiguration(configuration, AGG_PHASE_INPUT_PATH);
  LOGGER.info("Input path dir: " + inputPathDir);
  for (String inputPath : inputPathDir.split(ThirdEyeConstants.FIELD_SEPARATOR)) {
    LOGGER.info("Adding input:" + inputPath);
    Path input = new Path(inputPath);
    FileInputFormat.addInputPath(job, input);
  }

  // Output path
  Path outputPath = new Path(getAndSetConfiguration(configuration, AGG_PHASE_OUTPUT_PATH));
  LOGGER.info("Output path dir: " + outputPath.toString());
  // Remove any previous output (recursively) so the job can write to this path.
  if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);
  }
  FileOutputFormat.setOutputPath(job, outputPath);

  // Schema
  Schema avroSchema = ThirdeyeAvroUtils.getSchema(inputPathDir);
  LOGGER.info("Schema : {}", avroSchema.toString(true));
  job.getConfiguration().set(AGG_PHASE_AVRO_SCHEMA.toString(), avroSchema.toString());

  // ThirdEyeConfig
  // Dimension and metric type properties are derived from the Avro schema and
  // written back into props before the config object is built from them.
  String dimensionTypesProperty = ThirdeyeAvroUtils.getDimensionTypesProperty(
      props.getProperty(ThirdEyeConfigProperties.THIRDEYE_DIMENSION_NAMES.toString()), avroSchema);
  props.setProperty(ThirdEyeConfigProperties.THIRDEYE_DIMENSION_TYPES.toString(), dimensionTypesProperty);
  String metricTypesProperty = ThirdeyeAvroUtils.getMetricTypesProperty(
      props.getProperty(ThirdEyeConfigProperties.THIRDEYE_METRIC_NAMES.toString()),
      props.getProperty(ThirdEyeConfigProperties.THIRDEYE_METRIC_TYPES.toString()), avroSchema);
  props.setProperty(ThirdEyeConfigProperties.THIRDEYE_METRIC_TYPES.toString(), metricTypesProperty);
  ThirdEyeConfig thirdeyeConfig = ThirdEyeConfig.fromProperties(props);
  LOGGER.info("Thirdeye Config {}", thirdeyeConfig.encode());
  job.getConfiguration().set(AGG_PHASE_THIRDEYE_CONFIG.toString(), OBJECT_MAPPER.writeValueAsString(thirdeyeConfig));

  // Map config
  job.setMapperClass(AggregationMapper.class);
  job.setInputFormatClass(AvroKeyInputFormat.class);
  job.setMapOutputKeyClass(BytesWritable.class);
  job.setMapOutputValueClass(BytesWritable.class);

  // Reduce config
  job.setReducerClass(AggregationReducer.class);
  job.setOutputKeyClass(AvroKey.class);
  job.setOutputValueClass(NullWritable.class);
  AvroJob.setOutputKeySchema(job, avroSchema);
  job.setOutputFormatClass(AvroKeyOutputFormat.class);
  // The reducer count is only overridden when the property is set and not blank.
  String numReducers = props.getProperty(ThirdEyeJobProperties.THIRDEYE_NUM_REDUCERS.getName());
  LOGGER.info("Num Reducers : {}", numReducers);
  if (StringUtils.isNotBlank(numReducers)) {
    job.setNumReduceTasks(Integer.valueOf(numReducers));
    LOGGER.info("Setting num reducers {}", job.getNumReduceTasks());
  }

  // NOTE(review): the boolean result of waitForCompletion is not checked here - confirm a failed job is handled by the caller.
  job.waitForCompletion(true);

  // Fail when the job saw no input records at all.
  Counter counter = job.getCounters().findCounter(AggregationCounter.NUMBER_OF_RECORDS);
  LOGGER.info(counter.getDisplayName() + " : " + counter.getValue());
  if (counter.getValue() == 0) {
    throw new IllegalStateException("No input records in " + inputPathDir);
  }
  counter = job.getCounters().findCounter(AggregationCounter.NUMBER_OF_RECORDS_FLATTENED);
  LOGGER.info(counter.getDisplayName() + " : " + counter.getValue());

  // Log one counter per configured metric, looked up in the group named after the collection.
  for (String metric : thirdeyeConfig.getMetricNames()) {
    counter = job.getCounters().findCounter(thirdeyeConfig.getCollection(), metric);
    LOGGER.info(counter.getDisplayName() + " : " + counter.getValue());
  }

  return job;
}