The following are example usages of org.apache.hadoop.mapred.JobID (together with the related org.apache.hadoop.mapred.TaskAttemptContextImpl API), taken from open-source projects on GitHub.
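Before the project examples, here is a minimal standalone sketch (not taken from any of the projects listed; the class name JobIdSketch is invented for illustration) showing how the two JobID flavours used throughout these snippets relate: a new-API org.apache.hadoop.mapreduce.JobID is built from a jobtracker identifier and a sequence number, prints as job_<identifier>_<nnnn>, can be converted to the old org.apache.hadoop.mapred.JobID with downgrade(), and can be re-parsed from its string form with forName().
import org.apache.hadoop.mapred.JobID;

public class JobIdSketch {
  public static void main(String[] args) {
    // New-API ID: identifier "test", sequence number 1 -> prints "job_test_0001".
    org.apache.hadoop.mapreduce.JobID newApiId =
        new org.apache.hadoop.mapreduce.JobID("test", 1);
    System.out.println(newApiId);

    // Convert to the old mapred API, as getAssignedJobID() does in one example below.
    JobID oldApiId = JobID.downgrade(newApiId);

    // Round-trip through the string form, as several examples below do.
    JobID parsed = JobID.forName(oldApiId.toString());
    System.out.println(parsed.getJtIdentifier() + " / " + parsed.getId());
  }
}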
@SuppressWarnings("deprecation")
@Test(timeout = 30000)
public void testJobState() throws Exception {
  Job job_1 = getCopyJob();
  JobControl jc = new JobControl("Test");
  jc.addJob(job_1);
  Assert.assertEquals(Job.WAITING, job_1.getState());
  // The deprecated setState() has no effect; the job stays in WAITING.
  job_1.setState(Job.SUCCESS);
  Assert.assertEquals(Job.WAITING, job_1.getState());
  org.apache.hadoop.mapreduce.Job mockjob =
      mock(org.apache.hadoop.mapreduce.Job.class);
  org.apache.hadoop.mapreduce.JobID jid =
      new org.apache.hadoop.mapreduce.JobID("test", 0);
  when(mockjob.getJobID()).thenReturn(jid);
  job_1.setJob(mockjob);
  Assert.assertEquals("job_test_0000", job_1.getMapredJobID());
  // The deprecated setMapredJobID() is likewise ignored; the ID still
  // comes from the mocked Job.
  job_1.setMapredJobID("job_test_0001");
  Assert.assertEquals("job_test_0000", job_1.getMapredJobID());
  jc.stop();
}
/**
 * Submit/run a map/reduce job.
 *
 * @param job
 * @return true for success
 * @throws IOException
 */
public static boolean runJob(JobConf job) throws IOException {
  JobClient jc = new JobClient(job);
  boolean success = true;
  RunningJob running = null;
  try {
    running = jc.submitJob(job);
    JobID jobId = running.getID();
    System.out.println("Job " + jobId + " is submitted");
    while (!running.isComplete()) {
      System.out.println("Job " + jobId + " is still running.");
      try {
        Thread.sleep(60000);
      } catch (InterruptedException e) {
        // ignore the interrupt and re-check the job status
      }
      running = jc.getJob(jobId);
    }
    success = running.isSuccessful();
  } finally {
    if (!success && (running != null)) {
      running.killJob();
    }
    jc.close();
  }
  return success;
}
@Override
public void abortJob(final JobContext context, final JobStatus.State runState) throws java.io.IOException {
  super.abortJob(context, runState);
  final JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
  final RunningJob job = jobClient.getJob((org.apache.hadoop.mapred.JobID) JobID.forName(context.getConfiguration().get("mapred.job.id")));
  String diag = "";
  for (final TaskCompletionEvent event : job.getTaskCompletionEvents(0))
    switch (event.getTaskStatus()) {
      case SUCCEEDED:
        break;
      default:
        diag += "Diagnostics for: " + event.getTaskTrackerHttp() + "\n";
        for (final String s : job.getTaskDiagnostics(event.getTaskAttemptId()))
          diag += s + "\n";
        diag += "\n";
        break;
    }
  updateStatus(diag, context.getConfiguration().getInt("boa.hadoop.jobid", 0));
}
@Override
public void killJob(String jobID, Configuration conf) throws BackendException {
  try {
    if (conf != null) {
      JobConf jobConf = new JobConf(conf);
      JobClient jc = new JobClient(jobConf);
      JobID id = JobID.forName(jobID);
      RunningJob job = jc.getJob(id);
      if (job == null) {
        System.out.println("Job with id " + jobID + " is not active");
      } else {
        job.killJob();
        log.info("Kill " + id + " submitted.");
      }
    }
  } catch (IOException e) {
    throw new BackendException(e);
  }
}
@Test
public void testMedianMapReduceTime() throws Exception {
  JobClient jobClient = Mockito.mock(JobClient.class);
  // mock methods to return the predefined map and reduce task reports
  Mockito.when(jobClient.getMapTaskReports(jobID)).thenReturn(mapTaskReports);
  Mockito.when(jobClient.getReduceTaskReports(jobID)).thenReturn(reduceTaskReports);
  PigStats.JobGraph jobGraph = new PigStats.JobGraph();
  MRJobStats jobStats = createJobStats("JobStatsTest", jobGraph);
  getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID);
  jobStats.setSuccessful(true);
  getJobStatsMethod("addMapReduceStatistics", Iterator.class, Iterator.class)
      .invoke(jobStats, Arrays.asList(mapTaskReports).iterator(), Arrays.asList(reduceTaskReports).iterator());
  String msg = (String) getJobStatsMethod("getDisplayString")
      .invoke(jobStats);
  System.out.println(JobStats.SUCCESS_HEADER);
  System.out.println(msg);
  assertTrue(msg.startsWith(ASSERT_STRING));
}
private Job submitAndValidateJob(JobConf conf, int numMaps, int numReds)
    throws IOException, InterruptedException, ClassNotFoundException {
  conf.setJobSetupCleanupNeeded(false);
  Job job = MapReduceTestUtil.createJob(conf, inDir, outDir,
      numMaps, numReds);
  job.setOutputFormatClass(MyOutputFormat.class);
  job.waitForCompletion(true);
  assertTrue(job.isSuccessful());
  JobID jobid = (org.apache.hadoop.mapred.JobID) job.getID();
  JobClient jc = new JobClient(conf);
  assertTrue(jc.getSetupTaskReports(jobid).length == 0);
  assertTrue(jc.getCleanupTaskReports(jobid).length == 0);
  assertTrue(jc.getMapTaskReports(jobid).length == numMaps);
  assertTrue(jc.getReduceTaskReports(jobid).length == numReds);
  FileSystem fs = FileSystem.get(conf);
  assertTrue("Job output directory doesn't exist!", fs.exists(outDir));
  FileStatus[] list = fs.listStatus(outDir, new OutputFilter());
  int numPartFiles = numReds == 0 ? numMaps : numReds;
  assertTrue("Number of part-files is " + list.length + " and not "
      + numPartFiles, list.length == numPartFiles);
  return job;
}
/**
 * create the temporary output file for hadoop RecordWriter.
 * @param taskNumber The number of the parallel instance.
 * @param numTasks The number of parallel tasks.
 * @throws java.io.IOException
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
  // enforce sequential open() calls
  synchronized (OPEN_MUTEX) {
    if (Integer.toString(taskNumber + 1).length() > 6) {
      throw new IOException("Task id too large.");
    }
    TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
        + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
        + Integer.toString(taskNumber + 1)
        + "_0");
    this.jobConf.set("mapred.task.id", taskAttemptID.toString());
    this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
    // for hadoop 2.2
    this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
    this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
    this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);
    this.outputCommitter = this.jobConf.getOutputCommitter();
    JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
    this.outputCommitter.setupJob(jobContext);
    this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
  }
}
@Override
public void finalizeGlobal(int parallelism) throws IOException {
  try {
    JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
    OutputCommitter outputCommitter = this.jobConf.getOutputCommitter();
    // finalize HDFS output format
    outputCommitter.commitJob(jobContext);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
private void commitJob(String location) throws IOException {
  jobConf.set(OUTDIR, location);
  JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
  OutputCommitter outputCommitter = this.jobConf.getOutputCommitter();
  // finalize HDFS output format
  outputCommitter.commitJob(jobContext);
}
public static Optional<String> findHistoryFilePath(
    Iterator<LocatedFileStatus> listing, ApplicationId applicationId) {
  JobID jobId = new JobID(
      String.valueOf(applicationId.getClusterTimestamp()),
      applicationId.getId());
  List<LocatedFileStatus> jhistFiles = new ArrayList<>();
  // maybe this could work more nicely with some recursive glob and a filter
  try {
    jhistFiles = StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(listing, Spliterator.NONNULL), false)
        .filter(fstatus -> fstatus.getPath().toString()
            .matches(".*" + jobId.toString() + ".*.jhist"))
        .collect(Collectors.toList());
  } catch (RemoteIteratorAdaptor.WrappedRemoteIteratorException wrie) {
    // We can't really do overly much at this point, as this is an error from the
    // underlying hadoop filesystem implementation. But we want to at least log this
    // separately from other conditions.
    logger.error("Retrieving remote listing failed", wrie);
  }
  if (jhistFiles.size() < 1) {
    logger.error("Could not locate a history file for parameters");
    return Optional.empty();
  } else if (jhistFiles.size() > 1) {
    logger.error("Found two or more matching files, will dump first");
  }
  return jhistFiles.stream()
      .findFirst()
      .map(x -> x.getPath().toString());
}
@Override
public void prepare(TSetContext ctx) {
  this.context = ctx;
  Configuration hadoopConf = this.wrappedConfiguration.getConfiguration();
  jconf = new JobConf(hadoopConf);
  try {
    format = inputClazz.newInstance();
    JobContext jobContext = new JobContextImpl(hadoopConf, new JobID(context.getId(),
        context.getIndex()));
    List<InputSplit> splits = format.getSplits(jobContext);
    for (int i = 0; i < splits.size(); i++) {
      if (i % context.getParallelism() == context.getIndex()) {
        assignedSplits.add(splits.get(i));
      }
    }
    if (assignedSplits.size() > 0) {
      TaskID taskID = new TaskID(context.getId(), context.getIndex(),
          TaskType.MAP, context.getIndex());
      TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
      TaskAttemptContextImpl taskAttemptContext =
          new TaskAttemptContextImpl(jconf, taskAttemptID);
      currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
          taskAttemptContext);
      currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
    }
  } catch (InstantiationException | IllegalAccessException
      | InterruptedException | IOException e) {
    throw new RuntimeException("Failed to initialize hadoop input", e);
  }
}
@Test(timeout = 30000)
public void testGetAssignedJobId() throws Exception {
  JobConf jc = new JobConf();
  Job j = new Job(jc);
  // Just make sure no exception is thrown
  assertNull(j.getAssignedJobID());
  org.apache.hadoop.mapreduce.Job mockjob = mock(org.apache.hadoop.mapreduce.Job.class);
  org.apache.hadoop.mapreduce.JobID jid = new org.apache.hadoop.mapreduce.JobID("test", 0);
  when(mockjob.getJobID()).thenReturn(jid);
  j.setJob(mockjob);
  JobID expected = new JobID("test", 0);
  assertEquals(expected, j.getAssignedJobID());
  verify(mockjob).getJobID();
}
/**
 * @return the mapred ID of this job as assigned by the mapred framework.
 */
public JobID getAssignedJobID() {
  org.apache.hadoop.mapreduce.JobID temp = super.getMapredJobId();
  if (temp == null) {
    return null;
  }
  return JobID.downgrade(temp);
}
@Nonnull
public static String getJobIdFromTaskId(@Nonnull String taskidStr) {
  if (!taskidStr.startsWith("task_")) { // workaround for Tez
    taskidStr = taskidStr.replace("task", "task_");
    taskidStr = taskidStr.substring(0, taskidStr.lastIndexOf('_'));
  }
  TaskID taskId = TaskID.forName(taskidStr);
  JobID jobId = taskId.getJobID();
  return jobId.toString();
}
/**
 * {@inheritDoc}
 */
@Override
public void stop(String externalJobId) {
  try {
    RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
    if (runningJob == null) {
      return;
    }
    runningJob.killJob();
  } catch (IOException e) {
    throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
  }
}
/**
 * {@inheritDoc}
 */
@Override
public void update(MSubmission submission) {
  double progress = -1;
  Counters counters = null;
  String externalJobId = submission.getExternalJobId();
  try {
    RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
    SubmissionStatus newStatus = status(runningJob);
    SubmissionError error = error(runningJob);
    if (newStatus.isRunning()) {
      progress = progress(runningJob);
    } else {
      counters = counters(runningJob);
    }
    // these properties change as the job runs, rest of the submission attributes
    // do not change as job runs
    submission.setStatus(newStatus);
    submission.setError(error);
    submission.setProgress(progress);
    submission.setCounters(counters);
    submission.setLastUpdateDate(new Date());
  } catch (IOException e) {
    throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
  }
}
/**
 * @param taskInfo Task info.
 * @param job Job.
 * @param jobId Job ID.
 * @param locNodeId Local node ID.
 * @param jobConfDataInput DataInput to read the JobConf from.
 */
public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJobEx job, HadoopJobId jobId,
    @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
  super(taskInfo, job);
  this.locNodeId = locNodeId;
  // Before creating the JobConf instance we should set a new context class loader.
  ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
  try {
    JobConf jobConf = new JobConf();
    try {
      jobConf.readFields(jobConfDataInput);
    }
    catch (IOException e) {
      throw new IgniteCheckedException(e);
    }
    // For map-reduce jobs prefer local writes.
    jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
    initializePartiallyRawComparator(jobConf);
    jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
    useNewMapper = jobConf.getUseNewMapper();
    useNewReducer = jobConf.getUseNewReducer();
    useNewCombiner = jobConf.getCombinerClass() == null;
  }
  finally {
    HadoopCommonUtils.restoreContextClassLoader(oldLdr);
  }
}
@Override
public boolean cancelRequest(String requestId) throws IOException {
  PigRequestStats stats = this.getRequestStats(requestId);
  if (stats.getStatus().equals(Status.SUBMITTED.toString())) {
    List<String> jobs = stats.getJobs();
    for (String job : jobs) {
      job = job.substring(JT_UI.length());
      JobConf jobConf = new JobConf();
      jobConf.set("fs.default.name", PropertyLoader.getInstance().getProperty("fs.default.name"));
      jobConf.set("mapred.job.tracker", PropertyLoader.getInstance().getProperty("jobtracker"));
      try {
        JobClient jobClient = new JobClient(jobConf);
        RunningJob rJob = jobClient.getJob(JobID.forName(job));
        if (!rJob.isComplete()) {
          rJob.killJob();
        }
      } catch (Exception e) {
        throw new IOException("Unable to kill job " + job);
      }
    }
    PigRequestStats requestStats = new PigRequestStats(0, 0, null, jobs.size());
    requestStats.setJobs(jobs);
    requestStats.setStatus(Status.KILLED.toString());
    Path statsPath = new Path(PropertyLoader.getInstance().getProperty(Constants.REQUEST_PATH) + requestId + "/stats");
    PigUtils.writeStatsFile(statsPath, requestStats);
    return true;
  } else {
    return false;
  }
}
MRJobStats addMRJobStats(Job job) {
  MapReduceOper mro = jobMroMap.get(job);
  if (mro == null) {
    LOG.warn("unable to get MR oper for job: " + job.toString());
    return null;
  }
  MRJobStats js = mroJobMap.get(mro);
  JobID jobId = job.getAssignedJobID();
  js.setId(jobId);
  js.setAlias(mro);
  js.setConf(job.getJobConf());
  return js;
}