Source code examples for class org.apache.hadoop.mapred.TaskID

Listed below are examples showing how to use the org.apache.hadoop.mapred.TaskID API, together with typical usage patterns; you can also follow the links to GitHub to view the full source code.
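
Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below; the timestamp and numbers are illustrative) of the basic TaskID operations the snippets rely on: building an id by hand, parsing one back from its string form with TaskID.forName, and navigating to the parent JobID and to a TaskAttemptID.

import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.TaskType;

public class TaskIdSketch {
  public static void main(String[] args) {
    // Build an id by hand: job id (cluster identifier + job number), task type, task number.
    TaskID taskId = new TaskID(new JobID("1409803358312", 1), TaskType.MAP, 0);
    System.out.println(taskId);             // task_1409803358312_0001_m_000000

    // Parse the canonical string form back into a TaskID.
    TaskID parsed = TaskID.forName("task_1409803358312_0001_m_000000");
    System.out.println(parsed.getJobID());  // job_1409803358312_0001
    System.out.println(parsed.getId());     // 0 (the task number within the job)

    // A TaskAttemptID wraps a TaskID plus an attempt number.
    TaskAttemptID attempt = new TaskAttemptID(parsed, 0);
    System.out.println(attempt);            // attempt_1409803358312_0001_m_000000_0
  }
}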

Example 1  Project: twister2   File: HadoopSource.java
@Override
public boolean hasNext() {
  if (currentReader != null) {
    try {
      boolean current = currentReader.nextKeyValue();
      // If the current split is exhausted, move on to the next assigned split
      // until a record is found or all assigned splits have been consumed.
      while (!current && consumingSplit < assignedSplits.size() - 1) {
        TaskID taskID = new TaskID(context.getId(), context.getIndex(),
            TaskType.MAP, context.getIndex());
        TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
        consumingSplit++;
        TaskAttemptContextImpl taskAttemptContext =
            new TaskAttemptContextImpl(jconf, taskAttemptID);
        currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
            taskAttemptContext);
        currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
        current = currentReader.nextKeyValue();
      }
      return current;
    } catch (IOException | InterruptedException e) {
      throw new RuntimeException("Failed to read the next key value", e);
    }
  }
  return false;
}
 
Example 2  Project: twister2   File: HadoopSourceWithMap.java
@Override
public boolean hasNext() {
  if (currentReader != null) {
    try {
      boolean current = currentReader.nextKeyValue();
      // If the current split is exhausted, move on to the next assigned split
      // until a record is found or all assigned splits have been consumed.
      while (!current && consumingSplit < assignedSplits.size() - 1) {
        TaskID taskID = new TaskID(context.getId(), context.getIndex(),
            TaskType.MAP, context.getIndex());
        TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
        consumingSplit++;
        TaskAttemptContextImpl taskAttemptContext =
            new TaskAttemptContextImpl(jconf, taskAttemptID);
        currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
            taskAttemptContext);
        currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
        current = currentReader.nextKeyValue();
      }
      return current;
    } catch (IOException | InterruptedException e) {
      throw new RuntimeException("Failed to read the next key value", e);
    }
  }
  return false;
}
 
Example 3  Project: elasticsearch-hadoop   File: HadoopCfgUtils.java
public static TaskID getTaskID(Configuration cfg) {
    // first try with the attempt since some Hadoop versions mix the two
    String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg);
    if (StringUtils.hasText(taskAttemptId)) {
        try {
            return TaskAttemptID.forName(taskAttemptId).getTaskID();
        } catch (IllegalArgumentException ex) {
            // the task attempt is invalid (Tez in particular uses the wrong string - see #346)
            // try to fallback to task id
            return parseTaskIdFromTaskAttemptId(taskAttemptId);
        }
    }
    String taskIdProp = HadoopCfgUtils.getTaskId(cfg);
    // double-check task id bug in Hadoop 2.5.x
    if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) {
        return TaskID.forName(taskIdProp);
    }
    return null;
}
 
Example 4  Project: elasticsearch-hadoop   File: HadoopCfgUtils.java
private static TaskID parseTaskIdFromTaskAttemptId(String taskAttemptId) {
    // Tez in particular uses an incorrect String task1244XXX instead of task_1244, which makes the parsing fail;
    // this method tries to cope with such issues and looks at the numbers if possible
    if (taskAttemptId.startsWith("task")) {
        taskAttemptId = taskAttemptId.substring(4);
    }
    if (taskAttemptId.startsWith("_")) {
        taskAttemptId = taskAttemptId.substring(1);
    }
    List<String> tokenize = StringUtils.tokenize(taskAttemptId, "_");
    // need at least 4 entries from 123123123123_0001_r_0000_4
    if (tokenize.size() < 4) {
        LogFactory.getLog(HadoopCfgUtils.class).warn("Cannot parse task attempt (too little arguments) " + taskAttemptId);
        return null;
    }
    // we parse straight away - in case of an exception we can catch the new format
    try {
        return new TaskID(tokenize.get(0), Integer.parseInt(tokenize.get(1)), tokenize.get(2).startsWith("m"), Integer.parseInt(tokenize.get(3)));
    } catch (Exception ex) {
        LogFactory.getLog(HadoopCfgUtils.class).warn("Cannot parse task attempt " + taskAttemptId);
        return null;
    }
}
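
To see why the fallback in Examples 3 and 4 exists, consider a hedged, standalone illustration (not part of HadoopCfgUtils; the id values are made up): a well-formed attempt string parses cleanly through TaskAttemptID.forName, while a Tez-style string that drops the underscore after "task" makes forName throw IllegalArgumentException, which is exactly what triggers the token-based fallback above.

import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;

public class AttemptParsingSketch {
  public static void main(String[] args) {
    // Canonical form: parsing succeeds and the TaskID can be extracted directly.
    TaskID ok = TaskAttemptID.forName("attempt_1409803358312_0001_m_000000_0").getTaskID();
    System.out.println(ok);  // task_1409803358312_0001_m_000000

    // Malformed, Tez-style value (no underscore after "task"): forName rejects it,
    // so the caller falls back to splitting the string on "_" and parsing the pieces.
    try {
      TaskAttemptID.forName("task1409803358312_0001_m_000000_0");
    } catch (IllegalArgumentException e) {
      System.out.println("fallback needed: " + e.getMessage());
    }
  }
}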
 
Example 5  Project: twister2   File: HadoopSource.java
@Override
public void prepare(TSetContext ctx) {
  this.context = ctx;
  Configuration hadoopConf = this.wrappedConfiguration.getConfiguration();
  jconf = new JobConf(hadoopConf);
  try {
    format = inputClazz.newInstance();
    JobContext jobContext = new JobContextImpl(hadoopConf, new JobID(context.getId(),
        context.getIndex()));
    List<InputSplit> splits = format.getSplits(jobContext);

    for (int i = 0; i < splits.size(); i++) {
      if (i % context.getParallelism() == context.getIndex()) {
        assignedSplits.add(splits.get(i));
      }
    }

    if (assignedSplits.size() > 0) {
      TaskID taskID = new TaskID(context.getId(), context.getIndex(),
          TaskType.MAP, context.getIndex());
      TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
      TaskAttemptContextImpl taskAttemptContext =
          new TaskAttemptContextImpl(jconf, taskAttemptID);
      currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
          taskAttemptContext);
      currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
    }
  } catch (InstantiationException | IllegalAccessException
      | InterruptedException | IOException e) {
    throw new RuntimeException("Failed to initialize hadoop input", e);
  }
}
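
The split-assignment loop in prepare() distributes splits round-robin over the parallel source instances: instance index out of parallelism instances takes every split whose position i satisfies i % parallelism == index. A standalone sketch of just that rule (plain Java; the counts are illustrative):

public class RoundRobinAssignmentSketch {
  public static void main(String[] args) {
    int parallelism = 4;   // number of parallel source instances (illustrative)
    int totalSplits = 10;  // number of splits returned by getSplits() (illustrative)
    for (int index = 0; index < parallelism; index++) {
      StringBuilder assigned = new StringBuilder();
      for (int i = 0; i < totalSplits; i++) {
        if (i % parallelism == index) {  // same condition as in prepare()
          assigned.append(i).append(' ');
        }
      }
      System.out.println("instance " + index + " -> splits " + assigned.toString().trim());
    }
  }
}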
 
Example 6  Project: twister2   File: HadoopSourceWithMap.java
@Override
public void prepare(TSetContext ctx) {
  this.context = ctx;
  Configuration hadoopConf = this.wrappedConfiguration.getConfiguration();
  jconf = new JobConf(hadoopConf);
  try {
    format = inputClazz.newInstance();
    JobContext jobContext = new JobContextImpl(hadoopConf, new JobID(context.getId(),
        context.getIndex()));
    List<InputSplit> splits = format.getSplits(jobContext);

    for (int i = 0; i < splits.size(); i++) {
      if (i % context.getParallelism() == context.getIndex()) {
        assignedSplits.add(splits.get(i));
      }
    }

    if (assignedSplits.size() > 0) {
      TaskID taskID = new TaskID(context.getId(), context.getIndex(),
          TaskType.MAP, context.getIndex());
      TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
      TaskAttemptContextImpl taskAttemptContext =
          new TaskAttemptContextImpl(jconf, taskAttemptID);
      currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
          taskAttemptContext);
      currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
    }
  } catch (InstantiationException | IllegalAccessException
      | InterruptedException | IOException e) {
    throw new RuntimeException("Failed to initialize hadoop input", e);
  }
}
 
Example 7  Project: hadoop   File: TaskReport.java
/**
 * Creates a new TaskReport object.
 * @param taskid id of the task
 * @param progress current progress of the task, as a fraction between 0.0 and 1.0
 * @param state most recent state of the task
 * @param diagnostics diagnostic messages for the task
 * @param currentStatus current scheduling status of the task
 * @param startTime start time of the task, in milliseconds
 * @param finishTime finish time of the task, in milliseconds
 * @param counters counters of the task
 */
public TaskReport(TaskID taskid, float progress, String state,
           String[] diagnostics, TIPStatus currentStatus, 
           long startTime, long finishTime,
           Counters counters) {
  this.taskid = taskid;
  this.progress = progress;
  this.state = state;
  this.diagnostics = diagnostics;
  this.currentStatus = currentStatus;
  this.startTime = startTime; 
  this.finishTime = finishTime;
  this.counters = counters;
}
 
Example 8  Project: hadoop   File: TestStreamingStatus.java
void validateTaskStderr(StreamJob job, TaskType type)
    throws IOException {
  TaskAttemptID attemptId =
      new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);

  String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
      attemptId, false);

  // trim() is called on expectedStderr here because the method
  // MapReduceTestUtil.readTaskLog() returns trimmed String.
  assertTrue(log.equals(expectedStderr.trim()));
}
 
Example 9  Project: big-c   File: TaskReport.java
/**
 * Creates a new TaskReport object.
 * @param taskid id of the task
 * @param progress current progress of the task, as a fraction between 0.0 and 1.0
 * @param state most recent state of the task
 * @param diagnostics diagnostic messages for the task
 * @param currentStatus current scheduling status of the task
 * @param startTime start time of the task, in milliseconds
 * @param finishTime finish time of the task, in milliseconds
 * @param counters counters of the task
 */
public TaskReport(TaskID taskid, float progress, String state,
           String[] diagnostics, TIPStatus currentStatus, 
           long startTime, long finishTime,
           Counters counters) {
  this.taskid = taskid;
  this.progress = progress;
  this.state = state;
  this.diagnostics = diagnostics;
  this.currentStatus = currentStatus;
  this.startTime = startTime; 
  this.finishTime = finishTime;
  this.counters = counters;
}
 
Example 10  Project: big-c   File: TestStreamingStatus.java
void validateTaskStderr(StreamJob job, TaskType type)
    throws IOException {
  TaskAttemptID attemptId =
      new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);

  String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
      attemptId, false);

  // trim() is called on expectedStderr here because the method
  // MapReduceTestUtil.readTaskLog() returns trimmed String.
  assertTrue(log.equals(expectedStderr.trim()));
}
 
Example 11  Project: incubator-hivemall   File: HadoopUtils.java
@Nonnull
public static String getJobIdFromTaskId(@Nonnull String taskidStr) {
    if (!taskidStr.startsWith("task_")) {// workaround for Tez
        taskidStr = taskidStr.replace("task", "task_");
        taskidStr = taskidStr.substring(0, taskidStr.lastIndexOf('_'));
    }
    TaskID taskId = TaskID.forName(taskidStr);
    JobID jobId = taskId.getJobID();
    return jobId.toString();
}
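
For the common (non-Tez) case, Example 11 simply delegates to TaskID.forName and returns the job id portion. A short hedged sketch of that happy path using only the standard API (the id value is illustrative):

import org.apache.hadoop.mapred.TaskID;

public class JobIdFromTaskIdSketch {
  public static void main(String[] args) {
    String taskidStr = "task_1409803358312_0001_m_000000";  // a regular MapReduce task id
    // Parse the task id and read its job id, as getJobIdFromTaskId does internally.
    System.out.println(TaskID.forName(taskidStr).getJobID());  // job_1409803358312_0001
  }
}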
 
Example 12
@Test
public void getTaskAttemptId() {
  final TaskAttemptID id = new TaskAttemptID( new TaskID(), 0 );
  org.apache.hadoop.mapred.TaskCompletionEvent delegate = new org.apache.hadoop.mapred.TaskCompletionEvent() {
    public org.apache.hadoop.mapred.TaskAttemptID getTaskAttemptId() {
      return id;
    }
  };
  TaskCompletionEventProxy proxy = new TaskCompletionEventProxy( delegate );

  assertEquals( id, proxy.getTaskAttemptId() );
}
 
Example 13  Project: incubator-tez   File: IDConverter.java
public static TezTaskID
    fromMRTaskId(org.apache.hadoop.mapreduce.TaskID taskid) {
  return TezTaskID.getInstance(
      TezVertexID.getInstance(fromMRJobId(taskid.getJobID()),
              (taskid.getTaskType() == TaskType.MAP ? 0 : 1)
          ),
      taskid.getId());
}
 
Example 14  Project: incubator-tez   File: MRInputBase.java
public List<Event> initialize() throws IOException {
  getContext().requestInitialMemory(0l, null); // mandatory call
  MRRuntimeProtos.MRInputUserPayloadProto mrUserPayload =
      MRHelpers.parseMRInputPayload(getContext().getUserPayload());
  Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
      "Split information not expected in " + this.getClass().getName());
  Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());

  this.jobConf = new JobConf(conf);
  // Add tokens to the jobConf - in case they are accessed within the RR / IF
  jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());

  TaskAttemptID taskAttemptId = new TaskAttemptID(
      new TaskID(
          Long.toString(getContext().getApplicationId().getClusterTimestamp()),
          getContext().getApplicationId().getId(), TaskType.MAP,
          getContext().getTaskIndex()),
      getContext().getTaskAttemptNumber());

  jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
      getContext().getDAGAttemptNumber());

  this.inputRecordCounter = getContext().getCounters().findCounter(
      TaskCounter.INPUT_RECORDS_PROCESSED);

  useNewApi = this.jobConf.getUseNewMapper();
  return null;
}
 
Example 15  Project: elasticsearch-hadoop   File: HeartBeat.java
HeartBeat(final Progressable progressable, Configuration cfg, TimeValue lead, final Log log) {
    Assert.notNull(progressable, "a valid progressable is required to report status to Hadoop");
    TimeValue tv = HadoopCfgUtils.getTaskTimeout(cfg);

    Assert.isTrue(tv.getSeconds() <= 0 || tv.getSeconds() > lead.getSeconds(), "Hadoop timeout is shorter than the heartbeat");

    this.progressable = progressable;
    long cfgMillis = (tv.getMillis() > 0 ? tv.getMillis() : 0);
    // the task is simple hence the delay = timeout - lead, that is when to start the notification right before the timeout
    this.delay = new TimeValue(Math.abs(cfgMillis - lead.getMillis()), TimeUnit.MILLISECONDS);
    this.log = log;

    String taskId;
    TaskID taskID = HadoopCfgUtils.getTaskID(cfg);

    if (taskID == null) {
        log.warn("Cannot determine task id...");
        taskId = "<unknown>";
        if (log.isTraceEnabled()) {
            log.trace("Current configuration is " + HadoopCfgUtils.asProperties(cfg));
        }
    }
    else {
        taskId = "" + taskID;
    }

    id = taskId;
}
 
Example 16  Project: elasticsearch-hadoop   File: EsOutputFormat.java
private int detectCurrentInstance(Configuration conf) {
    TaskID taskID = HadoopCfgUtils.getTaskID(conf);

    if (taskID == null) {
        log.warn(String.format("Cannot determine task id - redirecting writes in a random fashion"));
        return NO_TASK_ID;
    }

    return taskID.getId();
}
 
Example 17  Project: tez   File: IDConverter.java
public static TezTaskID
    fromMRTaskId(org.apache.hadoop.mapreduce.TaskID taskid) {
  return TezTaskID.getInstance(
      TezVertexID.getInstance(fromMRJobId(taskid.getJobID()),
              (taskid.getTaskType() == TaskType.MAP ? 0 : 1)
          ),
      taskid.getId());
}
 
Example 18  Project: hadoop   File: TaskReport.java
public TaskReport() {
  taskid = new TaskID();
}
 
Example 19  Project: hadoop   File: TaskReport.java
/** The ID of the task. */
public TaskID getTaskID() {
  return taskid;
}
 
Example 20  Project: big-c   File: TaskReport.java
public TaskReport() {
  taskid = new TaskID();
}
 
Example 21  Project: big-c   File: TaskReport.java
/** The ID of the task. */
public TaskID getTaskID() {
  return taskid;
}
 
Example 22  Project: ignite   File: HadoopV2TaskContext.java
/**
 * Creates Hadoop attempt ID.
 *
 * @return Attempt ID.
 */
public TaskAttemptID attemptId() {
    TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());

    return new TaskAttemptID(tid, taskInfo().attempt());
}
 
Example 23  Project: incubator-tez   File: IDConverter.java
public static TaskID toMRTaskId(TezTaskID taskid) {
  return new TaskID(
      toMRJobId(taskid.getVertexID().getDAGId()),
      taskid.getVertexID().getId() == 0 ? TaskType.MAP : TaskType.REDUCE,
      taskid.getId());
}
 
Example 24  Project: incubator-tez   File: IDConverter.java
public static TaskID toMRTaskIdForOutput(TezTaskID taskid) {
  return org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
      .createMockTaskAttemptIDFromTezTaskId(taskid, (taskid.getVertexID().getId() == 0));
}
 
Example 25  Project: incubator-tez   File: MRTask.java
public void initialize(TezProcessorContext context) throws IOException,
InterruptedException {

  DeprecatedKeys.init();

  processorContext = context;
  counters = context.getCounters();
  this.taskAttemptId = new TaskAttemptID(
      new TaskID(
          Long.toString(context.getApplicationId().getClusterTimestamp()),
          context.getApplicationId().getId(),
          (isMap ? TaskType.MAP : TaskType.REDUCE),
          context.getTaskIndex()),
        context.getTaskAttemptNumber());

  byte[] userPayload = context.getUserPayload();
  Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
  if (conf instanceof JobConf) {
    this.jobConf = (JobConf)conf;
  } else {
    this.jobConf = new JobConf(conf);
  }
  jobConf.set(Constants.TEZ_RUNTIME_TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
    taskAttemptId.toString());
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
      context.getDAGAttemptNumber());

  LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());

  // TODO Post MRR
  // A single file per vertex will likely be a better solution. Does not
  // require translation - client can take care of this. Will work independent
  // of whether the configuration is for intermediate tasks or not. Has the
  // overhead of localizing multiple files per job - i.e. the client would
  // need to write these files to hdfs, add them as local resources per
  // vertex. A solution like this may be more practical once it's possible to
  // submit configuration parameters to the AM and effectively tasks via RPC.

  jobConf.set(MRJobConfig.VERTEX_NAME, processorContext.getTaskVertexName());

  if (LOG.isDebugEnabled() && userPayload != null) {
    Iterator<Entry<String, String>> iter = jobConf.iterator();
    String taskIdStr = taskAttemptId.getTaskID().toString();
    while (iter.hasNext()) {
      Entry<String, String> confEntry = iter.next();
      LOG.debug("TaskConf Entry"
          + ", taskId=" + taskIdStr
          + ", key=" + confEntry.getKey()
          + ", value=" + confEntry.getValue());
    }
  }

  configureMRTask();
}
 
Example 26  Project: tez   File: IDConverter.java
public static TaskID toMRTaskId(TezTaskID taskid) {
  return new TaskID(
      toMRJobId(taskid.getVertexID().getDAGId()),
      taskid.getVertexID().getId() == 0 ? TaskType.MAP : TaskType.REDUCE,
      taskid.getId());
}
 
Example 27  Project: tez   File: IDConverter.java
public static TaskID toMRTaskIdForOutput(TezTaskID taskid) {
  return org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
      .createMockTaskAttemptIDFromTezTaskId(taskid, (taskid.getVertexID().getId() == 0));
}
 
Example 28  Project: tez   File: MRTask.java
@Override
public void initialize() throws IOException,
InterruptedException {

  DeprecatedKeys.init();

  processorContext = getContext();
  counters = processorContext.getCounters();
  this.taskAttemptId = new TaskAttemptID(
      new TaskID(
          Long.toString(processorContext.getApplicationId().getClusterTimestamp()),
          processorContext.getApplicationId().getId(),
          (isMap ? TaskType.MAP : TaskType.REDUCE),
          processorContext.getTaskIndex()),
      processorContext.getTaskAttemptNumber());

  UserPayload userPayload = processorContext.getUserPayload();
  Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
  if (conf instanceof JobConf) {
    this.jobConf = (JobConf)conf;
  } else {
    this.jobConf = new JobConf(conf);
  }
  jobConf.set(Constants.TEZ_RUNTIME_TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
    taskAttemptId.toString());
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
      processorContext.getDAGAttemptNumber());

  LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());

  // TODO Post MRR
  // A single file per vertex will likely be a better solution. Does not
  // require translation - client can take care of this. Will work independent
  // of whether the configuration is for intermediate tasks or not. Has the
  // overhead of localizing multiple files per job - i.e. the client would
  // need to write these files to hdfs, add them as local resources per
  // vertex. A solution like this may be more practical once it's possible to
  // submit configuration parameters to the AM and effectively tasks via RPC.

  jobConf.set(MRJobConfig.VERTEX_NAME, processorContext.getTaskVertexName());

  if (LOG.isDebugEnabled() && userPayload != null) {
    Iterator<Entry<String, String>> iter = jobConf.iterator();
    String taskIdStr = taskAttemptId.getTaskID().toString();
    while (iter.hasNext()) {
      Entry<String, String> confEntry = iter.next();
      LOG.debug("TaskConf Entry"
          + ", taskId=" + taskIdStr
          + ", key=" + confEntry.getKey()
          + ", value=" + confEntry.getValue());
    }
  }

  configureMRTask();
}
 
Example 29  Project: tez   File: MRInputBase.java
public List<Event> initialize() throws IOException {
  getContext().requestInitialMemory(0l, null); // mandatory call
  MRRuntimeProtos.MRInputUserPayloadProto mrUserPayload =
      MRInputHelpers.parseMRInputPayload(getContext().getUserPayload());
  boolean isGrouped = mrUserPayload.getGroupingEnabled();
  Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
      "Split information not expected in " + this.getClass().getName());

  Configuration conf = new JobConf(getContext().getContainerConfiguration());
  TezUtils.addToConfFromByteString(conf, mrUserPayload.getConfigurationBytes());
  this.jobConf = new JobConf(conf);
  useNewApi = this.jobConf.getUseNewMapper();
  if (isGrouped) {
    if (useNewApi) {
      jobConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
          org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.class.getName());
    } else {
      jobConf.set("mapred.input.format.class",
          org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat.class.getName());
    }
  }


  // Add tokens to the jobConf - in case they are accessed within the RR / IF
  jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());

  TaskAttemptID taskAttemptId = new TaskAttemptID(
      new TaskID(
          Long.toString(getContext().getApplicationId().getClusterTimestamp()),
          getContext().getApplicationId().getId(), TaskType.MAP,
          getContext().getTaskIndex()),
      getContext().getTaskAttemptNumber());

  jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
      getContext().getDAGAttemptNumber());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_INDEX, getContext().getDagIdentifier());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_VERTEX_INDEX, getContext().getTaskVertexIndex());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_INDEX, getContext().getTaskIndex());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX, getContext().getTaskAttemptNumber());
  jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_NAME, getContext().getDAGName());
  jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_NAME, getContext().getTaskVertexName());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_INPUT_INDEX, getContext().getInputIndex());
  jobConf.set(MRInput.TEZ_MAPREDUCE_INPUT_NAME, getContext().getSourceVertexName());
  jobConf.set(MRInput.TEZ_MAPREDUCE_APPLICATION_ID, getContext().getApplicationId().toString());
  jobConf.set(MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER, getContext().getUniqueIdentifier());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER, getContext().getDAGAttemptNumber());

  TezDAGID tezDAGID = TezDAGID.getInstance(getContext().getApplicationId(), getContext().getDagIdentifier());
  TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, getContext().getTaskVertexIndex());
  TezTaskID tezTaskID = TezTaskID.getInstance(tezVertexID, getContext().getTaskIndex());
  TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, getContext().getTaskAttemptNumber());
  jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_ID, tezDAGID.toString());
  jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_ID, tezVertexID.toString());
  jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ID, tezTaskID.toString());
  jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID, tezTaskAttemptID.toString());

  this.inputRecordCounter = getContext().getCounters().findCounter(
      TaskCounter.INPUT_RECORDS_PROCESSED);


  return null;
}
 