下面列出了 org.apache.hadoop.mapred.jobcontrol.Job API 的用法示例代码，也可以点击链接到 GitHub 查看完整源代码。
/**
 * Returns the count for the given counter name in the counter group
 * 'MultiStoreCounters' ({@code MULTI_STORE_COUNTER_GROUP}).
 *
 * @param job the MR job
 * @param jobClient the Hadoop job client
 * @param counterName the counter name
 * @return the count of the given counter name, or -1 if the job, its
 *         counters, or the counter itself could not be retrieved
 */
public static long getMultiStoreCount(Job job, JobClient jobClient,
        String counterName) {
    long value = -1;
    try {
        RunningJob rj = jobClient.getJob(job.getAssignedJobID());
        if (rj == null) {
            return value;
        }
        // RunningJob.getCounters() is allowed to return null (e.g. when the
        // cluster can no longer supply them); guard instead of hitting an NPE.
        Counters counters = rj.getCounters();
        if (counters == null) {
            LOG.warn("No counters available to look up " + counterName);
            return value;
        }
        Counters.Counter counter = counters.getGroup(
                MULTI_STORE_COUNTER_GROUP).getCounterForName(counterName);
        if (counter != null) {
            value = counter.getValue();
        }
    } catch (IOException e) {
        LOG.warn("Failed to get the counter for " + counterName, e);
    }
    return value;
}
/**
 * Registers stats for a job that completed successfully and populates its
 * map/reduce statistics, counters, and output/input statistics.
 * Returns null if the job was already seen or its stats could not be added.
 */
private static MRJobStats addSuccessJobStats(SimplePigStats ps, Job job) {
    if (ps.isJobSeen(job)) {
        return null;
    }
    MRJobStats stats = ps.addMRJobStats(job);
    if (stats != null) {
        stats.setSuccessful(true);
        stats.addMapReduceStatistics(job);
        stats.addCounters(job);
        stats.addOutputStatistics();
        stats.addInputStatistics();
    } else {
        LOG.warn("unable to add job stats");
    }
    return stats;
}
/**
 * Moves all the results of a collection of MR jobs to the final output
 * directory. Some results may have been written to a temporary location to
 * work around restrictions on multiple outputs from a single map reduce job.
 * For each job with a recorded temporary path, its "abs" and "rel"
 * subdirectories (when present) are moved.
 *
 * This method should always be called after job execution completes.
 *
 * @param completedJobs the jobs whose results should be moved
 * @throws IOException if a filesystem operation fails
 */
public void moveResults(List<Job> completedJobs) throws IOException {
    for (Job completed : completedJobs) {
        Pair<List<POStore>, Path> entry = jobStoreMap.get(completed);
        if (entry == null || entry.second == null) {
            continue; // nothing was staged in a temporary location for this job
        }
        Path tmpDir = entry.second;
        Path absDir = new Path(tmpDir, "abs");
        Path relDir = new Path(tmpDir, "rel");
        FileSystem fileSystem = tmpDir.getFileSystem(conf);
        if (fileSystem.exists(absDir)) {
            moveResults(absDir, absDir.toUri().getPath(), fileSystem);
        }
        if (fileSystem.exists(relDir)) {
            String relPrefix = relDir.toUri().getPath() + "/";
            moveResults(relDir, relPrefix, fileSystem);
        }
    }
}
/**
 * Decides whether a job is small enough to be run in local mode.
 * Requires auto-local to be enabled, a known total input size no larger
 * than the configured byte threshold, and at most one reducer.
 */
