下面列出了org.apache.hadoop.mapreduce.Job#isSuccessful ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected void afterRunJob(Job job, Throwable error) throws IOException {
try {
if (error == null && job.isSuccessful()) {
// job运行没有异常,而且运行成功,那么进行计算total member的代码
this.calculateTotalMembers(job.getConfiguration());
} else if (error == null) {
// job运行没有产生异常,但是运行失败
throw new RuntimeException("job 运行失败");
}
} catch (Throwable e) {
if (error != null) {
error = e;
}
throw new IOException("调用afterRunJob产生异常", e);
} finally {
super.afterRunJob(job, error);
}
}
@Override
protected void afterRunJob(Job job, Throwable error) throws IOException {
try {
if (error == null && job.isSuccessful()) {
// job运行没有异常,而且运行成功,那么进行计算total member的代码
this.calculateTotalMembers(job.getConfiguration());
} else if (error == null) {
// job运行没有产生异常,但是运行失败
throw new RuntimeException("job 运行失败");
}
} catch (Throwable e) {
if (error != null) {
error = e;
}
throw new IOException("调用afterRunJob产生异常", e);
} finally {
super.afterRunJob(job, error);
}
}
@Override
protected void afterRunJob(Job job, Throwable error) throws IOException {
try {
if (error == null && job.isSuccessful()) {
// job运行没有异常,而且运行成功,那么进行计算total user的代码
this.calculateTotalUsers(job.getConfiguration());
} else if (error == null) {
// job运行没有产生异常,但是运行失败
throw new RuntimeException("job 运行失败");
}
} catch (Throwable e) {
if (error != null) {
error = e;
}
throw new IOException("调用afterRunJob产生异常", e);
} finally {
super.afterRunJob(job, error);
}
}
private void submitAndWait(Job job) throws ClassNotFoundException, IOException, InterruptedException {
job.submit();
MRCompactor.addRunningHadoopJob(this.dataset, job);
LOG.info(String.format("MR job submitted for dataset %s, input %s, url: %s", this.dataset, getInputPaths(),
job.getTrackingURL()));
while (!job.isComplete()) {
if (this.policy == Policy.ABORT_ASAP) {
LOG.info(String.format(
"MR job for dataset %s, input %s killed due to input data incompleteness." + " Will try again later",
this.dataset, getInputPaths()));
job.killJob();
return;
}
Thread.sleep(MR_JOB_CHECK_COMPLETE_INTERVAL_MS);
}
if (!job.isSuccessful()) {
throw new RuntimeException(String.format("MR job failed for topic %s, input %s, url: %s", this.dataset,
getInputPaths(), job.getTrackingURL()));
}
}
private void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
sleep(100);
}
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
}
public static void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
LOG.debug("waiting for job {}", job.getJobName());
sleep(100);
}
LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
}
/**
* Check a job for success or failure.
*/
public void process(Job job) throws IOException, InterruptedException {
if (job.isSuccessful()) {
onSuccess(job);
} else {
onFailure(job);
}
}
private void waitForJob(Job job) throws InterruptedException, IOException {
while (!job.isComplete()) {
System.out.println("waiting for job " + job.getJobName());
sleep(100);
}
System.out.println("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
}
static void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
LOG.debug("waiting for job {}", job.getJobName());
sleep(50);
}
LOG.debug("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
}
protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
int retVal = 0;
long start = System.nanoTime();
if (isAsync) {
job.submit();
} else {
job.waitForCompletion(true);
retVal = job.isSuccessful() ? 0 : 1;
logger.debug("Job '" + job.getJobName() + "' finished "
+ (job.isSuccessful() ? "successfully in " : "with failures. Time taken ")
+ formatTime((System.nanoTime() - start) / 1000000L));
}
return retVal;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String outpath = conf.get(OUTPUTPATH);
Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
MultipleInputs.addInputPath(job, new Path(PROSPECTSOUT.getAbsolutePath()),
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
MultipleInputs.addInputPath(job,new Path(SPOOUT.getAbsolutePath()) ,
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
job.setMapOutputKeyClass(CompositeType.class);
job.setMapOutputValueClass(TripleCard.class);
tempDir = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "");
SequenceFileOutputFormat.setOutputPath(job, new Path(tempDir.getAbsolutePath()));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(TripleEntry.class);
job.setOutputValueClass(CardList.class);
job.setSortComparatorClass(JoinSelectSortComparator.class);
job.setGroupingComparatorClass(JoinSelectGroupComparator.class);
job.setPartitionerClass(JoinSelectPartitioner.class);
job.setReducerClass(JoinReducer.class);
job.setNumReduceTasks(32);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
@Override
public void run() {
try {
Job job = createJob();
if (job == null) {
log.info("No MR job created. Skipping.");
this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
this.eventSubmitter.submit(Events.MR_JOB_SKIPPED);
onSkippedMRJob();
return;
}
job.submit();
log.info("MR tracking URL {} for job {}", job.getTrackingURL(), job.getJobName());
this.eventSubmitter.submit(Events.MR_JOB_STARTED_EVENT, Events.JOB_URL, job.getTrackingURL());
job.waitForCompletion(false);
this.mrJob = job;
if (job.isSuccessful()) {
this.eventSubmitter.submit(Events.MR_JOB_SUCCESSFUL, Events.JOB_URL, job.getTrackingURL());
this.onMRTaskComplete(true, null);
} else {
this.eventSubmitter.submit(Events.MR_JOB_FAILED, Events.JOB_URL, job.getTrackingURL());
this.onMRTaskComplete (false,
new IOException(String.format("MR Job:%s is not successful", job.getTrackingURL())));
}
} catch (Throwable t) {
log.error("Failed to run MR job.", t);
this.eventSubmitter.submit(Events.MR_JOB_FAILED, Events.FAILURE_CONTEXT, t.getMessage());
this.onMRTaskComplete (false, t);
}
}
protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
int retVal = 0;
long start = System.nanoTime();
if (isAsync) {
job.submit();
} else {
job.waitForCompletion(true);
retVal = job.isSuccessful() ? 0 : 1;
logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures. Time taken ") + formatTime((System.nanoTime() - start) / 1000000L));
}
return retVal;
}
private static boolean runImport(String[] args, Configuration configuration) throws IOException, InterruptedException, ClassNotFoundException {
// need to make a copy of the configuration because to make sure different temp dirs are used.
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(configuration), args);
Configuration newConf = opts.getConfiguration();
args = opts.getRemainingArgs();
Job job = Import.createSubmittableJob(newConf, args);
job.waitForCompletion(false);
return job.isSuccessful();
}
@Override
public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
throws Exception {
String jobID = getJobID(taskRecord.getData());
if (jobID != null) {
Configuration conf = HBaseConfiguration.create(env.getConfiguration());
Configuration configuration = HBaseConfiguration.addHbaseResources(conf);
Cluster cluster = new Cluster(configuration);
Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobID));
if (job == null) {
return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "");
}
if (job != null && job.isComplete()) {
if (job.isSuccessful()) {
LOGGER.warn("IndexRebuildTask checkCurrentResult job is successful "
+ taskRecord.getTableName());
return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
} else {
return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
"Index is DISABLED");
}
}
}
return null;
}
private void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
LOG.debug("waiting for job {}", job.getJobName());
sleep(100);
}
LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
}
private void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
sleep(100);
}
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
}
@Override
public int run(String[] args) throws Exception {
_configure(args);
final Configuration conf = getConf();
String type = conf.get(MetricsConfig.TYPE);
/*
* if the type is "errors", we want to process all of the errors from the metrics files first and then run the regular ingest metrics process
*/
// MetricsServer.setServerConf(conf);
// MetricsServer.initInstance();
if ("errors".equals(type)) {
try {
launchErrorsJob(Job.getInstance(conf), conf);
} catch (Exception e) {
log.info("Failed to launch errors job", e);
}
type = "ingest";
conf.set(MetricsConfig.TYPE, type);
}
/* Type logic so I can differeniate between loader and ingest metrics jobs */
Class<? extends Mapper<?,?,?,?>> mapperClass;
String outTable;
Path inputDirectoryPath = new Path(conf.get(MetricsConfig.INPUT_DIRECTORY));
FileSystem fs = FileSystem.get(inputDirectoryPath.toUri(), conf);
FileStatus[] fstats = fs.listStatus(inputDirectoryPath);
Path[] files = FileUtil.stat2Paths(fstats);
Path[] fileBuffer = new Path[MAX_FILES];
for (int i = 0; i < files.length;) {
Job job = Job.getInstance(getConf());
job.setJarByClass(this.getClass());
job.getConfiguration().setInt("mapred.job.reuse.jvm.num.tasks", -1);
if ("ingest".equalsIgnoreCase(type)) {
mapperClass = IngestMetricsMapper.class;
outTable = conf.get(MetricsConfig.INGEST_TABLE, MetricsConfig.DEFAULT_INGEST_TABLE);
job.setInputFormatClass(SequenceFileInputFormat.class);
} else if ("loader".equalsIgnoreCase(type)) {
mapperClass = LoaderMetricsMapper.class;
outTable = conf.get(MetricsConfig.LOADER_TABLE, MetricsConfig.DEFAULT_LOADER_TABLE);
job.setInputFormatClass(SequenceFileInputFormat.class);
} else if ("flagmaker".equalsIgnoreCase(type)) {
mapperClass = FlagMakerMetricsMapper.class;
outTable = conf.get(MetricsConfig.FLAGMAKER_TABLE, MetricsConfig.DEFAULT_FLAGMAKER_TABLE);
job.setInputFormatClass(SequenceFileInputFormat.class);
} else {
log.error(type + " is not a valid job type. Please use <ingest|loader>.");
return -1;
}
job.setJobName("MetricsIngester-" + type);
if (files.length - i > MAX_FILES) {
System.arraycopy(files, i, fileBuffer, 0, MAX_FILES);
i += MAX_FILES;
} else {
fileBuffer = new Path[files.length - i];
System.arraycopy(files, i, fileBuffer, 0, fileBuffer.length);
i += files.length - i;
}
SequenceFileInputFormat.setInputPaths(job, fileBuffer);
job.setMapperClass(mapperClass);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AccumuloOutputFormat.class);
AccumuloOutputFormat.setConnectorInfo(job, conf.get(MetricsConfig.USER), new PasswordToken(conf.get(MetricsConfig.PASS, "").getBytes()));
AccumuloOutputFormat.setCreateTables(job, createTables);
AccumuloOutputFormat.setDefaultTableName(job, outTable);
log.info("zookeepers = " + conf.get(MetricsConfig.ZOOKEEPERS));
log.info("instance = " + conf.get(MetricsConfig.INSTANCE));
log.info("clientConfuguration = "
+ ClientConfiguration.loadDefault().withInstance(conf.get(MetricsConfig.INSTANCE)).withZkHosts(conf.get(MetricsConfig.ZOOKEEPERS)));
AccumuloOutputFormat.setZooKeeperInstance(job,
ClientConfiguration.loadDefault().withInstance(conf.get(MetricsConfig.INSTANCE)).withZkHosts(conf.get(MetricsConfig.ZOOKEEPERS)));
AccumuloOutputFormat.setBatchWriterOptions(job, new BatchWriterConfig().setMaxLatency(25, TimeUnit.MILLISECONDS));
job.submit();
job.waitForCompletion(true);
if (job.isSuccessful()) {
for (Path p : fileBuffer) {
fs.delete(p, true);
}
}
}
return 0;
}
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
String dir = conf.get(LindenJobConfig.INPUT_DIR, null);
logger.info("input dir:" + dir);
Path inputPath = new Path(StringUtils.unEscapeString(dir));
Path outputPath = new Path(conf.get(LindenJobConfig.OUTPUT_DIR));
String indexPath = conf.get(LindenJobConfig.INDEX_PATH);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
if (fs.exists(new Path(indexPath))) {
fs.delete(new Path(indexPath), true);
}
int numShards = conf.getInt(LindenJobConfig.NUM_SHARDS, 1);
Shard[] shards = createShards(indexPath, numShards);
Shard.setIndexShards(conf, shards);
//empty trash;
(new Trash(conf)).expunge();
Job job = Job.getInstance(conf, "linden-hadoop-indexing");
job.setJarByClass(LindenJob.class);
job.setMapperClass(LindenMapper.class);
job.setCombinerClass(LindenCombiner.class);
job.setReducerClass(LindenReducer.class);
job.setMapOutputKeyClass(Shard.class);
job.setMapOutputValueClass(IntermediateForm.class);
job.setOutputKeyClass(Shard.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(IndexUpdateOutputFormat.class);
job.setReduceSpeculativeExecution(false);
job.setNumReduceTasks(numShards);
String lindenSchemaFile = conf.get(LindenJobConfig.SCHEMA_FILE_URL);
if (lindenSchemaFile == null) {
throw new IOException("no schema file is found");
}
logger.info("Adding schema file: " + lindenSchemaFile);
job.addCacheFile(new URI(lindenSchemaFile + "#lindenSchema"));
String lindenPropertiesFile = conf.get(LindenJobConfig.LINDEN_PROPERTIES_FILE_URL);
if (lindenPropertiesFile == null) {
throw new IOException("no linden properties file is found");
}
logger.info("Adding linden properties file: " + lindenPropertiesFile);
job.addCacheFile(new URI(lindenPropertiesFile + "#lindenProperties"));
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
Path[] inputs = FileInputFormat.getInputPaths(job);
StringBuilder buffer = new StringBuilder(inputs[0].toString());
for (int i = 1; i < inputs.length; i++) {
buffer.append(",");
buffer.append(inputs[i].toString());
}
logger.info("mapreduce.input.dir = " + buffer.toString());
logger.info("mapreduce.output.dir = " + FileOutputFormat.getOutputPath(job).toString());
logger.info("mapreduce.job.num.reduce.tasks = " + job.getNumReduceTasks());
logger.info(shards.length + " shards = " + conf.get(LindenJobConfig.INDEX_SHARDS));
logger.info("mapreduce.input.format.class = " + job.getInputFormatClass());
logger.info("mapreduce.output.format.class = " + job.getOutputFormatClass());
logger.info("mapreduce.cluster.temp.dir = " + conf.get(MRJobConfig.TEMP_DIR));
job.waitForCompletion(true);
if (!job.isSuccessful()) {
throw new RuntimeException("Job failed");
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
setConf(new Configuration());
getConf().set("fs.default.name", "local");
Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
AccumuloGraphConfiguration cfg = new AccumuloGraphConfiguration().setInstanceName("_mapreduce_instance2").setUser("root").setPassword("".getBytes())
.setGraphName("_mapreduce_table_2").setInstanceType(InstanceType.Mock).setCreate(true);
job.setInputFormatClass(VertexInputFormat.class);
VertexInputFormat.setAccumuloGraphConfiguration(job, cfg);
ElementOutputFormat.setAccumuloGraphConfiguration(job, cfg);
job.setMapperClass(TestVertexMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Element.class);
job.setOutputFormatClass(ElementOutputFormat.class);
job.setNumReduceTasks(0);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}