下面列出了 org.apache.hadoop.mapred.RunningJob 类的 API 用法示例代码及写法;也可以点击链接到 GitHub 查看源代码。
/**
 * Writes a small input file, submits a job over it and asserts that the job
 * completes successfully.
 *
 * <p>The job is configured with one requested map task and a single attempt
 * per map and reduce task.
 *
 * @throws Exception if file system access, job submission or waiting for the
 *         job fails
 */
private void mrRun() throws Exception {
  FileSystem fs = FileSystem.get(getJobConf());
  Path inputDir = new Path("input");
  fs.mkdirs(inputDir);
  Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
  try {
    writer.write("hello");
  } finally {
    // Release the underlying stream even when the write itself fails.
    writer.close();
  }
  Path outputDir = new Path("output", "output");
  JobConf jobConf = new JobConf(getJobConf());
  jobConf.setInt("mapred.map.tasks", 1);
  jobConf.setInt("mapred.map.max.attempts", 1);
  jobConf.setInt("mapred.reduce.max.attempts", 1);
  jobConf.set("mapred.input.dir", inputDir.toString());
  jobConf.set("mapred.output.dir", outputDir.toString());
  JobClient jobClient = new JobClient(jobConf);
  try {
    RunningJob runJob = jobClient.submitJob(jobConf);
    runJob.waitForCompletion();
    assertTrue(runJob.isComplete());
    assertTrue(runJob.isSuccessful());
  } finally {
    // The client is no longer needed once the job outcome has been checked.
    jobClient.close();
  }
}
/**
 * Submit a map/reduce job and wait for it to finish, polling once a minute.
 *
 * <p>Unless the job is known to have succeeded, a kill request is issued
 * before returning, which also covers the case where polling fails with an
 * exception. The job client is always closed. An interrupt received while
 * sleeping does not abort the wait; the thread's interrupt status is restored
 * before this method returns.
 *
 * @param job the configuration of the job to submit
 * @return true for success
 * @throws IOException if submitting, polling or killing the job fails, or if
 *         the job can no longer be looked up while waiting
 */