private boolean okToRunLocal(org.apache.hadoop.mapreduce.Job job, MapReduceOper mro, List<POLoad> lds) throws IOException {
    Configuration conf = job.getConfiguration();
    boolean autoLocalEnabled = conf.getBoolean(PigConfiguration.PIG_AUTO_LOCAL_ENABLED, false);
    if (!autoLocalEnabled) {
        return false;
    }

    long defaultMaxBytes = 100L * 1000 * 1000;
    long inputByteMax = conf.getLong(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, defaultMaxBytes);
    long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job, inputByteMax);
    log.info("Size of input: " + totalInputFileSize + " bytes. Small job threshold: " + inputByteMax);
    // A negative size means the input size could not be determined.
    boolean inputSmallEnough = totalInputFileSize >= 0 && totalInputFileSize <= inputByteMax;
    if (!inputSmallEnough) {
        return false;
    }

    int reducers = conf.getInt(MRConfiguration.REDUCE_TASKS, 1);
    log.info("No of reducers: " + reducers);
    return reducers <= 1;
}
/**
 * Sets the job's OutputFormat. The format reported to Hadoop is always
 * PigOutputFormat. When PigConfiguration.PIG_OUTPUT_LAZY is "true"
 * (case-insensitive), it is wrapped with LazyOutputFormat. LazyOutputFormat
 * is resolved reflectively because not every Hadoop version provides it. If
 * that lookup or invocation fails, the plain PigOutputFormat is used and a
 * warning (including the cause) is logged.
 *
 * @param job the job whose output format is configured
 */
public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
    boolean lazyRequested =
            "true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY));
    if (!lazyRequested) {
        job.setOutputFormatClass(PigOutputFormat.class);
        return;
    }
    try {
        Class<?> clazz = PigContext
                .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
        Method method = clazz.getMethod("setOutputFormatClass",
                org.apache.hadoop.mapreduce.Job.class, Class.class);
        method.invoke(null, job, PigOutputFormat.class);
    } catch (Exception e) {
        job.setOutputFormatClass(PigOutputFormat.class);
        // Pass the exception along so the reason for the fallback is diagnosable.
        log.warn(PigConfiguration.PIG_OUTPUT_LAZY
                + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used", e);
    }
}
/**
 * If stop_on_failure is enabled and any job has failed, an ExecException is thrown.
 * The message lists each failed job as "JobID: &lt;id&gt; Reason: &lt;message&gt;",
 * one job per line.
 *
 * @param stop_on_failure whether it's enabled.
 * @throws ExecException (error code 6017) if stop_on_failure is enabled and any job has failed
 */
private void checkStopOnFailure(boolean stop_on_failure) throws ExecException{
    // Read the failed-job list once and work from that snapshot. The previous code
    // re-fetched it on every iteration; NOTE(review): JobControl.getFailedJobs()
    // appears to copy its internal list per call, which made the loop quadratic and
    // let it observe different lists if another job failed mid-loop.
    List<Job> failedJobs = jc.getFailedJobs();
    if (failedJobs.isEmpty() || !stop_on_failure) {
        return;
    }
    int errCode = 6017;
    StringBuilder msg = new StringBuilder();
    for (int i = 0; i < failedJobs.size(); i++) {
        Job j = failedJobs.get(i);
        if (i > 0) {
            msg.append("\n");
        }
        msg.append("JobID: ").append(j.getAssignedJobID())
           .append(" Reason: ").append(j.getMessage());
    }
    throw new ExecException(msg.toString(), errCode,
            PigException.REMOTE_ENVIRONMENT);
}
/**
 * Creates the SUCCEEDED_FILE_NAME marker file inside the store's output
 * directory when shouldMarkOutputDir(job) allows it. The directory must
 * exist and the marker must not already be there. If no FileSystem
 * implementation is available for the output path's scheme, a warning is
 * logged and nothing is created.
 */
