The following lists example code showing how to use the org.apache.hadoop.mapred.TaskID API class; you can also follow the links to view the original sources on GitHub.
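Before the project examples, here is a minimal self-contained sketch (not taken from any of the projects below; all id values are made up) of how a TaskID is typically constructed and round-tripped through its canonical string form:

import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.TaskType;

public class TaskIdDemo {
  public static void main(String[] args) {
    // Build an id by hand: job "1234567890"/1, map task #3 (made-up values).
    TaskID taskId = new TaskID(new JobID("1234567890", 1), TaskType.MAP, 3);
    System.out.println(taskId);            // task_1234567890_0001_m_000003
    // Parse the canonical string form back into a TaskID.
    TaskID parsed = TaskID.forName(taskId.toString());
    System.out.println(parsed.getJobID()); // job_1234567890_0001
    System.out.println(parsed.getTaskType() + " #" + parsed.getId()); // MAP #3
  }
}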
@Override
public boolean hasNext() {
  if (currentReader != null) {
    try {
      boolean current = currentReader.nextKeyValue();
      while (!current && consumingSplit < assignedSplits.size() - 1) {
        TaskID taskID = new TaskID(context.getId(), context.getIndex(),
            TaskType.MAP, context.getIndex());
        TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
        consumingSplit++;
        TaskAttemptContextImpl taskAttemptContext =
            new TaskAttemptContextImpl(jconf, taskAttemptID);
        currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
            taskAttemptContext);
        currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
        current = currentReader.nextKeyValue();
      }
      return current;
    } catch (IOException | InterruptedException e) {
      throw new RuntimeException("Failed to read the next key value", e);
    }
  }
  return false;
}
public static TaskID getTaskID(Configuration cfg) {
  // first try with the attempt since some Hadoop versions mix the two
  String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg);
  if (StringUtils.hasText(taskAttemptId)) {
    try {
      return TaskAttemptID.forName(taskAttemptId).getTaskID();
    } catch (IllegalArgumentException ex) {
      // the task attempt is invalid (Tez in particular uses the wrong string - see #346)
      // try to fall back to the task id
      return parseTaskIdFromTaskAttemptId(taskAttemptId);
    }
  }
  String taskIdProp = HadoopCfgUtils.getTaskId(cfg);
  // double-check task id bug in Hadoop 2.5.x
  if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) {
    return TaskID.forName(taskIdProp);
  }
  return null;
}
private static TaskID parseTaskIdFromTaskAttemptId(String taskAttemptId) {
  // Tez in particular uses an incorrect String task1244XXX instead of task_1244, which makes the parsing fail;
  // this method tries to cope with such issues by looking at the numbers if possible
  if (taskAttemptId.startsWith("task")) {
    taskAttemptId = taskAttemptId.substring(4);
  }
  if (taskAttemptId.startsWith("_")) {
    taskAttemptId = taskAttemptId.substring(1);
  }
  List<String> tokenize = StringUtils.tokenize(taskAttemptId, "_");
  // need at least 4 entries from 123123123123_0001_r_0000_4
  if (tokenize.size() < 4) {
    LogFactory.getLog(HadoopCfgUtils.class).warn("Cannot parse task attempt (too few arguments) " + taskAttemptId);
    return null;
  }
  // we parse straight away - in case of an exception we can catch the new format
  try {
    return new TaskID(tokenize.get(0), Integer.parseInt(tokenize.get(1)), tokenize.get(2).startsWith("m"), Integer.parseInt(tokenize.get(3)));
  } catch (Exception ex) {
    LogFactory.getLog(HadoopCfgUtils.class).warn("Cannot parse task attempt " + taskAttemptId);
    return null;
  }
}
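For context on where the attempt id comes from: at runtime MapReduce publishes it in the task configuration under mapreduce.task.attempt.id (the older, confusingly named mapred.task.id carries the same attempt string, which is why the helper above "mixes the two"). A sketch of the happy path with a hand-built Configuration and a made-up id:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;

public class GetTaskIdSketch {
  public static void main(String[] args) {
    Configuration cfg = new Configuration(false);
    cfg.set("mapreduce.task.attempt.id", "attempt_1234567890_0001_m_000002_0"); // made-up id
    // Equivalent to the first branch of getTaskID(cfg) above.
    TaskID taskId = TaskAttemptID.forName(cfg.get("mapreduce.task.attempt.id")).getTaskID();
    System.out.println(taskId); // task_1234567890_0001_m_000002
  }
}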
@Override
public void prepare(TSetContext ctx) {
  this.context = ctx;
  Configuration hadoopConf = this.wrappedConfiguration.getConfiguration();
  jconf = new JobConf(hadoopConf);
  try {
    format = inputClazz.newInstance();
    JobContext jobContext = new JobContextImpl(hadoopConf, new JobID(context.getId(),
        context.getIndex()));
    List<InputSplit> splits = format.getSplits(jobContext);
    for (int i = 0; i < splits.size(); i++) {
      if (i % context.getParallelism() == context.getIndex()) {
        assignedSplits.add(splits.get(i));
      }
    }
    if (assignedSplits.size() > 0) {
      TaskID taskID = new TaskID(context.getId(), context.getIndex(),
          TaskType.MAP, context.getIndex());
      TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
      TaskAttemptContextImpl taskAttemptContext =
          new TaskAttemptContextImpl(jconf, taskAttemptID);
      currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
          taskAttemptContext);
      currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
    }
  } catch (InstantiationException | IllegalAccessException
      | InterruptedException | IOException e) {
    throw new RuntimeException("Failed to initialize hadoop input", e);
  }
}
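The split assignment in prepare is a plain round-robin by index: with, say, 10 splits and a parallelism of 4, the worker with index 1 keeps splits 1, 5 and 9 (those where i % 4 == 1), and the first assigned split is then opened through a freshly built TaskAttemptContextImpl.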
/**
 * Creates a new TaskReport object
 * @param taskid
 * @param progress
 * @param state
 * @param diagnostics
 * @param currentStatus
 * @param startTime
 * @param finishTime
 * @param counters
 */
public TaskReport(TaskID taskid, float progress, String state,
                  String[] diagnostics, TIPStatus currentStatus,
                  long startTime, long finishTime,
                  Counters counters) {
  this.taskid = taskid;
  this.progress = progress;
  this.state = state;
  this.diagnostics = diagnostics;
  this.currentStatus = currentStatus;
  this.startTime = startTime;
  this.finishTime = finishTime;
  this.counters = counters;
}
void validateTaskStderr(StreamJob job, TaskType type)
    throws IOException {
  TaskAttemptID attemptId =
      new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);
  String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
      attemptId, false);
  // trim() is called on expectedStderr here because the method
  // MapReduceTestUtil.readTaskLog() returns a trimmed String.
  assertTrue(log.equals(expectedStderr.trim()));
}
@Nonnull
public static String getJobIdFromTaskId(@Nonnull String taskidStr) {
  if (!taskidStr.startsWith("task_")) { // workaround for Tez
    taskidStr = taskidStr.replace("task", "task_");
    taskidStr = taskidStr.substring(0, taskidStr.lastIndexOf('_'));
  }
  TaskID taskId = TaskID.forName(taskidStr);
  JobID jobId = taskId.getJobID();
  return jobId.toString();
}
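A quick sanity check of the helper above (a sketch with made-up ids, not taken from the project's tests):

// Well-formed id: TaskID.forName parses it directly.
getJobIdFromTaskId("task_1234567890_0001_m_000003");
// -> "job_1234567890_0001"
// Tez-style id missing the underscore after "task" and carrying a trailing
// attempt number: the workaround re-inserts the underscore and strips the
// final "_" component before parsing.
getJobIdFromTaskId("task1234567890_0001_m_000003_0");
// -> "job_1234567890_0001"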
@Test
public void getTaskAttemptId() {
  final TaskAttemptID id = new TaskAttemptID(new TaskID(), 0);
  org.apache.hadoop.mapred.TaskCompletionEvent delegate = new org.apache.hadoop.mapred.TaskCompletionEvent() {
    public org.apache.hadoop.mapred.TaskAttemptID getTaskAttemptId() {
      return id;
    }
  };
  TaskCompletionEventProxy proxy = new TaskCompletionEventProxy(delegate);
  assertEquals(id, proxy.getTaskAttemptId());
}
public static TezTaskID fromMRTaskId(org.apache.hadoop.mapreduce.TaskID taskid) {
  return TezTaskID.getInstance(
      TezVertexID.getInstance(fromMRJobId(taskid.getJobID()),
          (taskid.getTaskType() == TaskType.MAP ? 0 : 1)),
      taskid.getId());
}
public List<Event> initialize() throws IOException {
  getContext().requestInitialMemory(0l, null); // mandatory call
  MRRuntimeProtos.MRInputUserPayloadProto mrUserPayload =
      MRHelpers.parseMRInputPayload(getContext().getUserPayload());
  Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
      "Split information not expected in " + this.getClass().getName());
  Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
  this.jobConf = new JobConf(conf);
  // Add tokens to the jobConf - in case they are accessed within the RR / IF
  jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
  TaskAttemptID taskAttemptId = new TaskAttemptID(
      new TaskID(
          Long.toString(getContext().getApplicationId().getClusterTimestamp()),
          getContext().getApplicationId().getId(), TaskType.MAP,
          getContext().getTaskIndex()),
      getContext().getTaskAttemptNumber());
  jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
      getContext().getDAGAttemptNumber());
  this.inputRecordCounter = getContext().getCounters().findCounter(
      TaskCounter.INPUT_RECORDS_PROCESSED);
  useNewApi = this.jobConf.getUseNewMapper();
  return null;
}
HeartBeat(final Progressable progressable, Configuration cfg, TimeValue lead, final Log log) {
  Assert.notNull(progressable, "a valid progressable is required to report status to Hadoop");
  TimeValue tv = HadoopCfgUtils.getTaskTimeout(cfg);
  Assert.isTrue(tv.getSeconds() <= 0 || tv.getSeconds() > lead.getSeconds(), "Hadoop timeout is shorter than the heartbeat");
  this.progressable = progressable;
  long cfgMillis = (tv.getMillis() > 0 ? tv.getMillis() : 0);
  // the task is simple, hence delay = timeout - lead, i.e. start the notification right before the timeout
  this.delay = new TimeValue(Math.abs(cfgMillis - lead.getMillis()), TimeUnit.MILLISECONDS);
  this.log = log;
  String taskId;
  TaskID taskID = HadoopCfgUtils.getTaskID(cfg);
  if (taskID == null) {
    log.warn("Cannot determine task id...");
    taskId = "<unknown>";
    if (log.isTraceEnabled()) {
      log.trace("Current configuration is " + HadoopCfgUtils.asProperties(cfg));
    }
  }
  else {
    taskId = "" + taskID;
  }
  id = taskId;
}
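As a worked example of the delay arithmetic: with Hadoop's default mapreduce.task.timeout of 600000 ms and a lead of, say, 15 seconds, the heartbeat fires after |600000 - 15000| = 585000 ms, i.e. 15 seconds before Hadoop would otherwise declare the task hung.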
private int detectCurrentInstance(Configuration conf) {
  TaskID taskID = HadoopCfgUtils.getTaskID(conf);
  if (taskID == null) {
    log.warn(String.format("Cannot determine task id - redirecting writes in a random fashion"));
    return NO_TASK_ID;
  }
  return taskID.getId();
}
public TaskReport() {
  taskid = new TaskID();
}
/** The ID of the task. */
public TaskID getTaskID() {
  return taskid;
}
/**
 * Creates Hadoop attempt ID.
 *
 * @return Attempt ID.
 */
public TaskAttemptID attemptId() {
  TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());
  return new TaskAttemptID(tid, taskInfo().attempt());
}
public static TaskID toMRTaskId(TezTaskID taskid) {
  return new TaskID(
      toMRJobId(taskid.getVertexID().getDAGId()),
      taskid.getVertexID().getId() == 0 ? TaskType.MAP : TaskType.REDUCE,
      taskid.getId());
}
public static TaskID toMRTaskIdForOutput(TezTaskID taskid) {
  return org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
      .createMockTaskAttemptIDFromTezTaskId(taskid, (taskid.getVertexID().getId() == 0));
}
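Read together with fromMRTaskId above, these converters encode the convention for the classic two-vertex MR DAG: vertex 0 carries the maps and vertex 1 the reduces, with the task index preserved. A hedged round-trip sketch (made-up id; assumes the helpers live in one converter class, as in Tez's IDConverter, and that the job-id conversion also round-trips):

// Sketch: push an MR reduce task id through the Tez form and back.
org.apache.hadoop.mapreduce.TaskID mrTask =
    org.apache.hadoop.mapreduce.TaskID.forName("task_1234567890_0001_r_000007");
TezTaskID tezTask = fromMRTaskId(mrTask);
assert tezTask.getVertexID().getId() == 1; // REDUCE -> vertex 1
assert tezTask.getId() == 7;               // task index preserved
assert toMRTaskId(tezTask).equals(mrTask); // and back again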
public void initialize(TezProcessorContext context) throws IOException,
    InterruptedException {
  DeprecatedKeys.init();
  processorContext = context;
  counters = context.getCounters();
  this.taskAttemptId = new TaskAttemptID(
      new TaskID(
          Long.toString(context.getApplicationId().getClusterTimestamp()),
          context.getApplicationId().getId(),
          (isMap ? TaskType.MAP : TaskType.REDUCE),
          context.getTaskIndex()),
      context.getTaskAttemptNumber());
  byte[] userPayload = context.getUserPayload();
  Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
  if (conf instanceof JobConf) {
    this.jobConf = (JobConf) conf;
  } else {
    this.jobConf = new JobConf(conf);
  }
  jobConf.set(Constants.TEZ_RUNTIME_TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
      context.getDAGAttemptNumber());
  LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
  // TODO Post MRR
  // A single file per vertex will likely be a better solution. Does not
  // require translation - client can take care of this. Will work independent
  // of whether the configuration is for intermediate tasks or not. Has the
  // overhead of localizing multiple files per job - i.e. the client would
  // need to write these files to hdfs, add them as local resources per
  // vertex. A solution like this may be more practical once it's possible to
  // submit configuration parameters to the AM and effectively tasks via RPC.
  jobConf.set(MRJobConfig.VERTEX_NAME, processorContext.getTaskVertexName());
  if (LOG.isDebugEnabled() && userPayload != null) {
    Iterator<Entry<String, String>> iter = jobConf.iterator();
    String taskIdStr = taskAttemptId.getTaskID().toString();
    while (iter.hasNext()) {
      Entry<String, String> confEntry = iter.next();
      LOG.debug("TaskConf Entry"
          + ", taskId=" + taskIdStr
          + ", key=" + confEntry.getKey()
          + ", value=" + confEntry.getValue());
    }
  }
  configureMRTask();
}
@Override
public void initialize() throws IOException,
    InterruptedException {
  DeprecatedKeys.init();
  processorContext = getContext();
  counters = processorContext.getCounters();
  this.taskAttemptId = new TaskAttemptID(
      new TaskID(
          Long.toString(processorContext.getApplicationId().getClusterTimestamp()),
          processorContext.getApplicationId().getId(),
          (isMap ? TaskType.MAP : TaskType.REDUCE),
          processorContext.getTaskIndex()),
      processorContext.getTaskAttemptNumber());
  UserPayload userPayload = processorContext.getUserPayload();
  Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
  if (conf instanceof JobConf) {
    this.jobConf = (JobConf) conf;
  } else {
    this.jobConf = new JobConf(conf);
  }
  jobConf.set(Constants.TEZ_RUNTIME_TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
      processorContext.getDAGAttemptNumber());
  LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
  // TODO Post MRR
  // A single file per vertex will likely be a better solution. Does not
  // require translation - client can take care of this. Will work independent
  // of whether the configuration is for intermediate tasks or not. Has the
  // overhead of localizing multiple files per job - i.e. the client would
  // need to write these files to hdfs, add them as local resources per
  // vertex. A solution like this may be more practical once it's possible to
  // submit configuration parameters to the AM and effectively tasks via RPC.
  jobConf.set(MRJobConfig.VERTEX_NAME, processorContext.getTaskVertexName());
  if (LOG.isDebugEnabled() && userPayload != null) {
    Iterator<Entry<String, String>> iter = jobConf.iterator();
    String taskIdStr = taskAttemptId.getTaskID().toString();
    while (iter.hasNext()) {
      Entry<String, String> confEntry = iter.next();
      LOG.debug("TaskConf Entry"
          + ", taskId=" + taskIdStr
          + ", key=" + confEntry.getKey()
          + ", value=" + confEntry.getValue());
    }
  }
  configureMRTask();
}
public List<Event> initialize() throws IOException {
  getContext().requestInitialMemory(0l, null); // mandatory call
  MRRuntimeProtos.MRInputUserPayloadProto mrUserPayload =
      MRInputHelpers.parseMRInputPayload(getContext().getUserPayload());
  boolean isGrouped = mrUserPayload.getGroupingEnabled();
  Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
      "Split information not expected in " + this.getClass().getName());
  Configuration conf = new JobConf(getContext().getContainerConfiguration());
  TezUtils.addToConfFromByteString(conf, mrUserPayload.getConfigurationBytes());
  this.jobConf = new JobConf(conf);
  useNewApi = this.jobConf.getUseNewMapper();
  if (isGrouped) {
    if (useNewApi) {
      jobConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
          org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat.class.getName());
    } else {
      jobConf.set("mapred.input.format.class",
          org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat.class.getName());
    }
  }
  // Add tokens to the jobConf - in case they are accessed within the RR / IF
  jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
  TaskAttemptID taskAttemptId = new TaskAttemptID(
      new TaskID(
          Long.toString(getContext().getApplicationId().getClusterTimestamp()),
          getContext().getApplicationId().getId(), TaskType.MAP,
          getContext().getTaskIndex()),
      getContext().getTaskAttemptNumber());
  jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
      taskAttemptId.toString());
  jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
      getContext().getDAGAttemptNumber());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_INDEX, getContext().getDagIdentifier());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_VERTEX_INDEX, getContext().getTaskVertexIndex());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_INDEX, getContext().getTaskIndex());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_INDEX, getContext().getTaskAttemptNumber());
  jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_NAME, getContext().getDAGName());
  jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_NAME, getContext().getTaskVertexName());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_INPUT_INDEX, getContext().getInputIndex());
  jobConf.set(MRInput.TEZ_MAPREDUCE_INPUT_NAME, getContext().getSourceVertexName());
  jobConf.set(MRInput.TEZ_MAPREDUCE_APPLICATION_ID, getContext().getApplicationId().toString());
  jobConf.set(MRInput.TEZ_MAPREDUCE_UNIQUE_IDENTIFIER, getContext().getUniqueIdentifier());
  jobConf.setInt(MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER, getContext().getDAGAttemptNumber());
  TezDAGID tezDAGID = TezDAGID.getInstance(getContext().getApplicationId(), getContext().getDagIdentifier());
  TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, getContext().getTaskVertexIndex());
  TezTaskID tezTaskID = TezTaskID.getInstance(tezVertexID, getContext().getTaskIndex());
  TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, getContext().getTaskAttemptNumber());
  jobConf.set(MRInput.TEZ_MAPREDUCE_DAG_ID, tezDAGID.toString());
  jobConf.set(MRInput.TEZ_MAPREDUCE_VERTEX_ID, tezVertexID.toString());
  jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ID, tezTaskID.toString());
  jobConf.set(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID, tezTaskAttemptID.toString());
  this.inputRecordCounter = getContext().getCounters().findCounter(
      TaskCounter.INPUT_RECORDS_PROCESSED);
  return null;
}