public static boolean runJob(JobConf job) throws IOException {
  JobClient jc = new JobClient(job);
  // Pessimistic default: any abnormal exit below must still trigger the kill.
  boolean sucess = false;
  boolean interrupted = false;
  RunningJob running = null;
  try {
    running = jc.submitJob(job);
    JobID jobId = running.getID();
    System.out.println("Job " + jobId + " is submitted");
    while (!running.isComplete()) {
      System.out.println("Job " + jobId + " is still running.");
      try {
        Thread.sleep(60000);
      } catch (InterruptedException e) {
        // Keep waiting, but remember the interrupt so the caller can see it.
        interrupted = true;
      }
      RunningJob refreshed = jc.getJob(jobId);
      if (refreshed == null) {
        // A null handle would otherwise surface as a NullPointerException.
        throw new IOException("Job " + jobId + " can no longer be found");
      }
      running = refreshed;
    }
    sucess = running.isSuccessful();
  } finally {
    try {
      if (!sucess && (running != null)) {
        running.killJob();
      }
    } finally {
      // Close the client even when the kill request itself fails.
      jc.close();
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
  return sucess;
}
/**
 * Writes a small input file, submits a job over it and asserts that the job
 * completes successfully.
 *
 * <p>The job is configured with one requested map task and a single attempt
 * per map and reduce task.
 *
 * @throws Exception if file system access, job submission or waiting for the
 *         job fails
 */
private void mrRun() throws Exception {
  FileSystem fs = FileSystem.get(getJobConf());
  Path inputDir = new Path("input");
  fs.mkdirs(inputDir);
  Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
  try {
    writer.write("hello");
  } finally {
    // Release the underlying stream even when the write itself fails.
    writer.close();
  }
  Path outputDir = new Path("output", "output");
  JobConf jobConf = new JobConf(getJobConf());
  jobConf.setInt("mapred.map.tasks", 1);
  jobConf.setInt("mapred.map.max.attempts", 1);
  jobConf.setInt("mapred.reduce.max.attempts", 1);
  jobConf.set("mapred.input.dir", inputDir.toString());
  jobConf.set("mapred.output.dir", outputDir.toString());
  JobClient jobClient = new JobClient(jobConf);
  try {
    RunningJob runJob = jobClient.submitJob(jobConf);
    runJob.waitForCompletion();
    assertTrue(runJob.isComplete());
    assertTrue(runJob.isSuccessful());
  } finally {
    // The client is no longer needed once the job outcome has been checked.
    jobClient.close();
  }
}
/**
 * Submit a map/reduce job and wait for it to finish, polling once a minute.
 *
 * <p>Unless the job is known to have succeeded, a kill request is issued
 * before returning, which also covers the case where polling fails with an
 * exception. The job client is always closed. An interrupt received while
 * sleeping does not abort the wait; the thread's interrupt status is restored
 * before this method returns.
 *
 * @param job the configuration of the job to submit
 * @return true for success
 * @throws IOException if submitting, polling or killing the job fails, or if
 *         the job can no longer be looked up while waiting
 */
public static boolean runJob(JobConf job) throws IOException {
  JobClient jc = new JobClient(job);
  // Pessimistic default: any abnormal exit below must still trigger the kill.
  boolean sucess = false;
  boolean interrupted = false;
  RunningJob running = null;
  try {
    running = jc.submitJob(job);
    JobID jobId = running.getID();
    System.out.println("Job " + jobId + " is submitted");
    while (!running.isComplete()) {
      System.out.println("Job " + jobId + " is still running.");
      try {
        Thread.sleep(60000);
      } catch (InterruptedException e) {
        // Keep waiting, but remember the interrupt so the caller can see it.
        interrupted = true;
      }
      RunningJob refreshed = jc.getJob(jobId);
      if (refreshed == null) {
        // A null handle would otherwise surface as a NullPointerException.
        throw new IOException("Job " + jobId + " can no longer be found");
      }
      running = refreshed;
    }
    sucess = running.isSuccessful();
  } finally {
    try {
      if (!sucess && (running != null)) {
        running.killJob();
      }
    } finally {
      // Close the client even when the kill request itself fails.
      jc.close();
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
  return sucess;
}
/**
 * Aborts the job via the superclass, then collects the diagnostics of every
 * task attempt whose completion event is not SUCCEEDED and passes the
 * combined text to {@code updateStatus}.
 *
 * <p>The job is looked up by the id stored under {@code mapred.job.id}. When
 * that lookup yields no job, an empty diagnostics string is reported.
 *
 * @param context  the context of the job being aborted
 * @param runState the final run state of the job
 * @throws java.io.IOException if the superclass abort or any cluster query fails
 */
@Override
public void abortJob(final JobContext context, final JobStatus.State runState) throws java.io.IOException {
  super.abortJob(context, runState);

  final StringBuilder diag = new StringBuilder();
  final JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
  try {
    final RunningJob job = jobClient.getJob((org.apache.hadoop.mapred.JobID) JobID.forName(context.getConfiguration().get("mapred.job.id")));

    // A null handle means there is no job to query; report empty diagnostics.
    if (job != null) {
      // Completion events are fetched from a start index, so keep asking for
      // the next batch until none are left instead of reading only the first.
      int startFrom = 0;
      TaskCompletionEvent[] events = job.getTaskCompletionEvents(startFrom);
      while (events != null && events.length > 0) {
        for (final TaskCompletionEvent event : events) {
          switch (event.getTaskStatus()) {
            case SUCCEEDED:
              break;
            default:
              diag.append("Diagnostics for: ").append(event.getTaskTrackerHttp()).append("\n");
              for (final String s : job.getTaskDiagnostics(event.getTaskAttemptId())) {
                diag.append(s).append("\n");
              }
              diag.append("\n");
              break;
          }
        }
        startFrom += events.length;
        events = job.getTaskCompletionEvents(startFrom);
      }
    }
  } finally {
    jobClient.close();
  }

  updateStatus(diag.toString(), context.getConfiguration().getInt("boa.hadoop.jobid", 0));
}
/**
 * Runs a table map/reduce job with a single reducer and asserts that it
 * succeeds. The directory configured as {@code hadoop.tmp.dir} is deleted
 * afterwards, whether or not the job succeeded.
 *
 * @throws Exception if configuring or running the job fails
 */
@Test
@SuppressWarnings("deprecation")
public void shoudBeValidMapReduceEvaluation() throws Exception {
  final JobConf conf = new JobConf(UTIL.getConfiguration());
  try {
    conf.setJobName("process row task");
    conf.setNumReduceTasks(1);

    final String family = new String(COLUMN_FAMILY);
    TableMapReduceUtil.initTableMapJob(
        TABLE_NAME, family, ClassificatorMapper.class,
        ImmutableBytesWritable.class, Put.class, conf);
    TableMapReduceUtil.initTableReduceJob(TABLE_NAME, ClassificatorRowReduce.class, conf);

    final RunningJob finished = JobClient.runJob(conf);
    assertTrue(finished.isSuccessful());
  } finally {
    // conf comes straight from a constructor call, so it cannot be null here.
    final File tmpDir = new File(conf.get("hadoop.tmp.dir"));
    FileUtil.fullyDelete(tmpDir);
  }
}
/**
 * Runs a table map/reduce job with two reducers and {@code HRegionPartitioner}
 * and asserts that it succeeds. The directory configured as
 * {@code hadoop.tmp.dir} is deleted afterwards, whether or not the job
 * succeeded.
 *
 * @throws IOException if configuring or running the job fails
 */
@Test
@SuppressWarnings("deprecation")
public void shoudBeValidMapReduceWithPartitionerEvaluation()
    throws IOException {
  final JobConf conf = new JobConf(UTIL.getConfiguration());
  try {
    conf.setJobName("process row task");
    conf.setNumReduceTasks(2);

    final String family = new String(COLUMN_FAMILY);
    TableMapReduceUtil.initTableMapJob(
        TABLE_NAME, family, ClassificatorMapper.class,
        ImmutableBytesWritable.class, Put.class, conf);
    TableMapReduceUtil.initTableReduceJob(
        TABLE_NAME, ClassificatorRowReduce.class, conf, HRegionPartitioner.class);

    final RunningJob finished = JobClient.runJob(conf);
    assertTrue(finished.isSuccessful());
  } finally {
    // conf comes straight from a constructor call, so it cannot be null here.
    final File tmpDir = new File(conf.get("hadoop.tmp.dir"));
    FileUtil.fullyDelete(tmpDir);
  }
}
/**
 * Runs a map-only job that reads through the given input format and checks
 * the row, family and value counters the job reports.
 *
 * @param clazz the input format implementation under test
 * @throws IOException if running the job or reading its counters fails
 */
void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
  final JobConf job = new JobConf(UTIL.getConfiguration());
  job.setInputFormat(clazz);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapperClass(ExampleVerifier.class);
  job.setNumReduceTasks(0);

  LOG.debug("submitting job.");
  final RunningJob run = JobClient.runJob(job);
  assertTrue("job failed!", run.isSuccessful());

  // All counter groups share the test class name as a prefix.
  final String prefix = TestTableInputFormat.class.getName();
  final String rowGroup = prefix + ":row";
  final String familyGroup = prefix + ":family";
  final String valueGroup = prefix + ":value";

  assertEquals("Saw the wrong number of instances of the filtered-for row.", 2,
      run.getCounters().findCounter(rowGroup, "aaa").getCounter());
  assertEquals("Saw any instances of the filtered out row.", 0,
      run.getCounters().findCounter(rowGroup, "bbb").getCounter());
  assertEquals("Saw the wrong number of instances of columnA.", 1,
      run.getCounters().findCounter(familyGroup, "columnA").getCounter());
  assertEquals("Saw the wrong number of instances of columnB.", 1,
      run.getCounters().findCounter(familyGroup, "columnB").getCounter());
  assertEquals("Saw the wrong count of values for the filtered-for row.", 2,
      run.getCounters().findCounter(valueGroup, "value aaa").getCounter());
  assertEquals("Saw the wrong count of values for the filtered-out row.", 0,
      run.getCounters().findCounter(valueGroup, "value bbb").getCounter());
}
/**
 * Configures a multi-table snapshot map/reduce job for the given scans, runs
 * it with one reducer and asserts that it succeeds.
 *
 * @param jobName the name given to the job and used as its output path
 * @param c       not read by this implementation
 * @param scans   the scans from which the snapshot scan mapping is built
 * @throws IOException            if configuring or running the job fails
 * @throws InterruptedException   declared by the overridden method
 * @throws ClassNotFoundException declared by the overridden method
 */
@Override
protected void runJob(String jobName, Configuration c, List<Scan> scans)
    throws IOException, InterruptedException, ClassNotFoundException {
  JobConf jobConf = new JobConf(TEST_UTIL.getConfiguration());

  jobConf.setJobName(jobName);
  jobConf.setMapperClass(Mapper.class);
  jobConf.setReducerClass(Reducer.class);

  TableMapReduceUtil.initMultiTableSnapshotMapperJob(
      getSnapshotScanMapping(scans), Mapper.class,
      ImmutableBytesWritable.class, ImmutableBytesWritable.class,
      jobConf, true, restoreDir);
  TableMapReduceUtil.addDependencyJars(jobConf);

  jobConf.setReducerClass(Reducer.class);
  // one to get final "first" and "last" key
  jobConf.setNumReduceTasks(1);
  FileOutputFormat.setOutputPath(jobConf, new Path(jobConf.getJobName()));

  LOG.info("Started " + jobConf.getJobName());
  RunningJob submitted = JobClient.runJob(jobConf);
  submitted.waitForCompletion();
  assertTrue(submitted.isSuccessful());
  LOG.info("After map/reduce completion - job " + jobName);
}
/**
 * Deletes any existing output at {@code parquetPath}, configures a map-only
 * job that reads text from {@code inputPath} and writes Parquet with the
 * given compression codec, and runs it.
 *
 * @param codec       the compression codec set on the Parquet output format
 * @param jobConf     the job configuration to populate and run
 * @param conf        the configuration used to resolve the output file system
 * @param parquetPath the output location, removed recursively before the run
 * @return the handle returned by {@code JobClient.runJob}
 * @throws IOException            if file system access or the job fails
 * @throws ClassNotFoundException declared for callers
 * @throws InterruptedException   declared for callers
 */
private RunningJob runMapReduceJob(CompressionCodecName codec, JobConf jobConf, Configuration conf, Path parquetPath) throws IOException, ClassNotFoundException, InterruptedException {
  // Start from a clean output location.
  FileSystem fs = parquetPath.getFileSystem(conf);
  fs.delete(parquetPath, true);

  // Input side: plain text, no reduce phase.
  jobConf.setInputFormat(TextInputFormat.class);
  TextInputFormat.addInputPath(jobConf, inputPath);
  jobConf.setNumReduceTasks(0);

  // Output side: Parquet written through the group write support.
  jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
  DeprecatedParquetOutputFormat.setCompression(jobConf, codec);
  DeprecatedParquetOutputFormat.setOutputPath(jobConf, parquetPath);
  DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, GroupWriteSupport.class);

  String schemaText = "message example {\n"
      + "required int32 line;\n"
      + "required binary content;\n"
      + "}";
  GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(schemaText), jobConf);

  jobConf.setMapperClass(TestZstandardCodec.DumpMapper.class);
  return JobClient.runJob(jobConf);
}
/**
 * Returns the count for the given counter name in the counter group
 * 'MultiStoreCounters'.
 *
 * <p>The value -1 is returned when the job cannot be looked up, when the job
 * has no counters available, or when querying the cluster fails (the failure
 * is logged as a warning).
 *
 * @param job the MR job
 * @param jobClient the Hadoop job client
 * @param counterName the counter name
 * @return the count of the given counter name, or -1 if it is unavailable
 */
public static long getMultiStoreCount(Job job, JobClient jobClient,
    String counterName) {
  long value = -1;
  try {
    RunningJob rj = jobClient.getJob(job.getAssignedJobID());
    if (rj != null) {
      // The counters themselves may be missing even when the job handle
      // exists; dereferencing them blindly would raise a NullPointerException.
      Counters counters = rj.getCounters();
      if (counters != null) {
        Counters.Counter counter = counters.getGroup(
            MULTI_STORE_COUNTER_GROUP).getCounterForName(counterName);
        value = counter.getValue();
      }
    }
  } catch (IOException e) {
    LOG.warn("Failed to get the counter for " + counterName, e);
  }
  return value;
}
/**
 * Requests that the job with the given id be killed.
 *
 * <p>Nothing happens when {@code conf} is null. When no job is found for the
 * id, a message is printed to standard output instead.
 *
 * @param jobID the textual id of the job to kill
 * @param conf  the configuration used to create the job client; may be null
 * @throws BackendException if talking to the cluster fails
 */
@Override
public void killJob(String jobID, Configuration conf) throws BackendException {
  if (conf == null) {
    return;
  }
  try {
    JobConf jobConf = new JobConf(conf);
    JobClient jc = new JobClient(jobConf);
    try {
      JobID id = JobID.forName(jobID);
      RunningJob job = jc.getJob(id);
      if (job == null) {
        System.out.println("Job with id " + jobID + " is not active");
      } else {
        job.killJob();
        log.info("Kill " + id + " submitted.");
      }
    } finally {
      // The client is only needed for this one request; do not leak it.
      jc.close();
    }
  } catch (IOException e) {
    throw new BackendException(e);
  }
}
/**
 * Submits this job to mapred. On successful submission the state is set to
 * RUNNING and the assigned job id is recorded; on an {@code IOException} the
 * state is set to FAILED and the stringified exception is stored as the
 * message.
 *
 * <p>When {@code create.empty.dir.if.nonexist} is enabled, missing input
 * directories are created first.
 */
protected synchronized void submit() {
  try {
    final boolean createMissingInputs =
        theJobConf.getBoolean("create.empty.dir.if.nonexist", false);
    if (createMissingInputs) {
      final FileSystem fs = FileSystem.get(theJobConf);
      for (Path inputPath : FileInputFormat.getInputPaths(theJobConf)) {
        if (fs.exists(inputPath)) {
          continue;
        }
        try {
          fs.mkdirs(inputPath);
        } catch (IOException ignored) {
          // A failure to create the directory is deliberately not reported.
        }
      }
    }
    final RunningJob submitted = jc.submitJob(theJobConf);
    this.mapredJobID = submitted.getID();
    this.state = Job.RUNNING;
  } catch (IOException ioe) {
    this.state = Job.FAILED;
    this.message = StringUtils.stringifyException(ioe);
  }
}
/**
 * Driver that obtains a copier for the given arguments, submits its job,
 * monitors it, and finalizes the copied files.
 *
 * <p>Does nothing when no copier is returned for the arguments. The copier's
 * job cleanup always runs once a copier exists. If monitoring is interrupted,
 * the interrupt status is restored and finalization still proceeds.
 *
 * @param conf the configuration passed to {@code getCopier}
 * @param args the copy arguments passed to {@code getCopier}
 * @throws IOException if the job reports failure or any step fails
 */
static void copy(final Configuration conf, final Arguments args
    ) throws IOException {
  final DistCopier copier = getCopier(conf, args);
  if (copier == null) {
    return;
  }

  try {
    final JobClient client = copier.getJobClient();
    final RunningJob job = client.submitJob(copier.getJobConf());
    try {
      final boolean succeeded = client.monitorAndPrintJob(copier.getJobConf(), job);
      if (!succeeded) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    copier.finalizeCopiedFiles();
  } finally {
    copier.cleanupJob();
  }
}
/**
 * Submit a map/reduce job and wait for it to finish, polling once a minute.
 *
 * <p>Unless the job is known to have succeeded, a kill request is issued
 * before returning, which also covers the case where polling fails with an
 * exception. The job client is always closed. An interrupt received while
 * sleeping does not abort the wait; the thread's interrupt status is restored
 * before this method returns.
 *
 * @param job the configuration of the job to submit
 * @return true for success
 * @throws IOException if submitting, polling or killing the job fails, or if
 *         the job can no longer be looked up while waiting
 */
public static boolean runJob(JobConf job) throws IOException {
  JobClient jc = new JobClient(job);
  // Pessimistic default: any abnormal exit below must still trigger the kill.
  boolean sucess = false;
  boolean interrupted = false;
  RunningJob running = null;
  try {
    running = jc.submitJob(job);
    JobID jobId = running.getID();
    System.out.println("Job " + jobId + " is submitted");
    while (!running.isComplete()) {
      System.out.println("Job " + jobId + " is still running.");
      try {
        Thread.sleep(60000);
      } catch (InterruptedException e) {
        // Keep waiting, but remember the interrupt so the caller can see it.
        interrupted = true;
      }
      RunningJob refreshed = jc.getJob(jobId);
      if (refreshed == null) {
        // A null handle would otherwise surface as a NullPointerException.
        throw new IOException("Job " + jobId + " can no longer be found");
      }
      running = refreshed;
    }
    sucess = running.isSuccessful();
  } finally {
    try {
      if (!sucess && (running != null)) {
        running.killJob();
      }
    } finally {
      // Close the client even when the kill request itself fails.
      jc.close();
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
  return sucess;
}
/**
 * Verifies that {@code submit()} returns a non-null result when the mocked
 * job client reports a queue ACL entry containing SUBMIT_APPLICATIONS for the
 * current user.
 */
@Test
public void testSubmitWhenUserHasPermissionsToSubmitJobInQueueShouldExecuteSuccessfully()
    throws IOException, InterruptedException, ClassNotFoundException {
  Mockito.spy( YarnQueueAclsVerifier.class );
  ConfigurationProxy proxy = Mockito.mock( ConfigurationProxy.class );
  JobClient client = Mockito.mock( JobClient.class );
  RunningJob submitted = Mockito.mock( RunningJob.class );

  // One queue entry grants submission, the other grants nothing.
  String[] submitOperations = new String[] { "SUBMIT_APPLICATIONS" };
  String[] noOperations = new String[] {};
  MockQueueAclsInfo[] queueAcls = new MockQueueAclsInfo[] {
    new MockQueueAclsInfo( StringUtils.EMPTY, submitOperations ),
    new MockQueueAclsInfo( StringUtils.EMPTY, noOperations )
  };

  Mockito.when( proxy.createJobClient() ).thenReturn( client );
  Mockito.when( proxy.submit() ).thenCallRealMethod();
  Mockito.when( client.getQueueAclsForCurrentUser() ).thenReturn( queueAcls );
  Mockito.when( client.submitJob( Mockito.any( JobConf.class ) ) ).thenReturn( submitted );

  Assert.assertNotNull( proxy.submit() );
}
/**
 * Configures and runs the map-only "Cartesian Product" job, which pairs the
 * input given as the first argument with itself and writes text output to the
 * path given as the second argument.
 *
 * @param args exactly two arguments: the input and the output path
 * @return 0 if the job was successful, 1 otherwise
 * @throws Exception if configuring or running the job fails
 */
@Override
public int run(String[] args) throws Exception {
  if (args.length != 2) {
    System.err.println("Usage: CartesianCommentComparison <in> <out>");
    ToolRunner.printGenericCommandUsage(System.err);
    System.exit(2);
  }
  // JobConf(String) interprets its argument as the location of a
  // configuration resource, not as a job name, so set the name explicitly.
  JobConf conf = new JobConf();
  conf.setJobName("Cartesian Product");
  conf.setJarByClass(CartesianCommentComparison.class);
  conf.setMapperClass(CartesianMapper.class);
  conf.setNumReduceTasks(0);
  conf.setInputFormat(CartesianInputFormat.class);
  // Configure the input format: both sides read the same input path.
  CartesianInputFormat.setLeftInputInfo(conf, TextInputFormat.class, args[0]);
  CartesianInputFormat.setRightInputInfo(conf, TextInputFormat.class, args[0]);
  TextOutputFormat.setOutputPath(conf, new Path(args[1]));
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);
  RunningJob job = JobClient.runJob(conf);
  // Defensive wait in case the handle is returned before the job completes.
  while (!job.isComplete()) {
    Thread.sleep(1000);
  }
  return job.isSuccessful() ? 0 : 1;
}
/**
 * Checks how a job that is first RUNNING and later SUCCEEDED is tracked:
 * the executor queue size, the number of indexer lookups and the number of
 * indexer updates are asserted after the initial check and after each of two
 * short sleeps.
 *
 * NOTE(review): the assertions depend on background work finishing within
 * each 60 ms sleep - presumably a periodic re-check; confirm the interval
 * against the executor setup.
 */
@Test
public void testRun_Running() throws Exception {
String jobId = "job_201407251005_0815";
createDefinition("mytest", jobId);
RunningJob job = createJob(jobId, JobStatus.RUNNING);
when(job.getJobState()).thenReturn(JobStatus.RUNNING);
// Nothing is queued before the first check.
Assert.assertEquals(0, executorService.getQueue().size());
checkAllIndexes();
// After the check: one queued task, one indexer lookup, no update yet.
Assert.assertEquals(1, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(1)).getIndexer(anyString());
verify(model, VerificationModeFactory.times(0)).updateIndexerInternal(any(IndexerDefinition.class));
Thread.sleep(60);
// Job still reports RUNNING: a second lookup happened, still no update.
Assert.assertEquals(1, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(2)).getIndexer(anyString());
verify(model, VerificationModeFactory.times(0)).updateIndexerInternal(any(IndexerDefinition.class));
// Switch the mocked job to SUCCEEDED and wait again.
when(job.getJobState()).thenReturn(JobStatus.SUCCEEDED);
Thread.sleep(60);
// Queue is drained, a third lookup happened and the indexer was updated once.
Assert.assertEquals(0, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(3)).getIndexer(anyString());
verify(model, VerificationModeFactory.times(1)).updateIndexerInternal(any(IndexerDefinition.class));
}
/**
 * Submits this job to mapred. On successful submission the state is set to
 * RUNNING and the assigned job id is recorded; on an {@code IOException} the
 * state is set to FAILED and the stringified exception is stored as the
 * message.
 *
 * <p>When {@code create.empty.dir.if.nonexist} is enabled, missing input
 * directories are created first.
 */
protected synchronized void submit() {
  try {
    final boolean createMissingInputs =
        theJobConf.getBoolean("create.empty.dir.if.nonexist", false);
    if (createMissingInputs) {
      final FileSystem fs = FileSystem.get(theJobConf);
      for (Path inputPath : FileInputFormat.getInputPaths(theJobConf)) {
        if (fs.exists(inputPath)) {
          continue;
        }
        try {
          fs.mkdirs(inputPath);
        } catch (IOException ignored) {
          // A failure to create the directory is deliberately not reported.
        }
      }
    }
    final RunningJob submitted = jc.submitJob(theJobConf);
    this.mapredJobID = submitted.getID();
    this.state = Job.RUNNING;
  } catch (IOException ioe) {
    this.state = Job.FAILED;
    this.message = StringUtils.stringifyException(ioe);
  }
}
/**
 * Submit a map/reduce job and wait for it to finish, polling once a minute.
 *
 * <p>Unless the job is known to have succeeded, a kill request is issued
 * before returning, which also covers the case where polling fails with an
 * exception. The job client is always closed. An interrupt received while
 * sleeping does not abort the wait; the thread's interrupt status is restored
 * before this method returns.
 *
 * @param job the configuration of the job to submit
 * @return true for success
 * @throws IOException if submitting, polling or killing the job fails, or if
 *         the job can no longer be looked up while waiting
 */
public static boolean runJob(JobConf job) throws IOException {
  JobClient jc = new JobClient(job);
  // Pessimistic default: any abnormal exit below must still trigger the kill.
  boolean sucess = false;
  boolean interrupted = false;
  RunningJob running = null;
  try {
    running = jc.submitJob(job);
    JobID jobId = running.getID();
    System.out.println("Job " + jobId + " is submitted");
    while (!running.isComplete()) {
      System.out.println("Job " + jobId + " is still running.");
      try {
        Thread.sleep(60000);
      } catch (InterruptedException e) {
        // Keep waiting, but remember the interrupt so the caller can see it.
        interrupted = true;
      }
      RunningJob refreshed = jc.getJob(jobId);
      if (refreshed == null) {
        // A null handle would otherwise surface as a NullPointerException.
        throw new IOException("Job " + jobId + " can no longer be found");
      }
      running = refreshed;
    }
    sucess = running.isSuccessful();
  } finally {
    try {
      if (!sucess && (running != null)) {
        running.killJob();
      }
    } finally {
      // Close the client even when the kill request itself fails.
      jc.close();
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
  return sucess;
}
/**
 * Handles an unsuccessful job: logs the failure, recursively deletes the work
 * directory and maps the job state to a return code.
 *
 * @param job        the job whose name is used in the log message
 * @param runningJob the handle queried for the job state
 * @param fs         the file system holding the work directory
 * @param workDir    the work directory to delete
 * @return -2 if the job state is KILLED, -3 otherwise
 * @throws IOException if deleting the directory or reading the state fails
 */
protected int jobFailed(Job job, RunningJob runningJob, FileSystem fs, Path workDir) throws IOException {
  log.error("Map Reduce job " + job.getJobName() + " was unsuccessful. Check the logs.");
  log.error("Since job was not successful, deleting work directory: " + workDir);

  if (!fs.delete(workDir, true)) {
    log.error("Unable to remove job working directory: " + workDir);
  }

  final boolean killed = runningJob.getJobState() == JobStatus.KILLED;
  if (killed) {
    log.warn("Job was killed");
    return -2;
  }

  log.error("Job failed with a jobstate of " + runningJob.getJobState());
  return -3;
}
/**
 * Starts a cluster with SSL-enabled shuffle, runs a small job on it and
 * asserts that the job completes successfully. The cluster is stopped
 * afterwards regardless of the outcome.
 *
 * @param useClientCerts passed through to the SSL configuration setup
 * @throws Exception if cluster setup, file system access or the job fails
 */
private void encryptedShuffleWithCerts(boolean useClientCerts)
    throws Exception {
  try {
    Configuration conf = new Configuration();
    String keystoresDir = new File(BASEDIR).getAbsolutePath();
    String sslConfsDir =
        KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
    KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfsDir, conf,
        useClientCerts);
    conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);
    startCluster(conf);
    FileSystem fs = FileSystem.get(getJobConf());
    Path inputDir = new Path("input");
    fs.mkdirs(inputDir);
    Writer writer =
        new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
    try {
      writer.write("hello");
    } finally {
      // Release the underlying stream even when the write itself fails.
      writer.close();
    }
    Path outputDir = new Path("output", "output");
    JobConf jobConf = new JobConf(getJobConf());
    jobConf.setInt("mapred.map.tasks", 1);
    jobConf.setInt("mapred.map.max.attempts", 1);
    jobConf.setInt("mapred.reduce.max.attempts", 1);
    jobConf.set("mapred.input.dir", inputDir.toString());
    jobConf.set("mapred.output.dir", outputDir.toString());
    JobClient jobClient = new JobClient(jobConf);
    try {
      RunningJob runJob = jobClient.submitJob(jobConf);
      runJob.waitForCompletion();
      Assert.assertTrue(runJob.isComplete());
      Assert.assertTrue(runJob.isSuccessful());
    } finally {
      // Close the client before the cluster it talks to is stopped.
      jobClient.close();
    }
  } finally {
    stopCluster();
  }
}
/**
 * Prepares input files, configures the job and runs it to completion.
 *
 * <p>An existing output directory is removed and the input directory is
 * created if missing. One input file with three lines of text is written per
 * requested map task.
 *
 * @param conf    the job configuration to populate and submit
 * @param inDir   the input directory
 * @param outDir  the output directory, deleted first if it exists
 * @param numMaps the number of input files and requested map tasks
 * @param numReds the number of reduce tasks
 * @return the result of monitoring the submitted job
 * @throws IOException          if file system access or the job client fails
 * @throws InterruptedException if monitoring the job is interrupted
 */
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
    int numReds) throws IOException, InterruptedException {
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    try {
      file.writeBytes(input);
    } finally {
      // Do not leak the stream when the write fails.
      file.close();
    }
  }
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  JobClient jobClient = new JobClient(conf);
  try {
    RunningJob job = jobClient.submitJob(conf);
    return jobClient.monitorAndPrintJob(conf, job);
  } finally {
    // Monitoring has finished (or failed); the client is no longer needed.
    jobClient.close();
  }
}
/**
 * Starts a cluster with SSL-enabled shuffle, runs a small job on it and
 * asserts that the job completes successfully. The cluster is stopped
 * afterwards regardless of the outcome.
 *
 * @param useClientCerts passed through to the SSL configuration setup
 * @throws Exception if cluster setup, file system access or the job fails
 */
private void encryptedShuffleWithCerts(boolean useClientCerts)
    throws Exception {
  try {
    Configuration conf = new Configuration();
    String keystoresDir = new File(BASEDIR).getAbsolutePath();
    String sslConfsDir =
        KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
    KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfsDir, conf,
        useClientCerts);
    conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);
    startCluster(conf);
    FileSystem fs = FileSystem.get(getJobConf());
    Path inputDir = new Path("input");
    fs.mkdirs(inputDir);
    Writer writer =
        new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
    try {
      writer.write("hello");
    } finally {
      // Release the underlying stream even when the write itself fails.
      writer.close();
    }
    Path outputDir = new Path("output", "output");
    JobConf jobConf = new JobConf(getJobConf());
    jobConf.setInt("mapred.map.tasks", 1);
    jobConf.setInt("mapred.map.max.attempts", 1);
    jobConf.setInt("mapred.reduce.max.attempts", 1);
    jobConf.set("mapred.input.dir", inputDir.toString());
    jobConf.set("mapred.output.dir", outputDir.toString());
    JobClient jobClient = new JobClient(jobConf);
    try {
      RunningJob runJob = jobClient.submitJob(jobConf);
      runJob.waitForCompletion();
      Assert.assertTrue(runJob.isComplete());
      Assert.assertTrue(runJob.isSuccessful());
    } finally {
      // Close the client before the cluster it talks to is stopped.
      jobClient.close();
    }
  } finally {
    stopCluster();
  }
}
/**
 * Prepares input files, configures the job and runs it to completion.
 *
 * <p>An existing output directory is removed and the input directory is
 * created if missing. One input file with three lines of text is written per
 * requested map task.
 *
 * @param conf    the job configuration to populate and submit
 * @param inDir   the input directory
 * @param outDir  the output directory, deleted first if it exists
 * @param numMaps the number of input files and requested map tasks
 * @param numReds the number of reduce tasks
 * @return the result of monitoring the submitted job
 * @throws IOException          if file system access or the job client fails
 * @throws InterruptedException if monitoring the job is interrupted
 */
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
    int numReds) throws IOException, InterruptedException {
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    try {
      file.writeBytes(input);
    } finally {
      // Do not leak the stream when the write fails.
      file.close();
    }
  }
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  JobClient jobClient = new JobClient(conf);
  try {
    RunningJob job = jobClient.submitJob(conf);
    return jobClient.monitorAndPrintJob(conf, job);
  } finally {
    // Monitoring has finished (or failed); the client is no longer needed.
    jobClient.close();
  }
}
/**
 * Describes the current state of this task.
 *
 * @return the string form of the running job when one is set, otherwise the
 *         stored status text
 */