private void createSuccessFile(Job job, POStore store) throws IOException {
    if (!shouldMarkOutputDir(job)) {
        return;
    }
    Path outputPath = new Path(store.getSFile().getFileName());
    if (!HadoopShims.hasFileSystemImpl(outputPath, job.getJobConf())) {
        String scheme = outputPath.toUri().getScheme();
        log.warn("No FileSystem for scheme: " + scheme + ". Not creating success file");
        return;
    }
    FileSystem fs = outputPath.getFileSystem(job.getJobConf());
    if (!fs.exists(outputPath)) {
        return;
    }
    Path marker = new Path(outputPath, SUCCEEDED_FILE_NAME);
    if (!fs.exists(marker)) {
        // an empty file is enough to mark the folder
        fs.create(marker).close();
    }
}
/**
 * Returns the task reports of the given type for a job. Returns null when
 * task reports are disabled via PigConfiguration.PIG_NO_TASK_REPORT.
 *
 * A Cluster handle is opened for the lookup and is always closed before
 * returning. If the cluster does not know the job (e.g. in local mode),
 * the job's own mapreduce Job is used instead.
 *
 * @param job the job whose task reports are requested
 * @param type the task type (map or reduce)
 * @return an iterator over the downgraded task reports, or null if disabled
 * @throws IOException on communication failure or if the thread is interrupted
 */
public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
    if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
        LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
        return null;
    }
    Cluster cluster = new Cluster(job.getJobConf());
    try {
        org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
        if (mrJob == null) { // In local mode, mrJob will be null
            mrJob = job.getJob();
        }
        org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
        return DowngradeHelper.downgradeTaskReports(reports);
    } catch (InterruptedException ir) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new IOException(ir);
    } finally {
        try {
            // The Cluster holds a client connection; release it on every path.
            cluster.close();
        } catch (IOException ignored) {
            // Best-effort cleanup: a close failure must not mask the result or
            // an exception already propagating from the try block.
        }
    }
}
/**
 * Builds a JobControl named "ValueAggregatorJobs" containing a single job with
 * no dependencies. The job is configured from {@code args} via
 * createValueAggregatorJob and, when {@code descriptors} is non-null, with the
 * given aggregator descriptors.
 *
 * @param args job arguments passed to createValueAggregatorJob
 * @param descriptors aggregator descriptor classes to register, or null
 * @return a JobControl holding the aggregator job
 * @throws IOException propagated from creating the job configuration or Job
 */
public static JobControl createValueAggregatorJobs(String args[],
        Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
    JobControl control = new JobControl("ValueAggregatorJobs");
    JobConf aggregatorConf = createValueAggregatorJob(args);
    if (descriptors != null) {
        setAggregatorDescriptors(aggregatorConf, descriptors);
    }
    // The aggregator job stands alone, so its dependency list is empty.
    Job aggregatorJob = new Job(aggregatorConf, new ArrayList<Job>());
    control.addJob(aggregatorJob);
    return control;
}
/**
 * Builds a JobControl named "ValueAggregatorJobs" containing a single job with
 * no dependencies. The job is configured from {@code args} via
 * createValueAggregatorJob and, when {@code descriptors} is non-null, with the
 * given aggregator descriptors.
 *
 * @param args job arguments passed to createValueAggregatorJob
 * @param descriptors aggregator descriptor classes to register, or null
 * @return a JobControl holding the aggregator job
 * @throws IOException propagated from creating the job configuration or Job
 */
public static JobControl createValueAggregatorJobs(String args[],
        Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
    JobControl control = new JobControl("ValueAggregatorJobs");
    JobConf aggregatorConf = createValueAggregatorJob(args);
    if (descriptors != null) {
        setAggregatorDescriptors(aggregatorConf, descriptors);
    }
    // The aggregator job stands alone, so its dependency list is empty.
    Job aggregatorJob = new Job(aggregatorConf, new ArrayList<Job>());
    control.addJob(aggregatorJob);
    return control;
}
/**
 * Looks up the stats object for a job through its MapReduceOper and fills
 * in the job id, alias, and configuration.
 *
 * @param job the job whose stats should be populated
 * @return the populated stats, or null if the job has no MR operator or the
 *         operator has no stats entry (callers already treat null as
 *         "unable to add job stats")
 */
