下面列出了怎么用org.apache.hadoop.mapreduce.v2.app.job.Task的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Child checking whether it can commit.
*
* <br>
* Commit is a two-phased protocol. First the attempt informs the
* ApplicationMaster that it is
* {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
* the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is
* a legacy from the centralized commit protocol handling by the JobTracker.
*/
@Override
public boolean canCommit(TaskAttemptID taskAttemptID) throws IOException {
LOG.info("Commit go/no-go request from " + taskAttemptID.toString());
// An attempt is asking if it can commit its output. This can be decided
// only by the task which is managing the multiple attempts. So redirect the
// request there.
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
taskHeartbeatHandler.progressing(attemptID);
// tell task to retry later if AM has not heard from RM within the commit
// window to help avoid double-committing in a split-brain situation
long now = context.getClock().getTime();
if (now - rmHeartbeatHandler.getLastHeartbeatTime() > commitWindowMs) {
return false;
}
Job job = context.getJob(attemptID.getTaskId().getJobId());
Task task = job.getTask(attemptID.getTaskId());
return task.canCommit(attemptID);
}
@Test
//First attempt is failed and second attempt is passed
//The job succeeds.
public void testFailTask() throws Exception {
MRApp app = new MockFirstFailingAttemptMRApp(1, 0);
Configuration conf = new Configuration();
// this test requires two task attempts, but uberization overrides max to 1
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
Job job = app.submit(conf);
app.waitForState(job, JobState.SUCCEEDED);
Map<TaskId,Task> tasks = job.getTasks();
Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
Task task = tasks.values().iterator().next();
Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
task.getReport().getTaskState());
Map<TaskAttemptId, TaskAttempt> attempts =
tasks.values().iterator().next().getAttempts();
Assert.assertEquals("Num attempts is not correct", 2, attempts.size());
//one attempt must be failed
//and another must have succeeded
Iterator<TaskAttempt> it = attempts.values().iterator();
Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
it.next().getReport().getTaskAttemptState());
Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
it.next().getReport().getTaskAttemptState());
}
private long storedPerAttemptValue
(Map<TaskAttempt, AtomicLong> data, TaskAttemptId attemptID) {
TaskId taskID = attemptID.getTaskId();
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
Task task = job.getTask(taskID);
if (task == null) {
return -1L;
}
TaskAttempt taskAttempt = task.getAttempt(attemptID);
if (taskAttempt == null) {
return -1L;
}
AtomicLong estimate = data.get(taskAttempt);
return estimate == null ? -1L : estimate.get();
}
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/counters")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobTaskCounterInfo getSingleTaskCounters(
@Context HttpServletRequest hsr, @PathParam("jobid") String jid,
@PathParam("taskid") String tid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
TaskId taskID = MRApps.toTaskID(tid);
if (taskID == null) {
throw new NotFoundException("taskid " + tid + " not found or invalid");
}
Task task = job.getTask(taskID);
if (task == null) {
throw new NotFoundException("task not found with id " + tid);
}
return new JobTaskCounterInfo(task);
}
@Test public void testSingleTaskCounterView() {
AppContext appContext = new MockAppContext(0, 1, 1, 2);
Map<String, String> params = getTaskParams(appContext);
params.put(AMParams.COUNTER_GROUP,
"org.apache.hadoop.mapreduce.FileSystemCounter");
params.put(AMParams.COUNTER_NAME, "HDFS_WRITE_OPS");
// remove counters from one task attempt
// to test handling of missing counters
TaskId taskID = MRApps.toTaskID(params.get(AMParams.TASK_ID));
Job job = appContext.getJob(taskID.getJobId());
Task task = job.getTask(taskID);
TaskAttempt attempt = task.getAttempts().values().iterator().next();
attempt.getReport().setCounters(null);
WebAppTests.testPage(SingleCounterPage.class, AppContext.class,
appContext, params);
}
public void verifyTaskGeneric(Task task, String id, String state,
String type, String successfulAttempt, long startTime, long finishTime,
long elapsedTime, float progress) {
TaskId taskid = task.getID();
String tid = MRApps.toString(taskid);
TaskReport report = task.getReport();
WebServicesTestUtils.checkStringMatch("id", tid, id);
WebServicesTestUtils.checkStringMatch("type", task.getType().toString(),
type);
WebServicesTestUtils.checkStringMatch("state", report.getTaskState()
.toString(), state);
// not easily checked without duplicating logic, just make sure its here
assertNotNull("successfulAttempt null", successfulAttempt);
assertEquals("startTime wrong", report.getStartTime(), startTime);
assertEquals("finishTime wrong", report.getFinishTime(), finishTime);
assertEquals("elapsedTime wrong", finishTime - startTime, elapsedTime);
assertEquals("progress wrong", report.getProgress() * 100, progress, 1e-3f);
}
@Test
public void testTaskAttempts() throws JSONException, Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("history")
.path("mapreduce").path("jobs").path(jobId).path("tasks").path(tid)
.path("attempts").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
verifyHsTaskAttempts(json, task);
}
}
}
@Test
public void testJobTaskCountersXML() throws Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("mapreduce")
.path("jobs").path(jobId).path("tasks").path(tid).path("counters")
.accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String xml = response.getEntity(String.class);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList info = dom.getElementsByTagName("jobTaskCounters");
verifyAMTaskCountersXML(info, task);
}
}
}
@Test
public void testTaskIdCountersSlash() throws JSONException, Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("history")
.path("mapreduce").path("jobs").path(jobId).path("tasks").path(tid)
.path("counters/").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("jobTaskCounters");
verifyHsJobTaskCounters(info, task);
}
}
}
@Override
public JobStateInternal transition(JobImpl job, JobEvent event) {
job.completedTaskCount++;
LOG.info("Num completed Tasks: " + job.completedTaskCount);
JobTaskEvent taskEvent = (JobTaskEvent) event;
Task task = job.tasks.get(taskEvent.getTaskID());
if (taskEvent.getState() == TaskState.SUCCEEDED) {
taskSucceeded(job, task);
} else if (taskEvent.getState() == TaskState.FAILED) {
taskFailed(job, task);
} else if (taskEvent.getState() == TaskState.KILLED) {
taskKilled(job, task);
}
return checkJobAfterTaskCompletion(job);
}
@Before
public void setup() throws IOException {
this.conf = new JobConf();
this.conf.set(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
NullGroupsProvider.class.getName());
this.conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
Groups.getUserToGroupsMappingService(conf);
this.ctx = buildHistoryContext(this.conf);
WebApp webApp = mock(HsWebApp.class);
when(webApp.name()).thenReturn("hsmockwebapp");
this.hsWebServices= new HsWebServices(ctx, conf, webApp);
this.hsWebServices.setResponse(mock(HttpServletResponse.class));
Job job = ctx.getAllJobs().values().iterator().next();
this.jobIdStr = job.getID().toString();
Task task = job.getTasks().values().iterator().next();
this.taskIdStr = task.getID().toString();
this.taskAttemptIdStr =
task.getAttempts().keySet().iterator().next().toString();
}
protected DataStatistics dataStatisticsForTask(TaskId taskID) {
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
if (job == null) {
return null;
}
Task task = job.getTask(taskID);
if (task == null) {
return null;
}
return task.getType() == TaskType.MAP
? mapperStatistics.get(job)
: task.getType() == TaskType.REDUCE
? reducerStatistics.get(job)
: null;
}
/**
* Child checking whether it can commit.
*
* <br>
* Commit is a two-phased protocol. First the attempt informs the
* ApplicationMaster that it is
* {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
* the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is
* a legacy from the centralized commit protocol handling by the JobTracker.
*/
@Override
public boolean canCommit(TaskAttemptID taskAttemptID) throws IOException {
LOG.info("Commit go/no-go request from " + taskAttemptID.toString());
// An attempt is asking if it can commit its output. This can be decided
// only by the task which is managing the multiple attempts. So redirect the
// request there.
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
taskHeartbeatHandler.progressing(attemptID);
// tell task to retry later if AM has not heard from RM within the commit
// window to help avoid double-committing in a split-brain situation
long now = context.getClock().getTime();
if (now - rmHeartbeatHandler.getLastHeartbeatTime() > commitWindowMs) {
return false;
}
Job job = context.getJob(attemptID.getTaskId().getJobId());
Task task = job.getTask(attemptID.getTaskId());
return task.canCommit(attemptID);
}
public void verifyAMTaskXML(NodeList nodes, Job job) {
assertEquals("incorrect number of elements", 2, nodes.getLength());
for (Task task : job.getTasks().values()) {
TaskId id = task.getID();
String tid = MRApps.toString(id);
Boolean found = false;
for (int i = 0; i < nodes.getLength(); i++) {
Element element = (Element) nodes.item(i);
if (tid.matches(WebServicesTestUtils.getXmlString(element, "id"))) {
found = true;
verifyAMSingleTaskXML(element, task);
}
}
assertTrue("task with id: " + tid + " not in web service output", found);
}
}
@GET
@Path("/jobs/{jobid}/tasks/{taskid}/attempts")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public TaskAttemptsInfo getJobTaskAttempts(@Context HttpServletRequest hsr,
@PathParam("jobid") String jid, @PathParam("taskid") String tid) {
init();
TaskAttemptsInfo attempts = new TaskAttemptsInfo();
Job job = getJobFromJobIdString(jid, appCtx);
checkAccess(job, hsr);
Task task = getTaskFromTaskIdString(tid, job);
for (TaskAttempt ta : task.getAttempts().values()) {
if (ta != null) {
if (task.getType() == TaskType.REDUCE) {
attempts.add(new ReduceTaskAttemptInfo(ta, task.getType()));
} else {
attempts.add(new TaskAttemptInfo(ta, task.getType(), true));
}
}
}
return attempts;
}
@Test
public void testTaskIdCounters() throws JSONException, Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("history")
.path("mapreduce").path("jobs").path(jobId).path("tasks").path(tid)
.path("counters").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("jobTaskCounters");
verifyHsJobTaskCounters(info, task);
}
}
}
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public TaskAttemptsInfo getJobTaskAttempts(@Context HttpServletRequest hsr,
@PathParam("jobid") String jid, @PathParam("taskid") String tid) {
init();
TaskAttemptsInfo attempts = new TaskAttemptsInfo();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
for (TaskAttempt ta : task.getAttempts().values()) {
if (ta != null) {
if (task.getType() == TaskType.REDUCE) {
attempts.add(new ReduceTaskAttemptInfo(ta, task.getType()));
} else {
attempts.add(new TaskAttemptInfo(ta, task.getType(), false));
}
}
}
return attempts;
}
private float getReduceProgress() {
Job job = myAppContext.getJob(myAttemptID.getTaskId().getJobId());
float runtime = getCodeRuntime();
Collection<Task> allMapTasks = job.getTasks(TaskType.MAP).values();
int numberMaps = allMapTasks.size();
int numberDoneMaps = 0;
for (Task mapTask : allMapTasks) {
if (mapTask.isFinished()) {
++numberDoneMaps;
}
}
if (numberMaps == numberDoneMaps) {
shuffleCompletedTime = Math.min(shuffleCompletedTime, clock.getTime());
return Math.min
((float) (clock.getTime() - shuffleCompletedTime)
/ (runtime * 2000.0F) + 0.5F,
1.0F);
} else {
return ((float) numberDoneMaps) / numberMaps * 0.5F;
}
}
@Override
public GetTaskReportsResponse getTaskReports(
GetTaskReportsRequest request) throws IOException {
JobId jobId = request.getJobId();
TaskType taskType = request.getTaskType();
GetTaskReportsResponse response =
recordFactory.newRecordInstance(GetTaskReportsResponse.class);
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, true);
Collection<Task> tasks = job.getTasks(taskType).values();
LOG.info("Getting task report for " + taskType + " " + jobId
+ ". Report-size will be " + tasks.size());
// Take lock to allow only one call, otherwise heap will blow up because
// of counters in the report when there are multiple callers.
synchronized (getTaskReportsLock) {
for (Task task : tasks) {
response.addTaskReport(task.getReport());
}
}
return response;
}
@Test
public void testTaskId() throws JSONException, Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("mapreduce")
.path("jobs").path(jobId).path("tasks").path(tid)
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("task");
verifyAMSingleTask(info, task);
}
}
}
@Test
public void testTaskIdSlash() throws JSONException, Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("mapreduce")
.path("jobs").path(jobId).path("tasks").path(tid + "/")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("task");
verifyAMSingleTask(info, task);
}
}
}
@Test
public void testTaskAttempts() throws JSONException, Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("mapreduce")
.path("jobs").path(jobId).path("tasks").path(tid).path("attempts")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
verifyAMTaskAttempts(json, task);
}
}
}
@Test
public void testTaskIdCounters() throws JSONException, Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("history")
.path("mapreduce").path("jobs").path(jobId).path("tasks").path(tid)
.path("counters").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("jobTaskCounters");
verifyHsJobTaskCounters(info, task);
}
}
}
@Test
public void testTaskId() throws JSONException, Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("history")
.path("mapreduce").path("jobs").path(jobId).path("tasks").path(tid)
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("task");
verifyHsSingleTask(info, task);
}
}
}
@Test
public void testJobTaskCountersXML() throws Exception {
WebResource r = resource();
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
for (Task task : jobsMap.get(id).getTasks().values()) {
String tid = MRApps.toString(task.getID());
ClientResponse response = r.path("ws").path("v1").path("mapreduce")
.path("jobs").path(jobId).path("tasks").path(tid).path("counters")
.accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String xml = response.getEntity(String.class);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList info = dom.getElementsByTagName("jobTaskCounters");
verifyAMTaskCountersXML(info, task);
}
}
}
@Override
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
throws IOException {
JobId jobId = request.getJobId();
TaskType taskType = request.getTaskType();
GetTaskReportsResponse response = recordFactory.newRecordInstance(GetTaskReportsResponse.class);
Job job = verifyAndGetJob(jobId, true);
Collection<Task> tasks = job.getTasks(taskType).values();
for (Task task : tasks) {
response.addTaskReport(task.getReport());
}
return response;
}
public void verifyHsSingleTaskXML(Element element, Task task) {
verifyTaskGeneric(task, WebServicesTestUtils.getXmlString(element, "id"),
WebServicesTestUtils.getXmlString(element, "state"),
WebServicesTestUtils.getXmlString(element, "type"),
WebServicesTestUtils.getXmlString(element, "successfulAttempt"),
WebServicesTestUtils.getXmlLong(element, "startTime"),
WebServicesTestUtils.getXmlLong(element, "finishTime"),
WebServicesTestUtils.getXmlLong(element, "elapsedTime"),
WebServicesTestUtils.getXmlFloat(element, "progress"));
}
static TaskCount getTaskCount(Collection<Task> tasks) {
TaskCount tc = new TaskCount();
for (Task task : tasks) {
tc.incr(task);
}
return tc;
}
public void launchedTask(Task task) {
switch (task.getType()) {
case MAP:
mapsLaunched.incr();
break;
case REDUCE:
reducesLaunched.incr();
break;
}
endWaitingTask(task);
}
public JobTaskCounterInfo(Task task) {
total = task.getCounters();
this.id = MRApps.toString(task.getID());
taskCounterGroup = new ArrayList<TaskCounterGroupInfo>();
if (total != null) {
for (CounterGroup g : total) {
if (g != null) {
TaskCounterGroupInfo cginfo = new TaskCounterGroupInfo(g.getName(), g);
taskCounterGroup.add(cginfo);
}
}
}
}