public String status() {
  // Read the field once so the null check and the use see the same value.
  final RunningJob current = this.rj;
  return (current == null) ? status : current.toString();
}
/**
 * {@inheritDoc}
 *
 * <p>Looks the job up by its external id and requests a kill; does nothing
 * when no such job is found.
 */
@Override
public void stop(String externalJobId) {
  try {
    final RunningJob target = jobClient.getJob(JobID.forName(externalJobId));
    if (target != null) {
      target.killJob();
    }
  } catch (IOException e) {
    throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
  }
}
/**
 * Computes the overall progress of a job as the mean of its map and reduce
 * progress.
 *
 * @param runningJob the job to inspect; may be null
 * @return the averaged progress, or -1 when no job is given
 */
private double progress(RunningJob runningJob) {
  if (runningJob == null) {
    // Default value when there is no job to ask.
    return -1;
  }
  try {
    final float mapPart = runningJob.mapProgress();
    final float reducePart = runningJob.reduceProgress();
    return (mapPart + reducePart) / 2;
  } catch (IOException e) {
    throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
  }
}
/**
 * Fetches the counters of a job and converts them with
 * {@code convertHadoop1MapreduceCounters}.
 *
 * @param runningJob the job to inspect; may be null
 * @return the converted counters, or null when no job is given
 */