MRJobStats addMRJobStats(Job job) {
    MapReduceOper mro = jobMroMap.get(job);
    if (mro == null) {
        LOG.warn("unable to get MR oper for job: " + job.toString());
        return null;
    }
    MRJobStats js = mroJobMap.get(mro);
    if (js == null) {
        // Previously this dereferenced js unconditionally and threw an NPE.
        LOG.warn("unable to get job stats for MR oper: " + mro.getOperatorKey());
        return null;
    }
    JobID jobId = job.getAssignedJobID();
    js.setId(jobId);
    js.setAlias(mro);
    js.setConf(job.getJobConf());
    return js;
}
/**
 * Records the job -> MR operator association in jobMroMap. The mapping is
 * only stored when the operator is non-null and already has a stats entry
 * in mroJobMap; otherwise a warning is logged.
 */
void mapMROperToJob(MapReduceOper mro, Job job) {
    if (mro == null) {
        LOG.warn("null MR operator");
        return;
    }
    if (mroJobMap.get(mro) == null) {
        LOG.warn("null job stats for mro: " + mro.getOperatorKey());
        return;
    }
    jobMroMap.put(job, mro);
}
/**
 * Associates a backend exception with the job's assigned id in the current
 * PigStats. Does nothing if the job has not been assigned an id yet.
 */
@Private
public static void setBackendException(Job job, Exception e) {
    JobID assignedId = job.getAssignedJobID();
    if (assignedId != null) {
        PigStats.get().setBackendException(assignedId.toString(), e);
    }
}
/**
 * Registers stats for a job that failed and populates its output and input
 * statistics. Returns null if the job was already seen or its stats could
 * not be added.
 */
private static MRJobStats addFailedJobStats(SimplePigStats ps, Job job) {
    if (ps.isJobSeen(job)) {
        return null;
    }
    MRJobStats stats = ps.addMRJobStats(job);
    if (stats != null) {
        stats.setSuccessful(false);
        stats.addOutputStatistics();
        stats.addInputStatistics();
    } else {
        LOG.warn("unable to add failed job stats");
    }
    return stats;
}
/**
 * Computes the aggregate progress of the jobs submitted through the
 * JobControl {@code jc}.
 *
 * Each successfully completed job contributes 1.0. Each running job
 * contributes the value returned by {@code HadoopShims.progressOfRunningJob}.
 * The result is a sum, not a percentage. NOTE(review): callers presumably
 * divide it by the total number of jobs; confirm at the call site.
 *
 * @param jc the JobControl object that has been submitted
 * @return the summed progress of successful and running jobs
 * @throws IOException propagated from {@code HadoopShims.progressOfRunningJob}
 */
protected double calculateProgress(JobControl jc)
        throws IOException {
    double prog = jc.getSuccessfulJobs().size();
    List<Job> runningJobs = jc.getRunningJobs();
    for (Job running : runningJobs) {
        prog += HadoopShims.progressOfRunningJob(running);
    }
    return prog;
}
/**
 * Creates a compiler bound to the given Pig context and configurations,
 * starting with empty job -> store and job -> MR operator maps.
 */
public JobControlCompiler(PigContext pigContext, Configuration conf, Configuration defaultConf) {
    this.jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
    this.jobMroMap = new HashMap<Job, MapReduceOper>();
    this.pigContext = pigContext;
    this.conf = conf;
    this.defaultConf = defaultConf;
}
/**
 * Returns all store locations of a previously compiled job, or a new empty
 * list if none were recorded for it.
 */
public List<POStore> getStores(Job job) {
    Pair<List<POStore>, Path> entry = jobStoreMap.get(job);
    boolean hasStores = entry != null && entry.first != null;
    return hasStores ? entry.first : new ArrayList<POStore>();
}
/**
 * Adjusts the number of reducers based on the default_parallel, requested parallel and
 * estimated parallel settings. For sampler jobs, the runtime parallelism of the next job
 * (order-by or skew-join) is computed in advance and used as the number of partitions in
 * the sampler's reduce plan.
 *
 * @param plan the MR plan
 * @param mro the MR operator
 * @param nwJob the current job
 * @throws IOException
 */
