Source code examples for the class org.apache.hadoop.mapred.jobcontrol.Job

The examples below show how to use the org.apache.hadoop.mapred.jobcontrol.Job API in practice; you can also follow each project's link to GitHub to view the full source code.
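
Before the project examples, here is a minimal usage sketch of the jobcontrol API itself: it builds two dependent jobs and hands them to a JobControl. The JobConf contents (mapper, reducer, input and output paths) are placeholders that would be configured by your own driver code.

import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public static JobControl buildDemoJobControl() throws IOException {
    JobConf firstConf = new JobConf();        // mapper/reducer/paths configured elsewhere (placeholder)
    Job firstJob = new Job(firstConf);

    JobConf secondConf = new JobConf();       // placeholder configuration
    Job secondJob = new Job(secondConf);
    secondJob.addDependingJob(firstJob);      // secondJob is scheduled only after firstJob succeeds

    JobControl control = new JobControl("demo-group");
    control.addJob(firstJob);
    control.addJob(secondJob);
    return control;
}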

Example 1  Project: spork  File: MRPigStatsUtil.java
/**
 * Returns the count for the given counter name in the counter group
 * 'MultiStoreCounters'
 *
 * @param job the MR job
 * @param jobClient the Hadoop job client
 * @param counterName the counter name
 * @return the count of the given counter name
 */
public static long getMultiStoreCount(Job job, JobClient jobClient,
        String counterName) {
    long value = -1;
    try {
        RunningJob rj = jobClient.getJob(job.getAssignedJobID());
        if (rj != null) {
            Counters.Counter counter = rj.getCounters().getGroup(
                    MULTI_STORE_COUNTER_GROUP).getCounterForName(counterName);
            value = counter.getValue();
        }
    } catch (IOException e) {
        LOG.warn("Failed to get the counter for " + counterName, e);
    }
    return value;
}
 
Example 2  Project: spork  File: MRPigStatsUtil.java
private static MRJobStats addSuccessJobStats(SimplePigStats ps, Job job) {
    if (ps.isJobSeen(job)) return null;

    MRJobStats js = ps.addMRJobStats(job);
    if (js == null) {
        LOG.warn("unable to add job stats");
    } else {
        js.setSuccessful(true);

        js.addMapReduceStatistics(job);

        js.addCounters(job);

        js.addOutputStatistics();

        js.addInputStatistics();
    }
    return js;
}
 
Example 3  Project: spork  File: JobControlCompiler.java
/**
 * Moves all the results of a collection of MR jobs to the final
 * output directory. Some of the results may have been put into a
 * temp location to work around restrictions with multiple output
 * from a single map reduce job.
 *
 * This method should always be called after the job execution
 * completes.
 */
public void moveResults(List<Job> completedJobs) throws IOException {
    for (Job job: completedJobs) {
        Pair<List<POStore>, Path> pair = jobStoreMap.get(job);
        if (pair != null && pair.second != null) {
            Path tmp = pair.second;
            Path abs = new Path(tmp, "abs");
            Path rel = new Path(tmp, "rel");
            FileSystem fs = tmp.getFileSystem(conf);

            if (fs.exists(abs)) {
                moveResults(abs, abs.toUri().getPath(), fs);
            }

            if (fs.exists(rel)) {
                moveResults(rel, rel.toUri().getPath()+"/", fs);
            }
        }
    }
}
 
Example 4  Project: spork  File: JobControlCompiler.java
private boolean okToRunLocal(org.apache.hadoop.mapreduce.Job job, MapReduceOper mro, List<POLoad> lds) throws IOException {
    Configuration conf = job.getConfiguration();
    if(!conf.getBoolean(PigConfiguration.PIG_AUTO_LOCAL_ENABLED, false)) {
        return false;
    }

    long inputByteMax = conf.getLong(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, 100*1000*1000l);
    long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job, inputByteMax);
    log.info("Size of input: " + totalInputFileSize +" bytes. Small job threshold: " + inputByteMax );
    if (totalInputFileSize < 0 || totalInputFileSize > inputByteMax) {
        return false;
    }

    int reducers = conf.getInt(MRConfiguration.REDUCE_TASKS, 1);
    log.info("No of reducers: " + reducers);
    if (reducers > 1) {
        return false;
    }

    return true;
}
 
Example 5  Project: spork  File: JobControlCompiler.java
public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
    // the OutputFormat we report to Hadoop is always PigOutputFormat, which
    // can be wrapped with LazyOutputFormat provided it is supported by
    // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
    if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
        try {
            Class<?> clazz = PigContext
                    .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
            Method method = clazz.getMethod("setOutputFormatClass",
                    org.apache.hadoop.mapreduce.Job.class, Class.class);
            method.invoke(null, job, PigOutputFormat.class);
        } catch (Exception e) {
            job.setOutputFormatClass(PigOutputFormat.class);
            log.warn(PigConfiguration.PIG_OUTPUT_LAZY
                    + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
        }
    } else {
        job.setOutputFormatClass(PigOutputFormat.class);
    }
}
 
Example 6  Project: spork  File: MapReduceLauncher.java
/**
 * If stop_on_failure is enabled and any job has failed, an ExecException is thrown.
 * @param stop_on_failure whether it's enabled.
 * @throws ExecException If stop_on_failure is enabled and any job has failed
 */
private void checkStopOnFailure(boolean stop_on_failure) throws ExecException{
    if (jc.getFailedJobs().isEmpty())
        return;

    if (stop_on_failure){
        int errCode = 6017;
        StringBuilder msg = new StringBuilder();

        for (int i=0; i<jc.getFailedJobs().size(); i++) {
            Job j = jc.getFailedJobs().get(i);
            msg.append("JobID: " + j.getAssignedJobID() + " Reason: " + j.getMessage());
            if (i!=jc.getFailedJobs().size()-1) {
                msg.append("\n");
            }
        }

        throw new ExecException(msg.toString(), errCode,
                PigException.REMOTE_ENVIRONMENT);
    }
}
 
Example 7  Project: spork  File: MapReduceLauncher.java
private void createSuccessFile(Job job, POStore store) throws IOException {
    if(shouldMarkOutputDir(job)) {
        Path outputPath = new Path(store.getSFile().getFileName());
        String scheme = outputPath.toUri().getScheme();
        if (HadoopShims.hasFileSystemImpl(outputPath, job.getJobConf())) {
            FileSystem fs = outputPath.getFileSystem(job.getJobConf());
            if (fs.exists(outputPath)) {
                // create a file in the folder to mark it
                Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
                if (!fs.exists(filePath)) {
                    fs.create(filePath).close();
                }
            }
        } else {
            log.warn("No FileSystem for scheme: " + scheme + ". Not creating success file");
        }
    }
}
 
Example 8  Project: spork  File: HadoopShims.java
public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
    if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
        LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
        return null;
    }
    Cluster cluster = new Cluster(job.getJobConf());
    try {
        org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
        if (mrJob == null) { // In local mode, mrJob will be null
            mrJob = job.getJob();
        }
        org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
        return DowngradeHelper.downgradeTaskReports(reports);
    } catch (InterruptedException ir) {
        throw new IOException(ir);
    }
}
 
Example 9  Project: hadoop  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]
  , Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  
  JobControl theControl = new JobControl("ValueAggregatorJobs");
  ArrayList<Job> dependingJobs = new ArrayList<Job>();
  JobConf aJobConf = createValueAggregatorJob(args);
  if(descriptors != null)
    setAggregatorDescriptors(aJobConf, descriptors);
  Job aJob = new Job(aJobConf, dependingJobs);
  theControl.addJob(aJob);
  return theControl;
}
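
The JobControl returned by createValueAggregatorJobs is not started automatically. A typical driver (a hypothetical sketch; exception handling is omitted) runs it on its own thread and polls until all jobs finish:

JobControl control = createValueAggregatorJobs(args, descriptors);
Thread runner = new Thread(control);
runner.start();
while (!control.allFinished()) {
    Thread.sleep(1000);   // poll job states once a second; throws InterruptedException
}
control.stop();           // shut down the JobControl thread
System.out.println("Failed jobs: " + control.getFailedJobs().size());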
 
Example 10  Project: big-c  File: ValueAggregatorJob.java
public static JobControl createValueAggregatorJobs(String args[]
  , Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  
  JobControl theControl = new JobControl("ValueAggregatorJobs");
  ArrayList<Job> dependingJobs = new ArrayList<Job>();
  JobConf aJobConf = createValueAggregatorJob(args);
  if(descriptors != null)
    setAggregatorDescriptors(aJobConf, descriptors);
  Job aJob = new Job(aJobConf, dependingJobs);
  theControl.addJob(aJob);
  return theControl;
}
 
Example 11  Project: spork  File: SimplePigStats.java
MRJobStats addMRJobStats(Job job) {
    MapReduceOper mro = jobMroMap.get(job);

    if (mro == null) {
        LOG.warn("unable to get MR oper for job: " + job.toString());
        return null;
    }
    MRJobStats js = mroJobMap.get(mro);

    JobID jobId = job.getAssignedJobID();
    js.setId(jobId);
    js.setAlias(mro);
    js.setConf(job.getJobConf());
    return js;
}
 
Example 12  Project: spork  File: SimplePigStats.java
void mapMROperToJob(MapReduceOper mro, Job job) {
    if (mro == null) {
        LOG.warn("null MR operator");
    } else {
        MRJobStats js = mroJobMap.get(mro);
        if (js == null) {
            LOG.warn("null job stats for mro: " + mro.getOperatorKey());
        } else {
            jobMroMap.put(job, mro);
        }
    }
}
 
Example 13  Project: spork  File: MRPigStatsUtil.java
@Private
public static void setBackendException(Job job, Exception e) {
    JobID jobId = job.getAssignedJobID();
    if (jobId == null) {
        return;
    }
    PigStats.get().setBackendException(jobId.toString(), e);
}
 
Example 14  Project: spork  File: MRPigStatsUtil.java
private static MRJobStats addFailedJobStats(SimplePigStats ps, Job job) {
    if (ps.isJobSeen(job)) return null;

    MRJobStats js = ps.addMRJobStats(job);
    if (js == null) {
        LOG.warn("unable to add failed job stats");
    } else {
        js.setSuccessful(false);
        js.addOutputStatistics();
        js.addInputStatistics();
    }
    return js;
}
 
Example 15  Project: spork  File: Launcher.java
/**
 * Compute the progress of the jobs submitted through the JobControl
 * object jc. The result is the number of completed jobs plus the summed
 * fractional progress of the currently running jobs.
 *
 * @param jc
 *            - The JobControl object that has been submitted
 * @return The combined progress in double format
 * @throws IOException
 */
protected double calculateProgress(JobControl jc)
        throws IOException {
    double prog = 0.0;
    prog += jc.getSuccessfulJobs().size();

    List<Job> runnJobs = jc.getRunningJobs();
    for (Job j : runnJobs) {
        prog += HadoopShims.progressOfRunningJob(j);
    }
    return prog;
}
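
Note that the returned value ranges from 0 to the total number of jobs in the JobControl, not 0 to 100. A caller that wants a percentage would normalize it along these lines (a hypothetical sketch; totalJobs stands for the number of jobs submitted through jc):

double prog = calculateProgress(jc);
double percentComplete = 100.0 * prog / totalJobs;   // totalJobs is assumed, not part of the API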
 
Example 16  Project: spork  File: JobControlCompiler.java
public JobControlCompiler(PigContext pigContext, Configuration conf, Configuration defaultConf) {
    this.pigContext = pigContext;
    this.conf = conf;
    this.defaultConf = defaultConf;
    jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
    jobMroMap = new HashMap<Job, MapReduceOper>();
}
 
Example 17  Project: spork  File: JobControlCompiler.java
/**
 * Returns all store locations of a previously compiled job
 */
public List<POStore> getStores(Job job) {
    Pair<List<POStore>, Path> pair = jobStoreMap.get(job);
    if (pair != null && pair.first != null) {
        return pair.first;
    } else {
        return new ArrayList<POStore>();
    }
}
 
Example 18  Project: spork  File: JobControlCompiler.java
/**
 * Adjust the number of reducers based on the default_parallel, requested parallel and estimated
 * parallel. For sampler jobs, we also adjust the next job in advance to get its runtime parallel as
 * the number of partitions used in the sampler.
 * @param plan the MR plan
 * @param mro the MR operator
 * @param nwJob the current job
 * @throws IOException
 */
public void adjustNumReducers(MROperPlan plan, MapReduceOper mro,
        org.apache.hadoop.mapreduce.Job nwJob) throws IOException {
    int jobParallelism = calculateRuntimeReducers(mro, nwJob);

    if (mro.isSampler() && plan.getSuccessors(mro) != null) {
        // We need to calculate the final number of reducers of the next job (order-by or skew-join)
        // to generate the quantfile.
        MapReduceOper nextMro = plan.getSuccessors(mro).get(0);

        // Here we use the same conf and Job to calculate the runtime #reducers of the next job
        // which is fine as the statistics comes from the nextMro's POLoads
        int nPartitions = calculateRuntimeReducers(nextMro, nwJob);

        // set the runtime #reducer of the next job as the #partition
        ParallelConstantVisitor visitor =
                new ParallelConstantVisitor(mro.reducePlan, nPartitions);
        visitor.visit();
    }
    log.info("Setting Parallelism to " + jobParallelism);

    Configuration conf = nwJob.getConfiguration();

    // set various parallelism into the job conf for later analysis, PIG-2779
    conf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pigContext.defaultParallel);
    conf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, mro.requestedParallelism);
    conf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, mro.estimatedParallelism);

    // this is for backward compatibility, and we encourage to use runtimeParallelism at runtime
    mro.requestedParallelism = jobParallelism;

    // finally set the number of reducers
    conf.setInt(MRConfiguration.REDUCE_TASKS, jobParallelism);
}
 
Example 19  Project: spork  File: JobControlCompiler.java
/**
 * Calculate the runtime #reducers based on the default_parallel, requested parallel and estimated
 * parallel, and save it to MapReduceOper's runtimeParallelism.
 * @return the runtimeParallelism
 * @throws IOException
 */
private int calculateRuntimeReducers(MapReduceOper mro,
        org.apache.hadoop.mapreduce.Job nwJob) throws IOException{
    // we don't recalculate for the same job
    if (mro.runtimeParallelism != -1) {
        return mro.runtimeParallelism;
    }

    int jobParallelism = -1;

    if (mro.requestedParallelism > 0) {
        jobParallelism = mro.requestedParallelism;
    } else if (pigContext.defaultParallel > 0) {
        jobParallelism = pigContext.defaultParallel;
    } else {
        mro.estimatedParallelism = estimateNumberOfReducers(nwJob, mro);
        if (mro.estimatedParallelism > 0) {
            jobParallelism = mro.estimatedParallelism;
        } else {
            // reducer estimation could return -1 if it couldn't estimate
            log.info("Could not estimate number of reducers and no requested or default " +
                    "parallelism set. Defaulting to 1 reducer.");
            jobParallelism = 1;
        }
    }

    // save it
    mro.runtimeParallelism = jobParallelism;
    return jobParallelism;
}
 
Example 20  Project: spork  File: JobControlCompiler.java
/**
 * Looks up the estimator from REDUCER_ESTIMATOR_KEY and invokes it to find the number of
 * reducers to use. If REDUCER_ESTIMATOR_KEY isn't set, defaults to InputSizeReducerEstimator.
 * @param job
 * @param mapReducerOper
 * @throws IOException
 */
public static int estimateNumberOfReducers(org.apache.hadoop.mapreduce.Job job,
        MapReduceOper mapReducerOper) throws IOException {
    Configuration conf = job.getConfiguration();

    PigReducerEstimator estimator = conf.get(PIG_EXEC_REDUCER_ESTIMATOR) == null ?
            new InputSizeReducerEstimator() :
                PigContext.instantiateObjectFromParams(conf,
                        PIG_EXEC_REDUCER_ESTIMATOR, PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY, PigReducerEstimator.class);

            log.info("Using reducer estimator: " + estimator.getClass().getName());
            int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReducerOper);
            return numberOfReducers;
}
 
Example 21  Project: spork  File: MapReduceLauncher.java
@Override
public void kill() {
    try {
        log.debug("Receive kill signal");
        if (jc!=null) {
            for (Job job : jc.getRunningJobs()) {
                HadoopShims.killJob(job);
                log.info("Job " + job.getAssignedJobID() + " killed");
            }
        }
    } catch (Exception e) {
        log.warn("Encounter exception on cleanup:" + e);
    }
}
 
Example 22  Project: spork  File: TestGroupConstParallelMR.java
@Override
public void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals("parallism", 1, parallel);
}
 
Example 23  Project: spork  File: TestGroupConstParallelMR.java
@Override
public void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);
    
    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();
    
    assertEquals("parallism", 100, parallel);
}
 
Example 24  Project: spork  File: TestJobSubmissionMR.java
@Override
public void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
    MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
    
    ConfigurationValidator.validatePigProperties(pc.getProperties());
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
    JobControlCompiler jcc = new JobControlCompiler(pc, conf);

    JobControl jobControl = jcc.compile(mrPlan, "Test");
    Job job = jobControl.getWaitingJobs().get(0);
    int parallel = job.getJobConf().getNumReduceTasks();

    assertEquals(100, parallel);
    Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
}
 
Example 25  Project: spork  File: HadoopShims.java
/**
 * Returns the progress of a Job j which is part of a submitted JobControl
 * object. The progress is for this Job. So it has to be scaled down by the
 * num of jobs that are present in the JobControl.
 *
 * @param j The Job for which progress is required
 * @return Returns the percentage progress of this Job
 * @throws IOException
 */
public static double progressOfRunningJob(Job j)
        throws IOException {
    RunningJob rj = j.getJobClient().getJob(j.getAssignedJobID());
    if (rj == null && j.getState() == Job.SUCCESS)
        return 1;
    else if (rj == null)
        return 0;
    else {
        return (rj.mapProgress() + rj.reduceProgress()) / 2;
    }
}
 
Example 26  Project: spork  File: HadoopShims.java
public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
    if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
        LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
        return null;
    }
    JobClient jobClient = job.getJobClient();
    TaskReport[] reports = null;
    if (type == TaskType.MAP) {
        reports = jobClient.getMapTaskReports(job.getAssignedJobID());
    } else {
        reports = jobClient.getReduceTaskReports(job.getAssignedJobID());
    }
    return reports == null ? null : Arrays.asList(reports).iterator();
}
 
Example 27  Project: spork  File: PigMapReduce.java
public IllustratorContext(Job job,
      List<Pair<PigNullableWritable, Writable>> input,
      POPackage pkg
      ) throws IOException, InterruptedException {
    super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
        null, null, null, null, new IllustrateDummyReporter(), null, PigNullableWritable.class, NullableTuple.class);
    bos = new ByteArrayOutputStream();
    dos = new DataOutputStream(bos);
    org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
    sortComparator = nwJob.getSortComparator();
    groupingComparator = nwJob.getGroupingComparator();
    
    Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
            @Override
            public int compare(Pair<PigNullableWritable, Writable> o1,
                               Pair<PigNullableWritable, Writable> o2) {
                try {
                    o1.first.write(dos);
                    int l1 = bos.size();
                    o2.first.write(dos);
                    int l2 = bos.size();
                    byte[] bytes = bos.toByteArray();
                    bos.reset();
                    return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
                } catch (IOException e) {
                    throw new RuntimeException("Serialization exception in sort:"+e.getMessage());
                }
            }
        }
    );
    currentValues = new ArrayList<NullableTuple>();
    it = input.iterator();
    if (it.hasNext()) {
        Pair<PigNullableWritable, Writable> entry = it.next();
        nextKey = entry.first;
        nextValue = (NullableTuple) entry.second;
    }
    pack = pkg;
}
 
Example 28  Project: spork  File: HadoopShims.java
public static Counters getCounters(Job job) throws IOException {
    try {
        Cluster cluster = new Cluster(job.getJobConf());
        org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
        if (mrJob == null) { // In local mode, mrJob will be null
            mrJob = job.getJob();
        }
        return new Counters(mrJob.getCounters());
    } catch (Exception ir) {
        throw new IOException(ir);
    }
}
 
Example 29  Project: spork  File: HadoopShims.java
/**
 * Returns the progress of a Job j which is part of a submitted JobControl
 * object. The progress is for this Job. So it has to be scaled down by the
 * num of jobs that are present in the JobControl.
 *
 * @param j The Job for which progress is required
 * @return Returns the percentage progress of this Job
 * @throws IOException
 */
public static double progressOfRunningJob(Job j)
        throws IOException {
    org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
    try {
        return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
    } catch (Exception ir) {
        return 0;
    }
}
 
Example 30  Project: spork  File: HadoopShims.java
public static void killJob(Job job) throws IOException {
    org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
    try {
        if (mrJob != null) {
            mrJob.killJob();
        }
    } catch (Exception ir) {
        throw new IOException(ir);
    }
}
 