private Counters counters(RunningJob runningJob) {
  if (runningJob == null) {
    // Default value when there is no job to ask.
    return null;
  }
  try {
    return convertHadoop1MapreduceCounters(runningJob.getCounters());
  } catch (IOException e) {
    throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
  }
}
/**
 * {@inheritDoc}
 *
 * <p>Refreshes the status, error, progress, counters and last-update date of
 * the submission from the job identified by its external id. Progress is
 * only computed while the job is running and counters only once it is not;
 * the other value is left at its default (-1 or null).
 */
@Override
public void update(MSubmission submission) {
  final String externalJobId = submission.getExternalJobId();
  try {
    final RunningJob job = jobClient.getJob(JobID.forName(externalJobId));
    final SubmissionStatus currentStatus = status(job);
    final SubmissionError currentError = error(job);

    final boolean stillRunning = currentStatus.isRunning();
    final double progress = stillRunning ? progress(job) : -1;
    final Counters counters = stillRunning ? null : counters(job);

    // Only these attributes change while the job runs; the remaining
    // submission attributes stay as they are.
    submission.setStatus(currentStatus);
    submission.setError(currentError);
    submission.setProgress(progress);
    submission.setCounters(counters);
    submission.setLastUpdateDate(new Date());
  } catch (IOException e) {
    throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
  }
}