public void adjustNumReducers(MROperPlan plan, MapReduceOper mro,
        org.apache.hadoop.mapreduce.Job nwJob) throws IOException {
    int parallelism = calculateRuntimeReducers(mro, nwJob);
    if (mro.isSampler()) {
        List<MapReduceOper> successors = plan.getSuccessors(mro);
        if (successors != null) {
            // The quantile file needs the final reducer count of the following job.
            // Reusing this job's conf is fine since the statistics come from the
            // successor's POLoads.
            MapReduceOper successor = successors.get(0);
            int partitions = calculateRuntimeReducers(successor, nwJob);
            new ParallelConstantVisitor(mro.reducePlan, partitions).visit();
        }
    }
    log.info("Setting Parallelism to " + parallelism);
    Configuration jobConf = nwJob.getConfiguration();
    // Record each input to the decision for later analysis (PIG-2779).
    jobConf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pigContext.defaultParallel);
    jobConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, mro.requestedParallelism);
    jobConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, mro.estimatedParallelism);
    // Kept for backward compatibility; runtimeParallelism is the preferred source at runtime.
    mro.requestedParallelism = parallelism;
    jobConf.setInt(MRConfiguration.REDUCE_TASKS, parallelism);
}
/**
 * Calculates the runtime number of reducers and caches it in the operator's
 * runtimeParallelism. Precedence: requested parallelism, then default_parallel,
 * then the reducer estimator. Falls back to 1 when the estimator gives no
 * positive answer. A cached value (anything other than -1) is returned as-is.
 *
 * @return the runtimeParallelism
 * @throws IOException
 */
private int calculateRuntimeReducers(MapReduceOper mro,
        org.apache.hadoop.mapreduce.Job nwJob) throws IOException{
    if (mro.runtimeParallelism == -1) {
        int parallelism;
        if (mro.requestedParallelism > 0) {
            parallelism = mro.requestedParallelism;
        } else if (pigContext.defaultParallel > 0) {
            parallelism = pigContext.defaultParallel;
        } else {
            mro.estimatedParallelism = estimateNumberOfReducers(nwJob, mro);
            if (mro.estimatedParallelism <= 0) {
                // the estimator can return -1 when it cannot estimate
                log.info("Could not estimate number of reducers and no requested or default " +
                        "parallelism set. Defaulting to 1 reducer.");
                parallelism = 1;
            } else {
                parallelism = mro.estimatedParallelism;
            }
        }
        mro.runtimeParallelism = parallelism;
    }
    return mro.runtimeParallelism;
}
/**
 * Looks up the estimator configured under PIG_EXEC_REDUCER_ESTIMATOR and invokes it to
 * find the number of reducers to use. If that key is not set, InputSizeReducerEstimator
 * is used.
 *
 * @param job the job being estimated
 * @param mapReducerOper the MR operator the job was compiled from
 * @return the estimator's reducer count
 * @throws IOException
 */
public static int estimateNumberOfReducers(org.apache.hadoop.mapreduce.Job job,
        MapReduceOper mapReducerOper) throws IOException {
    Configuration conf = job.getConfiguration();
    PigReducerEstimator estimator;
    if (conf.get(PIG_EXEC_REDUCER_ESTIMATOR) != null) {
        estimator = PigContext.instantiateObjectFromParams(conf,
                PIG_EXEC_REDUCER_ESTIMATOR, PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY,
                PigReducerEstimator.class);
    } else {
        estimator = new InputSizeReducerEstimator();
    }
    log.info("Using reducer estimator: " + estimator.getClass().getName());
    return estimator.estimateNumberOfReducers(job, mapReducerOper);
}
/**
 * Kills every job currently reported as running by the JobControl.
 * Failures are logged (with stack trace) per job, so one job that cannot be
 * killed does not prevent the remaining jobs from being killed.
 */
