下面列出了怎么用org.apache.hadoop.mapreduce.JobStatus的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Test {@link Statistics.JobStats}.
*/
@Test
@SuppressWarnings("deprecation")
public void testJobStats() throws Exception {
Job job = new Job() {};
JobStats stats = new JobStats(1, 2, job);
assertEquals("Incorrect num-maps", 1, stats.getNoOfMaps());
assertEquals("Incorrect num-reds", 2, stats.getNoOfReds());
assertTrue("Incorrect job", job == stats.getJob());
assertNull("Unexpected job status", stats.getJobStatus());
// add a new status
JobStatus status = new JobStatus();
stats.updateJobStatus(status);
assertNotNull("Missing job status", stats.getJobStatus());
assertTrue("Incorrect job status", status == stats.getJobStatus());
}
@Test
public void testRetriesOnConnectionFailure() throws Exception {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
new RuntimeException("1")).thenThrow(new RuntimeException("2"))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(null);
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
historyServerProxy, rm);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
verify(historyServerProxy, times(3)).getJobReport(
any(GetJobReportRequest.class));
}
@Test
public void testHistoryServerNotConfigured() throws Exception {
//RM doesn't have app report and job History Server is not configured
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
null, getRMDelegate());
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertEquals("N/A", jobStatus.getUsername());
Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState());
//RM has app report and job History Server is not configured
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
ApplicationReport applicationReport = getFinishedApplicationReport();
when(rm.getApplicationReport(jobId.getAppId())).thenReturn(
applicationReport);
clientServiceDelegate = getClientServiceDelegate(null, rm);
jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername());
Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState());
}
/**
* To ensure nothing broken after we removed normalization
* from the MRAM side
* @throws Exception
*/
@Test
public void testJobWithNonNormalizedCapabilities() throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
JobConf jobConf = new JobConf(mrCluster.getConfig());
jobConf.setInt("mapreduce.map.memory.mb", 700);
jobConf.setInt("mapred.reduce.memory.mb", 1500);
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(jobConf);
Job job = sleepJob.createJob(3, 2, 1000, 1, 500, 1);
job.setJarByClass(SleepJob.class);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.submit();
boolean completed = job.waitForCompletion(true);
Assert.assertTrue("Job should be completed", completed);
Assert.assertEquals("Job should be finished successfully",
JobStatus.State.SUCCEEDED, job.getJobState());
}
public void readFields(DataInput in) throws IOException {
jobFile = Text.readString(in);
taskId = TaskAttemptID.read(in);
partition = in.readInt();
numSlotsRequired = in.readInt();
taskStatus.readFields(in);
this.mapOutputFile.setJobId(taskId.getJobID());
skipRanges.readFields(in);
currentRecIndexIterator = skipRanges.skipRangeIterator();
currentRecStartIndex = currentRecIndexIterator.next();
skipping = in.readBoolean();
jobCleanup = in.readBoolean();
if (jobCleanup) {
jobRunStateForCleanup =
WritableUtils.readEnum(in, JobStatus.State.class);
}
jobSetup = in.readBoolean();
username = Text.readString(in);
writeSkipRecs = in.readBoolean();
taskCleanup = in.readBoolean();
if (taskCleanup) {
setPhase(TaskStatus.Phase.CLEANUP);
}
extraData.readFields(in);
}
@Private
public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
writer.println("Total jobs:" + jobs.length);
writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
"Queue", "Priority", "UsedContainers",
"RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
for (JobStatus job : jobs) {
int numUsedSlots = job.getNumUsedSlots();
int numReservedSlots = job.getNumReservedSlots();
int usedMem = job.getUsedMem();
int rsvdMem = job.getReservedMem();
int neededMem = job.getNeededMem();
writer.printf(dataPattern,
job.getJobID().toString(), job.getState(), job.getStartTime(),
job.getUsername(), job.getQueue(),
job.getPriority().name(),
numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
job.getSchedulingInfo());
}
writer.flush();
}
/**
* Test {@link Statistics.JobStats}.
*/
@Test
@SuppressWarnings("deprecation")
public void testJobStats() throws Exception {
Job job = new Job() {};
JobStats stats = new JobStats(1, 2, job);
assertEquals("Incorrect num-maps", 1, stats.getNoOfMaps());
assertEquals("Incorrect num-reds", 2, stats.getNoOfReds());
assertTrue("Incorrect job", job == stats.getJob());
assertNull("Unexpected job status", stats.getJobStatus());
// add a new status
JobStatus status = new JobStatus();
stats.updateJobStatus(status);
assertNotNull("Missing job status", stats.getJobStatus());
assertTrue("Incorrect job status", status == stats.getJobStatus());
}
/**
* To ensure nothing broken after we removed normalization
* from the MRAM side
* @throws Exception
*/
@Test
public void testJobWithNonNormalizedCapabilities() throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
JobConf jobConf = new JobConf(mrCluster.getConfig());
jobConf.setInt("mapreduce.map.memory.mb", 700);
jobConf.setInt("mapred.reduce.memory.mb", 1500);
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(jobConf);
Job job = sleepJob.createJob(3, 2, 1000, 1, 500, 1);
job.setJarByClass(SleepJob.class);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.submit();
boolean completed = job.waitForCompletion(true);
Assert.assertTrue("Job should be completed", completed);
Assert.assertEquals("Job should be finished successfully",
JobStatus.State.SUCCEEDED, job.getJobState());
}
@Test
public void testJobAbort() throws Exception {
Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
FileSystem fs = jobAttemptPath.getFileSystem(conf);
Set<String> uploads = runTasks(job, 4, 3);
Assert.assertTrue(fs.exists(jobAttemptPath));
jobCommitter.abortJob(job, JobStatus.State.KILLED);
Assert.assertEquals("Should have committed no uploads",
0, jobCommitter.results.getCommits().size());
Assert.assertEquals("Should have deleted no uploads",
0, jobCommitter.results.getDeletes().size());
Assert.assertEquals("Should have aborted all uploads",
uploads, getAbortedIds(jobCommitter.results.getAborts()));
Assert.assertFalse(fs.exists(jobAttemptPath));
}
private JobStatus.State getJobStateFromVertexStatusState(VertexStatus.State state) {
switch(state) {
case INITED:
return JobStatus.State.PREP;
case RUNNING:
return JobStatus.State.RUNNING;
case SUCCEEDED:
return JobStatus.State.SUCCEEDED;
case KILLED:
return JobStatus.State.KILLED;
case FAILED:
case ERROR:
return JobStatus.State.FAILED;
default:
throw new TezUncheckedException("Unknown VertexStatus.State: " + state);
}
}
public JobStatus printJobStatus(YARNRunner yarnRunner, JobID jobID) throws IOException, InterruptedException {
JobStatus jobStatus;
jobStatus = yarnRunner.getJobStatus(jobID);
// print overall job M/R progresses
LOGGER.info("\nJob " + jobStatus.getJobName() + "in queue (" + jobStatus.getQueue() + ")" + " progress M/R: " + jobStatus.getMapProgress() + "/" + jobStatus.getReduceProgress());
LOGGER.info("Tracking URL : " + jobStatus.getTrackingUrl());
LOGGER.info("Reserved memory : " + jobStatus.getReservedMem() + ", used memory : "+ jobStatus.getUsedMem() + " and used slots : "+ jobStatus.getNumUsedSlots());
// list map & reduce tasks statuses and progress
TaskReport[] reports = yarnRunner.getTaskReports(jobID, TaskType.MAP);
for (int i = 0; i < reports.length; i++) {
LOGGER.info("MAP: Status " + reports[i].getCurrentStatus() + " with task ID " + reports[i].getTaskID() + ", and progress " + reports[i].getProgress());
}
reports = yarnRunner.getTaskReports(jobID, TaskType.REDUCE);
for (int i = 0; i < reports.length; i++) {
LOGGER.info("REDUCE: " + reports[i].getCurrentStatus() + " with task ID " + reports[i].getTaskID() + ", and progress " + reports[i].getProgress());
}
return jobStatus;
}
@Private
public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
writer.println("Total jobs:" + jobs.length);
writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
"Queue", "Priority", "UsedContainers",
"RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
for (JobStatus job : jobs) {
int numUsedSlots = job.getNumUsedSlots();
int numReservedSlots = job.getNumReservedSlots();
int usedMem = job.getUsedMem();
int rsvdMem = job.getReservedMem();
int neededMem = job.getNeededMem();
writer.printf(dataPattern,
job.getJobID().toString(), job.getState(), job.getStartTime(),
job.getUsername(), job.getQueue(),
job.getPriority().name(),
numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
job.getSchedulingInfo());
}
writer.flush();
}
@Override
public JobStatus getJobStatus(JobID jobID) throws IOException,
InterruptedException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
String jobFile = MRApps.getJobFile(conf, user, jobID);
DAGStatus dagStatus;
try {
if(dagClient == null) {
dagClient = MRTezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId(), tezConf, null);
}
dagStatus = dagClient.getDAGStatus(null);
return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
} catch (TezException e) {
throw new IOException(e);
}
}
@Test
public void testJobReportFromHistoryServer() throws Exception {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
getJobReportResponseFromHistoryServer());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(null);
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
historyServerProxy, rm);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());
Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());
Assert.assertEquals(1.0f, jobStatus.getMapProgress(), 0.0f);
Assert.assertEquals(1.0f, jobStatus.getReduceProgress(), 0.0f);
}
public JobStatus[] getAllJobs() throws IOException, InterruptedException {
try {
Set<String> appTypes = new HashSet<String>(1);
appTypes.add(MRJobConfig.MR_APPLICATION_TYPE);
EnumSet<YarnApplicationState> appStates =
EnumSet.noneOf(YarnApplicationState.class);
return TypeConverter.fromYarnApps(
client.getApplications(appTypes, appStates), this.conf);
} catch (YarnException e) {
throw new IOException(e);
}
}
@Test (timeout = 60000)
public void testFailingAttempt() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testFailingAttempt().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
MRRSleepJob sleepJob = new MRRSleepJob();
sleepJob.setConf(sleepConf);
Job job = sleepJob.createJob(1, 1, 1, 1, 1,
1, 1, 1, 1, 1);
job.setJarByClass(MRRSleepJob.class);
job.setMaxMapAttempts(3); // speed up failures
job.getConfiguration().setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");
job.submit();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
// FIXME once counters and task progress can be obtained properly
// TODO verify failed task diagnostics
}
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
addHistoryToken(ts);
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
// Submit to ResourceManager
try {
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
String diagnostics =
(appMaster == null ?
"application report is null" : appMaster.getDiagnostics());
if (appMaster == null
|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
diagnostics);
}
return clientCache.getClient(jobId).getJobStatus(jobId);
} catch (YarnException e) {
throw new IOException(e);
}
}
@Test (timeout = 60000)
public void testFailingJob() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testFailingJob().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
MRRSleepJob sleepJob = new MRRSleepJob();
sleepJob.setConf(sleepConf);
Job job = sleepJob.createJob(1, 1, 1, 1, 1,
1, 1, 1, 1, 1);
job.setJarByClass(MRRSleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.getConfiguration().setBoolean(MRRSleepJob.MAP_FATAL_ERROR, true);
job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "*");
job.submit();
boolean succeeded = job.waitForCompletion(true);
Assert.assertFalse(succeeded);
Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
// FIXME once counters and task progress can be obtained properly
// TODO verify failed task diagnostics
}
public void readFields(DataInput in) throws IOException {
jobFile = StringInterner.weakIntern(Text.readString(in));
taskId = TaskAttemptID.read(in);
partition = in.readInt();
numSlotsRequired = in.readInt();
taskStatus.readFields(in);
skipRanges.readFields(in);
currentRecIndexIterator = skipRanges.skipRangeIterator();
currentRecStartIndex = currentRecIndexIterator.next();
skipping = in.readBoolean();
jobCleanup = in.readBoolean();
if (jobCleanup) {
jobRunStateForCleanup =
WritableUtils.readEnum(in, JobStatus.State.class);
}
jobSetup = in.readBoolean();
writeSkipRecs = in.readBoolean();
taskCleanup = in.readBoolean();
if (taskCleanup) {
setPhase(TaskStatus.Phase.CLEANUP);
}
user = StringInterner.weakIntern(Text.readString(in));
int len = in.readInt();
encryptedSpillKey = new byte[len];
extraData.readFields(in);
in.readFully(encryptedSpillKey);
}
@Test
public void testRMDownRestoreForJobStatusBeforeGetAMReport()
throws IOException {
Configuration conf = new YarnConfiguration();
conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
try {
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced1"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced2")))
.thenReturn(getFinishedApplicationReport());
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rmDelegate, oldJobId, historyServerProxy);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
verify(rmDelegate, times(3)).getApplicationReport(
any(ApplicationId.class));
Assert.assertNotNull(jobStatus);
} catch (YarnException e) {
throw new IOException(e);
}
}
public void testAbort() throws IOException {
JobConf job = new JobConf();
setConfForFileOutputCommitter(job);
JobContext jContext = new JobContextImpl(job, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
FileOutputFormat.setWorkOutputPath(job, committer
.getTaskAttemptPath(tContext));
// do setup
committer.setupJob(jContext);
committer.setupTask(tContext);
String file = "test.txt";
// A reporter that does nothing
Reporter reporter = Reporter.NULL;
// write output
FileSystem localFs = FileSystem.getLocal(job);
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(localFs,
job, file, reporter);
writeOutput(theRecordWriter, reporter);
// do abort
committer.abortTask(tContext);
File expectedFile = new File(new Path(committer
.getTaskAttemptPath(tContext), file).toString());
assertFalse("task temp dir still exists", expectedFile.exists());
committer.abortJob(jContext, JobStatus.State.FAILED);
expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
.toString());
assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists());
assertEquals("Output directory not empty", 0, new File(outDir.toString())
.listFiles().length);
FileUtil.fullyDelete(new File(outDir.toString()));
}
public JobStatus[] getAllJobs() throws IOException, InterruptedException {
try {
Set<String> appTypes = new HashSet<String>(1);
appTypes.add(TezConstants.TEZ_APPLICATION_TYPE);
return TypeConverter.fromYarnApps(client.getApplications(appTypes),
this.conf);
} catch (YarnException e) {
throw new IOException(e);
}
}
@Override
public void abortJob(JobContext context, JobStatus.State state)
throws IOException {
Path outputPath = FileOutputFormat.getOutputPath(context);
FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
String fileName =
(state.equals(JobStatus.State.FAILED)) ? ABORT_FAILED_FILE_NAME
: ABORT_KILLED_FILE_NAME;
fs.create(new Path(outputPath, fileName)).close();
}
boolean isJobAbortTask() {
// the task is an abort task if its marked for cleanup and the final
// expected state is either failed or killed.
return isJobCleanupTask()
&& (jobRunStateForCleanup == JobStatus.State.KILLED
|| jobRunStateForCleanup == JobStatus.State.FAILED);
}
public void readFields(DataInput in) throws IOException {
jobFile = StringInterner.weakIntern(Text.readString(in));
taskId = TaskAttemptID.read(in);
partition = in.readInt();
numSlotsRequired = in.readInt();
taskStatus.readFields(in);
skipRanges.readFields(in);
currentRecIndexIterator = skipRanges.skipRangeIterator();
currentRecStartIndex = currentRecIndexIterator.next();
skipping = in.readBoolean();
jobCleanup = in.readBoolean();
if (jobCleanup) {
jobRunStateForCleanup =
WritableUtils.readEnum(in, JobStatus.State.class);
}
jobSetup = in.readBoolean();
writeSkipRecs = in.readBoolean();
taskCleanup = in.readBoolean();
if (taskCleanup) {
setPhase(TaskStatus.Phase.CLEANUP);
}
user = StringInterner.weakIntern(Text.readString(in));
int len = in.readInt();
encryptedSpillKey = new byte[len];
extraData.readFields(in);
in.readFully(encryptedSpillKey);
}
@Override
public void killJob(JobID arg0) throws IOException, InterruptedException {
/* check if the status is not running, if not send kill to RM */
JobStatus status = getJobStatus(arg0);
if (status.getState() == JobStatus.State.RUNNING ||
status.getState() == JobStatus.State.PREP) {
try {
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
} catch (YarnException e) {
throw new IOException(e);
}
return;
}
}
boolean isJobAbortTask() {
// the task is an abort task if its marked for cleanup and the final
// expected state is either failed or killed.
return isJobCleanupTask()
&& (jobRunStateForCleanup == JobStatus.State.KILLED
|| jobRunStateForCleanup == JobStatus.State.FAILED);
}
/**
* Delete the temporary directory, including all of the work directories.
* @param context the job's context
*/
@Override
public void abortJob(JobContext context, JobStatus.State state)
throws IOException {
// delete the _temporary folder
cleanupJob(context);
}
/**
* Dump a list of currently running jobs
* @throws IOException
*/
private void listJobs(Cluster cluster)
throws IOException, InterruptedException {
List<JobStatus> runningJobs = new ArrayList<JobStatus>();
for (JobStatus job : cluster.getAllJobStatuses()) {
if (!job.isJobComplete()) {
runningJobs.add(job);
}
}
displayJobList(runningJobs.toArray(new JobStatus[0]));
}
/**
* Delete the temporary directory, including all of the work directories.
* @param context the job's context
*/
@Override
public void abortJob(JobContext context, JobStatus.State state)
throws IOException {
// delete the _temporary folder
cleanupJob(context);
}