@Override
public void kill() {
    log.debug("Receive kill signal");
    if (jc == null) {
        return;
    }
    List<Job> runningJobs;
    try {
        runningJobs = jc.getRunningJobs();
    } catch (Exception e) {
        log.warn("Encounter exception on cleanup:" + e, e);
        return;
    }
    for (Job job : runningJobs) {
        try {
            HadoopShims.killJob(job);
            log.info("Job " + job.getAssignedJobID() + " killed");
        } catch (Exception e) {
            // Keep going: the other running jobs should still be killed.
            log.warn("Encounter exception on cleanup:" + e, e);
        }
    }
}
/**
 * Compiles the plan and checks that the first waiting job was given exactly
 * one reduce task.
 */
@Override
public void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan plan = Util.buildMRPlan(pp, pc);
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    JobControlCompiler compiler =
            new JobControlCompiler(pc, ConfigurationUtil.toConfiguration(pc.getProperties()));
    Job firstJob = compiler.compile(plan, "Test").getWaitingJobs().get(0);
    assertEquals("parallism", 1, firstJob.getJobConf().getNumReduceTasks());
}
/**
 * Compiles the plan and checks that the first waiting job was given 100
 * reduce tasks.
 */
@Override
public void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan plan = Util.buildMRPlan(pp, pc);
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    JobControlCompiler compiler =
            new JobControlCompiler(pc, ConfigurationUtil.toConfiguration(pc.getProperties()));
    Job firstJob = compiler.compile(plan, "Test").getWaitingJobs().get(0);
    assertEquals("parallism", 100, firstJob.getJobConf().getNumReduceTasks());
}
/**
 * Compiles the plan and checks that the first waiting job uses 100 reduce
 * tasks. Also checks the parallelism values recorded in its configuration.
 */
@Override
public void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan plan = Util.buildMRPlan(pp, pc);
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    JobControlCompiler compiler =
            new JobControlCompiler(pc, ConfigurationUtil.toConfiguration(pc.getProperties()));
    Job firstJob = compiler.compile(plan, "Test").getWaitingJobs().get(0);
    assertEquals(100, firstJob.getJobConf().getNumReduceTasks());
    Util.assertParallelValues(100, -1, -1, 100, firstJob.getJobConf());
}
/**
 * Returns the progress of a Job j that is part of a submitted JobControl.
 * The value covers only this job, so callers must scale it down by the
 * number of jobs in the JobControl.
 *
 * @param j The Job for which progress is required
 * @return the average of map and reduce progress; 1 if the job is no longer
 *         known to the JobClient but finished successfully, otherwise 0 when
 *         it is not known
 * @throws IOException
 */
public static double progressOfRunningJob(Job j)
        throws IOException {
    RunningJob running = j.getJobClient().getJob(j.getAssignedJobID());
    if (running != null) {
        return (running.mapProgress() + running.reduceProgress()) / 2;
    }
    return j.getState() == Job.SUCCESS ? 1 : 0;
}
/**
 * Returns the map or reduce task reports of a job through its JobClient.
 * Returns null if task reports are disabled via
 * PigConfiguration.PIG_NO_TASK_REPORT, or if the client returns no reports.
 */
public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
    if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
        LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
        return null;
    }
    JobClient client = job.getJobClient();
    TaskReport[] taskReports = (type == TaskType.MAP)
            ? client.getMapTaskReports(job.getAssignedJobID())
            : client.getReduceTaskReports(job.getAssignedJobID());
    if (taskReports == null) {
        return null;
    }
    return Arrays.asList(taskReports).iterator();
}
/**
 * Builds an illustrate-mode reduce context over an in-memory list of
 * key/value pairs.
 *
 * The input is sorted in place using the job's sort comparator on the
 * serialized keys. The first pair is then primed as nextKey/nextValue.
 *
 * @param job the job whose configuration supplies the comparators
 * @param input the key/value pairs to feed the reducer; values are
 *        presumably NullableTuple instances (they are cast to it) — confirm
 * @param pkg the package operator to use
 */
public IllustratorContext(Job job,
        List<Pair<PigNullableWritable, Writable>> input,
        POPackage pkg
        ) throws IOException, InterruptedException {
    super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
            null, null, null, null, new IllustrateDummyReporter(), null, PigNullableWritable.class, NullableTuple.class);
    bos = new ByteArrayOutputStream();
    dos = new DataOutputStream(bos);
    org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
    sortComparator = nwJob.getSortComparator();
    groupingComparator = nwJob.getGroupingComparator();
    Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
        @Override
        public int compare(Pair<PigNullableWritable, Writable> o1,
                Pair<PigNullableWritable, Writable> o2) {
            try {
                // Serialize both keys back to back into the shared scratch buffer
                // and compare the two byte ranges with the raw sort comparator.
                o1.first.write(dos);
                int l1 = bos.size();
                o2.first.write(dos);
                int l2 = bos.size();
                byte[] bytes = bos.toByteArray();
                return sortComparator.compare(bytes, 0, l1, bytes, l1, l2 - l1);
            } catch (IOException e) {
                // Keep the original exception as the cause for diagnosis.
                throw new RuntimeException("Serialization exception in sort:" + e.getMessage(), e);
            } finally {
                // Always clear the buffer: leftover bytes from a failed write would
                // shift the offsets used by the next comparison.
                bos.reset();
            }
        }
    });
    currentValues = new ArrayList<NullableTuple>();
    it = input.iterator();
    if (it.hasNext()) {
        Pair<PigNullableWritable, Writable> entry = it.next();
        nextKey = entry.first;
        nextValue = (NullableTuple) entry.second;
    }
    pack = pkg;
}
/**
 * Fetches the counters of the given job. A Cluster handle is opened for the
 * lookup and always closed before returning. If the cluster does not know
 * the job (e.g. in local mode), the job's own mapreduce Job is used instead.
 * Any failure is wrapped in an IOException.
 *
 * @param job the job whose counters are requested
 * @return the job's counters
 * @throws IOException if the counters cannot be retrieved or the thread is interrupted
 */
public static Counters getCounters(Job job) throws IOException {
    Cluster cluster = null;
    try {
        cluster = new Cluster(job.getJobConf());
        org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
        if (mrJob == null) { // In local mode, mrJob will be null
            mrJob = job.getJob();
        }
        return new Counters(mrJob.getCounters());
    } catch (InterruptedException ie) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new IOException(ie);
    } catch (Exception ir) {
        throw new IOException(ir);
    } finally {
        if (cluster != null) {
            try {
                // The Cluster holds a client connection; release it on every path.
                cluster.close();
            } catch (IOException ignored) {
                // Best-effort cleanup: a close failure must not mask the result or
                // an exception already propagating from the try block.
            }
        }
    }
}
/**
 * Returns the progress of a Job j that is part of a submitted JobControl.
 * The value covers only this job, so callers must scale it down by the
 * number of jobs in the JobControl.
 *
 * @param j The Job for which progress is required
 * @return the average of map and reduce progress, or 0 when the underlying
 *         mapreduce Job is missing or its progress cannot be read
 * @throws IOException
 */
public static double progressOfRunningJob(Job j)
        throws IOException {
    org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
    if (mrJob == null) {
        // Check explicitly instead of relying on a caught NullPointerException.
        return 0;
    }
    try {
        return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
    } catch (Exception e) {
        // Best-effort: progress reporting should not fail the caller.
        return 0;
    }
}
/**
 * Kills the job's underlying mapreduce Job, if there is one. Any failure is
 * wrapped in an IOException.
 */
public static void killJob(Job job) throws IOException {
    org.apache.hadoop.mapreduce.Job target = job.getJob();
    if (target == null) {
        return;
    }
    try {
        target.killJob();
    } catch (Exception failure) {
        throw new IOException(failure);
    }
}